回顾
上一篇文章我们说到,各种分布式事务解决方案的特点,其中最后提到了可靠消息事务最终一致性这种解决方案,而我们这篇文章的标题也是它,没错,我们接下来要详细的分析该解决方案的实现细节了,上一篇文章在介绍该解决方案时,已经说了那个执行流程分析图,仅仅只是一个粗略图而已,实际上,可靠消息事务最终一致性的设计是非常复杂的。那么为什么要花那么多时间来详细分析它的实现细节呢?原因是接下来我们开发的这个分布式事务框架,就是使用可靠消息事务最终一致性的方案,选择它的理由很简单,就是因为它比较复杂,所以实现的过程中,能学到的技术和得到的锻炼比较多。
总体设计
先来看看以下这张设计图:
这张设计图是在上一章的执行流程基础上,再补充了一些细节,实际上,要完成可靠消息最终一致性,并不是仅仅依靠消息队列就行了,还需要很多其他组件共同协作,这些组件在以不修改业务方法的前提下,通过组件或者扩展的方式整合到项目中,具有可插拔性,这样的话,才能做到对业务项目的侵入性低的目的。
那么通过上述的总体设计流程图,我们可以提取出以下几个组件:1、拦截器组件(Interceptor),2、事务协调组件(Coordinator),3、事务日志存储组件(Repository),4、可靠消息组件(MQ),5、补偿调用组件(Invoker),6、定时器组件(Scheduler),7、初始化组件(Initiator)。接下来,我们详细的看看,每一个组件是什么,做什么,并且它们之间如何协调工作的。
注意:以下所涉及的代码不是真正的实现,而是伪代码
1、拦截器组件(Interceptor)
通过设计图的分析,我们可以知道,有两个地方需要拦截的,一个是在发起RPC请求时,需要对请求进行拦截,一个是RPC请求到达了远程目的方法后,执行方法前的拦截。
-
发起RPC请求的拦截
在发起RPC请求的拦截器中,我们需要告诉事务协调者,自己的角色是事务发起者的角色,这一步是至关重要的,因为一个事务发起者需要保存这多个事务参与者的信息,举个例子:转账(transfer)业务方法开启了分布式事务,并且transfer方法中有两个远程方法,分别是扣钱(decrease)和加钱(augment)。那transfer就是事务发起者,decrease和augment就是事务参与者。
transfer在分别调用decrease和augment者两个远程方法时,先进入了拦截器,把decrease和augment这两个事务参与者的信息添加到transfer这个事务发起者中,也就是我们常说的一对多关系。
在执行完了transfer方法后,整个transfer这个事务发起者下有多少个事务参与者都添加完毕了,接下来就把自己的这个事务发起者角色告诉事务协调者,到这里,发起RPC请求的拦截器做的事情就结束,接下来就是事务协调者做的事情了,我们后面在来讲事务协调者。 结合以下分析图理解: -
执行业务方法前的拦截
跟发起RPC请求的拦截类似,在RPC请求到达了目的方法后,开始执行方法,但在执行方法前,我们还需要做一些事情,就是把自己作为事务参与者的角色告知事务协调者,还是以拦截器的形式做这些事情,但是拦截器又如何知道自己将要执行的业务方法就是事务参与者呢?这就要回到RPC请求上了:transfer远程RPC调用decrease和augment时,需要给这两个地址追加一个参数,该参数具体是什么东西没有关系,只要能知道事务发起者是谁就行了(暂且我们叫该参数为transactionId),那么decrease和augment所在的过滤器获取RPC参数名为transactionId的值,发现有值,那就把自己确定问事务参与者,并且通过transactionId的值知道,自己所属的事务发起者是谁。 结合以下分析图理解:
2、事务协调组件(Coordinator)
事务协调者是所有分布式事务解决方案都会有的一个核心处理器,它作为各个分布在不同JVM中的本地事务间接通讯的桥梁。在可靠消息事务最终一致性的解决方案中,它主要负责事务的发起、事务的参与、事务的提交、事务出错处理、发送事务MQ消息等,凡是涉及到事务的生命周期操作,都经过它做统筹处理。它直接操作下一个要讲解的组件—事务日志存储组件(Repository)。
3、事务日志存储组件(Repository)
事务日志存储组件主要负责储存事务日志的操作,事务日志的核心信息为:事务id、事务类和方法,事务状态、事务角色、事务参与者集合。所以针对事务的操作非常多,会涉及到多线程和大并发的问题,所以这里到时候需要考虑线程安全和支持大并发的设计。 结合以下分析图理解:
4、可靠消息组件(MQ)
可靠消息组件顾名思义,就是发送MQ消息的,而消息体就是:事务发起者中的各个事务参与者详细信息,还是以我们上面的例子说明:transfer在分别调用decrease和augment者两个远程方法,transfer是事务发起者,decrease和augment是transfer的事务参与者,在transfer方法执行完后,得到一个事务信息,该信息传给了事务协调者,事务协调者会遍历该事务中的参与者列表,每遍历到一个事务参与者,就往该参与者所监听的MQ消息地点发送消息,消息体的核心信息为:“参与者的业务方法”。这样的话参与者如果在执行业务的过程中报错了,还可以到MQ中获取消息,重新执行业务方法。结合以下分析图理解(为了分析图更直观,删减了一些流程):
5、补偿调用组件(Invoker)
消息补偿组件起到的作用就是:各个事务参与者在各种异常情况下,没法正常执行业务方法,比如:事务发起者transfer调用了decrease和augment,但是augment所在的服务器宕机,导致不能接受RPC请求,这样事务就有问题了,decrease扣钱成功,augment加钱失败。这时MQ就起作用了,我们刚刚已经知道,在transfer方法执行完后,会给MQ的decrease和augment地点发送消息,消息体有个核心信息:“执行的业务方法”。那这样,augment所在的服务器重启后,马上监听到MQ的augment地点消息,然后把消息交给事务补偿调用组件(Invoker),Invoker重新调用消息体中的业务方法,完成事务补偿。 同理,如果augment所在的服务器没有宕机,但却在执行业务方法的过程中报错,则MQ会新增一条消息,等着augment获取,并重新执行业务方法。那为什么报错了会自动往MQ发消息呢?这个涉及到定时器组件(Scheduler),我们下面再详细分析定时器的作用。
但这里需要特别注意的是,Invoker需要做好幂等性的操作,因为transfer方法执行完了,事务协调者就会往decrease和augment两个事务参与者监听的消息地点发送消息,所以,不管decrease和augment是否正常执行,消息都会发送出去,那问题就来了,通过RPC已经正常调用了一次,然后监听到MQ消息又调用了一次,就调用了两次了,所以事务补偿调用组件需要做好幂等性,防止业务方法执行多次。
6、定时器组件(Scheduler)
定时器组件(Scheduler)就是我们刚刚提到的一种情况的问题:服务器正常运行,能接收RPC远程调用请求,但是执行过程中报错,那这时,定时器就起作用了,定时器在项目初始化时就需要设置好。
业务方法执行报错,进入捕捉异常流程,通知事务协调者,把对应的事务日志状态修改为“错误”状态,然后定时器每隔一段时间扫描状态为“错误”的事务日志,扫描到之后,即往MQ发送消息,那接下来的流程,又回到了事务补偿的流程了。事务补偿成功后,需要把对应的事务日志状态修改为“提交”状态。结合以下分析图理解(为了分析图更直观,删减了一些流程):
7、初始化组件(Initiator)
这一个组件就不用多说了,我们设置定时器,监听MQ的Destination,还有做一些初始化参数设置等,都是需要在这里执行。
总结
分布式事务本身就是一件非常复杂的事情,所以在设计阶段就要考虑的比较完善,这样在具体实现的时候才不会有太多问题。而通过这篇文章的讲解,已经从宏观上了解了这种“可靠消息事务最终一致性”解决方案的来龙去脉,相当于设计稿就有了。那接下来,我们就要根据这个设计稿来做具体功能的实现。在写代码实现的过程中,由于会接触到更细节的问题,这些细节又不可能在设计阶段面面俱到,所以很可能会出现具体实现与设计不一样的情况,这是正常的,只要在合理的调整范围内,大体流程上并没有改变就行了。