[TOC]
对于kafka消费者来说,我们通常需要监控或者检查一下consumer的状态,除了进程本身的状态和系统状态,最关心的可能就是消费者的消费速度是不是太慢,有没有消息滞后/堆积了,对于高版本的kafka,kafka自身就提供了比较方便的命令,而对于低版本的kafka,可能需要自己通过命令组合查看,或者安装第三方监控应用。
本文提供了高低版本通过命令的方式查看consumer消费状态的方法,供参考。
1. kafka_0.8.x系列
1.1 要求
- 消费者存储offset在zk
- 需要放一个zk运行包到脚本执行的环境,执行时需要zk客户端;即需要将zk的bin加入到PATH
- 需要放一个kafka运行包到脚本执行环境,执行时需要kafka客户端;即需要将kakfa的bin加入到PATH
最简单的做法就是在脚本部署机器上放上zk和kafka的安装包,然后配置好各自的环境变量,也就是要能够直接执行zkCli.sh/kafka-topics.sh等命令
1.2 脚本
#kafka broker节点地址,如果有多个,以逗号隔开
KAFKA_BROKER_LIST=172.21.37.178:39091
#监控的consumer所消费的kafka topic
KAFKA_TOPIC=risk_info
#topic的分区数量
KAFKA_TOPIC_PARTITION_COUNT=12
#zk地址和端口列表,,如果有多个,以逗号隔开
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer的group id
CONSUMER_GROUP_ID=kafka0822_test1
CHECK_RES_FILE=consumer_check.log
echo "ATTENTION: this script is useful for kafka 0.8.x and consumer with zookeeper."
echo "start to fetch kafka consumer status now. please wait for about 10s. result will be put in file $CHECK_RES_FILE"
offsets[0]=0
consumed[0]=0
#kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -2
echo "START----------------------------------------------------------------------------" >> $CHECK_RES_FILE
echo "check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1" >> $CHECK_RES_FILE
eval $(kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list $KAFKA_BROKER_LIST --topic $KAFKA_TOPIC --time -1|grep -v kafka|grep -v grep|awk -F: '{print "offsets["(FNR-1)"]="$3}')
#echo "kafka latest offset:" >> $CHECK_RES_FILE
#for var in ${offsets[@]};
#do
# echo $var >> $CHECK_RES_FILE
#done
for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
if [ $i == 0 ]; then
echo "check kafka consumed offset command:zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1" >> $CHECK_RES_FILE
fi
consumed[$i]=`zkCli.sh -server $ZOOKEEPER_LIST get /consumers/$CONSUMER_GROUP_ID/offsets/$KAFKA_TOPIC/$i|tail -1`
done
for ((i=0; i<$KAFKA_TOPIC_PARTITION_COUNT; i++))
do
LAG=`expr ${offsets[$i]} - ${consumed[$i]}`
echo "topic:$KAFKA_TOPIC, partition:$i, consumer group:$CONSUMER_GROUP_ID, latest offset:${offsets[$i]}, consumed offset:${consumed[$i]}, lag:$LAG" >> $CHECK_RES_FILE
done
echo "END----------------------------------------------------------------------------" >> $CHECK_RES_FILE
执行时,需要修改的地方为
#kafka 节点host和端口,如果写多个,用逗号隔开
KAFKA_BROKER_LIST=172.21.37.178:39091
#要查看的topic
KAFKA_TOPIC=risk_info
#topic的分区数量,需要确定好
KAFKA_TOPIC_PARTITION_COUNT=12
#zookeeper的host和端口,如果写多个,用逗号隔开
ZOOKEEPER_LIST=172.21.34.92:32181
#consumer group
CONSUMER_GROUP_ID=kafka0822_test1
执行结果为:
结果文件默认保存在kafka_consumer_check_res.log中
cat kafka_consumer_check_res.log
显示:
START----------------------------------------------------------------------------
check kafka latest offset command: kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 172.21.37.178:39091 --topic risk_info --time -1
check kafka consumed offset command:zkCli.sh -server 172.21.34.92:32181 get /consumers/kafka0822_test1/offsets/risk_info/0|tail -1
topic:risk_info, partition:0, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:1, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:2, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:3, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:4, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:5, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1081, lag:2
topic:risk_info, partition:6, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:7, consumer group:kafka0822_test1, latest offset:1083, consumed offset:1082, lag:1
topic:risk_info, partition:8, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:9, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:10, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1081, lag:1
topic:risk_info, partition:11, consumer group:kafka0822_test1, latest offset:1082, consumed offset:1080, lag:2
END----------------------------------------------------------------------------
具体只需要关注结果文件最后的lag值,以确定是否滞后
1.3 检查建议
- 如果每个分区的堆积在两位数以内,且没有增长趋势,则表示consumer没有大问题
- 如果堆积数量不断增长,则需要检查消费者是否有问题
由于命令行的zk客户端执行较慢,因此如果分区较多的时候,在kafka吞吐量高峰时期,靠后的分区检查结果可能显示LAG为负数,这是因为高峰时期offset更新的比较快,我们从kafka里查询offset只在脚本的开头执行了一次,而zk命令的执行过程却要很久(每个分区要执行一次查询请求命令),此时zk里的offset已经被consumer提交成相对更新的,因此出现了二者相减出现了负数
2. kafka 0.9.x系列
具体是不是0.9不记得了,kafka的版本比较多,如果0.9.x不支持,可以继续使用上一节的脚本
命令:
//首先我们需要知道当前有哪些消费者group,如果已知,此步骤可忽略
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --list
bin/kafka-consumer-groups.sh --bootstrap-server BORKER_HOST1:PORT1,BORKER_HSOT2:PORT2 --group GROUP_NAME --describe
- BROKER_HOST是kafka server的ip地址,PORTt是server的监听端口。多个host port之间用逗号隔开
- 第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略
- 第二条命令是查看具体的消费者group的详情信息,需要给出group的名称
例如,首先列出消费者group列表
bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --list
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
console-consumer-89764
console-consumer-45728
我们以console-consumer-8976为例,查看详情
bin/kafka-consumer-groups.sh --bootstrap-server 172.21.37.194:39092 --group console-consumer-89764 --describe
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
tttttttt_topic 0 641 641 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
tttttttt_topic 1 632 632 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
tttttttt_topic 2 699 699 0 consumer-1-c313db2b-7758-4de0-8cbd-025997d1a4cc /172.21.37.194 consumer-1
其中
- TOPIC:该group里消费的topic名称
- PARTITION:分区编号
- CURRENT-OFFSET:该分区当前消费到的offset
- LOG-END-OFFSET:该分区当前latest offset
- LAG:消费滞后区间,为LOG-END-OFFSET-CURRENT-OFFSET,具体大小需要看应用消费速度和生产者速度,一般过大则可能出现消费跟不上,需要引起应用注意
- CONSUMER-ID:server端给该分区分配的consumer编号
- HOST:消费者所在主机
- CLIENT-ID:消费者id,一般由应用指定