参考:
最佳实践
rocketmq中,一个消费组内的所有消费者,其订阅关系必须保持一致。
同一个消费组中,不一致的订阅行为有:
- 多个消费者订阅同一个主题的不同tag
- 多个消费者订阅不同的主题
不一致的订阅行为,会导致部分消息丢失。
订阅不同tag
rocket mq的tag级别的消息过滤功能,其实现原理包括两方面:
- 存储模型
- 消费拉取模型
存储模型
rocket mq为每一个topic建立索引,方便消费者按照topic进行消费,其具体实现为ConsumeQueue(消息队列)。consumeQueue的设计极具技巧性,队列中每个条目长度固定,组成为:
- 8字节的commitLog物理偏移量
- 4字节的消息长度
- 8字节的消息tag hashcode
这种设计使得消费队列具备类似数组般随机访问的特性,提高consumeQueue的读取性能。
tag过滤
tag消息过滤时:
- 在consumeQueue中根据tag的hashcode过滤
- 因为不同的tag可能具备相同的hashcode(哈希冲突),所以在消费者拉取到消息后,根据tag值精确比较
消息丢失
为什么同一消费组,多个消费者订阅同一topic不同的tag,会发生消息丢失?
使用集群模式消费时,同一个消费组内的多个消费者共同完成topic的多个consumeQueue消费,一个消费组只会分配到其中某几个consumeQueue,且同一时间,一个consumeQueue只会分配给一个消费组。如图:
问题的核心是:同一个tag的消息会分布在不同的consumeQueue中,而队列的消息只会被一个消费者消费,那么这个consumerQueue上,不同于消费者订阅tag的消息,将不会被消费,造成堆积。
所以,rocketmq中,一个消费组内的消费者,其tag过滤行为必须一致。
订阅不同topic
如下,消费组consumer-group-name
下的两个消费者consumer 1
consumer2
分别订阅了两个不同的topic topic A
与 topic B
:
由于consumerQueue的分配是以消费组为单位的,会均匀分配给消费组下的消费者,而不会在意消费者本身订阅的是哪个主题。
topic A的consumerQueue q4
分配给了consumer2,但是consumer2订阅的是topic B,因此部分topic A的消息得不到消费。
所以,rocketmq中,一个消费组内的消费者,其订阅的topic必须是同一个
总结
得出结论:
rocketmq中,一个消费组内的所有消费者,其订阅关系必须保持一致。