前言
最近公司需要搭建基于 Flink 框架的实时数仓,为了保证数仓的稳定性,决定引用消息中间件 Kafka 。Kafka 的作用在于 "削峰填谷" ,所谓的“削峰填谷”就是指缓冲上下游瞬时突发流量,使其更平滑。特别是对于那种发送能力很强的上游系统,如果没有消息引擎的保护,“脆弱”的下游系统可能会直接被压垮导致全链路服务“雪崩”。但是,一旦有了消息引擎,它能够有效地对抗上游的流量冲击,真正做到将上游的“峰”填满到“谷”中,避免了流量的震荡。消息引擎系统的另一大好处在于发送方和接收方的松耦合,这也在一定程度上简化了应用的开发,减少了系统间不必要的交互。
那么问题来了,如何将源库中的全量、增量数据导入至 Kafka 主题中呢?本文章将采用 Kafka connector 的方式连接 MySQL ,来实现端到端的数据同步。
JDBC Connector
JDBC connector
允许您通过JDBC驱动程序将任何关系型数据库中的数据导入到Kafka的主题Topic中。通过使用JDBC,这个连接器可以支持各种数据库,不需要为每个数据库定制代码。
通过定期地执行SQL查询语句并为结果集中的每一行创建输出记录来加载数据。在默认情况下,在一个数据库中的所有表都会被复制,每个表都复制到自己的输出主题。数据库那些新的或删除的表被监视并自动适应调整。当从表中复制数据时,连接器可以仅仅加载新增或修改的行通过指定哪些列应当被用来发现新增或修改的数据。
测试环境
IP | 操作系统 | 配置 | 服务器用途 |
---|---|---|---|
172.21.22.186 | CentOS Linux release 7.3.1611 (Core) | 4vC/8Gb/400Gb | 测试集群 |
172.21.22.187 | CentOS Linux release 7.3.1611 (Core) | 4vC/8Gb/400Gb | 测试集群 |
172.21.22.188 | CentOS Linux release 7.3.1611 (Core) | 4vC/8Gb/400Gb | 测试集群 |
172.21.23.238 | CentOS Linux release 7.3.1611 (Core) | 4vC/8Gb/400Gb | MySQL/1 |
前置条件
主机名 | JDK | ZK | Kafka |
---|---|---|---|
master | jdk1.8.0_251 | zookeeper-3.4.14 | kafka_2.11-2.2.0 |
slave1 | jdk1.8.0_251 | zookeeper-3.4.14 | kafka_2.11-2.2.0 |
slave2 | jdk1.8.0_251 | zookeeper-3.4.14 | kafka_2.11-2.2.0 |
以下测试请在能够正常生产消费的
kafka
集群中进行
1. incrementing 自增模式
准备工作
- 连接 mysql 创建测试库、表
[bigdata@slave2 ~]$ mysql -h 172.21.23.238 -u root -p
[bigdata@slave2 ~]$ # 密码省略
mysql> create database test_kafka_connector;
mysql> use test_kafka_connector;
# 创建 `test_kafka_connector` 库 omneo_incrementing测试表
CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_incrementing(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
# 创建 `kafka_connector` 库 kafka_omneo_incrementing测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_incrementing(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
PRIMARY KEY (pid)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
- 切换至
$KAFKA_HOME/config
目录,修改/新增source-incrementing-mysql.properties
,内容如下:
name=mysql-a-source-omneo
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing 自增
mode=incrementing
# 自增字段 pid
incrementing.column.name=pid
# 白名单表 person
table.whitelist=omneo_incrementing
# topic前缀 mysql-kafka-
topic.prefix=mysql-kafka-
- 修改、新增
sink-incrementing-mysql.properties
,内容如下:
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo_incrementing
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafka_omneo_incrementing
- 创建 kafka topic
mysql-kafka-omneo-incrementing
,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo-incrementing
- 启动
kafka connect
,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-incrementing-mysql.properties $KAFKA_HOME/config/sink-incrementing-mysql.properties
启动成功如下图所示
- 向
test_kafka_connector.omneo_incrementing
插入10条测试数据
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(1,'3280983542a7-5bdb-d564-3133-276ae3ce','Nicole','Spaggiari','06/03/1942','34270','Miami','female','maried');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(2,'9089753542a7-5bdb-d564-3133-276ae3ce','Gaelle','O'Dell','23/09/2002','05274','Ottawa','unknown','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(3,'8939383542a7-5bdb-d564-3133-276ae3ce','Kevin','Spaggiari','23/05/1965','H2M1N6','Maugio','unknown','single');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(4,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier','O'Dell','30/06/1998','05274','Cocagne','male','maried');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(5,'9186073542a7-5bdb-d564-3133-276ae3ce','Gauthier','Garbuet','20/03/2000','34920','Ballancourt','female','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(6,'8983983542a7-5bdb-d564-3133-276ae3ce','Gaelle','Berrouard','20/05/1946','91610','Cocagne','female','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(7,'4518023542a7-5bdb-d564-3133-276ae3ce','Nicole','Spaggiari','30/06/1998','34920','Le Cres','male','veuve');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(8,'9721973542a7-5bdb-d564-3133-276ae3ce','Maurice','Berrouard','23/05/1965','H2M1N6','Ballancourt','male','en couple');
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(9,'4037673542a7-5bdb-d564-3133-276ae3ce','Gaelle','Berrouard','30/06/1998','H2M2V5','Ballancourt','male','single');
插入成功如下图所示
- 启动 kafka consumer 消费
mysql-kafka-omneo_incrementing
kafka-console-consumer.sh --bootstrap-server master:9092 --topic mysql-kafka-omneo_incrementing --from-beginning
观察 consumer 控制台,发现 omneo
表中的10条数据已经生产至了 mysql-kafka_omneo
主题当中
insert into omneo_incrementing(pid, uuid, firstname, lastname, birthdate, postalcode, city, sexe, status) values(10,'6633863542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'23/09/2002','H2M2V5','Maugio','female','veuve');
- 观察 consumer 控制台,发现
mysql-kafka_omneo
主题中pid=10
条数据被消费到了控制台。
- 更新
pid=10
的 firstname="world"
INSERT INTO omneo VALUES (10,'6633863542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'23/09/2002','H2M2V5','Maugio','female','veuve') ON DUPLICATE KEY UPDATE firstname="world";
- 查看 mysql 中的
omneo
表发现值已经被改变
- consumer 控制台
pid=10
的记录值并没有被更新
incrementing 结论
在 JDBC Sink Connector 官网中指出insert.mode有且仅有两个值insert.mode=insert
只接收标准的INSERT SQL新增语句。而insert.mode=upsert
接收新增和更新,当对主键修改时也可以洞察并且输出。而insert是无法满足此要求的,因此根据实际业务使用的场景选择insert.mode。
我将
insert.mode=insert
修改为insert.mode=upsert
,再次运行修改过后的数据并没有将修改过后的最新数据生产至kafka topic
修改字段值之前修改字段值之后
我并没看出insert
和upsert
两个参数的区别,希望测试成功的人留言指正或者微信指正,感谢!!!
2. timestamp 时间戳模式
- 创建
test_kafka_connector
库 omneo_timestamp 测试表
CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_timestamp(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
commenttime timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
注意timestamp必须指定not null,否则会报错! (无法使用时间戳列进行增量查询因为时间戳字段是可为空的)
- 创建
kafka_connector
库 kafka_omneo_timestamp 测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_timestamp(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
commenttime timestamp NULL DEFAULT NULL,
PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 切换至
$KAFKA_HOME/config
目录,修改/新增source-timestamp-mysql.properties
,内容如下:
name=mysql-a-source-omneo
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing 自增
mode=timestamp
# 自增字段 pid
timestamp.column.name=commenttime
# 白名单表 person
table.whitelist=omneo_timestamp
# topic前缀 mysql-kafka-
topic.prefix=mysql-kafka-
- 修改、新增
source-timestamp-mysql.properties
,内容如下:
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafkatable
table.name.format=kafka_omneo_timestamp
- 创建 kafka topic
mysql-kafka-omneo-timestamp
,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo_timestamp
- 启动
kafka connect
,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-timestamp-mysql.properties $KAFKA_HOME/config/sink-timestamp-mysql.properties
- 向
test_kafka_connector.omneo_timestamp
插入4条测试数据
insert into omneo_timestamp values(1,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:00:01');
insert into omneo_timestamp values(2,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:03:45');
insert into omneo_timestamp values(3,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:06:01');
insert into omneo_timestamp values(4,'8379693542a7-5bdb-d564-3133-276ae3ce','Gauthier',"O'Dell",'30/06/1998','05274','Cocagne','male','maried','2018-01-01 00:10:01');
插入成功如下图所示
- 观察 consumer 控制台,发现
mysql-kafka-omneo_timestamp
主题数据被消费到了控制台。
- 修改 pid 为
2
和4
的firstname
,并修改业务时间字段commenttime
update omneo_timestamp set firstname = "world" ,commenttime="2018-12-20 15:55:10" where pid in(2,4);
修改完成如下所示
- 观察 consumer 控制台,发现
mysql-kafka-omneo_timestamp
主题数据产生了变化如下
timestamp 结论
- 如果修改的时间戳早于latest time ,则不会洞察到更新。例如MySQL中的now()获取当前时间就是很好的能被获取到的例子。
- 源表向目标表传输数据,假设有两条(或以上)的数据行拥有同样的时间戳,如果在传输第二条的过程中崩溃,恢复过后第二条将会被丢失,因为latest time已经被记录过了,他只会去找更新的下一次时间。这种方式虽然能获取到update更新,但是不够稳健。而如果使用自增字段加时间戳字段混合的方式,即使崩溃也能记录到更新的最大ID,恢复之后可以被找到不会丢失。因此我们更推荐第三种方式!timestamp+incrementing
incrementing + timestamp 混合模式
1.创建 test_kafka_connector
库 omneo_incrementing_timestamp 测试表
CREATE TABLE IF NOT EXISTS test_kafka_connector.omneo_incrementing_timestamp(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
commenttime timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 创建
kafka_connector
库 kafka_omneo_incrementing_timestamp 测试表
CREATE TABLE IF NOT EXISTS kafka_connector.kafka_omneo_incrementing_timestamp(
pid int(11) NOT NULL AUTO_INCREMENT,
uuid varchar(100) NOT NULL,
firstname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
lastname varchar(20) CHARACTER SET utf8 DEFAULT NULL,
birthdate varchar(20),
postalcode varchar(20),
city varchar(20),
sexe varchar(20),
status varchar(20),
commenttime timestamp NOT NULL DEFAULT current_timestamp,
PRIMARY KEY (pid)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 切换至
$KAFKA_HOME/config
目录,修改/新增source-incrementing-timestamp-mysql.properties
name=mysql-a-source-omneo_incrementing
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://172.21.23.238:3306/test_kafka_connector?user=root&password=1qaz@WSX
# incrementing 自增
mode=timestamp+incrementing
# 时间戳字段
timestamp.column.name=commenttime
# 自增字段 pid
incrementing.column.name=pid
# 白名单表 person
table.whitelist=omneo_incrementing_timestamp
# topic前缀 mysql-kafka-
topic.prefix=mysql-kafka-
- 修改、新增
sink-incrementing-timestamp-mysql.properties
name=mysql-a-sink-omneo
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名称
topics=mysql-kafka-omneo_incrementing_timestamp
# 配置JDBC链接
connection.url=jdbc:mysql://172.21.23.238:3306/kafka_connector?user=root&password=1qaz@WSX
# 不自动创建表,如果为true,会自动创建表,表名为topic名称
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面两个参数配置了以pid为主键更新
pk.mode = record_value
pk.fields = pid
#表名为kafka_omneo_incrementing_timestamp
table.name.format=kafka_omneo_incrementing_timestamp
- 创建 kafka topic
mysql-kafka-omneo-incrementing-timestamp
,命令如下
kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic mysql-kafka-omneo_incrementing_timestamp
- 启动
kafka connect
,命令如下
connect-standalone.sh $KAFKA_HOME/config/connect-standalone.properties $KAFKA_HOME/config/source-incrementing-timestamp-mysql.properties $KAFKA_HOME/config/sink-incrementing-timestamp-mysql.properties
- 向
test_kafka_connector.omneo_incrementing_timestamp
插入4条测试数据
insert into omneo_incrementing_timestamp values(1,"0049683542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H2M2V5","Ballancourt","male","en couple","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(2,"8338623542a7-5bdb-d564-3133-276ae3ce","Gauthier","Garbuet","23/05/1965","05274","Cocagne","female","maried","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(3,"3374573542a7-5bdb-d564-3133-276ae3ce","Maurice","Samuel","01/11/1977","H0H0H0","Ottawa","male","en couple","2020-05-09 11:01:54");
insert into omneo_incrementing_timestamp values(4,"5494133542a7-5bdb-d564-3133-276ae3ce","Nicole","Garbuet","01/11/1977","H0H0H0","Maugio","unknown","single","2020-05-09 11:01:54");
- 观察 consumer 控制台,发现
mysql-kafka-omneo_incrementing_timestamp
主题数据被消费到了控制台。
kafka-console-consumer.sh --bootstrap-server master:9092 --topic mysql-kafka-omneo_incrementing_timestamp --from-beginning
- 修改 pid 为
2
和4
的firstname
,并修改业务时间字段commenttime
update omneo_incrementing_timestamp set firstname = "world" ,commenttime="2020-12-20 15:55:10" where pid in(2,4);
修改完成如下所示
- 观察 consumer 控制台,发现
mysql-kafka-omneo_incrementing_timestamp
主题,发现手动修改的两条数据并没有被消费到控制台。
timestamp + incrementing 混合模式结论
由以上测试结果可知,这次的测试不知道是我的粗心导致哪些细节的地方没有注意到,还是 kafka connect 的 BUG,后续我还会继续去排查问题的原因,如果有小伙伴测试成功了,希望小伙伴在留言/私信不吝赐教。
虽然本人测试失败了,但是还是推荐大家使用 canal 来完成 MySQL 数据近实时同步至 Kafka ,因为 kafka connect 是通过 select 的语句将 Mysql中的数据查询出来然后再写入 kafka ,这里会有一个瓶颈问题
当查询的数据量大于千万级别的时候,查询速度会特别的慢(没有建索引),而 canal 则是通过监控 MySQL 的 binlog 日志来完成数据同步的,大数据量的场景下建议使用 canal
。
请参考笔者另一篇文章:Canal + Kafka 实现 MySQL 的 binlog 近实时同步