可靠消息与分布式事务
1 概述
前面的文章https://zhuanlan.zhihu.com/p/92866118讲述了Seata对分布式事务的支持,当生产环境中没有seata的部署时,我们如何通过可靠消息例如RocketMQ处理分布式事务。
2 分布式事务的困难
分布式事务,全局事务就是一个分布式系统,我们都知道,事务具有ACID特性,但是对于分布式环境,ACID是不能完整的得到保障的。
在分布式系统中,有个著名的CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。
但是在实际的分布式环境中,P是一定会存在的,所以在分布式环境中。当P出现时,我们需要在C和A之间做取舍,要和选择CP,要么选择AP。
有没有CA系统呢?有,传统的关系型数据库就是CA系统,因为它不会出现分区的情况,单库提供的ACID保障,它不是分布式的。常用的存储系统在CAP中的取舍如下图所示:
由此可知,分布式事务同样也要在C和A之间做取舍,因此分布式事务难做,在发生P时,我们需要放弃C和A中的一个。
3 事务消息的分布式事务
在使用可靠消息做分布式事务时,我们需要保证本地事务与消息的发送能有原子性,即本地事务成功则消息发送成功,本地事务失败则消息不发送或消息回滚,而RocketMQ的事务消息支持两阶段提交和事务回查,能保证消息的发送与本地事务之间的原子性。
我们先看看RocketMQ官网的示例:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
TransactionListener的实现如下:
import ...
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
TransactionListener的定义如下:
其中包含两个方法:
· executeLocalTransaction方法会在发送消息后调用,用于执行本地事务,如果本地事务执行成功,rocketmq再提交消息
· checkLocalTransaction用于对本地事务做检查,rocketmq依赖此方法做补偿,补偿由rocketmq的broker主动发起,因为消息的commit或者rollback可能会失败,因此需要这种补偿机制保证消息的commit或者rollback执行成功
而当一个事务消息commit成功后,消费者才能消费到此消息,RocketMQ通过事务消息的方式提供了分布式事务的支持。
RocketMQ事务消息的分布式事务方案示意图如下:
Producer的本地事务和消息的发送是具有原子性的,因此当本地事务成功后,consumer一定会接收到消息。
但是通过RocketMQ的事务消息实现的分布式事务是有局限性的,它默认了当本地事务执行成功时,consumer收到消息后的业务处理一定能执行成功,一旦consumer无法完成事务,全局事务将无法回滚。举个例子,对于库存扣减类需求,下单后需要扣减库存,而扣减库存在独立的服务中,此时如果单纯的使用事务消息无法满足,因为库存为0后,扣减会失败,而此时全局事务无法回滚。
对于事务消息实现的分布式事务,它在ACID特性上的保障非常弱:
· 原子性:部分场景下能保证,但是对于库存扣减类需求,不适合单纯的使用事务消息处理分布式事务,因为没法回滚
· 一致性:不能保证,只能得到最终一致
· 隔离性:无保证,多个事务之间不隔离
· 持久性:能保证
可靠消息实现的分布式事务,实际上是在发生P时,选择了A放弃了C,它是BASE原理的应用,BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)的简写。BASE是对CAP中一致性和可用性权衡的结果,核心思想是即使无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使得系统达到最终一致性。
不仅仅是事务消息,TCC和Saga模式也是BASE理论的应用,特征了一定的C。
4 利用事务消息做回滚
前文讲述了使用事务消息完成分布式事务的限制,且无法回滚。而我们使用服务接口调用时,在缺少TCC/Saga等分布式事务解决方案的情况下,无法得到事务的保证,无事务的情况下整个过程的调用方式如下:
这个过程缺少事务的保证,当系统A需要回滚时,系统B无法完成回滚,因为接口调用也可能失败,即使系统B提供回滚接口也无法完全保证回滚成功。
我们可以通过事务消息来做回滚,这样做虽然有些别扭,但相比前文描述的用事务消息做分布式事务而言,一致性虽然没有完全得到保障,但它保障了在非回滚的场景下的一致性,整个过程如下:
为什么说这样做有点别扭,就是因为这里将事务消息用于回滚,系统A提交事务后,需要将事务消息回滚,这样做的目的是防止系统B消费到事务消息。
如果系统A出错,需要回滚时,整个过程如下:
当系统A出现异常回滚后,需要提交事务消息使得系统B可消费到事务消息从而回滚系统B的事务。
这样做的优点:
· 实际上类似于saga模式的分布式事务,拿MQ当事务的协调者,能用于库存扣减类场景,场景限制少
· 当没有发生回滚时,能保证一致性,发生回滚时,能达到最终一致
· 无额外的依赖,在没有seata这类分布式事务解决方案的环境下可行
缺点:
· 使用上非常别扭,要将事务消息的提交与回滚与本地事务反过来,本地事务成功则消息事务回滚,本地事务回滚则提交消息事务