背景
可靠性和可用性是每个mq系统最重要的两个特征。虽然成熟的mq对这两个特性支持的很好,但是mq仍然需要一些其他的手段去确保一个消息的整条链路没有问题。通过一些方法,我们应该看到一个消息的完整链路并且迅速找到消息处理过程中失败的根源问题。
所以RocketMQ 支持消息轨迹跟踪的特性,我们能轻松的找到和分析息处理过程中失败的根源问题。并且我们能查到很多参数的值,比如消息发送耗时,消息消费耗时,broker端存储时间等等。
设计架构
架构包括两个部分,存储端和客户端收集
生产者消费者的多线程模式和阻塞队列
在客户端,对于生产者,我们能收集信息(比如,发送和消费消息耗费的时间,对应broker存储时间,对应broker的ip等等)并且把信息放入阻塞队列。对于消费者,使用一个叫做MQ-AsyncArrayDispatcher的线程从阻塞队列去把消息轨迹跟踪的信息取出。然后这个异步的线程(MQ-AsyncArrayDispatcher)把消息的轨迹跟踪打包作为AsyncAppenderRequest 任务提交给线程池执行。
最后,AsyncAppenderRequest task主要的执行过程是把从客户端收集到的轨迹跟踪信息发送到一个特殊的broker 节点上。
实现两个hook
包括“SendMessageHook/ConsumeMessageHook”,用这两个雷我们能够在发布和订阅消息的前后收集到消息的轨迹跟踪数据。
新定义一个特殊的broke节点去存储消息轨迹跟踪数据
在一个集群中我们能定义一个特殊的broker服务节点去存储消息轨迹跟踪的数据。我们在broker.properties文件中,能够加一个flag(比如autoTraceBrokerEnable)去定义这个broker是否是一个用来存储消息轨迹跟踪数据的特殊节点。
- autoTraceBrokerEnable is false。表明这个broker 是一个普通的节点,然后"Trace_Topic”将不去建立在这个节点上。并且正常的消息还会正常处理。
- autoTraceBrokerEnable is true。表明broker是一个特殊的节点,它是特别用来存储消息轨迹跟踪数据的。并且"Trace_Topic"在broker开始阶段自动创建,这个节点自动在nameserver注册 它拥有的topic集合(包括Trace_Topic)。这样,在一个RocketMQ 集群中,仅仅有一个特殊的broker节点去存储消息轨迹跟踪数据。并且客户端(包括发布和订阅消息)会通过nameserver知道那个broker节点是负责收集消息轨迹跟踪数据的,并发送。
如何查询轨迹跟踪数据?
举个例子,保存消息轨迹跟踪数据的topic被称作"RMQ_SYS_TRACE_DATA_XXX”而不是普通消息的topic。但是仍然用普通消息查询方式(通过messageId + topic,topic+ key or topic实现)通过RocketMQ console查询是不可行的,查不到我们期望的结果。所以,当发送被客户端收集的消息轨迹跟踪数据时,我们能通过使用普通消息的msgId(不是offset MsgId) 或key去填充消息轨迹跟踪数据的keyset 属性,所以Broker端能够根据普通消息的msgId或key去创建索引文件。
并且在broker端,我们能够通过业务进程调用QueryMessageProcessor去调用broker端的queryMessage() 方法。
源码分析
接口的设计和变化
- 数据传输的异步接口:增加AsyncDispatcher 类
public interface AsyncDispatcher {
void start() throws MQClientException;
boolean append(Object ctx);
void flush() throws IOException;
void shutdown();
}
- 定义两个hook的实现:
a.ClientSendMessageTraceHookImpl
b.ClientConsumeMessageTraceHookImpl - 定义和写了对应的数据模型:
a.TraceBean
b.TraceConstants
c.TraceContext
d.TraceDataEncoder
e.TraceDispatcherType
f.TraceTransferBean
g.TuxeTraceType