学习大纲
一、消费者和消费组
Kafka消费者是消费组的一部分,当多个消费者形成一个消费组来消费主题时,每个消费者会收到不同分区的消息。假设有一个T1主题,该主题有4个分区,同时我们有一个消费组G1,这个消费组只有一个消费者C1。那么消费者C1将会收到这4个分区的消息,如下所示:
Kafka一个很重要的特性就是,只需写入一次消息,可以支持任意多的应用读取这个消息。换句话说,每个应用都可以读到全量的消息。为了使得每个应用都能读到全量消息,应用需要有不同的消费组。对于上面的例子,假如我们新增了一个新的消费组G2,而这个消费组有两个消费者,那么会是这样的:
二、订阅主题和分区
创建完消费者后我们便可以订阅主题了,只需要通过调用subscribe()方法即可,这个方法接收一个主题列表
KafkaConsumer<String,string> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
另外,我们也可以使用正则表达式来匹配多个主题,而且订阅之后如果又有匹配的新主题,那么这个消费组会立即对其进行消费。正则表达式在连接Kafka与其他系统时非常有用。比如订阅所有的测试主题:
consumer.subscribe(Pattern.compile("jia*"));
指定订阅的分区
//指定订阅的分区
consumer.assign(Arrays.asList(new TopicPartition("topic0701",0)));
也可以通过注解方式进行监听
@KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = KafkaProducer.TOPIC_GROUP1)
public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
log.info(KafkaProducer.TOPIC_GROUP1+"KafkaConsumer 接收到消息");
Optional message = Optional.ofNullable(record.value());
if (message.isPresent()) {
Object msg = message.get();
log.info("topic_test 消费了: Topic:" + topic + ",Message:" + msg);
ack.acknowledge();
}
}
三、位移提交
对于Kafka中的分区而言,它的每条消息都有唯一的offset,用来表示消息在分区中的位置。
当我们调用poll()时,该方法会返回我们没有消费的消息。当消息从broker返回消费者时,broker并不跟踪这些消息是否被消费者接收到,Kafka让消费者自身来管理消费的位移,并向消费者提供更新位移的接口,这种更新位移方式称为提交(commit)。
- 自动提交
这种方式让消费者来管理位移,应用本身不需要显式操作。当我们将enable.auto.commit设置为true,那么消费者会在poll方法调用后每隔5秒(由auto.commit.interval.ms指定)提交一次位移。和很多其他操作一样,自动提交也是由poll()方法来驱动的﹔在调用poll()时,消费者判断是否到达提交时间,如果是则提交上一次poll返回的最大位移。
需要注意到,这种方式可能会导致消息重复消费。假如,某个消费者poll消息后,应用正在处理消息,在3秒后Kafka进行了重平衡,那么由于没有更新位移导致重平衡后这部分消息重复消费。 - 同步提交
consumer.commitsync( );//同步提交消费位移
- 异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。
但是异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。举个例子,假如我们发起了一个异步提交commitA,此时的提交位移为2000,随后又发起
了一个异步提交commitB且位移为3000;commitA提交失败但commitB提交成功,此时commitA进行重试并成功的话,会将实际上将已经提交的位移从3000回滚到2000,导致消息重复消费。
四、指定位移消费
到目前为止,我们知道消息的拉取是根据pol()方法中的逻辑来处理的,但是这个方法对于普通开发人员来说就是个黑盒处理,无法精确掌握其消费的起始位置。
seek()方法正好提供了这个功能,让我们得以追踪以前的消费或者回溯消费。
public static void main(string[]args){
Properties props =initConfig();
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
// timeout参数设置多少合适?太短会使分区分配失败,太长又有可能造成一些不必要的等待
consumer.poll (Duration.ofMillis (2000) );
//获取消费者所分配到的分区
Set<TopicPartition> assignment = consumer.assignment();
system.out.println(assignment) ;
for (TopicPartition tp : assignment){
//参数partition表示分区,offset表示指定从分区的哪个位置开始消费
consumer.seek(tp,10);
}
//consumer.seek (new ropicPartition(topic,0),10);
while (true){
ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
// consume the record.
for (ConsumerRecord<String,String>record:records){
system.out.println(record.offset() +":" +record.value());
}
}
}
- 指定从分区末尾倒回来消费
//从分区末尾倒回来消费
Map<TopicPartition,Long> offsets = consumer.endoffsets(assignment);
for ( TopicPartition tp : assignment) {
consumer.seek(tp,offsets.get(tp));
}
五、再均衡监听器
再均衡是指分区的所属从一个消费者转移到另外一个消费者的行为,它为消费组具备了高可用性和伸缩性提供了保障,使得我们既方便又安全地删除消费组内的消费者或者往消费组内添加消费者。不过再均衡发生期间,消费者是无法拉取消息的。