一、seata是什么
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
在19年初的时候就关注过这个中间件(当时叫Fescar),并且对它的源码进行了一下分析--阿里分布式事务解决方案fescar简析。然而当时并不成熟,并不能直接用于商用,主要有以下几个问题。
- TC的实现不完善。不支持HA,xid的生成,session的存储,锁的实现等都是以DEMO的方式提供,不能直接用于线上环境,需要二次开发
- 回滚失败的补偿机制不完善
- 性能问题。seata当时宣称只有在第一阶段提交的时候进行加锁,相对传统的两阶段提交对性能有比较大的提升。但现实情况是为了保证回滚操作的成功,还必须要有一个全局锁,事实上相比XA的方式我个人认为在系统的吞吐量上个人认为不会有太大的变化。而且由于这种方式实现的是类似补偿性事务的方式,又会引入一个可见性的问题(第二阶段提交前就已经能看到第一阶段提交的结果)
- RPC框架。由于是阿里系的中间件,因此第一版实现的是基于dubbo,非dubbo的rpc框架需要根据自己的情况做二次开发。
二、模式
- AT
- TCC
- SAGA(新特性)
- XA(新特性)
三、核心组件
- TC
事务协调者
- TM
事务发起者。定义事务边界
- RM
事务参与者
四、demo演示
五、源码分析
注:本文使用的版本为v1.2.0
1. undo_log的产生和删除机制
- undo_log的产生
查看flushUndoLogs调用栈
flushUndoLogs:200, AbstractUndoLogManager (io.seata.rm.datasource.undo)
processGlobalTransactionCommit:221, ConnectionProxy (io.seata.rm.datasource)
doCommit:196, ConnectionProxy (io.seata.rm.datasource)
lambda$commit$0:184, ConnectionProxy (io.seata.rm.datasource)
call:-1, 362578118 (io.seata.rm.datasource.ConnectionProxy$$Lambda$197)
execute:289, ConnectionProxy$LockRetryPolicy (io.seata.rm.datasource)
commit:183, ConnectionProxy (io.seata.rm.datasource)
...
execute:108, ExecuteTemplate (io.seata.rm.datasource.exec)
execute:49, ExecuteTemplate (io.seata.rm.datasource.exec)
...
update:927, JdbcTemplate (org.springframework.jdbc.core)
deduct:51, StorageServiceImpl (io.seata.samples.dubbo.service.impl)
ConnectionProxy为Connection的代理类。
private void processGlobalTransactionCommit() throws SQLException {
try {
//**新特性,分支事务注册到TC改为在提交前进行,而不是在一开始就获取一个branchId
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}
try {
//根据数据库的类型获取对应的UndoLogManager进行刷写undolog日志
UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this);
//原connection的commit操作
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
report(false);
throw new SQLException(ex);
}
if (IS_REPORT_SUCCESS_ENABLE) {
report(true);
}
context.reset();
}
AbstractUndoLogManager.java
@Override
public void flushUndoLogs(ConnectionProxy cp) throws SQLException {
//通过连接代理获取连接的上下文,这里先不分析xid的传递机制,留给后面的部分进行分析
ConnectionContext connectionContext = cp.getContext();
if (!connectionContext.hasUndoLog()) {
return;
}
String xid = connectionContext.getXid();
long branchId = connectionContext.getBranchId();
BranchUndoLog branchUndoLog = new BranchUndoLog();
branchUndoLog.setXid(xid);
branchUndoLog.setBranchId(branchId);
//具体的undolog的内容
branchUndoLog.setSqlUndoLogs(connectionContext.getUndoItems());
UndoLogParser parser = UndoLogParserFactory.getInstance();
byte[] undoLogContent = parser.encode(branchUndoLog);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Flushing UNDO LOG: {}", new String(undoLogContent, Constants.DEFAULT_CHARSET));
}
//实际的写入操作,不同的关系型数据库有不同的实现
insertUndoLogWithNormal(xid, branchId, buildContext(parser.getName()), undoLogContent,
cp.getTargetConnection());
}
为了直观显示,我这里给出了一条undo_log的数据
id: 32
branch_id: 2011290555
xid: 172.17.0.1:8091:2011290554
context: serializer=jackson
rollback_info: {"@class":"io.seata.rm.datasource.undo.BranchUndoLog","xid":"172.17.0.1:8091:2011290554","branchId":2011290555,"sqlUndoLogs":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.undo.SQLUndoLog","sqlType":"UPDATE","tableName":"storage_tbl","beforeImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"storage_tbl","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":4,"value":4},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":201}]]}]]},"afterImage":{"@class":"io.seata.rm.datasource.sql.struct.TableRecords","tableName":"storage_tbl","rows":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Row","fields":["java.util.ArrayList",[{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"id","keyType":"PRIMARY_KEY","type":4,"value":4},{"@class":"io.seata.rm.datasource.sql.struct.Field","name":"count","keyType":"NULL","type":4,"value":199}]]}]]}}]]}
log_status: 0
log_created: 2020-05-10 10:02:53
log_modified: 2020-05-10 10:02:53
ext: NULL
这个操作为把用户的库存记录-2。beforeImage的count为201,afterImage的count为199。
- RPC框架整合(xid传递)
- 锁的机制(全局锁和局部锁)
- session状态存储
- 全局ID生成