为什么要写这个小工具
- 在之前的文章 Kafka重置消费的Offset 介绍过可以利用librdkafka 来写一个重置offset的小工具;
-
librdkafka有个小问题,在当前的版本里作者限制了提交最早的offset, 可以看这个issue: Allow re-Committing offsets;
- 当kafka集群里有一台broker机器坏掉无法修复,对于一个没有复本的topic, 针对这台坏掉的broker上的partition, 将无法继续提交offset, 需要停掉consumer, 重置offset,然后再重启consumer;
- 如果线上有大量这样的topic和对应的consumer, 重启所有consumer不是一个好的办法 :(
获取这个工具
Usage:
--broker_list arg kafka broker list
--topic arg kafka topic name
--group arg consumer group name
--partition_list arg reset partiton list
"":all parition(default value)
Or 1,2,3...
--reset_pos arg (=0) reset paritions to position:
0:earliest
1:latest
原理说明:
- 其实就是利用librdkafka 提供的api来subscribe这个topic, 然后强制提交重置的offset;
- 线上已运行的consumer不需要停止;
- 由于kafka rebalance的特点, 这个工具也不是百分百的每次都有效, 但在我的测试中成功率还是相当高, 相比手动重置再重启consumer要省时省力得多;
进一步改进:
- 这个工具只针对一个topic, 一个group, 由于我们已知是哪台broker坏掉, 因此我们可以扫描出所有有问题的topic的partition和group, 均自动完成offset的重置;