ThreadPoolExecutor核心源码分析

Java提供的线程池(即ThreadPoolExecutor类)实现了线程的创建和管理、任务的调度与执行,因为减少了线程的创建、销毁等过程,所以当执行大量异步任务时,线程池可以提供更好的性能。本文将从源码角度对线程池的实现原理进行分析,如有必要,会在源码中添加标记,并对标记处的条件通过举例的形式进行复现,留作参考。

1. 线程池状态及转换

1.1 五种线程池状态

(1)RUNNING:接受新任务且会处理队列中的任务
(2)SHUTDOWN:不接受新任务但会处理队列中的任务
(3)STOP:不接受新任务,也不处理队列中的任务,并且会中断正在执行的任务
(4)TIDYING:处于SHUTDOWN状态时,队列为空且池为空或处于STOP状态时,池为空,都会切换到TIDYING状态
(5)TERMINATED:处于TIDYING状态时,调用terminated()方法并执行完毕后,会切换到TERMINATED状态

1.2 五种线程池状态的转换

从图中可以看出,当调用了shutdown()后,线程池不再接受新任务,在处理完队列中的任务且池中所有线程都退出后,会切换为TIDYING状态,执行完terminated()后会切换为TERMINATED状态;当调用了shutdownNow()后,线程池不再接受新任务,队列中的任务会移交到外部,池中所有线程退出后,会切换为TIDYING状态,执行完terminated()后会切换为TERMINATED状态。总之,调用shutdown()或shutdownNow()后,经过一系列处理,线程池最终会被终止。

2. ThreadPoolExecutor核心源码分析

2.1 简写说明
  • TF:ThreadFactory
  • TPE:ThreadPoolExecutor
  • REH:RejectedExecutionHandler
  • AQS:AbstractQueuedSynchronizer
2.2 TPE.Worker

Worker是TPE的内部类。Worker实现了Runnable,在构造方法中,Worker会与Thread进行绑定。Worker就是用于执行任务的工作线程,它会先执行自己的初始任务,之后会从阻塞队列中获取任务来执行。
Worker继承了AQS,它会使用AQS中的一些字段和方法,也重写了AQS中的一些方法,简化了自身加锁和解锁的过程。TPE.runWorker方法中的的加锁和解锁操作不是因为线程竞争(w.lock()和w.unlock()之间根本不存在竞争),而是为了对中断行为进行控制。Worker有三种状态(即AQS.state有三个值):

  • -1:表示Worker对应的线程未启动
  • 0:表示Worker对应的线程已启动,但正在等待着获取任务,处于空闲状态
  • 1:表示Worker对应的线程已启动且正在执行任务

当调用TPE.shutdown时,若状态为0,线程会被中断;当调用TPE.shutdownNow时,若状态为0或1,线程会被中断;当状态为-1时线程不可被中断。这些状态通过Worker.lock和Worker.unlock进行切换。

    private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        // 与当前Worker绑定的线程
        final Thread thread;
        // 每个Runnable就是一个任务
        // firstTask是Worker的初始任务,可能为null(后面会详细介绍)
        Runnable firstTask;
        // 记录当前Worker执行的任务数
        volatile long completedTasks;

        Worker(Runnable firstTask) {
            // 初始状态为-1,表示未启动,不能被中断
            setState(-1);
            this.firstTask = firstTask;
            // 用线程工厂创建线程(与this.thread = new Thread(this);类似)
            this.thread = getThreadFactory().newThread(this);
        }
        // 在addWorker中会创建Worker,并调用
        // Worker.thread.start,最终会调用到该run方法
        public void run() { runWorker(this); }
        // 将AQS.state设置为1
        public void lock() {...}
        // 将AQS.state设置为0
        public void unlock() {...}
        // 若Worker的状态(即AQS.state的值)为0或1,则对
        // Worker对应的线程(即this.thread)的中断标志位进行设置
        // 调用TPE.shutdownNow时,会调用到该方法
        void interruptIfStarted() {...}
        // 其他(略)
    }
2.3 runState、workerCount相关字段
    // ctl(control的缩写)是AtomicInteger类型,因此其大小与int类型一致,其
    // 初始值(即ctl.value的初始值)是 1110 0000 0000 0000 0000 0000 0000 0000
    // ctl包含两部分信息:
    //    高3位:线程池运行状态(runState)
    //   低29位:有效线程个数(workerCount)
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    // COUNT_BITS为29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    // CAPACITY为 2^29 -1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 线程池运行状态,可看出5种状态高3位均不相同,且从小到大单调递增,因为
    // 高位决定大小,所以后面会看到源码中会通过比较大小确定线程池的运行状态
    
    // RUNNING为 1110 0000 0000 0000 0000 0000 0000 0000
    private static final int RUNNING    = -1 << COUNT_BITS;
    // SHUTDOWN为 0000 0000 0000 0000 0000 0000 0000 0000
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    // STOP为 0010 0000 0000 0000 0000 0000 0000 0000
    private static final int STOP       =  1 << COUNT_BITS;
    // TIDYING为 0100 0000 0000 0000 0000 0000 0000 0000
    private static final int TIDYING    =  2 << COUNT_BITS;
    // TERMINATED为 0110 0000 0000 0000 0000 0000 0000 0000
    private static final int TERMINATED =  3 << COUNT_BITS;
2.4 runState、workerCount相关辅助方法

(1)截取、拼接ctl.value

    // 从c(c是某个时刻的ctl.value)中"截取"runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    // 从c中"截取"workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    // 将rs和ws"拼接"成ctl
    private static int ctlOf(int rs, int wc) { return rs | wc; }

(2)判断runState

    // 通过比较大小检查runState
    private static boolean runStateLessThan(int c, int s) { return c < s; }
    private static boolean runStateAtLeast(int c, int s) { return c >= s; }
    // c < SHUTDOWN说明runStateOf(c)是RUNNING
    private static boolean isRunning(int c) { return c < SHUTDOWN; }    

(3)修改workerCount

    // 以CAS方式将ctl.value加1(expect是某个时刻ctl.value的值)
    private boolean compareAndIncrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect + 1);
    }
    // 以CAS方式将ctl.value减1(expect是某个时刻ctl.value的值)
    private boolean compareAndDecrementWorkerCount(int expect) {
        return ctl.compareAndSet(expect, expect - 1);
    }
    // 以CAS方式将ctl.value减1,直到成功为止
    private void decrementWorkerCount() {
        do {} while (! compareAndDecrementWorkerCount(ctl.get()));
    }
2.5 核心字段
    // 阻塞队列
    // 线程池中的线程数超过核心线程数时,传入的Runnable会存入该队列
    // 为了兼容特殊用途的队列,源码中仅用workQueue.isEmpty判断队列是否为空
    private final BlockingQueue<Runnable> workQueue;
    // 只要涉及到workers的操作,都要加锁
    private final ReentrantLock mainLock = new ReentrantLock(); 
    // workers就是存着所有Worker的线程池
    private final HashSet<Worker> workers = new HashSet<Worker>();  
    // 外部可能会调用TPE.awaitTermination等待线程池进入TERMINATED状态,
    // 在TPE.tryTerminate中会调用termination.signalAll唤醒这些等待着的线程
    private final Condition termination = mainLock.newCondition();  
    // largestPoolSize记录了线程池中的线程数峰值(即Worker数的最大值)
    // 操作该字段时也需要mainLock
    private int largestPoolSize;
    // 每个Worker线程退出时会将自己执行的任务数(即Worker.
    // completedTasks字段的值)累加到completedTaskCount中
    // 操作该字段时也需要mainLock
    private long completedTaskCount;
    
    // 下面6个字段是用户可设置的,都用volatile修饰来保证可见性
    // 线程工厂(默认为Executors.DefaultThreadFactory)
    private volatile ThreadFactory threadFactory;
    // 拒绝策略的handler(默认为AbortPolicy)
    // Java预定义了4种:
    //   AbortPolicy:抛异常
    //   CallerRunsPolicy:若线程池仍处于RUNNING状态,则将任务交给调用execute的外部线程
    //                     (这将减慢新任务的提交速率,算是一种简单的反馈机制)
    //   DiscardPolicy:直接丢弃(DiscardPolicy.rejectedExecution是空实现)
    //   DiscardOldestPolicy:若线程池仍处于RUNNING状态,则删掉队列中的最老的(即第一个)任务,
    //                        重新执行execute,可能会再次失败,但会不断重复
    private volatile RejectedExecutionHandler handler;
    // 等待任务的空闲线程的超时时长
    // 当超过核心线程数或allowCoreThreadTimeOut为
    // true时,空闲线程等待新任务的时长是keepAliveTime
    private volatile long keepAliveTime;
    // 为false时,核心线程会无限期等待新的任务
    // 为true时,核心线程等待新任务的时长是keepAliveTime
    private volatile boolean allowCoreThreadTimeOut;    
    // 核心线程数(线程池中可以保持存活的最小线程个数,
    // 当allowCoreThreadTimeOut为true时,最小值为0)
    private volatile int corePoolSize;
    // 最大线程数
    // 虽然外部最大可将maximumPoolSize设置为Integer.MAX_VALUE,
    // 但源码中会进行判断,不允许超过CAPACITY(即2^29-1)
    private volatile int maximumPoolSize;   
2.6 execute
    // 一个Runnable就是一个任务
    public void execute(Runnable command) {
        // command为null,则抛异常
        if (command == null)
            throw new NullPointerException();
        // 获取最新的ctl.value
        int c = ctl.get();
        
        // step1:添加核心线程     
        // (其实核心线程和非核心线程只是逻辑上的一种叫法,后面会看到,
        // 二者是用数值进行区分的,而不是类型,所以二者实际没有差别)
        
        // 判断workerCount是否小于核心线程数
        if (workerCountOf(c) < corePoolSize) {
            // 添加Worker
            // 传入true,则方法里会以corePoolSize作为判断依据
            if (addWorker(command, true)) // 标记e
                // 添加成功,直接返回
                return;
            // 到这里说明添加失败,ctl.value可能已经改变,重新获取
            c = ctl.get();
        }
        // step2:将任务添加到阻塞队列 
        if (isRunning(c) && workQueue.offer(command)) {
            // 标记a
            // 入队期间,线程状态可能已经改变,重新获取ctl.value进行检查
            int recheck = ctl.get(); 
            // 若线程状态改变,则移除command,并执行拒绝策略
            if (! isRunning(recheck) && remove(command)) // 标记b
                reject(command);
            // 若workerCount为0,添加一个Worker
            else if (workerCountOf(recheck) == 0) // 标记c
                addWorker(null, false); // 标记d
        }
        // step3:添加非核心线程
        // 传入false,则方法里会以maximumPoolSize作为判断依据
        else if (!addWorker(command, false)) // 标记f
            // step4:添加失败,执行拒绝策略
            reject(command);
    }

(1)标记b处

  • 例1
    假设有两个Worker线程thread0、thread1,若thread0执行到标记a处时,thread1调用了shutdown方法并执行完毕,之后thread0执行到标记b处时两个条件均为true
  • 例2
    将上面例子中的shutdown改为shutdownNow,其他不变,thread0执行到标记b处时条件1为true,条件2为false(因为shutdownNow(shutdownNow的返回值类型为List<Runnable>)中会将队列中的任务移除,返回给外部,所以调用remove时,队列为空,因此条件2会返回false)

(2)标记c处

  • 例1
    若自定义了一个TF,重写的newThread返回null,用该TF初始化TPE,调用execute会执行到这里且条件成立,不过因为重写的newThread返回null,所以标记d处仍会失败
2.7 addWorker
    // execute中标记e和标记f两处调用addWorker时传入的firstTask
    // 不为null,其余调用addWorker的地方传入的firstTask均为null。
    
    // firstTask不为null时,Worker会先执行firstTask,再从队列中获取任务并执行;
    // firstTask为null时,Worker直接从队列中获取任务并执行。 
    
    // Worker中firstTask字段存在的意义:
    //   对标记a处:核心线程是不依赖队列的,若没有firstTask,任务就没有地方存
    //   对标记b处:因为队列已满,若没有firstTask,任务也没地方存
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            // 外层循环:检查线程池运行状态
            int rs = runStateOf(c);
            // 为便于理解,将if中的整个条件取反:
            //   即 当 rs == RUNNING || 
            //          (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
            //   成立时,if中的条件不成立,继续向后执行
            // 要使上面条件2(即||后面的整个条件)成立,firstTask要为null,因为线程池处于
            // SHUTDOWN状态时是不接受新任务的(firstTask不为null时,表示传进来的是新任务)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                // 内层循环:检查workerCount
                int wc = workerCountOf(c);
                // 判断wc是否超过对应线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false; 
                // 以CAS方式将ctl.value加1   
                if (compareAndIncrementWorkerCount(c))
                    // 加1成功则跳出整个循环
                    break retry;
                // 加1失败,说明ctl.value被其他线程"抢先"修改,获取最新的ctl.value
                c = ctl.get(); 
                // 检查线程池运行状态
                if (runStateOf(c) != rs)
                    // 成功说明加1失败是线程池运行状态被修改,跳至外层循环,处理线程池运行状态
                    continue retry;
                // 到这里说明线程池运行状态不变,而是workerCount改变,继续内层循环,处理workerCount
            }
        }
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 创建Worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            // 若自定义了一个TF,重写的newThread返回null,用
            // 该TF实例化TPE,调用execute会执行到这里时t为null
            if (t != null) { // 标记a
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // 重新获取runState
                    int rs = runStateOf(ctl.get());
                    // 跳出上面循环体后,在获取到锁之前runState可能改变,这里重新检查
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { // 标记b
                        // 若自定义了一个TF,重写的newThread中启动
                        // 了创建的线程且正在运行,则这里条件成立
                        if (t.isAlive()) // 标记c
                            throw new IllegalThreadStateException();
                        // 将新建的Worker添加到线程池中
                        workers.add(w);
                        // 更新largestPoolSize
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        // workerAdded为true表示添加成功
                        workerAdded = true; 
                    }
                } finally {
                    mainLock.unlock();
                }
                // 添加成功,启动线程
                if (workerAdded) {
                    // 会调用到Worker.run方法
                    t.start();
                    // workerStarted为true表示启动成功
                    workerStarted = true;
                }
            }
        } finally {
            // 上面标记a、标记b处不成立,标记c处
            // 成立时都会导致workerStarted为false
            if (! workerStarted)
                // 回滚Worker的添加过程
                addWorkerFailed(w);
        }
        return workerStarted;
    }
2.8 addWorkerFailed
    private void addWorkerFailed(Worker w) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (w != null)
                // 将w从线程池中移除
                workers.remove(w);
            // 以CAS+重试的方式将ctl.value减1,直到成功为止    
            decrementWorkerCount();
            // 尝试终止(后面会介绍)
            tryTerminate();
        } finally {
            mainLock.unlock();
        }
    }   
2.9 runWorker
    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        // 标记a
        // w的初始状态为-1,调用unlock将
        // 状态设置为0,表示线程已启动
        w.unlock(); 
        // 标记b
        // completedAbruptly用来标识执行任务时是否发生异常
        boolean completedAbruptly = true;
        try {
            // task不为null则先执行自己的task
            // 之后不断调用getTask()从队列中获取任务,直到返回null为止
            while (task != null || (task = getTask()) != null) {
                // 将w的状态从0设置为1,表示正在执行任务
                w.lock();
                // 这里的逻辑是要保证:
                //   当线程池处于STOP状态时,线程的中断标志位要被设置,
                //   否则要确保中断标志位被清除,以免影响后面任务的执行。
                //   在执行Thread.interrupted期间,其他线程可能调用了
                //   shutdownNow,因此调用runStateAtLeast重新进行检查
                // 标记c
                if ((runStateAtLeast(ctl.get(), STOP) || // 标记d
                     (Thread.interrupted() && // 标记e
                      runStateAtLeast(ctl.get(), STOP))) && // 标记f
                    !wt.isInterrupted())  // 标记g
                    wt.interrupt(); // 标记h
                try {
                    // 钩子方法
                    // 若该方法抛出异常,就不会将completedAbruptly更改为false,这种情况
                    // 下,processWorkerExit中传入的completedAbruptly为true,表示发生异常
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 执行任务
                        // 直接调用run方法
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 钩子方法
                        // 将task.run抛出的异常信息通过thrown传入afterExecute
                        // 进行处理,该方法中也可能抛出异常,会导致线程退出
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // w完成的任务数+1
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // completedAbruptly为false表示未抛出异常
            completedAbruptly = false;
        } finally {
            // 退出Worker
            processWorkerExit(w, completedAbruptly);
        }
    }

(1)标记c处

  • 例1
    假设有两个线程thread0、thread1,若thread0执行到标记a处时,thread1调用了shutdownNow并执行完毕,之后thread0会执行标记d(true)、标记g(true)、标记h
  • 例2
    假设有两个线程thread0、thread1,若thread0执行到标记b处时,thread1调用了shutdownNow并执行完毕,之后thread0会执行标记d(false)、标记e(true)、标记 f(true)、标记g(true)、标记h
  • 例3
    假设有两个线程thread0、thread1,若thread0执行到标记b处时,thread1调用了shutdown并执行完毕,之后thread0会执行标记d(false)、标记e(true)、标记f(false)
2.10 getTask
    private Runnable getTask() {
        // timedOut用来标记是否超时
        boolean timedOut = false;
        for (;;) {
            // 获取最新的ctl.value
            int c = ctl.get();
            int rs = runStateOf(c);
            // 整个if中的条件可转化为:
            //   rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // 以CAS+重试的方式将ctl.value减,直到成功为止
                decrementWorkerCount();
                // 不再处理任务,返回null
                return null;
            }
            int wc = workerCountOf(c);
            // 当外部调用了allowCoreThreadTimeOut(true)或wc超过核心线程数时,timed为true
            // 对于条件2,当核心线程(即通过addWorker(command, true)创建的线程)先到这里且
            // wc > corePoolSize和下面的(wc > 1 || workQueue.isEmpty())成立,那么退出的
            // 将是核心线程,所以说核心线程和非核心线程并没有区别,只是逻辑上的叫法不同,
            // 仅通过数目对二者进行限制,而不存在与核心线程或非核心线程对应的明确的类型
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            // 对各个条件进行解释:
            //   条件1:
            //     条件1.1:当外部调用setMaximumPoolSize方法后,可能使wc > maximumPoolSize成立
            //     条件1.2:timed由上一行语句决定,timedOut在下面进行设置
            //   条件2:
            //     条件2.1 :wc > 1表示若目前线程池中的线程数超过1个,则当前线程就可以退出(即
            //              只要线程池中还有一个线程,那么当前线程就可以退出,不管队列是否为空)
            //     条件2.2:只要队列为空就可以退出
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { 
                // 标记a
                // 以CAS方式尝试将ctl.value减1,可能会失败
                if (compareAndDecrementWorkerCount(c)) // 标记b
                    return null; // 成功   标记c
                // 失败后继续循环判断
                continue;
            }

            try {
                // 从队列中获取任务
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞指定时长
                    workQueue.take(); // 无限期阻塞
                if (r != null)
                    return r;
                // 到这里说明r为null,调用了上面的poll,且在keepAliveTime时长内未获取到任务r就为null
                // timedOut为true表示等待任务超时
                timedOut = true;
            } catch (InterruptedException retry) {
                // 若调用了poll进入阻塞状态且在keepAliveTime内
                // 被中断或调用了take在阻塞时被中断都会来到这里
                // 在这里将timedOut还原为false
                timedOut = false;
            }
        }
    }

(1)execute的标记c处

  • 例1 (初始:corePoolSize为1、allowCoreThreadTimeOut为true)
    假设有两个线程thread0、thread1,thread0先调用execute,最终会执行到getTask的标记a处,之后thread1调用execute,会执行到execute的标记a处,之后thread0继续执行标记b、标记c,之后thread1执行到标记c处时条件会成立(因为thread0在标记b处将workerCount减1后,workerCount变为0),之后执行标记d处也会添加Worker成功
2.11 processWorkerExit
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        // completedAbruptly为true表示发生异常
        // 创建TPE的子类,重写beforeExecute或afterExecute,并在重写的方法中抛出异常,
        // 这里的completedAbruptly就为true,直接在下面标记e处添加新的Woker
        if (completedAbruptly) 
            // 以CAS+重试的方式将ctl.value减1,直到成功为止    
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 累加完成的任务数
            completedTaskCount += w.completedTasks;
            // 将w从线程池中移除
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate(); // 标记a
        // 获取最新的ctl.value
        int c = ctl.get();
        // runState为RUNNING或SHUTDOWN状态时条件成立
        if (runStateLessThan(c, STOP)) { // 标记b
            // 未发生异常
            if (!completedAbruptly) { // 标记c
                //可以说min是当前线程池可持有的最少线程数量
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    // 队列不为空时至少得有1个Worker线程
                    min = 1;
                // 当前workerCount不小于min则无需添加Worker   
                if (workerCountOf(c) >= min) // 标记d
                    return; 
            }
            // 若发生异常或workerCount小于min,则添加一个新的Worker
            addWorker(null, false); // 标记e
        }
    }   

(1)针对processWorkerExit方法的两个例子(初始:corePoolSize为2)

  • 例1
    假设有两个线程thread0、thread1,若thread0执行到runWorker的标记a处时,thread1调用了shutdown并执行完毕,线程池的状态被改为SHUTDOWN,之后getTask会返回null,thread0会执行到processWorkerExit的标记a处时会将线程池的状态从SHUTDOWN改为TERMINATED,之后执行到标记c处时条件为false,方法结束
  • 例2
    假设有三个线程thread0、thread1、thread2,若thread0、thread1执行到runWorker的标记a处时,thread1调用了shutdown并执行完毕,线程池的状态被改为SHUTDOWN,之后thread0先执行进入processWorkerExit中,会执行标记b(true)、标记c(true)、标记d(false),标记e
2.12 shutdown
    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 权限检查
            checkShutdownAccess();
            // 若runState < SHUTDOWN,则将runState改为SHUTDOWN,否则runState不变
            advanceRunState(SHUTDOWN);
            // 中断所有已启动但空闲着的线程,即中断所有状态(也就是
            // AQS.state的值)为0的Woker线程,使多余的空闲线程迅速退出
            interruptIdleWorkers();
            // 为ScheduledThreadPoolExecutor设置的钩子方法
            onShutdown(); 
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
    }
2.13 shutdownNow
    public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            // 权限检查
            checkShutdownAccess();
            // 若runState < STOP,则将runState改为STOP,否则runState不变
            advanceRunState(STOP);
            // 中断所有已启动线程,即中断所有状态
            // (也就是AQS.state的值)为0或1的Woker线程
            interruptWorkers();
            // 将队列中的任务移除,返回给外部
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        // 尝试终止
        tryTerminate();
        return tasks;
    }
2.14 tryTerminate
    // 该方法负责将线程池的状态设置为TERMINATED
    // 该方法必须在所有可能导致线程池终止的方法中调用,如addWorkerFailed、processWorkerExit、
    // shutdown、shutdownNow等方法中都会调用tryTerminate尝试终止线程池
    final void tryTerminate() {
        for (;;) {
            // 获取最新的ctl.value
            int c = ctl.get();
            if (isRunning(c) || // 标记a
                runStateAtLeast(c, TIDYING) || // 标记b
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 标记c
                return;
            // workerCount不为0
            if (workerCountOf(c) != 0) { // 标记d
                // 这里仅中断一个空闲的Worker线程,防止所有线程都处于阻塞状
                // 态,被中断的线程也会在退出时调用tryTerminate传播终止信号
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            // 到这里有两种情况:
            //   情况1:runState是SHUTDOWN、且队列为空、且workerCount是0
            //   情况2:runState是STOP、且workerCount为0
            final ReentrantLock mainLock = this.mainLock;
            // 标记e处要调用signalAll(调用signalAll的线程必须持有锁),所以这里必须上锁
            mainLock.lock();
            try {
                // 将ctl.value设置为TIDYING,线程池进入TIDYING状态
                // 因为有锁,所以各个线程会轮流到这里,但只有第一个到这里的会执行成功
                // (这里就是复用了一下compareAndSet的代码,并不是为了保证原子性)
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        // 钩子方法,交由子类实现
                        terminated();
                    } finally {
                        // 将ctl.value设置为TERMINATED,线程池进入TERMINATED状态
                        // 只有一个线程会到这里,直接设置即可
                        ctl.set(ctlOf(TERMINATED, 0));
                        // "唤醒"所有调用了awaitTermination等待线程池进入TERMINATED状态的线程
                        termination.signalAll(); // 标记e
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // CAS失败说明其他线程已成功将线程池设置为TIDYING,当前线程会在标记b处跳出
        }
    }   

(1)标记a处

  • 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
    假设有1个线程thread0,调用execute后最终会通过processWorkerExit执行到tryTerminate的标记a处,此时条件成立

(2)标记b处

  • 例1
    说明其他线程已经将runState设置为TIDYING或TERMINATED,当前线程就没必要再进行设置了,return即可

(3)标记c处

  • 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
    假设有3个线程thread0、thread1、thread2,thread0先调用execute执行到getTask的标记a处,之后thread1调用execute,执行到它的标记a处,之后thread0执行getTask的标记b、标记c。此时workerCount为0,队列中有1个任务。之后thread2调用shutdown执行到tryTerminate的标记c处,runState为SHUTDOWN且队列不为空,条件成立

(4)标记d处

  • 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
    假设两个线程thread0、thread1,若thread0调用execute执行到getTask的标记a处,之后thread1调用shutdown执行到tryTerminate的标记d处,此时条件成立
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,098评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,213评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,960评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,519评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,512评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,533评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,914评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,574评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,804评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,563评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,644评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,350评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,933评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,908评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,146评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,847评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,361评论 2 342

推荐阅读更多精彩内容