简介
面对复杂的业务场景,企业可能会选用不同的数据库,这给业务之间数据交互,数据分析等带来一定的困扰,对此,数据同步起到很重要的作用,目前业内成熟的数据同步组件很多,支持实时同步的组件有:Canal,Maxwell,Debezium等等,Flink作为实时处理引擎,采用一种sql的方式方便快捷的实现了数据同步,笔者今天就以mongo同步到mysql为例做个演示,flink使用的版本为1.13.5。
Mongo环境搭建
Flink MongoDB CDC Connector是基于MongoDB Change Streams实现的,所以单机版的Mongo DB不支持。MongoDB 提供了副本集和分片集两种集群模部署模式,副本集相当于mysql的主从复制,集群模式相当于多实例分片存储集群。笔者在docker中部署了一个副本集群进行演示。
- 创建三个容器
docker run --name mongo0 -p 27000:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo1 -p 27001:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo2 -p 27002:27017 -d mongo --replSet "mg-cdc"
- 进入容器mongo0
docker exec -it mongo0 /bin/bash
- 进入客户端并配置集群
bin/mongosh
//ifconfig 查看宿主机IP地址
config = {"_id":"mg-cdc",
"members":[
{"_id":0,host:"192.168.1.9:27000"},
{"_id":1,host:"192.168.1.9:27001"},
{"_id":2,host:"192.168.1.9:27002"}
]
}
rs.initiate(config)
rs.status()
如果容器重启了,会报错
MongoServerError: already initialized
需要重新配置集群
rs.reconfig(config)
可能报如下错误
MongoServerError: New config is rejected :: caused by :: replSetReconfig should only be run on a writable PRIMARY. Current state REMOVED;
根据报错信息加强制指令执行
rs.reconfig(config, {force:true})
- 创建Mongo新用户,给Flink MongoDB CDC使用
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
});
- 测试changestream
use wlapp;
cursor = db.plan_joined_user.watch()
cursor.next()
Flink CDC 代码实现
- 相关依赖
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-sql-connector-mongodb-cdc</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>2.7.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.13.5</version>
</dependency>
- 代码
public class FlinkCdcSync {
public static void main(String[] args) {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setBufferTimeout(BUFFER_TIMEOUT_MS);
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE mongo_plan_joined_user (" +
" _id STRING," +
" plan_id STRING," +
" user_id STRING," +
" invite_share_log_id STRING," +
" joined_time STRING," +
" target_value STRING," +
" PRIMARY KEY(_id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'mongodb-cdc'," +
" 'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002'," +
" 'username' = 'flinkuser'," +
" 'password' = 'flinkpw'," +
" 'database' = 'wlapp'," +
" 'collection' = 'plan_joined_user'" +
")");
tableEnv.executeSql("CREATE TABLE mysql_plan_joined_user (" +
" id STRING," +
" plan_id STRING," +
" user_id STRING," +
" invite_share_log_id STRING," +
" joined_time STRING," +
" target_value STRING," +
" PRIMARY KEY (id) NOT ENFORCED" +
") WITH (" +
" 'connector' = 'jdbc'," +
" 'url' = 'jdbc:mysql://localhost:3306/wlapp'," +
" 'table-name' = 'plan_joined_user'," +
" 'driver' = 'com.mysql.cj.jdbc.Drive'," +
" 'username' = 'root'," +
" 'password' = '123456'," +
" 'scan.fetch-size' = '200'" +
")");
tableEnv.executeSql("insert into mysql_plan_joined_user select * from mongo_plan_joined_user");
}
}
Flink CDC SQL实现
- 相关jar包
- avro-1.11.0.jar
- kafka-clients-2.7.0.jar
- connect-api-2.7.0.jar
- flink-sql-connector-mongodb-cdc-2.2.1.jar
- flink-connector-jdbc_2.11-1.13.5.jar
- mysql-connector-java-8.0.29.jar
- 注意事项
- kafka-client需要适配,本地采用的是2.7.0,过程中使用1.1.1会报错
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
- flink客户端lib中不应该存放flink-connector-mongodb-cdc-2.2.1.jar、flink-connector-debezium-2.2.1.jar 因为flink-sql-connector-mongodb-cdc-2.2.1.jar 都已经以shade的方式打进去了,否则会报如下错误
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:369)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
- 代码
SET 'table.dml-sync' = 'false';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'hdfs://namespace/user/flink/checkpoints';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2min';
SET 'execution.checkpointing.min-pause' = '1min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
SET 'execution.runtime-mode' = 'streaming';
CREATE TABLE mongo_plan_joined_user (
_id STRING,
plan_id STRING,
user_id STRING,
invite_share_log_id STRING,
joined_time STRING,
target_value STRING,
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc',
'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'wlapp',
'collection' = 'plan_joined_user'
);
CREATE TABLE mysql_plan_joined_user (
id STRING,
plan_id STRING,
user_id STRING,
invite_share_log_id STRING,
joined_time STRING,
target_value STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://127.0.0.1:3306/data',
'table-name' = 'plan_joined_user',
'driver' = 'com.mysql.cj.jdbc.Driver',
'username' = 'root',
'password' = '123456',
'scan.fetch-size' = '200'
);
INSERT INTO mysql_plan_joined_user SELECT * FROM mongo_plan_joined_user;
结
本例简单测试了mongo cdc同步mysql的场景,后续在生产中遇到坑也会同步更新。