啃不透--线程池

引子   

上一篇文章《火焰图--记一次cpu降温过程》提到接手了单车投放的应用,上周已经有大量流量切到新应用,然而心中还是惴惴不安的,投放动作业务逻辑复杂,使用了线程池异步处理。对于线程池认知只是停留在一个模糊的状态。这是一个什么状态呢:感觉什么都懂,就是说不出来(就像考试前啥都懂,考试时咬笔杆)。每次使用线程遇到问题都不能用已经有知识去思考,比如3个核心参数应该如何设置,阻塞队列应该用什么...通过搜索引擎查询后,又感觉自己懂了,实际上只是做了一次无用的循环,这次输入的内容,因为没有体系根基,很快又被新的内容冲刷得毫无痕迹。

  最近加入同事文博组织的虚拟PMO团队--Thor,大家在交流分享结构化思维时,方才意识到自己在学习线程池上花了很多时间,还是不能清楚的描述它,更本原因就是没有从宏观上认识,建立初始知识体系,没有这个基础,零散学习只是增加了很多无效时间。这也就是为啥有的人学习快,有些慢;有的人可以举一反三,有的确不能。

  一个良好的学习过程应该是:

  1,找到初始化知识体系。

      2,补充丰富知识体系。

      我们都知道系统学习可以建立初始化知识体系,比如阅读一本相关书籍,写一些文章。那么如何补充丰富呢,一句话:拿到锤子,看到什么都是钉子。那应该如何理解这句话呢:这篇文章就是这个的实践。

                                        文博分享的关于结构化思维的脑图

为什么要用线程池

  随着处理器的核心越来越多,利用多线程技术可以把计算逻辑拆分成多个片段,分配到多个核心上,可以显著减少处理时间,提高效率;复杂的业务逻辑,可以使用多线程并发处理,缩短响应时间,提高用户体验。java的线程机制是抢占式协作多线程, 调用机制会周期性的中断线程,将上下文切换到另一个进程,线程越多,竞争会越多,切换会更频繁。所以增加线程带来的性能增加不是线性的,这就是amdahl定律。

  再者,线程的创建与销毁,上下文切换都不是免费的。《并发编程实战》一书中对于线程创建与销毁开销说明:

Thread lifecycle overhead. Thread creation and teardown are not free. The actual overhead varies across platforms, but thread creation takes time, introducing latency into request processing, and requires some processing activity by the JVM and OS. If requests are frequent and lightweight, as in most server applications, creating a new thread for each request can consume significative computing resources.

  大意如下:“线程生命周期开销:创建和销毁都是有代价的。实际开销虽因平台有所不同,但是都要消耗时间,jvm和os 需要执行一些处理程序;在大数请求频繁的服务端应用中,如果为每个请求创建一个线程将消耗非常可观的计算机资源”。以上概念层的开销,那一个java线程的创建实际开销则是这样的:

A large block of memory has to be allocated and initialized for the thread stack. 为线程栈分配内存

System calls need to be made to create / register the native thread with the host OS.  为os 创建和注册本地线程进行系统调用

Descriptors needs to be created, initialized and added to JVM internal data structures.  创建和初始化描述符,添加到jvm内部的数据结构。

  上下问切换(context switching)也是有开销的,需要分配内存存储当前状态,克隆系统调用等,具体可以参考文末参考资料[2]

    正是因为创建线程的代价是如此昂贵的(expensive),所以线程池出现了, 它以“池化”思想来管理资源,按需创建,分配,回收;并重复利用已有的线程资源。既然大家都用线程池,那么它的”真面目“是怎么样的呢-- 从源开开始。

源码分析

  java为多线程编程提供了良好的,考究并且一致的编程模型,让我们只需关注问题本身,而ThreadPoolExecutor类就是java为我们提供的线程池模型,其继承体系如下图,顶层接口定义了统一的行为,并将任务提交与任务执行的策略解藕开来;而AbstractExecutorService 抽象任务执行流程并串连起来;如此,子类只用关注某个具体方法了。

   一般而言 ThreadPoolExecutor.execute(Runnable()) 是我们使用线程池的入口

public void execute(Runnable command) {

    if(command ==null)    // 三种情况:

    intc = ctl.get();

    //1,线程数 少于 核心线程 直接创建线程if(workerCountOf(c) < corePoolSize) {

        if(addWorker(command,true))

            return;

        c = ctl.get();

    }

    //线程数数超过 核心线程,但是blockqueue 未满,enqueue.if(isRunning(c) && workQueue.offer(command)) {

        intrecheck = ctl.get();

        if(! isRunning(recheck) && remove(command))

            reject(command);

        elseif(workerCountOf(recheck) == 0)

            addWorker(null,false);

    }

    // queue 已经满,直接创建线程(超过max  reject)elseif(!addWorker(command,false))

        reject(command);

}

  execute方法的三个分支,决定了线程池中线程的创建执行策略(面试中经常碰到的场景就是:添加了多个任务时,创建了多少个线程):

  1,线程数 少于 核心线程 直接创建线程

  2,线程数数超过 核心线程,但是blockqueue 未满,enqueue.

  3, queue 已经满,直接创建线程(超过max reject)

  下图展示了线程的创建过程

   上面的代码中的判断条件中有两个:workerCountOf(c) -- 获取当前线程数; isRunning(c)  -- 线程池是否是运行状态。这两个方法的参数都是一个int类型,那么一个int是如何能同时表示两个类型呢。一个int 4个字节,32位,这里就是用指定位数(3位)来表示状态,剩下的29位表示线程数,下图展示了这个关系。jdk中还有一些其他类也同步用了这样方法,比如:ReentrantReadWriteLock,高16位表示共享锁的数量,低16位表示互斥锁的数量。


// CAPACITY= 00011111111111111111111111111111(29个1)

// 获取当前线程数// 线程池的最大数就是2^29-1privatestaticintworkerCountOf(int c)  {

returnc & CAPACITY;

}

  线程池做为一个对象,有自己的状态机,其状态变化是有内部事件驱动的。下图展示了每个状态以及对应值(状态值是3位二进制),及对应的行为。这里有个插曲:以前面试被问到线程池shutwon和stop的差别。当时认识不清说得特别含糊,其实从这两个状态的英文单词的含义就可以看出7,8分了。 showdown 牛津翻译为:the act of closing a factory or business or stopping a large machine from working, either temporarily or permanently。体现的是进行时,closing,stopping;stop 意思比较多,但都是表示的一个意思:end /  not continue。大师的变量名命名那真是相当精确的,要不怎么都提倡程序员学好英语呢。

   看完了线程池的调度入口,了解了线程池的状态机,我们继续来看下方法 addWorker(Runnable firstTask, boolean core),前文说到线程池的把任务的提交和执行解藕,那就是如何串连的呢,addWorker方法就很好的完成的这个串连。这个方法主要分两个部分:

  1,根据线程池状态及线程数判断是返回还是继续。其中第一个 if 条件尤为复杂,已经有注释。

  2,创建工作进程对象 Worker w ,并执行其持有的线程对象thread 的start 方法。顺利让解藕的执行部分开始工作。

  这里的代码逻辑不复杂,有一个标记还是有意思的: retry:(标记,可以写成任意如:abc:) / continue retry ;(跳出当前循环) /break retry; (跳出外层循环)。 以后跳出双重循环是不是也可以这样写?

privatebooleanaddWorker(Runnable firstTask,boolean core) {

        retry:

        for (;;) {

            intc = ctl.get();

            intrs = runStateOf(c);

            // Check if queue empty only if necessary.

      // 如果 是shutdown 以上, 后在有三个条件 都满足才可以接续执行

        1, shutdown 执行原有任务,可能加新任务。

        2, firstTask 必须为空。

        3, queue 不能为空(有任务才能接续执行。)

if(rs >= SHUTDOWN &&                ! (rs == SHUTDOWN &&                  firstTask ==null&&                  ! workQueue.isEmpty()))

                returnfalse;

            for (;;) {

                intwc = workerCountOf(c);

                if(wc >= CAPACITY ||                    wc >= (core ? corePoolSize : maximumPoolSize))

                    returnfalse;

                if (compareAndIncrementWorkerCount(c))

                    break retry;

                c = ctl.get();// Re-read ctlif(runStateOf(c) != rs)

                    continue retry;

                // else CAS failed due to workerCount change; retry inner loop            }

        }

        booleanworkerStarted =false;

        booleanworkerAdded =false;

        Worker w =null;

        try {

            w =new Worker(firstTask);

            finalThread t = w.thread;

            if(t !=null) {

                finalReentrantLock mainLock =this.mainLock;

                mainLock.lock();

                try {

                    // Recheck while holding lock.

                    // Back out on ThreadFactory failure or if

                    // shut down before lock acquired.intrs = runStateOf(ctl.get());

                    if(rs < SHUTDOWN ||                        (rs == SHUTDOWN && firstTask ==null)) {

                        if(t.isAlive())// precheck that t is startablethrownew IllegalThreadStateException();

                        workers.add(w);

                        ints = workers.size();

                        if(s > largestPoolSize)

                            largestPoolSize = s;

                        workerAdded =true;

                    }

                } finally {

                    mainLock.unlock();

                }

                if (workerAdded) {

                    t.start();

                    workerStarted =true;

                }

            }

        } finally {

            if(! workerStarted)

                addWorkerFailed(w);

        }

        return workerStarted;

    }

   接下来任务的执行就交给了工作线程 Worker w 了,这是一个内部类实现了接口 Runnable,构造函数中对的 属性thread初始化传是this,  如此 addWorker 方法中的 t.start(); 就顺利调用了Worker的run 方法了,而run方法又调用 runWorker。所以真正执行任务的最终方法在这里 -- runWorker。

Worker

Worker(Runnable firstTask) {

    setState(-1);// inhibit interrupts until runWorkerthis.firstTask = firstTask;

    // 注意:这里,这个线程 传的runnable 是this, 也就是 worker本身, 所以start()后进入runnable状态,等到获取时间片后,就执行 run方法。this.thread = getThreadFactory().newThread(this);

}

}

/** Delegates main run loop to outer runWorker  */

public void run() {

    runWorker(this);

}


  我们继续来读最关键的方法runWorker,我删除了一些判断以及异常处理的代码,让我们可以清晰看到处理逻辑:获取任务,执行,回收线程。获取任务有两种情况:1,线程数小于核心数和队列满了但线程未到最大线程数时直接传入了任务;2,从阻塞获取任务,getTask()方法完成了这一任务

finalvoid runWorker(Worker w) {

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

        w.firstTask =null;

        w.unlock(); // allow interruptsbooleancompletedAbruptly =true;

        try {

            while(task !=null|| (task = getTask()) !=null) {

                w.lock();try {

                    Throwable thrown =null;

                    try {

                        task.run();                    }

            afterExecute(task, thrown);                    } finally { task =null;

            // 统计完成任务数

                    w.completedTasks++;

                    w.unlock();

                }

            }

            completedAbruptly =false;

        } finally{//回收工作线程,尝试更新状态。processWorkerExit(w, completedAbruptly);        }    }

ThreadPoolExecutor 中定义了 HashSetworker 工作线程队列,BlockingQueueworkQueue 任务队列 来实现了工作线程和任务管理与解藕。到里线程的任务添加流程和执行过程就分析完了,当然中间抛弃了大量细节,比如锁的使用,比如状态机的变化等等。还是如前文所说,先建立初始化知识体系,后面再研究细节补充体系,每次的投入都是在强化它,再也不是无效时间了。简版调用时序图如下:

线程池监控

文章开头提到流量增大,心中不安,很大一部分原因,就是因为无法监控到线上线程池的状态,比如阻塞队列中的任务数,活跃线程数,线程池大小等等。当然这也是原于早前的无知,平时我们写代码主要分成两部分:功能性代码,实现业务功能;运维性代码,监控程序状态,分析问题。大师的代码也不例外,只是优雅很多。ThreadPoolExecutor 中有提供了相关运维代码,并在runWorker 中使用模板方法设计模式,为我们获取线程池状态等信息提供接口了,比如:beforeExecute(wt, task);  afterExecute(task, thrown);   ThreadPoolExecutor中这两个方法都是空实现,我们可以继承,并重写完成状态的获取。获取线程池运维状态提代了如下方法下图。

  参考了一位网友的代码(忘记出处了),继承ThreadPoolExecutor ,重写afterExecute,打印线程池相关信息

@Slf4jpublicclassThreadPoolMonitorextends ThreadPoolExecutor {

    private String poolName;

    /**    * 调用线程池的构造方法,并记录线程池名

    *

    * @param corePoolSize    线程池核心线程数

    * @param maximumPoolSize 线程池最大线程数

    * @param keepAliveTime  线程的最大空闲时间

    * @param unit            空闲时间的单位

    * @param workQueue      保存被提交任务的队列

    * @param poolName        线程池名称

    */publicThreadPoolMonitor(intcorePoolSize,intmaximumPoolSize,long keepAliveTime,

                            TimeUnit unit, BlockingQueue workQueue, String poolName) {

        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,

                new HamThreadFactory(poolName));

        this.poolName = poolName;

    }

    /**    * 任务执行之后,将相关状态记录日志。

    */    @Override

    protectedvoid afterExecute(Runnable r, Throwable t) {

        // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、

        // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、

        // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止log.info("{}-pool-monitor: " +                        " PoolSize: {}, CorePoolSize: {}, Active: {}, " +                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +                        "MaximumPoolSize: {},  KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",

                this.poolName,this.getPoolSize(),this.getCorePoolSize(),this.getActiveCount(),

                this.getCompletedTaskCount(),this.getTaskCount(),this.getQueue().size(),this.getLargestPoolSize(),

                this.getMaximumPoolSize(),this.getKeepAliveTime(TimeUnit.MILLISECONDS),this.isShutdown(),this.isTerminated());

    }

}

View Code

结语

  最近学习一直停留在输入(看)层面,所看内容无法转化成自己的知识体系,因而很多东西都无法深入,我们当然知道原因,但是总是说忙得没时间整理。入职哈啰后看到很多优秀的人都是每天大量记录心得感想。等到文博和我们分享他的读书笔记时,从震撼到忏愧。知识只有经历了 输入-消化-输出 才会最终成为我们拥有的。为此文博还在Thor团队发起对赌打卡 -- 哈哈哈,我们当然要应战。

  文章到这里就结束了,因为个人经验还有很多不足,文章中的分析也比较粗浅,甚至有错误的地方,希望大家可以拍砖,狠狠的拍。

   成为一名优秀的程序员!

   参考资料:

[1]https://intellipaat.com/community/36170/why-is-creating-a-thread-said-to-be-expensive

[2]https://eli.thegreenplace.net/2018/measuring-context-switching-and-memory-overheads-for-linux-threads/

[3]https://mp.weixin.qq.com/s/baYuX8aCwQ9PP6k7TDl2Ww

[4] 《java并发编程》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,732评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,496评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,264评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,807评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,806评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,675评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,029评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,683评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,704评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,666评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,773评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,413评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,016评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,204评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,083评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,503评论 2 343

推荐阅读更多精彩内容