本文章主要是对开源的kafka-connect-cdc-mssql进行编译并集成到confluent平台中,鉴于还不太熟悉部分平台的功能,仅简单介绍此次实施的步骤。因
1.安装sql-server的docker image
安装步骤参考
注意版本以及ubuntu的docker需要root用户
安装结束后使用如下命令进入sql server客户端操作
root@xh:# sqlcmd -S localhost -U SA -P '密码' //注意密码必须是大小写加特殊字符,否则无法创建dcoker镜像
- 下载并编译源码
使用maven进行编译,你需要提前下载JDBC driver,可自行搜索,下载地址,注意自己的sql-server的JDBC driver版本即可
之后maven insatll:
cd ~/Projects/IdeaProjects/kafka-connect-cdc-mssql //x项目所在位置
$ mvn install:install-file -DgroupId=com.microsoft.sqlserver -DartifactId=sqljdbc4 -Dversion=6.0.7130 -Dpackaging=jar -Dfile=<path to the download> //注意你自己的sqljdbc版本即可
然后编译,因为test环境不同,易出错,可使用如下命令:
$ mvn clean package -Dmaven.test.skip=true
$ cp ./target/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar ~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql //把生成的jar包放到confluent下的/share/java/自己创建的文件夹下
- 启动kafka
首先启动zookeeper
cd ~/worksapce/kafka/zookeeper-3.3.6/ //zookeeper的目录
bin/zkServer.sh start
启动kafka
cd .. //接上,kafka的目录
sudo bin/kafka-server-start.sh config/server.properties
以下命令因不涉及到schema-registry,可暂时忽略不考虑
cd ~/workspace/confluent-3.3.0
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties
- 配置sqlserver.properties
之后配置mssql-kafka-connector的相关文件,并在confluent下的etc中创建目录sqlserver,且在该目录下创建sqlserver.properties,其内容如下:
name=connector1
tasks.max=1
connector.class=com.github.jcustenborder.kafka.connect.cdc.mssql.MsSqlSourceConnector
#connector.class=/home/xh/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/kafka-connect-cdc-mssql-0.0.1-SNAPSHOT.jar
# Set these required values
initial.database=Test
server.name=10.19.138.199
password=Xyj123456.
server.port=1433
username=SA
#topicFormat.format=kafka-mssql
change.tracking.tables=dbo.Inventory2 //注意dbo是schemaName,不要写成Test(DatabaseName)
- 启动mssql-kafka-connector
$ cd ~/workspace/confluent-3.3.0
$ bin/connect-standalone etc/kafka/connect-standalone.properties etc/sqlserver/sqlserver.properties
根据错误提示到相应的行去找相应的maven依赖(~/.m2/repository/下) 并把该依赖jar包copy到~/workspace/confluent-3.3.0/share/java/kafka-connect-mssql/下,分别如下:
注意其依赖的父类kafka-connector-cdc也需要下载下来且使用maven编译好,同在该作者的github上。
6.在sql-server中允许database和table的change tracking
参考网址
命令如下:
1>ALTER DATABASE 数据库名 SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)
2>go
1>ALTER TABLE 表名 ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)
2>go
另外会出现一个提示要求你允许数据库snapshot隔离,命令如下:
1>ALTER DATABASE 数据库名 SET ALLOW_SNAPSHOT_ISOLATION ON
2>go
- 使用该mssql-kafka-connector
在sql server客户端插入一条数据,另开一个kafka消费者的端口,可以看到相应的binlog输出
接下来要去测试吞吐量和时延了~Come on!