1.业务背景
存在如下的业务需求:对于一个批任务,包含多个子任务taskId,在多线程并发执行时,如果出现一个子任务执行失败,则要求该批次的所有任务立即停止,并返回降级的结果。
2.问题和难点
在执行比较耗时的任务,如IO请求,一般都会使用多线程并发的执行。如果想要实现一个子任务异常返回后,控制其他线程任务(执行、待执行)快速返回。一般需要线程间通讯的方式,或者使用线程响应中断等等。实现麻烦且繁琐,容易出错或者性能上提升有限。
3.基于CompletableFuture语义的快速失败方案。
CompletableFuture在Future的基础上提供了完全异步回调的解决方案。且支持complete(),直接返回结果。通过组合CompletableFuture的功能,可以简单的实现许多时序上的并发问题。
public class ThreadExec {
private ExecutorService executorService;
private static Gson gson = new Gson();
private List<CompletableFuture<BoResponse>> completableFutures = Lists.newArrayList();
public ExecutorService getExecutorService() {
return executorService;
}
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
public List<CompletableFuture<BoResponse>> getCompletableFutures() {
return completableFutures;
}
public void setCompletableFutures(List<CompletableFuture<BoResponse>> completableFutures) {
this.completableFutures = completableFutures;
}
public ThreadExec() {
executorService = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(), new BasicThreadFactory.Builder().namingPattern("rule-timout-pol-%d").daemon(true).build());
}
public BoResponse handleTaskWithExp(int index) throws RuntimeException {
try {
Thread.sleep(2 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 模拟处理时间
if (index == 3) {
throw new RuntimeException("抛一个异常");
}
BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(index).success(Boolean.TRUE).build();
return boResponse;
}
/**
* redis请求,比较耗时的多个请求,有一个执行失败,其他执行中的快速失败返回
* waitForAllButAbortOnFirstException
*
* @param
*/
public static void start3() {
long t1 = System.currentTimeMillis();
ThreadExec exec = new ThreadExec();
CompletableFuture<String> future = new CompletableFuture<>();
for (int i = 0; i < 1000; i++) {
final int a = i;
exec.getCompletableFutures().add(CompletableFuture.supplyAsync(() -> exec.handleTaskWithExp(a), exec.getExecutorService())
.exceptionally(e -> {
System.out.println(2233);
BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(a).success(Boolean.FALSE).build();
future.completeExceptionally(new RuntimeException()); //抛出一个异常
return boResponse;
})
);
}
// 异步回来了
CompletableFuture[] completableFutureArray = new CompletableFuture[exec.getCompletableFutures().size()];
exec.getCompletableFutures().toArray(completableFutureArray);
CompletableFuture<Void> allDoneFuture = CompletableFuture
.allOf(completableFutureArray);
future.thenApply(v -> v).exceptionally(e -> {
exec.getCompletableFutures().forEach(boResponseCompletableFuture -> {
BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(-1).success(Boolean.FALSE).build();
boResponseCompletableFuture.complete(boResponse);
});
return null;
});
List<BoResponse> data = null;
try {
data = allDoneFuture
.thenApply(
v -> exec.getCompletableFutures().stream().map(CompletableFuture::join).collect(Collectors.toList()))
.get().stream().sorted(Comparator.comparing(BoResponse::getShard))
.collect(Collectors.toList());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
for (BoResponse bo : data) {
System.out.println(gson.toJson(bo));
}
System.out.println(System.currentTimeMillis() - t1);
}
public static void main(String[] args) {
//stat1();
//stat2();
start3();
DateTimeUtil.sleepQuietly(40 * 1000L);
}
首先简要说下任务执行流程:handleTaskWithExp()方法负责具体任务处理,对于批任务,将结果返回添加到 List<CompletableFuture<BoResponse>> completableFutures这个任务列表中,并通过 allof来等待所有结果返回,allof方法会返回一个CompletableFuture 类的对象,对于allDoneFuture ,可以添加批任务执行完成后的处理逻辑,如排序等操作。
快速失败,在handleTaskWithExp中遇到异常时,主动抛出一个 RuntimeException,这个异常会被exceptionally方法进行捕获。但如何在一个子任务的exceptionally中去操作整个批任务的结果呢?此处我们可以通过一个中间人future 来进行通信,具体的方式可以看上文代码。future在子任务失败时,会主动的抛出一个异常,在thenApply执行中,会遍历所有的批任务,将批任务手动设置已完成。
4.注意的坑
对于CompletrFuture使用complete时,和 handle只会执行其中一个。而且如果handle后定义了thenApply()这种执行后的附加动作,在使用complete后是不会继续执行thenApply方法的。要自行处理后续逻辑。
/**
* complete先执行后,和supplyasync 执行的后置动作会不会再被触发呢?
*/
public static void start4() {
ThreadExec exec = new ThreadExec();
for (int i = 0; i < 8; i++) {
final int a = i;
exec.getCompletableFutures().add(CompletableFuture.supplyAsync(() -> exec.handleTask(a), exec.getExecutorService())
.handle(new BiFunction<BoResponse, Throwable, BoResponse>() {
@Override
public BoResponse apply(BoResponse param, Throwable throwable) {
if (throwable == null) {
param.setSuccess(true);
System.out.println(param.getShard());
} else {
System.out.println(throwable.getMessage());
}
return param;
}
}).thenApply(rep -> {
System.out.println(rep.getShard() + "thenApply 执行");
return null;
})
);
}
// 随机释放一个
BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(2).success(Boolean.FALSE).build();
exec.getCompletableFutures().get(2).complete(boResponse);
}
用上面的测试用例测了下,发现输出如下
0
0thenApply 执行
1
1thenApply 执行
2
3
3thenApply 执行
4
4thenApply 执行
6
6thenApply 执行
5
5thenApply 执行
7
7thenApply 执行
shard=2的时候,thenApply就没有执行了,因为被complete抢先执行返回了。