相对于Kafka的生产者API,消费者的API略显繁杂,本文总结了0.11.0版本的kafka消费者的几种消费模式,供大家参考,该版本的所有消费方法均在KafkaConsumer<K,V>中实现。
- 订阅主题(subscribe)消费, 自动提交offset
最简单的消费模式, 参考代码如下 :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
在这种情况下, kafka根据auto.commit.interval.ms
参数周期性自动提交offset, 并将offset存储在自身的broker上.
- 订阅主题(subscribe)消费, 手动提交offset
参考代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
自动提交offset的弊端在于, 系统是根据poll方法, 而不是根据你的业务逻辑来判断消费是否完成, 所以存在着漏数据的可能性. 所以我们需要通过手动提交offset来保证在数据都处理完成后再提交offset, 这样就不会存在漏数据的可能了. 但是有可能存在重复消费, 手动提交offset可以实现"至少一次"的消费语义.
需要注意的是, 这里的consumer.commitSync()
是将当前每个分区的offset全部提交, 如果你想针对针对每个分区进行单独提交, 也可以用commitSync(java.util.Map<TopicPartition,OffsetAndMetadata> offsets)
, 参考代码如下:
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
- 订阅分区(assign)消费, 手动/自动提交offset
在前两种消费模式中, 我们并没有指定分区, 只需要指定消费的主题即可, 因为此时kafka会动态地根据消费者的数量去为消费者匹配合适的分区, 这种动态调整模式一般被称为"再平衡"(rebalance), 一般发生在某个消费组中有新的消费者加入时.
而在这种assign分区模式下, 我们可以为某个消费者指定特定的分区, 从而实现更灵活的消费模式.参考代码如下:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
通过consumer.assign()
指定分区后, 接下来同样循环调用consumer.poll()
即可进行消费了.在这种模式下, kafka不会对该消费者进行动态管理, 因此为了避免offset提交冲突, 官方建议为每个消费者指定唯一的消费组.
- 自定义offset消费(seek)和外部数据源存储offset
在以上三种消费模式中, 不论是指定主题(subscribe)和指定分区(assign), kafka都是根据自身存储的offset来确定消费的起始位置的, 而有的时候我们需要手动指定消费的起始offset, 这个时候可以通过consumer.seek(TopicPartition, Long)
实现, 该方法会对offset进行更新, 并根据更新后的offset进行消费.
外部数据源存储offset
当kafka的broker自身存储offset时, 很难对消费数据处理和提交offset进行事务控制, 所以只能实现 "至多一次" 或者 "至少一次" 的消费语义. 而有的时候我们需要进行 "精准一次" 的消费语义, 这时候就可以将offset保存至外部数据源(例如mysql), 实现逻辑如下
(1) 设置enable.auto.commit=false
(2) 将ConsumerRecord
中的offset和你的数据处理结果一同存入到外部数据源, 并添加事务控制.
(3) 在重启的时候读取offset, 并通过seek(TopicPartition,Long)重置offset
需要注意的是, 在这种消费模式话, 最好采用指定分区(assign)的消费模式, 因为不存在分区的重新分配, 从而避免多个消费者修改同一分区offset导致的数据丢失. 而如果你采用了订阅主题(subscribe)模式, 你可以通过自定义ConsumerRebalanceListener
进行控制, 避免冲突. 这种场景下, 我们在订阅时采用subscribe(Collection, ConsumerRebalanceListener)
, 此时当某个分区从消费者A被重新分配给消费者B时, 消费者A可以通过ConsumerRebalanceListener.onPartitionsRevoked(Collections)
方法来提交offset, 而消费者B可以通过ConsumerRebalanceListener.onPartitionsAssigned(Collections)
方法来查看offset - 消费流控制
在订阅主题模式下, 如果一个消费者被分配了多个分区, 那么该消费者对每个分区的消费优先级都是一致的. 如果我们想对某个分区的消费进行控制, 可以单独采用pause(Collection)
或resume(Collection)
来停止/重启对某些分区的消费. 需要注意的是此方法只对订阅主题模式的消费起作用.
总结:
- kafka的消费API支持订阅主题(
subscribe
)和指定分区(assign
), 同时还可以通过seek
对offset进行重置, 进行灵活的消费控制. - 提交offset的时候, 可以用
consumer.commitSync()
提交所有分区当前poll后的offset, 也可以用consumer.commitSync(offsets)
来手动指定提交的offset, 当然也可以将offset存储在外部数据源, 配合seek
实现 "精准一次" 的消费语义. - 文章内容来源 https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html