关于使用CompletableFuture过程中线程等待的问题

关于使用CompletableFuture过程中线程等待的问题

在电商的应用场景中,通过异步多线程获取服务端信息比较常见,如用户打开个人中心查看个人综合信息,可能会展示用户的账户余额、优惠券、积分、消费红包等等信息,这时服务端就会通过异步线程将所需信息汇总后一并返回给用户。如果按单线程逐一返回个人信息,用户等待的时间显然是不能接受的,通过异步多线程的方式大大减少请求的响应时间。

jdk8简化了异步任务的写法,提供了很多异步任务的计算方式。

1、示例(先从一个简单测试示例代码,查看运行的结果)
import lombok.extern.slf4j.Slf4j;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

@Slf4j
public class CompletableFutureApplicationTests {

    public static void main(String[] args) {
        log.info("当前cpu核数:{}", Runtime.getRuntime().availableProcessors());
        Instant start = Instant.now();
        List<CompletableFuture<Integer>> list = new ArrayList<>();
        //模拟任务需要10个调用
        for (int i = 0; i < 10; i++) {
            list.add(CompletableFuture.supplyAsync(() -> info()));
        }

        list.forEach(e -> {
            try {
                log.info("{}", e.get());
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });

        Instant end = Instant.now();
        log.info("执行任务完成,耗时共:{}ms", Duration.between(start, end).toMillis());

    }

    public static Integer info() {
        //模拟任务耗时
        try {
            Thread.sleep(1000);
            log.info("{}",Thread.currentThread().getName());
        } finally {
            return new Random().nextInt(1000);
        }
    }
}

执行此程序,运行结果如下

09:20:49.603 [main] INFO * - 当前cpu核数:4
09:20:50.672 [ForkJoinPool.commonPool-worker-1] INFO * - ForkJoinPool.commonPool-worker-1
09:20:50.672 [main] INFO * - 750
09:20:50.673 [ForkJoinPool.commonPool-worker-2] INFO * - ForkJoinPool.commonPool-worker-2
09:20:50.673 [ForkJoinPool.commonPool-worker-3] INFO * - ForkJoinPool.commonPool-worker-3
09:20:50.673 [main] INFO * - 941
09:20:50.673 [main] INFO * - 480
09:20:51.672 [ForkJoinPool.commonPool-worker-1] INFO * - ForkJoinPool.commonPool-worker-1
09:20:51.672 [main] INFO * - 939
09:20:51.673 [ForkJoinPool.commonPool-worker-2] INFO * - ForkJoinPool.commonPool-worker-2
09:20:51.673 [ForkJoinPool.commonPool-worker-3] INFO * - ForkJoinPool.commonPool-worker-3
09:20:51.673 [main] INFO * - 722
09:20:51.673 [main] INFO * - 781
09:20:52.673 [ForkJoinPool.commonPool-worker-1] INFO * - ForkJoinPool.commonPool-worker-1
09:20:52.673 [main] INFO * - 868
09:20:52.674 [ForkJoinPool.commonPool-worker-2] INFO * - ForkJoinPool.commonPool-worker-2
09:20:52.674 [main] INFO * - 632
09:20:52.674 [ForkJoinPool.commonPool-worker-3] INFO * - ForkJoinPool.commonPool-worker-3
09:20:52.678 [main] INFO * - 471
09:20:53.673 [ForkJoinPool.commonPool-worker-1] INFO * - ForkJoinPool.commonPool-worker-1
09:20:53.673 [main] INFO * - 498
09:20:53.687 [main] INFO * - 执行任务完成,耗时共:4066ms
2、结果分析(dump线程等待)

可以看到在cpu核心数为4个的机器上运行,其运行结果超过4秒,如果按异步线程运行的话,耗时应该是在1秒多一点才符合预期。

从打印的日志信息中可以看到main主线程的时间在50 51 52 53秒返回,子线程ForkJoinPool.commonPool-worker的最大编号为3,而且是只3个子线程循环执行任务,说明10个任务同时调用时发生了线程等待,导致结果不符合预期。

  • 跟踪源码方法分析

查看supplyAsync方法源码

    public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }

在此主要查看asyncPool参数,看起来像是异步线程池

    private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

看到是一个变量,根据useCommonPool判断创建的方式,继续看useCommonPool参数

private static final boolean useCommonPool =
        (ForkJoinPool.getCommonPoolParallelism() > 1);

继续查看方法getCommonPoolParallelism中的commonParallelism取值,从静态代码段中获取(只贴出关键部分)

        common = java.security.AccessController.doPrivileged
            (new java.security.PrivilegedAction<ForkJoinPool>() {
                public ForkJoinPool run() { return makeCommonPool(); }});
        int par = common.config & SMASK; // report 1 even if threads disabled
        commonParallelism = par > 0 ? par : 1;

通过位操作,获取par的值,继续跟踪参数common.config,再重点看makeCommonPool方法(只贴出关键部分)

        int parallelism = -1;
        try {  // ignore exceptions in accessing/parsing properties
            String pp = System.getProperty
                ("java.util.concurrent.ForkJoinPool.common.parallelism");
        ...
        if (parallelism < 0 && // default 1 less than #cores
            (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
            parallelism = 1;
        if (parallelism > MAX_CAP)
            parallelism = MAX_CAP;

参数parallelism初始赋值为-1,可通过系统参数java.util.concurrent.ForkJoinPool.common.parallelism变更并发数,同时增加了并发数范围的控制,<font color=red>需要特别注意parallelismif条件中赋值,核数大于1赋值后为false</font>。举例:如cpu只有1核的话,通过Runtime.getRuntime().availableProcessors()获取到cpu核心数,计算并发数只为1,按上面的程序如果在1核环境下需要10秒(可以vmware上创建1核心环境安装系统运行查看结果),如cpu核数大于1的话则并发数为cpu核数减1。

3、配置系统参数,设置并发数为10
增加启动参数
-Djava.util.concurrent.ForkJoinPool.common.parallelism=10

运行结果即可观察到可以并行启动10个线程执行任务,总耗时在1秒范围;如果改为9个并发则在2秒的范围。

4、使用线程模式

查看方法CompletableFuture.supplyAsync,可传入参数Executor,如果传入一个线程池最大线程只有一个的线程,则预期结果将在10秒范围,代码如下,可验证结果

public static void main(String[] args) {
        log.info("当前cpu核数:{}", Runtime.getRuntime().availableProcessors());
        Instant start = Instant.now();
        List<CompletableFuture<Integer>> list = new ArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(1);
        //模拟任务需要10个调用
        for (int i = 0; i < 10; i++) {
            list.add(CompletableFuture.supplyAsync(() -> info(), executor));
        }

        list.forEach(e -> {
            try {
                log.info("{}", e.get());
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        });

        Instant end = Instant.now();
        log.info("执行任务完成,耗时共:{}ms", Duration.between(start, end).toMillis());
        executor.shutdown();
    }

调整线程数为10,即可以1秒范围内执行完成

5、CompletableFuture常用方法基本使用
方法名 说明
CompletableFuture.runAsync 无返回值运行任务
CompletableFuture.suppliAsync 有返回值运行任务
CompletableFuture.allOf(...).get() 多个任务全部执行完成后才继续后面处理
CompletableFuture.anyOf(...).get() 多个任务任何一个执行完成后即继续后面处理
CompletableFuture.thenApply 接收执行结果,有返回值,可将返回值继续往下传递
CompletableFuture.thenAccept 接收执行结果,无返回值
6、总结

针对不同的场景需要使用不同的线程方式

  • cpu密集型的任务,建议直接使用cpu的核心数创建线程,以避免频繁的线程切换造成线程等待;
  • io密集型的任务,建议使用线程方式,一般io等待时间远远超过线程等待时间;
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345

推荐阅读更多精彩内容