使用场景
- 日常中使用工具替换繁杂的kafka命令
- kafka安装配置依赖了JVM环境,对于需要在线上容器进行连接kafka 以排查问题场景,可以通过快速安装kafka命令行工具来实现。
下面介绍一下kcat
与kafkactl
。
1.kcat(mac 系统为例)
kcat(Kafkacat)是一个基于命令行的工具,用于与Apache Kafka进行交互。它提供了一种简单而灵活的方式来发送和接收消息,并执行其他与Kafka相关的操作。
(1)安装
brew install kcat
(2)发送消息
## 从标准输入读取
... | kcat -b <broker> -t <topic> -p <partition>
## 或者直接发送
kcat -P -b ...
(3)消费消息
## Consumer mode (writes messages to stdout):
kcat -b <broker> -t <topic> -p <partition>
## or:
kcat -C -b ...
(4)示例
启动kafka
zookeeper-server-start /usr/local/etc/zookeeper/zoo.cfg
kafka-server-start /usr/local/etc/kafka/server.properties
创建一个test_topic
kafka-topics --bootstrap-server localhost:9092 --topic test_topic --create
启动一个消费者:
## Consume from all partitions from 'test_topic' topic
kcat -C -b localhost:9092 -t test_topic
发送消息:
echo "hello,world" |kcat -b localhost:9092 -P -t test_topic
[图片上传失败...(image-f68cbc-1719657461306)]
如果是ACL topic加上-X
设置ACL参数
-- 消息发送(message.txt为kafka消息内容)
kcat -b localhost:9092 -P -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=username -X sasl.password=password -t test_topic -l message.txt
-- 消费消息
kcat -b localhost:9092 -P -X security.protocol=sasl_plaintext -X sasl.mechanism=SCRAM-SHA-256 -X sasl.username=username -X sasl.password=password -G consumegroup test_topic
2.kafkactl (ubuntu系统为例)
Kafkactl是一个开源的命令行工具,用于管理和操作Apache Kafka集群。它提供了一组命令和功能,使得与Kafka集群进行交互和管理变得更加方便
(1)安装
- 下载kafkactl_5.0.6_linux_amd64.tar.gz
- 解压
tar -xzf kafkactl_5.0.6_linux_amd64.tar.gz
(2)示例
新建config.yml
contexts:
# default context without SASL
default:
brokers:
- broker1:9092
requestTimeout: 15s
# sasl context as admin
sasl-admin:
brokers:
- broker2:9092
requestTimeout: 15s
sasl:
enabled: true
username: username
password: password
mechanism: scram-sha512
current-context: sasl-admin
消息发送
cat message.json | ./kafkactl produce yourtopic --input-format=json --config-file=./config.yml
or
./kafkactl produce shopee-luckyvideo-labeling-filtered-users-test-co --file=message.json --input-format=json --config-file=./config.yml
如果执行出现报错:can't unmarshal line: unexpected end of JSON input
,留意message.json文件是不是放置了格式化后的json串,需要用compact形式存放。
消息消费:
./kafkactl consume yourtopic --from-beginning --config-file=./config.yml
切换context为default
./kafkactl config use-context default --config-file=config.yml
参考文档
https://github.com/edenhill/kcat
https://github.com/deviceinsight/kafkactl