在JDK8中进行多线程调用时,如果想指定一个超时时间,若子线程执行超时则直接熔断处理,该怎么优雅地实现呢?
从Java 8开始引入了CompletableFuture,它针对Future做了改进,可以传入回调对象,当异步任务完成或者发生异常时,自动调用回调对象的回调方法。
但是,在JDK8中没有提供子线程超时处理机制。试着看了下JDK9的代码,果然有了相应的实现,具体可参考JDK9的CompletableFuture类。下面是我将其中的部分代码提取出来实现了一个增强类,可以在JDK8中正常运行。
import java.util.Arrays;
import java.util.concurrent.*;
import java.util.function.BiConsumer;
/**
* CompletableFuture功能增强类
* <p>对jdk8中的CompletableFuture进行功能增强</p>
* <p>参考jdk9</p>
*
* @author wangyu
* @date 2021/9/24 10:51
*/
public class CompletableFutureHelper {
/**
* 单例延时调度程序,仅用于启动和取消任务
*/
static final class Delayer {
static ScheduledFuture<?> delay(Runnable command, long delay,
TimeUnit unit) {
return delayer.schedule(command, delay, unit);
}
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
// 注意,这里使用一个线程就可以搞定 因为这个线程并不真的执行请求 仅仅只是用于启动和取消任务
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new CompletableFutureHelper.Delayer.DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
/**
* 超时时以抛异常的形式结束任务
*/
static final class Timeout implements Runnable {
final CompletableFuture<?> f;
Timeout(CompletableFuture<?> f) {
this.f = f;
}
@Override
public void run() {
if (f != null && !f.isDone()) {
f.completeExceptionally(new TimeoutException());
}
}
}
/**
* 在超时时完成
*/
static final class DelayedCompleter<U> implements Runnable {
final CompletableFuture<U> f;
final U u;
DelayedCompleter(CompletableFuture<U> f, U u) {
this.f = f;
this.u = u;
}
@Override
public void run() {
if (f != null) {
f.complete(u);
}
}
}
/**
* 取消不需要的超时任务
*/
static final class Canceller implements BiConsumer<Object, Throwable> {
final Future<?> f;
Canceller(Future<?> f) {
this.f = f;
}
@Override
public void accept(Object ignore, Throwable ex) {
if (ex == null && f != null && !f.isDone()) {
f.cancel(false);
}
}
}
/**
* 若执行超时则返回默认值,否则返回计算出来的结果值
*
* @param future 源future
* @param value 超时返回的默认值
* @param timeout 超时时间
* @param unit 超时时间单位
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> completeOnTimeout(CompletableFuture<T> future, T value, long timeout, TimeUnit unit) {
if (future == null || unit == null) {
throw new NullPointerException();
}
if (null == future.getNow(null)) {
future.whenComplete(new Canceller(Delayer.delay(new DelayedCompleter<T>(future, value), timeout, unit)));
}
return future;
}
/**
* 若执行超时则抛出异常,否则返回计算出来的结果值
*
* @param future 源future
* @param timeout 超时时间
* @param unit 超时时间单位
* @param <T>
* @return
*/
public static <T> CompletableFuture<T> orTimeout(CompletableFuture<T> future, long timeout, TimeUnit unit) {
if (future == null || unit == null) {
throw new NullPointerException();
}
if (null == future.getNow(null)) {
future.whenComplete(new Canceller(Delayer.delay(new Timeout(future), timeout, unit)));
}
return future;
}
/**
* 包装所有的任务,加上超时处理,超时的任务默认返回null
*
* @param timeout 超时时间
* @param unit 超时时间单位
* @param cfs 任务
* @return CompletableFuture<Void>
* @see CompletableFuture#allOf(CompletableFuture[])
*/
public static CompletableFuture<Void> allOfWithCompleteOnTimeout(long timeout, TimeUnit unit, CompletableFuture<?>... cfs) {
Arrays.stream(cfs).forEach(cf -> cf = completeOnTimeout(cf, null, timeout, unit));
return CompletableFuture.allOf(cfs);
}
}
测试代码如下:
public static void main(String[] args) throws ExecutionException, InterruptedException, IOException {
long start = System.currentTimeMillis();
CompletableFuture future1 = CompletableFuture.supplyAsync(()-> {
sleep(3L);
System.out.printf("future1 [%s] end\n", Thread.currentThread().getName());
return "[data] future1";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(()-> {
sleep(8L);
System.out.printf("future2 [%s] end\n", Thread.currentThread().getName());
return "[data] future2";
});
// CompletableFuture.allOf(future1, future2).join();
System.out.println("main get start...");
//场景1:包装任务,加上超时处理,超时的任务默认返回null
// future1 = CompletableFutureHelper.completeOnTimeout(future1, null, 4, TimeUnit.SECONDS);
// future2 = CompletableFutureHelper.completeOnTimeout(future2, null, 4, TimeUnit.SECONDS);
// CompletableFuture.allOf(future1, future2).join();
//场景2:包装任务,加上超时处理,任一任务超时则抛错TimeoutException
// future1 = CompletableFutureHelper.orTimeout(future1, 4, TimeUnit.SECONDS);
// future2 = CompletableFutureHelper.orTimeout(future2, 4, TimeUnit.SECONDS);
// CompletableFuture.allOf(future1, future2).join();
//此一行代码等价于场景一3行代码,使用更方便;不过场景一中可以分别指定不同的默认返回值,更加灵活
CompletableFutureHelper.allOfWithCompleteOnTimeout(4, TimeUnit.SECONDS, future1, future2).join();
//超时时间设置成7995毫秒,执行时间为8s的线程不一定会超时(本次测试精度大概在10ms左右,7990时,执行8s的任务大概率当做超时处理)
// CompletableFutureHelper.allOfWithCompleteOnTimeout(7995, TimeUnit.MILLISECONDS, future1, future2).join();
System.out.println("[get future1] " + future1.get());
System.out.println("[get future2] " + future2.get());
System.out.println("main get end...");
System.out.printf("main [%s] end cost %d ms\n", Thread.currentThread().getName(), System.currentTimeMillis()-start);
System.in.read();
}
private static void sleep(long seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
测试代码打印内容如下
main get start...
future1 [ForkJoinPool.commonPool-worker-1] end
[get future1] [data] future1
[get future2] null
main get end...
main [main] end cost 4057 ms
future2 [ForkJoinPool.commonPool-worker-2] end