ShardingSphere 如何完美驾驭分布式事务与 XA 协议?

0 前言

基于上一文基础,详细展开 ShardingSphere 分布式事务实现。先看支持强一致性事务的XAShardingTransactionManager。

1 XAShardingTransactionManager

回到 ShardingSphere,来到 sharding-transaction-xa-core 工程的 XAShardingTransactionManager 类,分布式事务的 XA 实现类。

1.1 类定义和变量

// 实现ShardingTransactionManager接口
public final class XAShardingTransactionManager implements ShardingTransactionManager {
    
    // 保存一组 XATransactionDataSource
    private final Map<String, XATransactionDataSource> cachedDataSources = new HashMap<>();
    
    private final XATransactionManager xaTransactionManager = XATransactionManagerLoader.getInstance().getTransactionManager();

...
}

XATransactionManager 实例加载仍采用 JDK ServiceLoader 类:

private XATransactionManager load() {
        Iterator<XATransactionManager> xaTransactionManagers = ServiceLoader.load(XATransactionManager.class).iterator();
        XATransactionManager result = xaTransactionManagers.next();
        return result;
}

XATransactionManager 是对第三方 XA 事务管理器的抽象,通过上述代码,可看到在找不到合适XATransactionManager时,系统会默认创建一个AtomikosTransactionManager。而这XATransactionManager的定义实际位于单独的一个代码工程sharding-transaction-xa-spi,接口定义:

public interface XATransactionManager extends AutoCloseable {

    //初始化 XA 事务管理器
    void init();

    //注册事务恢复资源
    void registerRecoveryResource(String dataSourceName, XADataSource xaDataSource);

    //移除事务恢复资源
    void removeRecoveryResource(String dataSourceName, XADataSource xaDataSource);

    //嵌入一个 SingleXAResource 资源
    void enlistResource(SingleXAResource singleXAResource);

    //返回 TransactionManager
    TransactionManager getTransactionManager();
}

详细用法还要结合具体XATransactionManager实现类进行理解。这里我们还发现了一个 SingleXAResource,这个类同样位于 sharding-transaction-xa-spi 工程中,名称上看应该是对 JTA 中 XAResource 接口的实现:

public final class SingleXAResource implements XAResource {

    private final String resourceName;

    private final XAResource delegate;

    @Override
    public void start(final Xid xid, final int i) throws XAException {
        delegate.start(xid, i);
    } 
    @Override
    public void commit(final Xid xid, final boolean b) throws XAException {
        delegate.commit(xid, b);
    }

    @Override
    public void rollback(final Xid xid) throws XAException {
        delegate.rollback(xid);
    } 
    @Override
    public boolean isSameRM(final XAResource xaResource) {
        SingleXAResource singleXAResource = (SingleXAResource) xaResource;
        return resourceName.equals(singleXAResource.getResourceName());
    }
    …
}

虽实现JTA的XAResource接口,但更像是代理类,具体操作还是委托给内部XAResource实现。

2 XA分布式事务的核心类

2.1 XADataSource

属JDBC规范内容,为获取XAConnection。

构建

XADataSourceFactory负责生成具体XADataSource:

public static XADataSource build(final DatabaseType databaseType, final DataSource dataSource) {
        XADataSourceDefinition xaDataSourceDefinition = XADataSourceDefinitionFactory.getXADataSourceDefinition(databaseType);
        XADataSource result = createXADataSource(xaDataSourceDefinition);
        Properties xaProperties = xaDataSourceDefinition.getXAProperties(SWAPPER.swap(dataSource));
        PropertyUtils.setProperties(result, xaProperties);
        return result;
}

先用到XADataSourceDefinition接口:

public interface XADataSourceDefinition extends DatabaseTypeAwareSPI {

    //获取 XA 驱动类名
    Collection<String> getXADriverClassName();

    //获取 XA 属性
    Properties getXAProperties(DatabaseAccessConfiguration databaseAccessConfiguration);
}

该接口继承DatabaseTypeAwareSPI:

public interface DatabaseTypeAwareSPI { 
    //获取数据库类型
    String getDatabaseType();
}

ShardingSphere继承 DatabaseTypeAwareSPI 接口的只有 XADataSourceDefinition 接口,而后者存在一批实现类,整体的类层结构如下所示:

以MySQLXADataSourceDefinition为例,该类分别实现DatabaseTypeAwareSPI 和 XADataSourceDefinition 这两个接口中所定义的三个方法:

public final class MySQLXADataSourceDefinition implements XADataSourceDefinition {

    @Override
    public String getDatabaseType() {
        return "MySQL";
    }

    @Override
    public Collection<String> getXADriverClassName() {
        return Arrays.asList("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource", "com.mysql.cj.jdbc.MysqlXADataSource");
    }

    @Override
    public Properties getXAProperties(final DatabaseAccessConfiguration databaseAccessConfiguration) {
        Properties result = new Properties();
        result.setProperty("user", databaseAccessConfiguration.getUsername());
        result.setProperty("password", Optional.fromNullable(databaseAccessConfiguration.getPassword()).or(""));
        result.setProperty("URL", databaseAccessConfiguration.getUrl());
        … 
        return result;
    }
}

作为数据库供应商,MySQL 提供两个 XADataSource 驱动程序。getXAProperties发现 URL、Username 和 Password 等信息是通过 DatabaseAccessConfiguration 对象获取。

因为 DatabaseTypeAwareSPI 接口,各种 XADataSourceDefinition 也是基于 SPI 加载的,获取 XADataSourceDefinition 的工厂类 XADataSourceDefinitionFactory 中验证:

public final class XADataSourceDefinitionFactory {

    private static final Map<DatabaseType, XADataSourceDefinition> XA_DATA_SOURCE_DEFINITIONS = new HashMap<>();

    static {
       //通过 ServiceLoader 加载 XADataSourceDefinition
        for (XADataSourceDefinition each : ServiceLoader.load(XADataSourceDefinition.class)) {
            XA_DATA_SOURCE_DEFINITIONS.put(DatabaseTypes.getActualDatabaseType(each.getDatabaseType()), each);
        }
    }

    public static XADataSourceDefinition getXADataSourceDefinition(final DatabaseType databaseType) {
        return XA_DATA_SOURCE_DEFINITIONS.get(databaseType);
    }
}

sharding-transaction-xa-core 工程中的 SPI 配置:

当根据数据库类型获取对应 XADataSourceDefinition 之后,即可根据 XADriverClassName 来创建具体的 XADataSource:

private static XADataSource loadXADataSource(final String xaDataSourceClassName) {
        Class xaDataSourceClass;
            //加载 XADataSource 实现类
  xaDataSourceClass = Thread.currentThread().getContextClassLoader().loadClass(xaDataSourceClassName);
  return (XADataSource) xaDataSourceClass.newInstance();
}

先从当前线程的 ContextClassLoader 中加载目标驱动的实现类,如加载不到,直接反射创建,最后返回 XADataSource 的实例对象。

获取 XADataSource 的实例对象之后,我们需要设置它的属性,这部分工作是由 DataSourceSwapper 类来完成的。在这里,ShardingSphere 针对不同类型的数据库连接池工具还专门做了一层封装,提取了 DataSourcePropertyProvider 接口用于对 DataSource的URL 、Username 和 Password 等基础信息进行抽象。

DataSourcePropertyProvider 接口定义:

public interface DataSourcePropertyProvider {
    String getDataSourceClassName();
    String getURLPropertyName();
    String getUsernamePropertyName();
    String getPasswordPropertyName();
}

DataSourcePropertyProvider 实现类:

  • DefaultDataSourcePropertyProvider
  • HikariCPPropertyProvider:默认使用,SPI 配置验证:
public final class HikariCPPropertyProvider implements DataSourcePropertyProvider {

    @Override
    public String getDataSourceClassName() {
        return "com.zaxxer.hikari.HikariDataSource";
    }

    @Override
    public String getURLPropertyName() {
        return "jdbcUrl";
    }

    @Override
    public String getUsernamePropertyName() {
        return "username";
    }

    @Override
    public String getPasswordPropertyName() {
        return "password";
    }
}

DataSourceSwapper#swap 反射构建 findGetterMethod 工具方法,以获取 URL、Username 和 Password 等信息,并返回DatabaseAccessConfiguration对象供具体 XADataSourceDefinition 使用。

public DatabaseAccessConfiguration swap(final DataSource dataSource) {
        DataSourcePropertyProvider provider = DataSourcePropertyProviderLoader.getProvider(dataSource);
            String url = (String) findGetterMethod(dataSource, provider.getURLPropertyName()).invoke(dataSource);
            String username = (String) findGetterMethod(dataSource, provider.getUsernamePropertyName()).invoke(dataSource);
            String password = (String) findGetterMethod(dataSource, provider.getPasswordPropertyName()).invoke(dataSource);
            return new DatabaseAccessConfiguration(url, username, password);
}

XADataSource 构建完毕,XADataSourceFactory 为中心的类图:

2.2 XAConnection

JDBC 规范接口。负责创建 XAConnection 的工厂类 XAConnectionFactory:

public final class XAConnectionFactory {
 
    // 基于普通 Connection 创建 XAConnection
    public static XAConnection createXAConnection(final DatabaseType databaseType, final XADataSource xaDataSource, final Connection connection) {
        // 根据数据库类型分别构建了对应的 ConnectionWrapper
        switch (databaseType.getName()) {
            case "MySQL":
                    // 返回 XAConnection
                return new MySQLXAConnectionWrapper().wrap(xaDataSource, connection);
                    ...
        }
    }
}

MySQLXAConnectionWrapper 实现 XAConnectionWrapper 接口,先看它:

public interface XAConnectionWrapper {
    // 基于 XADataSource 把 Connection 包装成 XAConnection
    XAConnection wrap(XADataSource xaDataSource, Connection connection);
}

按传入XADataSource、Connection创建新XAConnection。XAConnectionWrapper 接口类层结构:

MySQLXAConnectionWrapper#warp

@Override
public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
        // 将传入的 Connection 转变为一个真实的连接对象
        Connection physicalConnection = unwrapPhysicalConnection(xaDataSource.getClass().getName(), connection);
        Method method = xaDataSource.getClass().getDeclaredMethod("wrapConnection", Connection.class);
        method.setAccessible(true);
        // 通过反射包装 Connection 对象
        return (XAConnection) method.invoke(xaDataSource, physicalConnection);
}

再基于 XADataSource#wrapConnection,通过反射对这物理连接进行包装,形成一个 XAConnection 对象。

MySQL有两种 XADataSource 驱动类。而 MySQLXAConnectionWrapper 也找到如下这两种驱动类:

public final class MySQLXAConnectionWrapper implements XAConnectionWrapper {
    
    String MYSQL_XA_DATASOURCE_5 = "com.mysql.jdbc.jdbc2.optional.MysqlXADataSource";
    
    String MYSQL_XA_DATASOURCE_8 = "com.mysql.cj.jdbc.MysqlXADataSource";

根据数据库版本,两个驱动类行为也不同。因此,处理也不同:

private Connection unwrapPhysicalConnection(final String xaDataSourceClassName, final Connection connection) {
        switch (xaDataSourceClassName) {
            case MYSQL_XA_DATASOURCE_5:
                return (Connection) connection.unwrap(Class.forName("com.mysql.jdbc.Connection"));
            case MYSQL_XA_DATASOURCE_8:
                return (Connection) connection.unwrap(Class.forName("com.mysql.cj.jdbc.JdbcConnection"));
        }
}

对比看 PostgreSQLXAConnectionWrapper#wrap:

public XAConnection wrap(final XADataSource xaDataSource, final Connection connection) {
        BaseConnection physicalConnection = (BaseConnection) connection.unwrap(Class.forName("org.postgresql.core.BaseConnection"));
        return new PGXAConnection(physicalConnection);
}

2.3 XATransactionDataSource

XAShardingTransactionManager用的 DataSource 并非 JDBC 原生 XADataSource,而是XATransactionDataSource:

private final DatabaseType databaseType;
private final String resourceName;
private final DataSource dataSource;
private XADataSource xaDataSource;
private XATransactionManager xaTransactionManager; 
     
public XATransactionDataSource(final DatabaseType databaseType, final String resourceName, final DataSource dataSource, final XATransactionManager xaTransactionManager) {
        this.databaseType = databaseType;
        this.resourceName = resourceName;
        this.dataSource = dataSource;
        this.xaDataSource = XADataSourceFactory.build(databaseType, dataSource);
        this.xaTransactionManager = xaTransactionManager;
            // 将构建的 XADataSource 作为一种资源进行注册
        xaTransactionManager.registerRecoveryResource(resourceName, xaDataSource);
}

getConnection

public Connection getConnection() throws SQLException, SystemException, RollbackException {
            ...
        // 从DataSource构建一个Connection
        Connection result = dataSource.getConnection();
        // 通过 XAConnectionFactory 创建一个 XAConnection
        XAConnection xaConnection = XAConnectionFactory.createXAConnection(databaseType, xaDataSource, result);
        // 从 XATransactionManager 获取 Transaction 对象
        final Transaction transaction = xaTransactionManager.getTransactionManager().getTransaction();
        // 判断当前线程是否存在这 Transaction
        if (!enlistedTransactions.get().contains(transaction)) {
                // 将 XAConnection 中的 XAResource 与目标 Transaction 对象关联
            transaction.enlistResource(new SingleXAResource(resourceName, xaConnection.getXAResource()));
            // Transaction 中注册一个 Synchronization 接口
            transaction.registerSynchronization(new Synchronization() {
                @Override
                public void beforeCompletion() {
                    enlistedTransactions.get().remove(transaction);
                }

                @Override
                public void afterCompletion(final int status) {
                    enlistedTransactions.get().clear();
                }
            });
            // 将该 Transaction 对象放入当前线程
            enlistedTransactions.get().add(transaction);
        }
        return result;
}

XATransactionDataSource 中存在一个 ThreadLocal 变量 enlistedTransactions,保存当前线程的 Transaction 列表:

private final ThreadLocal<Set<Transaction>> enlistedTransactions = new ThreadLocal<Set<Transaction>>() {
        @Override
        public Set<Transaction> initialValue() {
            return new HashSet<>();
        }
};

close

@Override
public void close() {
  // 将资源移出
  xaTransactionManager.removeRecoveryResource(resourceName, xaDataSource);
}

3 从源码到开发

ShardingSphere 作为完全兼容 JDBC 规范的分布式数据库中间件,同样完成针对分布式事务中的相关对象的兼容。本文进一步强化对 JDBC 规范的理解和如何扩展JDBC 规范中核心接口的方法。同时,在 MySQLXAConnectionWrapper 这个 Wrapper 类中,使用反射创建 XAConnection 对象的实现方法。这些开发技巧都值得应用。

4 总结

ShardingSphere 提供强一致性、最终一致性两种实现。本文研究了基于 XA 协议的分片事务管理器 XAShardingTransactionManager,理解 XAShardingTransactionManager 中 XADataSource、XAConnection 等核心对象的关键还是要站在 JDBC 规范基础,掌握与分布式事务集成和兼容的整个过程。

FAQ

Q:ShardingSphere 中对分布式环境下的强一致性事务做了哪些维度抽象?

ShardingSphere 在处理分布式环境中的强一致性事务时,进行了多个维度的抽象来确保数据一致性和系统的可扩展性。以下是 ShardingSphere 针对强一致性事务做出的主要抽象维度:

  1. 事务管理抽象
    ShardingSphere 对事务管理进行了抽象,支持不同的事务模型,比如本地事务和分布式事务。分布式事务可以采用两阶段提交(2PC)或三阶段提交(3PC)等协议进行协调。此外,ShardingSphere 还引入了基于柔性事务的最佳努力交付(Best Efforts Delivery, BED)和最终一致性事务,以提供更高的灵活性。

  2. 事务协调器抽象
    ShardingSphere 设计了事务协调器(Transaction Coordinator),用于在分布式环境下管理和协调事务。通过事务协调器,系统可以在各个分片数据库之间实现事务的全局一致性。协调器负责事务的开始、提交和回滚操作,并监控事务的状态,确保所有参与节点的一致性。

  3. 锁机制抽象
    为了确保在分布式事务中各个节点的数据一致性,ShardingSphere 引入了分布式锁机制的抽象。在分布式场景下,锁机制用于协调不同事务对同一资源的访问,防止并发冲突。ShardingSphere 提供了基于数据库层面的锁管理,同时支持多种分布式锁实现方式,例如基于 Zookeeper 的分布式锁。

  4. 隔离级别与并发控制抽象
    ShardingSphere 支持不同的事务隔离级别,通过抽象不同的并发控制机制,如读写锁、行级锁等,来确保事务在分布式环境中的隔离性。在高并发的环境中,这种抽象使得系统能够在性能和一致性之间取得平衡。

  5. 数据一致性保障机制抽象
    ShardingSphere 对数据一致性保障机制进行了抽象设计,包括数据校验、补偿机制和失败重试策略等。特别是在发生网络分区或节点故障时,这些机制能够确保分布式事务最终能够达到一致性状态。

  6. 柔性事务与最终一致性支持
    为了在性能和一致性之间找到平衡,ShardingSphere 提供了柔性事务(Flexible Transaction)支持,允许系统在某些场景下使用最终一致性模型,如异步补偿和定期对账等方式,确保数据的一致性和系统的高可用性。

通过以上抽象维度,ShardingSphere 为分布式环境下的强一致性事务提供多种实现方式,使得系统能够在分布式数据库和多数据源架构下平衡一致性和性能需求。

关注我,紧跟本系列专栏文章,咱们下篇再续!

作者简介:魔都架构师,多家大厂后端一线研发经验,在分布式系统设计、数据平台架构和AI应用开发等领域都有丰富实践经验。

各大技术社区头部专家博主。具有丰富的引领团队经验,深厚业务架构和解决方案的积累。

负责:

  • 中央/分销预订系统性能优化
  • 活动&券等营销中台建设
  • 交易平台及数据中台等架构和开发设计
  • 车联网核心平台-物联网连接平台、大数据平台架构设计及优化
  • LLM Agent应用开发
  • 区块链应用开发
  • 大数据开发挖掘经验
  • 推荐系统项目

目前主攻市级软件项目设计、构建服务全社会的应用系统。

参考:

本文由博客一文多发平台 OpenWrite 发布!

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,013评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,205评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,370评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,168评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,153评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,954评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,271评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,916评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,382评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,877评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,989评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,624评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,209评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,199评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,418评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,401评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,700评论 2 345

推荐阅读更多精彩内容