Spring for Apache Kafka中消费相关的类的作用和关系

kafka的java client中提供的消费者对象就是KafkaConsumer,但是在Spring for Apache Kafka中消费相关的接口和类有MessageListener、ConsumerFactory、KafkaMessageListenerContainer、ConcurrentMessageListenerContainer、KafkaListenerContainerFactory,在不看文档的情况下可能会被他们之间的关系和作用搞懵。这里把这些内容做一个整理,以防后面再忘掉。

基础的消费方式

我们先来看一看纯java的client怎么用:


微信截图_20210602135515.png

通过initConfig方法初始化配置之后传给KafkaConsumer对象,通过poll方法拉取消息进行消费。由于poll的代码包含在一个while循环内,在当前消息消费完成之后就会再一次拉取消息。注意,这段代码并没有提交offset的逻辑,如何提交offset和何时提交不在这篇文章里讨论范围内。

多线程实现

KafakConsumer是非线程安全的,但并不意味着我们在消费消息的时候只能以单线程的方式执行。如果生产者发送消息的速度大于消费者处理消息的速度,那么就会有越来越多的消息得不到及时的消费,造成了一定的延迟。除此之外,由于Kafka 中消息保留机制的作用,有些消息有可能在被消费之前就被清理了,从而造成消息的丢失。我们可以通过多线程的方式来实现消息消费,多线程的目的就是为了提高整体的消费能力。多线程的实现方式有多种,下面介绍两种。

1. 线程封闭,即为每个线程实例化一个KafkaConsumer对象

微信截图_20210602143442.png

一个线程对应一个KafkaConsumer实例,我们可以称之为消费线程。一个消费线程可以消费一个或多个分区中的消息,所有的消费线程都隶属于同一个消费组。这种实现方式的并发度受限于分区的实际个数。


微信截图_20210602143726.png

2. 将处理消息模块改成多线程的实现方式

微信截图_20210602144520.png
微信截图_20210602144714.png

微信截图_20210602144726.png

代码中的RecordHandler类是用来处理消息的,而KafkaConsumerThread类对应的是一个消费线程,里面通过线程池的方式来调用RecordHandler 处理一批批的消息。注意KafkaConsumerThread类中ThreadPoolExecutor里的最后一个参数设置的是CallerRunsPolicy(),这样可以防止线程池的总体消费能力跟不上poll()拉取的能力,从而导致异常现象的发生。这种实现方式还可以横向扩展,通过开启多个 KafkaConsumerThread 实例来进一步提升整体的消费能力。第二种实现方式相比第一种实现方式而言,除了横向扩展的能力,还可以减少TCP连接对系统资源的消耗,不过缺点就是对于消息的顺序处理就比较困难了。

Spring的方式

message listener

spring的文档中说

You can receive messages by configuring a MessageListenerContainer and providing a message listener or by using the @KafkaListener annotation.

意思是需要配置一个MessageListenerContainer并且提供给其一个message listener才可以进行消费。那么我们来看一下message listener接口的定义:

public interface MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data);

}

public interface AcknowledgingMessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);

}

public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);

}

public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 

    void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

public interface BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data);

}

public interface BatchAcknowledgingMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);

}

public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);

}

public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 

    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);

}

所有onMessage方法的参数中都有的参数就是ConsumerRecord,单条消息或者是一个列表。可见spring对kafka的消费逻辑进行了细分,而message listener是负责获取到消息之后相应的业务逻辑处理的部分,与消息是如何拉取的无关,类似于前面提到的kafka多线程消费第二种方式代码中的RecrodsHandler,但是message listener并不是一个独立的线程类,它只是供KafkaMessageListenerContainer调用的类,后面会提到。

MessageListenerContainer

message listener需要提供给一个MessageListenerContainer才获取到消息,MessageListenerContainer又分为KafkaMessageListenerContainer和ConcurrentMessageListenerContainer,我们先来看KafkaMessageListenerContainer。

public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

构造函数的参数列表需要一个ConsumerFactory和一个ContainerProperties,可想而知KafkaMessageListenerContainer用用户提供的ConsumerFactory生成了一个KafkaConsumer负责拉取消息。而ContainerProperties从构造函数来看,主要是负责配置该consumer消费哪写topic和分区。

public ContainerProperties(TopicPartitionOffset... topicPartitions)

public ContainerProperties(String... topics)

public ContainerProperties(Pattern topicPattern)

而创建一个完整的KafkaMessageListenerContainer的代码如下

ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
    ...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
                        new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
                        new KafkaMessageListenerContainer<>(cf, containerProps);
return container;

可以看到通过ContainerProperties的setMessageListener方法为container配置一个listener。

提交消费位置

当配置了kafkaConsumer的enable.auto.commit属性为true之后,kafkaConsumer会负责自动提交offset。当配置为false之后,就由KafkaMessageListenerContainer或者message listener负责提交offset。提交的策略由container的AckMode属性配置:

  • RECORD: Commit the offset when the listener returns after processing the record.
  • BATCH: Commit the offset when all the records returned by the poll() have been processed.
  • TIME: Commit the offset when all the records returned by the poll() have been processed, as long as the ackTime since the last commit has been exceeded.
  • COUNT: Commit the offset when all the records returned by the poll() have been processed, as long as ackCount records have been received since the last commit.
  • COUNT_TIME: Similar to TIME and COUNT, but the commit is performed if either condition is true.
  • MANUAL: The message listener is responsible to acknowledge() the Acknowledgment. After that, the same semantics as BATCH are applied.
  • MANUAL_IMMEDIATE: Commit the offset immediately when the Acknowledgment.acknowledge() method is called by the listener.

    注意,当AckMode配置为MANUAL的时候,offset由meesage listener负责提交。那么MessageListenerContainer、MessageListener、ConsumerFactory之间的关系就理清楚了:MessageListenerContainer通过ConsumerFactory创建并引用了一个KafkaConsumer负责拉取消息,并且将消息提供给MessageListener负责逻辑处理,同时也负责处理提交消费位移的逻辑。
    微信截图_20210602165813.png

ConcurrentMessageListenerContainer

ConcurrentMessageListenerContainer通过委托的方式利用一个或多个KafkaMessageListenerContainer的实例去实现多线程消费。
ConcurrentMessageListenerContainer的构造函数类似于MessageListenerContainer:

public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory, ContainerProperties containerProperties)

ConcurrentMessageListenerContainer有一个控制并发量的属性,如:调用 container.setConcurrency(3)会创建3个KafkaMessageListenerContainer的实例。这种多线程的方式比较类似于我们在之前介绍的多线程消费方式的第一种,如果要实现第二种,可能只能在一个message listener内部实现多线程处理了。

@KafkaListener

@KafkaListener注解是用来指定一个bean的方法当作一个container的listener:

public class Listener {

    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }

}

注意这里listen(String data)方法的参数是String而不是ConsumerRecord<K, V> data。那么要实现这个功能就需要在一个@Configuration类上加上注解@EnableKafka(这是在只使用了spring,而没用spring boot的情况下),并且要配置一个listener container factory, 默认情况下一个名为kafkaListenerContainerFactory的bean是需要的。以下代码展示如何配置一个ConcurrentMessageListenerContainer:

@Configuration
@EnableKafka
public class KafkaConfig {

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }
}

到目前为止,开头提到的MessageListener、ConsumerFactory、KafkaMessageListenerContainer、ConcurrentMessageListenerContainer、KafkaListenerContainerFactory这些类我们都已经提到过,并且也描述了它们的作用。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容