CompletableFuture API
默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰
方法不以
Async
结尾,意味着Action使用相同的线程执行,而Async
可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行),如果以Async
结尾,却又没自定义线程池,则还是使用公共的ForkJoinPool线程池,
1 创建异步任务 API
CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:
- supplyAsync执行CompletableFuture任务,支持返回值。
- runAsync执行CompletableFuture任务,没有返回值。
举个栗子:
public static void main(String[] args) {
//可以自定义线程池
ExecutorService executor = Executors.newCachedThreadPool();
//runAsync的使用
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
//supplyAsync的使用
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
System.out.print("supplyAsync,为了联盟");
return "哈哈哈哈哈"; }, executor);
//runAsync的future没有返回值,输出null
System.out.println(runFuture.join());
//supplyAsync的future,有返回值
System.out.println(supplyFuture.join());
executor.shutdown(); // 线程池需要关闭
}
//输出
runAsync,为了部落
null
supplyAsync,为了联盟哈哈哈哈哈
2 依赖关系
-
thenRun():不关心上一个任务的执行结果,无传参,无返回值
做完第一个任务后,再做第二个任务,但是前后两个任务没有参数传递,第二个任务也没有返回值
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture.thenRun(() -> System.out.println("Computation finished.")); future.get();
-
thenApply():依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,有返回值
第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
assertEquals("Hello World", future.get()); -
thenAccept(): 依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,无返回值
第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的
CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<Void> future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s)); future.get();
-
thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回
在第一个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的
CompletableFuture
实例CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello"); CompletableFuture<String> future = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World")); assertEquals("Hello World", future.get());
3 组合关系
- and集合关系
- thenCombine():执行两个独立的任务,并对其结果执行某些操作,有返回值
- thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
- runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务),不会把执行结果当做方法入参,且没有返回值
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
//想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用 thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
- 聚合关系
- applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
- acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
- runAfterEither():不会把执行结果当做方法入参,且没有返回值
applyToEither
/ acceptEither
/ runAfterEither
都表示将两个CompletableFuture
组合起来,只要其中一个执行完了,就会执行某个任务
//第一个异步任务,休眠2秒,保证它执行晚点
CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
try{
Thread.sleep(2000L);
System.out.println("执行完第一个异步任务");}
catch (Exception e){
return "第一个任务异常";
}
return "第一个异步任务";
});
//第二个异步任务
CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
System.out.println("执行完第二个任务");
return "第一个任务还在睡觉,这是第二个任务";}
);
CompletableFuture acceptEither = second.acceptEitherAsync(first, result ->System.out.println(result+"==acceptEither"));
CompletableFuture applyToEither = second.applyToEitherAsync(first,result->{
System.out.println(result+"==applyToEither");
return result;
});
CompletableFuture runAfterEither = second.runAfterEitherAsync(first, () ->System.out.println("hello"));
- 并行执行
- allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf(future1, future2, future3);
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
allOf局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:
String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
- anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture,如果执行的任务异常,
anyOf
的CompletableFuture
,执行get方法,会抛出异常
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello";
}
);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Object> combinedFuture = CompletableFuture.anyOf(future1, future3);
System.out.println(combinedFuture.get());
System.out.println(future1.get());
System.out.println(future3.get());
//结果
World
Hello
World
4 结果处理 异常捕获
- whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,无返回值;并且
whenComplete
方法返回的CompletableFuture
的result是上个任务的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future3 = future1.whenComplete((a, throwable) -> {
System.out.println("上个任务执行完啦,还把" + a + "传过来");
});
System.out.println(future3.get());
- whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,有返回值;并且
whenComplete
方法返回的CompletableFuture
的result是回调方法的结果。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future3 = future1.handle((a, throwable) -> {
System.out.println("上个任务执行完啦,还把" + a + "传过来");
return "world";
});
System.out.println(future3.get());
- exceptionally:某个任务执行异常时,执行的回调方法,并且会把抛出的异常作为参数,传递到回调方法
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
()->{
System.out.println("当前线程名称:" + Thread.currentThread().getName());
throw new RuntimeException();
}
);
CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
e.printStackTrace();
return "歪歪歪?你的程序异常啦";
});
System.out.println(exceptionFuture.get());
5 超时处理
JDK 8 版本的CompletableFuture 没有timeout机制,timeout机制是指,如果forkjoin-pool(或者自定义线程池)中一个线程在规定时间内没有返回,那么就结束掉,而不是继续执行直到获取结果,比如main线程200ms内返回,但forkjoin-pool(或者自定义线程池)中某个执行线程执行400ms才返回,而其返回值根本没有被使用到。
实现方案:启动一个 ScheduledThreadpoolExecutor
线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException())
,然后用 acceptEither()
或者 applyToEither
看是先计算完成还是先超时:
public class FutureUtil {
/**
* cpu 核心数
*/
private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();
// 最大超时时间
private static final int TIMEOUT_VALUE = 1500;
// 时间单位
private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
/**
* Singleton delay scheduler, used only for starting and * cancelling tasks.
*/
public static final class Delayer {
static final ScheduledThreadPoolExecutor delayer;
/**
* 异常线程,不做请求处理,只抛出异常
*/
static {
delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
delayer.setRemoveOnCancelPolicy(true);
}
static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureScheduler");
return t;
}
}
}
/**
* 根据服务器cpu自定义线程池
*/
private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
AVALIABLE_PROCESSORS,
3 * AVALIABLE_PROCESSORS,
3,
TimeUnit.SECONDS,
new LinkedBlockingDeque<>(20),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 有返回值的异步
* @param supplier
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
}
/**
* 有返回值的异步 - 可设置超时时间
* @param timeout
* @param unit
* @param supplier
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}
/**
* 无返回值的异步
* @param runnable
* @return
*/
public static CompletableFuture runAsync(Runnable runnable){
return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
}
/**
* 无返回值的异步 - 可设置超时时间
* @param runnable
* @return
*/
public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
return CompletableFuture.runAsync(runnable,threadPoolExecutor)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}
/**
* 统一处理异步结果
* @param futures
* @return
*/
public static CompletableFuture allOf(CompletableFuture... futures){
return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
}
/**
* 统一处理异步结果 - 可设置超时时间
* @param futures
* @return
*/
public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
return CompletableFuture.allOf(futures)
.applyToEither(timeoutAfter(timeout,unit), Function.identity())
.exceptionally(throwable -> {
throwable.printStackTrace();
log.error(throwable.getMessage());
return null;
});
}
/**
* 异步超时处理
* @param timeout
* @param unit
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
return result;
}
public static <T> CompletableFuture<T> timeoutAfter() {
CompletableFuture<T> result = new CompletableFuture<T>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
return result;
}
}
使用demo
CompletableFuture<String> future1 = FutureUtil.supplyAsync(10,TimeUnit.MILLISECONDS,() -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "Hello";
});
CompletableFuture<String> future3 = future1.handle((a, throwable) -> {
System.out.println("上个任务执行完啦,还把" + a + "传过来");
return "world";
});
System.out.println(future3.get());
在 JDK 9,CompletableFuture
正式提供了 orTimeout
、completeTimeout
方法,来准确实现异步超时控制。实现原理跟上面是一样的。
6 线程阻塞问题
要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。
CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。
同步方法(即不带Async后缀的方法)有两种情况。
- 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
- 如果注册时被依赖的操作还未执行完,则由回调线程执行。
异步方法(即带Async后缀的方法):
- 可以选择是否传递线程池参数Executor运行在指定线程池中;
- 当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。
例如:
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
//业务操作
return "";
}, threadPool1);
//此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value -> {
System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
return value + "1";
});
//使用ForkJoinPool中的共用线程池CommonPool
future1.thenApplyAsync(value -> {
//do something
return value + "1";
});
//使用指定线程池
future1.thenApplyAsync(value -> {
//do something
return value + "1";
}, threadPool1);
7 线程池死锁问题
前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。
当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。
线程池循环引用会导致死锁
public Object doGet() {
ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
//do sth
return CompletableFuture.supplyAsync(() -> {
System.out.println("child");
return "child";
}, threadPool1).join();//子任务
}, threadPool1);
return cf1.join();
}
如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。
为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。
8 异步RPC调用注意不要阻塞IO线程池
服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。