4.0 atomikos JTA/XA全局事务

4.0 atomikos JTA/XA全局事务

 2018-02-05 02:47:40  1,720  2

Atomikos公司官方网址为:https://www.atomikos.com/。其旗下最著名的产品就是事务管理器。产品分两个版本:

TransactionEssentials:开源的免费产品

ExtremeTransactions:上商业版,需要收费。

这两个产品的关系如下图所示: 

TransactionEssentials:

1、实现了JTA/XA规范中的事务管理器(Transaction Manager)应该实现的相关接口,如:

    UserTransaction实现是com.atomikos.icatch.jta.UserTransactionImp,用户只需要直接操作这个类

    TransactionManager实现是com.atomikos.icatch.jta.UserTransactionManager

    Transaction实现是com.atomikos.icatch.jta.TransactionImp

2、针对实现了JDBC规范中规定的实现了XADataSource接口的数据库连接池,以及实现了JMS规范的MQ客户端提供一层封装。

     在上一节我们讲解JTA规范时,提到过XADataSource、XAConnection等接口应该由资源管理器RM来实现,而Atomikos的作用是一个事务管理器(TM),并不需要提供对应的实现。而Atomikos对XADataSource进行封装,只是为了方便与事务管理器整合。封装XADataSource的实现类为AtomikosDataSourceBean。典型的XADataSource实现包括:

    1、mysql官方提供的com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

    2、阿里巴巴开源的druid连接池,对应的实现类为com.alibaba.druid.pool.xa.DruidXADataSource

    3、tomcat-jdbc连接池提供的org.apache.tomcat.jdbc.pool.XADataSource

    而其他一些常用的数据库连接池,如dbcp、dbcp2或者c3p0,目前貌似尚未提供XADataSource接口的实现。如果提供给AtomikosDataSourceBean一个没有实现XADataSource接口的数据源,如c3p0的ComboPooledDataSource,则会抛出类似以下异常:

com.atomikos.jdbc.AtomikosSQLException: The class 'com.mchange.v2.c3p0.ComboPooledDataSource'

 specified by property 'xaDataSourceClassName' does not implement the required interface  

 javax.jdbc.XADataSource.  

 Please make sure the spelling is correct, and check your JDBC driver vendor's documentation.


ExtremeTransactions在TransactionEssentials的基础上额外提供了以下功能:

支持TCC:这是一种柔性事务

支持通过RMI、IIOP、SOAP这些远程过程调用技术,进行事务传播。

本文主要针对Atomikos开源版本的事务管理器实现TransactionEssentials进行讲解,包括:

1、直接使用TransactionEssentials的API

2、TransactionEssentials与spring、mybatis整合

3、Atomikos配置详解

直接使用TransactionEssentials的API

在maven项目的pom文件中引入以下依赖:

    com.atomikos

    transactions-jdbc

    4.0.6

    mysql

    mysql-connector-java

    5.1.39

新建mysql数据库表

需要注意的是,在mysql中,只有innodb引擎才支持XA事务,所以这里显式的指定了数据库引擎为innodb。

-- 新建数据库db_user;

create database db_user;

-- 在db_user库中新建user表

create table db_user.user(id int AUTO_INCREMENT PRIMARY KEY,name varchar(50)) engine=innodb;

-- 新建数据库db_account;

create database db_account;

-- 在db_account库中新建account表

create table db_account.account(user_id int,money double) engine=innodb;

另外,在本案例中,db_user库和db_account库是位于同一个mysql实例中的。 


案例代码:

    在使用了事务管理器之后,我们通过atomikos提供的UserTransaction接口的实现类com.atomikos.icatch.jta.UserTransactionImp来开启、提交和回滚事务。而不再是使用java.sql.Connection中的setAutoCommit(false)的方式来开启事务。其他JTA规范中定义的接口,开发人员并不需要直接使用。

import com.atomikos.icatch.jta.UserTransactionImp;

import com.atomikos.jdbc.AtomikosDataSourceBean;


import javax.transaction.SystemException;

import javax.transaction.UserTransaction;

import java.sql.Connection;

import java.sql.PreparedStatement;

import java.sql.ResultSet;

import java.sql.Statement;

import java.util.Properties;


public class AtomikosExample {


   private static AtomikosDataSourceBean createAtomikosDataSourceBean(String dbName) {

      // 连接池基本属性

      Properties p = new Properties();

      p.setProperty("url", "jdbc:mysql://localhost:3306/" + dbName);

      p.setProperty("user", "root");

      p.setProperty("password", "your password");


      // 使用AtomikosDataSourceBean封装com.mysql.jdbc.jdbc2.optional.MysqlXADataSource

      AtomikosDataSourceBean ds = new AtomikosDataSourceBean();

      //atomikos要求为每个AtomikosDataSourceBean名称,为了方便记忆,这里设置为和dbName相同

      ds.setUniqueResourceName(dbName);

      ds.setXaDataSourceClassName("com.mysql.jdbc.jdbc2.optional.MysqlXADataSource");

      ds.setXaProperties(p);

      return ds;

   }


   public static void main(String[] args) {


      AtomikosDataSourceBean ds1 = createAtomikosDataSourceBean("db_user");

      AtomikosDataSourceBean ds2 = createAtomikosDataSourceBean("db_account");


      Connection conn1 = null;

      Connection conn2 = null;

      PreparedStatement ps1 = null;

      PreparedStatement ps2 = null;


      UserTransaction userTransaction = new UserTransactionImp();

      try {

         // 开启事务

         userTransaction.begin();


         // 执行db1上的sql

         conn1 = ds1.getConnection();

         ps1 = conn1.prepareStatement("INSERT into user(name) VALUES (?)", Statement.RETURN_GENERATED_KEYS);

         ps1.setString(1, "tianshouzhi");

         ps1.executeUpdate();

         ResultSet generatedKeys = ps1.getGeneratedKeys();

         int userId = -1;

         while (generatedKeys.next()) {

            userId = generatedKeys.getInt(1);// 获得自动生成的userId

         }


         // 模拟异常 ,直接进入catch代码块,2个都不会提交

//        int i=1/0;


         // 执行db2上的sql

         conn2 = ds2.getConnection();

         ps2 = conn2.prepareStatement("INSERT into account(user_id,money) VALUES (?,?)");

         ps2.setInt(1, userId);

         ps2.setDouble(2, 10000000);

         ps2.executeUpdate();


         // 两阶段提交

         userTransaction.commit();

      } catch (Exception e) {

         try {

            e.printStackTrace();

            userTransaction.rollback();

         } catch (SystemException e1) {

            e1.printStackTrace();

         }

      } finally {

         try {

            ps1.close();

            ps2.close();

            conn1.close();

            conn2.close();

            ds1.close();

            ds2.close();

         } catch (Exception ignore) {

         }

      }

   }

}

2、TransactionEssentials与spring、mybatis整合

在pom中添加以下依赖 

    org.springframework

    spring-jdbc

    4.3.7.RELEASE

    org.springframework

    spring-context

    4.3.7.RELEASE

    org.mybatis

    mybatis

    3.4.1

    org.mybatis

    mybatis-spring

    1.3.1

新建User实体

package com.tianshouzhi.atomikos;

public class User {

   private int id;

   private String name;

   // setters and getters

}

新建Account实例

package com.tianshouzhi.atomikos;

public class Account {

   private int userId;

   private double money;

   // setters and getters

}

新建UserMapper接口,为了方便,这里使用了mybatis 注解方式,没有编写映射文件,作用是一样的

package com.tianshouzhi.atomikos.mappers.db_user;

import org.apache.ibatis.annotations.Insert;

import com.tianshouzhi.atomikos.User;

import org.apache.ibatis.annotations.Options;

public interface UserMapper {

   @Insert("INSERT INTO user(id,name) VALUES(#{id},#{name})")

   @Options(useGeneratedKeys = true, keyColumn = "id", keyProperty = "id")

   public void insert(User user);

}

新建AccountMapper接口

package com.tianshouzhi.atomikos.mappers.ds_account;

import com.tianshouzhi.atomikos.Account;

import org.apache.ibatis.annotations.Insert;

public interface AccountMapper {

    @Insert("INSERT INTO account(user_id,money) VALUES(#{userId},#{money})")

    public void insert(Account account);

}

新建使用JTA事务的bean,注意在使用jta事务的时候,依然可以使用spring的声明式事务管理

package com.tianshouzhi.atomikos;

import com.tianshouzhi.atomikos.mappers.db_user.UserMapper;

import com.tianshouzhi.atomikos.mappers.ds_account.AccountMapper;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.transaction.annotation.Transactional;

public class JTAService {

   @Autowired

   private UserMapper userMapper;//操作db_user库

   @Autowired

   private AccountMapper accountMapper;//操作db_account库

   @Transactional

   public void insert() {

      User user = new User();

      user.setName("wangxiaoxiao");

      userMapper.insert(user);


      //    int i = 1 / 0;//模拟异常,spring回滚后,db_user库中user表中也不会插入记录

      Account account = new Account();

      account.setUserId(user.getId());

      account.setMoney(123456789);

      accountMapper.insert(account);

   }

}

编写配置文件spring-atomikos.xml


       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"

       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd

       http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd">






          init-method="init" destroy-method="close">

        


                  value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />

        

            

                jdbc:mysql://localhost:3306/db_user

                root

                shxx12151022







          init-method="init" destroy-method="close">

        


                  value="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource" />

        

            

                jdbc:mysql://localhost:3306/db_account

                root

                shxx12151022






    

        



    

        




    

        


        



    

        


        





          destroy-method="close">

        




    

        




    


    

测试代码

package com.tianshouzhi.atomikos;


import org.springframework.context.ApplicationContext;

import org.springframework.context.support.ClassPathXmlApplicationContext;


public class AtomikosSpringMybatisExample {

   public static void main(String[] args) {

      ApplicationContext context = new ClassPathXmlApplicationContext("spring-atomikos.xml");

      JTAService jtaService = context.getBean("jtaService", JTAService.class);

      jtaService.insert();

   }

}

    建议读者先直接按照上述代码运行,以确定代码执行后,db_user库的user表和db_account的account表中的确各插入了一条记录,以证明我们的代码的确是操作了2个库,属于分布式事务。

    然后将JTAService中的异常模拟的注释打开,会发现出现异常后,两个库中都没有新插入的数据库,说明我们使用的JTA事务管理器的确保证数据的一致性了。

Atomikos配置

    在掌握了Atomikos基本使用之后,我们对Atomikos的配置进行一下简单的介绍。Atomikos在启动后,默认会从以下几个位置读取配置文件,这里笔者直接贴出atomikos源码进行说明:

com.atomikos.icatch.provider.imp.AssemblerImp#initializeProperties方法中定义了配置加载顺序逻辑: 

@Override

    public ConfigProperties initializeProperties() {

        //读取classpath下的默认配置transactions-defaults.properties

        Properties defaults = new Properties();

        loadPropertiesFromClasspath(defaults, DEFAULT_PROPERTIES_FILE_NAME);

        //读取classpath下,transactions.properties配置,覆盖transactions-defaults.properties中相同key的值

        Properties transactionsProperties = new Properties(defaults);

        loadPropertiesFromClasspath(transactionsProperties, TRANSACTIONS_PROPERTIES_FILE_NAME);

        //读取classpath下,jta.properties,覆盖transactions-defaults.properties、transactions.properties中相同key的值

        Properties jtaProperties = new Properties(transactionsProperties);

        loadPropertiesFromClasspath(jtaProperties, JTA_PROPERTIES_FILE_NAME);


        //读取通过java -Dcom.atomikos.icatch.file方式指定的自定义配置文件路径,覆盖之前的同名配置

        Properties customProperties = new Properties(jtaProperties);

        loadPropertiesFromCustomFilePath(customProperties);

        //最终构造一个ConfigProperties对象,来表示实际要使用的配置

        Properties finalProperties = new Properties(customProperties);

        return new ConfigProperties(finalProperties);

    }

配置文件优先级:transactions-defaults.properties

其中transactions-defaults.properties是atomikos自带的默认配置,位于transactions-xxx.jar中.

注意不同版本的默认配置可能不同。特别是3.x版本和4.x版本的差异比较明显。   

以下是4.0.6中 transactions-default.properties中配置内容,笔者对这些配置进行了归类,如下: 

===============================================================

============          事务管理器(TM)配置参数       ==============

===============================================================

#指定是否启动磁盘日志,默认为true。在生产环境下一定要保证为true,否则数据的完整性无法保证

com.atomikos.icatch.enable_logging=true

#JTA/XA资源是否应该自动注册

com.atomikos.icatch.automatic_resource_registration=true

#JTA事务的默认超时时间,默认为10000ms

com.atomikos.icatch.default_jta_timeout=10000

#事务的最大超时时间,默认为300000ms。这表示事务超时时间由 UserTransaction.setTransactionTimeout()较大者决定。4.x版本之后,指定为0的话则表示不设置超时时间

com.atomikos.icatch.max_timeout=300000

#指定在两阶段提交时,是否使用不同的线程(意味着并行)。3.7版本之后默认为false,更早的版本默认为true。如果为false,则提交将按照事务中访问资源的顺序进行。

com.atomikos.icatch.threaded_2pc=false

#指定最多可以同时运行的事务数量,默认值为50,负数表示没有数量限制。在调用 UserTransaction.begin()方法时,可能会抛出一个”Max number of active transactions reached”异常信息,表示超出最大事务数限制

com.atomikos.icatch.max_actives=50

#是否支持subtransaction,默认为true

com.atomikos.icatch.allow_subtransactions=true

#指定在可能的情况下,否应该join 子事务(subtransactions),默认值为true。如果设置为false,对于有关联的不同subtransactions,不会调用XAResource.start(TM_JOIN)

com.atomikos.icatch.serial_jta_transactions=true

#指定JVM关闭时是否强制(force)关闭事务管理器,默认为false

com.atomikos.icatch.force_shutdown_on_vm_exit=false

#在正常关闭(no-force)的情况下,应该等待事务执行完成的时间,默认为Long.MAX_VALUE

com.atomikos.icatch.default_max_wait_time_on_shutdown=9223372036854775807


===============================================================

=========        事务日志(Transaction logs)记录配置       =======

===============================================================

#事务日志目录,默认为./。

com.atomikos.icatch.log_base_dir=./

#事务日志文件前缀,默认为tmlog。事务日志存储在文件中,文件名包含一个数字后缀,日志文件以.log为扩展名,如tmlog1.log。遇到checkpoint时,新的事务日志文件会被创建,数字增加。

com.atomikos.icatch.log_base_name=tmlog

#指定两次checkpoint的时间间隔,默认为500

com.atomikos.icatch.checkpoint_interval=500


===============================================================

=========          事务日志恢复(Recovery)配置       =============

===============================================================

#指定在多长时间后可以清空无法恢复的事务日志(orphaned),默认86400000ms

com.atomikos.icatch.forget_orphaned_log_entries_delay=86400000

#指定两次恢复扫描之间的延迟时间。默认值为与com.atomikos.icatch.default_jta_timeout相同

com.atomikos.icatch.recovery_delay=${com.atomikos.icatch.default_jta_timeout}

#提交失败时,再抛出一个异常之前,最多可以重试几次,默认值为5

com.atomikos.icatch.oltp_max_retries=5

#提交失败时,每次重试的时间间隔,默认10000ms

com.atomikos.icatch.oltp_retry_interval=10000


===============================================================

=========          其他       =============================== ==

===============================================================

java.naming.factory.initial=com.sun.jndi.rmi.registry.RegistryContextFactory

com.atomikos.icatch.client_demarcation=false

java.naming.provider.url=rmi://localhost:1099

com.atomikos.icatch.rmi_export_class=none

com.atomikos.icatch.trust_client_tm=false

当我们想对默认的配置进行修改时,可以在classpath下新建一个jta.properties,覆盖同名的配置项即可。

关于不同版本配置的差异,请参考官方文档:https://www.atomikos.com/Documentation/JtaProperties

打印日志

4.x版本之后,优先尝试使用slf4j,如果没有则尝试使用log4j,如果二者都没有,则使用JUL。

参见:https://www.atomikos.com/Documentation/ConfiguringTheLogs

注意这里是说的是如何配置打印工作日志(work log),而前面说的是事务日志(transactions log),二者不是不同的。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容