RocketMQ之原理深入讲解

1 RocketMQ

1.1 为什么要选RocketMQ

image.png

总结一下:
选择中间件的可以从这些维度来考虑:可靠性,性能,功能,可运维行,可拓展性,社区活跃度。目前常用的几个中间件,ActiveMQ作为“老古董”,市面上用的已经不多,其它几种:

  • RabbitMQ
    优点:轻量,迅捷,容易部署和使用,拥有灵活的路由配置
    缺点:性能和吞吐量不太理想,不易进行二次开发
  • RocketMQ
    优点:性能好,高吞吐量,稳定可靠,有活跃的中文社区
    缺点:兼容性上不是太好
  • Kafka
    优点:拥有强大的性能及吞吐量,兼容性很好
    缺点:由于“攒一波再处理”导致延迟比较高

1.2 RocketMQ优缺点

RocketMQ优点:

  • 单机吞吐量:十万级
  • 可用性:非常高,分布式架构
  • 消息可靠性:经过参数优化配置,消息可以做到0丢失
  • 功能支持:MQ功能较为完善,还是分布式的,扩展性好
  • 支持10亿级别的消息堆积,不会因为堆积导致性能下降
  • 源码是Java,方便结合公司自己的业务二次开发
  • 天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况
  • RocketMQ在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验

RocketMQ缺点:

  • 支持的客户端语言不多,目前是Java及c++,其中c++不成熟
  • 没有在MQ核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

1.3 消息模型

1.3.1 消息队列模型

消息队列有两种模型:队列模型发布/订阅模型

  1. 队列模型
    这是最初的一种消息队列模型,对应着消息队列发-存-收的模型。生产者往某个队列里面发送消息,一个队列可以存储多个生产者的消息,一个队列也可以有多个消费者,但是消费者之间是竞争关系,也就是说每条消息只能被一个消费者消费。
    image.png
  1. 发布/订阅模型
    如果需要将一份消息数据分发给多个消费者,并且每个消费者都要求收到全量的消息。很显然,队列模型无法满足这个需求。解决的方式就是发布/订阅模型。
    发布 - 订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先订阅主题。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。
    image.png

它和 队列模式的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。唯一的不同点在于:一份消息数据是否可以被多次消费

1.3.2 RocketMQ消息模型

RocketMQ使用的消息模型是标准的发布-订阅模型,在RocketMQ的术语表中,生产者、消费者和主题,与发布-订阅模型中的概念是完全一样的。

1.3.3 RocketMQ中成员

RocketMQ本身的消息是由下面几部分组成:

image.png

1.3.3.1 Message

Message(消息)就是要传输的信息
一条消息必须有一个主题(Topic),主题可以看做是你的信件要邮寄的地址。
一条消息也可以拥有一个可选的标签(Tag)和额处的键值对,它们可以用于设置一个业务Key 并在 Broker 上查找此消息以便在开发期间查找问题。

1.3.3.2 Topic

Topic(主题)可以看做消息的归类,它是消息的第一级类型。比如一个电商系统可以分为:交易消息物流消息等,一条消息必须有一个 Topic

Topic 与生产者和消费者的关系非常松散,一个 Topic 可以有0个1个多个生产者向其发送消息,一个生产者也可以同时向不同的 Topic 发送消息。
一个 Topic 也可以被 0个1个多个消费者订阅。

1.3.3.3 Tag

Tag(标签)可以看作子主题,它是消息的第二级类型,用于为用户提供额外的灵活性。使用标签,同一业务模块不同目的的消息就可以用相同 Topic 而不同的 Tag 来标识。比如交易消息又可以分为:交易创建消息、交易完成消息等,一条消息可以没有 Tag

标签有助于保持你的代码干净和连贯,并且还可以为 RocketMQ 提供的查询系统提供帮助。

1.3.3.4 Group

  • Consumer Group :
    RocketMQ中,订阅者的概念是通过消费组(Consumer Group)来体现的。每个消费组都消费主题中一份完整的消息,不同消费组之间消费进度彼此不受影响,也就是说,一条消息被Consumer Group1消费过,也会再给Consumer Group2消费。
    消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每个消费者负责消费组内的一部分消息。默认情况,如果一条消息被消费者Consumer1消费了,那同组的其他消费者就不会再收到这条消息。
  • Producer Group :
    生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组,一群Topic相同的Producer

1.3.3.5 Message Queue

Message Queue(消息队列),一个 Topic 下可以设置多个消息队列,Topic 包括多个 Message Queue ,如果一个 Consumer 需要获取 Topic下所有的消息,就要遍历所有的 Message Queue

RocketMQ还有一些其它的Queue——例如ConsumerQueue
RocketMQ 中,%RETRY%%DLQ% 是两种特殊类型的队列,主要与消息重试死信处理机制相关。这些队列通常用于处理消费失败的消息,帮助保证消息最终能够被处理或妥善记录。

  • %RETRY%队列
    %RETRY%(重试队列)用于存储消费失败的消息,目的是在消费失败时,允许消息经过一定的延迟后重新投递给消费者进行重试。每个消费组对应一个 %RETRY% 队列,命名为%RETRY%<消费组名>
    • 使用场景:
      如果消息消费失败,RocketMQ 会自动将消息推送到对应的 %RETRY% 队列中。
      消费者消费失败的消息会被放入这个队列中,并在设置的重试间隔后被重新消费。
      RETRY 队列是按照一定的策略进行投递的,比如可以通过配置定时重试时间间隔、最大重试次数等。
    • 相关配置:
      每个消息都会有一个重试次数的计数(通常是 RETRY 队列的消息属性)。
      重试次数达到上限后,消息将会被投递到 %DLQ%(死信队列)中。
    • 特点:
      多次重试:消息可以根据配置多次投递,给消费者更多机会进行消费。
      延迟投递:重试消息通常会有一定的延迟,避免过快地重新消费导致的雪崩效应。
  • %DLQ%队列
    %DLQ%(死信队列)用于存储消费失败并且重试超过最大次数的消息的队列。死信队列通常用于存储无法正常消费的消息,以便后续进行人工干预或补偿机制。每个消费组对应一个 %DLQ% 队列,命名为%DLQ%<消费组名>
    • 使用场景:
      当消息的消费失败超过了预定的最大重试次数后,RocketMQ 会自动将该消息投递到 %DLQ%(死信队列)。
      %DLQ% 队列的消息通常需要人工或其他系统来进行处理,比如人工查看、日志分析、补偿消费等。
    • 相关配置:
      RocketMQ 允许配置最大重试次数,默认情况下是16次。如果消费者处理失败超过这个次数,消息就会被转发到死信队列 %DLQ%
      消息在死信队列中的生命周期与正常队列相同,仍然可以被消费者读取,但通常需要手动干预采取其他措施
    • 特点:
      消息隔离:死信队列用于隔离那些消费失败的消息,避免这些消息影响到正常的消息消费流。
      手动处理:死信队列中的消息通常需要手动干预,可能需要通过日志分析或补偿机制来进行处理。

1.3.3.6 Offset

Topic的消费过程中,由于消息需要被不同的组进行多次消费,所以消费完的消息并不会立即被删除,这就需要RocketMQ为每个消费组在每个队列上维护一个消费位置(Consumer Offset),这个位置之前的消息都被消费过,之后的消息都没有被消费过,每成功消费一条消息,消费位置就加一

也可以这么说,Queue 是一个长度无限的数组,Offset 就是下标。

1.3.3.7 总结图示

RocketMQ的消息模型中,这些就是比较关键的概念了
画张图总结一下

image.png

1.4 消息的消费模式

消息消费模式有两种:Clustering(集群消费)和Broadcasting(广播消费)

image.png

默认情况下就是集群消费,这种模式下一个消费者组共同消费一个主题的多个队列,一个队列只会被一个消费者消费,如果某个消费者挂掉,分组内其它消费者会接替挂掉的消费者继续消费。
而广播消费消息会发给消费者组中的每一个消费者进行消费。

1.5 RoctetMQ基本架构

先看图,RocketMQ的基本架构

image.png

RocketMQ 一共有四个部分组成:NameServerBrokerProducer 生产者Consumer 消费者,它们对应了:发现,为了保证高可用,一般每一部分都是集群部署的

类比一下我们生活的邮政系统——
邮政系统要正常运行,离不开下面这四个角色, 一是发信者,二 是收信者, 三是负责暂存传输的邮局, 四是负责协调各个地方邮局的管理机构。对应到 RocketMQ 中,这四个角色就是 ProducerConsumerBrokerNameServer

image.png

1.5.1 NameServer

NameServer 是一个无状态的服务器,角色类似于 Kafka使用的 Zookeeper,但比 Zookeeper 更轻量。

特点:
每个 NameServer 结点之间是相互独立,彼此没有任何信息交互。
Nameserver 被设计成几乎是无状态的,通过部署多个结点来标识自己是一个伪集群,Producer 在发送消息前从 NameServer中获取 Topic 的路由信息也就是发往哪个 BrokerConsumer 也会定时从 NameServer获取 Topic的路由信息,Broker 在启动时会向 NameServer 注册,并定时进行心跳连接,且定时同步维护的 TopicNameServer
功能主要有两个:

  • Broker 结点保持长连接。
  • 维护 Topic 的路由信息。

1.5.2 Broker

消息存储和中转角色,负责存储和转发消息

Broker 内部维护着一个个 Consumer Queue,用来存储消息的索引,真正存储消息的地方是 CommitLog(日志文件)

单个 Broker 与所有的 Nameserver 保持着长连接和心跳,并会定时将 Topic 信息同步到 NameServer,和 NameServer 的通信底层是通过 Netty 实现的。

1.5.3 Producer

消息生产者,业务端负责发送消息,由用户自行实现和分布式部署。

Producer由用户进行分布式部署,消息由Producer通过多种负载均衡模式发送到Broker集群,发送低延时,支持快速失败。
RocketMQ 提供了三种方式发送消息:同步异步单向

  • 同步发送:同步发送指消息发送方发出数据后会在收到接收方发回响应之后才发下一个数据包。一般用于重要通知消息,例如重要通知邮件、营销短信。
  • 异步发送:异步发送指发送方发出数据后,不等接收方发回响应,接着发送下个数据包,一般用于可能链路耗时较长而对响应时间敏感的业务场景,例如用户视频上传后通知启动转码服务。
  • 单向发送:单向发送是指只负责发送消息而不等待服务器回应且没有回调函数触发,适用于某些耗时非常短但对可靠性要求并不高的场景,例如日志收集

1.5.4 Consumer

消息消费者,负责消费消息,一般是后台系统负责异步消费。

Consumer也由用户部署,支持PUSHPULL两种消费模式,支持集群消费和广播消费,提供实时的消息订阅机制。

  • Pull:拉取型消费者(Pull Consumer)主动从消息服务器拉取信息,只要批量拉取到消息,用户应用就会启动消费过程,所以 Pull 称为主动消费型
  • Push:推送型消费者(Push Consumer)封装了消息的拉取、消费进度和其他的内部维护工作,将消息到达时执行的回调接口留给用户应用程序来实现。所以 Push 称为被动消费类型,但其实从实现上看还是从消息服务器中拉取消息,不同于 Pull 的是 Push 首先要注册消费监听器,当监听器处触发后才开始消费消息

2 原理

2.1 RocketMQ整体工作流程

简单来说,RocketMQ是一个分布式消息队列,也就是消息队列+分布式系统

作为消息队列,它是发-存-收的一个模型,对应的就是Producer、Broker、Cosumer;作为分布式系统,它要有服务端、客户端、注册中心,对应的就是Broker、Producer/Consumer、NameServer

所以我们看一下它主要的工作流程:RocketMQNameServer注册中心集群、Producer生产者集群、Consumer消费者集群和若干BrokerRocketMQ进程)组成:

  1. Broker在启动的时候去向所有的NameServer注册,并保持长连接,每30s发送一次心跳
  2. Producer在发送消息的时候从NameServer获取Broker服务器地址,根据负载均衡算法选择一台服务器来发送消息
  3. Conusmer消费消息的时候同样从NameServer获取Broker地址,然后主动拉取消息来消费
    image.png

2.2 为什么RocketMQ不使用Zookeeper作为注册中心

Kafka我们都知道采用Zookeeper作为注册中心——当然也开始逐渐去ZookeeperRocketMQ不使用Zookeeper其实主要可能从这几方面来考虑:

  1. 基于可用性的考虑
    根据CAP理论,同时最多只能满足两个点,而Zookeeper满足的是CP,也就是说Zookeeper并不能保证服务的可用性,Zookeeper在进行选举的时候,整个选举的时间太长,期间整个集群都处于不可用的状态,而这对于一个注册中心来说肯定是不能接受的,作为服务发现来说就应该是为可用性而设计。
  2. 基于性能的考虑
    NameServer本身的实现非常轻量,而且可以通过增加机器的方式水平扩展,增加集群的抗压能力,而Zookeeper的写是不可扩展的,Zookeeper要解决这个问题只能通过划分领域,划分多个Zookeeper集群来解决,首先操作起来太复杂,其次这样还是又违反了CAP中的A的设计,导致服务之间是不连通的。
  3. 持久化的机制来带的问题
    ZooKeeperZAB 协议对每一个写请求,会在每个ZooKeeper节点上保持写一个事务日志,同时再加上定期的将内存数据镜像(Snapshot)到磁盘来保证数据的一致性和持久性,而对于一个简单的服务发现的场景来说,这其实没有太大的必要,这个实现方案太重了。而且本身存储的数据应该是高度定制化的。
  4. 消息发送应该依赖注册中心
    RocketMQ的设计理念也正是基于此,生产者在第一次发送消息的时候从NameServer获取到Broker地址后缓存到本地,如果NameServer整个集群不可用,短时间内对于生产者和消费者并不会产生太大影响。

2.3 Broker保存数据(CommitLog,ConsumeQueue,Indexfile)

RocketMQ主要的存储文件包括CommitLog文件、ConsumeQueue文件、Indexfile文件

image.png

image.png
  • CommitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。
    CommitLog文件保存于${Rocket_Home}/store/commitlog目录中,从图中我们可以明显看出来文件名的偏移量,每个文件默认1G,写满后自动生成一个新的文件。

    image.png

  • ConsumeQueue:消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。
    Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息TagHashCode
    ConsumeQueue文件可以看成是基于TopicCommitLog索引文件,故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}
    同样ConsumeQueue文件采取定长设计,每一个条目共20个字节,分别为8字节的CommitLog物理偏移量、4字节的消息长度、8字节tag hashcode,单个文件由30W个条目组成,可以像数组一样随机访问每一个条目,每个ConsumeQueue文件大小约5.72M

    image.png

  • IndexFileIndexFile(索引文件)提供了一种可以通过key或时间区间来查询消息的方法。Index文件的存储位置是:{fileName},文件名fileName是以创建时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile可以保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故RocketMQ的索引文件其底层实现为hash索引

    image.png

总结一下:RocketMQ采用的是混合型的存储结构,即为Broker单个实例下所有的队列共用一个日志数据文件(即为CommitLog)来存储。

RocketMQ的混合型存储结构(多个Topic的消息实体内容都存储于一个CommitLog中)针对ProducerConsumer分别采用了数据索引部分相分离的存储结构,Producer发送消息至Broker端,然后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。

只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正因为如此,Consumer也就肯定有机会去消费这条消息。当无法拉取到消息后,可以等下一次消息拉取,同时服务端也支持长轮询模式,如果一个消息拉取请求未拉取到消息,Broker允许等待30s的时间,只要这段时间内有新消息到达,将直接返回给消费端。
这里,RocketMQ的具体做法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据。

image.png

2.4 RocketMQ怎么对文件进行读写

RocketMQ对文件的读写巧妙地利用了操作系统的一些高效文件读写方式——PageCache顺序读写零拷贝

2.4.1 PageCache、顺序读取

RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为Deadline(此时块存储采用SSD的话),随机读的性能也会有所提升。

页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取

2.4.2 零拷贝

RocketMQ主要通过MappedByteBuffer对文件进行读写操作。其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO,将磁盘文件数据在操作系统内核地址空间的缓冲区,和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

什么是零拷贝
在操作系统中,使用传统的方式,数据需要经历几次拷贝,还要经历用户态/内核态切换

  1. 从磁盘复制数据到内核态内存;
  2. 从内核态内存复制到用户态内存;
  3. 然后从用户态内存复制到网络驱动的内核态内存;
  4. 最后是从网络驱动的内核态内存复制到网卡中进行传输。
image.png

所以,可以通过零拷贝的方式,减少用户态与内核态的上下文切换和内存拷贝的次数,用来提升I/O的性能。零拷贝比较常见的实现方式是mmap,这种机制在Java中是通过MappedByteBuffer实现的。

image.png

2.5 消息刷盘怎么实现

RocketMQ提供了两种刷盘策略:同步刷盘异步刷盘

  • 同步刷盘:在消息达到Broker的内存之后,必须刷到commitLog日志文件中才算成功,然后返回Producer数据已经发送成功。
  • 异步刷盘:异步刷盘是指消息达到Broker内存后就返回Producer数据已经发送成功,会唤醒一个线程去将数据持久化到CommitLog日志文件中

Broker在消息的存取时直接操作的是内存(内存映射文件),这可以提供系统的吞吐量,但是无法避免机器掉电时数据丢失,所以需要持久化到磁盘中

刷盘的最终实现都是使用NIO中的 MappedByteBuffer.force() 将映射区的数据写入到磁盘,如果是同步刷盘的话,在Broker把消息写到CommitLog映射区后,就会等待写入完成
异步而言,只是唤醒对应的线程,不保证执行的时机,流程如图所示。

image.png

2.6 RocketMQ的负载均衡

RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡。

2.6.1 Producer的负载均衡

Producer端在发送消息的时候,会先根据Topic找到指定的TopicPublishInfo,在获取了TopicPublishInfo路由信息后,RocketMQ的客户端在默认方式下selectOneMessageQueue()方法会从TopicPublishInfo中的messageQueueList中选择一个队列(MessageQueue)进行发送消息。具这里有一个sendLatencyFaultEnable开关变量,如果开启,在随机递增取模的基础上,再过滤掉not availableBroker代理。

Producer负载均衡:索引递增随机取模
public MessageQueue selectOneMessageQueue(){
    //索引递增
    int index = this.sendWhichQueue.incrementAndGet();
    //利用索引取随机数,取余数
    int pos = Math.abs(index) % this.messageQueueList.size();
    if(pos<0){
        pos=0;  
    }
    return this.messageQueueList.get(pos);
}

所谓的latencyFaultTolerance,是指对之前失败的,按一定的时间做退避。例如,如果上次请求的latency超过550Lms,就退避3000Lms;超过1000L,就退避60000L;如果关闭,采用随机递增取模的方式选择一个队列(MessageQueue)来发送消息,latencyFaultTolerance机制是实现消息发送高可用的核心关键所在。

2.6.2 Consumer的负载均衡

RocketMQ中,Consumer端的两种消费模式(Push/Pull)都是基于拉模式来获取消息的,而在Push模式只是对pull模式的一种封装,其本质实现为消息拉取线程在从服务器拉取到一批消息后,然后提交到消息消费线程池后,又“马不停蹄”的继续向服务器再次尝试拉取消息。如果未拉取到消息,则延迟一下又继续拉取。在两种基于拉模式的消费方式(Push/Pull)中,均需要Consumer端知道从Broker端的哪一个消息队列中去获取消息。因此,有必要在Consumer端来做负载均衡,即Broker端中多个MessageQueue分配给同一个ConsumerGroup中的哪些Consumer消费。

  1. Consumer端的心跳包发送
    Consumer启动后,它就会通过定时任务不断地向RocketMQ集群中的所有Broker实例发送心跳包(其中包含了,消息消费分组名称、订阅关系集合、消息通信模式和客户端id的值等信息)。Broker端在收到Consumer的心跳消息后,会将它维护在ConsumerManager的本地缓存变量—consumerTable,同时并将封装后的客户端网络通道信息保存在本地缓存变量—channelInfoTable中,为之后做Consumer端的负载均衡提供可以依据的元数据信息。

  2. Consumer端实现负载均衡的核心类—RebalanceImpl
    Consumer实例的启动流程中的启动MQClientInstance实例部分,会完成负载均衡服务线程—RebalanceService的启动(每隔20s执行一次)。
    通过查看源码可以发现,RebalanceService线程的run()方法最终调用的是RebalanceImpl类的rebalanceByTopic()方法,这个方法是实现Consumer端负载均衡的核心。
    rebalanceByTopic()方法会根据消费者通信类型为广播模式还是集群模式做不同的逻辑处理

2.7 RocketMQ消息长轮询

所谓的长轮询,就是Consumer拉取消息,如果对应的Queue如果没有数据,Broker不会立即返回,而是把 PullReuqest hold起来,等待 queue消息后,或者长轮询阻塞时间到了,再重新处理该 queue 上的所有 PullRequest

image.png

PullMessageProcessor#processRequest

 //如果没有拉到数据
case ResponseCode.PULL_NOT_FOUND:
// broker 和 consumer 都允许 suspend,默认开启
if (brokerAllowSuspend && hasSuspendFlag) {
    long pollingTimeMills = suspendTimeoutMillisLong;
    if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
          pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
     }

    String topic = requestHeader.getTopic();
    long offset = requestHeader.getQueueOffset();
    int queueId = requestHeader.getQueueId();
    //封装一个PullRequest
    PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
    this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
   //把PullRequest挂起来
   this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
     response = null;
     break;
}

挂起的请求,有一个服务线程会不停地检查,看queue中是否有数据,或者超时。

PullRequestHoldService#run()

@Override
public void run() {
   log.info("{} service started", this.getServiceName());
   while (!this.isStopped()) {
       try {
          if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
                 this.waitForRunning(5 * 1000);
          } else {
                    this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
                }

         long beginLockTimestamp = this.systemClock.now();
         //检查hold住的请求
         this.checkHoldRequest();
         long costTime = this.systemClock.now() - beginLockTimestamp;
         if (costTime > 5 * 1000) {
              log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
         }
        } catch (Throwable e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            }
        }
    log.info("{} service end", this.getServiceName());
    }

转载于:https://mp.weixin.qq.com/s/2TeUYAodDKG0gvuuVzAdPw

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容