#!/usr/bin/env python
# coding:utf-8
import sys
from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
if __name__ == '__main__':
if (len(sys.argv) < 6):
print("usage <kafkaHost> <kafkaPort> <groupid> <topic> <partition> <offset>")
sys.exit(0)
kafkaHost = sys.argv[1]
kafkaPort = sys.argv[2]
groupid = sys.argv[3]
topic = sys.argv[4]
partition = int(sys.argv[5])
offset = int(sys.argv[6])
# init kafka consumer
consumer = KafkaConsumer(group_id=groupid,
bootstrap_servers='{kafka_host}:{kafka_port}'.format(
kafka_host=kafkaHost, kafka_port=kafkaPort))
# 分配topic and partition
consumer.assign([TopicPartition(topic, partition)])
offsets = {}
meta = consumer.partitions_for_topic(topic)
offsets[TopicPartition(topic, partition)] = OffsetAndMetadata(offset, meta)
consumer.seek(TopicPartition(topic, partition), offset)
consumer.commit(offsets)
kafka 重置offset
©著作权归作者所有,转载或内容合作请联系作者
- 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
- 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
- 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
推荐阅读更多精彩内容
- 为什么要写这个小工具 在之前的文章 Kafka重置消费的Offset 介绍过可以利用librdkafka 来写一个...
- 由图中可以看出,broker1和broker2都有topic A的partition0和partition1分区,...
- Apache Kafka 集群环境搭建 - - ITeye技术网站http://bigcat2013.iteye....
- Kafka 0.9版本正式使用Java版本的producer替换了原Scala版本的producer。 1、Kaf...