世间的一切都可以用时间来解决
前言:
因项目需要,需要调研debezium,也就是捕获数据更改(CDC)。翻阅了网上的一些资料和官网信息,将之总结如下:
环境准备:
ubuntu18.04 + mysql5.7 + Apache-kafka-2.4 + Debezium-connector-mysql-1.4.2-Final-plugin + kafka-connector-jdbc
组件安装
1.首先针对于mysql5.7要开启binlog,在配置文件(my.cnf 或者mysql.cnf)中添加如下配置,配置文件在/etc/mysql/目录下:
[mysqld]
server-id = 112233
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
在这里要注意:bind-address要改为具体的IP或者0.0.0.0,不然后续的kafka-connector启动会出错 ,目录在/etc/mysql/mysql.conf.d/mysqld.conf
bind-address = 0.0.0.0
重启mysql 服务
/etc/init.d/mysql restart
登录mysql,执行set global show_compatibility_56=on; select variable_value as "BINARY LOGGING STATUS (log-bin) ::"
FROM information_schema.global_variables WHERE variable_name='log_bin';的结果是on 说明开启成功。
mysql> set global show_compatibility_56=on;
Query OK, 0 rows affected (0.00 sec)
mysql> set global show_compatibility_56=on; select variable_value as "BINARY LOGGING STATUS (log-bin) ::"
Query OK, 0 rows affected (0.00 sec)
-> FROM information_schema.global_variables WHERE variable_name='log_bin';
+------------------------------------+
| BINARY LOGGING STATUS (log-bin) :: |
+------------------------------------+
| ON |
+------------------------------------+
1 row in set, 1 warning (0.00 sec)
- 解压Debezium-connector-mysql-1.4.2-Final-plugin(解压后的目录名为:debezium-connector-mysql) 并将kafka-connector-jdbcjar包放入到debezium-connector-mysql目录下,以下是放之后的结果:
antlr4-runtime-4.7.2.jar
debezium-api-1.4.2.Final.jar
debezium-connector-mysql-1.4.2.Final.jar
debezium-core-1.4.2.Final.jar
debezium-ddl-parser-1.4.2.Final.jar
failureaccess-1.0.1.jar
guava-30.0-jre.jar
mysql-binlog-connector-java-0.23.3.jar
mysql-connector-java-8.0.21.jar
kafka-connect-jdbc-5.0.0.jar
- 更改kafka配置目录下config/connect-standalone.properties 中的plugin.path:
plugin.path=/debezium-connector-mysql
- 创建mysql 配置文件,命名为mysql.properties:
name=test2
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.89.120
database.port=3306
database.user=root
database.password=12345678
database.server.id=112233 #要跟数据的server-id一致
database.server.name=fullfillment
database.history.kafka.bootstrap.servers=192.168.89.120:19092
database.history.kafka.topic=test2.fullfillment
include.schema.changes=true
- 启动kafka-connector
切换到kafka bin目录下:执行以下语句:
./connect-standalone.sh ../config/connect-standalone.properties ../config/mysql.perpertis
这样就成功运行了debezium!