Reference documentation
https://github.com/ververica/flink-cdc-connectors
Installation
Download the jar packages
https://github.com/ververica/flink-cdc-connectors/wiki/Downloads
Downloading them with Maven is recommended:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.11.0</version>
</dependency>
<dependency>
<groupId>com.alibaba.ververica</groupId>
<!-- add the dependency matching your database -->
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>1.1.0</version>
</dependency>
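Place the jar packages in the flink/lib directory of the Flink cluster
Note that both the flink-cdc-connectors jar and the flink-connector-jdbc jar must be placed there
Restart the cluster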
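Example: MySQL source -> Flink SQL -> MySQL sink
Create a new database in MySQL
Create the database inventory
Create the tables products and company as the tables to be synced via cdc
Create the table result to store the result of joining products and company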
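The database itself has to exist before the table definitions that follow can run. A minimal sketch, assuming a MySQL client session with the privilege to create databases:

```sql
-- Create the database used throughout this example (no-op if it already exists).
CREATE DATABASE IF NOT EXISTS inventory;
-- Make it the default database for the CREATE TABLE statements.
USE inventory;
```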
CREATE TABLE `products` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `company` (
`id` int NOT NULL,
`company` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
CREATE TABLE `result` (
`id` int NOT NULL,
`name` varchar(45) DEFAULT NULL,
`description` varchar(45) DEFAULT NULL,
`weight` decimal(10,3) DEFAULT NULL,
`company` varchar(45) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
Manually insert some data into the database
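The notes do not say which rows were inserted. As an illustration only, a few hypothetical sample rows (all values here are assumptions, not the original data) could be loaded like this:

```sql
USE inventory;

-- Hypothetical sample rows for the two cdc source tables; replace with your own data.
INSERT INTO `products` (`id`, `name`, `description`, `weight`) VALUES
  (1, 'scooter', 'Small 2-wheel scooter', 3.140),
  (2, 'car battery', '12V car battery', 8.100);

-- The ids match the products above so that the later join on id finds a company for each product.
INSERT INTO `company` (`id`, `company`) VALUES
  (1, 'Company A'),
  (2, 'Company B');
```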
Start the SQL client (bin/sql-client.sh embedded)
SQL for creating the dynamic tables
-- creates a mysql cdc table source
-- syncs the products table
CREATE TABLE mysql_binlog (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'inventory',
'table-name' = 'products'
);
-- syncs the company table
CREATE TABLE mysql_company (
id INT NOT NULL,
company STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'inventory',
'table-name' = 'company'
);
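-- table that stores the join result; a primary key must be declared here, otherwise an error is reported
-- (the join produces an updating stream, which the jdbc sink can only write in upsert mode, keyed by the primary key)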
CREATE TABLE mysql_result (
id INT NOT NULL,
name STRING,
description STRING,
weight DECIMAL(10,3),
company STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://localhost:3306/inventory',
'username' = 'root',
'password' = 'root',
'table-name' = 'result'
);
Query the data
>show tables;
------------------------------------
mysql_binlog
------------------------------------
>select * from mysql_binlog;
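If the rows of the products table are displayed, the table data has been synced successfully
When rows are added in the MySQL client, the data shown in the SQL client changes along with them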
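To try this out, statements like the following can be run in the MySQL client while the SELECT is still running in the SQL client. This is a sketch with hypothetical values; it assumes no row with id 3 exists yet:

```sql
-- Run in the MySQL client, not in the Flink SQL client.
USE inventory;

-- A new row (hypothetical values); it should show up in the running query.
INSERT INTO `products` (`id`, `name`, `description`, `weight`)
VALUES (3, 'hammer', '12oz carpenter hammer', 0.750);

-- A change to the row just inserted; the displayed weight should change accordingly.
UPDATE `products` SET `weight` = 0.800 WHERE `id` = 3;
```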
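Run the following SQL to left outer join the table mysql_binlog with the table mysql_company and insert the result into mysql_result
Note: executing the insert statement is what triggers the data synchronization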
insert into mysql_result (id,name,description,weight,company)
select
a.id,
a.name,
a.description,
a.weight,
b.company
from mysql_binlog a
left join mysql_company b
on a.id = b.id;
Generated plan
Executing the statement above creates a session job that processes the data as a stream.
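Once the job is running, the sink table can be checked from the MySQL client. A minimal sketch; because the query is a left join, products without a matching company row are expected to appear with a NULL company:

```sql
-- Run in the MySQL client to inspect what the Flink job has written so far.
USE inventory;

SELECT `id`, `name`, `description`, `weight`, `company`
FROM `result`
ORDER BY `id`;
```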
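Differences between the flink cdc connector and the flink jdbc connector
- The flink jdbc connector is closer to batch processing: as a source it performs a bounded scan of the table and cannot sync data in real time
The flink cdc connector also has its limitations:
- Supported databases: MySQL, PostgreSQL
- Because the cdc connector syncs incremental data by posing as a MySQL slave and reading the MySQL binlog, it only supports syncing inserted and modified rows and cannot handle deleted rows.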
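Since the connector depends on the binlog, it may be worth checking the server's binlog settings before starting. A hedged sketch, assuming the Debezium-based MySQL connector's usual requirements (binary logging enabled, row-based format, full row image):

```sql
-- Run in the MySQL client.
SHOW VARIABLES LIKE 'log_bin';          -- expected: ON
SHOW VARIABLES LIKE 'binlog_format';    -- expected: ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- expected: FULL

-- The connecting user also needs replication-related privileges; list what it has.
SHOW GRANTS FOR CURRENT_USER();
```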