Java异步任务编排—CompletableFuture(二)

CompletableFuture API

  • 默认情况下CompletableFuture会使用公共的ForkJoinPool线程池,这个线程池默认创建的线程数是 CPU 的核数(也可以通过 JVM option:-Djava.util.concurrent.ForkJoinPool.common.parallelism 来设置ForkJoinPool线程池的线程数)。如果所有CompletableFuture共享一个线程池,那么一旦有任务执行一些很慢的 I/O 操作,就会导致线程池中所有线程都阻塞在 I/O 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

  • 方法不以Async结尾,意味着Action使用相同的线程执行,而Async可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行),如果以Async结尾,却又没自定义线程池,则还是使用公共的ForkJoinPool线程池,

1 创建异步任务 API

CompletableFuture创建异步任务,一般有supplyAsync和runAsync两个方法:

  • supplyAsync执行CompletableFuture任务,支持返回值。
  • runAsync执行CompletableFuture任务,没有返回值。

举个栗子:

public static void main(String[] args) {
    //可以自定义线程池
    ExecutorService executor = Executors.newCachedThreadPool();
    //runAsync的使用
    CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> System.out.println("runAsync,为了部落"), executor);
    //supplyAsync的使用
    CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
                System.out.print("supplyAsync,为了联盟");
                return "哈哈哈哈哈"; }, executor);
    //runAsync的future没有返回值,输出null
    System.out.println(runFuture.join());
    //supplyAsync的future,有返回值
    System.out.println(supplyFuture.join());
    executor.shutdown(); // 线程池需要关闭
}



//输出
runAsync,为了部落
null
supplyAsync,为了联盟哈哈哈哈哈

2 依赖关系

  • thenRun():不关心上一个任务的执行结果,无传参,无返回值

    做完第一个任务后,再做第二个任务,但是前后两个任务没有参数传递,第二个任务也没有返回值

    CompletableFuture<String> completableFuture   = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<Void> future = completableFuture.thenRun(() -> System.out.println("Computation finished."));
     
    future.get();
    
  • thenApply():依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,有返回值

      第一个任务执行完成后,执行第二个回调方法任务,会将第一个任务的执行结果,作为入参,传递到回调方法中,并且回调方法是有返回值的。
    

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future = completableFuture.thenApply(s -> s + " World");
    assertEquals("Hello World", future.get());

  • thenAccept(): 依赖上一个任务的结果,把前面任务的执行结果,交给后面的Function,无返回值

    第一个任务执行完成后,执行第二个回调方法任务,会将该任务的执行结果,作为入参,传递到回调方法中,但是回调方法是没有返回值的

    CompletableFuture<String> completableFuture= CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<Void> future = completableFuture.thenAccept(s -> System.out.println("Computation returned: " + s));
     
    future.get();
    
  • thenCompose():用来连接两个有依赖关系的任务,结果由第二个任务返回

    在第一个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future   = completableFuture.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
     
    assertEquals("Hello World", future.get());
    

3 组合关系

  • and集合关系
    • thenCombine():执行两个独立的任务,并对其结果执行某些操作,有返回值
    • thenAccepetBoth():两个任务执行完成后,将结果交给thenAccepetBoth处理,无返回值
    • runAfterBoth():两个任务都执行完成后,执行下一步操作(Runnable类型任务),不会把执行结果当做方法入参,且没有返回值
CompletableFuture<String> cfA = CompletableFuture.supplyAsync(() -> "resultA");
CompletableFuture<String> cfB = CompletableFuture.supplyAsync(() -> "resultB");
//想要使用两个Future结果时,但不需要将任何结果值进行返回时,可以用 thenAcceptBoth,它表示后续的处理不需要返回值,而 thenCombine 表示需要返回值
cfA.thenCombine(cfB, (resultA, resultB) -> "result A + B");
cfA.thenAcceptBoth(cfB, (resultA, resultB) -> {});
  • 聚合关系
    • applyToEither():两个任务哪个执行的快,就使用哪一个结果,有返回值
    • acceptEither():两个任务哪个执行的快,就消费哪一个结果,无返回值
    • runAfterEither():不会把执行结果当做方法入参,且没有返回值

applyToEither / acceptEither / runAfterEither 都表示将两个CompletableFuture组合起来,只要其中一个执行完了,就会执行某个任务

//第一个异步任务,休眠2秒,保证它执行晚点
        CompletableFuture<String> first = CompletableFuture.supplyAsync(()->{
            try{
                Thread.sleep(2000L);
                System.out.println("执行完第一个异步任务");}
            catch (Exception e){
                return "第一个任务异常";
            }
            return "第一个异步任务";
        });

        //第二个异步任务
        CompletableFuture<String> second = CompletableFuture.supplyAsync(() -> {
                            System.out.println("执行完第二个任务");
                            return "第一个任务还在睡觉,这是第二个任务";}
                        );
                
        CompletableFuture acceptEither =  second.acceptEitherAsync(first, result ->System.out.println(result+"==acceptEither"));
        CompletableFuture applyToEither = second.applyToEitherAsync(first,result->{
            System.out.println(result+"==applyToEither");
            return result;
        });
        CompletableFuture runAfterEither =  second.runAfterEitherAsync(first, () ->System.out.println("hello"));
  • 并行执行
    • allOf():当所有给定的 CompletableFuture 完成时,返回一个新的 CompletableFuture
CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2   = CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
 
CompletableFuture<Void> combinedFuture  = CompletableFuture.allOf(future1, future2, future3);
 
combinedFuture.get();
 
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());

allOf局限性在于它不会返回所有任务的综合结果。相反,你必须手动从Futures获取结果。幸运的是,CompletableFuture.join()方法和Java 8 Streams API可以解决:

String combined = Stream.of(future1, future2, future3).map(CompletableFuture::join).collect(Collectors.joining(" "));
 
assertEquals("Hello Beautiful World", combined);
  • anyOf():当任何一个给定的CompletablFuture完成时,返回一个新的CompletableFuture,如果执行的任务异常,anyOfCompletableFuture,执行get方法,会抛出异常
 CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
            }
        );

        CompletableFuture<String> future3   = CompletableFuture.supplyAsync(() -> "World");
        CompletableFuture<Object> combinedFuture  = CompletableFuture.anyOf(future1, future3);

        System.out.println(combinedFuture.get());

        System.out.println(future1.get());
        System.out.println(future3.get());
        
        //结果
        World
        Hello
        World

4 结果处理 异常捕获

  • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,无返回值;并且whenComplete方法返回的CompletableFutureresult是上个任务的结果
     CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
     CompletableFuture<String> future3   = future1.whenComplete((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
        });

    System.out.println(future3.get());
  • whenComplete:当任务完成时,将使用结果(或 null)和此阶段的异常(或 null如果没有)执行给定操作,有返回值;并且whenComplete方法返回的CompletableFutureresult是回调方法的结果
    CompletableFuture<String> future1    = CompletableFuture.supplyAsync(() -> "Hello");
    CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

   System.out.println(future3.get());
  • exceptionally:某个任务执行异常时,执行的回调方法,并且会把抛出的异常作为参数,传递到回调方法
CompletableFuture<String> orgFuture = CompletableFuture.supplyAsync(
                ()->{
                    System.out.println("当前线程名称:" + Thread.currentThread().getName());
                    throw new RuntimeException();
                }
        );

        CompletableFuture<String> exceptionFuture = orgFuture.exceptionally((e) -> {
            e.printStackTrace();
            return "歪歪歪?你的程序异常啦";
        });

        System.out.println(exceptionFuture.get());

5 超时处理

JDK 8 版本的CompletableFuture 没有timeout机制,timeout机制是指,如果forkjoin-pool(或者自定义线程池)中一个线程在规定时间内没有返回,那么就结束掉,而不是继续执行直到获取结果,比如main线程200ms内返回,但forkjoin-pool(或者自定义线程池)中某个执行线程执行400ms才返回,而其返回值根本没有被使用到。

实现方案:启动一个 ScheduledThreadpoolExecutor 线程在 timeout 时间后直接调用 CompletableFuture.completeExceptionally(new TimeoutException()),然后用 acceptEither() 或者 applyToEither 看是先计算完成还是先超时:

public class FutureUtil {

    /**
     * cpu 核心数
     */
    private static final int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    // 最大超时时间
    private static final int TIMEOUT_VALUE = 1500;
    // 时间单位
    private static final TimeUnit TIMEOUT_UNIT = TimeUnit.MILLISECONDS;


    /**
     * Singleton delay scheduler, used only for starting and * cancelling tasks.
     */
    public static final class Delayer {

        static final ScheduledThreadPoolExecutor delayer;

        /**
         * 异常线程,不做请求处理,只抛出异常
         */
        static {
            delayer = new ScheduledThreadPoolExecutor(1, new DaemonThreadFactory());
            delayer.setRemoveOnCancelPolicy(true);
        }

        static ScheduledFuture<?> delay(Runnable command, long delay, TimeUnit unit) {
            return delayer.schedule(command, delay, unit);
        }

        static final class DaemonThreadFactory implements ThreadFactory {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                t.setName("CompletableFutureScheduler");
                return t;
            }
        }
    }

    /**
     * 根据服务器cpu自定义线程池
     */
    private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            AVALIABLE_PROCESSORS,
            3 * AVALIABLE_PROCESSORS,
            3,
            TimeUnit.SECONDS,
            new LinkedBlockingDeque<>(20),
            new ThreadPoolExecutor.CallerRunsPolicy()
    );

    /**
     * 有返回值的异步
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(Supplier<T> supplier){
        return supplyAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,supplier);
    }

    /**
     * 有返回值的异步 - 可设置超时时间
     * @param timeout
     * @param unit
     * @param supplier
     * @param <T>
     * @return
     */
    public static  <T> CompletableFuture<T> supplyAsync(long timeout, TimeUnit unit,Supplier<T> supplier){
        return CompletableFuture.supplyAsync(supplier, threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 无返回值的异步
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(Runnable runnable){
        return runAsync(TIMEOUT_VALUE,TIMEOUT_UNIT,runnable);
    }

    /**
     * 无返回值的异步 - 可设置超时时间
     * @param runnable
     * @return
     */
    public static CompletableFuture runAsync(long timeout, TimeUnit unit,Runnable runnable){
        return CompletableFuture.runAsync(runnable,threadPoolExecutor)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 统一处理异步结果
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(CompletableFuture... futures){
        return allOf(TIMEOUT_VALUE,TIMEOUT_UNIT,futures);
    }

    /**
     * 统一处理异步结果 - 可设置超时时间
     * @param futures
     * @return
     */
    public static CompletableFuture allOf(long timeout, TimeUnit unit,CompletableFuture... futures){
        return CompletableFuture.allOf(futures)
                .applyToEither(timeoutAfter(timeout,unit), Function.identity())
                .exceptionally(throwable -> {
                    throwable.printStackTrace();
                    log.error(throwable.getMessage());
                    return null;
                });
    }

    /**
     * 异步超时处理
     * @param timeout
     * @param unit
     * @param <T>
     * @return
     */
    public static <T> CompletableFuture<T> timeoutAfter(long timeout, TimeUnit unit) {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), timeout, unit);
        return result;
    }

    public static <T> CompletableFuture<T> timeoutAfter() {
        CompletableFuture<T> result = new CompletableFuture<T>();
        // timeout 时间后 抛出TimeoutException 类似于sentinel / watcher
        Delayer.delayer.schedule(() -> result.completeExceptionally(new TimeoutException()), TIMEOUT_VALUE, TIMEOUT_UNIT);
        return result;
    }

}

使用demo

 CompletableFuture<String> future1    = FutureUtil.supplyAsync(10,TimeUnit.MILLISECONDS,() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "Hello";
        });

        CompletableFuture<String> future3   = future1.handle((a, throwable) -> {
            System.out.println("上个任务执行完啦,还把" + a + "传过来");
            return "world";
        });

        System.out.println(future3.get());
image.png

在 JDK 9,CompletableFuture 正式提供了 orTimeoutcompleteTimeout 方法,来准确实现异步超时控制。实现原理跟上面是一样的。

6 线程阻塞问题

要合理治理线程资源,最基本的前提条件就是要在写代码时,清楚地知道每一行代码都将执行在哪个线程上。下面我们看一下CompletableFuture的执行线程情况。

CompletableFuture实现了CompletionStage接口,通过丰富的回调方法,支持各种组合操作,每种组合场景都有同步和异步两种方法。

同步方法(即不带Async后缀的方法)有两种情况。

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。
  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

异步方法(即带Async后缀的方法):

  • 可以选择是否传递线程池参数Executor运行在指定线程池中;
  • 当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈)。

例如:

ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("supplyAsync 执行线程:" + Thread.currentThread().getName());
    //业务操作
    return "";
}, threadPool1);
//此时,如果future1中的业务操作已经执行完毕并返回,则该thenApply直接由当前main线程执行;否则,将会由执行以上业务操作的threadPool1中的线程执行。
future1.thenApply(value -> {
    System.out.println("thenApply 执行线程:" + Thread.currentThread().getName());
    return value + "1";
});
//使用ForkJoinPool中的共用线程池CommonPool
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
});
//使用指定线程池
future1.thenApplyAsync(value -> {
//do something
  return value + "1";
}, threadPool1);

7 线程池死锁问题

前面提到,异步回调方法可以选择是否传递线程池参数Executor,这里我们建议强制传线程池,且根据实际情况做线程池隔离。

当不传递线程池时,会使用ForkJoinPool中的公共线程池CommonPool,这里所有调用将共用该线程池,核心线程数=处理器数量-1(单核核心线程数为1),所有异步回调都会共用该CommonPool,核心与非核心业务都竞争同一个池中的线程,很容易成为系统瓶颈。手动传递线程池参数可以更方便的调节参数,并且可以给不同的业务分配不同的线程池,以求资源隔离,减少不同业务之间的相互干扰。

线程池循环引用会导致死锁

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100));
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

为了修复该问题,需要将父任务与子任务做线程池隔离,两个任务请求不同的线程池,避免循环依赖导致的阻塞。

8 异步RPC调用注意不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,053评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,527评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,779评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,685评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,699评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,609评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,989评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,654评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,890评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,634评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,716评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,394评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,976评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,950评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,191评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,849评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,458评论 2 342

推荐阅读更多精彩内容