基本架构:
RocketMQ:
每个Broker与所有NameServer保持心跳
每个Producer/Consumer与其中一个NameServer建立长连接,与当前生产/消费的Topic涉及到的所有Broker保持心跳
NameServer之间互不通信
每组Broker(Master/Slave)互不通信
Kafka:
多个Broker连接到相同的ZooKeeper集群
一个Topic对应一个或多个Partition。
Topic是逻辑单位。Partition是物理单位。
Partition有多个Replica,均衡分布在不同Broker,其中一个为Leader,负责消息读写。
其他Replica则是Follower,Follower定期到Leader上同步数据。
每个Partition副本对应一个磁盘上的日志文件夹
高可用高可靠机制:
Kafka:
保证Producer发送消息可靠性
- producer.type=sync
- request.require.acks:设置为ALL或者-1,等待所有ISR接收到消息后再给Producer发送Response。要配合设置Broker的ISR相关参数。
保证Broker消息可靠,高可用和吞吐量
通过如下配置来保证Broker消息可靠性:
- default.replication.factor:设置为大于等于3,保证一个partition中至少有两个Replica,并且replication.factor > min.insync.replicas
- min.insync.replicas:设置为大于等于2,保证ISR中至少有两个Replica
- unclean.leader.election.enable=false,那么就意味着非ISR中的副本不能够参与选举,避免脏Leader。
Kafka的ISR机制可自动动态调整同步复制的Replica,将慢(可能是暂时的慢)Follower踢出ISR,将同步赶上的Follower拉回ISR,避免最慢的Follower拖慢整体速度,最大限度地兼顾了可靠性和可用性
。
Kafka对环境的适应和机器的利用效率要强于RocketMQ。
保证Consumer消费消息的可靠性
enable.auto.commit=false 关闭自动提交位移,消息处理完成之后再提交offset
每个Consumer Group独立维护offset,互不干扰,不存在线程安全问题。
RocketMQ:
多个Master模式同步刷盘(磁盘配置为RAID10)
多Msater多Slave模式,同步双写
Msater宕机Slave只提供存量消息的读,后续的读写由其它Master承担
负载均衡机制:
Kafka:
Partition的数量应该要大于对应Consumer的数量
。并建议Partition的数量大于集群Broker的数量,这样Leader partition可以均匀的分布在各个Broker中,最终使得集群负载均衡。
Producer和Topic下所有partition leader保持socket连接。Borker扩容时,Producer可以直接感知。
消息由producer直接通过socket发送到broker。producer决定消息被路由到哪个partition
。可以采用、random、key-hash、轮询等策略。Kafka提供了接口供用户实现自定义Partition,用户可以为每个消息指定一个partitionKey,通过这个key来实现一些hash分区算法。
Controller负责协调负载均衡
Kafka会依照默认和配置的策略,自动的均衡打散分布Partition、Leader和Follower。并在Broker宕机和扩容时自动触发reloadbalancing,增加程序并行能力和高效。
创建Topic时可指定parition数量来适应不同Topic的消息量。
RocketMQ:
NameServer通过心跳监控Broker,一旦Broker失联,调整Topic跟Broker的对应关系
Broker通过心跳监控Consumer,一旦Consumer失联,如果该Consumer属于某个消费组,则触发消费组负载均衡
发送消息通过轮询队列的方式发送,每个队列接收平均的消息量。
多个队列可以部署在一台机器上,也可以分别部署在多台不同的机器
可通过控制台命令配置Broker的Topic和队列,实现调整负载
一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。
一个Consumer Group下的多个Consumer以均摊方式消费消息
Consumer数量要小于等于队列数量,否则多余的Consumer将不能消费消息。
刷盘机制:
RocketMQ:
所有数据单独储存到commit Log ,同时只会写一个文件,一个文件满1G,再写新文件,真正的完全顺序写盘
。对最终用户展现的队列实际只储存消息在Commit Log的位置信息。
随机读,读取pagecache时,即使只访问1K的消息,系统也会提前预读出更多的数据,尽可能让读命中pagecache,减少IO操作,所以内存越大越好。
Kafka:
partition少的时候,基本上是顺序写;在partition特别多的时候,就变成了随机写,性能会急剧下降
。尽量在业务上避免过多partition
保证消息顺序:
尽可能从业务上避开消息的顺序性
保证消息顺序,前提是保证不丢消息,以及消息去重
在MQ的模型中,顺序需要由3个阶段去保障:
消息被发送时保持顺序
消息被存储时保持和发送的顺序一致
消息被消费时保持和存储的顺序一致
发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A一定在B之前。而消费保持和存储一致则要求消息A、B到达Consumer之后必须按照先A后B的顺序被处理。
Kafka:
- Producer端串行发送消息,max.in.flight.requests.per.connection = 1 限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求。
- 所有发送的消息,用同一个key,这样同样的key会落在同一个partition里面。
- consumer端,Kafka保证,1个partition只能被1个consumer消费。
RocketMQ:
- produce在发送消息的时候,配置MessageQueueSelector,把消息发到同一个队列(queue)中。
- 消费者注册消息监听器为MessageListenerOrderly,这样就可以保证消费端只有一个线程去消费消息
分布式事务一致性:
保证at least once和业务上消息去重
将大事务拆分成小事务,通俗说
第一步、先保证本地事务和消息发送同时成功或失败
第二步、如果第一步成功,确保消息被消费,同时远程事务成功
第二步可通过消费状态表确保消息成功消费,或者通过消息中间件的重试机制来实现
RocketMQ实现了重试队列和死信队列机制,死信队列的消息需要人工干预处理
并且需要在业务上实现幂等和消息去重
实现分布式事务一致性的三种方式:
- 事务消息表:
先执行本地事务。如果成功,记录到事务消息表,通过定时轮询事务消息表确保成功发送消息。第二步如上。
定时轮询对数据库读写压力很大
- 经典事务消息:
将消息发送放在本地事务中,消息发送失败则回滚事务。第二步如上。
发送消息可能响应迟缓,放在数据库事务中值得考量
- 类似于RocketMQ的分阶段事务消息
RocketMQ:
首先发送Prepared消息,消息发送成功后才开始执行本地事务。之后发送确认消息,本地事务成功则发送Commit消息,失败则发送Rollback消息。
如果确认消息发送失败?RocketMQ会定期扫描消息集群中的事务消息,这时候发现了Prepared消息,它会向消息发送者确认。
RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
如果当前Producer宕机,RocketMQ会向该Producer所属的生产组其他Producer发送确认消息。
java代码中通过实现TransactionListener接口,来实现RocketMQ的事务消息机制。
public interface TransactionListener {
/**
* 发送prepare消息成功后回调该方法用于执行本地事务
* @param msg 回传的消息,利用transactionId即可获取到该消息的唯一Id
* @param arg 调用send方法时传递的参数,当send时候若有额外的参数可以传递到send方法中,这里能获取到
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/**
* @param msg 通过获取transactionId来判断这条消息的本地事务执行状态
* @return 返回事务状态,COMMIT:提交 ROLLBACK:回滚 UNKNOW:回调
*/
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
TransactionListener接口有两个方法:
executeLocalTransaction方法用于在发送prepare消息成功后执行本地事务。
checkLocalTransaction方法在确认消息发送失败,RocketMQ扫描到Prepared消息并向消息发送者确认时调用,用来通知RocketMQ本地事务是否成功。
kafka:
不提供完整的事务一致性保证,需要使用者自行实现
本文草成,还有很多细节会逐步补充完善上去。
并且本人才疏学浅,在此抛砖引玉,如有错漏,敬请不吝指正。