多线程事务回滚尝试
我们知道spring的事务管理是将事务状态存储于ThreadLocal当中,在进行事务传播的时候,这些变量也会进行传递,可一旦进入到多线程操作的时候,跨线程的事务会出现阻断,子流程的事务会处于独立的事务自制情况,于是思考能否在创建多线程的时候存储事务的状态值,下面列出代码:
List<TransactionStatus> transactionErrorList = Collections.synchronizedList(new ArrayList<TransactionStatus>());List<TransactionStatus> transactionCommitList = Collections.synchronizedList(new ArrayList<TransactionStatus>());List<Thread> threadList = Collections.synchronizedList(new ArrayList<Thread>());CountDownLatch latch = new CountDownLatch(customers.size());// 补全数据customers.forEach(customer -> { Thread thread = new Thread(() -> { customer.setCreatorLdap(request.getLdap()); customer.setCreatorName(request.getUserName()); String enterpriseName = Optional.ofNullable(customer.getEnterprises()).filter(CollectionUtils::isNotEmpty).map(enterprises -> { return enterprises.stream().map(CustomerEnterprise::getEnterpriseName).filter(StringUtils::isNotEmpty).collect(Collectors.joining("|")); }).orElse(""); customer.setEnterpriseName(enterpriseName); DefaultTransactionDefinition def = new DefaultTransactionDefinition(); def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW); TransactionStatus status = transactionManager.getTransaction(def); try { customerMapper.insert(customer); transactionCommitList.add(status); } catch (Exception e) { status.setRollbackOnly(); transactionErrorList.add(status); } finally { latch.countDown(); } }); thread.start();});latch.wait();if (CollectionUtils.isEmpty(transactionErrorList)) { transactionCommitList.forEach(transactionStatus -> { transactionManager.commit(transactionStatus); });}
这里使用countDownLatch让母线程进行等待,等待所有子线程处理完成,注意,这里的事务还未提交,使用的线程内部事务自制,思考在母线程中进行数据处理,处理结果为失败,这里没有留下运行截图,报错的意思大概是当前现成不存在该事物状态
共用sqlSession
在母线程创建sqlsession,传递至子线程中共用session,在母线程中手动对sqlsesion进行数据的提交与回滚,上代码:
SqlSession sqlSession = sqlSessionTemplate.getSqlSessionFactory().openSession(ExecutorType.BATCH, false);CountDownLatch latch = new CountDownLatch(customers.size());// 补全数据customers.parallelStream().forEach(customer -> { customer.setCreatorLdap(request.getLdap()); customer.setCreatorName(request.getUserName()); String enterpriseName = Optional.ofNullable(customer.getEnterprises()) .filter(CollectionUtils::isNotEmpty) .map(enterprises -> { return enterprises.stream().map(CustomerEnterprise::getEnterpriseName) .filter(StringUtils::isNotEmpty) .collect(Collectors.joining("|")); }).orElse(""); customer.setEnterpriseName(enterpriseName); try{ CustomerMapper customerMapperByHand= sqlSession.getMapper(CustomerMapper.class); customerMapperByHand.updateByIdSelective(customer); }catch (Exception e){ EPLogger.error().withMessage(e.getMessage()).flush(); }finally { latch.countDown(); }});try { latch.await(); sqlSession.commit(); sqlSession.clearCache();} catch (InterruptedException e) { sqlSession.rollback();}finally { sqlSession.close();}
实测有效,但实际mybatis不推荐手动使用sessionFactory创建sqlsession,特别是这种线程共用的情况,想想也就明白,多线程共用代表着其先后顺序不可控,无法控制sql的执行顺序,某种程度上算线程不安全的,多线程共用一个seesion很容易造成长连接,占用系统资源。这种处理方式某种程度上和手动创建connection进行共享是一个道理。
事务补偿
实际正式环境还是使用的事物补偿,上面的方式不敢直接在正式环境使用