1 RocketMQ
1.1 为什么要选RocketMQ
总结一下:
选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度
。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:
-
RabbitMQ
:
优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
缺点:性能和吞吐量不太理想,不易进行二次开发 -
RocketMQ
:
优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
缺点:兼容性上不是太好 -
Kafka
:
优点:拥有强大的性能及吞吐量,兼容性很好
缺点:由于“攒一波再处理”导致延迟比较高
1.2 RocketMQ优缺点
RocketMQ
优点:
- 单机吞吐量:十万级
- 可用性:非常高,分布式架构
- 消息可靠性:经过参数优化配置,消息可以做到0丢失
- 功能支持:MQ功能较为完善,还是分布式的,扩展性好
- 支持10亿级别的消息堆积,不会因为堆积导致性能下降
- 源码是Java,方便结合公司自己的业务二次开发
- 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
-
RocketMQ
在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验
RocketMQ
缺点:
- 支持的客户端语言不多,目前是Java及c++,其中c++不成熟
- 没有在
MQ
核心中去实现JMS
等接口,有些系统要迁移需要修改大量代码
1.3 消息模型
1.3.1 消息队列模型
消息队列有两种模型:队列模型
和发布/订阅模型
- 队列模型
这是最初的一种消息队列模型,对应着消息队列发-存-收
的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。
- 发布/订阅模型
如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。解决的方式就是发布/订阅模型。
在发布 - 订阅
模型中,消息的发送方称为发布者(Publisher
),消息的接收方称为订阅者(Subscriber
),服务端存放消息的容器
称为主题(Topic
)。发布者将消息发送到主题中,订阅者在接收消息之前需要先订阅主题
。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
它和 队列模式
的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费
1.3.2 RocketMQ消息模型
RocketMQ
使用的消息模型是标准的发布-订阅
模型,在RocketMQ
的术语表中,生产者、消费者和主题,与发布-订阅模型中的概念是完全一样的。
1.3.3 RocketMQ中成员
RocketMQ
本身的消息是由下面几部分组成:
1.3.3.1 Message
Message
(消息)就是要传输的信息
一条消息必须有一个主题(Topic
),主题可以看做是你的信件要邮寄的地址。
一条消息也可以拥有一个可选的标签(Tag
)和额处的键值对,它们可以用于设置一个业务Key
并在 Broker
上查找此消息以便在开发期间查找问题。
1.3.3.2 Topic
Topic
(主题)可以看做消息的归类,它是消息的第一级类型
。比如一个电商系统可以分为:交易消息
、物流消息
等,一条消息必须有一个 Topic
Topic
与生产者和消费者的关系非常松散,一个 Topic
可以有0个
、1个
、多个生产者
向其发送消息,一个生产者也可以同时向不同的 Topic
发送消息。
一个 Topic
也可以被 0个
、1个
、多个消费者
订阅。
1.3.3.3 Tag
Tag
(标签)可以看作子主题
,它是消息的第二级类型
,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic
而不同的 Tag
来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag
标签有助于保持你的代码干净和连贯,并且还可以为 RocketMQ
提供的查询系统提供帮助。
1.3.3.4 Group
- Consumer Group :
RocketMQ
中,订阅者的概念是通过消费组(Consumer Group
)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被Consumer Group1
消费过,也会再给Consumer Group2
消费。
消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。默认情况,如果一条消息被消费者Consumer1
消费了,那同组的其他消费者就不会再收到这条消息。 - Producer Group :
生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组,一群Topic
相同的Producer
1.3.3.5 Message Queue
Message Queue
(消息队列),一个 Topic
下可以设置多个消息队列,Topic
包括多个 Message Queue
,如果一个 Consumer
需要获取 Topic
下所有的消息,就要遍历所有的 Message Queue
。
RocketMQ
还有一些其它的Queue
——例如ConsumerQueue
在 RocketMQ
中,%RETRY%
和 %DLQ%
是两种特殊类型的队列,主要与消息重试
和死信
处理机制相关。这些队列通常用于处理消费失败的消息,帮助保证消息最终能够被处理或妥善记录。
-
%RETRY%
队列
%RETRY%
(重试队列)用于存储消费失败的消息,目的是在消费失败时,允许消息经过一定的延迟后重新投递给消费者进行重试。每个消费组对应一个%RETRY%
队列,命名为%RETRY%<消费组名>
- 使用场景:
如果消息消费失败,RocketMQ 会自动将消息推送到对应的 %RETRY% 队列中。
消费者消费失败的消息会被放入这个队列中,并在设置的重试间隔后被重新消费。
RETRY 队列是按照一定的策略进行投递的,比如可以通过配置定时重试时间间隔、最大重试次数等。 - 相关配置:
每个消息都会有一个重试次数的计数(通常是RETRY
队列的消息属性)。
重试次数达到上限后,消息将会被投递到%DLQ%
(死信队列)中。 - 特点:
多次重试:消息可以根据配置多次投递,给消费者更多机会进行消费。
延迟投递:重试消息通常会有一定的延迟,避免过快地重新消费导致的雪崩效应。
- 使用场景:
-
%DLQ%
队列
%DLQ%
(死信队列)用于存储消费失败并且重试超过最大次数的消息的队列。死信队列通常用于存储无法正常消费的消息,以便后续进行人工干预或补偿机制。每个消费组对应一个%DLQ%
队列,命名为%DLQ%<消费组名>
- 使用场景:
当消息的消费失败超过了预定的最大重试次数后,RocketMQ 会自动将该消息投递到%DLQ%
(死信队列)。
%DLQ%
队列的消息通常需要人工或其他系统来进行处理,比如人工查看、日志分析、补偿消费等。 - 相关配置:
RocketMQ
允许配置最大重试次数,默认情况下是16次
。如果消费者处理失败超过这个次数,消息就会被转发到死信队列%DLQ%
。
消息在死信队列中的生命周期与正常队列相同,仍然可以被消费者读取,但通常需要手动干预
或采取其他措施
。 - 特点:
消息隔离:死信队列用于隔离那些消费失败的消息,避免这些消息影响到正常的消息消费流。
手动处理:死信队列中的消息通常需要手动干预,可能需要通过日志分析或补偿机制来进行处理。
- 使用场景:
1.3.3.6 Offset
在Topic
的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ
为每个消费组在每个队列上维护一个消费位置(Consumer Offset
),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一
也可以这么说,Queue
是一个长度无限的数组,Offset
就是下标。
1.3.3.7 总结图示
RocketMQ
的消息模型中,这些就是比较关键的概念了
画张图总结一下
1.4 消息的消费模式
消息消费模式有两种:Clustering
(集群消费)和Broadcasting
(广播消费)
默认情况下就是集群消费
,这种模式下一个消费者组
共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而广播消费消息会发给消费者组中的每一个消费者进行消费。
1.5 RoctetMQ基本架构
先看图,RocketMQ
的基本架构
RocketMQ
一共有四个部分组成:NameServer
,Broker
,Producer 生产者
,Consumer 消费者
,它们对应了:发现
、发
、存
、收
,为了保证高可用,一般每一部分都是集群部署的
类比一下我们生活的邮政系统——
邮政系统要正常运行,离不开下面这四个角色, 一是发信者
,二 是收信者
, 三是负责暂存传输的邮局
, 四是负责协调各个地方邮局的管理机构
。对应到 RocketMQ 中,这四个角色就是 Producer
、 Consumer
、 Broker
、NameServer
1.5.1 NameServer
NameServer
是一个无状态的服务器,角色类似于 Kafka
使用的 Zookeeper
,但比 Zookeeper
更轻量。
特点:
每个 NameServer
结点之间是相互独立,彼此没有任何信息交互。
Nameserver
被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer
在发送消息前从 NameServer
中获取 Topic
的路由信息也就是发往哪个 Broker
,Consumer
也会定时从 NameServer
获取 Topic
的路由信息,Broker
在启动时会向 NameServer
注册,并定时进行心跳连接,且定时同步维护的 Topic
到 NameServer
功能主要有两个:
- 和
Broker
结点保持长连接。 - 维护
Topic
的路由信息。
1.5.2 Broker
消息存储和中转角色,负责存储和转发消息
Broker
内部维护着一个个 Consumer Queue
,用来存储消息的索引,真正存储消息的地方是 CommitLog
(日志文件)
单个 Broker
与所有的 Nameserver
保持着长连接和心跳,并会定时将 Topic
信息同步到 NameServer
,和 NameServer
的通信底层是通过 Netty
实现的。
1.5.3 Producer
消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。
Producer
由用户进行分布式部署,消息由Producer
通过多种负载均衡模式发送到Broker
集群,发送低延时,支持快速失败。
RocketMQ
提供了三种方式发送消息:同步
、异步
和单向
-
同步发送
:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。 -
异步发送
:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。 -
单向发送
:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集
1.5.4 Consumer
消息消费者,负责消费消息,一般是后台系统负责异步消费。
Consumer
也由用户部署,支持PUSH
和PULL
两种消费模式,支持集群消费和广播消费,提供实时的消息订阅机制。
-
Pull
:拉取型消费者(Pull Consumer
)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以Pull
称为主动消费型
-
Push
:推送型消费者(Push Consumer
)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以Push
称为被动消费类型
,但其实从实现上看还是从消息服务器中拉取消息,不同于Pull
的是Push
首先要注册消费监听器,当监听器处触发后才开始消费消息
2 原理
2.1 RocketMQ整体工作流程
简单来说,RocketMQ
是一个分布式消息队列,也就是消息队列
+分布式系统
作为消息队列,它是发-存-收
的一个模型,对应的就是Producer、Broker、Cosumer
;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer
所以我们看一下它主要的工作流程:RocketMQ
由NameServer
注册中心集群、Producer
生产者集群、Consumer
消费者集群和若干Broker
(RocketMQ
进程)组成:
-
Broker
在启动的时候去向所有的NameServer
注册,并保持长连接,每30s发送一次心跳 -
Producer
在发送消息的时候从NameServer
获取Broker
服务器地址,根据负载均衡算法选择一台服务器来发送消息 -
Conusmer
消费消息的时候同样从NameServer
获取Broker
地址,然后主动拉取消息来消费
2.2 为什么RocketMQ不使用Zookeeper作为注册中心
Kafka
我们都知道采用Zookeeper
作为注册中心——当然也开始逐渐去Zookeeper
,RocketMQ
不使用Zookeeper
其实主要可能从这几方面来考虑:
- 基于可用性的考虑
根据CAP
理论,同时最多只能满足两个点,而Zookeeper
满足的是CP
,也就是说Zookeeper
并不能保证服务的可用性,Zookeeper
在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。 - 基于性能的考虑
NameServer
本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper
的写是不可扩展的,Zookeeper
要解决这个问题只能通过划分领域,划分多个Zookeeper
集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP
中的A的设计,导致服务之间是不连通的。 - 持久化的机制来带的问题
ZooKeeper
的ZAB
协议对每一个写请求,会在每个ZooKeeper
节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot
)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。 - 消息发送应该
弱
依赖注册中心
RocketMQ
的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer
获取到Broker
地址后缓存到本地,如果NameServer
整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。
2.3 Broker保存数据(CommitLog,ConsumeQueue,Indexfile)
RocketMQ
主要的存储文件包括CommitLog
文件、ConsumeQueue
文件、Indexfile
文件
-
CommitLog
:消息主体以及元数据的存储主体,存储Producer
端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G
, 文件名长度为20位
,左边补零,剩余为起始偏移量,比如00000000000000000000
代表了第一个文件,起始偏移量为0
,文件大小为1G=1073741824
;当第一个文件写满了,第二个文件为00000000001073741824
,起始偏移量为1073741824
,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
CommitLog
文件保存于${Rocket_Home}/store/commitlog
目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G
,写满后自动生成一个新的文件。
-
ConsumeQueue
:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ
是基于主题topic
的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog
文件中根据topic
检索消息是非常低效的。
Consumer
即可根据ConsumeQueue
来查找待消费的消息。其中,ConsumeQueue
(逻辑消费队列)作为消费消息的索引,保存了指定Topic
下的队列消息在CommitLog
中的起始物理偏移量offset
,消息大小size
和消息Tag
的HashCode
值
ConsumeQueue
文件可以看成是基于Topic
的CommitLog
索引文件,故ConsumeQueue
文件夹的组织方式如下:topic/queue/file
三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
同样ConsumeQueue
文件采取定长设计,每一个条目共20
个字节,分别为8
字节的CommitLog
物理偏移量、4字节的消息长度、8字节tag hashcode
,单个文件由30W
个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue
文件大小约5.72M
;
-
IndexFile
:IndexFile
(索引文件)提供了一种可以通过key
或时间区间来查询消息的方法。Index
文件的存储位置是:{fileName}
,文件名fileName
是以创建时的时间戳命名的,固定的单个IndexFile
文件大小约为400M
,一个IndexFile
可以保存2000W
个索引,IndexFile
的底层存储设计为在文件系统中实现HashMap
结构,故RocketMQ
的索引文件其底层实现为hash
索引
总结一下:RocketMQ
采用的是混合型的存储结构,即为Broker
单个实例下所有的队列共用一个日志数据文件(即为CommitLog
)来存储。
RocketMQ
的混合型存储结构(多个Topic
的消息实体内容都存储于一个CommitLog
中)针对Producer
和Consumer
分别采用了数据
和索引
部分相分离的存储结构,Producer
发送消息至Broker
端,然后Broker
端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog
中。
只要消息被刷盘持久化至磁盘文件CommitLog
中,那么Producer
发送的消息就不会丢失。正因为如此,Consumer
也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker
允许等待30s
的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ
的具体做法是,使用Broker
端的后台服务线程—ReputMessageService
不停地分发请求并异步构建ConsumeQueue
(逻辑消费队列)和IndexFile
(索引文件)数据。
2.4 RocketMQ怎么对文件进行读写
RocketMQ
对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache
、顺序读写
、零拷贝
2.4.1 PageCache、顺序读取
在RocketMQ
中,ConsumeQueue
逻辑消费队列存储的数据较少,并且是顺序读取,在page cache
机制的预读取作用下,Consume Queue
文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog
消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO
调度算法,比如设置调度算法为Deadline
(此时块存储采用SSD的话),随机读的性能也会有所提升。
页缓存(PageCache
)是OS
对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS
使用PageCache
机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache
。对于数据的写入,OS
会先写入至Cache
内,随后通过异步的方式由pdflush
内核线程将Cache
内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache
的情况,OS
从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取
2.4.2 零拷贝
RocketMQ
主要通过MappedByteBuffer
对文件进行读写操作。其中,利用了NIO
中的FileChannel
模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap
的方式减少了传统IO
,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ
的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。
什么是零拷贝
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态
切换
- 从磁盘复制数据到内核态内存;
- 从内核态内存复制到用户态内存;
- 然后从用户态内存复制到网络驱动的内核态内存;
- 最后是从网络驱动的内核态内存复制到网卡中进行传输。
所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O
的性能。零拷贝比较常见的实现方式是mmap
,这种机制在Java
中是通过MappedByteBuffer
实现的。
2.5 消息刷盘怎么实现
RocketMQ
提供了两种刷盘策略:同步刷盘
和异步刷盘
-
同步刷盘
:在消息达到Broker
的内存之后,必须刷到commitLog
日志文件中才算成功,然后返回Producer
数据已经发送成功。 -
异步刷盘
:异步刷盘是指消息达到Broker
内存后就返回Producer
数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog
日志文件中
Broker
在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中
刷盘的最终实现都是使用NIO
中的 MappedByteBuffer.force()
将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker
把消息写到CommitLog
映射区后,就会等待写入完成
异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。
2.6 RocketMQ的负载均衡
RocketMQ
中的负载均衡都在Client
端完成,具体来说的话,主要可以分为Producer
端发送消息时候的负载均衡和Consumer
端订阅消息的负载均衡。
2.6.1 Producer的负载均衡
Producer
端在发送消息的时候,会先根据Topic
找到指定的TopicPublishInfo
,在获取了TopicPublishInfo
路由信息后,RocketMQ
的客户端在默认方式下selectOneMessageQueue()
方法会从TopicPublishInfo
中的messageQueueList
中选择一个队列(MessageQueue
)进行发送消息。具这里有一个sendLatencyFaultEnable
开关变量,如果开启,在随机递增取模的基础上,再过滤掉not available
的Broker
代理。
Producer负载均衡:索引递增随机取模
public MessageQueue selectOneMessageQueue(){
//索引递增
int index = this.sendWhichQueue.incrementAndGet();
//利用索引取随机数,取余数
int pos = Math.abs(index) % this.messageQueueList.size();
if(pos<0){
pos=0;
}
return this.messageQueueList.get(pos);
}
所谓的latencyFaultTolerance
,是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency
超过550Lms
,就退避3000Lms
;超过1000L
,就退避60000L
;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue
)来发送消息,latencyFaultTolerance
机制是实现消息发送高可用的核心关键所在。
2.6.2 Consumer的负载均衡
在RocketMQ
中,Consumer
端的两种消费模式(Push/Pull
)都是基于拉模式来获取消息的,而在Push
模式只是对pull
模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull
)中,均需要Consumer
端知道从Broker
端的哪一个消息队列中去获取消息。因此,有必要在Consumer
端来做负载均衡,即Broker
端中多个MessageQueue
分配给同一个ConsumerGroup
中的哪些Consumer
消费。
Consumer
端的心跳包发送
在Consumer
启动后,它就会通过定时任务不断地向RocketMQ
集群中的所有Broker
实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker
端在收到Consumer
的心跳消息后,会将它维护在ConsumerManager
的本地缓存变量—consumerTable
,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable
中,为之后做Consumer
端的负载均衡提供可以依据的元数据信息。Consumer
端实现负载均衡的核心类—RebalanceImpl
在Consumer
实例的启动流程中的启动MQClientInstance
实例部分,会完成负载均衡服务线程—RebalanceService
的启动(每隔20s执行一次)。
通过查看源码可以发现,RebalanceService
线程的run()
方法最终调用的是RebalanceImpl
类的rebalanceByTopic()
方法,这个方法是实现Consumer
端负载均衡的核心。
rebalanceByTopic()
方法会根据消费者通信类型为广播模式
还是集群模式
做不同的逻辑处理
2.7 RocketMQ消息长轮询
所谓的长轮询,就是Consumer
拉取消息,如果对应的Queue
如果没有数据,Broker
不会立即返回,而是把 PullReuqest
hold起来,等待 queue
消息后,或者长轮询阻塞时间到了,再重新处理该 queue
上的所有 PullRequest
PullMessageProcessor#processRequest
//如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
long pollingTimeMills = suspendTimeoutMillisLong;
if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
}
String topic = requestHeader.getTopic();
long offset = requestHeader.getQueueOffset();
int queueId = requestHeader.getQueueId();
//封装一个PullRequest
PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
//把PullRequest挂起来
this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
response = null;
break;
}
挂起的请求,有一个服务线程会不停地检查,看queue
中是否有数据,或者超时。
PullRequestHoldService#run()
@Override
public void run() {
log.info("{} service started", this.getServiceName());
while (!this.isStopped()) {
try {
if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
this.waitForRunning(5 * 1000);
} else {
this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
}
long beginLockTimestamp = this.systemClock.now();
//检查hold住的请求
this.checkHoldRequest();
long costTime = this.systemClock.now() - beginLockTimestamp;
if (costTime > 5 * 1000) {
log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
}
} catch (Throwable e) {
log.warn(this.getServiceName() + " service has exception. ", e);
}
}
log.info("{} service end", this.getServiceName());
}