思路
多个并行任务,执行的时候,最后一个任务执行后开始执行 (可以自己实现,建议不要,容易出问题)后面出具 reactor版的方案
参数说明:
size 并行个数
ChildTask<T> childTask 子任务
EndTask<T> endTask 合并结束后执行的任务
int timeout, 超时时间
Executor multiThreadExecutor 子任务执行线程
Executor complateThreadExecutor 任务结束后执行线程
AsyncThreadSwitchListener asyncThreadSwitchListene 线程切换参数
代码部分
public static <T> void disassemblyTasks(int size, ChildTask<T> childTask, EndTask<T> endTask, int timeout, Executor multiThreadExecutor,Executor complateThreadExecutor, AsyncThreadSwitchListener asyncThreadSwitchListener) {
asyncThreadSwitchListener.hold();
CompletableFuture<T>[] completableFutures = new CompletableFuture[size];
//执行子任务
for (int i = 0; i < size; i++) {
int finalI = i;
completableFutures[i] = new CompletableFuture<>();
multiThreadExecutor.execute(() -> {
asyncThreadSwitchListener.cover();
try {
childTask.run(completableFutures[finalI], finalI);
} finally {
asyncThreadSwitchListener.clear();
}
});
}
CompletableFuture<Void> voidCompletableFuture = CompletableFuture
.allOf(completableFutures)
.whenCompleteAsync((unused, throwable) -> {
asyncThreadSwitchListener.cover();
try {
endTask.run(completableFutures, throwable);
} finally {
asyncThreadSwitchListener.clear();
}
}, complateThreadExecutor);
//超时控制
CompletableFutureUtil.within(voidCompletableFuture, timeout, TimeUnit.MILLISECONDS);
}
/**
* <h1>同步场景会失效</h1>
* 线程切换回调函数
* 线程切换上下文通过这个进行切换
*/
public interface AsyncThreadSwitchListener {
AsyncThreadSwitchListener ASYNC_THREAD_SWITCH_LISTENR = new AsyncThreadSwitchListener() {
@Override
public void hold() {
}
@Override
public void clear() {
}
@Override
public void cover() {
}
};
void hold();
void clear();
void cover();
}
import java.util.concurrent.CompletableFuture;
public interface ChildTask<T> {
/**
* @description 子任务执行
* @author xinjiu
*/
void run(CompletableFuture<T> completableFuture, int number);
}
import java.util.concurrent.CompletableFuture;
public interface EndTask<T> {
//结束任务
void run(CompletableFuture<T>[] completableFutures, Throwable throwable);
}
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
/*
* 通用的异步处理工具 超时工具
* */
public class CompletableFutureUtil {
public static <T> void within(CompletableFuture<T> future, long timeout, TimeUnit unit) {
final CompletableFuture<T> timeoutFuture = timeoutAfter(timeout, unit);
// 哪个先完成 就apply哪一个结果 这是一个关键的API
future.applyToEitherAsync(timeoutFuture, Function.identity());
}
public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
CompletableFuture<T> result = new CompletableFuture<>();
// timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException("MultiTask timeOut :"+timeout)), timeout, unit);
return result;
}
/**
* Singleton delay scheduler, used only for starting and * cancelling tasks.
*/
static final class Delayer {
static final class DaemonThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("CompletableFutureDelayScheduler");
return t;
}
}
static final ScheduledThreadPoolExecutor delayer;
// 注意,这里使用一个线程就可以搞定 因为这个线程并不真的执行请求 而是仅仅抛出一个异常
static {
(delayer = new ScheduledThreadPoolExecutor(
1, new DaemonThreadFactory())).
setRemoveOnCancelPolicy(true);
}
}
}