生产者和消费者是消息队列的两个重要角色,生产者向消息队列写人数据,消费者从消息队列里读取数据, RocketMQ 的大部分用户只需要和生产者、消费者打交道。本章具体介绍不同类型生产者和消费者的特点,以及和它们相关的Offset 和Log 。
3.1 不同类型的消费者
根据使用者对读取操作的控制情况,消费者可分为两种类型。一个是DefaultMQPushConsumer ,由系统控制读取操作,收到消息后自动调用传人的处理方法来处理;另一个是DefaultMQPullConsumer ,读取操作中的大部分功能由使用者自主控制。
3.1.1 DefaultMQPushConsumer 的使用
使用DefaultMQPushConsumer 主要是设置好各种参数和传人处理消息的函数。系统收到消息后自动调用处理函数来处理消息,自动保存Offset ,而且加入新的DefaultMQPushConsumer 后会自动做负载均衡。
DefaultMQPushConsumer 需要设置三个参数: 一是这个Consumer 的GroupName ,二是NameServer 的地址和端口号,三是Topic 的名称, 下面将分别进行详细介绍。
1 ) Consumer 的GroupName 用于把多个Consumer 组织到一起, 提高并发处理能力, GroupName 需要和消息模式( MessageModel )配合使用。
RocketMQ 支持两种消息模式: Clustering 和Broadcasting 。
- 在Clustering 模式下,同一个ConsumerGroup ( GroupName 相同) 里的每个Consumer 只消费所订阅消息的一部分内容, 同一个ConsumerGroup里所有的Consumer 消费的内容合起来才是所订阅Topic 内容的整体,从而达到负载均衡的目的。
- 在Broadcasting 模式下,同一个ConsumerGroup 里的每个Consumer 都能消费到所订阅Topic 的全部消息,也就是一个消息会被多次分发,被多个Consumer 消费。
2)NameServer 的地址和端口号,可以填写多个,用分号隔开,达到消除单点故障的目的, 比如“ ip1:port;ip2:port;ip3:port ” 。
3 ) Topic 名称用来标识消息类型, 需要提前创建。如果不需要消费某个Topic 下的所有消息,可以通过指定消息的Tag 进行消息过滤,比如:Consumer. subscribe (”Topic Test”,’tag1 || tag2 || tag3 ”), 表示这个Consumer 要消费“ TopicTest ”下带有tag1或tag2 或tag3 的消息(Tag 是在发送消息时设置的标签) 。在填写Tag 参数的位置,用null 或者“*”表示要消费这个Topic的所有消息。
3.1.2 DefaultMQPushConsumer 的处理流程
DefaultMQPushConsuer 的源码中有很多PullRequest 语句,比如DefaultMQPushConsumerlmpl.this.executePullRequestlmmediately(pullRequest)。为什么“ PushConsumer ”中使用“ PullRequest ”呢? 这是通过“长轮询”方式达到Push 效果的方法,长轮询方式既有Pull 的优点,又兼具Push 方式的实时性。
Push 方式是Server 端接收到消息后,主动把消息推送给Client 端,实时性高。对于一个提供队列服务的Server 来说,用Push 方式主动推送有很多弊端:首先是加大Server 端的工作量,进而影响Server 的性能;其次, Client 的处理能力各不相同, Client 的状态不受Server 控制,如果Client 不能及时处理Server 推送过来的消息,会造成各种潜在问题。
Pull 方式是Client 端循环地从Server 端拉取消息,主动权在Client 手里,自己拉取到一定量消息后,处理妥当了再接着取。Pull 方式的问题是循环拉取消息的间隔不好设定,间隔太短就处在一个“忙等”的状态,浪费资源;每个Pull 的时间间隔太长Server 端有消息到来时有可能没有被及时处理。
“长轮询”方式通过Client 端和Server 端的配合,达到既拥有Pull 的优点,又能达到保证实时性的目的。
从Broker 的源码中可以看出,服务端接到新消息请求后, 如果队列里没有新消息,并不急于返回,通过一个循环不断查看状态,每次waitForRunning一段时间(默认是5 秒), 然后后再Check。默认情况下当Broker 一直没有新消息, 第三次Check 的时候, 等待时间超过Request 里面的BrokerSuspendMaxTimeMillis , 就返回空结果。在等待的过程中, Broker 收到了新的消息后会直接调用notifyMessageArriving 函数返回请求结果。“长轮询”的核心是, Broker 端HOLD 住客户端过来的请求一小段时间,在这个时间内有新消息到达,就利用现有的连接立刻返回消息给Consumer 。“长轮询”的主动权还是掌握在Consumer 手中, Broker即使有大量消息积压,也不会主动推送给Consumer 。
长轮询方式的局限性,是在HOLD 住Consumer 请求的时候需要占用资源,它适合用在消息队列这种客户端连接数可控的场景中。
3.1.4 DefaultMQPullConsumer
使用DefaultMQPullConsumer 像使用DefaultMQPushConsumer 一样需要设置各种参数,写处理消息的函数,同时还需要做额外的事情。
- 获取Message Queue 并遍历
一个Topic 包括多个Message Queue ,如果这个Consumer 需要获取Topic下所有的消息,就要遍历多有的Message Queue 。如果有特殊情况,也可以选择某些特定的Message Queue 来读取消息。 - 维护Offsetstore
从一个Message Queue 里拉取消息的时候,要传人Offset 参数(long 类型的值),随着不断读取消息, Offset 会不断增长。这个时候由用户负责把Offset存储下来,根据具体情况可以存到内存里、写到磁盘或者数据库里等。 - 根据不同的消息状态做不同的处理
拉取消息的请求发出后,会返回: FOUND 、NO_MATCHED_MSG 、NO_NEW_MSG 、OFFSET_ILLEGAL 四种状态,需要根据每个状态做不同的处理。比较重要的两个状态是FOUNT 和NO NEW MSG ,分别表示获取到消息和没有新的消息。
实际情况中可以把while (true)放到外层,达到无限循环的目的。因为Pull Consumer 需要用户自己处理遍历Message Queue 、保存Offset ,所以Pull Consumer 有更多的自主性和灵活性。
3.1.5 Consumer 的启动、关闭流程
消息队列一般是提供一个不间断的持续性服务, Consumer 在使用过程中,如何才能优雅地启动和关闭,确保不漏掉或者重复消费消息呢?
Consumer 分为Push 和l Pull 两种方式,对于Pull Consumer 来说,使用者主动权很高,可以根据实际需要暂停、停止、启动消费过程。需要注意的是Offset 的保存,要在程序的异常处理部分增加把Offset 写人磁盘方面的处理,记准了每个Message Queue 的Offset ,才能保证消息消费的准确性。
DefaultMQPushConsumer 的退出, 要调用shutdown() 函数, 以便释放资源、保存Offset 等。这个调用要加到Consumer 所在应用的退出逻辑中。
Push Consumer 在启动的时候,会做各种配置检查,然后连接NameServer获取Topic 信息,启动时如果遇到异常,比如无法连接NameServer,程序仍然可以正常启动不报错(日志里有WARN 信息) 。在单机环境下可以测试这种情况,启动DefaultMQPushConsumer 时故意把Name Server 地址填错,程序仍然可以正常启动,但是不会收到消息。
为什么DefaultMQPushConsumer 在无法连接NameServer 时不直接报错退出呢? 这和分布式系统的设计有关, RocketMQ 集群可以有多个NameServer 、Broker ,某个机器出异常后整体服务依然可用。所以DefaultMQPushConsumer被设计成当发现某个连接异常时不立刻退出,而是不断尝试重新连接。可以进行这样一个测试,在DefaultMQPushConsumer 正常运行的时候,手动kill 掉Broker 或NameServer ,过一会儿再启动。会发现DefaultMQPushConsumer 不会出错退出,在服务恢复后正常运行,在服务不可用的这段时间,仅仅会在日志里报异常信息。
如果需要在DefaultMQPushConsumer 启动的时候,及时暴露配置问题,该如何操作呢? 可以在Consumer.start()语句后调用: Consumer. fetchSubscribeMessageQueues(”TopicName”) ,这时如果配置信息写得不准确,或者当前服务不可用,这个语句会报MQC!ientException 异常。
3.2 不同类型的生产者
生产者向消息队列里写人消息,不同的业务场景需要生产者采用不同的写人策略。比如同步发送、异步发送、延迟发送、发送事务消息等, 下面具体介绍。
3.2.1 DefaultMQProducer
生产者发送消息默认使用的是DefaultMQProducer 类,发送消息要经过五个步骤:
- 设置Producer 的GroupName 。
- 设置lnstanceName ,当一个Jvm 需要启动多个Producer 的时候,通过设置不同的InstanceName 来区分,不设置的话系统使用默认名称“ DEFAULT ” 。
- 设置发送失败重试次数,当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证不丢消息,可以设置多重试几次。
- 设置NameServer 地址。
- 组装消息并发送。
消息的发送有同步和异步两种方式,上面的使用的是异步方式。在第2 章的例子中用的是同步方式。消息发送的返回状态有如下四种: FLUSH_DISK_TIMEOUT 、FLUSH_SLAVE_TIMEOUT 、SLAVE_NOT_AVAILABLE 、SEND_OK ,不同状态在不同的刷盘策略和同步策略的配置下含义是不同的。
- FLUSH_DISK_TIMEOUT : 表示没有在规定时间内完成刷盘(需要Broker 的刷盘策略设置成SYNC FLUSH 才会报这个错误) 。
- FLUSH_SLAVE_TIMEOUT :表示在主备方式下,并且Broker 被设置成SYNC_MASTER 方式,没有在设定时间内完成主从同步。
- SLAVE_NOT_AVAILABLE : 这个状态产生的场景和FLUSH_SLAVE_TIMEOUT 类似, 表示在主备方式下,并且Broker 被设置成SYNC_MASTER ,但是没有找到被配置成Slave 的Broker 。
- SEND_OK :表示发送成功,发送成功的具体含义,比如消息是否已经被存储到融盘?消息是否被同步到了Slave 上?消息在Slave 上是否被写人磁盘?需要结合所配置的刷盘策略、主从策略来定。这个状态还可以简单理解为,没有发生上面列出的三个问题状态就是SEND_OK 。
写一个高质量的生产者程序,重点在于对发送结果的处理,要充分考虑各种异常,写清对应的处理逻辑。
3.2.2 发送延迟消息
RocketMQ 支持发送延迟消息, Broker 收到这类消息后,延迟一段时间再处理, 使消息在规定的一段时间后生效。
延迟消息的使用方法是在创建Message 对象时,调用setDelayTimeLevel ( intlevel ) 方法设置延迟时间, 然后再把这个消息发送出去。目前延迟的时间不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s /1m/2m/ 3m/4m/5m/6m /
7m /8m/9m/10m /20m /30m /1h /2h) 。比如setDelayTimeLevel(3) 表示延迟10s 。
3.2.3 自定义消息发送规则
一个Topic 会有多个Message Queue ,如果使用Producer 的默认配置,这个Producer 会轮流向各个Message Queue 发送消息。Consumer 在消费消息的时候,会根据负载均衡策略,消费被分配到的Message Queue ,如果不经过特定的设置,某条消息被发往l哪个Message Queue ,被哪个Consumer 消费是未知的。
如果业务需要我们把消息发送到指定的Message Queue 里,比如把同一类型的消息都发往相同的Message Queue , 该怎么办呢? 可以用MessageQueueSelector.
发送消息的时候,把MessageQueueSelector 的对象作为参数,使用public SendResult send ( Message msg, MessageQueueSelector selector, Object arg)函数发送消息即可。在MessageQueueSelector 的实现中,根据传人的Object 参数,或者根据Message 消息内容确定把消息发往那个Message Queue ,返回被选中的Message Queue 。
3.2.4 对事务的支持
RocketMQ 的事务消息,是指发送消息事件和其他事件需要同时成功或同时失败。比如银行转账, A 银行的某账户要转一万元到B 银行的某账户。A 银行发送“B 银行账户增加一万元” 这个消息,要和“从A 银行账户扣除一万元”这个操作同时成功或者同时失败。
RocketMQ 采用两阶段提交的方式实现事务消息, TransactionMQProducer处理上面情况的流程是,先发一个“准备从B 银行账户增加一万元”的消息,发送成功后做从A 银行账户扣除一万元的操作,根据操作结果是否成功,确定之前的“准备从B 银行账户增加一万元”的消息是做commit 还是rollback ,具体流程如下:
- 发送方向RocketMQ 发送“待确认”消息。
- RocketMQ 将收到的“待确认” 消息持久化成功后, 向发送方回复消息已经发送成功,此时第一阶段消息发送完成。
- 发送方开始执行本地事件逻辑。
- 发送方根据本地事件执行结果向RocketMQ 发送二次确认(Commit 或是Rollback) 消息, RocketMQ 收到Commit 状态则将第一阶段消息标记为可投递,订阅方将能够收到该消息;收到Rollback 状态则删除第一阶段的消息,订阅方接收不到该消息。
- 如果出现异常情况,步骤4 提交的二次确认最终未到达RocketMQ,服务器在经过固定时间段后将对“待确认”消息、发起回查请求。
- 发送方收到消息回查请求后(如果发送一阶段消息的Producer 不能工作,回查请求将被发送到和Producer 在同一个Group 里的其他Producer),通过检查对应消息的本地事件执行结果返回Commit 或Roolback 状态。
- RocketMQ 收到回查请求后,按照步骤4的逻辑处理。
上面的逻辑似乎很好地实现了事务消息功能,它也是RocketMQ 之前的版本实现事务消息的逻辑。但是因为RocketMQ 依赖将数据顺序写到磁盘这个特征来提高性能,步骤4却需要更改第一阶段消息的状态,这样会造成磁盘Catch 的脏页过多, 降低系统的性能。所以RocketMQ 在4.x 的版本中将这部分功能去除。系统中的一些上层Class 都还在,用户可以根据实际需求实现自己的事务功能。
3.3 如何存储队列位置信息
实际运行中的系统,难免会遇到重新消费某条消息、跳过一段时间内的消息等情况。这些异常情况的处理,都和Offset 有关。本节主要分析Offset 的存储位置,以及如何根据需要调整Offset 的值。
首先来明确一下Offset 的含义,RocketMQ 中,一种类型的消息会放到一个Topic 里,为了能够并行, 一般一个Topic 会有多个Message Queue(也可以设置成一个),Offset 是指某个Topic下的一条消息在某个Message Queue 里的位置,通过Offset 的值可以定位到这条消息,或者指示Consumer 从这条消息开始向后继续处理。OffsetStore 使用Json 格式存储,简洁明了.
在使用DefaultMQPushConsumer 的时候,我们不用关心OffsetStore 的事,但是如果Pull Consumer ,我们就要自己处理OffsetStore 了。在3.1.4 节的Pull Consumer 示例中, 代码里把Offset 存到了内存,没有持久化存储,这样就可能因为程序的异常或重启而丢失Offset ,在实际应用中不推荐这样做。
了解OffsetStore 的存储机制以后,我们看看如何设置Consumer 读取消息的初始位置。DefaultMQPushConsumer 类里有个函数用来设置从哪儿开始消费消息:比如setConsumeFromWhere(ConsumeFrom Where.CONSUME FROM_FIRST OFFSET),这个语句设置从最小的Offset 开始读取。如果从队列开始到感兴趣的消息之间有很大的范围,用CONSUME_FROM_FIRST_OFFSET 参数就不合适了,可以设置从某个时间开始消费消息, 比如Consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUMEFROM_TIMESTAMP), Consumer.setConsumeTimestamp ("20131223171201 ") , 时间戳格式是精确到秒的。
注意设置读取位置不是每次都有效,它的优先级默认在Offset Store 后面,比如在DefaultMQPushConsumer 的BROADCASTING 方式下,默认是从Broker 里读取某个Topic 对应ConsumerGroup 的Offset , 当读取不到Offset 的时候,ConsumeFrom Where 的设置才生效。大部分情况下这个设置在ConsumerGroup 初次启动时有效。如果Consumer 正常运行后被停止, 然后再启动, 会接着上次的Offset 开始消费, ConsumeFromWhere 的设置元效。
3.4 自定义日志输出
Log 是监控系统状态, 排查问题的重要手段, RocketMQ 的默认Log 存储位置是:${user.home }/Logs/rocketmqLogs, Log 配置文件的设置可以通过JVM启动参数、环境变量、代码中的设置语句这三种方式来配置。
3.5 本章小结
对消息队列使用者来说, Consumer 和Producer 是打交道最多的两个类型。本章详细介绍了两种类型的Consumer 和一种类型的Producer ,用户在使用的时候基于业务需求来选择合适的类型。最后重点介绍了Offset 和Log ,了解Offset 机制是正确使用RocketMQ 的基础, 合理使用Log 可以大幅提高开发、调试的效率。下一章将介绍RocketMQ 的NameServer 模块。