可靠消息与分布式事务

可靠消息与分布式事务

1 概述

前面的文章https://zhuanlan.zhihu.com/p/92866118讲述了Seata对分布式事务的支持,当生产环境中没有seata的部署时,我们如何通过可靠消息例如RocketMQ处理分布式事务。

2 分布式事务的困难

分布式事务,全局事务就是一个分布式系统,我们都知道,事务具有ACID特性,但是对于分布式环境,ACID是不能完整的得到保障的。

在分布式系统中,有个著名的CAP定理,指的是在一个分布式系统中,一致性(Consistency)、可用性(Availability)、分区容错性(Partition tolerance)。CAP 原则指的是,这三个要素最多只能同时实现两点,不可能三者兼顾。

但是在实际的分布式环境中,P是一定会存在的,所以在分布式环境中。当P出现时,我们需要在C和A之间做取舍,要和选择CP,要么选择AP。

有没有CA系统呢?有,传统的关系型数据库就是CA系统,因为它不会出现分区的情况,单库提供的ACID保障,它不是分布式的。常用的存储系统在CAP中的取舍如下图所示:

image.png

由此可知,分布式事务同样也要在C和A之间做取舍,因此分布式事务难做,在发生P时,我们需要放弃C和A中的一个。

3 事务消息的分布式事务

在使用可靠消息做分布式事务时,我们需要保证本地事务与消息的发送能有原子性,即本地事务成功则消息发送成功,本地事务失败则消息不发送或消息回滚,而RocketMQ的事务消息支持两阶段提交和事务回查,能保证消息的发送与本地事务之间的原子性。

我们先看看RocketMQ官网的示例:

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;

public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        TransactionListener transactionListener = new TransactionListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
        ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 10; i++) {
            try {
                Message msg =
                    new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();
    }
}

TransactionListener的实现如下:

import ...
 
   public class TransactionListenerImpl implements TransactionListener {
       private AtomicInteger transactionIndex = new AtomicInteger(0);
 
       private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
 
       @Override
       public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
           int value = transactionIndex.getAndIncrement();
           int status = value % 3;
           localTrans.put(msg.getTransactionId(), status);
           return LocalTransactionState.UNKNOW;
       }
 
       @Override
       public LocalTransactionState checkLocalTransaction(MessageExt msg) {
           Integer status = localTrans.get(msg.getTransactionId());
           if (null != status) {
               switch (status) {
                   case 0:
                       return LocalTransactionState.UNKNOW;
                   case 1:
                       return LocalTransactionState.COMMIT_MESSAGE;
                   case 2:
                       return LocalTransactionState.ROLLBACK_MESSAGE;
               }
           }
           return LocalTransactionState.COMMIT_MESSAGE;
       }
   }

TransactionListener的定义如下:

image.png

其中包含两个方法:

· executeLocalTransaction方法会在发送消息后调用,用于执行本地事务,如果本地事务执行成功,rocketmq再提交消息

· checkLocalTransaction用于对本地事务做检查,rocketmq依赖此方法做补偿,补偿由rocketmq的broker主动发起,因为消息的commit或者rollback可能会失败,因此需要这种补偿机制保证消息的commit或者rollback执行成功

而当一个事务消息commit成功后,消费者才能消费到此消息,RocketMQ通过事务消息的方式提供了分布式事务的支持。

RocketMQ事务消息的分布式事务方案示意图如下:

image.png

Producer的本地事务和消息的发送是具有原子性的,因此当本地事务成功后,consumer一定会接收到消息。

但是通过RocketMQ的事务消息实现的分布式事务是有局限性的,它默认了当本地事务执行成功时,consumer收到消息后的业务处理一定能执行成功,一旦consumer无法完成事务,全局事务将无法回滚。举个例子,对于库存扣减类需求,下单后需要扣减库存,而扣减库存在独立的服务中,此时如果单纯的使用事务消息无法满足,因为库存为0后,扣减会失败,而此时全局事务无法回滚。

对于事务消息实现的分布式事务,它在ACID特性上的保障非常弱:

· 原子性:部分场景下能保证,但是对于库存扣减类需求,不适合单纯的使用事务消息处理分布式事务,因为没法回滚

· 一致性:不能保证,只能得到最终一致

· 隔离性:无保证,多个事务之间不隔离

· 持久性:能保证

可靠消息实现的分布式事务,实际上是在发生P时,选择了A放弃了C,它是BASE原理的应用,BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)的简写。BASE是对CAP中一致性和可用性权衡的结果,核心思想是即使无法做到强一致性,但每个应用都可以根据自身的业务特点,采用适当的方式来使得系统达到最终一致性。

不仅仅是事务消息,TCC和Saga模式也是BASE理论的应用,特征了一定的C。

4 利用事务消息做回滚

前文讲述了使用事务消息完成分布式事务的限制,且无法回滚。而我们使用服务接口调用时,在缺少TCC/Saga等分布式事务解决方案的情况下,无法得到事务的保证,无事务的情况下整个过程的调用方式如下:

image.png

这个过程缺少事务的保证,当系统A需要回滚时,系统B无法完成回滚,因为接口调用也可能失败,即使系统B提供回滚接口也无法完全保证回滚成功。

我们可以通过事务消息来做回滚,这样做虽然有些别扭,但相比前文描述的用事务消息做分布式事务而言,一致性虽然没有完全得到保障,但它保障了在非回滚的场景下的一致性,整个过程如下:

image.png

为什么说这样做有点别扭,就是因为这里将事务消息用于回滚,系统A提交事务后,需要将事务消息回滚,这样做的目的是防止系统B消费到事务消息。

如果系统A出错,需要回滚时,整个过程如下:

image.png

当系统A出现异常回滚后,需要提交事务消息使得系统B可消费到事务消息从而回滚系统B的事务。

这样做的优点:

· 实际上类似于saga模式的分布式事务,拿MQ当事务的协调者,能用于库存扣减类场景,场景限制少

· 当没有发生回滚时,能保证一致性,发生回滚时,能达到最终一致

· 无额外的依赖,在没有seata这类分布式事务解决方案的环境下可行

缺点:

· 使用上非常别扭,要将事务消息的提交与回滚与本地事务反过来,本地事务成功则消息事务回滚,本地事务回滚则提交消息事务

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,921评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,635评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,393评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,836评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,833评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,685评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,043评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,694评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,671评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,670评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,779评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,424评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,027评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,984评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,214评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,108评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,517评论 2 343

推荐阅读更多精彩内容