1. 功能
读取 Kafka 内部 topic `__consumer_offsets` 中的数据,并解析出各消费组(group)在每个 topic 分区上提交的 offset。
2. 代码
public static void main(String[] args) throws Exception {
Consumer<byte[], byte[]> consumer = createKafkaConsumer();
consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
while (true) {
ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
Iterator<ConsumerRecord<byte[], byte[]>> iterator = records.iterator();
Map<String, Integer> map = Maps.newHashMap();
while (iterator.hasNext()) {
ConsumerRecord<byte[], byte[]> record = iterator.next();
if (record.key() == null) {
continue;
}
BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
byte[] value = record.value();
if (value == null) {
continue;
}
OffsetAndMetadata offset = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
if (baseKey instanceof OffsetKey) {
OffsetKey newKey = (OffsetKey) baseKey;
String group = newKey.key().group();
TopicPartition tp = newKey.key().topicPartition();
System.out.println(group + "," + tp.topic() + "," + tp.partition() + "," + offset.offsetMetadata().offset()));
}
}
}
}
/**
 * Creates a consumer that hands back raw, undecoded key and value bytes.
 *
 * <p>The consumer connects to {@code localhost:9092} as group {@code test2}, with auto-commit
 * switched off and {@code auto.offset.reset} set to {@code latest}.
 *
 * @return a new, not yet subscribed consumer; the caller is responsible for closing it
 */
static Consumer<byte[], byte[]> createKafkaConsumer() {
    // Keys and values are decoded by the caller, so both sides use the byte-array deserializer.
    final String byteArrayDeserializer =
            "org.apache.kafka.common.serialization.ByteArrayDeserializer";

    Properties config = new Properties();

    // Wire format.
    config.put("key.deserializer", byteArrayDeserializer);
    config.put("value.deserializer", byteArrayDeserializer);

    // Broker location and consumer-group identity.
    config.put("bootstrap.servers", "localhost:9092");
    config.put("group.id", "test2");

    // Offset handling.
    config.put("enable.auto.commit", "false");
    config.put("auto.offset.reset", "latest");

    return new KafkaConsumer<>(config);
}
3. pom 依赖
<!-- Consumer API (KafkaConsumer, ConsumerRecord, ...). Kept on the same version as the
     broker artifact below so the explicitly declared client jar does not override the
     one the broker artifact was built against. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.1</version>
</dependency>
<!-- Broker-side classes (GroupMetadataManager, OffsetKey, ...) used to decode the records
     of __consumer_offsets. The log4j bindings are excluded from this artifact. -->
<!-- NOTE(review): the sample code also uses Guava (com.google.common.collect.Lists);
     make sure a Guava dependency is declared if it is not already on the classpath. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>2.0.1</version>
    <exclusions>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
    </exclusions>
</dependency>