前言
CDC(Change Data Capture)即变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等,笔者将会在以下几个方面介绍它的使用。
- 相关依赖
- 使用fink cdc connector直连binlog,DataStream API实现
- 使用fink cdc connector直连binlog,SQL实现
- canal基本配置,采集数据到Kafka
- flink sql对接kafka数据进行实时计算
- 以changelog json的方式转换canal数据流
- 添加cdc connector所需依赖
<dependency>
<groupId>com.alibaba.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>${cdc.version}</version>
</dependency>
- DataStream API实现binlog采集和实时计算
package com.dpf.flink;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import io.debezium.data.Envelope.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import java.util.List;
import java.util.Properties;
public class BinlogStreamExample {
public static void main(String[] args) throws Exception {
Properties extralPro = new Properties();
extralPro.setProperty("AllowPublicKeyRetrieval", "true");
SourceFunction<String> sourceFunction = MySQLSource.<SourceRecord>builder()
.hostname("localhost")
.port(3306)
.databaseList("renren_fast") // monitor all tables under inventory database
.username("root")
.password("12345678")
.debeziumProperties(extralPro)
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().registerKryoType(SourceRecord.class);
env.setParallelism(1);
final DataStreamSource<String> source = env.addSource(sourceFunction);
source.print();
env.execute();
}
public static String extractBeforeData(Struct value, Schema schema) {
final Struct before = value.getStruct("before");
final List<Field> fields = before.schema().fields();
JSONObject jsonObject = new JSONObject();
for (Field field : fields) {
jsonObject.put(field.name(), before.get(field));
}
return jsonObject.toJSONString();
}
public static String extractAfterData(Struct value, Schema schema) {
final Struct after = value.getStruct("after");
final List<Field> fields = after.schema().fields();
JSONObject jsonObject = new JSONObject();
for (Field field : fields) {
jsonObject.put(field.name(), after.get(field));
}
return jsonObject.toJSONString();
}
public static class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema {
@Override
public TypeInformation getProducedType() {
return TypeInformation.of(String.class);
}
@Override
public void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {
final Operation op = Envelope.operationFor(sourceRecord);
final String source = sourceRecord.topic();
Struct value = (Struct) sourceRecord.value();
final Schema schema = sourceRecord.valueSchema();
if (op != Operation.CREATE && op != Operation.READ) {
if (op == Operation.DELETE) {
String data = extractBeforeData(value, schema);
String record = new JSONObject()
.fluentPut("source", source)
.fluentPut("data", data)
.fluentPut("op", RowKind.DELETE.shortString())
.toJSONString();
collector.collect(record);
} else {
String beforeData = extractBeforeData(value, schema);
String beforeRecord = new JSONObject()
.fluentPut("source", source)
.fluentPut("data", beforeData)
.fluentPut("op", RowKind.UPDATE_BEFORE.shortString())
.toJSONString();
collector.collect(beforeRecord);
String afterData = extractAfterData(value, schema);
String afterRecord = new JSONObject()
.fluentPut("source", source)
.fluentPut("data", afterData)
.fluentPut("op", RowKind.UPDATE_AFTER.shortString())
.toJSONString();
collector.collect(afterRecord);
}
} else {
String data = extractAfterData(value, schema);
String record = new JSONObject()
.fluentPut("source", source)
.fluentPut("data", data)
.fluentPut("op", RowKind.INSERT.shortString())
.toJSONString();
collector.collect(record);
}
}
}
}
该方法中默认的序列化类是StringDebeziumDeserializationSchema和RowDataDebeziumDeserializeSchema,显然不能满足我们的需求,笔者自定义了JsonDebeziumDeserializeSchema序列化类,将原始数据转化为了json格式,后续用户可以根据不同的表封装成相应的POJO类。用户在使用的过程中还需要将AllowPublicKeyRetrieval参数设置为true,这是mysql权限相关的配置。
- Flink SQL实现binlog数据采集和实时计算
package com.dpf.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class BinlogTableExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
tableEnvironment.executeSql("" +
" CREATE TABLE mysql_binlog ( " +
" user_id INT, " +
" username STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING " +
" ) WITH ( " +
" 'connector' = 'mysql-cdc', " +
" 'hostname' = 'localhost', " +
" 'port' = '3306', " +
" 'username' = 'root', " +
" 'password' = '12345678', " +
" 'database-name' = 'renren_fast', " +
" 'table-name' = 'tb_user' " +
" )" +
"");
tableEnvironment.executeSql("" +
"CREATE TABLE kafka_binlog ( " +
" user_id INT, " +
" user_name STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING, " +
" PRIMARY KEY (user_id) NOT ENFORCED" +
") WITH ( " +
" 'connector' = 'upsert-kafka', " +
" 'topic' = 'mysql_binlog', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'key.format' = 'json', " +
" 'value.format' = 'json' " +
")" +
"");
tableEnvironment.executeSql("insert into kafka_binlog select * from mysql_binlog");
}
}
此方法明显简洁很多,需要注意的是数据在转运到kafka的时候,kafka connector必须设置为upsert语意,原因是cdc connector返回的是回撤流,到kafka这块如果变成追加流的话,写入会报错:
Exception in thread "main" org.apache.flink.table.api.TableException: Table sink 'default_catalog.default_database.kafka_binlog' doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[default_catalog, default_database, mysql_binlog]], fields=[user_id, username, mobile, password, create_time])
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.createNewNode(FlinkChangelogModeInferenceProgram.scala:389)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.visit(FlinkChangelogModeInferenceProgram.scala:310)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor.org$apache$flink$table$planner$plan$optimize$program$FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$visitChild(FlinkChangelogModeInferenceProgram.scala:348)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:337)
at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyModifyKindSetTraitVisitor$$anonfun$3.apply(FlinkChangelogModeInferenceProgram.scala:336)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.Range.foreach(Range.scala:160)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
- canal安装后面抽时间细讲,这里给出binlog大概的数据格式
{
"data": [
{
"user_id": "2",
"username": "aaa",
"mobile": "ddd",
"password": "ddd",
"create_time": "2018-03-23 22:37:41"
}
],
"database": "renren_fast",
"es": 1624418195000,
"id": 2,
"isDdl": false,
"mysqlType": {
"user_id": "bigint",
"username": "varchar(50)",
"mobile": "varchar(20)",
"password": "varchar(64)",
"create_time": "datetime"
},
"old": [
{
"password": "ccc"
}
],
"pkNames": [
"user_id"
],
"sql": "",
"sqlType": {
"user_id": -5,
"username": 12,
"mobile": 12,
"password": 12,
"create_time": 93
},
"table": "tb_user",
"ts": 1624418196154,
"type": "UPDATE"
}
- Flink SQL对接Kafka中json格式的binlog数据
package com.dpf.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 读取kafka中canal json的数据,解析之后以json的方式存入kafka dwd层
*/
public class KafkaTranslateJson {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
tableEnvironment.executeSql("" +
"CREATE TABLE ods_binlog ( " +
" user_id INT, " +
" username STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING " +
") WITH ( " +
" 'connector' = 'kafka', " +
" 'topic' = 'ods_binlog', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'properties.enable.auto.commit' = 'false', " +
" 'properties.session.timeout.ms' = '90000', " +
" 'properties.request.timeout.ms' = '325000', " +
" 'value.format' = 'canal-json' " +
")" +
"");
tableEnvironment.executeSql("" +
"CREATE TABLE kafka_binlog ( " +
" user_id INT, " +
" user_name STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING, " +
" PRIMARY KEY (user_id) NOT ENFORCED" +
") WITH ( " +
" 'connector' = 'upsert-kafka', " +
" 'topic' = 'mysql_binlog', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'key.format' = 'json', " +
" 'value.format' = 'json' " +
")" +
"");
tableEnvironment.executeSql("insert into kafka_binlog select * from ods_binlog");
}
}
这里对Flink CDC原生的功能进行了扩展,为了实现根据canal type的过滤,源码修改如下:
- 修改org.apache.flink.formats.json.canal.CanalJsonDecodingFormat
static enum ReadableMetadata {
DATABASE("database", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("database", DataTypes.STRING()), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.getString(pos);
}
}),
TABLE("table", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("table", DataTypes.STRING()), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.getString(pos);
}
}),
//todo 修改元数据,添加binlog-type
BINLOG_TYPE("binlog-type", (DataType)DataTypes.STRING().nullable(), DataTypes.FIELD("type", DataTypes.STRING()), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.getString(pos);
}
}),
SQL_TYPE("sql-type", (DataType)DataTypes.MAP((DataType)DataTypes.STRING().nullable(), (DataType)DataTypes.INT().nullable()).nullable(), DataTypes.FIELD("sqlType", DataTypes.MAP((DataType)DataTypes.STRING().nullable(), (DataType)DataTypes.INT().nullable())), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.getMap(pos);
}
}),
PK_NAMES("pk-names", (DataType)DataTypes.ARRAY(DataTypes.STRING()).nullable(), DataTypes.FIELD("pkNames", DataTypes.ARRAY(DataTypes.STRING())), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.getArray(pos);
}
}),
INGESTION_TIMESTAMP("ingestion-timestamp", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), DataTypes.FIELD("ts", DataTypes.BIGINT()), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.isNullAt(pos) ? null : TimestampData.fromEpochMillis(row.getLong(pos));
}
}),
EVENT_TIMESTAMP("event-timestamp", (DataType)DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).nullable(), DataTypes.FIELD("es", DataTypes.BIGINT()), new MetadataConverter() {
private static final long serialVersionUID = 1L;
public Object convert(GenericRowData row, int pos) {
return row.isNullAt(pos) ? null : TimestampData.fromEpochMillis(row.getLong(pos));
}
});
final String key;
final DataType dataType;
final Field requiredJsonField;
final MetadataConverter converter;
private ReadableMetadata(String key, DataType dataType, Field requiredJsonField, MetadataConverter converter) {
this.key = key;
this.dataType = dataType;
this.requiredJsonField = requiredJsonField;
this.converter = converter;
}
}
不改会报如下错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Invalid metadata key 'value.binlog-type' in column 'binlog_type' of table 'default_catalog.default_database.ods_binlog'. The DynamicTableSource class 'org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource' supports the following metadata keys for reading:
value.database
value.table
value.sql-type
value.pk-names
value.ingestion-timestamp
value.event-timestamp
topic
partition
headers
leader-epoch
offset
timestamp
timestamp-type
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.lambda$validateAndApplyMetadata$6(DynamicSourceUtils.java:403)
at java.util.ArrayList.forEach(ArrayList.java:1257)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateAndApplyMetadata(DynamicSourceUtils.java:395)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:158)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
- 修改org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema
private static RowType createJsonRowType(
DataType physicalDataType, List<ReadableMetadata> readableMetadata) {
// Canal JSON contains other information, e.g. "ts", "sql", but we don't need them
DataType root =
DataTypes.ROW(
DataTypes.FIELD("data", DataTypes.ARRAY(physicalDataType)),
DataTypes.FIELD("old", DataTypes.ARRAY(physicalDataType)),
DataTypes.FIELD("type", DataTypes.STRING()),
ReadableMetadata.DATABASE.requiredJsonField,
ReadableMetadata.TABLE.requiredJsonField);
// append fields that are required for reading metadata in the root
final List<DataTypes.Field> rootMetadataFields =
readableMetadata.stream()
.filter(m -> m != ReadableMetadata.DATABASE && m != ReadableMetadata.TABLE && m != ReadableMetadata.BINLOG_TYPE)
.map(m -> m.requiredJsonField)
.distinct()
.collect(Collectors.toList());
return (RowType) DataTypeUtils.appendRowFields(root, rootMetadataFields).getLogicalType();
}
不改会报如下错误:
Exception in thread "main" org.apache.flink.table.api.ValidationException: Field names must be unique. Found duplicates: [type]
at org.apache.flink.table.types.logical.RowType.validateFields(RowType.java:272)
at org.apache.flink.table.types.logical.RowType.<init>(RowType.java:157)
at org.apache.flink.table.types.utils.DataTypeUtils.appendRowFields(DataTypeUtils.java:181)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.createJsonRowType(CanalJsonDeserializationSchema.java:370)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.<init>(CanalJsonDeserializationSchema.java:111)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema.<init>(CanalJsonDeserializationSchema.java:61)
at org.apache.flink.formats.json.canal.CanalJsonDeserializationSchema$Builder.build(CanalJsonDeserializationSchema.java:188)
at org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.createRuntimeDecoder(CanalJsonDecodingFormat.java:104)
at org.apache.flink.formats.json.canal.CanalJsonDecodingFormat.createRuntimeDecoder(CanalJsonDecodingFormat.java:46)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.createDeserialization(KafkaDynamicSource.java:427)
at org.apache.flink.streaming.connectors.kafka.table.KafkaDynamicSource.getScanRuntimeProvider(KafkaDynamicSource.java:199)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
- 以changelog json的方式转换canal数据流
connector地址:https://github.com/ververica/flink-cdc-connectors/tree/master/flink-format-changelog-json
实验用例:
package com.dpf.flink;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 读取kafka中canal json的数据,解析之后以json的方式存入kafka dwd层
*/
public class KafkaTranslateChangelogJson {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
tableEnvironment.executeSql("" +
"CREATE TABLE ods_binlog ( " +
" user_id INT, " +
" username STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING " +
") WITH ( " +
" 'connector' = 'kafka', " +
" 'topic' = 'ods_binlog', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'properties.enable.auto.commit' = 'false', " +
" 'properties.session.timeout.ms' = '90000', " +
" 'properties.request.timeout.ms' = '325000', " +
" 'value.format' = 'canal-json' " +
")" +
"");
tableEnvironment.executeSql("" +
"CREATE TABLE dwd_binlog ( " +
" user_id INT, " +
" user_name STRING, " +
" mobile STRING, " +
" password STRING, " +
" create_time STRING " +
") WITH ( " +
" 'connector' = 'kafka', " +
" 'topic' = 'dwd_binlog', " +
" 'properties.bootstrap.servers' = '127.0.0.1:9092', " +
" 'format' = 'changelog-json'" +
")" +
"");
tableEnvironment.executeSql("insert into dwd_binlog select * from ods_binlog");
}
}
输出结果
{
"data": {
"user_id": 8,
"user_name": "gg",
"mobile": "gg",
"password": "gg",
"create_time": "2020-03-23 22:37:41"
},
"op": "+I"
}
结
flink-cdc相关的知识就总结到这里,讲比较基础,希望对大家的工作有帮助。