Spring Boot整合RocketMQ之事务消息

上次简单的了解了一下在Spring Boot下通过使用rocketmq-spring-boot-starter进行普通消息的发送、接收以及使用集群模式来模拟实现广播模式,文章链接。今天来学习一下RocketMQ事务消息的发送。
RocketMQ的事务消息分为3种状态,分别是提交状态、回滚状态、中间状态:

TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。

当然因为在项目中我使用的是rocketmq-spring-boot-starter,所以表述上略有不同,但是本质是一样的。
事务消息在解决分布式事务的场景中感觉还是很有用的,虽然我们现在项目的分布式事务是通过Seata来实现的,但是通过事务消息或者消息的最终一次性也是可以的。
事务消息总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。这三个阶段是前后关联的,只有发送Prepared消息成功,才会执行本地事务,本地事务返回的状态是提交,那么就会发送最终的确认消息。如果在结束消息事务时,本地事务状态失败,那么Broker回查线程定时(默认1分钟)扫描每个存储事务状态的表格文件,如果是已经提交或者回滚的消息直接跳过,如果是Prepared状态则会向生产者发起一个检查本地事务的请求。

一、代码修改

首先我创建有一个Service来发送事务消息,代码没有什么特殊的含义,只是拿来当一个demo,代码如下:

    public Boolean save(OrderEntity orderEntity) {
        Message<OrderEntity> message = MessageBuilder.withPayload(orderEntity).build();

        log.info(">>>> send tx message start,tx_group={},destination={},payload={} <<<<",TX_GROUP,ORDER_TOPIC + ORDER_TAG,orderEntity);
        TransactionSendResult sendResult = rocketMQTemplate.sendMessageInTransaction("tx_order","order_topic:" + "tx_tag",message,orderEntity.getUserName());
        String sendStatus = sendResult.getSendStatus().name();
        String localTXState = sendResult.getLocalTransactionState().name();
        log.info(">>>> send status={},localTransactionState={} <<<<",sendStatus,localTXState);
        return Boolean.TRUE;
    }

使用RocketMQTemplate发送事务消息和普通消息略有不同的是,需要指一个事务生产者组,当然如果传入null,则会使用默认值rocketmq_transaction_default_global_name,发生消息的地址和普通消息一样都Topic:Tag,另外一点不同的是除了发生的Message之外,还可以发送其他的额外参数,不过这些参数只会在执行本地事务的时候会用到。
接下来我们创建一个消息的监听器(消费者),这个和普通消息的监听器一样,代码如下:

@Component
@RocketMQMessageListener(consumerGroup = "tx_consumer",topic = "order_topic")
public class OrderListener implements RocketMQListener<String>{

    @Override
    public void onMessage(String message) {
        log.info(">>>> message={} <<<<",message);
    }
}

除了消费者之外,我们还需要创建事务消息生产者端的消息监听器,注意是生产者,不是消费者,我们需要实现的是RocketMQLocalTransactionListener接口,代码如下:

@RocketMQTransactionListener(txProducerGroup = "tx_order")
public class OrderTXMsgListener implements RocketMQLocalTransactionListener {

    @Autowired
    private UserRepository userRepository;

    private static final Gson GSON = new Gson();

    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
        log.info(">>>> TX message listener check local transaction, message={} <<<<",msg.getPayload());
        // 检查本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
        } catch (Exception e) {
            // 异常就回滚
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.ROLLBACK;
        }
        return result;
    }
}

@RocketMQTransactionListener表明这个一个生产端的消息监听器,需要配置监听的事务消息生产者组。而实现RocketMQLocalTransactionListener接口,重写执行本地事务的方法和检查本地事务方法。下面,我们通过修改生产者端事务监听器的代码来观察代码的执行情况。

二、消息事务测试

首先还是正常的启动项目,在执行本地事务方法中正常情况下返回的值是COMMIT,即提交事务,这种情况下消费者会直接消费消息,而略过检查本地事务的方法。调用该接口,项目日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=lisi, price=8848.00, address=CN-SC-CD-05, createTime=null, updateTime=null, status=20) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[119], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690C3418B4AAC2842438960000, rocketmq_TAGS=tx_tag, id=f32f4848-9acf-20bb-2501-0e6088765897, contentType=application/json, timestamp=1595749766307}],args=lisi <<<<
>>>> send status=SEND_OK,localTransactionState=COMMIT_MESSAGE <<<<
>>>> message={"id":null,"userName":"lisi","price":8848.00,"address":"CN-SC-CD-05","createTime":null,"updateTime":null,"status":"20"} <<<<

通过日志分析可以看出,在执行完本地事务方法之后,返回的本地事务状态是COMMIT_MESSAGE,接着消费者消费消息,和我们的预期是一样的。
接下来我们修改下执行本地事务的方法,让该方法返回状态为RocketMQLocalTransactionState.UNKNOWN,修改之后如下:

    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        log.info(">>>> TX message listener execute local transaction, message={},args={} <<<<",msg,arg);
        // 执行本地事务
        RocketMQLocalTransactionState result = RocketMQLocalTransactionState.COMMIT;
        try {
            String jsonString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
            OrderEntity orderEntity = GSON.fromJson(jsonString, OrderEntity.class);
            String userName = (String) arg;
            int r = 11 / 0;
        } catch (Exception e) {
            log.error(">>>> exception message={} <<<<",e.getMessage());
            result = RocketMQLocalTransactionState.UNKNOWN;
        }
        return result;
    }

这样因为发生异常,该方法返回的结果是UNKNOWN,根据上文的分析,执行本地事务方法之后应该会执行检查本地事务方法,重启项目之后,再次调用一下接口,查看日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhangsan, price=90001.00, address=CN-SC-CD-02, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_TAGS=tx_tag, id=dfd215f4-2aa6-f377-d1a7-ebbe3875769a, contentType=application/json, timestamp=1595750272928}],args=zhangsan <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430µs170ns).
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F00000000000156AB, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=ea3c3a7a-23c6-5acf-4c0f-0fa42f795b41, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750310890}] <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[124], headers={rocketmq_QUEUE_ID=0, TRANSACTION_CHECK_TIMES=2, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750272923, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000015892, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC2842BF39A0000, rocketmq_SYS_FLAG=0, id=cddfa35c-c8b2-cb1b-dce7-a26c6888b99a, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750374536}] <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<
>>>> message={"id":null,"userName":"zhangsan","price":90001.00,"address":"CN-SC-CD-02","createTime":null,"updateTime":null,"status":"10"} <<<<

根据日志输出,在Service中返回的事务消息发送状态是SEND_OK,但是返回的本地事务状态是UNKNOW,所以需要执行检查本地事务方法,但是这里出现了一个问题就是检查本地事务方法执行了两次,而且事务消息也被消费了两次,感觉有点不正常了,但是检查发现两条信息日志中rocketmq_TRANSACTION_ID是一样的,这是什么情况??会不会和HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=1m22s578ms430µs170ns).有关呢,因为当时自己使用的DEBUG模式,看代码停留了一段时间,这样导致Broker发起的第一个回查线程挂起,而这时Broker又启动了一个线程,从而执行了两次检查事务的代码,而该方法返回的是COMMIT,所以。
不使用DEBUG模式重新测试一下,日志如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=wangwu, price=9876.00, address=CN-SC-CD-00, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_TAGS=tx_tag, id=464edcfe-09c1-cc4a-5ac3-f3df888b0102, contentType=application/json, timestamp=1595750727701}],args=wangwu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=UNKNOW <<<<
>>>> TX message listener check local transaction, message=GenericMessage [payload=byte[121], headers={rocketmq_QUEUE_ID=3, TRANSACTION_CHECK_TIMES=1, rocketmq_TAGS=tx_tag, rocketmq_BORN_TIMESTAMP=1595750727699, rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_MESSAGE_ID=C0A8006900002A9F0000000000016109, rocketmq_TRANSACTION_ID=C0A800690E9D18B4AAC28432E4130005, rocketmq_SYS_FLAG=0, id=77765356-fc4d-6d05-3531-6a67fbbed7f7, rocketmq_BORN_HOST=192.168.0.105, contentType=application/json, timestamp=1595750790917}] <<<<
>>>> message={"id":null,"userName":"wangwu","price":9876.00,"address":"CN-SC-CD-00","createTime":null,"updateTime":null,"status":"10"} <<<<

这里输出的日志信息又没有问题了,我个人认为上面应该就是DEBUG导致的,这里就不再探讨了。
接下来测试一下,在执行本地事务方法中返回ROLLBACK的情况,这里代码就省略了,直接返回ROLLBACK。日志输出如下:

>>>> send tx message start,tx_group=tx_order,destination=order_topic:tx_tag,payload=OrderEntity(id=null, userName=zhaoliu, price=10000.00, address=CN-SC-CD-03, createTime=null, updateTime=null, status=10) <<<<
>>>> TX message listener execute local transaction, message=GenericMessage [payload=byte[123], headers={rocketmq_TOPIC=order_topic, rocketmq_FLAG=0, rocketmq_TRANSACTION_ID=C0A800691A3A18B4AAC284F72B910000, rocketmq_TAGS=tx_tag, id=d5b24a82-8d8b-90ad-7322-adfe2c4f3026, contentType=application/json, timestamp=1595763591062}],args=zhaoliu <<<<
>>>> exception message=/ by zero <<<<
>>>> send status=SEND_OK,localTransactionState=ROLLBACK_MESSAGE <<<<

没有执行检验本地事务的方法,和之前说的一样。到这里我觉得应该基本上可以明白生产者端消息监听器中两个方法的具体作用了,主要还是理解RocketMQ事务消息的基本原理。
校验本地事务方法的返回值和执行本地事务方法的返回值的作用是一样的,这里就不再测试了。
网上找了一个图,感觉非常的直观:

图-1.png

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342