rocketMQ架构图:
组成介绍
- Producer:消息发布的角色,支持分布式集群方式部署。发送消息时,Producer 会随机选择一个nameserver建立长连接,定期获取topic路由信息(包括所有队列列表)。Producer会轮询队列列表,与队列所在的broker建立长连接从而向broker发送消息,投递的过程支持快速失败并且低延迟。
- Nameserver:提供broker的动态注册与发现。主要包含两个功能:
- Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
- 路由信息管理,每个NameServer将保存关于Broker集群的整个路由信息和用于客户端查询的队列信息。然后Producer和Conumser通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。
NameServer通常也是集群的方式部署,各实例间相互不进行信息通讯。
- BrokerServer: Broker主要负责消息的存储、投递和查询以及服务高可用保证。broker架构:
- Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息 - Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
- HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
-
Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。
- ConsumerServer: 消息消费的角色,支持分布式集群方式部署。支持以push推(特殊的pull模式),pull拉两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。
功能与特性
- 基于Message Id, Message Key消息查询
- 消息轨迹查询
- 集群消费和广播消费
- topic 级别权限控制
- 重置消费位点,在消费者组级别重置消费者队列位点
- 重试队列和死信队列
使用样例
- 生产者同步发送
Message message = new Message("topic1", "inOrder", "messageId_" + i,
("hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(message);
- 生产者异步发送
Message msg = new Message("topic1", "async","uniqueId123123123",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接收异步返回结果的回调
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
- 生产者sendOneway
Message message = new Message("topic1", "sendOneWay", "messageId_" + i,
("hello world" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.sendOneway(message);
- 顺序消息
同一个订单的不同业务消息需要保证顺序,根据业务唯一标识订单id对队列集合数量取模,保证每个订单的消息只会发送到一个队列中。
//生产者
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根据订单id选择发送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//订单id
//消费者
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每个queue有唯一的consume线程来消费, 订单对每个queue(分区)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() +
"queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模拟业务逻辑处理中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
- 延时消息
//生产者
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
message.setDelayTimeLevel(3);
// 发送消息
producer.send(message);
//消费者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages,
ConsumeConcurrentlyContext context) {
// 延迟消息 消息生成 和 消息store 之间延迟
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" +
message.getMsgId() + "] " + (message.getStoreTimestamp() -
message.getBornTimestamp()) + "ms later");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
- 重试队列和死信队列
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
// 返回此状态会判断是否存在重试topic 没有就创建
// 默认重试16次。16次后进入死信队列,等待后续订阅处理
//三种导致重试的情况
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
// return null;
// throw new RuntimeException();
}
});
- 消息过滤方式
//by tag
consumer.subscribe("topic1", "TagA || TagC || TagD");
// by SQL
consumer.subscribe("topic1", MessageSelector.bySql("a between 0 and 3"));
消息持久化和消费逻辑
消息持久化
消息发送到broker时会在broker节点生成三类文件。windows存储目录在C:\Users\YOURUSERNAME\store
- commitlog
存储消息的主体 -
consumeQueue
单个文件由30W个条目组成,单个条目由8字节的commitlog物理偏移量、4字节的消息长度、8字节tag hashcode组成,用于快速定位消息实体的索引。每个topic中的每个队列都有对应的一个consumeQueue文件。
- indexFile
提供了一种可以通过key或时间区间来查询消息的方法底层实现为hashMap
消息生产和消费逻辑
- 生产者通过负载均衡策略从队列列表中选择一个队列,再与队列所在的broker建立连接,从而发送消息。broker会持久化消息到commitLog文件,同时在consumeQueue文件中生成一个条目,条目中记录消息在commitLog中的偏移量、消息长度和tag的hashcode。消费者可以高效的通过条目找到对应的消息。
- 消费者以消费者组为单位订阅Topic,每个消费者组在topic所有队列中的Offset都不同,代表每个消费者组消费是隔离的。
最佳实践
- topic和tag使用场景
不同的消息类型(普通消息,事务消息,定时(延时消息),顺序消息)使用不同topic,不能使用tag区分。
业务相关联的消息使用tag区分,业务不直接相关的用topic区分
消息优先级和数据量级不同的用topic区分,可能导致优先级高的消息不能及时被消费或者等待现象。 - 消息幂等
可以将业务唯一标识设置为message的key,再通过代码实现幂等。 -
订阅关系一致
同一个消费者组下consumer实例订阅的topic,tag必须保持一致。
正确的示例:
错误的示例:
常见问题
- 消息轨迹中消息详情报错?
配置traceTopicEnable=true,生成生产者和消费者示例时设置enableMsgTrace=true - broker中消息存储文件commitlog保存时间?3天 (fileReservedTime) 每天4am删除
- consumeQueue indexFile构建?
- 不同的消息类型不共用Topic。
- 并发消费 顺序消费 实现
并发消费: 一个队列有多个消费者线程消费
顺序消费:一个队列只有一个消费者线程消费 - 生产者 消费者 重试机制
生产者同步模式下 默认重试两次,异步,oneway模式不会重试
消费者消费失败后最多重试16次,如果再失败会被投递到死信队列中。 - messageId 生成策略
由broker IP, port 加上offset根据一定算法生成, 所以根据messageId查询效率较高。 - 延时消息的实现
消息发送到broker后,由broker实现。延迟存储到commitLog库。