CompletableFuture多任务当一个任务执行失败后如何快速失败返回

1.业务背景

存在如下的业务需求:对于一个批任务,包含多个子任务taskId,在多线程并发执行时,如果出现一个子任务执行失败,则要求该批次的所有任务立即停止,并返回降级的结果。

2.问题和难点

在执行比较耗时的任务,如IO请求,一般都会使用多线程并发的执行。如果想要实现一个子任务异常返回后,控制其他线程任务(执行、待执行)快速返回。一般需要线程间通讯的方式,或者使用线程响应中断等等。实现麻烦且繁琐,容易出错或者性能上提升有限。

3.基于CompletableFuture语义的快速失败方案。

CompletableFuture在Future的基础上提供了完全异步回调的解决方案。且支持complete(),直接返回结果。通过组合CompletableFuture的功能,可以简单的实现许多时序上的并发问题。

  public class ThreadExec {

    private ExecutorService executorService;

    private static Gson gson = new Gson();
    private List<CompletableFuture<BoResponse>> completableFutures = Lists.newArrayList();

    public ExecutorService getExecutorService() {
        return executorService;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public List<CompletableFuture<BoResponse>> getCompletableFutures() {
        return completableFutures;
    }

    public void setCompletableFutures(List<CompletableFuture<BoResponse>> completableFutures) {
        this.completableFutures = completableFutures;
    }

    public ThreadExec() {
        executorService = new ThreadPoolExecutor(5, 10, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), new BasicThreadFactory.Builder().namingPattern("rule-timout-pol-%d").daemon(true).build());
    }
  public BoResponse handleTaskWithExp(int index) throws RuntimeException {
        try {
            Thread.sleep(2 * 1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 模拟处理时间
        if (index == 3) {
            throw new RuntimeException("抛一个异常");
        }
        BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(index).success(Boolean.TRUE).build();
        return boResponse;
    }

 /**
     * redis请求,比较耗时的多个请求,有一个执行失败,其他执行中的快速失败返回
     * waitForAllButAbortOnFirstException
     *
     * @param
     */

    public static void start3() {
        long t1 = System.currentTimeMillis();

        ThreadExec exec = new ThreadExec();
        CompletableFuture<String> future = new CompletableFuture<>();

        for (int i = 0; i < 1000; i++) {
            final int a = i;
            exec.getCompletableFutures().add(CompletableFuture.supplyAsync(() -> exec.handleTaskWithExp(a), exec.getExecutorService())
                    .exceptionally(e -> {
                        System.out.println(2233);
                        BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(a).success(Boolean.FALSE).build();
                        future.completeExceptionally(new RuntimeException());  //抛出一个异常
                        return boResponse;
                    })
            );
        }
        // 异步回来了
        CompletableFuture[] completableFutureArray = new CompletableFuture[exec.getCompletableFutures().size()];
        exec.getCompletableFutures().toArray(completableFutureArray);

        CompletableFuture<Void> allDoneFuture = CompletableFuture
                .allOf(completableFutureArray);

        future.thenApply(v -> v).exceptionally(e -> {
            exec.getCompletableFutures().forEach(boResponseCompletableFuture -> {
                BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(-1).success(Boolean.FALSE).build();
                boResponseCompletableFuture.complete(boResponse);

            });
            return null;
        });

        List<BoResponse> data = null;
        try {
            data = allDoneFuture
                    .thenApply(
                            v -> exec.getCompletableFutures().stream().map(CompletableFuture::join).collect(Collectors.toList()))
                    .get().stream().sorted(Comparator.comparing(BoResponse::getShard))
                    .collect(Collectors.toList());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        for (BoResponse bo : data) {
            System.out.println(gson.toJson(bo));
        }
        System.out.println(System.currentTimeMillis() - t1);
    }
public static void main(String[] args) {
        //stat1();
        //stat2();
        start3();
        DateTimeUtil.sleepQuietly(40 * 1000L);

    }

首先简要说下任务执行流程:handleTaskWithExp()方法负责具体任务处理,对于批任务,将结果返回添加到 List<CompletableFuture<BoResponse>> completableFutures这个任务列表中,并通过 allof来等待所有结果返回,allof方法会返回一个CompletableFuture 类的对象,对于allDoneFuture ,可以添加批任务执行完成后的处理逻辑,如排序等操作。

快速失败,在handleTaskWithExp中遇到异常时,主动抛出一个 RuntimeException,这个异常会被exceptionally方法进行捕获。但如何在一个子任务的exceptionally中去操作整个批任务的结果呢?此处我们可以通过一个中间人future 来进行通信,具体的方式可以看上文代码。future在子任务失败时,会主动的抛出一个异常,在thenApply执行中,会遍历所有的批任务,将批任务手动设置已完成。

4.注意的坑

对于CompletrFuture使用complete时,和 handle只会执行其中一个。而且如果handle后定义了thenApply()这种执行后的附加动作,在使用complete后是不会继续执行thenApply方法的。要自行处理后续逻辑。

   /**
     * complete先执行后,和supplyasync 执行的后置动作会不会再被触发呢?
     */
    public static void start4() {
            ThreadExec exec = new ThreadExec();
            for (int i = 0; i < 8; i++) {
                final int a = i;
                exec.getCompletableFutures().add(CompletableFuture.supplyAsync(() -> exec.handleTask(a), exec.getExecutorService())
                        .handle(new BiFunction<BoResponse, Throwable, BoResponse>() {
                            @Override
                            public BoResponse apply(BoResponse param, Throwable throwable) {
                                if (throwable == null) {
                                    param.setSuccess(true);
                                    System.out.println(param.getShard());
                                } else {
                                    System.out.println(throwable.getMessage());
                                }
                                return param;
                            }
                        }).thenApply(rep -> {
                            System.out.println(rep.getShard() + "thenApply 执行");
                            return null;
                        })
                );
            }
            // 随机释放一个
            BoResponse boResponse = BoResponse.builder().elapsedTime(100L).shard(2).success(Boolean.FALSE).build();
            exec.getCompletableFutures().get(2).complete(boResponse);
    }

用上面的测试用例测了下,发现输出如下

0
0thenApply 执行
1
1thenApply 执行
2
3
3thenApply 执行
4
4thenApply 执行
6
6thenApply 执行
5
5thenApply 执行
7
7thenApply 执行

shard=2的时候,thenApply就没有执行了,因为被complete抢先执行返回了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,911评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,014评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,129评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,283评论 1 264
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,159评论 4 357
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,161评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,565评论 3 382
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,251评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,531评论 1 292
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,619评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,383评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,255评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,624评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,916评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,199评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,553评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,756评论 2 335