3-fescar源码分析-分支事务注册
一、官方介绍
3.RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。
那这一篇主要分析fescar如何执行业务逻辑?TM 获取到全局事务及XID后,开始执行各个rpc服务的业务逻辑,那么此时如何将各个rpc进行划分成分支事务并且注册到TC,进而如何准备数据集回滚脚本的。
--
二、(原理)源码分析
紧接着上一篇的TC获取全局事务及XID分析,依然借助官网的example例图进行出发。
2.1 demo
-
继续看下官网的结构图:
项目中存在官方的example模块,里面就模拟了上图的相关流程:先回到本节主题:**分支事务注册**
2.2 rpc业务执行
-
TransactionalTemplate执行业务逻辑
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException { ... // Do Your Business rs = business.execute(); ... return rs; }
这里就是具体的rpc服务的逻辑业务了,即:BusinessServiceImpl中的加了注解的@GlobalTransactional方法
@Override @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx") public void purchase(String userId, String commodityCode, int orderCount) { LOGGER.info("purchase begin ... xid: " + RootContext.getXID()); storageService.deduct(commodityCode, orderCount); orderService.create(userId, commodityCode, orderCount); throw new RuntimeException("xxx"); }
紧接着调用rpc服务StorageServiceImpl 的deduct方法:
@Override public void deduct(String commodityCode, int count) { ... jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode}); ... }
回头看到配置文件中:
<bean id="storageDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy"> <constructor-arg ref="storageDataSource" /> </bean> <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate"> <property name="dataSource" ref="storageDataSourceProxy" /> </bean> <bean id="service" class="com.alibaba.fescar.tm.dubbo.impl.StorageServiceImpl"> <property name="jdbcTemplate" ref="jdbcTemplate"/> </bean>
-
JdbcTemplate分析
此处jdbcTemplate用的是spring提供的,且dataSource指向的是com.alibaba.fescar.rm.datasource.DataSourceProxy.
debug跟踪JdbcTemplate逻辑到一下方法:@Override public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException { ... Connection con = DataSourceUtils.getConnection(getDataSource()); PreparedStatement ps = null; try { Connection conToUse = con; if (this.nativeJdbcExtractor != null && this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) { conToUse = this.nativeJdbcExtractor.getNativeConnection(con); } ps = psc.createPreparedStatement(conToUse); applyStatementSettings(ps); PreparedStatement psToUse = ps; if (this.nativeJdbcExtractor != null) { psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps); } T result = action.doInPreparedStatement(psToUse); handleWarnings(ps); return result; } catch (SQLException ex) { ... } finally { ... } }
核心逻辑就在:
Connection con = DataSourceUtils.getConnection(getDataSource()); #JdbcAccessor public DataSource getDataSource() { return this.dataSource; } #JdbcAccessor @Override public ConnectionProxy getConnection() throws SQLException { assertManaged(); Connection targetConnection = targetDataSource.getConnection(); return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType()); }
@Override public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action) throws DataAccessException { ... Connection con = DataSourceUtils.getConnection(getDataSource()); PreparedStatement ps = null; try { Connection conToUse = con; if (this.nativeJdbcExtractor != null && this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) { conToUse = this.nativeJdbcExtractor.getNativeConnection(con); } ps = psc.createPreparedStatement(conToUse); applyStatementSettings(ps); PreparedStatement psToUse = ps; if (this.nativeJdbcExtractor != null) { psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps); } T result = action.doInPreparedStatement(psToUse); handleWarnings(ps); return result; } catch (SQLException ex) { ... } finally { ... } }
jdbcTemplate继承至JdbcAccessor,还记的开始的配置文件中配置的dataSource吗?因此该获取的dataSource指向的是com.alibaba.fescar.rm.datasource.DataSourceProxy。
那么继续看getConnection就是自然调用的是DataSourceProxy中的,最终返回ConnectionProxy。
紧接着ps = psc.createPreparedStatement(conToUse);调用的就是AbstractConnectionProxy#createPreparedStatement()逻辑,返回PreparedStatementProxy。
继续往后追踪到executeUpdate逻辑:自然此时这里的executeUpdate;就是PreparedStatementProxy中的方法体:
@Override public int executeUpdate() throws SQLException { return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() { @Override public Integer execute(PreparedStatement statement, Object... args) throws SQLException { return statement.executeUpdate(); } }); }
以上就是整个jdbc的追溯逻辑了,进而跟踪后面的代码流程。
-
ExecuteTemplate
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer, StatementProxy<S> statementProxy, StatementCallback<T, S> statementCallback, Object... args) throws SQLException { if (!RootContext.inGlobalTransaction()) { // Just work as original statement return statementCallback.execute(statementProxy.getTargetStatement(), args); } if (sqlRecognizer == null) { sqlRecognizer = SQLVisitorFactory.get( statementProxy.getTargetSQL(), statementProxy.getConnectionProxy().getDbType()); } Executor<T> executor = null; if (sqlRecognizer == null) { executor = new PlainExecutor<T, S>(statementProxy, statementCallback); } else { switch (sqlRecognizer.getSQLType()) { case INSERT: executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case UPDATE: executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case DELETE: executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer); break; case SELECT_FOR_UPDATE: executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer); break; default: executor = new PlainExecutor<T, S>(statementProxy, statementCallback); break; } } T rs = null; try { rs = executor.execute(args); } catch (Throwable ex) { ... } return rs; }
- 根据不同的语句生成不同的执行器
- 具体实现会保存更改前后的数据镜像并插入到undo log里并commit。只有回滚用undo log里的数据生成sql语句回滚
- 由于AT事务在第一阶段已提交,所以commit过程是由Asyncworker异步删除undo log,真正的commit是在第一阶段完成的
@Override public Object execute(Object... args) throws Throwable { String xid = RootContext.getXID(); statementProxy.getConnectionProxy().bind(xid); return doExecute(args); }
获取XID,将当前数据库连接绑定到XID,执行逻辑:
protected T executeAutoCommitTrue(Object[] args) throws Throwable { T result = null; AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); LockRetryController lockRetryController = new LockRetryController(); try { connectionProxy.setAutoCommit(false); while (true) { try { result = executeAutoCommitFalse(args); connectionProxy.commit(); break; } catch (LockConflictException lockConflict) { lockRetryController.sleep(lockConflict); } } } catch (Exception e) { ... return result; }
protected T executeAutoCommitFalse(Object[] args) throws Throwable { TableRecords beforeImage = beforeImage(); T result = statementCallback.execute(statementProxy.getTargetStatement(), args); TableRecords afterImage = afterImage(beforeImage); statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage); return result; }
上叙代码很清晰:
- 1.获取脚本执行前数据镜像
- 2.执行脚本逻辑(此时还未commit)
- 3.获取脚本执行后数据镜像
- 4.根据前后数据镜像及XID等信息准备回滚的ubdo脚本
执行完以上逻辑后,commit,此处的commit是将上叙执行结果集回滚的脚本进行提交,但提交前要做一件重要的事,那就是本节中重心:注册分支事务:
connectionProxy.commit();
#ConnectionProxy @Override public void commit() throws SQLException { if (context.inGlobalTransaction()) { try { /** 去TC注册分支事务 */ register(); ... UndoLogManager.flushUndoLogs(this); targetConnection.commit(); } catch (Throwable ex) { ... } report(true); context.reset(); } else { targetConnection.commit(); } }
--
2.3.RM 注册分支事务
-
branchRegister 注册事务分支
private void register() throws TransactionException { Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.buildLockKeys()); context.setBranchId(branchId); } # DataSourceManager @Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException { try { BranchRegisterRequest request = new BranchRegisterRequest(); request.setTransactionId(XID.getTransactionId(xid)); request.setLockKey(lockKeys); request.setResourceId(resourceId); BranchRegisterResponse response = (BranchRegisterResponse) RmRpcClient.getInstance().sendMsgWithResponse(request); if (response.getResultCode() == ResultCode.Failed) { throw new TransactionException(response.getTransactionExceptionCode(), "Response[" + response.getMsg() + "]"); } return response.getBranchId(); } catch (TimeoutException toe) { ... } }
- 1.通过rpcclient到服务端注册分支事务
- 2.将返回的事务分支BranchId保存进ConnectionContext,用以事务分支的对称。
上面的代码跟开启事务逻辑基本一致,同样是发送“注册分支事务”消息给server端,server收到事务分支注册消息后进行逻辑处理。
分支事务注册成功之后,数据逻辑执行、回滚脚本执行,完成整个分支事务的注册。
那么接下类分析的是server端收到事务分支的注册信息后如何处理了。
--
2.3.TC 收到消息,开启处理分支事务的注册
-
前面接受netty消息的逻辑与begin事务逻辑类似,此处不做强调,直接导核心逻辑:处理分支注册
#DefaultCoordinator @Override protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response, RpcContext rpcContext) throws TransactionException { response.setTransactionId(request.getTransactionId()); response.setBranchId( core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(), XID.generateXID(request.getTransactionId()), request.getLockKey())); }
继续跟踪:
@Override public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException { GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin); BranchSession branchSession = new BranchSession(); branchSession.setTransactionId(XID.getTransactionId(xid)); branchSession.setBranchId(UUIDGenerator.generateUUID()); branchSession.setApplicationId(globalSession.getApplicationId()); branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup()); branchSession.setBranchType(branchType); branchSession.setResourceId(resourceId); branchSession.setLockKey(lockKeys); branchSession.setClientId(clientId); if (!branchSession.lock()) { throw new TransactionException(LockKeyConflict); } try { globalSession.addBranch(branchSession); } catch (RuntimeException ex) { throw new TransactionException(FailedToAddBranch); } return branchSession.getBranchId(); }
逻辑很明晰:
- 1.根据XID获取全局事务GlobalSession
- 2.构造一个事务分支:BranchSession,并赋值相关属性
- 3.将BranchSession进行上锁处理
- 4.将分支事务BranchSession加入全局事务GlobalSession
- 5.返回创建的branchId给客户端
- 6.完成分支的注册工作。
此时 TC 与 RM 即维持了同步的branchId信息。
至此,事务分支的注册完成。
--
三.未完待续。。。
后续分析主要还是根据example官方实例分为:事务回滚、事务提交进行。
同时后续每一流程都紧密关联Server,因此还会频繁回到上叙server启动后,收到消息被触发的后续逻辑。