3-fescar(seata)源码分析-分支事务注册

3-fescar源码分析-分支事务注册

一、官方介绍

3.RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖。

那这一篇主要分析fescar如何执行业务逻辑?TM 获取到全局事务及XID后,开始执行各个rpc服务的业务逻辑,那么此时如何将各个rpc进行划分成分支事务并且注册到TC,进而如何准备数据集回滚脚本的。

--

二、(原理)源码分析

紧接着上一篇的TC获取全局事务及XID分析,依然借助官网的example例图进行出发。

2.1 demo
  • 继续看下官网的结构图:


    image.png
项目中存在官方的example模块,里面就模拟了上图的相关流程:先回到本节主题:**分支事务注册**
2.2 rpc业务执行
  • TransactionalTemplate执行业务逻辑

    public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
            ...
            // Do Your Business
            rs = business.execute();
            ...
            return rs;
        }
    

    这里就是具体的rpc服务的逻辑业务了,即:BusinessServiceImpl中的加了注解的@GlobalTransactional方法

    @Override
    @GlobalTransactional(timeoutMills = 300000, name = "dubbo-demo-tx")
    public void purchase(String userId, String commodityCode, int orderCount) {
        LOGGER.info("purchase begin ... xid: " + RootContext.getXID());
        storageService.deduct(commodityCode, orderCount);
        orderService.create(userId, commodityCode, orderCount);
        throw new RuntimeException("xxx");
    }
    

    紧接着调用rpc服务StorageServiceImpl 的deduct方法:

    @Override
    public void deduct(String commodityCode, int count) {
        ...
        jdbcTemplate.update("update storage_tbl set count = count - ? where commodity_code = ?", new Object[] {count, commodityCode});
        ...
    }
    

    回头看到配置文件中:

    <bean id="storageDataSourceProxy" class="com.alibaba.fescar.rm.datasource.DataSourceProxy">
        <constructor-arg ref="storageDataSource" />
    </bean>
    <bean id="jdbcTemplate" class="org.springframework.jdbc.core.JdbcTemplate">
        <property name="dataSource" ref="storageDataSourceProxy" />
    </bean>
    
    <bean id="service" class="com.alibaba.fescar.tm.dubbo.impl.StorageServiceImpl">
        <property name="jdbcTemplate" ref="jdbcTemplate"/>
    </bean>
    
  • JdbcTemplate分析

    此处jdbcTemplate用的是spring提供的,且dataSource指向的是com.alibaba.fescar.rm.datasource.DataSourceProxy.
    debug跟踪JdbcTemplate逻辑到一下方法:

    @Override
    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
            throws DataAccessException {
    
        ...
    
        Connection con = DataSourceUtils.getConnection(getDataSource());
        PreparedStatement ps = null;
        try {
            Connection conToUse = con;
            if (this.nativeJdbcExtractor != null &&
                    this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
                conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
            }
            ps = psc.createPreparedStatement(conToUse);
            applyStatementSettings(ps);
            PreparedStatement psToUse = ps;
            if (this.nativeJdbcExtractor != null) {
                psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
            }
            T result = action.doInPreparedStatement(psToUse);
            handleWarnings(ps);
            return result;
        }
        catch (SQLException ex) {
            ...     }
        finally {
            ...
        }
    }
    

    核心逻辑就在:

    Connection con = DataSourceUtils.getConnection(getDataSource());
    
    #JdbcAccessor
    public DataSource getDataSource() {
        return this.dataSource;
    }
    
    #JdbcAccessor
    @Override
    public ConnectionProxy getConnection() throws SQLException {
        assertManaged();
        Connection targetConnection = targetDataSource.getConnection();
        return new ConnectionProxy(this, targetConnection, targetDataSource.getDbType());
    }
    
    @Override
    public <T> T execute(PreparedStatementCreator psc, PreparedStatementCallback<T> action)
            throws DataAccessException {
    
        ...
    
        Connection con = DataSourceUtils.getConnection(getDataSource());
        PreparedStatement ps = null;
        try {
            Connection conToUse = con;
            if (this.nativeJdbcExtractor != null &&
                    this.nativeJdbcExtractor.isNativeConnectionNecessaryForNativePreparedStatements()) {
                conToUse = this.nativeJdbcExtractor.getNativeConnection(con);
            }
            ps = psc.createPreparedStatement(conToUse);
            applyStatementSettings(ps);
            PreparedStatement psToUse = ps;
            if (this.nativeJdbcExtractor != null) {
                psToUse = this.nativeJdbcExtractor.getNativePreparedStatement(ps);
            }
            T result = action.doInPreparedStatement(psToUse);
            handleWarnings(ps);
            return result;
        }
        catch (SQLException ex) {
            ...
        }
        finally {
            ...     }
    }
    

    jdbcTemplate继承至JdbcAccessor,还记的开始的配置文件中配置的dataSource吗?因此该获取的dataSource指向的是com.alibaba.fescar.rm.datasource.DataSourceProxy。
    那么继续看getConnection就是自然调用的是DataSourceProxy中的,最终返回ConnectionProxy。
    紧接着ps = psc.createPreparedStatement(conToUse);调用的就是AbstractConnectionProxy#createPreparedStatement()逻辑,返回PreparedStatementProxy。
    继续往后追踪到executeUpdate逻辑:

    自然此时这里的executeUpdate;就是PreparedStatementProxy中的方法体:

    @Override
    public int executeUpdate() throws SQLException {
        return ExecuteTemplate.execute(this, new StatementCallback<Integer, PreparedStatement>() {
            @Override
            public Integer execute(PreparedStatement statement, Object... args) throws SQLException {
                return statement.executeUpdate();
            }
        });
    }
    

    以上就是整个jdbc的追溯逻辑了,进而跟踪后面的代码流程。

  • ExecuteTemplate

    public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
                                                     StatementProxy<S> statementProxy,
                                                     StatementCallback<T, S> statementCallback,
                                                     Object... args) throws SQLException {
    
        if (!RootContext.inGlobalTransaction()) {
            // Just work as original statement
            return statementCallback.execute(statementProxy.getTargetStatement(), args);
        }
    
        if (sqlRecognizer == null) {
            sqlRecognizer = SQLVisitorFactory.get(
                    statementProxy.getTargetSQL(),
                    statementProxy.getConnectionProxy().getDbType());
        }
        Executor<T> executor = null;
        if (sqlRecognizer == null) {
            executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
        } else {
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case UPDATE:
                    executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case DELETE:
                    executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
                    break;
                case SELECT_FOR_UPDATE:
                    executor = new SelectForUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                    break;
                default:
                    executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
                    break;
            }
        }
        T rs = null;
        try {
            rs = executor.execute(args);
    
        } catch (Throwable ex) {
           ...
        }
        return rs;
    }
    
    • 根据不同的语句生成不同的执行器
    • 具体实现会保存更改前后的数据镜像并插入到undo log里并commit。只有回滚用undo log里的数据生成sql语句回滚
    • 由于AT事务在第一阶段已提交,所以commit过程是由Asyncworker异步删除undo log,真正的commit是在第一阶段完成的
    @Override
    public Object execute(Object... args) throws Throwable {
        String xid = RootContext.getXID();
        statementProxy.getConnectionProxy().bind(xid);
        return doExecute(args);
    }
    

    获取XID,将当前数据库连接绑定到XID,执行逻辑:

    protected T executeAutoCommitTrue(Object[] args) throws Throwable {
        T result = null;
        AbstractConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
        LockRetryController lockRetryController = new LockRetryController();
        try {
            connectionProxy.setAutoCommit(false);
            while (true) {
                try {
                    result = executeAutoCommitFalse(args);
                    connectionProxy.commit();
                    break;
                } catch (LockConflictException lockConflict) {
                    lockRetryController.sleep(lockConflict);
                }
            }
    
        } catch (Exception e) {
           ...
        return result;
    }
    
     protected T executeAutoCommitFalse(Object[] args) throws Throwable {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        statementProxy.getConnectionProxy().prepareUndoLog(sqlRecognizer.getSQLType(), sqlRecognizer.getTableName(), beforeImage, afterImage);
        return result;
    }
    

    上叙代码很清晰:

    • 1.获取脚本执行前数据镜像
    • 2.执行脚本逻辑(此时还未commit)
    • 3.获取脚本执行后数据镜像
    • 4.根据前后数据镜像及XID等信息准备回滚的ubdo脚本

    执行完以上逻辑后,commit,此处的commit是将上叙执行结果集回滚的脚本进行提交,但提交前要做一件重要的事,那就是本节中重心:注册分支事务

    connectionProxy.commit();
    
    #ConnectionProxy
    @Override
    public void commit() throws SQLException {
        if (context.inGlobalTransaction()) {
            try {
                /** 去TC注册分支事务 */
                register();
                ...
                UndoLogManager.flushUndoLogs(this);
                targetConnection.commit();
            } catch (Throwable ex) {
                ...
            }
            report(true);
            context.reset();
            
        } else {
            targetConnection.commit();
        }
    }
    

    --

2.3.RM 注册分支事务
  • branchRegister 注册事务分支

    private void register() throws TransactionException {
        Long branchId = DataSourceManager.get().branchRegister(BranchType.AT, getDataSourceProxy().getResourceId(), null, context.getXid(), context.buildLockKeys());
        context.setBranchId(branchId);
    }
    
    # DataSourceManager
    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
        try {
            BranchRegisterRequest request = new BranchRegisterRequest();
            request.setTransactionId(XID.getTransactionId(xid));
            request.setLockKey(lockKeys);
            request.setResourceId(resourceId);
    
            BranchRegisterResponse response = (BranchRegisterResponse) RmRpcClient.getInstance().sendMsgWithResponse(request);
            if (response.getResultCode() == ResultCode.Failed) {
                throw new TransactionException(response.getTransactionExceptionCode(), "Response[" + response.getMsg() + "]");
            }
            return response.getBranchId();
        } catch (TimeoutException toe) {
           ...
        }
    }
    
    • 1.通过rpcclient到服务端注册分支事务
    • 2.将返回的事务分支BranchId保存进ConnectionContext,用以事务分支的对称。

    上面的代码跟开启事务逻辑基本一致,同样是发送“注册分支事务”消息给server端,server收到事务分支注册消息后进行逻辑处理。

    分支事务注册成功之后,数据逻辑执行、回滚脚本执行,完成整个分支事务的注册。

    那么接下类分析的是server端收到事务分支的注册信息后如何处理了。

    --

2.3.TC 收到消息,开启处理分支事务的注册
  • 前面接受netty消息的逻辑与begin事务逻辑类似,此处不做强调,直接导核心逻辑:处理分支注册

    #DefaultCoordinator
    @Override
    protected void doBranchRegister(BranchRegisterRequest request, BranchRegisterResponse response,
                                    RpcContext rpcContext) throws TransactionException {
        response.setTransactionId(request.getTransactionId());
        response.setBranchId(
            core.branchRegister(request.getBranchType(), request.getResourceId(), rpcContext.getClientId(),
                XID.generateXID(request.getTransactionId()), request.getLockKey()));
    }
    

    继续跟踪:

    @Override
    public Long branchRegister(BranchType branchType, String resourceId, String clientId, String xid, String lockKeys) throws TransactionException {
        GlobalSession globalSession = assertGlobalSession(XID.getTransactionId(xid), GlobalStatus.Begin);
    
        BranchSession branchSession = new BranchSession();
        branchSession.setTransactionId(XID.getTransactionId(xid));
        branchSession.setBranchId(UUIDGenerator.generateUUID());
        branchSession.setApplicationId(globalSession.getApplicationId());
        branchSession.setTxServiceGroup(globalSession.getTransactionServiceGroup());
        branchSession.setBranchType(branchType);
        branchSession.setResourceId(resourceId);
        branchSession.setLockKey(lockKeys);
        branchSession.setClientId(clientId);
    
        if (!branchSession.lock()) {
            throw new TransactionException(LockKeyConflict);
        }
        try {
            globalSession.addBranch(branchSession);
        } catch (RuntimeException ex) {
            throw new TransactionException(FailedToAddBranch);
    
        }
        return branchSession.getBranchId();
    }
    

    逻辑很明晰:

    • 1.根据XID获取全局事务GlobalSession
    • 2.构造一个事务分支:BranchSession,并赋值相关属性
    • 3.将BranchSession进行上锁处理
    • 4.将分支事务BranchSession加入全局事务GlobalSession
    • 5.返回创建的branchId给客户端
    • 6.完成分支的注册工作。

    此时 TC 与 RM 即维持了同步的branchId信息。

    至此,事务分支的注册完成。

--

三.未完待续。。。

后续分析主要还是根据example官方实例分为:事务回滚、事务提交进行。
同时后续每一流程都紧密关联Server,因此还会频繁回到上叙server启动后,收到消息被触发的后续逻辑。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容