写在前面:
我先假设kafka已经安装好
1.opt 目录下创建一个文件夹 /opt/kafka-connect-pak
2.去confluent官网下载最新版confluent tar包,我的是 confluent-6.0.1
3.解压安装包 tar -xvf xxx.tar(里面包含kafka,kafka-connect,zookeeper等等,我们只需要使用kafka-connect和schema-registry)
- cd /opt/kafka-connect-pak/confluent-6.0.1/etc/schema-registry
修改配置文件
配置项&说明 示例
4.1.connect-avro-distributed.properties
配置项 | 说明 | 示例 |
---|---|---|
bootstrap.servers | kafka集群broker地址 | bootstrap.servers=101.120.172.41:9092,101.120.172.42:9092,101.120.172.43:9092 |
group.id | 给定confluent集群一个id名称 | group.id=tcly-dm-kafka-connect-cluster |
key.converter | 选定key的传输格式,建议使用avro | key.converter=io.confluent.connect.avro.AvroConverter |
value.converter | 选定value的传输格式,建议使用avro | key.converter=io.confluent.connect.avro.AvroConverter |
Key.converter.schema.registry.url | Key数据结构存储地址(Schema-registry服务的地址) | key.converter.schema.registry.url=http://101.120.218.111:8081 |
value.converter.schema.registry.url | value数据结构存储地址(Schema-registry服务的地址) | value.converter.schema.registry.url=http://101.120.218.111:8081 |
config.storage.topic | 集群配置存储的topic,建议自定义 | config.storage.topic=tcly-dm-connect-configs |
offset.storage.topic | 集群各个topic偏移量保存的topic,建议自定义 | offset.storage.topic=tcly-dm-connect-offsets |
status.storage.topic | 集群任务状态保存的topic,建议自定义 | status.storage.topic=tcly-dm-connect-statuses |
request.timeout.ms | 请求超时时间,建议配置大一点 | request.timeout.ms =30000 |
plugin.path | 此为增加的插件的路径(直接填为绝对路径) | plugin.path=/opt/kafka-connect-pak/confluent-6.0.1/share/java |
4.2 schema-registry.properties
配置项 | 说明 | 示例 |
---|---|---|
listeners | schema-registry服务的ip和端口 | listeners=http://0.0.0.0:8081 |
kafkastore.bootstrap.servers | Kafka集群地址 | kafkastore.bootstrap.servers=PLAINTEXT://101.120.172.41:9092,PLAINTEXT://101.120.172.42:9092,PLAINTEXT://101.120.172.43:9092 |
kafkastore.topic | 传输的数据结构保存的topic(一旦集群使用,不能更改) | kafkastore.topic=dm_schemas |
5.下载debezium-sqlserver-connect去(https://debezium.io)官网下载
放到此路径下 /opt/kafka-connect-pak/confluent-6.0.1/share/java
直接解压tar包即可
启动schema-registry 注册服务
/opt/kafka-connect-pak/confluent-6.0.1/bin/schema-registry-start /opt/kafka-connect-pak/confluent-6.0.1/etc/schema-registry/schema-registry.properties &启动 kafka connect
/opt/kafka-connect-pak/confluent-6.0.1/bin/connect-distributed /opt/kafka-connect-pak/confluent-6.0.1/etc/schema-registry/connect-avro-distributed.properties &sql server 开启cdc功能,百度教程很多
9.通过rest API注册连接器(这个就不说了,直接参考debezium官网的文档),ps ,有时kafka connect会报 decimal 的 精度异常,连接器配置"decimal.handling.mode":"string",当成String来处理就好了
10.消费实时同步的数据
导包:
<repositories>
<repository>
<id>confluent</id>
<url>http://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>5.3.1</version>
</dependency>
百度搜索spring-kafkaListener
消息使用avro解序列化
消息格式参考debezium官网的说明,很详细