TX-LCN核心源码解读
TX-LCN是基于Java编写的分布式事务解决方案框架,主要提供三种主流的解决方案
- LCN模式,通过代理JDBC Connection来控制协调多组本地原子事务的提交与关闭
- TCC模式,属于框架级别解决方案,对业务入侵性极大
- TXC模式,核心方案为查询 + 分布式锁的分布式事务解决方案,由淘宝团队提出
核心组件
- TC,作为分布式事务组件的客户端角色,主要作用在于治理本地事务
- TM,作为分布式事务组件的服务端角色,主要作用在于协调事务组
核心概念
- 事务组,
group
,描述整个分布式环境下运行的各个事务(以一个request所需要完成的事务)组合而成的一组事务。 - 事务单元,
unit
,描述一个事务组内除开主事务之外的从事务,一个从事务表示一个事务单元。
核心流程
核心源码
以LCN
模式作为实践方案,源码也以LCN
作为解读对象。LCN
模式是什么模式?
LCN
模式是通过代理Connection
的方式实现对本地事务的操作,然后在由TxManager
统一协调控制事务。当本地事务提交回滚或者关闭连接时将会执行假操作,该代理的连接将由LCN
连接池管理。
创建事务组
/**
* Client创建事务组操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionInfo transactionInfo
* @param transactionType transactionType
* @throws TransactionException 创建group失败时抛出
*/
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
throws TransactionException {
//创建事务组
try {
// 日志
txLogger.transactionInfo(groupId, unitId,
"create group > {} > groupId: {xid}, unitId: {uid}", transactionType);
// 创建事务组消息
reliableMessenger.createGroup(groupId);
// 缓存发起方切面信息
aspectLogger.trace(groupId, unitId, transactionInfo);
} catch (RpcException e) {
// 通讯异常
dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
} catch (LcnBusinessException e) {
// 创建事务组业务失败
dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
}
txLogger.transactionInfo(groupId, unitId, "create group over");
}
TC
发起创建事务组仅仅是像服务端发起请求,参数为groupId
,核心的代码在于TM
接到请求后的处理。
public class CreateGroupExecuteService implements RpcExecuteService {
// ...
@Override
public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
// ...
transactionManager.begin(transactionCmd.getGroupId());
}
}
public class DefaultDTXContextRegistry implements DTXContextRegistry {
private final FastStorage fastStorage;
@Override
public DTXContext create(String groupId) throws TransactionException {
// ..
fastStorage.initGroup(groupId);
}
}
public class RedisStorage implements FastStorage {
// ...
@Override
public void initGroup(String groupId) {
// 将groupId存入Redis
redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, "root", "");
redisTemplate.expire(REDIS_GROUP_PREFIX + groupId, managerConfig.getDtxTime() + 10000, TimeUnit.MILLISECONDS);
}
}
加入事务组
/**
* Client加入事务组操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionType transactionType
* @param transactionInfo transactionInfo
* @throws TransactionException 加入事务组失败时抛出
*/
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo) throws TransactionException {
// 询问TM加入事务组
// 该groupId由远程RPC通过header方式携带到从事务
reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState());
// 异步检测
dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
// ...
}
//
@Override
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
txLogger.taskInfo(groupId, unitId, "start delay checking task");
// 异步阻塞的方式
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
try {
TxContext txContext = globalContext.txContext(groupId);
if (Objects.nonNull(txContext)) {
synchronized (txContext.getLock()) {
txLogger.info(groupId, unitId, Transactions.TAG_TASK,
"checking waiting for business code finish.");
txContext.getLock().wait();
}
}
int state = reliableMessenger.askTransactionState(groupId, unitId);// 询问事务组是否成功
txLogger.taskInfo(groupId, unitId, "ask transaction state {}", state);
if (state == -1) {
txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
onAskTransactionStateException(groupId, unitId, transactionType);
} else {
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);// 事务清理,即commit or rollback
aspectLogger.clearLog(groupId, unitId);
}
} catch (RpcException e) {
onAskTransactionStateException(groupId, unitId, transactionType);
} catch (TransactionClearException | InterruptedException e) {
txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
}
}, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);// 时间为最大事务时间,该时间由TM配置,在TC初始化时从TM放取到
delayTasks.put(groupId + unitId, scheduledFuture);
}
// 询问事务是否成功
private void onAskTransactionStateException(String groupId, String unitId, String transactionType) {
try {
// 通知TxManager事务补偿
txMangerReporter.reportTransactionState(groupId, unitId, TxExceptionParams.ASK_ERROR, 0);
log.warn("{} > has compensation info!", transactionType);
// 事务回滚, 保留适当的补偿信息
transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, 0);
} catch (TransactionClearException e) {
log.error("{} > clean transaction error.", transactionType);
}
}
TC
核心工作是申请加入事务组,并启动异步任务在事务最大时间后访问TM
事务组的全局事务状态来进行事务协调。
创建异步任务对象并将其缓存在本地内存的delayTasks
, 如果在事务最大时间内已经完成并调用则将该任务取消。
@Override
public void stopDelayChecking(String groupId, String unitId) {
ScheduledFuture scheduledFuture = delayTasks.get(groupId + unitId);
if (Objects.nonNull(scheduledFuture)) {
txLogger.taskInfo(groupId, unitId, "cancel {}:{} checking.", groupId, unitId);
scheduledFuture.cancel(true); // 取消任务
}
}
而在TM
端则进行如下处理
// 加入事务组
// RedisStorage
public void saveTransactionUnitToGroup(String groupId, TransactionUnit transactionUnit) throws FastStorageException {
if (Optional.ofNullable(redisTemplate.hasKey(REDIS_GROUP_PREFIX + groupId)).orElse(false)) {
redisTemplate.opsForHash().put(REDIS_GROUP_PREFIX + groupId, transactionUnit.getUnitId(), transactionUnit);
return;
}
throw new FastStorageException("attempts to the non-existent transaction group " + groupId, FastStorageException.EX_CODE_NON_GROUP);
}
// TC 询问该事务组全局事务状态
public int transactionState(String groupId) {
int state = exceptionService.transactionState(groupId); // 查询数据库t_tx_exception得到该事务的状态
//存在数据时返回数据状态
if (state != -1) {
return state;
}
// 查询redis该事务组的全局状态
return dtxContextRegistry.transactionState(groupId);
// 为什么会先查数据库,再查redis ? 后文会有说明,主要是考虑事务补偿时的问题
}
// 返回的最终状态
public class AskTransactionStateExecuteService implements RpcExecuteService {
@Override
public Serializable execute(TransactionCmd transactionCmd) {
int state = transactionManager.transactionState(transactionCmd.getGroupId());
return state == -1 ? 0 : state;
}
}
通知事务组
主事务的业务代码执行完毕,最终必须调用通知事务组进行全局事务协调。通知完成后进行事务清理
/**
* Client通知事务组操作集合
*
* @param groupId groupId
* @param unitId unitId
* @param transactionType transactionType
* @param state transactionState
*/
public void notifyGroup(String groupId, String unitId, String transactionType, int state) {
// ...
// 事务通知
reliableMessenger.notifyGroup(groupId, state);
// 事务清理
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
// 通知异常(RPC调用异常)
dtxExceptionHandler.handleNotifyGroupMessageException(Arrays.asList(groupId, state, unitId, transactionType), e);
// ...
}
// 当TC调用TM抛出异常时,会正常的按照当前事务的状态进行提交,并将结果上报到TM
public void handleNotifyGroupMessageException(Object params, Throwable ex) {
// 参数中取出事务的状态
// ....
// 按状态正常结束事务(切面补偿记录将保留)
// TxManager 存在请求异常或者响应异常两种情况。当请求异常时这里的业务需要补偿,当响应异常的时候需要做状态的事务清理。
// 请求异常时
// 参与放会根据上报补偿记录做事务的提交。
// 响应异常时
// 参与反会正常提交事务,本地业务提示事务。
// 该两种情况下补偿信息均可以忽略,可直接把本地补偿记录数据删除。
String unitId = (String) paramList.get(2);
String transactionType = (String) paramList.get(3);
try {
transactionCleanTemplate.compensationClean(groupId, unitId, transactionType, state);// 本地事务提交
} catch (TransactionClearException e) {
log.error("{} > compensationClean transaction error.", transactionType);
}
// 上报Manager,上报直到成功.
txMangerReporter.reportTransactionState(groupId, null, TxExceptionParams.NOTIFY_GROUP_ERROR, state);
// 提交的事务记录到t_tx_exception表中,所以会看到前文TC询问事务状态时,会优先查询数据库,而不是直接查redis
}
通知事务组的概念,应该理解为,主事务告知TM
进行全部的事务协调,即TM
仅会通知各个从事务进行commit or rollback
,而不会通知主事务进行commit or rollback
。因为在前文看到创建事务组时,TM
并没有将主事务unitId
记录下来。而从事务加入事务组时,除了记录全局事务组Id,还包括事务单元unitId
.
public Serializable execute(TransactionCmd transactionCmd) throws TxManagerException {
try {
// 从redis取事务状态
int transactionState = transactionManager.transactionStateFromFastStorage(transactionCmd.getGroupId());
boolean hasThrow = false;
if (transactionState == 0) {
commitState = 0;
hasThrow = true;
}
// 事务状态为1进行全局事务提交
if (commitState == 1) {
transactionManager.commit(dtxContext);
} else if (commitState == 0) {
transactionManager.rollback(dtxContext);
}
// ...
} catch (TransactionException e) {
throw new TxManagerException(e);
} finally {
transactionManager.close(transactionCmd.getGroupId());
// 系统日志
txLogger.transactionInfo(transactionCmd.getGroupId(), "", "notify group over");
}
return null;
}
// 事务通知
private void notifyTransaction(DTXContext dtxContext, int transactionState) throws TransactionException {
for (TransactionUnit transUnit : dtxContext.transactionUnits()) {
NotifyUnitParams notifyUnitParams = new NotifyUnitParams();
notifyUnitParams.setGroupId(dtxContext.getGroupId());
notifyUnitParams.setUnitId(transUnit.getUnitId());
notifyUnitParams.setUnitType(transUnit.getUnitType());
notifyUnitParams.setState(transactionState);
txLogger.info(dtxContext.getGroupId(),
notifyUnitParams.getUnitId(), Transactions.TAG_TRANSACTION, "notify %s's unit: %s",
transUnit.getModId(), transUnit.getUnitId());
try {
// 这里在5.0.1会出现信道问题,什么是信道问题?比如此时有两台push注册到TM上,而某一刻的全局事务所在的本地事务只在其中一台,而通知的时候如果modId一致则会取到第一个
// 如下get(0) . 解决的办法是生成modId时去的是Mac地址+端口+服务名称,保证了不同实例的全局唯一
List<String> modChannelKeys = rpcClient.remoteKeys(transUnit.getModId());
if (modChannelKeys.isEmpty()) {
// record exception
throw new RpcException("offline mod.");
}
MessageDto respMsg =
rpcClient.request(modChannelKeys.get(0), MessageCreator.notifyUnit(notifyUnitParams));
if (!MessageUtils.statusOk(respMsg)) {
// 提交/回滚失败的消息处理
List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
rpcExceptionHandler.handleNotifyUnitBusinessException(params, respMsg.loadBean(Throwable.class));
}
} catch (RpcException e) {
// 提交/回滚通讯失败
List<Object> params = Arrays.asList(notifyUnitParams, transUnit.getModId());
rpcExceptionHandler.handleNotifyUnitMessageException(params, e);
} finally {
txLogger.transactionInfo(dtxContext.getGroupId(), notifyUnitParams.getUnitId(), "notify unit over");
}
}
}
// 当通知出现异常时,将信息记录到t_tx_exception表
public class ManagerRpcExceptionHandler implements RpcExceptionHandler {
@Override
public void handleNotifyUnitMessageException(Object params, Throwable e) {
// notify unit message error, write txEx
List paramList = ((List) params);
String modName = (String) paramList.get(1);
NotifyUnitParams notifyUnitParams = (NotifyUnitParams) paramList.get(0);
WriteTxExceptionDTO writeTxExceptionReq = new WriteTxExceptionDTO(notifyUnitParams.getGroupId(),
notifyUnitParams.getUnitId(), modName, notifyUnitParams.getState());
writeTxExceptionReq.setRegistrar((short) 0);
compensationService.writeTxException(writeTxExceptionReq);// 记录到t_tx_exception
// 记住客户端主动查询时,优先查数据库,再查redis的事务状态
}
}
总结
TX-LCN
作为分布式解决方案是比较优秀的方案,代码逻辑也比较简单,但是如果应用Crash,就可能出现数据不一致的情况,而且这种数据不一致的情况必须人肉修复。
比如主事务在进行NotifyGroup
时出现RpcException
主事务会根据当前事务的状态进行commit
or rollback
,之后会上报TM
记录补偿信息,假如在记录补偿时失败了(应用在这个点Crash
)了,那么主事务提交了,并且TM
并不能完整地协调好从事务
的全局事务状态。
为什么需要人肉修复呢?其实从源码上可以分析出,TX-LCN
解决的场景时将本地的事务通过事务协调器进行协调,但是本质上并没有将事务分布式节点化,即本地事务的成功与失败无法在不同的节点进行处理
。