RocketMQ4.X
JMS
Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口
- JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API
- 使用场景:
- 核心应用
- 解耦:订单系统-》物流系统
- 异步:用户注册-》发送邮件,初始化信息
- 削峰:秒杀、日志处理
- 跨平台 、多语言
- 分布式事务、最终一致性
- RPC调用上下游对接,数据源变动->通知下属
- 核心应用
消息中间件常见概念和编程模型
- 常见概念
- JMS提供者:连接面向消息中间件的,JMS接口的一个实现,RocketMQ,ActiveMQ,Kafka等等
- JMS生产者(Message Producer):生产消息的服务
- JMS消费者(Message Consumer):消费消息的服务
- JMS消息:数据对象
- JMS队列:存储待消费消息的区域
- JMS主题:一种支持发送消息给多个订阅者的机制
- JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)
- 基础编程模型
- MQ中需要用的一些类
- ConnectionFactory :连接工厂,JMS 用它创建连接
- Connection :JMS 客户端到JMS Provider 的连接
- Session: 一个发送或接收消息的线程
- Destination :消息的目的地;消息发送给谁.
- MessageConsumer / MessageProducer: 消息消费者,消息生产者
主流消息队列和技术选型讲解
-
ActiveMQ:http://activemq.apache.org/
- Apache出品,历史悠久,支持多种语言的客户端和协议,支持多种语言Java, .NET, C++ 等,基于JMS Provider的实现
缺点:吞吐量不高,多队列的时候性能下降,存在消息丢失的情况,比较少大规模使用
-
Kafka:http://kafka.apache.org/
- 是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理大规模的网站中的所有动作流数据(网页浏览,搜索和其他用户的行动),副本集机制,实现数据冗余,保障数据尽量不丢失;支持多个生产者和消费者
缺点:不支持批量和广播消息,运维难度大,文档比较少, 需要掌握Scala
-
RabbitMQ:http://www.rabbitmq.com/
- 是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
缺点:使用Erlang开发,阅读和修改源码难度大
-
RocketMQ:http://rocketmq.apache.org/
- 阿里开源的一款的消息中间件, 纯Java开发,具有高吞吐量、高可用性、适合大规模分布式系统应用的特点, 性能强劲(零拷贝技术),支持海量堆积, 支持指定次数和时间间隔的失败消息重发,支持consumer端tag过滤、延迟消息等,在阿里内部进行大规模使用,适合在电商,互联网金融等领域使用
RocketMQ4.x消息队列介绍
Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件
-
特点
- 支持Broker和Consumer端消息过滤
- 支持发布订阅模型,和点对点,
- 支持拉pull和推push两种消息模式
- 单一队列百万消息、亿级消息堆积
- 支持单master节点,多master节点,多master多slave节点
- 任意一点都是高可用,水平拓展,Producer、Consumer、队列都可以分布式
- 消息失败重试机制、支持特定level的定时消息
- 新版本底层采用Netty
- 4.3.x支持分布式事务
- 适合金融类业务,高可用性跟踪和审计功能。
-
概念
Producer:消息生产者
Producer Group:消息生产者组,发送同类消息的一个消息生产组
Consumer:消费者
Consumer Group:消费同类消息的多个实例
Tag:标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务的消息
-
Topic:主题, 如订单类消息,queue是消息的物理管理单位,而topic是逻辑管理单位。一个topic下可以有多个queue,
默认自动创建是4个,手动创建是8个
Message:消息,每个message必须指定一个topic
Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现、路由、元数据信息,可以多个部署,互相独立(比zookeeper更轻量)
Offset: 偏移量,可以理解为消息进度
commit log: 消息存储会写在Commit log文件里面
Springboot2.X整合RocketMQ4.X
-
导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.4.0</version> </dependency>
-
实例
配置信息
public class PayConfig { // nameServer地址,多个可以用;分开 public static final String NAME_SERVER_ADDR = "139.224.101.91:9876"; // 指定topic // 通过命令查看 ./bin/mqbroker -m // autoCreateTopicEnable=true 则自动创建topic public static final String PAY_TOPIC = "pay_test_topic"; }
生成者
import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; @Component public class PayProducer { private String producerGroup = "pay_producer_group"; private DefaultMQProducer producer; /** * 获取DefaultMQProducer */ public DefaultMQProducer getProducer() { return this.producer; } /** * 无参构造 */ public PayProducer() { producer = new DefaultMQProducer(producerGroup); // 指定nameServer地址 producer.setNamesrvAddr(PayConfig.NAME_SERVER_ADDR); start(); } /** * 对象在使用之前必须要调用一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 一般在应用上下文,使用上下文监听器进行关闭 */ public void shutdown() { this.producer.shutdown(); } }
消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; @Component public class PayConsumer { private String consumerGroup = "pay_consumer_group"; private DefaultMQPushConsumer consumer; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(consumerGroup); consumer.setNamesrvAddr(PayConfig.NAME_SERVER_ADDR); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(PayConfig.PAY_TOPIC, "*"); // 注册监听 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { Message msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; }catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
访问入口
import cn.net.rocketmq.jms.PayConfig; import cn.net.rocketmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class PayController { @Resource PayProducer payProducer; @GetMapping("/payMess") public String payMessage(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { Message message = new Message(PayConfig.PAY_TOPIC, "taga", ("rocketMq" + text).getBytes()); SendResult send = payProducer.getProducer().send(message); System.out.println("SendResult==" + send); return "ok"; } }
结果
SendResult==SendResult [sendStatus=SEND_OK, msgId=C0A80B6E292018B4AAC27FB005A20000, offsetMsgId=8BE0655B00002A9F0000000000057F7B, messageQueue=MessageQueue [topic=pay_test_topic, brokerName=broker-a, queueId=2], queueOffset=0] ConsumeMessageThread_4 Receive New Messages: rocketMq5555 topic=pay_test_topic, tags=taga, keys=null, msg=rocketMq5555
生产者核心配置及核心知识
核心配置
type | desc |
---|---|
compressMsgBodyOverHowmuch | 消息超过默认字节4096后进行压缩 |
retryTimesWhenSendFailed | 失败重发次数 |
maxMessageSize | 最大消息配置,默认128k |
topicQueueNums | 主题下面的队列数量,默认是4 |
autoCreateTopicEnable | 是否自动创建主题Topic, 开发建议为true,生产要为false |
defaultTopicQueueNums | 自动创建服务器不存在的topic,默认创建的队列数 |
autoCreateSubscriptionGroup | 是否允许 Broker 自动创建订阅组,建议线下开发开启,线上关闭 |
brokerClusterName | 集群名称 |
brokerId | 0表示Master主节点 大于0表示从节点 |
brokerIP1 | Broker服务地址 |
brokerRole | broker角色 ASYNC_MASTER/ SYNC_MASTER/ SLAVE |
deleteWhen | 每天执行删除过期文件的时间,默认每天凌晨4点 |
flushDiskType | 刷盘策略, 默认为 ASYNC_FLUSH(异步刷盘), 另外是SYNC_FLUSH(同步刷盘) |
listenPort | Broker监听的端口号 |
mapedFileSizeCommitLog | 单个conmmitlog文件大小,默认是1GB |
mapedFileSizeConsumeQueue | ConsumeQueue每个文件默认存30W条,可以根据项目调整 |
storePathRootDir | 存储消息以及一些配置信息的根目录 默认为用户的 ${HOME}/store |
storePathCommitLog | commitlog存储目录默认为${storePathRootDir}/commitlog |
storePathIndex | 消息索引存储路径 |
syncFlushTimeout | 同步刷盘超时时间 |
diskMaxUsedSpaceRatio | 检测可用的磁盘空间大小,超过后会写入报错 |
核心知识
消息发送状态
status | desc |
---|---|
FLUSH_DISK_TIMEOUT | 没有在规定时间内完成刷盘 (刷盘策略需要为SYNC_FLUSH 才会出这个错误) |
FLUSH_SLAVE_TIMEOUT | 主从模式下,broker是SYNC_MASTER, 没有在规定时间内完成主从同步 |
SLAVE_NOT_AVAILABLE | 主从模式下,broker是SYNC_MASTER, 但是没有找到被配置成Slave的Broker |
SEND_OK | 发送成功,没有发生上面的三种问题 |
// 源码
package org.apache.rocketmq.client.producer;
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
消息发送模式
发送方式 | 发送 TPS | 发送结果反馈 | 可靠性 |
---|---|---|---|
同步发送 | 快 | 有 | 不丢失 |
异步发送 | 快 | 有 | 不丢失 |
单向发送 | 最快 | 无 | 可能丢失 |
-
同步发送
场景:重要通知邮件、报名短信通知、营销短信系统等
官网:http://rocketmq.apache.org/docs/simple-example/
样例:
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 同步发送 DefaultMQProducer mqProducer = payProducer.getProducer(); Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes()); SendResult sendResult = mqProducer.send(msg); System.out.println(sendResult); System.out.println(mqProducer.toString()); return "Success !!!"; }
-
异步发送
场景:对RT时间敏感,可以支持更高的并发,回调成功触发相对应的业务,比如注册成功后通知积分系统发放优惠券
官网:http://rocketmq.apache.org/docs/simple-example/
样例:
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 异步发送 DefaultMQProducer mqProducer = payProducer.getProducer(); Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes()); mqProducer.send(msg, new SendCallback() { // 成功回调 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } // 异常处理 @Override public void onException(Throwable e) { // 人为补偿机制 e.printStackTrace(); } }); return "Success !!!"; }
-
单向发送(OneWay)
场景:主要是日志收集,适用于某些耗时非常短,但对可靠性要求并不高的场景, 也就是LogServer, 只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答
官网:https://rocketmq.apache.org/docs/simple-example/
样例:
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException { // 单向发送 DefaultMQProducer mqProducer = payProducer.getProducer(); Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes()); mqProducer.sendOneway(msg); return "Success !!!"; }
消息重试及处理
-
生产者Producer重试(异步和单向下配置无效)
- 消息重投(保证数据的高可靠性),本身内部支持重试,默认次数是2,
- 如果网络情况比较差,或者跨集群则建改多几次
public PayProducer() { this.producer = new DefaultMQProducer(this.payProducerGroup); // 也可以在发送具体消息时设置者重试次数 // DefaultMQProducer mqProducer = payProducer.getProducer(); // mqProducer.setRetryTimesWhenSendFailed(3); this.producer.setRetryTimesWhenSendFailed(3); this.producer.setNamesrvAddr(jmsConfig.NAME_SERVER); this.start(); }
-
消费端重试
原因:消息处理异常、broker端到consumer端各种问题,如网络原因闪断,消费处理失败,ACK返回失败等等问题
-
注意:
-
重试间隔时间配置 ,默认每条消息最多重试 16 次
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
超过重试次数人工补偿
消费端去重
消费重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续消费新的消息,
一条消息无论重试多少次,这些重试消息的 Message ID,key 不会改变。
-
@Component public class PayConsumer { private String payConsumerGroup = "pay_consumer_group"; private DefaultMQPushConsumer consumer; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(payConsumerGroup); consumer.setNamesrvAddr(jmsConfig.NAME_SERVER); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); consumer.subscribe(jmsConfig.PAY_TOPIC, "*"); // 默认是集群模式(MessageModel.CLUSTERING),广播模式(MessageModel.BROADCASTING)不支持消息重试机制 // consumer.setMessageModel(MessageModel.BROADCASTING); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { int times = 0; try { MessageExt msg = msgs.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody())); String topic = msg.getTopic(); String body = new String(msg.getBody(), "utf-8"); String tags = msg.getTags(); String keys = msg.getKeys(); // 重试次数 times = msg.getReconsumeTimes(); System.out.println("重试次数= " + times); // 测试消费端消息重试机制 if(keys.equals("pay_key")) { throw new Exception(); } System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { if(times >= 2) { System.out.println("消息重试测试大于2次,记录数据库"); // 通知broker,消息消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start ..."); } }
延迟消息
Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息,目前支持固定精度的消息
源代码:rocketmq-store > MessageStoreConfig.java 属性 messageDelayLevel
"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
使用场景:通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// 同步发送
DefaultMQProducer mqProducer = payProducer.getProducer();
Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
// 设置消息延迟级别,从1开始
msg.setDelayTimeLevel(2);
SendResult sendResult = mqProducer.send(msg);
System.out.println(sendResult);
System.out.println(mqProducer.toString());
return "Success !!!";
}
顺序消息
-
顺序消息:消息的生产和消费顺序一致
-
全局顺序:topic下面全部消息都要有序(少用)
性能要求不高,所有的消息严格按照 FIFO 原则进行消息发布和消费的场景,并行度成为消息系统的瓶颈, 吞吐量不够
-
局部顺序:只要保证一组消息被顺序消费即可(RocketMQ使用)
性能要求高 - 电商的订单创建,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息、订单交易成功消息 都会按照先后顺序来发布和消费
-
顺序发布:对于指定的一个 Topic,客户端将按照一定的先后顺序发送消息
顺序消费:对于指定的一个 Topic,按照一定的先后顺序接收消息,即先发送的消息一定会先被客户端接收到。
-
注意:
- 顺序消息暂不支持广播模式
- 顺序消息不支持异步发送方式,否则将无法严格保证顺序
生产者-使用MessageQueueSelector将消息发送到topic下的指定queue
DefaultMQProducer mqProducer = payProducer.getProducer();
Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
// 如果arg是订单号是字符串,则进行hash,得到一个hash值
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
}, 0);
消费者-使用MessageListenerOrderly监听消息,自带单线程消费消息
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
MessageExt msg = list.get(0);
try {
int queueId = msg.getQueueId();
String msgBody = new String(msg.getBody(), "UTF-8");
String msgId = msg.getMsgId();
System.out.printf("consumer queueId=%s --- msgBody=%s --- msgId=%s \r\n", queueId, msgBody, msgId);
return ConsumeOrderlyStatus.SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
}
});
-
MessageQueueSelector
生产消息使用MessageQueueSelector投递到Topic下指定的queue
- 应用场景:顺序消息,分摊负载
- 默认topic下的queue数量是4,可以配置
- 支持同步,异步发送指定的MessageQueue
- 选择的queue数量必须小于配置的,否则会出错
public String sendPayMsg(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
// MessageQueueSelector发送到topic下的指定queue
DefaultMQProducer mqProducer = payProducer.getProducer();
Message msg = new Message(jmsConfig.PAY_TOPIC, "taga", "pay_key", ("hello rocketMq " + text).getBytes());
SendResult sendResult = mqProducer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer queueNum = (Integer) arg;
return mqs.get(queueNum);
}
}, 0);
System.out.println(sendResult);
return "Success !!!";
}
- MessageListenerOrderly
- Consumer会平均分配queue的数量
- 并不是简单禁止并发处理,而是为每个Consumer Quene加个锁,消费每个消息前,需要获得这个消息所在的Queue的锁,这样同个时间,同个Queue的消息不被并发消费,但是不同Queue的消息可以并发处理
消费者核心配置及核心知识
核心配置
-
consumeFromWhere
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
- CONSUME_FROM_FIRST_OFFSET:初次从消息队列头部开始消费,即历史消息(还储存在broker的)全部消费一遍,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_LAST_OFFSET: 默认策略,初次从该队列最尾开始消费,即跳过历史消息,后续再启动接着上次消费的进度开始消费
- CONSUME_FROM_TIMESTAMP(极少用) : 从某个时间点开始消费,默认是半个小时以前,后续再启动接着上次消费的进度开始消费
-
allocateMessageQueueStrategy
- 负载均衡策略算法,即消费者分配到queue的算法,默认值是AllocateMessageQueueAveragely即取模平均分配
-
offsetStore:消息消费进度存储器 offsetStore 有两个策略:
- LocalFileOffsetStore 和 RemoteBrokerOffsetStor 广播模式默认使用LocalFileOffsetStore 集群模式默认使用RemoteBrokerOffsetStore
consumeThreadMin 最小消费线程池数量
consumeThreadMax 最大消费线程池数量
pullBatchSize: 消费者去broker拉取消息时,一次拉取多少条。可选配置
consumeMessageBatchMaxSize: 单次消费时一次性消费多少条消息,批量消费接口才有用,可选配置
-
messageModel : 消费者消费模式,集群模式CLUSTERING(默认) ,广播模式BROADCASTING
consumer.setMessageModel(MessageModel.BROADCASTING);
核心知识
集群和广播模式
- 集群模式(默认):
- Consumer实例平均分摊消费生产者发送的消息
- 例子:订单消息,一般是只被消费一次
- 广播模式:
- 广播模式下消费消息:投递到Broker的消息会被每个Consumer进行消费,一条消息被多个Consumer消费,广播消费中ConsumerGroup暂时无用
- 例子:群公告,每个人都需要消费这个消息
- queue与消费者
- 如果是4个队列,8个消息,4个节点则会各消费2条,如果不对等,则负载均衡会分配不均,
- 如果consumer实例的数量比message queue的总数量还多的话,多出来的consumer实例将无法分到queue,也就无法消费到消息,也就无法起到分摊负载的作用,所以需要控制让queue的总数量大于等于consumer的数量
tag标签
一个Message只有一个Tag,tag是二级分类, 用于消息过滤
-
tag过滤位置
- Broker端过滤,减少了无用的消息的进行网络传输,增加了broker的负担
- Consumer端过滤,完全可以根据业务需求进行实习,但是增加了很多无用的消息传输
-
tag过滤原理
- Broker:遍历message queue存储的 message tag和 订阅传递的tag 的hashcode做对比,符合的则传输给Consumer,不一样则跳过,在consume queue :(message queue在CommitLog的标记)存储的是对应的hashcode, 对比也是通过hashcode对比;
- Consumer:收到过滤消息后也会进行匹配操作,但是是对比真实的message tag而不是hashcode;
- 总结:
- consume queue存储使用hashcode定长,节约空间
- 过滤中不访问commit log,可以高效过滤
- 如果存在hash冲突,Consumer端可以进行再次确认
-
tag过滤表达式
一般是监听 * ,或者指定 tag,|| 运算 , SLQ92 , FilterServer等;
-
tag性能高,逻辑简单
consumer.subscribe(jmsConfig.ORDER_TOPIC, "order_create || order_finished");
-
SQL92 性能差点,支持复杂逻辑(只支持PushConsumer中使用),如果使用SQL92语法,配置文件broker.conf中添加配置:enablePropertyFilter=true
- 语法:> , < = ,IS NULL, AND, OR, NOT 等,sql where后续的语法即可(大部分)
// 发送端 Message msg = new Message(jmsConfig.ORDER_TOPIC, "order_create", "pay_key", ("hello rocketMq " + text).getBytes()); msg.putUserProperty("amount", "60"); // 消费者 consumer.subscribe(jmsConfig.ORDER_TOPIC, MessageSelector.bySql("amount > 50"));
-
-
注意事项:
- 订阅关系一致:订阅关系由 Topic和 Tag 组成,同一个 group name,订阅的 topic和tag 必须是一样;如果订阅关系要一致,会造成消费混乱,甚至会造成消息丢失
- 如果想使用多个Tag,可以使用sql表达式,但是不建议
- 建议:单一职责,多个队列
PushConsumer/PullConsumer
- Push和Pull优缺点分析
- Push:实时性高;但增加服务端负载,消费端能力不同,如果Push推送过快,消费端会出现很多问题
- Pull:消费者从Server端拉取消息,主动权在消费者端,可控性好;但 间隔时间不好设置,间隔太短,则空请求,浪费资源;间隔时间太长,则消息不能及时处理
- 长轮询: Client请求Server端也就是Broker的时候, Broker会保持当前连接一段时间 默认是15s,如果这段时间内有消息到达,则立刻返回给Consumer,没消息的话 超过15s,则返回空,再进行重新请求;主动权在Consumer中,Broker即使有大量的消息 也不会主动提送Consumer, 缺点:服务端需要保持Consumer的请求,会占用资源,需要客户端连接数可控 否则会一堆连接
- PushConsumer本质是长轮训
- 系统收到消息后自动处理消息和offset,如果有新的Consumer加入会自动做负载均衡,
- 在broker端可以通过longPollingEnable=true来开启长轮询
- 消费端代码:DefaultMQPushConsumerImpl->pullMessage->PullCallback
- 服务端代码:broker.longpolling
- 虽然是push,但是代码里面大量使用了pull,是因为使用长轮训方式达到Push效果,既有pull有的,又有Push的实时性
- 优雅关闭:主要是释放资源和保存Offset, 调用shutdown()即可 ,参考 @PostConstruct、@PreDestroy
- PullConsumer需要自己维护Offset(参考官方例子)
- 官方例子路径:org.apache.rocketmq.example.simple.PullConsumer
- 获取MessageQueue遍历
- 客户维护Offset,需用用户本地存储Offset,存储内存、磁盘、数据库等
- 处理不同状态的消息 FOUND、NO_NEW_MSG、OFFSET_ILLRGL、NO_MATCHED_MSG、4种状态
- 灵活性高可控性强,但是编码复杂度会高
- 优雅关闭:主要是释放资源和保存Offset,需用程序自己保存好Offset,特别是异常处理的时候
Offset
消息偏移量
- offset
- message queue是无限长的数组,一条消息进来下标就会涨1,下标就是offset,消息在某个Message Queue里的位置,通过offset的值可以定位到这条消息,或者指示Consumer从这条消息开始向后处理;
- message queue中的maxOffset表示消息的最大offset, maxOffset并不是最新的那条消息的offset,而是最新消息的offset+1,minOffset则是现存的最小offset。
- fileReserveTime=48 (配置文件中)默认消息存储48小时后,消费会被物理地从磁盘删除,message queue的min offset也就对应增长;所以比minOffset还要小的那些消息已经不在broker上了,就无法被消费;
- 类型(父类是OffsetStore)
- 本地文件类型:DefaultMQPushConsumer的BROADCASTING模式,各个Consumer没有互相干扰,使用LoclaFileOffsetStore,把Offset存储在本地;
- Broker代存储类型:DefaultMQPushConsumer的CLUSTERING模式,由Broker端存储和控制Offset的值,使用RemoteBrokerOffsetStore
- 作用:主要是记录消息的偏移量,以供多个消费者进行准确消费
- 建议:采用pushConsumer,RocketMQ自动维护OffsetStore;如果使用pullConsumer,需要自己进行维护OffsetStore
CommitLog
RocketMq中消息存储是由ConsumeQueue和CommitLog配合完成
-
ConsumeQueue: 是逻辑队列, CommitLog是真正存储消息文件的,ConsumeQueue存储的是queue指向物理存储的地址;Topic下的每个message queue都有对应的ConsumeQueue文件,内容也会被持久化到磁盘;
默认地址:store/consumequeue/{topicName}/{queueid}/fileName
-
CommitLog:
CommitLog是消息文件的存储地址
- 生成规则:每个文件的默认1G =1024 * 1024 * 1024,commitlog的文件名fileName,名字长度为20位,左边补零,剩余为起始偏移量;比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1 073 741 824Byte;当这个文件满了,第二个文件名字为00000000001073741824,起始偏移量为1073741824, 消息存储的时候会顺序写入文件,当文件满了则写入下一个文件
- 定位消息位置:例如 1073742827 为物理偏移量,则其对应的相对偏移量为 1003 = 1073742827 - 1073741824,并且该偏移量位于第二个 CommitLog。
ZeroCopy
零拷贝
请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升
-
对应零拷贝技术有mmap及sendfile
- mmap:小文件传输快(RocketMQ 选择这种方式,mmap+write 方式,小块数据传输,效果会比 sendfile 更好)
- sendfile:大文件传输比mmap快
应用:Kafka、Netty、RocketMQ等都采用了零拷贝技术;Java中的TransferTo()实现了Zero-Copy
-
RocketMq高效原因分析
- CommitLog顺序写, 存储了MessagBody、message key、tag等信息
- ConsumeQueue随机读 + 操作系统的PageCache + 零拷贝技术ZeroCopy
传统拷贝
-
ZeroCopy
分布式事务消息
分布式事务
- 来源:单体应用—>拆分为分布式应用;
- 一个接口需要调用多个服务,且操作不同的数据库,数据一致性难保障;
RokcetMQ分布式事务消息
-
RocketMQ事务消息:
- RocketMQ 提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
-
半消息Half Message:
- 暂不能消费,Producer已经将消息成功发送到了Broker端,但是服务端未收到生产者对该消息的二次确认,此时该消息被标记成“暂不能投递”状态,处于该种状态下的消息即半消息
-
消息回查:
- 由于网络闪断、生产者应用重启等原因,导致某条事务消息的二次确认丢失,消息队列 RocketMQ 服务端通过扫描发现某条消息长期处于“半消息”时,需要主动向消息生产者询问该消息的最终状态(Commit 或是 Rollback),该过程即消息回查。
-
整体交互流程
Producer向broker端发送消息。
服务端将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息。
发送方开始执行本地事务逻辑。
发送方根据本地事务执行结果向服务端提交二次确认(Commit 或是 Rollback),服务端收到 Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息;服务端收到 Rollback 状态则删除半消息,订阅方将不会接受该消息
在断网或者是应用重启的特殊情况下,上述步骤 4 提交的二次确认最终未到达服务端,经过固定时间后服务端将对该消息发起消息回查
发送方收到消息回查后,需要检查对应消息的本地事务执行的最终结果
发送方根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤 4 对半消息进行操作
-
RocketMQ事务消息的状态
- COMMIT_MESSAGE: 提交事务消息,消费者可以消费此消息
- ROLLBACK_MESSAGE:回滚事务消息,消息会在broker中删除,消费者不能消费
- UNKNOW:Broker需要回查确认消息的状态
-
关于事务消息的消费
- 事务消息consumer端的消费方式和普通消息是一样的,RocketMQ能保证消息能被consumer收到(消息重试等机制,最后也存在consumer消费失败的情况,这种情况出现的概率极低)。
- 注意点:TransactionMQProducer 的groupName要唯一,不能和普通的producer一样
执行本地事务-回查本地事务
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TransactionListenerImpl implements TransactionListener {
/**
* 执行本地事务
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
System.out.println("----> executeLocalTransaction <----");
String msgBody = new String(msg.getBody());
String key = msg.getKeys();
String transactionId = msg.getTransactionId();
System.out.printf("----> executeLocalTransaction transactionId=%s, key=%s, msgBody=%s \r\n", transactionId, key, msgBody);
// TODO 执行本地事务 begin
// TODO 执行本地事务 end
// 测试-二次确认消息
int status = Integer.valueOf(arg.toString());
// COMMIT_MESSAGE 消费者可以消费消息
if(1 == status) {
return LocalTransactionState.COMMIT_MESSAGE;
}
// ROLLBACK_MESSAGE 消息回滚,broker会删除半消息
if(2 == status) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
// UNKNOW broker会进行回查消息
if(3 == status) {
return LocalTransactionState.UNKNOW;
}
return null;
}
/**
* 回查本地事务
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
System.out.println("----> checkLocalTransaction <----");
String msgBody = new String(msg.getBody());
String key = msg.getKeys();
String transactionId = msg.getTransactionId();
System.out.printf("----> checkLocalTransaction transactionId=%s, key=%s, msgBody=%s", transactionId, key, msgBody);
// 要么提交,要么回滚,根据业务需求检查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
}
事务消息生产者
import cn.net.rocketmq.config.jmsConfig;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.springframework.stereotype.Component;
import java.util.concurrent.*;
@Component
public class TransactionProducer {
private String transProducerGroup = "trans_producer_group";
private TransactionMQProducer producer;
private TransactionListener transactionListener;
// 自定义线程池给定线程池名称
//创建自定义线程池
//@param corePoolSize 池中所保存的核心线程数
//@param maximumPoolSize 池中允许的最大线程数
//@param keepActiveTime 非核心线程空闲等待新任务的最长时间
//@param timeunit keepActiveTime参数的时间单位
//@param blockingqueue 任务队列
private ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
// 构造器初始化
public TransactionProducer() {
transactionListener = new TransactionListenerImpl();
producer = new TransactionMQProducer(transProducerGroup);
producer.setTransactionListener(transactionListener);
producer.setExecutorService(executorService);
producer.setNamesrvAddr(jmsConfig.NAME_SERVER);
start();
}
// 获取TransactionMQProducer
public TransactionMQProducer getProducer() {
return this.producer;
}
// TransactionMQProducer开始入口
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
// TransactionMQProducer结束入口
public void shutDown() {
this.producer.shutdown();
}
}
事务消息消费者
import cn.net.rocketmq.config.jmsConfig;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
public class TransactionConsumer {
private String payConsumerGroup = "trans_consumer_group";
private DefaultMQPushConsumer consumer;
public TransactionConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(payConsumerGroup);
consumer.setNamesrvAddr(jmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(jmsConfig.TRANSACTION_TOPIC, "*");
// 并行消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
try {
MessageExt msg = msgs.get(0);
System.out.printf("----> TransactionConsumer %s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("----> TransactionConsumer topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
consumer.start();
System.out.println("TransactionConsumer consumer start ...");
}
}
访问入口
import cn.net.rocketmq.config.jmsConfig;
import cn.net.rocketmq.jms.TransactionProducer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class TransactionProducerController {
@Resource
private TransactionProducer transactionProducer;
@GetMapping("/sendTransMsg")
public String sendTransMsg(String text, String otherParam) throws MQClientException {
Message msg = new Message(jmsConfig.TRANSACTION_TOPIC, "trans_tags", "trans_keys", text.getBytes());
TransactionSendResult sendResult = transactionProducer.getProducer().sendMessageInTransaction(msg, otherParam);
System.out.println("sendResult=" + sendResult);
return "Success !!!";
}
}