最近在组内做了中间件的简单分享,主题是流行的开源中间件RocketMQ和我司开源的QMQ之间的实现和区别。
RocketMQ最早由阿里开源,是相当流行的一款消息中间件,QMQ最早在我司内部使用,后来也开源了。两者的实现原理大致相同,但是细节之处还是有很多不一样的地方,这些不同之处的实现也很值得思考。
架构介绍
RocketMQ的架构使用过的人应该都很熟悉,分为Producer,Consumer,Broker,NameServer四大集群,NameServer本身无状态,主要是维护Broker的地址等路由信息,Broker是消息的中转站,还负责消息的存储等。
QMQ的架构与RocketMQ大同小异,meta server负责状态和路由信息的维护,server的功能与broker相当,但是延时和定时消息被单独拆分出了一个服务delay server。我想这么做也是由业务决定的,酒店,度假,机票订单都经常会有提前预定的情况,消息并不需要实时消费,类似的业务场景比较常见。
消息存储与消费
RocketMQ存储
RocketMQ对消息的存储是混合型的结构,单一broker下所有的消息都保存在commitlog一个文件中。逻辑上是如此,实际上还是分了很多个文件的,单个文件最大的大小是1G,文件名是起始偏移量。单一commitlog是一个相对抽象的概念。除了commitlog之外 ,还有consumequeue和indexfile两个索引文件。consumequeue顾名思义,是用来给消费者读取的,broker会把要消费的消息推到consumequeue中,每一个consumequeue都对应了一个消费者,但也不是一一对应的关系,多个consumequeue是可以对应到同一个消费者的。broker接收到来自生产者的消息后,写入commitlog并且刷盘,随后将消息的offset等信息负载一个消费队列,然后写入。
QMQ消息存储
在了解QMQ的设计之前,我想先说说RocketMQ的存储设计有哪些缺点。对于ConsumeQueue,或者叫partition,数量固定的情况下,如果消费者客户端与其数量不一致,那很容易发生一个消费者客户端负载了多个消息队列从而导致很忙碌,但是其他客户端很空闲的情况,即负载不均衡,同时扩容也不太方便,增加机器并不能实时的增加ConsumeQueue与之对应。基于这个问题,QMQ对存储模型做了改进。
存储还是分了三个部分,分别是message log, consume log, pull log。messagelog与RocketMQ的commitlog作用基本一致,consume log作用和consumeQueue作用也基本一致,pull log是QMQ增加在两者之间的中间层,用来方便扩容
- message log 存储生产者发送来的消息,是消息的存储单元
- consume log一个log对应一个topic,存储的是commit log的索引信息
- pull log存储的是consume log的索引信息,每一个Pull log对应了一个消费者客户端。
这样一来,消费队列与消费者之间的关系被解耦,当消费者增加时,新增一个Pull log,就可以实时负载到topic下对应的消息,积压的消息也可以被及时消费。
网上关于QMQ能找到的资料不多,对于这样的存储模型,我也还存在几个问题,希望日后能找到答案。
- 消息的积压是积压在consume log中还是Pull log中?按照能实时扩容的功能分析,应该是积压在consume log中,那么消费者客户端在拉取的时候是实时的将consume log的偏移量推到pull log中吗?这样是否会影响性能,或者造成重复消费
- 增加一层中间层对性能的影响如何
事务消息
RocketMQ事务消息
RocketMQ对事务消息的理念是,保证最终一致性,通过两阶段提交的方式来实现生产者端与消费者端的状态一致。
如图所示,生产者推送一个prepare到broker,broker将消息记录,并且定时向生产者轮询。
生产者可以配置在消息消费成功之后的逻辑。
QMQ事务消息
如图所示,QMQ的事务是通过关系型数据库来实现。即每一个使用QMQ的应用,都必须新建一个指定的table,用来保存要发送的消息。在server上会有定时的任务来轮询db,将未消费的消息进行消费。同时,生产者也可以将业务逻辑与消息写入db放到同一个数据库事务中,保证了业务逻辑和消息发送成功这两个操作的原子性。
但是作为一个消息中间件,部署时必须要依赖db,复杂度略有增加。
并且,消息发送的可靠性需要和业务的写db绑定,也就是放在同一个事务中,对业务逻辑有一定的侵入性。
同时还会带来一个问题,当一个方法只是做纯消息转发,没有任何其他的业务逻辑去操作db,那么如何保证消息持久化失败之后的补偿处理呢?对于这个场景,QMQ提供了MessageSendStateListener来监听发送后的状态,该接口存在两个方法,onSuccess和onFailed,可以对发送失败后的逻辑自定义。