背景:
在项目中使用多线程抓取第三方数据执行数据入库时,如果某个子线程执行异常,其他子线事务全部回滚,spring对多线程无法进行事务控制,是因为多线程底层连接数据库的时候,是使用的线程变量(TheadLocal),线程之间事务隔离,每个线程有自己的连接,事务肯定不是同一个了。
解决办法
思想就是使用两个CountDownLatch实现子线程的二段提交
步骤:
1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务,并使用BlockingDeque存储线程的返回结果。
2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。
3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。
线程池工具类
package com.yunshidi.freight.wf.operation;
import java.lang.reflect.Constructor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.logging.Handler;
public class ThreadPoolTool {
/** * 多线程任务
* @param transactionManager
* @param data
* @param threadCount
* @param params
* @param clazz
*/
public void excuteTask(DataSourceTransactionManager transactionManager, List data,int threadCount, Map params, Class clazz) {
if(data ==null|| data.size() == 0) {
return;
}
int batch = 0;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
//监控子线程的任务执行
CountDownLatch childMonitor = new CountDownLatch(threadCount);
//监控主线程,是否需要回滚
CountDownLatch mainMonitor = new CountDownLatch(1);
//存储任务的返回结果,返回true表示不需要回滚,反之,则回滚(由链表结构组成的双向阻塞队列)
BlockingDeque results = new LinkedBlockingDeque(threadCount);
RollBack rollback = new RollBack(false);
try {
LinkedBlockingQueue queue = splitQueue(data, threadCount);//由链表结构组成的有界阻塞队列
while(true) {
List list = queue.poll();
if(list ==null) {
break;
}
batch++;
params.put("batch", batch);
Constructor constructor = clazz.getConstructor(new Class[]{
CountDownLatch.class, CountDownLatch.class, BlockingDeque.class, RollBack.class, DataSourceTransactionManager.class, Object.class, Map.class});
ThreadTask task = (ThreadTask) constructor.newInstance(childMonitor, mainMonitor, results, rollback, transactionManager, list, params);
executor.execute(task);
}
// 1、主线程将任务分发给子线程,然后使用childMonitor.await();阻塞主线程,等待所有子线程处理向数据库中插入的业务。
childMonitor.await();
System.out.println("主线程开始执行任务");
//根据返回结果来确定是否回滚
for(int i = 0; i < threadCount; i++) {
Boolean result = results.take();
if(!result) {
//有线程执行异常,需要回滚子线程
rollback.setNeedRoolBack(true);
}
}
// 3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。
mainMonitor.countDown();
} catch (Exception e) {
log.error(e.getMessage());
} finally {
//关闭线程池,释放资源
executor.shutdown();
}
}
/** * 队列拆分
*
* @param data 需要执行的数据集合
* @param threadCount 核心线程数
* @return*/
private LinkedBlockingQueue splitQueue(List data, int threadCount) {
LinkedBlockingQueue queueBatch =new LinkedBlockingQueue();
int total = data.size();
int oneSize = total / threadCount;
int start;
int end;
for(int i = 0; i < threadCount; i++) {
start = i * oneSize;
end = (i + 1) * oneSize;
if(i < threadCount - 1) {
queueBatch.add(data.subList(start, end));
} else {
queueBatch.add(data.subList(start, data.size()));
}
}
return queueBatch;
}
}
任务执行类
package com.yunshidi.freight.wf.operation;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
/**
* 任务执行类
*/
public abstract class ThreadTask implements Runnable {
/**
* 监控子任务的执行
*/
private CountDownLatch childMonitor;
/**
* 监控主线程
*/
private CountDownLatch mainMonitor;
/**
* 存储线程的返回结果
*/
private BlockingDeque resultList;
/**
* 回滚类
*/
private RollBack rollback;
private Map params;
protected Object obj;
protected DataSourceTransactionManager transactionManager;
protected TransactionStatus status;
public ThreadTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
this.childMonitor = childCountDown;
this.mainMonitor = mainCountDown;
this.resultList = result;
this.rollback = rollback;
this.transactionManager = transactionManager;
this.obj = obj;
this.params = params;
initParam();
}
/**
* 事务回滚
*/
private void rollBack() {
System.out.println(Thread.currentThread().getName() + "开始回滚");
transactionManager.rollback(status);
}
/**
* 事务提交
*/
private void submit() {
System.out.println(Thread.currentThread().getName() + "提交事务");
transactionManager.commit(status);
}
protected Object getParam(String key) {
return params.get(key);
}
public abstract void initParam();
/**
* 执行任务,返回false表示任务执行错误,需要回滚
*
* @return
*/
public abstract boolean processTask();
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "子线程开始执行任务");
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRED);
status = transactionManager.getTransaction(def);
Boolean result = processTask();
//向队列中添加处理结果
resultList.add(result);
//2、使用childMonitor.countDown()释放子线程锁定,同时使用mainMonitor.await();阻塞子线程,将程序的控制权交还给主线程。
childMonitor.countDown();
try {
//等待主线程的判断逻辑执行完,执行下面的是否回滚逻辑
mainMonitor.await();
} catch (Exception e) {
log.error(e.getMessage());
}
System.out.println(Thread.currentThread().getName() + "子线程执行剩下的任务");
//3、主线程检查子线程执行任务的结果,若有失败结果出现,主线程标记状态告知子线程回滚,然后使用mainMonitor.countDown();将程序控制权再次交给子线程,子线程检测回滚标志,判断是否回滚。
if (rollback.isNeedRoolBack()) {
rollBack();
} else {
//事务提交
submit();
}
}
}
事务回滚类
package com.yunshidi.freight.wf.operation;
import lombok.Data;
@Data
public class RollBack {
public RollBack(boolean needRoolBack) {
this.needRoolBack = needRoolBack;
}
private boolean needRoolBack;
}
使用线程池工具:
1,首先建立自己的任务执行类 并且 extends ThreadTask ,实现initParam()和processTask()方法
package com.yunshidi.freight.wf.operation;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CountDownLatch;
public class TestTask extends ThreadTask {
/**
* 分批处理的数据
*/
private List objectList;
/**
* 可能需要注入的某些服务
*/
private TestService testService;
public TestTask(CountDownLatch childCountDown, CountDownLatch mainCountDown, BlockingDeque result, RollBack rollback, DataSourceTransactionManager transactionManager, Object obj, Map params) {
super(childCountDown, mainCountDown, result, rollback, transactionManager, obj, params);
}
@Override
public void initParam() {
this.objectList = (List) getParam("objectList");
this.testService = (TestService) getParam("testService");
}
/**
* 执行任务,返回false表示任务执行错误,需要回滚
* @return
* */
@Override
public boolean processTask() {
try {
for (Object o : objectList) {
testService.list();
System.out.println(o.toString()+"执行自己的多线程任务逻辑");
}
return true;
} catch (Exception e) {
return false;
}
}
}
2,编写主任务执行方法
package com.yunshidi.freight.wf.operation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration;
import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class test {
@Autowired
private ThreadPoolTool threadPoolTool;
@Autowired
private TestService testService;
@Autowired
private DataSourceTransactionManager transactionManager;
/**
* 执行多线程任务方法
*/
public void testThreadTask() {
try {
int threadCount = 5;
//需要分批处理的数据
List objectList = new ArrayList<>();
Map params =new HashMap<>();
params.put("objectList",objectList);
params.put("testService",testService);
//调用多线程工具方法
threadPoolTool.excuteTask(transactionManager,objectList,threadCount,params, TestTask.class);
}catch (Exception e){
throw new RuntimeException(e.getMessage());
}
}
}