Flink CDC实时增量入湖

Flink 使用介绍相关文档目录

Flink 使用介绍相关文档目录

环境信息

  • Hadoop 3.1.1
  • Flink 1.17.2
  • Hudi 0.15.0
  • MySQL 5.7.x

MySQL开启Binlog

具体步骤参见:Flink 使用之 MySQL CDC

编译Hudi

首先clone HUDI项目。

git clone https://github.com/apache/hudi.git

切换到release-0.15.0分支。然后执行如下编译命令:

mvn clean package -Dflink1.17 -Dscala2.12 -Dspark3.3 -DskipTests -Pflink-bundle-shade-hive3

编译完毕后输出的hudi-flink1.17-bundle-0.15.0.jar位于hudi/packaging/hudi-flink-bundle/target中。复制走备用。

Flink配置

下载Flink 1.17.2二进制包解压到服务器任意目录备用。

配置checkpoint

编辑$FLINK_HOME/conf/flink-conf.yaml文件,加入如下配置启用checkpoint。示例checkpoint间隔时间为3s。

execution.checkpointing.interval: 3s

添加依赖

添加如下依赖到$FLINK_HOME/lib目录中。

  • flink-sql-connector-mysql-cdc-3.1.0.jar 点我下载
  • hudi-flink1.17-bundle-0.15.0.jar
  • mysql-connector-java-8.0.27.jar 点我下载

操作演示

下面以student表为例,演示MySQL CDC到Hudi全过程。

student表有3个字段:

  • id int类型
  • name varchar类型
  • score int类型

首先我们创建MySQL表,并写入初始数据:

create table student(
    id int,
    name varchar(50),
    score int
);

insert into student values(1, 'Paul', 123456),(2, 'Kate', 654321),(3, 'Peter', 222222);

接下来进入Flink SQL Client,创建student CDC表和hudi_student表,然后插入CDC表的内容到Hudi表中。需要执行如下SQL:

CREATE TABLE student (
    id int,
    name string,
    score int,
    PRIMARY KEY(id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.0.1',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'database-name' = 'demo',
    'table-name' = 'student',
    'scan.incremental.snapshot.chunk.key-column' = 'id'
);

CREATE TABLE hudi_student (
    id int,
    name string,
    score int,
    PRIMARY KEY(id) NOT ENFORCED
) with (
    'connector' = 'hudi',
    'path' = 'hdfs:///hudi_student',
    'table.type' = 'MERGE_ON_READ',
    'compaction.async.enabled' = 'true',
    'compaction.trigger.strategy' = 'num_commits',
    'compaction.delta_commits' = '5'
);


insert into hudi_student select * from student;

需要注意的是,如果MySQL表没有指定主键约束,CDC表的属性必须要添加'scan.incremental.snapshot.chunk.key-column'配置项用来指定主键。

最后执行:

select * from hudi_student;

可以查看同步到hudi_student表中的全量数据。

          id                           name       score
           1                           Paul      123456
           2                           Kate      654321
           3                          Peter      222222

新增/修改/删除同步演示

新增数据

在MySQL控制台执行:

insert into student values(4, Tom, 444444);

等待一段时间之后,在Flink SQL Client执行:

select * from hudi_student;

得到查询结果:

          id                           name       score
           1                           Paul      123456
           2                           Kate      654321
           3                          Peter      222222
           4                           Tom       444444

新的数据已经追加到了Hudi表中。

修改数据

在MySQL控制台执行:

update student set score=333333 where id=2;

等待一段时间之后,在Flink SQL Client执行:

select * from hudi_student;

得到查询结果:

          id                           name       score
           1                           Paul      123456
           2                           Kate      333333
           3                          Peter      222222
           4                           Tom       444444

发现id为2的数据已经修改。

删除数据

在MySQL控制台执行:

delete from student where id=2;

等待一段时间之后,在Flink SQL Client执行:

select * from hudi_student;

得到查询结果:

          id                           name       score
           1                           Paul      123456
           3                          Peter      222222
           4                           Tom       444444

发现id为2的数据已经删除。

MySQL CDC实时增量入湖的配置和演示过程到此结束。

附录

Oracle CDC情况说明

经本人试验Oracle CDC入湖存在如下问题:

Oracle CDC使用Flink SQL的方式只能获取到最后一条数据。无法读取到初始的全量数据。应为Flink CDC bug所致。参见:https://developer.aliyun.com/ask/623649

Oracle CDC的延迟非常高。即便是按照文档添加了:

'debezium.log.mining.strategy' = 'online_catalog',
'debezium.log.mining.continuous.mine' = 'true'

效果仍然不好。

Oracle CDC增量修改的数据同步到Hudi表之后会导致Hudi表数据消失,原因未知。

本人结论,不建议使用Oracle CDC实时增量入湖,现阶段问题较多。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容