CompletableFuture避坑1——需要自定义线程池
CompletableFuture避坑2——allOf()超时时间不合理的后果
CompletableFuture避坑3——线程池的DiscardPolicy()导致整个程序卡死
1. 限制IO密集型任务的性能
CompletableFuture默认使用的线程池是 ForkJoinPool.commonPool(),commonPool是当前 JVM(进程) 上的所有 CompletableFuture、并行 Stream 共享的,commonPool 的目标场景是非阻塞的 CPU 密集型任务,其线程数默认为 CPU 数量减1,所以对于我们用java常做的IO密集型任务,默认线程池是远远不够使用的;在双核及以下机器上,默认线程池又会退化为为每个任务创建一个线程,相当于没有线程池。
以runAsync的代码举例,不指定线程池时,使用的是ASYNC_POOL
,而这个ASYNC_POOL
的大小,是根据 CPU 核数计算出来的(COMMON_PARALLELISM
)如果COMMON_PARALLELISM
小于1,USE_COMMON_POOL
为false(此时ForkJoinPool.commonPool()不支持并发),直接退化为 ThreadPerTaskExecutor,每个任务新开一个线程。
下面是部分代码及注释。
// 这段用来计算ForkJoinPool.commonPool()的线程池大小的
static {
try {
MethodHandles.Lookup l = MethodHandles.lookup();
CTL = l.findVarHandle(ForkJoinPool.class, "ctl", long.class);
MODE = l.findVarHandle(ForkJoinPool.class, "mode", int.class);
QA = MethodHandles.arrayElementVarHandle(ForkJoinTask[].class);
} catch (ReflectiveOperationException e) {
throw new ExceptionInInitializerError(e);
}
Class<?> ensureLoaded = LockSupport.class;
int commonMaxSpares = DEFAULT_COMMON_MAX_SPARES;
try {
String p = System.getProperty
("java.util.concurrent.ForkJoinPool.common.maximumSpares");
if (p != null)
commonMaxSpares = Integer.parseInt(p);
} catch (Exception ignore) {
}
COMMON_MAX_SPARES = commonMaxSpares;
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
modifyThreadPermission = new RuntimePermission("modifyThread");
common = AccessController.doPrivileged(new PrivilegedAction<>() {
public ForkJoinPool run() {
return new ForkJoinPool((byte) 0);
}
});
COMMON_PARALLELISM = Math.max(common.mode & SMASK, 1);
}
public static int getCommonPoolParallelism() {
return commonParallelism;
}
// ForkJoinPool.commonPool()线程池大小为1或0,就不使用ForkJoinPool.commonPool()了
private static final boolean USE_COMMON_POOL =
(ForkJoinPool.getCommonPoolParallelism() > 1);
// 为每个任务开一个线程的线程工厂
static final class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
private static final Executor ASYNC_POOL = USE_COMMON_POOL ?
ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
public static CompletableFuture<Void> runAsync(Runnable runnable) {
return asyncRunStage(ASYNC_POOL, runnable);
}
2. 子线程中不继承当前的类加载器
参考这个:https://zhuanlan.zhihu.com/p/339203275,作者已经写得很清楚了,我就不重复了。