Flink CDC实战之Mongo同步Mysql

简介

面对复杂的业务场景,企业可能会选用不同的数据库,这给业务之间数据交互,数据分析等带来一定的困扰,对此,数据同步起到很重要的作用,目前业内成熟的数据同步组件很多,支持实时同步的组件有:Canal,Maxwell,Debezium等等,Flink作为实时处理引擎,采用一种sql的方式方便快捷的实现了数据同步,笔者今天就以mongo同步到mysql为例做个演示,flink使用的版本为1.13.5。

Mongo环境搭建

Flink MongoDB CDC Connector是基于MongoDB Change Streams实现的,所以单机版的Mongo DB不支持。MongoDB 提供了副本集和分片集两种集群模部署模式,副本集相当于mysql的主从复制,集群模式相当于多实例分片存储集群。笔者在docker中部署了一个副本集群进行演示。

  1. 创建三个容器

docker run --name mongo0 -p 27000:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo1 -p 27001:27017 -d mongo --replSet "mg-cdc"
docker run --name mongo2 -p 27002:27017 -d mongo --replSet "mg-cdc"

  1. 进入容器mongo0

docker exec -it mongo0 /bin/bash

  1. 进入客户端并配置集群
bin/mongosh

//ifconfig 查看宿主机IP地址
config = {"_id":"mg-cdc",
          "members":[
          {"_id":0,host:"192.168.1.9:27000"},
          {"_id":1,host:"192.168.1.9:27001"},
          {"_id":2,host:"192.168.1.9:27002"}
          ]
}
 
rs.initiate(config)
rs.status()

如果容器重启了,会报错

MongoServerError: already initialized

需要重新配置集群

rs.reconfig(config)

可能报如下错误

MongoServerError: New config is rejected :: caused by :: replSetReconfig should only be run on a writable PRIMARY. Current state REMOVED;

根据报错信息加强制指令执行

rs.reconfig(config, {force:true})

  1. 创建Mongo新用户,给Flink MongoDB CDC使用
use admin;
db.createUser({
  user: "flinkuser",
  pwd: "flinkpw",
  roles: [
    { role: "read", db: "admin" },
    { role: "readAnyDatabase", db: "admin" }
  ]
});
  1. 测试changestream
use wlapp;
cursor = db.plan_joined_user.watch()
cursor.next()

Flink CDC 代码实现

  1. 相关依赖
<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-connector-mongodb-cdc</artifactId>
    <version>2.2.1</version>
</dependency>

<dependency>
    <groupId>com.ververica</groupId>
    <artifactId>flink-sql-connector-mongodb-cdc</artifactId>
    <version>2.2.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/connect-api -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-api</artifactId>
    <version>2.7.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.5</version>
</dependency>
  1. 代码
public class FlinkCdcSync {
    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setBufferTimeout(BUFFER_TIMEOUT_MS);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS, CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(CHECKPOINT_TIMEOUT_MS);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(CHECKPOINT_MIN_PAUSE_MS);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.of(5, TimeUnit.MINUTES),Time.of(10, TimeUnit.SECONDS)));
        final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        tableEnv.executeSql("CREATE TABLE mongo_plan_joined_user (" +
                "  _id STRING," +
                "  plan_id STRING," +
                "  user_id STRING," +
                "  invite_share_log_id STRING," +
                "  joined_time STRING," +
                "  target_value STRING," +
                "  PRIMARY KEY(_id) NOT ENFORCED" +
                ") WITH (" +
                "  'connector' = 'mongodb-cdc'," +
                "  'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002'," +
                "  'username' = 'flinkuser'," +
                "  'password' = 'flinkpw'," +
                "  'database' = 'wlapp'," +
                "  'collection' = 'plan_joined_user'" +
                ")");

        tableEnv.executeSql("CREATE TABLE mysql_plan_joined_user (" +
                "  id STRING," +
                "  plan_id STRING," +
                "  user_id STRING," +
                "  invite_share_log_id STRING," +
                "  joined_time STRING," +
                "  target_value STRING," +
                "  PRIMARY KEY (id) NOT ENFORCED" +
                ") WITH (" +
                "   'connector' = 'jdbc'," +
                "   'url' = 'jdbc:mysql://localhost:3306/wlapp'," +
                "   'table-name' = 'plan_joined_user'," +
                "   'driver' = 'com.mysql.cj.jdbc.Drive'," +
                "   'username' = 'root'," +
                "   'password' = '123456'," +
                "   'scan.fetch-size' = '200'" +
                ")");

        tableEnv.executeSql("insert into mysql_plan_joined_user select * from mongo_plan_joined_user");
    }
}

Flink CDC SQL实现

  1. 相关jar包
  • avro-1.11.0.jar
  • kafka-clients-2.7.0.jar
  • connect-api-2.7.0.jar
  • flink-sql-connector-mongodb-cdc-2.2.1.jar
  • flink-connector-jdbc_2.11-1.13.5.jar
  • mysql-connector-java-8.0.29.jar
  1. 注意事项
  • kafka-client需要适配,本地采用的是2.7.0,过程中使用1.1.1会报错
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
  • flink客户端lib中不应该存放flink-connector-mongodb-cdc-2.2.1.jar、flink-connector-debezium-2.2.1.jar 因为flink-sql-connector-mongodb-cdc-2.2.1.jar 都已经以shade的方式打进去了,否则会报如下错误
java.lang.NoClassDefFoundError: com/ververica/cdc/debezium/internal/FlinkOffsetBackingStore
    at com.ververica.cdc.debezium.DebeziumSourceFunction.run(DebeziumSourceFunction.java:369)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
  1. 代码
SET 'table.dml-sync' = 'false';
SET 'state.backend' = 'filesystem';
SET 'state.checkpoints.dir' = 'hdfs://namespace/user/flink/checkpoints';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';
SET 'execution.checkpointing.interval' = '2min';
SET 'execution.checkpointing.min-pause' = '1min';
SET 'execution.checkpointing.max-concurrent-checkpoints' = '1';
SET 'execution.checkpointing.prefer-checkpoint-for-recovery' = 'true';
SET 'execution.runtime-mode' = 'streaming';

CREATE TABLE mongo_plan_joined_user (
    _id STRING,
    plan_id STRING,
    user_id STRING,
    invite_share_log_id STRING,
    joined_time STRING,
    target_value STRING,
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = '127.0.0.1:27000,127.0.0.1:27001,127.0.0.1:27002',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'wlapp',
    'collection' = 'plan_joined_user'
);

CREATE TABLE mysql_plan_joined_user (
    id STRING,
    plan_id STRING,
    user_id STRING,
    invite_share_log_id STRING,
    joined_time STRING,
    target_value STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/data',
    'table-name' = 'plan_joined_user',
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'username' = 'root',
    'password' = '123456',
    'scan.fetch-size' = '200'
);

INSERT INTO mysql_plan_joined_user SELECT * FROM mongo_plan_joined_user;

本例简单测试了mongo cdc同步mysql的场景,后续在生产中遇到坑也会同步更新。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容