Kafka Connect简介
Kafka Connect是Kafka的开源组件Confluent提供的功能,用于实现Kafka与外部系统的连接。Kafka Connect同时支持分布式模式和单机模式,另外提供了一套完整的REST接口,用于查看和管理Kafka Connectors,还具有offset自动管理,可扩展等优点。
使用Kafka Connect连接Kafka和Elasticsearch
1 测试环境准备
ES服务、Kafka服务、kafka topic:kafka_es_test
2 Kafka Connect 安装
下载地址:https://www.confluent.io/download/
3 Worker配置
1) 配置参考 ,参考如下 :
[通用配置]
Kafka Connect默认使用AvroConverter,使用该AvroConverter时必须先启动Schema Registry服务
2) 实际操作
修改./schema-registry/connect-avro-standalone.properties,配置bootstrap.servers=kafka服务地址
4 Elasticsearch Connector配置
1) 配置参考
[Elasticsearch Configuration Options]
2) 实际操作
对./kafka-connect-elasticsearch/quickstart-elasticsearch.properties做如下修改:
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=kafka_es_test
key.ignore=true
connection.url=http://ES服务地址
type.name=kafka-connect
注意: 其中topics、kafka中topic名称、Elasticsearch的索引名三者通常一致,也可以通过topic.index.map来设置从topic名到Elasticsearch索引名的映射
5 启动connector
5.1 启动Schema Registry服务
该服务需要指定一个zookeeper地址或Kafka地址,以存储schema数据。具体操作如下:
1) 启动Zookeeper
./bin/zookeeper-server-start-daemon etc/kafka/zookeeper.properties
2) 启动kafka
./bin/kafka-server-start-daemon etc/kafka/server.properties
3) 启动schema Registry
./bin/schema-registry-start-daemon etc/schema-registry/schema-registry.properties
4) 使用netstat -natpl 查看各服务端口是否正常启动 :zookeeper 、kafka 、schema registry各自的地址
5.2 启动Connector
./bin/connect-standalone -daemon etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties
以上启动各服务均可在logs目录下找到对应日志
6 启动Kafka Producer
1) 启动Producer
./bin/kafka-avro-console-producer --broker-list XXXX:9092 --topic kafka_es_test --property value.schema='{"type":"record","name":"person","fields":[{"name":"nickname","type":"string"}]}'
2) 输入如下数据
{"nickname":"michel"}{"nickname":"mushao"}
7 Kibana验证结果
查看索引 ,GET_cat/indices结果可以看到名为kafka_es_test的索引被成功创建
Confluent CLI
CLI目前只是适用于开发阶段,不能用于生产环境。
它可以一键启动包括zookeeper,kafka,schema registry, kafka rest, connect等在内的多个服务;使用CLI启动默认配置为启动Distributed的Connector,需要通过环境变量来修改配置
使用Confluent CLI
confluent CLI提供了丰富的命令,包括服务启动,服务停止,状态查询,日志查看等
1) 启动./bin/confluent start
2) 检查confluent运行状态./bin/confluent status
3) 问题定位,如果第二步出现问题,可以使用log命令查看,如connect未启动成功则./bin/confluent log connect
4) 加载Elasticsearch Connector
a) 查看connector
./bin/confluentlistconnectors
b) 加载Elasticsearch connector
./bin/confluentloadelasticsearch-sink
结果,输出quickstart-elasticsearch.properties配置文件中包含的信息
{"name":"elasticsearch-sink","config": {"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","tasks.max":"1","topics":"kafka_es_test","key.ignore":"true","connection.url":"http://192.168.0.8:9200","type.name":"kafka-connect","name":"elasticsearch-sink" },"tasks": [],"type":null}
5) 使用producer生产数据,并使用kibana验证是否写入成功
Kafka Connect Rest API
Kafka Connect提供了一套完成的管理Connector的接口,详情参考[Kafka Connect REST Interface]。该接口可以实现对Connector的创建,销毁,修改,查询等操作
1) GET connectors 获取运行中的connector列表
2) POST connectors 使用指定的名称和配置创建connector
3) GET connectors/(string:name) 获取connector的详细信息
4) GET connectors/(string:name)/config 获取connector的配置
5) PUT connectors/(string:name)/config 设置connector的配置
6) GET connectors/(string:name)/status 获取connector状态
7) POST connectors/(stirng:name)/restart 重启connector
8) PUT connectors/(string:name)/pause 暂停connector
9) PUT connectors/(string:name)/resume 恢复connector
10)DELETEconnectors/(string:name)/ 删除connector
11)GETconnectors/(string:name)/tasks 获取connectors任务列表
12)GET/connectors/(string:name)/tasks/(int: taskid)/status获取任务状态
13) POST /connectors/(string:name)/tasks/(int: taskid)/restart 重启任务
14)GET/connector-plugins/ 获取已安装插件列表
15) PUT /connector-plugins/(string:name)/config/validate验证配置
原文链接