CompletableFuture 异步处理任务总结

oracle JDK8 有关内容的文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html

创建异步任务

runAsync 执行 CompletableFuture 任务,没有返回值

static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

supplyAsync 执行 CompletableFuture 任务,可有返回值

static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

如果不指定 Executor 实现,则使用 ForkJoinPool.commonPool() 作为执行异步代码的线程池

创建异步任务后,可根据需求进行如下的操作:

方法名称 类型 传参 返回值
thenRun 单任务消费 无传参 无返回值
thenRunAsync 单任务消费 无传参 无返回值
thenApply 单任务消费 要传参 有返回值
thenApplyAsync 单任务消费 要传参 有返回值
thenAccept 单任务消费 要传参 无返回值
thenAcceptAsync 单任务消费 要传参 无返回值
thenCombine 双任务消费(与) 要传参(两个任务的执行结果) 有返回值
thenCombineAsync 双任务消费(与) 要传参(两个任务的执行结果) 有返回值
thenAcceptBoth 双任务消费(与) 要传参(两个任务的执行结果) 无返回值
thenAcceptBothAsync 双任务消费(与) 要传参(两个任务的执行结果) 无返回值
runAfterBoth 双任务消费(与) 无传参 无返回值
runAfterBothAsync 双任务消费(与) 无传参 无返回值
applyToEither 双任务消费(或) 要传参(已完成任务的执行结果) 有返回值
applyToEitherAsync 双任务消费(或) 要传参(已完成任务的执行结果) 有返回值
acceptEither 双任务消费(或) 要传参(已完成任务的执行结果) 无返回值
acceptEitherAsync 双任务消费(或) 要传参(已完成任务的执行结果) 无返回值
runAfterEither 双任务消费(或) 无传参 无返回值
runAfterEitherAsync 双任务消费(或) 无传参 无返回值
whenComplete 单任务消费 要传参(正常返回值和异常) 无返回值
whenCompleteAsync 单任务消费 要传参(正常返回值和异常) 无返回值
handle 单任务消费 要传参(正常返回值和异常) 有返回值
handleAsync 单任务消费 要传参(正常返回值和异常) 有返回值
exceptionally 单任务消费 要传参 (异常) 无返回值
thenCompose 单任务消费 要传参 有返回值
allOf 多任务消费(与) 要传参(任务列表) 无返回值
anyOf 多任务消费(或) 要传参(任务列表) 无返回值

不带 Async 版本由上一个任务的线程继续执行该任务,Async 版本可以指定执行该异步任务的 Executor 实现,如果不指定,默认使用 ForkJoinPool.commonPool()

单任务消费

回调方法 类型 传参 返回值
thenRun 单任务消费 无传参 无返回值
thenRunAsync 单任务消费 无传参 无返回值
thenAccept 单任务消费 要传参 无返回值
thenAcceptAsync 单任务消费 要传参 无返回值
thenApply 单任务消费 要传参 有返回值
thenApplyAsync 单任务消费 要传参 有返回值
public static void main(String[] args) throws Exception {
    var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsyncTask=" + Thread.currentThread().getName());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "";
    }, executor);

    // thenApply
    var thenApplyTask = supplyAsyncTask.thenApply((param) -> {
        System.out.println("thenApplyTask=" + Thread.currentThread().getName());
        return "";
    });

    // thenApplyAsync不指定线程池
    var thenApplyAsyncTask = supplyAsyncTask.thenApplyAsync((param) -> {
        System.out.println("thenApplyAsyncTask=" + Thread.currentThread().getName());
        return "";
    });

    // thenApplyAsync指定线程池
    var thenApplyAsyncTask2 = supplyAsyncTask.thenApplyAsync((param) -> {
        System.out.println("thenApplyAsyncTask2=" + Thread.currentThread().getName());
        return "";
    }, executor);

    // 不调用get()将不执行回调
    thenApplyAsyncTask.get();
    thenApplyAsyncTask2.get();

    // 关闭线程池
    executor.shutdown();
}

输出结果:

supplyAsyncTask=pool-1-thread-1
thenApplyAsyncTask2=pool-1-thread-2
thenApplyTask=pool-1-thread-2
thenApplyAsyncTask=ForkJoinPool.commonPool-worker-3

双任务消费(与)

将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务。

方法名称 类型 传参 返回值
thenCombine 双任务消费(与) 有传参(两个任务的执行结果) 有返回值
thenCombineAsync 双任务消费(与) 有传参(两个任务的执行结果) 有返回值
thenAcceptBoth 双任务消费(与) 有传参(两个任务的执行结果) 无返回值
thenAcceptBothAsync 双任务消费(与) 有传参(两个任务的执行结果) 无返回值
runAfterBoth 双任务消费(与) 无传参 无返回值
runAfterBothAsync 双任务消费(与) 无传参 无返回值
public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> "task1");
    var task2 = CompletableFuture.supplyAsync(() -> "task2");
    var task3 = CompletableFuture.supplyAsync(() -> "task3");

    task1.thenCombine(task2, (param1, param2) -> {
        // task1task2
        System.out.println(param1 + param2);
        return param1 + param2;
    }).thenCombine(task3, (param12, param3) -> {
        // task1task2task3
        System.out.println(param12 + param3);
        return param12 + param3;
    });

    task1.thenAcceptBoth(task2, (param1, param2) -> {
        // task1task2
        System.out.println(param1 + param2);
    }).thenAcceptBoth(task3, (param12, param3) -> {
        // nulltask3
        System.out.println(param12 + param3);
    });

    task1.runAfterBoth(task2, () -> {
        // task1 and task2
        System.out.println("task1 and task2");
    });
}

双任务消费(或)

将两个 CompletableFuture 组合起来,只要其中一个执行完了,就执行回调方法。

方法名称 类型 传参 返回值
applyToEither 双任务消费(或) 有传参(已完成任务的执行结果) 有返回值
applyToEitherAsync 双任务消费(或) 有传参(已完成任务的执行结果) 有返回值
acceptEither 双任务消费(或) 有传参(已完成任务的执行结果) 无返回值
acceptEitherAsync 双任务消费(或) 有传参(已完成任务的执行结果) 无返回值
runAfterEither 双任务消费(或) 无传参 无返回值
runAfterEitherAsync 双任务消费(或) 无传参 无返回值
public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task1";
    });
    var task2 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task2";
    });
    var task3 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task3";
    });

    task1.applyToEither(task2, (param) -> {
        // applyToEither=task2
        System.out.println("applyToEither=" + param);
        return param;
    }).acceptEither(task3, (param) -> {
        // acceptEither=task2 或 acceptEither=task3
        System.out.println("acceptEither=" + param);
    }).get();

    // task1 or task2
    task1.runAfterEither(task2,()-> System.out.println("task1 or task2"));
}

其他

whenComplete、whenCompleteAsync

某个任务执行完成后,执行的回调方法,无返回值。可以访问 CompletableFuture 的结果和异常作为参数,使用它们并执行想要的操作。此方法并不能转换完成的结果。会内部抛出异常。其正常返回的 CompletableFuture 的结果来自上个任务。

handle、handleAsync

不论正常返回还是出异常都会进入 handle,参数通常为 new BiFunction<T, Throwable, R>();,其中

  • T:上一任务传入的对象类型
  • Throwable:上一任务传入的异常
  • R:返回的对象类型

handle 和 thenApply 的区别:如果任务出现异常不会进入 thenApply;任务出现异常也会进入 handle,可对异常处理。

handle 和 whenComplete 的区别:handle 可对传入值 T 进行转换,并产生自己的返回结果 R;whenComplete 的返回值和上级任务传入的结果一致,不能转换。

whenComplete、whenCompleteAsync、handle 和 handleAsync 的输入参数一个是正常结果一个是异常结果,而 exceptionally 的输入参数为异常结果。

public static void main(String[] args) throws Exception {

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        // 制造一个异常
        // int value = 1 / 0;
        return "supplyAsyncTask";
    });

    var handle = supplyAsyncTask.handle((s, throwable) -> {
        if (Optional.ofNullable(throwable).isPresent()) {
            return throwable.getMessage();
        }
        return new ArrayList() {{
            add(s);
        }};
    });

    // supplyAsyncTask异常时,输出1:java.lang.ArithmeticException: / by zero
    // 输出2:[supplyAsyncTask]
    System.out.println(handle.get());
}

exceptionally

某个任务执行抛出异常时执行的回调方法。抛出异常作为参数,传递到回调方法。仅处理异常情况。如果任务成功完成,那么将被跳过。

public static void main(String[] args) throws Exception {

    var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
        double error=1/0;
        return "ok";
    }, executor).exceptionally((e)->{
        // java.lang.ArithmeticException: / by zero
        System.out.println(e.getMessage());
        return "error";
    });

    // "error"
    System.out.println(supplyAsyncTask.get());
}

complete

如果尚未完成,则将 get() 和相关方法返回的值设置为给定值。如果此调用导致此 CompletableFuture 转换到完成状态,则返回 true,否则返回 false。文档描述:

If not already completed, sets the value returned by get() and related methods to the given value.

Params:
value – the result value
Returns:
true if this invocation caused this CompletableFuture to transition to a completed state, else false
public static void main(String[] args) throws Exception {
    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(15);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return 10;
    });

    // 若get放在此处,一直等待task1完成,输出10
    // System.out.println(task1.get());
    // 强制task1完成,输出true
    System.out.println(task1.complete(5));
    // 输出5
    System.out.println(task1.get());
    // task1已完成,输出false
    System.out.println(task1.complete(50));
}

thenCompose

源码定义

public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;

thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为入参,执行指定的方法。该方法会返回一个新的 CompletableFuture 实例

public static void main(String[] args) throws Exception {
    var task1 = CompletableFuture.supplyAsync(() -> 10);

    var task2 = task1.thenCompose(param -> {
        System.out.println("this is task2 param=" + param);
        return CompletableFuture.supplyAsync(() -> {
            System.out.println("this is task2 square");
            return Math.pow(param, 2);
        });
    }).thenApply(param -> {
        System.out.println("thenApply get the square=" + param);
        return param;
    });

    var task3 = task1.thenCompose(param -> {
        System.out.println("this is task3 param=" + param);
        return CompletableFuture.runAsync(() -> {
            System.out.println("this is task3 square");
            System.out.println(Math.pow(param, 2));
        });
    });

    System.out.println("task2 get=" + task2.get());
    System.out.println("task3 get=" + task3.get());
}

输出:
this is task2 param=10
this is task2 square
thenApply get the square=100.0
this is task3 param=10
this is task3 square
100.0
task2 get=100.0
task3 get=null

allOf

静态方法,阻塞等待所有给定的 CompletableFuture 执行结束后,返回一个 CompletableFuture<Void> 结果。所有任务都执行完成后,才执行 allOf 的回调方法。如果任意一个任务异常,执行 get 方法时会抛出异常。

public static void main(String[] args) throws Exception {

    var task1 = CompletableFuture.supplyAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return "task1";
    });

    var task2 = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        int value = 1 / 0;
        System.out.println("task2 is over");
    });

    CompletableFuture.allOf(task1, task2).whenComplete((param, throwable) -> {
        // null
        System.out.println(param);
    }).exceptionally(throwable -> {
        // task3 allOf throwable=java.lang.ArithmeticException: / by zero
        System.out.println("task3 allOf throwable=" + throwable.getMessage());
        return null;
    }).get();
}

anyOf

静态方法,阻塞等待任意一个给定的 CompletableFuture 对象执行结束后,返回一个 CompletableFuture<Void> 结果。任意一个任务执行完,就执行 anyOf 的回调方法。如果执行的任务异常,执行 get 方法时会抛出异常。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,839评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,543评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,116评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,371评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,384评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,111评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,416评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,053评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,558评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,007评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,117评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,756评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,324评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,315评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,539评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,578评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,877评论 2 345

推荐阅读更多精彩内容