CompletableFuture 组合式异步编程

       最近学习dubbo 源码, 敲了些vertx 代码,觉得异步编程的风格是有多么的帅,lambda表达式写的美又很有逻辑。java 强语言能写成这样也是很美的一件事情。

       这遍文章简单的看看CompletableFuture 是怎么使用的,背后都干了啥事,先看看dubbo上的completableFuture。Dubbo 的future 使用,请看这篇博客http://dubbo.apache.org/en-us/blog/dubbo-new-async.html, 一直到dubbo 2.7.0 才出现 CompletableFuture,dubbo 2.6.x 会报序列化的错误, 因为 CompletableFuture<object> 不是一个序列化的对象,没有 implement serialization. 

(1). 首先看看 CompletableFuture 有哪些基本的用法。CompletableFuture 是一个class,实现future 和 CompletionStage(完成阶段) 接口。多个CompletionStage可以有先后顺序, 可以 多个任务同时完成,可以上一个任务执行的结果传给下个任务。有人说是线程编排,也有其道理。 CompletableFuture 不支持使用 callable,而支持Runnable 和 supplier, 这些都是被@FunctionalInterface 注解标注的接口。 有lambda 的表达特性。supplier 代替 callable 支持 有返回结果的异步。 

我们看一个例子:

ExecutorService executor = Executors.newFixedThreadPool(3); // 默认可以通过ForkJoinPool 产生的线程来执行任务,其线程数是 CPU 核数➖1,不然就使用 ThreadPerTaskExecutor

Supplier<Integer> externalTask = () -> {

// do something

return 3;

}

CompletableFuture.supplyAsync(externalTask, executor). 

我们发现其本质是使用executor 线程 执行 externalTask的任务,任务只完成之后将结果放到 Future 中,通过get 方法获取结果。Future get 的方法是同步还是异步呢? 我觉得会阻塞,一直拿到正确的结果(异常, 或者 线程计算出来的结果), 它和 completableFuture join 方法类似。对于Runnable 接口, 可使用 runAsync方法。

(2) 那么 CompletableFuture 如何触发 ConpletionStage 呢?  我们来看看 

public boolean complete(T value), public boolean completeExceptionally(Throwable ex) 。 我们发现在源码中或者别的异步框架中,比如前端的promise,当任务完成之后,会设置 complete,表示任务完成成功或者失败。complete会触发依赖他们的completionStage。下面是java 部分的代码,说实话我有点看不懂,好像是将依赖 放入 stack 中,然后一个一个依赖触发


*/

final void postComplete() {

/*

* On each step, variable f holds current dependents to pop

* and run.  It is extended along only one path at a time,

* pushing others to avoid unbounded recursion.

*/

    CompletableFuture f =this; Completion h;

    while ((h = f.stack) !=null ||

(f !=this && (h = (f =this).stack) !=null)) {

CompletableFuture d; Completion t;

        if (f.casStack(h, t = h.next)) {

if (t !=null) {

if (f !=this) {

pushStack(h);

continue;

                }

h.next =null;    // detach

            }

f = (d = h.tryFire(NESTED)) ==null ?this : d;

        }

}

}。

我们看一下

public CompletableFuture<T> whenComplete(

    BiConsumer<? super T, ? super Throwable> action)。 其中action 表示回调函数,类型是BiConsumer,接收 结果 和异常。那么我们如何来写一个有顺序性的任务呢? completableFuture.supplyAsync(externalTask).whenComplete((result, ex) ->{

// result 是上一个task 执行的结果

//ex执行的异常

}), 但是这个方法可能会造成同步阻塞,因为 supplyAsync 和 后面的 whenComplete 是同一个线程执行的,当然也有可能是不是同个线程执行,看注册时, supplyAsync 这个线程结束没有。我们可以使用 下面的 方法来异步执行

public CompletableFuture whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action)

public CompletableFuture whenCompleteAsync(

    BiConsumer<? super T, ? super Throwable> action, Executor executor)    

但是如果我们只对异常结果感兴趣,我们可以使用下面的方法

public CompletableFuture<T> exceptionally(

    Function<Throwable, ? extends T> fn)

(3) : 构建依赖

无结果的依赖调用: CompletableFuture.runAsync(taskA).thenRun(taskB).thenRun(taskC).join // 异步版本的是 thenRunAsync

有结果的依赖调用: 

// Supplier<String> taskA = () -> "hello";  Function<String, String> taskB = (t) -> t.toUpperCase(); Consumer<String> taskC = (t) -> System.out.println("consume: " + t);

CompletableFuture.supplyAsync(taskA)

    .thenApply(taskB)

    .thenAccept(taskC)

    .join();

runAfterBoth(对应任务Runnable), thenCombine(对应任务BiFunction), thenAcceptBoth(对应任务类型BiConsumer)

Supplier<String> taskA = () -> "taskA";

CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> "taskB");

BiFunction<String, String, String> taskC = (a, b) -> a + "," + b;

String ret = CompletableFuture.supplyAsync(taskA)

        .thenCombineAsync(taskB, taskC)

        .join();

其中还有更为神奇的是allOf 和 anyOf, 列子都是来自于老马编程的,写的着实很好。


CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {

    delayRandom(100, 1000);

    return "helloA";

}, executor);

CompletableFuture<Void> taskB = CompletableFuture.runAsync(() -> {

    delayRandom(2000, 3000);

}, executor);

CompletableFuture<Void> taskC = CompletableFuture.runAsync(() -> {

    delayRandom(30, 100);

    throw new RuntimeException("task C exception");

}, executor);

CompletableFuture.allOf(taskA, taskB, taskC).whenComplete((result, ex) -> {

    if (ex != null) {

        System.out.println(ex.getMessage());

    }

    if (!taskA.isCompletedExceptionally()) {

        System.out.println("task A " + taskA.join());

    }

});


后面的线程使用的是ForkJoinPool,会创建相应的worker:tryAddWorker。里面的代码真的是太底层了,unsafe class 会涉及很多

(4), 咱们再回头简单看看 DefaultFuture, FutureAdapter 继承 completableFuture,futureAdapter中的 future.setCallback,会调用默认的DefaultFuture的setcallback。setCallback 会invoke callback:invokeCallback(callback);

其实dubbo 里面的异步支持比 2.6.x的异步代码,改变很大,也很精妙。等有时间再说把,先看看CompletableFuture 强大的功能。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,590评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 86,808评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,151评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,779评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,773评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,656评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,022评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,678评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,038评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,659评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,756评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,411评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,005评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,973评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,053评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,495评论 2 343

推荐阅读更多精彩内容