kafka的java client中提供的消费者对象就是KafkaConsumer,但是在Spring for Apache Kafka中消费相关的接口和类有MessageListener、ConsumerFactory、KafkaMessageListenerContainer、ConcurrentMessageListenerContainer、KafkaListenerContainerFactory,在不看文档的情况下可能会被他们之间的关系和作用搞懵。这里把这些内容做一个整理,以防后面再忘掉。
基础的消费方式
我们先来看一看纯java的client怎么用:
通过initConfig方法初始化配置之后传给KafkaConsumer对象,通过poll方法拉取消息进行消费。由于poll的代码包含在一个while循环内,在当前消息消费完成之后就会再一次拉取消息。注意,这段代码并没有提交offset的逻辑,如何提交offset和何时提交不在这篇文章里讨论范围内。
多线程实现
KafakConsumer是非线程安全的,但并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,下面介绍两种。
1. 线程封闭,即为每个线程实例化一个KafkaConsumer对象
一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数。
2. 将处理消息模块改成多线程的实现方式
代码中的RecordHandler类是用来处理消息的,而KafkaConsumerThread类对应的是一个消费线程,里面通过线程池的方式来调用RecordHandler 处理一批批的消息。注意KafkaConsumerThread类中ThreadPoolExecutor里的最后一个参数设置的是CallerRunsPolicy(),这样可以防止线程池的总体消费能力跟不上poll()拉取的能力,从而导致异常现象的发生。这种实现方式还可以横向扩展,通过开启多个 KafkaConsumerThread 实例来进一步提升整体的消费能力。第二种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。
Spring的方式
message listener
spring的文档中说
You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.
意思是需要配置一个MessageListenerContainer并且提供给其一个message listener才可以进行消费。那么我们来看一下message listener接口的定义:
public interface MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> {
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> {
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
所有onMessage方法的参数中都有的参数就是ConsumerRecord,单条消息或者是一个列表。可见spring对kafka的消费逻辑进行了细分,而message listener是负责获取到消息之后相应的业务逻辑处理的部分,与消息是如何拉取的无关,类似于前面提到的kafka多线程消费第二种方式代码中的RecrodsHandler,但是message listener并不是一个独立的线程类,它只是供KafkaMessageListenerContainer调用的类,后面会提到。
MessageListenerContainer
message listener需要提供给一个MessageListenerContainer才获取到消息,MessageListenerContainer又分为KafkaMessageListenerContainer和ConcurrentMessageListenerContainer,我们先来看KafkaMessageListenerContainer。
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
构造函数的参数列表需要一个ConsumerFactory和一个ContainerProperties,可想而知KafkaMessageListenerContainer用用户提供的ConsumerFactory生成了一个KafkaConsumer负责拉取消息。而ContainerProperties从构造函数来看,主要是负责配置该consumer消费哪写topic和分区。
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
而创建一个完整的KafkaMessageListenerContainer的代码如下
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
可以看到通过ContainerProperties的setMessageListener方法为container配置一个listener。
提交消费位置
当配置了kafkaConsumer的enable.auto.commit属性为true之后,kafkaConsumer会负责自动提交offset。当配置为false之后,就由KafkaMessageListenerContainer或者message listener负责提交offset。提交的策略由container的AckMode属性配置:
- RECORD: Commit the offset when the listener returns after processing the record.
- BATCH: Commit the offset when all the records returned by the poll() have been processed.
- TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
- COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
- COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.
- MANUAL: The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as BATCH are applied.
-
MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.
注意,当AckMode配置为MANUAL的时候,offset由meesage listener负责提交。那么MessageListenerContainer、MessageListener、ConsumerFactory之间的关系就理清楚了:MessageListenerContainer通过ConsumerFactory创建并引用了一个KafkaConsumer负责拉取消息,并且将消息提供给MessageListener负责逻辑处理,同时也负责处理提交消费位移的逻辑。
ConcurrentMessageListenerContainer
ConcurrentMessageListenerContainer通过委托的方式利用一个或多个KafkaMessageListenerContainer的实例去实现多线程消费。
ConcurrentMessageListenerContainer的构造函数类似于MessageListenerContainer:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)
ConcurrentMessageListenerContainer有一个控制并发量的属性,如:调用 container.setConcurrency(3)会创建3个KafkaMessageListenerContainer的实例。这种多线程的方式比较类似于我们在之前介绍的多线程消费方式的第一种,如果要实现第二种,可能只能在一个message listener内部实现多线程处理了。
@KafkaListener
@KafkaListener注解是用来指定一个bean的方法当作一个container的listener:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
注意这里listen(String data)方法的参数是String而不是ConsumerRecord<K, V> data。那么要实现这个功能就需要在一个@Configuration类上加上注解@EnableKafka(这是在只使用了spring,而没用spring boot的情况下),并且要配置一个listener container factory, 默认情况下一个名为kafkaListenerContainerFactory的bean是需要的。以下代码展示如何配置一个ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
到目前为止,开头提到的MessageListener、ConsumerFactory、KafkaMessageListenerContainer、ConcurrentMessageListenerContainer、KafkaListenerContainerFactory这些类我们都已经提到过,并且也描述了它们的作用。