最近一直在学习MQ相关的内容,之前也阅读过的RabbitMQ的官方文档,相比于Kafka和RocketMQ,RabbitMQ真的算是一个轻量级的消息中间件了。在了解这类消息中间件的事务实现时,发现大致的思路都离不开两阶段(2pc)或者或者事务补偿机制等思路。以下简单对Kafka、RocketMQ的事务实现的做下学习笔记。
RocketMQ
生产者发送事务消息
生产者发送消息可以查看源码org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#sendMessageInTransaction
public TransactionSendResult sendMessageInTransaction(final Message msg,
final LocalTransactionExecuter localTransactionExecuter, final Object arg)
throws MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
// ignore DelayTimeLevel parameter
if (msg.getDelayTimeLevel() != 0) {
MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_DELAY_TIME_LEVEL);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
// 这里给消息添加了属性,标明这是一个事务消息,也就是半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
// 执行本地事务
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransactionState);
log.info(msg.toString());
}
} catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
// 根据事务消息和本地事务的执行结果 localTransactionState,决定提交或回滚事务消息
// 这里给 Broker 发送提交或回滚事务的 RPC 请求。
try {
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
transactionSendResult.setSendStatus(sendResult.getSendStatus());
transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
transactionSendResult.setTransactionId(sendResult.getTransactionId());
transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
}
具体分为以下几个步骤:
1、首先给待发送消息添加了一个属性PROPERTY_TRANSACTION_PREPARED,标明这是一个事务消息,也就是半消息,然后会像发送普通消息一样去把这条消息发送到 Broker 上。
2、发送成功了,就开始调用我们之前提供的接口org.apache.rocketmq.client.producer.TransactionListener 的实现类中,执行本地事务的方法executeLocalTransaction() 来执行本地事务
3、根据半消息的发送结果和本地事务的执行结果,来决定提交或者回滚事务,在实现方法org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#endTransaction中,producer给Broker发送了一个单向的RPC请求,告知Broker完成事务的提交或者回滚。
由于有事务反查的机制来兜底,这个 RPC 请求即使失败或者丢失,也都不会影响事务最终的结果。最后构建事务消息的发送结果,并返回。
在这里补充下RocketMQ的API
org.apache.rocketmq.client.producer.TransactionListener#executeLocalTransaction
--当发送事务消息(半消息)成功后调用,该方法会被调用执行本地事务。
org.apache.rocketmq.client.producer.TransactionListener#checkLocalTransaction
--当没有响应给事务消息(半消息)时,服务端Broker会自动调用该方法来校验检查事务的状态
Broker处理半消息
Broker 在处理 Producer 发送消息的请求时,会根据消息中的属性判断一下,这条消息是普通消息还是半消息:
org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage
处理半消息org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
// 记录消息的主题和队列,到新的属性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
// 替换消息的主题和队列为:RMQ_SYS_TRANS_HALF_TOPIC,0
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
return msgInner;
}
RocketMQ 并没有把半消息保存到消息中客户端指定的那个队列中,而是记录了原始的主题队列后,把这个半消息保存在了一个特殊的内部主题RMQ_SYS_TRANS_HALF_TOPIC 中,使用的队列号固定为 0。这个主题和队列对消费者是不可见的,所以里面的消息永远不会被消费。这样,就保证了在事务提交成功之前,这个半消息对消费者来说是消费不到的。
消息反查实现
--org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
在 Broker 的TransactionalMessageCheckService 服务中启动了一个定时器,定时从半消息队列中读出所有待反查的半消息,针对每个需要反查的半消息,Broker 会给对应的 Producer 发一个要求执行事务状态反查的 RPC 请求,这部分的逻辑在方法org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage 中,根据 RPC 返回响应中的反查结果,来决定这个半消息是需要提交还是回滚,或者后续继续来反查。
提交或者回滚事务实现的逻辑是差不多的,首先把半消息标记为已处理,如果是提交事务,那就把半消息从半消息队列中复制到这个消息真正的主题和队列中去,如果要回滚事务,这一步什么都不需要做,最后结束这个事务。这部分逻辑的实现在org.apache.rocketmq.broker.processor.EndTransactionProcessor 这个类中。
Kafka
Kafka事务
解决的问题是,确保在一个事务中发送的多条消息,要么都成功,要么都失败。注意,这里面的多条消息不一定要在同一个主题和分区中,可以是发往多个主题和分区的消息。当然,你可以在 Kafka 的事务执行过程中,加入本地事务,来实现和RocketMQ 中事务类似的效果,但是 Kafka 是没有事务反查机制的。
Kafka 的这种事务机制,单独来使用的场景不多。更多的情况下被用来配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once 语义。我在Kafka的官方文档中提出了kafka的 ExactlyOnce,但是和我们通常理解的消息队列的服务水平中的 Exactly Once 是不一样的。
我们通常理解消息队列的服务水平中的 Exactly Once,它指的是,消息从生产者发送到Broker,然后消费者再从 Broker 拉取消息,然后进行消费。这个过程中,确保每一条消息恰好传输一次,不重不丢。我们之前说过,包括 Kafka 在内的几个常见的开源消息队列,都只能做到 At Least Once,也就是至少一次,保证消息不丢,但有可能会重复。做不到Exactly Once。
Kafka 中的 Exactly Once 又是解决的什么问题呢?它解决的是,在流计算中,用 Kafka作为数据源,并且将计算结果保存到 Kafka 这种场景下,数据从 Kafka 的某个主题中消费,在计算集群中计算,再把计算结果保存在 Kafka 的其他主题中。这样的过程中,保证每条消息都被恰好计算一次,确保计算结果正确。
Kafka的事务实现
kafka的协调者并不是一个独立的进程,而是 Broker 进程的一部分,协调者和分区一样通过选举来保证自身的可用性。
Kafka 集群中有一个特殊的用于记录事务日志的主题,这个事务日志主题的实现和普通的主题是一样的,里面记录的数据就是类似于“开启事务”“提交事务”这样的事务日志。日志主题同样也包含了很多的分区。在 Kafka 集群中,可以存在多个协调者,每个协调者负责管理和使用事务日志中的几个分区。这样设计的目的是为了能并行执行多个事务,提升性能。
事务流程
1、生产者会给协调者发一个请求来开启事务,协调者在事务日志中记录下事务 ID。
2、生产者在发送消息之前,还要给协调者发送请求,告知发送的消息属于哪个主题和分区,这个信息也会被协调者记录在事务日志中。
3、生产者就可以像发送普通消息一样来发送事务消息,kafka直接发给 Broker,保存在这些消息对应的分区中,Kafka 会在客户端的消费者中,暂时过滤未提交的事务消息。
4、消息发送完成后,生产者给协调者发送提交或回滚事务的请求,由协调者来开始两阶段提交,完成事务。两阶段如下:
第一阶段,协调者把事务的状态设置为“预提交”,并写入事务日志。到这里,实际上事务已经成功了,无论接下来发生什么情况,事务最终都会被提交。
第二阶段,协调者在事务相关的所有分区中,都会写一条“事务结束”的特殊消息,当 Kafka 的消费者,也就是客户端,读到这个事务结束的特殊消息之后,它就可以把之前暂时过滤的那些未提交的事务消息,放行给业务代码进行消费了。最后,协调者记录最后一条事务日志,标识这个事务已经结束了。
Kafka 这个两阶段的流程,准备阶段,生产者发消息给协调者开启事务,然后消息发送到每个分区上。提交阶段,生产者发消息给协调者提交事务,协调者给每个分区发一条“事务结束”的消息,完成分布式事务提交。
简单总结
在学习Kafka的相关内容(跟随着李玥老师的《消息队列高手课》)时还是对这种事务实现比较模糊,需要从官方去寻找答案,以及对Kafka的源码进行剖析。结尾对比下两个消息中间件产品的差异:
实现方面:RocketMQ 是把这些消息暂存在一个特殊的队列中,待事务提交后再移动到业务队列中;而 Kafka 直接把消息放到对应的业务分区中,配合客户端过滤来暂时屏蔽进行中的事务消息。
适用场景:,RocketMQ的事务适用于解决本地事务和发消息的数据一致性问题,而 Kafka 的事务则是用于实现它的 Exactly Once 机制,应用于实时计算的场景中。