Executor线程池源码分析

日期:2020/7/5  

最近看并发源码,把自个理解整理记录,说实话......一天不看就得重头看,脑子可能比较笨,要反复琢磨理解,进度比较慢,还没整理研究完.......最终目的能自己手写一个线程池

                                            线程池               

一.线程池使用场景

      1.单个任务处理时间比较短

      2.需要处理的任务量很大


二.线程池优势

        1.重用存在的线程,减少线程创建,消亡的开销,提高性能

        2.提高响应速度,当任务到达时,任务可以不要等到线程创建就立即执行    

        3.提高线程的可管理性,线程是稀缺资源,如果无线创建,会消耗系统资源,降低系统稳定性,线程池可以进行统一分配,调优和调度


三.线程池分析

        1.线程池的创建

        //利用Executors工具类去创建线程池

        ExecutorService executor = Executors.newFixedThreadPool(5);

                                    \Downarrow             

        public static ExecutorService  newFixedThreadPool(int nThreads) {

//参数:线程池数量,最大线程池数量,线程最大空闲存活时间,空闲时间计量单位,无空闲线程则存放到阻塞队列中

return new ThreadPoolExecutor(nThreads, nThreads,

                                  0L, TimeUnit.MILLISECONDS,

                                  new LinkedBlockingQueue());

}



        2.线程池的工作流程

        线程池的大小 maximumPoolSize = 核心线程5+非核心线程5 = 10

        当有任务进入的时候,会优先将任务分配到核心线程,其他剩余的任务放入到阻塞队列中去等待,如果此队列是一个有界队列,则直到放满为止,如果还有任务则创建非核心线程,如还有剩余任务没有分分配,则执行RejectedExecutionHandler拒绝策略
        


        3.线程池的重点属性

public class ThreadPoolExecutor extends AbstractExecutorService {

            private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,  0));             private static final int COUNT_BITS = Integer.SIZE ­ 3;             private static final int CAPACITY   = (1 << COUNT_BITS) ­ 1;

}

属性解析:    ctl 是对线程池的运行状态和线程池中有效线程的数量进行控制的一个字段, 它包含两 部分的信息: 线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount),这 里可以看到,使用了Integer类型来保存,高3位保存runState,低29位保存 workerCount。COUNT_BITS 就是29,CAPACITY就是1左移29位减1(29个1),这个常 量表示workerCount的上限值,大约是5亿。

ctl相关方法 :

        private static int runStateOf(int c)     { return c & ~CAPACITY; }         private static int workerCountOf(int c)  { return c & CAPACITY; }         private static int ctlOf(int rs, int wc) { return rs | wc; } 

runStateOf:获取运行状态;         workerCountOf:获取活动线程数; ctlOf:获取运行状态和活动线程数的 

      


  4.线程池存在的5种状态

RUNNING    = ­1 << COUNT_BITS; //高3位为111 

SHUTDOWN   =  0 << COUNT_BITS; //高3位为000 

STOP       =  1 << COUNT_BITS; //高3位为001 

TIDYING    =  2 << COUNT_BITS; //高3位为010 

TERMINATED =  3 << COUNT_BITS; //高3位为011

1.RUNNING:

        (1)状态说明:线程池处于RUNNING状态时,能够接收新任务,以及对已经添加的任务进行处理

        (2)状态切换:线程池的初始化状态是RUNNING,话句话说,线程池一但被创建,就处于RUNNING状态,并且线程池种的任务书为0

2.SHUTDOWN:

        (1)状态说明:线程池处在SHUTDOWN状态时,不接受新任务,但能处理已经添加的任务

        (2)状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN

3.STOP:

        (1):状态说明:线程池处在STOP状态时,不接受新任务,不处理已添加的任务,并且会中断正在处理的任务

        (2)状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN)-->STOP

4.TIDYING:

        (1) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING

状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在

ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;

可以通过重载terminated()函数来实现。

        (2) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也

为空时,就会由 SHUTDOWN -> TIDYING。 当线程池在STOP状态下,线程池中执行的

任务为空时,就会由STOP -> TIDYING。

5.TERMINARTED:

        (1) 状态说明:线程池彻底终止,就变成TERMINATED状态。

        (2) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -

> TERMINATED。


状态转换流程图



                            ThreadPoolExecutor

1.线程池的创建:

ThreadPoolExecutor(int corePoolSize,                                 

                                        int maximumPoolSize,                     

                                       long keepAliveTime,                        

                                       TimeUnit unit, 

                                     BlockingQueue<Runnable> workQueue, 

                                    ThreadFactory threadFactory, 

                                    RejectedExecutionHandler handler)   

参数解释:

        corePoolSize :       

线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当

前线程数等于corePoolSize;如果当前线程数为corePoolSize,继续提交的任务被保存到

阻塞队列中,等待被执行;

       maximumPoolSize:

线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线

程执行任务,前提是当前线程数小于maximumPoolSize;

        keepAliveTime:

线程池维护线程所允许的空闲时间,当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立刻销毁,而是会等待,知道等待的时间超过了keepAliveTime

        unit  :

keepAliveTime的单位

        workQueue:

用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供

了如下阻塞队列:

1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;

2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞

吐量通常要高于ArrayBlockingQuene;

3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到

另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于

LinkedBlockingQuene;

4、priorityBlockingQuene:具有优先级的无界阻塞队列;

        threadFactory:

它是ThreadFactory类型的变量,用来创建新线程。默认使用Executors.defaultThreadFactory() 来创建线程。使用默认的ThreadFactory来创建线程

时,会使新创建的线程具有相同的NORM_PRIORITY优先级并且是非守护线程,同时也设

置了线程的名称。

        handler:

线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须提供一中策略处理该任务,一般提供了4中策略

1、AbortPolicy:直接抛出异常,默认策略;

2、CallerRunsPolicy:用调用者所在的线程来执行任务;

3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;

4、DiscardPolicy:直接丢弃任务;

当然也可以根据应用场景实现RejectedExecutionHandler接口,自定义饱和策略,如

记录日志或持久化存储不能处理的任务。



1.任务提交(execute方法源码分析)

        public void execute() //提交任务无返回值

public void execute(Runnable command) {

        if (command ==null)

                    throw new NullPointerException();


        int c =ctl.get();

         /*\color{green}{workerCountOf方法取出低29位的值,表示当前的线程数;}

   \color{green}{如果当前活动线程数小于corePoolSize,则新建一个线程放入线程池中;}

       \color{green}{ 并把任务添加到该线程中}*/

        if (workerCountOf(c)<corePoolSize){

                \color{green}{*断还是maximumPoolSize}

                \color{green}{*如果为true,根据corePoolSize来判断;}

              \color{green}{  *如果为false,则根据maximumPoolSize来判断}

                if (addWorker(command, true))

                        return;

                \color{green}{ //如果添加失败,则重新获取ctl值}

                c =ctl.get();

    }

             //   \color{green}{如果当前线程是运行状态,并且任务添加到队列成功}

if (isRunning(c) &&workQueue.offer(command)) {

            int recheck =ctl.get();

        \color{green}{//再次判断线程的运行状态,如果不是运行状态,由于之前已经把command添加到workQueue中了,}

\color{green}{这时需要移除该command,执行通过后执行handler使用拒绝策略,进行处理}

          if (!isRunning(recheck) && remove(command))

                   reject(command);

\color{green}{//获取线程池中有效的线程数,如果是0,则执行addworker()方法}

\color{green}{//第一参数为null,表示在线程池中创建一个线程,但不去启动,第二个参数为false,表示添加时根据maximumPoolSize来判断}

          else if (workerCountOf(recheck) ==0)

                  addWorker(null, false);

    }

\color{green}{//执行到这里,有两种情况:}

        \color{green}{1.线程池已经不是RUNNING状态,                         }

           \color{green}{   2.线程池是RUNNING状态,但workerCount >= corePoolSize并且 workQueue已满}          \color{green}{//这时再次调用addWorker方法,但第二个参数是false,将线程池的有限线程数量上线设置为maximumPoolSize}

else if (!addWorker(command, false))

            reject(command);

}

\color{red}{个人总结:} 

 简单来说,在执行execute()方法时如果状态一直是RUNNING,执行过程如下:                

        1. 如果workerCount < corePoolSize,则创建并启动一个线程来执行新提交的任

务;

        2. 如果workerCount >= corePoolSize,且线程池内的阻塞队列未满,则将任务添

加到该阻塞队列中;

        3. 如 果 workerCount >= corePoolSize && workerCount <

            maximumPoolSize,且线程池内的阻塞队列已满,则创建并启动一个线程来执行新

            提交的任务;

        4. 如果workerCount >= maximumPoolSize,并且线程池内的阻塞队列已满, 则根

            据拒绝策略来处理该任务, 默认的处理方式是直接抛异常。

            这里要注意一下addWorker(null, false);,也就是创建一个线程,但并没有传入任务,因为

            任务已经被添加到workQueue中了,所以worker在执行的时候,会直接从workQueue中

            获取任务。所以,在workerCountOf(recheck) == 0时执行addWorker(null, false);也是

            为了保证线程池在RUNNING状态下必须要有一个线程来执行任务。


 execute方法执行流程



2.addWorker方法

    addWorker方法的主要工作是在线程池中创建一个新的线程并执行,代码如下:

private boolean addWorker(Runnable firstTask, boolean core) {

    retry:

    for (;;) {

            int c =ctl.get();

            \color{green}{//获取运行状态}

            int rs =runStateOf(c);

                \color{green}{//这个if判断,如果rs>=SHUTDOWN,则表示不能再接收新任务}

\color{green}{//接着判断一下3个条件,只要有1个不满足,则返回false}
\color{green}{1.rs==SHUTDOWN,这是表示关闭状态,不再接受新任务}
\color{green}{2.frisTask为空,3.阻塞队列不为空}
\color{green}{//因为队列中已经没有任务了,不需要再添加线程了}

            if (rs >=SHUTDOWN &&

            ! (rs ==SHUTDOWN &&

                firstTask ==null &&

                    !workQueue.isEmpty()))

                        return false;

        for (;;) {//这个自旋是来判断我们这个线程池是否可以创建我们的任务

            \color{green}{//获取线程数}

                    int wc =workerCountOf(c);  \color{green}{//如果wc超过CAPACITY(workCountd的上限值)}  \color{green}{//这里的core是addworker方法的第二个参数}                \color{green}{//如果为true则根据corePoolSize比较,否则根据maximumPoolSize比较}

                    if (wc >=CAPACITY ||

                        wc >= (core ?corePoolSize :maximumPoolSize))

                        return false;

                    \color{green}{//尝试增加workerCount,如果成功则跳出第一个循环}               
                   if (compareAndIncrementWorkerCount(c))

                                        break retry;

                    \color{green}{//如果新增失败,则获取ctl的值}

                         c =ctl.get();  

                        \color{green}{//如果当前运行状态不等于re,说明状态已经改变,翻译第一个for循环继续执行}

                          if (runStateOf(c) != rs)

                                continue retry;

        }

}

    boolean workerStarted =false;

    boolean workerAdded =false;

    Worker w =null;

    try {

        \color{green}{//根据firstTask对象来创建Worker对象,此类的分析在后面分析}

        w =new Worker(firstTask);

        final Thread t = w.thread;

        if (t !=null) {

            final ReentrantLock mainLock =this.mainLock;

            mainLock.lock();

            try {

                int rs =runStateOf(ctl.get());

\color{green}{//rs<SHUTDOWN表示是RUNNING状态}

\color{green}{/或者是SHUTDOWN状态,并且firstTask
是null,向线程池中添加线程}

                if (rs<SHUTDOWN||

                        (rs ==SHUTDOWN && firstTask ==null)) {

                        if (t.isAlive())

                                    throw new IllegalThreadStateException();
                        \color{green}{//workers是一个HashSet}

                        workers.add(w);

                    int s =workers.size();
                \color{green}{//记录这线程池中出现过的最大线程数量}

                    if (s >largestPoolSize)

                        largestPoolSize = s;

                    workerAdded =true;

                }

}finally {

mainLock.unlock();

            }

if (workerAdded) {

            \color{green}{//启动线程,这里执行了Worker类中的run方法}

               t.start();

                workerStarted =true;

            }

}

}finally {

            if (! workerStarted)

            addWorkerFailed(w);

    }

            return workerStarted;

}



3.Worker类

        线程池中的每一个线程被封装成一个Worker对象,        ThreadPool维护的其实就是一组Worker对象

        Worker继承了AQS,实现了Runnable接口,其中的firstTask和thread属性,firstTask用他来保存传入的任务,thread是在调用构造方法时通过ThreadFactory来创建的线程,用来处理任务的线程

        在调用构造方法时,需要把任务传入,这里通过getThreadFactory().newThread(this);来新建一个线程,newThread方法传入的参数是this,因为Worker本身继承了Runable接口,也就是一个线程,所以Worker对象在启动的时候回调用Worker类中的run方法

Worker类



4.RunWorker方法

        在Worker类中的run方法调用了runWorker方法来执行任务,runWorker方法的代码如下:

final void runWorker(Worker w) {            

        Thread wt = Thread.currentThread();

        Runnable task = w.firstTask;

       w.firstTask =null;

\color{green}{//   允许中断,因为在创建Worker的时候setStatus(-1),是不允许中断的,执行到这里可以中断了}

        w.unlock(); 

\color{green}{//是否因为异常退出循环}

        boolean completedAbruptly =true;

\color{green}{//线程池一直保持存活的原因}

                try {

\color{green}{//如果task为空,则通过getTask来获取任务(阻塞队列中)}

\color{red}{//getTask()方法在下面解析}

               while (task !=null || (task = getTask()) !=null) {

                    w.lock();

                    if ((runStateAtLeast(ctl.get(), STOP) ||

                    (Thread.interrupted() &&

                    runStateAtLeast(ctl.get(), STOP))) &&

                    !wt.isInterrupted())

                                wt.interrupt();

            try {

                beforeExecute(wt, task);

                Throwable thrown =null;

                try {

                            task.run();

                }catch (RuntimeException x) {

                thrown = x; throw x;

                }catch (Error x) {

                            thrown = x; throw x;

                }catch (Throwable x) {

                            thrown = x; throw new Error(x);

                }finally {

                            afterExecute(task, thrown);

                }

            }finally {

                            task =null;

                w.completedTasks++;

                w.unlock();

            }

}

completedAbruptly =false;

    }finally {

\color{red}{//下面讲解此方法,进程退出的逻辑}

processWorkerExit(w, completedAbruptly);

    }

}

\color{red}{总结 runWorker方法的执行过程}:

1. while循环不断地通过getTask()方法获取任务;

2. getTask()方法从阻塞队列中取任务;

3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是

中断状态;

4. 调用task.run()执行任务;

5. 如果task为null则跳出循环,执行processWorkerExit()方法;

6. runWorker方法执行完毕,也代表着Worker中的run方法执行完毕,销毁线程。

这里的beforeExecute方法和afterExecute方法在ThreadPoolExecutor类中是空的,留给

子类来实现。

completedAbruptly 变 量 来 表 示 在 执 行 任 务 过 程 中 是 否 出 现 了 异 常 , 在

processWorkerExit方法中会对该变量的值进行判断。




5.getTask方法

getTask方法用来从阻塞队列中获取任务

private RunnablegetTask() {        

\color{green}{// timeOut变量的值表示上次从阻塞队列中取任务时是否超时}

        boolean timedOut =false; 

               for (;;) {

                    int c =ctl.get();

                    int rs =runStateOf(c);

                    if (rs >=SHUTDOWN && (rs >=STOP ||workQueue.isEmpty())) {

                            decrementWorkerCount();

                                return null;

                    }

                    int wc =workerCountOf(c);

                \color{green}{//timed变量用于判断是否需要进行超时控制,allowCoreThreadTimeOut默认是false}

                \color{green}{//allowCoreThreadTimeOut 是否允许核心线程超时,如果允许,一旦超时,核心线程和非核心线程都会结束生命周期}

\color{green}{//wc>corePoolSize,说明有非核心线程,time为true,允许停掉}

\color{green}{//对于超过核心线程数量的这些线程,需要进行超时控制}

                    boolean timed =allowCoreThreadTimeOut || wc >corePoolSize;

                    if ((wc >maximumPoolSize || (timed && timedOut))

                        && (wc >1 ||workQueue.isEmpty())) {

                    \color{green}{//判断有效线程数>1或者阻塞队列是空的,那么尝试将workerCount减1}

                                    if (compareAndDecrementWorkerCount(c))

                                                return null;

                                                continue;

                                        }

try {

            Runnable r = timed ?

\color{green}{//timed为true,要进行超时判断,如果在keepAliveTime时间内没有获取到任务,则返回null}

            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :

            workQueue.take();

            if (r !=null)

                return r;

//这里设置timedOut=true,会在第二次for循环中返回null,workerCount数量减1

            timedOut =true;

        }catch (InterruptedException retry) {

timedOut =false;

        }

}

}



  6.processWorkerExit方法

            private void processWorkerExit(Worker w, boolean completedAbruptly) {

\color{green}{//如果completedAbruptly为true,说明线程在执行过程中出现了异常,需要将workerCount减1}

\color{green}{//如果线程执行过程中没有异常,说明在getTask()中已经对workerCount进行了减1操作,这里不需要再减}

                    if (completedAbruptly)

                                decrementWorkerCount();

                    final ReentrantLock mainLock =this.mainLock;

                    mainLock.lock();

                    try {

                        \color{green}{//统计完成的任务数}

                            completedTaskCount += w.completedTasks;

                        \color{green}{//从workers中移除,也就表示着从线程池中移除了一个工作线程}

                            workers.remove(w);

                    }finally {

                            mainLock.unlock();

                    }

                tryTerminate();

                int c =ctl.get();

               if (runStateLessThan(c, STOP)) {

                       if (!completedAbruptly) {

                                int min =allowCoreThreadTimeOut ?0 :corePoolSize;

                                if (min ==0 && !workQueue.isEmpty())

                                        min =1;

                                if (workerCountOf(c) >= min)

                                            return; 

                }

                addWorker(null, false);

        }

}



至此,processWorkerExit执行完之后,工作线程被销毁,以上就是整个工作线程的生

命周期,从execute方法开始,Worker使用ThreadFactory创建新的工作线程,

runWorker通过getTask获取任务,然后执行任务,如果getTask返回null,进入

processWorkerExit方法,整个线程结束,如图所示:


线程池的生命周期
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342