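CompletableFuture pitfall 1: you need a custom thread pool
CompletableFuture pitfall 2: the consequences of an unreasonable allOf() timeout
CompletableFuture pitfall 3: the thread pool's DiscardPolicy() hangs the whole program
Run the following program: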
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ThreadPoolTest {
    private static final Logger log = LoggerFactory.getLogger(ThreadPoolTest.class);

    public static void main(String[] args) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(3);
        // The task takes 5 seconds, longer than the 2-second wait below.
        CompletableFuture<Void> first = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(5000);
                System.out.println("first");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).exceptionally(e -> {
            e.printStackTrace();
            return null;
        });
        futures.add(first);
        long startTime = System.currentTimeMillis();
        log.info("Start waiting for all threads to finish, time: {}", startTime);
        try {
            // orTimeout (Java 9+) fails the allOf() future after 2 seconds;
            // it does not touch the task behind `first`.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
                    .orTimeout(2000, TimeUnit.MILLISECONDS)
                    .join();
        } catch (Exception e) {
            log.error("Exception while waiting for all threads to finish:", e);
        } finally {
            long endTime = System.currentTimeMillis();
            log.info("Wait finished, time: {}, elapsed: {}s", startTime, (endTime - startTime) / 1000);
            try {
                // Dump the state of the future (done, cancelled, ...) as JSON.
                log.info(JSONObject.toJSONString(first));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
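Output
(Judging by the class name ThreadPoolTest2 and the thread name HomePageCardFeatureQueryThreadPool-0, this log was captured from a variant of the program that ran the task on a custom thread pool and logged its completion. With the listing exactly as shown, the task runs on the common pool, whose threads are normally daemon threads, so the JVM may exit before "first" is printed.)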
10:57:59.612 [main] INFO com.example.demo.ThreadPoolTest2 - Start waiting for all threads to finish, time: 1627268279609
10:58:01.642 [main] ERROR com.example.demo.ThreadPoolTest2 - Exception while waiting for all threads to finish:
java.util.concurrent.CompletionException: java.util.concurrent.TimeoutException
at java.base/java.util.concurrent.CompletableFuture.reportJoin(CompletableFuture.java:412)
at java.base/java.util.concurrent.CompletableFuture.join(CompletableFuture.java:2044)
at com.example.demo.ThreadPoolTest2.main(ThreadPoolTest2.java:46)
Caused by: java.util.concurrent.TimeoutException: null
at java.base/java.util.concurrent.CompletableFuture$Timeout.run(CompletableFuture.java:2792)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
10:58:01.643 [main] INFO com.example.demo.ThreadPoolTest2 - Wait finished, time: 1627268279609, elapsed: 2s
10:58:01.726 [main] INFO com.example.demo.ThreadPoolTest2 - {"cancelled":false,"completedExceptionally":false,"done":false,"numberOfDependents":2}
10:58:04.309 [HomePageCardFeatureQueryThreadPool-0] INFO com.example.demo.ThreadPoolTest2 - Thread first finished
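As the output shows, when the wait timeout on allOf() (2 s) is shorter than the task's actual run time (5 s), the wait ends with a timeout, but the worker thread is not killed, the task is not cancelled, and no exception is thrown inside it. The future still reports "done":false and "cancelled":false after the timeout, and the task keeps running until it finishes normally about 5 seconds after it started.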
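The behaviour described above can be checked with a small self-contained sketch. The class name TimeoutDoesNotStopTask, the method name, and the shortened durations (500 ms of work, a 100 ms wait) are assumptions chosen for illustration; they are not from the original program.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDoesNotStopTask {

    /** Returns true if the task still ran to completion after the allOf() wait timed out. */
    public static boolean taskFinishedAfterTimeout() {
        CountDownLatch finished = new CountDownLatch(1);

        CompletableFuture<Void> task = CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(500);       // stands in for slow work
                finished.countDown();    // only reached if nothing stopped the task
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        boolean timedOut = false;
        try {
            // The wait (100 ms) is shorter than the work (500 ms).
            CompletableFuture.allOf(task).orTimeout(100, TimeUnit.MILLISECONDS).join();
        } catch (CompletionException e) {
            timedOut = e.getCause() instanceof TimeoutException;
        }

        // The caller has given up, but the task itself is neither done nor cancelled.
        boolean stillRunning = !task.isDone();

        boolean ranToCompletion;
        try {
            // Keep the calling thread alive long enough to observe the task finishing.
            ranToCompletion = finished.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            ranToCompletion = false;
        }
        return timedOut && stillRunning && ranToCompletion;
    }

    public static void main(String[] args) {
        System.out.println("task finished after timeout: " + taskFinishedAfterTimeout());
    }
}
```

The timeout only completes the future returned by allOf() exceptionally; nothing in this sketch signals the thread that is doing the work.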
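What is the problem with that?
If an API endpoint contains code like this, a burst of requests will time out and return while their child tasks are still running. The endpoint keeps accepting new requests, and each one either creates more child threads (when no thread pool is used) or joins the queue of a thread pool. The former leads to OOM. The latter makes the timeouts worse, because tasks sit in the queue and never get their turn to run, until the endpoint or even the whole service becomes completely unavailable.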
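The queueing case can be illustrated with a rough simulation. Everything in it is an assumption made for the sketch: the class name BacklogDemo, a pool with a single worker, a queue capacity of 100, tasks that take 400 ms, and callers that wait only 50 ms. Each loop iteration stands in for one incoming request.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BacklogDemo {

    /** Returns {callers that timed out, tasks still waiting in the queue afterwards}. */
    public static int[] simulate(int requests) {
        // One worker thread and a bounded queue (illustrative sizes).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
        int timedOut = 0;
        try {
            for (int i = 0; i < requests; i++) {
                CompletableFuture<Void> work = CompletableFuture.runAsync(() -> {
                    try {
                        Thread.sleep(400);   // slow downstream call
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }, pool);
                try {
                    // The caller gives up after 50 ms, but the task stays in the pool.
                    CompletableFuture.allOf(work).orTimeout(50, TimeUnit.MILLISECONDS).join();
                } catch (CompletionException e) {
                    if (e.getCause() instanceof TimeoutException) {
                        timedOut++;
                    }
                }
            }
            // Every caller has returned, yet work is still piled up behind the worker.
            return new int[] { timedOut, pool.getQueue().size() };
        } finally {
            pool.shutdownNow();   // clean up so the demo can exit
        }
    }

    public static void main(String[] args) {
        int[] result = simulate(5);
        System.out.println("timed out: " + result[0] + ", still queued: " + result[1]);
    }
}
```

Each request that times out leaves its task behind, so later requests start further back in the queue and are even less likely to finish within their own wait.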
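Real code performs a lot of I/O, and each I/O call normally has its own timeout (leaving it unset has serious consequences). If the child tasks call such methods, the allOf() timeout must be greater than the largest of those timeouts, and if a task makes several calls serially, their timeouts must be added up first.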
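One way to apply this rule is to derive the allOf() timeout from the per-call timeouts instead of hard-coding it. The helper below is a minimal sketch: the class name AllOfTimeoutBudget, the method budget, and the extra safety margin are hypothetical and not part of the original article.

```java
import java.time.Duration;
import java.util.List;

public class AllOfTimeoutBudget {

    /**
     * Each inner list holds the timeouts of the I/O calls that one child task
     * makes one after another. The child tasks themselves run in parallel.
     */
    public static Duration budget(List<List<Duration>> tasks, Duration margin) {
        Duration longest = Duration.ZERO;
        for (List<Duration> serialCalls : tasks) {
            // Serial calls inside one task: their timeouts add up.
            Duration total = Duration.ZERO;
            for (Duration timeout : serialCalls) {
                total = total.plus(timeout);
            }
            // Parallel tasks: only the slowest one matters.
            if (total.compareTo(longest) > 0) {
                longest = total;
            }
        }
        // The margin (an assumption of this sketch) keeps the wait strictly larger.
        return longest.plus(margin);
    }

    public static void main(String[] args) {
        Duration wait = budget(
                List.of(
                        List.of(Duration.ofSeconds(2), Duration.ofSeconds(3)), // two serial calls
                        List.of(Duration.ofSeconds(4))),                       // one call
                Duration.ofMillis(500));
        System.out.println(wait.toMillis() + " ms");
    }
}
```

The result can then be passed on as orTimeout(wait.toMillis(), TimeUnit.MILLISECONDS), so the wait grows automatically when a downstream timeout is raised.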