Fescar TC-beigin流程

开篇

 这篇文章是用来讲解清楚TC(Transaction Coordinator:事务协调器)在处理TM发送过来的begin操作(事务开启操作)。

 核心逻辑包括GlobalSession对象的生成、GlobalSession的持久化以及XID生成。另外一个重点是GlobalSession的状态初始化为GlobalStatus.Begin。


TC begin 流程说明

  • 1.创建GlobalSession对象,GlobalSession.createGlobalSession()。

  • 2.添加周期监听器到GlobalSession当中,生命周期对象为DefaultSessionManager。

  • 3.启动GlobalSession的周期监听器,添加GlobalSession对象到全局sessionMap对象。

  • 4.启动GlobalSession的周期监听器,持久化GlobalSession对象。


TC begin 源码分析

public class DefaultCore implements Core {

    @Override
    public String begin(String applicationId, String transactionServiceGroup, 
                        String name, int timeout) 
    throws TransactionException {

        // 创建全局GlobalSession对象
        GlobalSession session = GlobalSession.createGlobalSession(
                applicationId, transactionServiceGroup, name, timeout);

        // 全局GlobalSession对象添加生命周期监听器SessionHolder.getRootSessionManager()
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());

        // 启动全局Session对象GlobalSession
        session.begin();

        // 返回新生成的XID返回
        return XID.generateXID(session.getTransactionId());
    }
}

说明:

  • GlobalSession.createGlobalSession()创建全局GlobalSession对象。

  • session.addSessionLifecycleListener()给GlobalSession对象添加生命周期监听器。

  • session.begin()方法通过生命周期监听器保存全局GlobalSession对象。

  • sessionHolder.getRootSessionManager()返回DefaultSessionManager对象。

  • XID.generateXID()创建XID值。


public class GlobalSession implements SessionLifecycle, SessionStorable {
    // 生命周期监听器的容器
    private ArrayList<SessionLifecycleListener> lifecycleListeners 
                                              =  new ArrayList<>();

    public static GlobalSession createGlobalSession(String applicationId, 
              String txServiceGroup, String txName, int timeout) {
        GlobalSession session = 
         new GlobalSession(applicationId, txServiceGroup, txName, timeout);
        return session;
    }

    public GlobalSession(String applicationId, String transactionServiceGroup, 
                         String transactionName, int timeout) {
        // 生成transactionId对象。
        this.transactionId = UUIDGenerator.generateUUID();
        this.status = GlobalStatus.Begin;

        this.applicationId = applicationId;
        this.transactionServiceGroup = transactionServiceGroup;
        this.transactionName = transactionName;
        this.timeout = timeout;
    }

    // 添加生命周期监听器
    public void addSessionLifecycleListener(
         SessionLifecycleListener sessionLifecycleListener) {
         lifecycleListeners.add(sessionLifecycleListener);
    }

    public void begin() throws TransactionException {
        this.status = GlobalStatus.Begin;
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
            lifecycleListener.onBegin(this);
        }
    }
}


// 生成TransactionId的类和方法
public class UUIDGenerator {

    private static AtomicLong UUID = new AtomicLong(1000);
    private static int UUID_INTERNAL = 200000000;

    public static long generateUUID() {
        long id = UUID.incrementAndGet();
        if (id > 2000000000) {
            synchronized (UUID) {
                if (UUID.get() >= id) {
                    id -= 2000000000;
                    UUID.set(id);
                }
            }
        }
        return id;
    }
}

说明:

  • GlobalSession构造器内部通过UUIDGenerator.generateUUID()生成transactionId。

  • addSessionLifecycleListener()方法添加生命周期监听器DefaultSessionManager。

  • begin()方法调用生命周期监听器的onBegin()方法(lifecycleListener.onBegin),实现GlobalSession的持久化。


public class SessionHolder {

    private static final String ROOT_SESSION_MANAGER_NAME = "root.data";
    private static final String ASYNC_COMMITTING_SESSION_MANAGER_NAME = "async.commit.data";
    private static final String RETRY_COMMITTING_SESSION_MANAGER_NAME = "retry.commit.data";
    private static final String RETRY_ROLLBACKING_SESSION_MANAGER_NAME = "retry.rollback.data";
    private static SessionManager ROOT_SESSION_MANAGER;
    private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
    private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;

    public static void init(String sessionStorePath) throws IOException {
        if (sessionStorePath == null) {
            ROOT_SESSION_MANAGER = new DefaultSessionManager(ROOT_SESSION_MANAGER_NAME);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        } else {
            if (!sessionStorePath.endsWith("/")) {
                sessionStorePath = sessionStorePath + "/";
            }
            ROOT_SESSION_MANAGER = new FileBasedSessionManager(ROOT_SESSION_MANAGER_NAME, sessionStorePath);
            ASYNC_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(ASYNC_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_COMMITTING_SESSION_MANAGER = new DefaultSessionManager(RETRY_COMMITTING_SESSION_MANAGER_NAME);
            RETRY_ROLLBACKING_SESSION_MANAGER = new DefaultSessionManager(RETRY_ROLLBACKING_SESSION_MANAGER_NAME);
        }
    }

    public static final SessionManager getRootSessionManager() {
        if (ROOT_SESSION_MANAGER == null) {
            throw new ShouldNeverHappenException("SessionManager is NOT init!");
        }
        return ROOT_SESSION_MANAGER;
    }
}

说明:

  • getRootSessionManager()返回DefaultSessionManager对象,实现生命周期接口。


public class DefaultSessionManager extends AbstractSessionManager {

    public DefaultSessionManager(String name) {
        super(name);
        transactionStoreManager = new TransactionStoreManager() {
            @Override
            public boolean writeSession(LogOperation logOperation, 
                                        SessionStorable session) {
                return false;
            }

            @Override
            public void shutdown() {

            }

            @Override
            public List<TransactionWriteStore> readWriteStoreFromFile(int readSize, 
                boolean isHistory) {
                return null;
            }

            @Override
            public boolean hasRemaining(boolean isHistory) {
                return false;
            }
        };
    }
}

public abstract class AbstractSessionManager 
                      implements SessionManager, SessionLifecycleListener {

    protected static final Logger LOGGER = LoggerFactory.getLogger(AbstractSessionManager.class);

    protected Map<Long, GlobalSession> sessionMap = new ConcurrentHashMap<>();

    protected TransactionStoreManager transactionStoreManager;

    protected String name;

    public AbstractSessionManager(String name) {
        this.name = name;
    }

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        transactionStoreManager.writeSession(LogOperation.GLOBAL_ADD, session);
        sessionMap.put(session.getTransactionId(), session);

    }

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
        addGlobalSession(globalSession);
    }
}

说明:

  • DefaultSessionManager是GlobalSession的生命周期管理器。
  • DefaultSessionManager的父类AbstractSessionManager实现SessionLifecycleListener接口。
  • DefaultSessionManager的调用父类AbstractSessionManager的onBegin()方法。
  • onBegin()方法内部执行addGlobalSession()方法添加GlobalSession对象。
  • addGlobalSession()方法执行transactionStoreManager.writeSession()执行持久化,自定义的TransactionStoreManager啥都不操作。
  • transactionStoreManager是DefaultSessionManager内生成TransactionStoreManager对象。
  • addGlobalSession()方法执行sessionMap.put()保存GlobalSession对象。


Fescar源码分析连载

Fescar 源码解析系列

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容

  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981阅读 15,842评论 2 11
  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,076评论 1 32
  • 循环引用:http://ios.jobbole.com/82077/类别的作用功能:1.扩充现有类的功能2.对现有...
    得一切从简阅读 491评论 0 1
  • 前端开发面试题 <a name='preface'>前言</a> 只看问题点这里 看全部问题和答案点这里 本文由我...
    自you是敏感词阅读 747评论 0 3
  • by 夏侯砯琰 透过回忆的窗 很荒凉 却并不凄凉 只因你 还在我的心上 会发亮 就像是白月光 明天啊 又会谪往何...
    夏侯砯琰阅读 274评论 0 2