oracle JDK8 有关内容的文档:https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html
创建异步任务
runAsync 执行 CompletableFuture 任务,没有返回值
static CompletableFuture<Void> runAsync(Runnable runnable)
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
supplyAsync 执行 CompletableFuture 任务,可有返回值
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
如果不指定 Executor 实现,则使用
ForkJoinPool.commonPool()
作为执行异步代码的线程池
创建异步任务后,可根据需求进行如下的操作:
方法名称 | 类型 | 传参 | 返回值 |
---|---|---|---|
thenRun | 单任务消费 | 无传参 | 无返回值 |
thenRunAsync | 单任务消费 | 无传参 | 无返回值 |
thenApply | 单任务消费 | 要传参 | 有返回值 |
thenApplyAsync | 单任务消费 | 要传参 | 有返回值 |
thenAccept | 单任务消费 | 要传参 | 无返回值 |
thenAcceptAsync | 单任务消费 | 要传参 | 无返回值 |
thenCombine | 双任务消费(与) | 要传参(两个任务的执行结果) | 有返回值 |
thenCombineAsync | 双任务消费(与) | 要传参(两个任务的执行结果) | 有返回值 |
thenAcceptBoth | 双任务消费(与) | 要传参(两个任务的执行结果) | 无返回值 |
thenAcceptBothAsync | 双任务消费(与) | 要传参(两个任务的执行结果) | 无返回值 |
runAfterBoth | 双任务消费(与) | 无传参 | 无返回值 |
runAfterBothAsync | 双任务消费(与) | 无传参 | 无返回值 |
applyToEither | 双任务消费(或) | 要传参(已完成任务的执行结果) | 有返回值 |
applyToEitherAsync | 双任务消费(或) | 要传参(已完成任务的执行结果) | 有返回值 |
acceptEither | 双任务消费(或) | 要传参(已完成任务的执行结果) | 无返回值 |
acceptEitherAsync | 双任务消费(或) | 要传参(已完成任务的执行结果) | 无返回值 |
runAfterEither | 双任务消费(或) | 无传参 | 无返回值 |
runAfterEitherAsync | 双任务消费(或) | 无传参 | 无返回值 |
whenComplete | 单任务消费 | 要传参(正常返回值和异常) | 无返回值 |
whenCompleteAsync | 单任务消费 | 要传参(正常返回值和异常) | 无返回值 |
handle | 单任务消费 | 要传参(正常返回值和异常) | 有返回值 |
handleAsync | 单任务消费 | 要传参(正常返回值和异常) | 有返回值 |
exceptionally | 单任务消费 | 要传参 (异常) | 无返回值 |
thenCompose | 单任务消费 | 要传参 | 有返回值 |
allOf | 多任务消费(与) | 要传参(任务列表) | 无返回值 |
anyOf | 多任务消费(或) | 要传参(任务列表) | 无返回值 |
不带 Async 版本由上一个任务的线程继续执行该任务,Async 版本可以指定执行该异步任务的 Executor 实现,如果不指定,默认使用
ForkJoinPool.commonPool()
单任务消费
回调方法 | 类型 | 传参 | 返回值 |
---|---|---|---|
thenRun | 单任务消费 | 无传参 | 无返回值 |
thenRunAsync | 单任务消费 | 无传参 | 无返回值 |
thenAccept | 单任务消费 | 要传参 | 无返回值 |
thenAcceptAsync | 单任务消费 | 要传参 | 无返回值 |
thenApply | 单任务消费 | 要传参 | 有返回值 |
thenApplyAsync | 单任务消费 | 要传参 | 有返回值 |
public static void main(String[] args) throws Exception {
var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsyncTask=" + Thread.currentThread().getName());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "";
}, executor);
// thenApply
var thenApplyTask = supplyAsyncTask.thenApply((param) -> {
System.out.println("thenApplyTask=" + Thread.currentThread().getName());
return "";
});
// thenApplyAsync不指定线程池
var thenApplyAsyncTask = supplyAsyncTask.thenApplyAsync((param) -> {
System.out.println("thenApplyAsyncTask=" + Thread.currentThread().getName());
return "";
});
// thenApplyAsync指定线程池
var thenApplyAsyncTask2 = supplyAsyncTask.thenApplyAsync((param) -> {
System.out.println("thenApplyAsyncTask2=" + Thread.currentThread().getName());
return "";
}, executor);
// 不调用get()将不执行回调
thenApplyAsyncTask.get();
thenApplyAsyncTask2.get();
// 关闭线程池
executor.shutdown();
}
输出结果:
supplyAsyncTask=pool-1-thread-1
thenApplyAsyncTask2=pool-1-thread-2
thenApplyTask=pool-1-thread-2
thenApplyAsyncTask=ForkJoinPool.commonPool-worker-3
双任务消费(与)
将两个 CompletableFuture 组合起来,只有这两个都正常执行完了,才会执行某个任务。
方法名称 | 类型 | 传参 | 返回值 |
---|---|---|---|
thenCombine | 双任务消费(与) | 有传参(两个任务的执行结果) | 有返回值 |
thenCombineAsync | 双任务消费(与) | 有传参(两个任务的执行结果) | 有返回值 |
thenAcceptBoth | 双任务消费(与) | 有传参(两个任务的执行结果) | 无返回值 |
thenAcceptBothAsync | 双任务消费(与) | 有传参(两个任务的执行结果) | 无返回值 |
runAfterBoth | 双任务消费(与) | 无传参 | 无返回值 |
runAfterBothAsync | 双任务消费(与) | 无传参 | 无返回值 |
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> "task1");
var task2 = CompletableFuture.supplyAsync(() -> "task2");
var task3 = CompletableFuture.supplyAsync(() -> "task3");
task1.thenCombine(task2, (param1, param2) -> {
// task1task2
System.out.println(param1 + param2);
return param1 + param2;
}).thenCombine(task3, (param12, param3) -> {
// task1task2task3
System.out.println(param12 + param3);
return param12 + param3;
});
task1.thenAcceptBoth(task2, (param1, param2) -> {
// task1task2
System.out.println(param1 + param2);
}).thenAcceptBoth(task3, (param12, param3) -> {
// nulltask3
System.out.println(param12 + param3);
});
task1.runAfterBoth(task2, () -> {
// task1 and task2
System.out.println("task1 and task2");
});
}
双任务消费(或)
将两个 CompletableFuture 组合起来,只要其中一个执行完了,就执行回调方法。
方法名称 | 类型 | 传参 | 返回值 |
---|---|---|---|
applyToEither | 双任务消费(或) | 有传参(已完成任务的执行结果) | 有返回值 |
applyToEitherAsync | 双任务消费(或) | 有传参(已完成任务的执行结果) | 有返回值 |
acceptEither | 双任务消费(或) | 有传参(已完成任务的执行结果) | 无返回值 |
acceptEitherAsync | 双任务消费(或) | 有传参(已完成任务的执行结果) | 无返回值 |
runAfterEither | 双任务消费(或) | 无传参 | 无返回值 |
runAfterEitherAsync | 双任务消费(或) | 无传参 | 无返回值 |
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1";
});
var task2 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task2";
});
var task3 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task3";
});
task1.applyToEither(task2, (param) -> {
// applyToEither=task2
System.out.println("applyToEither=" + param);
return param;
}).acceptEither(task3, (param) -> {
// acceptEither=task2 或 acceptEither=task3
System.out.println("acceptEither=" + param);
}).get();
// task1 or task2
task1.runAfterEither(task2,()-> System.out.println("task1 or task2"));
}
其他
whenComplete、whenCompleteAsync
某个任务执行完成后,执行的回调方法,无返回值。可以访问 CompletableFuture 的结果和异常作为参数,使用它们并执行想要的操作。此方法并不能转换完成的结果。会内部抛出异常。其正常返回的 CompletableFuture 的结果来自上个任务。
handle、handleAsync
不论正常返回还是出异常都会进入 handle,参数通常为 new BiFunction<T, Throwable, R>();
,其中
- T:上一任务传入的对象类型
- Throwable:上一任务传入的异常
- R:返回的对象类型
handle 和 thenApply 的区别:如果任务出现异常不会进入 thenApply;任务出现异常也会进入 handle,可对异常处理。
handle 和 whenComplete 的区别:handle 可对传入值 T 进行转换,并产生自己的返回结果 R;whenComplete 的返回值和上级任务传入的结果一致,不能转换。
whenComplete、whenCompleteAsync、handle 和 handleAsync 的输入参数一个是正常结果一个是异常结果,而 exceptionally 的输入参数为异常结果。
public static void main(String[] args) throws Exception {
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
// 制造一个异常
// int value = 1 / 0;
return "supplyAsyncTask";
});
var handle = supplyAsyncTask.handle((s, throwable) -> {
if (Optional.ofNullable(throwable).isPresent()) {
return throwable.getMessage();
}
return new ArrayList() {{
add(s);
}};
});
// supplyAsyncTask异常时,输出1:java.lang.ArithmeticException: / by zero
// 输出2:[supplyAsyncTask]
System.out.println(handle.get());
}
exceptionally
某个任务执行抛出异常时执行的回调方法。抛出异常作为参数,传递到回调方法。仅处理异常情况。如果任务成功完成,那么将被跳过。
public static void main(String[] args) throws Exception {
var supplyAsyncTask = CompletableFuture.supplyAsync(() -> {
double error=1/0;
return "ok";
}, executor).exceptionally((e)->{
// java.lang.ArithmeticException: / by zero
System.out.println(e.getMessage());
return "error";
});
// "error"
System.out.println(supplyAsyncTask.get());
}
complete
如果尚未完成,则将 get()
和相关方法返回的值设置为给定值。如果此调用导致此 CompletableFuture 转换到完成状态,则返回 true,否则返回 false。文档描述:
If not already completed, sets the value returned by get() and related methods to the given value.
Params:
value – the result value
Returns:
true if this invocation caused this CompletableFuture to transition to a completed state, else false
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(15);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return 10;
});
// 若get放在此处,一直等待task1完成,输出10
// System.out.println(task1.get());
// 强制task1完成,输出true
System.out.println(task1.complete(5));
// 输出5
System.out.println(task1.get());
// task1已完成,输出false
System.out.println(task1.complete(50));
}
thenCompose
源码定义
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn);
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) ;
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn, Executor executor) ;
thenCompose 方法会在某个任务执行完成后,将该任务的执行结果作为入参,执行指定的方法。该方法会返回一个新的 CompletableFuture 实例
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> 10);
var task2 = task1.thenCompose(param -> {
System.out.println("this is task2 param=" + param);
return CompletableFuture.supplyAsync(() -> {
System.out.println("this is task2 square");
return Math.pow(param, 2);
});
}).thenApply(param -> {
System.out.println("thenApply get the square=" + param);
return param;
});
var task3 = task1.thenCompose(param -> {
System.out.println("this is task3 param=" + param);
return CompletableFuture.runAsync(() -> {
System.out.println("this is task3 square");
System.out.println(Math.pow(param, 2));
});
});
System.out.println("task2 get=" + task2.get());
System.out.println("task3 get=" + task3.get());
}
输出:
this is task2 param=10
this is task2 square
thenApply get the square=100.0
this is task3 param=10
this is task3 square
100.0
task2 get=100.0
task3 get=null
allOf
静态方法,阻塞等待所有给定的 CompletableFuture 执行结束后,返回一个 CompletableFuture<Void>
结果。所有任务都执行完成后,才执行 allOf 的回调方法。如果任意一个任务异常,执行 get 方法时会抛出异常。
public static void main(String[] args) throws Exception {
var task1 = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "task1";
});
var task2 = CompletableFuture.runAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
int value = 1 / 0;
System.out.println("task2 is over");
});
CompletableFuture.allOf(task1, task2).whenComplete((param, throwable) -> {
// null
System.out.println(param);
}).exceptionally(throwable -> {
// task3 allOf throwable=java.lang.ArithmeticException: / by zero
System.out.println("task3 allOf throwable=" + throwable.getMessage());
return null;
}).get();
}
anyOf
静态方法,阻塞等待任意一个给定的 CompletableFuture 对象执行结束后,返回一个 CompletableFuture<Void>
结果。任意一个任务执行完,就执行 anyOf 的回调方法。如果执行的任务异常,执行 get 方法时会抛出异常。