老生常谈,分布式架构,分布式事务,那分布式事务是个怎么回事呢,那我们先了解什么是分布式架构。
简单来说 把功能模块化和领域化,然后各个邻域协调工作完成一个功能。还不太懂就看下下面这个图吧
是不是看了这个图 ,也是还是看不懂,看不懂就看不懂吧, 那我解释下吧 ,
需求 : 我需要一个 六角星,去开启光明能量
实现 :长方形 + 圆 + 六边形 = 六角星
那么很多人都这么想。那我就加就好了呀 那们A 系统就出来了 。可是需求量增加,一个A 系统 抗不住,或者说,当客户需要七角星怎么办,一个系统,只能在里面改造,耦合性太高,要求也很高,因为必须了解整个A 系统怎么创造出来的六角星,才能改造下,生产出七角星 ,而且还不能影响六角星的生产,反正各种弊端。
那么就有人想,那能不能把 长方形,圆形,六边形解放出来, 而且还可以随时加入不同的其他模块,而尽量不影响其他的模块。那么B 系统就是应运而生了。那么呢 ,B 系统应该怎么理解呢。那我这里先简单介绍下把。
B 系统发一个请求进来,长条的长方型(网关) 解析之后,告诉系统我需要六角星,那么需要用到,长方形,圆形,六边形。因为他们之间是独立的,那么就会出现单点故障的问题,那们六边形就会创建不出来或者创造出一个乱七八糟的东西。这些都不知我们要的, 那我们应该怎么办呢,好吧 。说了这么就 今天我的主角正式上场,分布式事务。
分布式事务
关于事务 必要要了解
ACID
原子性(Atomicity)
原子性是指事务是一个不可分割的工作单位,事务中的操作要么都发生,要么都不发生。
一致性(Consistency)
事务前后数据的完整性必须保持一致。
隔离性(Isolation)
事务的隔离性是多个用户并发访问数据库时,数据库为每一个用户开启的事务,不能被其他事务的操作数据所干扰,多个并发事务之间要相互隔离。
持久性(Durability)
持久性是指一个事务一旦被提交,它对数据库中数据的改变就是永久性的,接下来即使数据库发生故障也不应该对其有任何影响
关于分布式事务,就不能不说 CAP 理论 和BASE 理论
CAP :
C : Consistency(一致性)
A : Availability(可用性)
P: Partition tolerance(分区容错性)
一般来说,满足CAP 的架构是不存在的,但是业务使然,肯定不能出错,所以很多架构和框架都是在一致性和可用性 做取舍。
BASE
base 是在CAP 的基础上提出了的一个理论,BASE是Basically Available(基本可用)、Soft state(软状态)和Eventually consistent(最终一致性)三个短语的缩写。BASE理论是对CAP中一致性和可用性权衡的结果,其来源于对大规模互联网系统分布式实践的总结, 是基于CAP定理逐步演化而来的。BASE理论的核心思想是:即使无法做到强一致性,但每个应用都可以根据自身业务特点,采用适当的方式来使系统达到最终一致性。
BASE理论面向的是大型高可用可扩展的分布式系统,和传统的事物ACID特性是相反的,它完全不同于ACID的强一致性模型,而是通过牺牲强一致性来获得可用性,并允许数据在一段时间内是不一致的,但最终达到一致状态。但同时,在实际的分布式场景中,不同业务单元和组件对数据一致性的要求是不同的,因此在具体的分布式系统架构设计过程中,ACID特性和BASE理论往往又会结合在一起。
分布式事务,现在主流的 2pc 二阶段式提交。tcc (try -comfirm -cancel) ,还有就是MQ 补偿,为了事务的最终一致性。
2pc 二阶段式提交
其实二阶段提交时什么呢,就是加入了一个协调者(tx) 可以联想下上面那个图,下面那个长条的长方形,把它理解为tx,那么他时怎么管理的呢,首先,tx 要知道这三个模块是属于同一个事务的,那就要有唯一的ID 标识,我们称 tx_id。
那么其实二段式提交就是,把提交事务的工作交给了tx, 他来判断整个事务到底成功了没有,如果成功,那么他通知各个模块提交事务,如果没有成功,那么通知各个模块回滚事务,从而达到一致性。
TCC (try -confirm -cancel)
TCC 很好理解,故名思意,也很好用,但是这个侵入业务中,写起来简单,但是麻烦。怎么说呢,下面写份伪代码,大概的思路就是这样的。
def do ={ do something ...}
def confirm = { confirm }
def cancel = { cancel }
def main ={
try
do
catch cancel
confirm
}
do 方法是 具体的业务,cancel 和 confirm 这里是要一个main 里面,但是正常业务很多都是 回调 或者补偿进来的。这只是个思路啦,每个TCC 框架都会有自己的一套 具体的实现的。
MQ 补偿的,暂时这次先不讲。
说了这么多理论,那我们是不是开始上硬菜了,好吧,直接上代码。
dapeng 的全局事务
dapeng 的全局事务的设计就是为了解决 分布式事务的问题。
首先,先了解两个概念
名词 | 描述 |
---|---|
全局事务 | 一个支持分布式事务服务接口方法的开始到结束称为一个全局事务 |
事务过程 | 一个全局事务中调用其它支持分布式事务接口方法,该方法的开始到结束称为一个事务过程 |
架构设计:
接口设计
service DemoService{
/**
* @SoaGlobalTransactional
**/
void Do()
i32 doAddStock(i32 id,i32 num)
i32 doSubStock(i32 id,i32 num)
}
service ProcessDemoService{
/**
* @IsSoaTransactionProcess
**/
i32 doAddStockForProcess(i32 id,i32 num)
i32 doAddStockForProcess_rollback(i32 id,i32 num)
/**
* @IsSoaTransactionProcess
**/
i32 doSubStockForProcess(i32 id,i32 num)
i32 doSubStockForProcess_rollback(i32 id,i32 num)
}
dapeng 使用thrift 协议生成接口,只要在接口方法处声明,@SoaGlobalTransactional 这个是全局事务开启的方法,@IsSoaTransactionProcess 声明这个是过程方法,过程方法需求定义自己的回滚方法,规则是 方法名_rollback。 rollback 方法和过程方法入参一致(有些版本不需要参数)
实现
override def Do(): Unit = {
// 1 do 2 do addStock 3 do doSubStock
LOG.info(" ---------------- do start -----------------")
// 都在同一个事务里 , 主流程出错
val id = 1
LOG.info(s"---------------------- (1) id ->${id} ")
LOG.info(s" id -> ${BaseHelper.getTransactionId} , seq -> ${BaseHelper.getTransactionSequence} ")
new ProcessDemoServiceClient().doAddStockForProcess(id,10)
new ProcessDemoServiceClient().doSubStockForProcess(id,5)
err
LOG.info( "-------------- do end ---------------")
}
private def err: Unit ={
assert(false," err ")
}
override def doAddStock(id:Int,num: Int): Int = {
DemoSql.doAddStock(id,num)
1
}
override def doSubStock(id:Int,num: Int): Int = {
DemoSql.doSubStock(id,num)
2
}
过程事务方法
override def doAddStockForProcess(id: Int, num: Int): Int = {
LOG.info(s" id-> ${BaseHelper.getTransactionId()} , seq -> ${BaseHelper.getTransactionSequence()} ")
DemoSql.doAddStock(id,num)
}
override def doAddStockForProcess_rollback(id: Int, num: Int): Int = {
LOG.info(s" id-> ${BaseHelper.getTransactionId()} , seq -> ${BaseHelper.getTransactionSequence()} ")
DemoSql.doSubStock(id,num)
}
override def doSubStockForProcess(id: Int, num: Int): Int = {
LOG.info(s" id-> ${BaseHelper.getTransactionId()} , seq -> ${BaseHelper.getTransactionSequence()} ")
DemoSql.doSubStock(id,num)
}
override def doSubStockForProcess_rollback(id: Int, num: Int): Int = {
LOG.info(s" id-> ${BaseHelper.getTransactionId()} , seq -> ${BaseHelper.getTransactionSequence()} ")
DemoSql.doAddStock(id,num)
}
执行do 主事务方法,调用两个过程方法,最后err ,主事务回滚,过程事务执行rollback 方法,达到最终一致性。
日志贴图
从这里可以看出,dapeng 的分布式事务设计,主要是tcc 的设计思路。
然后我们看下最近热门的 seata 框架,阿里开源出来的解决分布式事务的一个框架。
首先我们先看下seata 的git 的地址
https://github.com/seata/seata
从简介中可看出来,seata 是有一个 tc ,从上文的对分布式事务处理方案中可以得出,这seate 应该是一个 2pc 二阶段式提交 提交框架,那事实上是这样的吗?
我们继续往下看,seate 的文档
https://github.com/seata/seata/wiki
从图中可看到 , Business 是TM ,向TC注册Global Transation ,确定applicationId,整个调用链就能通过这个ID关联起来,调用Storage,向TC 注册,seate 的一个创新的地点,也就是"解决" 二阶段提交 的一个方法是,先子系统commit事务,会加个锁锁住这条记录防止更新,记录undolog,并向TC注册。下个服务的调用链Order,Account 也一样,commit,加锁,记录undolog,从图中看出,每个子系统都是RM,当TM,也就是Business 完成之后,就会通知TC,Global commit ,那么TC 会通知每个RM,每个RM 就会执行commit ,释放锁 。如果调用链中出现异常,这个异常就会返回到TM,TM 会向TC 发出rollback 请求。TC 会给各个TM 发出回滚的通知,每个TM 会解析undolog,回滚数据,释放锁。
在这个过程中,主要是 TC ,TM , RM 这个三个角色在各司其职,然后完成整个分布式事务工作。从这个过程中,感觉seate 像不像 二阶段式提交,和TCC 的混合体(反正我是这样觉得的)。
TC 作为一个协调者,掌控全局,但是却没有强制挂起子系统的事务,而是让其提交事务,释放资源,这个解决了二阶段式提交的效率问题。而创新的创建了一个undolog 的表,记录undolog, 释放了TCC 中cancel 的工作量和解决入侵业务的问题。
总结:seate 是一个很优秀的分布式事务中间件。
至于seata 是怎么做的,源码分析什么的,可以看下seate 的文档,很多人分析的很不错,我在这里就不说了。
当然,我们从GitHub 上查下分布式事务的解决方案,也会有很多解决方案,像Raincat, hmily ,lcn 等都是非常优秀的框架。这些下次分解咯