ThreadPoolExecutor + CountDownLatch 实际应用

记录下我自己工作中遇到的一个实际问题。

场景

遍历一个参数集合,并请求第三方API拉取数据,由于总数据量过大(过亿/天),所以运行时长接近30小时,不能满足业务方需求。为此,我使用线程池改造成并发拉取来提升拉取速度。

目标

1.多线程并发拉取,但是考虑到第三方有qps限制,不能一味增加线程数,所以要设定最大并发上限。
2.任务结束后记录拉取总数。

实现

多线程的介绍可以参见大神的博文 Java并发编程:线程池的使用.。

为了实现始终以不超过最大并发数目的线程数并发执行:
1、可以用线程池封装好的Executors.newFixedThreadPool(corePoolSize);
2、自定义:可以设置核心线程数corePoolSize为我期望的最大并发数,并且设置队列大小和maximumPoolSize不小于我的总任务数(直接设置为总任务数即可)。这很重要,根据线程池的特性,这里直接引用作者原话:

如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;

如果队列长度设置小了,很容易满员,此时如果提交的任务数未超过maximumPoolSize,则线程池会继续创建新的线程,这就导致线程数超过我期望的最大并发数,因此队列大小必须大于等于我的总任务数。

还有,如果maximumPoolSize小于总任务数,可能会抛异常,原话:

如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;

所以maximumPoolSize必须大于等于我的总任务数。

为了实现任务结束后记录总数,可以借助并发包工具类CountDownLatch,因为多线程是异步的,CountDownLatch可以阻塞当前线程,直到所有子线程完成。

下面是一个例子

package com.yzy.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class Main1 {
    public static void main(String[] args) throws InterruptedException {

        // 线程安全的计数器
        AtomicInteger totalRows = new AtomicInteger(0);

        // 创建线程池,其中核心线程10,也是我期望的最大并发数,最大线程数和队列大小都为30,即我的总任务数
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 30, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(30));

        //或者ExecutorService executor = Executors.newFixedThreadPool(corePoolSize);

        // 初始化CountDownLatch,大小为30
        CountDownLatch countDownLatch = new CountDownLatch(30);

        // 模拟遍历参数集合
        for (int i = 0; i < 30; i++) {
            // 往线程池提交任务
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    int times = 0;
                    // 模拟数据拉取过程可能需要分页
                    while (true) {
                        // 模拟每个任务需要分页5次
                        if (times >= 5) {
                            break;
                        }
                        times++;

                        // 模拟计数
                        totalRows.incrementAndGet();
                        try {
                            // 模拟耗时
                            Thread.sleep(Long.valueOf(String.valueOf(new Double(Math.random() * 1000).intValue())));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                    // 子线程完成,countDownLatch执行countDown
                    countDownLatch.countDown();
                }
            });
            // 打印线程池运行状态
            System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                    executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());
        }
        // 标记多线程关闭,但不会立马关闭
        executor.shutdown();

        // 阻塞当前线程,知道所有子线程都执行countDown方法才会继续执行
        countDownLatch.await();

        // 打印线程池运行状态
        System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" +
                executor.getQueue().size() + ",已执行结束的任务数目:" + executor.getCompletedTaskCount());

        // 打印计数
        System.out.println("结束:" + totalRows.get());
    }
}

控制台输出:

线程池中线程数目:1,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:6,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:9,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:0,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:1,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:2,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:3,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:4,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:6,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:7,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:8,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:9,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:10,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:11,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:12,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:13,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:14,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:15,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:16,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:17,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:18,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:19,已执行结束的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:20,已执行结束的任务数目:0
线程池中线程数目:0,队列中等待执行的任务数目:0,已执行结束的任务数目:30
结束:150

可以看出,线程池最大并发数为10,也是我所期望的数值。最终输出的计数也是正确的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,524评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,869评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,813评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,210评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,085评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,117评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,533评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,219评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,487评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,582评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,362评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,218评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,589评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,899评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,176评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,503评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,707评论 2 335

推荐阅读更多精彩内容

  • 为什么使用线程池 当我们在使用线程时,如果每次需要一个线程时都去创建一个线程,这样实现起来很简单,但是会有一个问题...
    闽越布衣阅读 4,269评论 10 45
  • 小娟,感恩有你 小娟:生命中有你是我的福气,今天借着练习写作就写一封感谢信给你,借此机会来表达我的谢意! 说你名如...
    萍子2阅读 185评论 0 0
  • 我们的信任在现今的社会中正在被一点点的丢掉! 信任是一个非常有价值的无形的东西。一个人,一个公司,一个国家都需要信...
    大張冰阅读 95评论 4 6
  • by hfy 每日新闻一分钟:美国人最害怕什么健康问题?_国外媒体资讯 - 可可英语 A new study sh...
    123逍遥游阅读 120评论 0 0