最近学习dubbo 源码, 敲了些vertx 代码,觉得异步编程的风格是有多么的帅,lambda表达式写的美又很有逻辑。java 强语言能写成这样也是很美的一件事情。
这遍文章简单的看看CompletableFuture 是怎么使用的,背后都干了啥事,先看看dubbo上的completableFuture。Dubbo 的future 使用,请看这篇博客http://dubbo.apache.org/en-us/blog/dubbo-new-async.html, 一直到dubbo 2.7.0 才出现 CompletableFuture,dubbo 2.6.x 会报序列化的错误, 因为 CompletableFuture<object> 不是一个序列化的对象,没有 implement serialization.
(1). 首先看看 CompletableFuture 有哪些基本的用法。CompletableFuture 是一个class,实现future 和 CompletionStage(完成阶段) 接口。多个CompletionStage可以有先后顺序, 可以 多个任务同时完成,可以上一个任务执行的结果传给下个任务。有人说是线程编排,也有其道理。 CompletableFuture 不支持使用 callable,而支持Runnable 和 supplier, 这些都是被@FunctionalInterface 注解标注的接口。 有lambda 的表达特性。supplier 代替 callable 支持 有返回结果的异步。
我们看一个例子:
ExecutorService executor = Executors.newFixedThreadPool(3); // 默认可以通过ForkJoinPool 产生的线程来执行任务,其线程数是 CPU 核数➖1,不然就使用 ThreadPerTaskExecutor
Supplier<Integer> externalTask = () -> {
// do something
return 3;
}
CompletableFuture.supplyAsync(externalTask, executor).
我们发现其本质是使用executor 线程 执行 externalTask的任务,任务只完成之后将结果放到 Future 中,通过get 方法获取结果。Future get 的方法是同步还是异步呢? 我觉得会阻塞,一直拿到正确的结果(异常, 或者 线程计算出来的结果), 它和 completableFuture join 方法类似。对于Runnable 接口, 可使用 runAsync方法。
(2) 那么 CompletableFuture 如何触发 ConpletionStage 呢? 我们来看看
public boolean complete(T value), public boolean completeExceptionally(Throwable ex) 。 我们发现在源码中或者别的异步框架中,比如前端的promise,当任务完成之后,会设置 complete,表示任务完成成功或者失败。complete会触发依赖他们的completionStage。下面是java 部分的代码,说实话我有点看不懂,好像是将依赖 放入 stack 中,然后一个一个依赖触发
*/
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture f =this; Completion h;
while ((h = f.stack) !=null ||
(f !=this && (h = (f =this).stack) !=null)) {
CompletableFuture d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t !=null) {
if (f !=this) {
pushStack(h);
continue;
}
h.next =null; // detach
}
f = (d = h.tryFire(NESTED)) ==null ?this : d;
}
}
}。
我们看一下
public CompletableFuture<T> whenComplete(
BiConsumer<? super T, ? super Throwable> action)。 其中action 表示回调函数,类型是BiConsumer,接收 结果 和异常。那么我们如何来写一个有顺序性的任务呢? completableFuture.supplyAsync(externalTask).whenComplete((result, ex) ->{
// result 是上一个task 执行的结果
//ex执行的异常
}), 但是这个方法可能会造成同步阻塞,因为 supplyAsync 和 后面的 whenComplete 是同一个线程执行的,当然也有可能是不是同个线程执行,看注册时, supplyAsync 这个线程结束没有。我们可以使用 下面的 方法来异步执行
public CompletableFuture whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture whenCompleteAsync(
BiConsumer<? super T, ? super Throwable> action, Executor executor)
但是如果我们只对异常结果感兴趣,我们可以使用下面的方法
public CompletableFuture<T> exceptionally(
Function<Throwable, ? extends T> fn)
(3) : 构建依赖
无结果的依赖调用: CompletableFuture.runAsync(taskA).thenRun(taskB).thenRun(taskC).join // 异步版本的是 thenRunAsync
有结果的依赖调用:
// Supplier<String> taskA = () -> "hello"; Function<String, String> taskB = (t) -> t.toUpperCase(); Consumer<String> taskC = (t) -> System.out.println("consume: " + t);
CompletableFuture.supplyAsync(taskA)
.thenApply(taskB)
.thenAccept(taskC)
.join();
runAfterBoth(对应任务Runnable), thenCombine(对应任务BiFunction), thenAcceptBoth(对应任务类型BiConsumer)
Supplier<String> taskA = () -> "taskA";
CompletableFuture<String> taskB = CompletableFuture.supplyAsync(() -> "taskB");
BiFunction<String, String, String> taskC = (a, b) -> a + "," + b;
String ret = CompletableFuture.supplyAsync(taskA)
.thenCombineAsync(taskB, taskC)
.join();
其中还有更为神奇的是allOf 和 anyOf, 列子都是来自于老马编程的,写的着实很好。
CompletableFuture<String> taskA = CompletableFuture.supplyAsync(() -> {
delayRandom(100, 1000);
return "helloA";
}, executor);
CompletableFuture<Void> taskB = CompletableFuture.runAsync(() -> {
delayRandom(2000, 3000);
}, executor);
CompletableFuture<Void> taskC = CompletableFuture.runAsync(() -> {
delayRandom(30, 100);
throw new RuntimeException("task C exception");
}, executor);
CompletableFuture.allOf(taskA, taskB, taskC).whenComplete((result, ex) -> {
if (ex != null) {
System.out.println(ex.getMessage());
}
if (!taskA.isCompletedExceptionally()) {
System.out.println("task A " + taskA.join());
}
});
后面的线程使用的是ForkJoinPool,会创建相应的worker:tryAddWorker。里面的代码真的是太底层了,unsafe class 会涉及很多
(4), 咱们再回头简单看看 DefaultFuture, FutureAdapter 继承 completableFuture,futureAdapter中的 future.setCallback,会调用默认的DefaultFuture的setcallback。setCallback 会invoke callback:invokeCallback(callback);
其实dubbo 里面的异步支持比 2.6.x的异步代码,改变很大,也很精妙。等有时间再说把,先看看CompletableFuture 强大的功能。