CompletableFuture
简介
扩展 Future
功能,通过 CompletionStage
提供函数式编程的能力,简化异步编程
基本用法
-
CompletableFuture
,实现了Future<T>
,CompletionStage<T>
两个接口。 -
Executor
参数可以手动指定线程池,否则默认ForkJoinPool.commonPool()
系统级公共线程池。这些线程都是守护线程,主线程结束守护线程不结束,只有JVM关闭时,生命周期终止。 - 以 Async 结尾的方法都是异步执行,即到线程池取个新线程执行
创建
-
supplyAsync
创建有返回值的任务 参数Supplier
-
runAsync
建没有返回值的任务 参数Runnable
-
completedFuture
创建已完成的任务 参数为值
消费
-
whenComplete
whenCompleteAsync
消费任务结果 参数BiConsumer
-
exceptionally
消费任务异常 参数Function
-
handle
handleAsync
消费任务结果和异常 参数BiFunction
变换
thenApply
thenApplyAsync
有入参有返回值 参数 FunctionthenAccept
thenAcceptAsync
有入参无返回值 参数Consumer
thenRun
thenRunAsync
无入参无返回值 参数Runnable
thenCompose
thenComposeAsync
构成新任务 参数Function
组合
thenCombine
thenCombineAsync
双线有入参有返回值 参数CompletionStage
BiFunction
thenAcceptBoth
thenAcceptBothAsync
双线有入参无返回值 参数CompletionStage
BiConsumer
runAfterBoth
runAfterBothAsync
双线无入参无返回值 参数CompletionStage
Runnable
applyToEither
applyToEitherAsync
二选快有入参有返回值 参数CompletionStage
Function
acceptEither
acceptEitherAsync
二选快有入参无返回值 参数CompletionStage
Consumer
runAfterEither
runAfterEitherAsync
二选快无入参无返回值 参数CompletionStage
Runnable
群体
-
anyOf
只要有完成即触发有返回值 参数CompletableFuture...
-
allOf
全部完成才触发无返回值,可接收异常 参数CompletableFuture...
fun testAnyOfAndAllOf() {
val f1: CompletableFuture<String> = CompletableFuture.supplyAsync {
TimeUnit.SECONDS.sleep(2)
return@supplyAsync Thread.currentThread().name
}.whenComplete { t, u ->
println("t1: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
val f2: CompletableFuture<String> = CompletableFuture.supplyAsync {
TimeUnit.SECONDS.sleep(1)
return@supplyAsync Thread.currentThread().name
}.whenCompleteAsync { t, u ->
println("t2: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
CompletableFuture.anyOf(f1, f2)
.whenComplete { t, u ->
println("anyOf: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}
CompletableFuture.allOf(f1, f2)
.whenComplete { t, u ->
println("allOf: ${System.currentTimeMillis()}: ${Thread.currentThread().name}: $t $u")
}.join()
}
t2: 1550851312825: ForkJoinPool.commonPool-worker-2: ForkJoinPool.commonPool-worker-2 null
anyOf: 1550851312825: ForkJoinPool.commonPool-worker-2: ForkJoinPool.commonPool-worker-2 null
t1: 1550851313822: ForkJoinPool.commonPool-worker-1: ForkJoinPool.commonPool-worker-1 null
allOf: 1550851313822: ForkJoinPool.commonPool-worker-1: null null
完成
-
complete
completeExceptionally
cancel
触发完成 -
isDone
isCompletedExceptionally
isCancelled
判断任务是否已经结束
获取结果
-
get
阻塞等待,获取任务结果,会抛出ExecutionException
InterruptedException
CancellationException
-
join
阻塞等待,获取任务结果,异常CompletionException
CancellationException
-
getNow
立即取到结果,任务未结束则返回传入参数,异常CompletionException
CancellationException
切换线程
使用 Executor
切换执行任务线程,例如在 Android 中在主线程和子线程切换
fun testExecutor() {
CompletableFuture.supplyAsync(Supplier {
TimeUnit.SECONDS.sleep(2)
println("t1: ${Thread.currentThread().name}") // t1: pool-1-thread-1
return@Supplier "t1"
}, AppExecutors.io).thenApplyAsync(Function<String, String> {
println("t2: ${Thread.currentThread().name}") // t2: main
return@Function "$it.t2"
}, AppExecutors.main).thenApplyAsync(Function<String, String> {
println("t3: ${Thread.currentThread().name}") // t3: pool-2-thread-1
return@Function "$it.t3"
}, AppExecutors.single).whenCompleteAsync(BiConsumer { t, u ->
println("t4: ${Thread.currentThread().name}: $t $u") // t4: main: t1.t2.t3 null
}, AppExecutors.main)
}
object AppExecutors {
val io: ExecutorService by lazy {
Executors.newCachedThreadPool()
}
val compute: ExecutorService by lazy {
val threadCount = 3
Executors.newFixedThreadPool(threadCount)
}
val single: ExecutorService by lazy {
Executors.newSingleThreadExecutor()
}
val main: Executor by lazy {
MainThreadExecutor()
}
private class MainThreadExecutor : Executor {
private val mainThreadHandler: Handler = Handler(Looper.getMainLooper())
override fun execute(command: Runnable?) {
mainThreadHandler.post(command);
}
}
}
参考
CompletableFuture
Java8新特性整理之CompletableFuture:组合式、异步编程(七)
CompletableFuture 使用详解
个人愚见:分布式框架中使用CompletableFuture提高效率
Java并发编程系列一:Future和CompletableFuture解析与使用