Java提供的线程池(即ThreadPoolExecutor类)实现了线程的创建和管理、任务的调度与执行,因为减少了线程的创建、销毁等过程,所以当执行大量异步任务时,线程池可以提供更好的性能。本文将从源码角度对线程池的实现原理进行分析,如有必要,会在源码中添加标记,并对标记处的条件通过举例的形式进行复现,留作参考。
1. 线程池状态及转换
1.1 五种线程池状态
(1)RUNNING:接受新任务且会处理队列中的任务
(2)SHUTDOWN:不接受新任务但会处理队列中的任务
(3)STOP:不接受新任务,也不处理队列中的任务,并且会中断正在执行的任务
(4)TIDYING:处于SHUTDOWN状态时,队列为空且池为空或处于STOP状态时,池为空,都会切换到TIDYING状态
(5)TERMINATED:处于TIDYING状态时,调用terminated()方法并执行完毕后,会切换到TERMINATED状态
1.2 五种线程池状态的转换
从图中可以看出,当调用了shutdown()后,线程池不再接受新任务,在处理完队列中的任务且池中所有线程都退出后,会切换为TIDYING状态,执行完terminated()后会切换为TERMINATED状态;当调用了shutdownNow()后,线程池不再接受新任务,队列中的任务会移交到外部,池中所有线程退出后,会切换为TIDYING状态,执行完terminated()后会切换为TERMINATED状态。总之,调用shutdown()或shutdownNow()后,经过一系列处理,线程池最终会被终止。
2. ThreadPoolExecutor核心源码分析
2.1 简写说明
- TF:ThreadFactory
- TPE:ThreadPoolExecutor
- REH:RejectedExecutionHandler
- AQS:AbstractQueuedSynchronizer
2.2 TPE.Worker
Worker是TPE的内部类。Worker实现了Runnable,在构造方法中,Worker会与Thread进行绑定。Worker就是用于执行任务的工作线程,它会先执行自己的初始任务,之后会从阻塞队列中获取任务来执行。
Worker继承了AQS,它会使用AQS中的一些字段和方法,也重写了AQS中的一些方法,简化了自身加锁和解锁的过程。TPE.runWorker方法中的的加锁和解锁操作不是因为线程竞争(w.lock()和w.unlock()之间根本不存在竞争),而是为了对中断行为进行控制。Worker有三种状态(即AQS.state有三个值):
- -1:表示Worker对应的线程未启动
- 0:表示Worker对应的线程已启动,但正在等待着获取任务,处于空闲状态
- 1:表示Worker对应的线程已启动且正在执行任务
当调用TPE.shutdown时,若状态为0,线程会被中断;当调用TPE.shutdownNow时,若状态为0或1,线程会被中断;当状态为-1时线程不可被中断。这些状态通过Worker.lock和Worker.unlock进行切换。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 与当前Worker绑定的线程
final Thread thread;
// 每个Runnable就是一个任务
// firstTask是Worker的初始任务,可能为null(后面会详细介绍)
Runnable firstTask;
// 记录当前Worker执行的任务数
volatile long completedTasks;
Worker(Runnable firstTask) {
// 初始状态为-1,表示未启动,不能被中断
setState(-1);
this.firstTask = firstTask;
// 用线程工厂创建线程(与this.thread = new Thread(this);类似)
this.thread = getThreadFactory().newThread(this);
}
// 在addWorker中会创建Worker,并调用
// Worker.thread.start,最终会调用到该run方法
public void run() { runWorker(this); }
// 将AQS.state设置为1
public void lock() {...}
// 将AQS.state设置为0
public void unlock() {...}
// 若Worker的状态(即AQS.state的值)为0或1,则对
// Worker对应的线程(即this.thread)的中断标志位进行设置
// 调用TPE.shutdownNow时,会调用到该方法
void interruptIfStarted() {...}
// 其他(略)
}
2.3 runState、workerCount相关字段
// ctl(control的缩写)是AtomicInteger类型,因此其大小与int类型一致,其
// 初始值(即ctl.value的初始值)是 1110 0000 0000 0000 0000 0000 0000 0000
// ctl包含两部分信息:
// 高3位:线程池运行状态(runState)
// 低29位:有效线程个数(workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// COUNT_BITS为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// CAPACITY为 2^29 -1
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 线程池运行状态,可看出5种状态高3位均不相同,且从小到大单调递增,因为
// 高位决定大小,所以后面会看到源码中会通过比较大小确定线程池的运行状态
// RUNNING为 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// SHUTDOWN为 0000 0000 0000 0000 0000 0000 0000 0000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// STOP为 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// TIDYING为 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// TERMINATED为 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
2.4 runState、workerCount相关辅助方法
(1)截取、拼接ctl.value
// 从c(c是某个时刻的ctl.value)中"截取"runState
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 从c中"截取"workerCount
private static int workerCountOf(int c) { return c & CAPACITY; }
// 将rs和ws"拼接"成ctl
private static int ctlOf(int rs, int wc) { return rs | wc; }
(2)判断runState
// 通过比较大小检查runState
private static boolean runStateLessThan(int c, int s) { return c < s; }
private static boolean runStateAtLeast(int c, int s) { return c >= s; }
// c < SHUTDOWN说明runStateOf(c)是RUNNING
private static boolean isRunning(int c) { return c < SHUTDOWN; }
(3)修改workerCount
// 以CAS方式将ctl.value加1(expect是某个时刻ctl.value的值)
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
// 以CAS方式将ctl.value减1(expect是某个时刻ctl.value的值)
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
// 以CAS方式将ctl.value减1,直到成功为止
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
2.5 核心字段
// 阻塞队列
// 线程池中的线程数超过核心线程数时,传入的Runnable会存入该队列
// 为了兼容特殊用途的队列,源码中仅用workQueue.isEmpty判断队列是否为空
private final BlockingQueue<Runnable> workQueue;
// 只要涉及到workers的操作,都要加锁
private final ReentrantLock mainLock = new ReentrantLock();
// workers就是存着所有Worker的线程池
private final HashSet<Worker> workers = new HashSet<Worker>();
// 外部可能会调用TPE.awaitTermination等待线程池进入TERMINATED状态,
// 在TPE.tryTerminate中会调用termination.signalAll唤醒这些等待着的线程
private final Condition termination = mainLock.newCondition();
// largestPoolSize记录了线程池中的线程数峰值(即Worker数的最大值)
// 操作该字段时也需要mainLock
private int largestPoolSize;
// 每个Worker线程退出时会将自己执行的任务数(即Worker.
// completedTasks字段的值)累加到completedTaskCount中
// 操作该字段时也需要mainLock
private long completedTaskCount;
// 下面6个字段是用户可设置的,都用volatile修饰来保证可见性
// 线程工厂(默认为Executors.DefaultThreadFactory)
private volatile ThreadFactory threadFactory;
// 拒绝策略的handler(默认为AbortPolicy)
// Java预定义了4种:
// AbortPolicy:抛异常
// CallerRunsPolicy:若线程池仍处于RUNNING状态,则将任务交给调用execute的外部线程
// (这将减慢新任务的提交速率,算是一种简单的反馈机制)
// DiscardPolicy:直接丢弃(DiscardPolicy.rejectedExecution是空实现)
// DiscardOldestPolicy:若线程池仍处于RUNNING状态,则删掉队列中的最老的(即第一个)任务,
// 重新执行execute,可能会再次失败,但会不断重复
private volatile RejectedExecutionHandler handler;
// 等待任务的空闲线程的超时时长
// 当超过核心线程数或allowCoreThreadTimeOut为
// true时,空闲线程等待新任务的时长是keepAliveTime
private volatile long keepAliveTime;
// 为false时,核心线程会无限期等待新的任务
// 为true时,核心线程等待新任务的时长是keepAliveTime
private volatile boolean allowCoreThreadTimeOut;
// 核心线程数(线程池中可以保持存活的最小线程个数,
// 当allowCoreThreadTimeOut为true时,最小值为0)
private volatile int corePoolSize;
// 最大线程数
// 虽然外部最大可将maximumPoolSize设置为Integer.MAX_VALUE,
// 但源码中会进行判断,不允许超过CAPACITY(即2^29-1)
private volatile int maximumPoolSize;
2.6 execute
// 一个Runnable就是一个任务
public void execute(Runnable command) {
// command为null,则抛异常
if (command == null)
throw new NullPointerException();
// 获取最新的ctl.value
int c = ctl.get();
// step1:添加核心线程
// (其实核心线程和非核心线程只是逻辑上的一种叫法,后面会看到,
// 二者是用数值进行区分的,而不是类型,所以二者实际没有差别)
// 判断workerCount是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加Worker
// 传入true,则方法里会以corePoolSize作为判断依据
if (addWorker(command, true)) // 标记e
// 添加成功,直接返回
return;
// 到这里说明添加失败,ctl.value可能已经改变,重新获取
c = ctl.get();
}
// step2:将任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
// 标记a
// 入队期间,线程状态可能已经改变,重新获取ctl.value进行检查
int recheck = ctl.get();
// 若线程状态改变,则移除command,并执行拒绝策略
if (! isRunning(recheck) && remove(command)) // 标记b
reject(command);
// 若workerCount为0,添加一个Worker
else if (workerCountOf(recheck) == 0) // 标记c
addWorker(null, false); // 标记d
}
// step3:添加非核心线程
// 传入false,则方法里会以maximumPoolSize作为判断依据
else if (!addWorker(command, false)) // 标记f
// step4:添加失败,执行拒绝策略
reject(command);
}
(1)标记b处
- 例1
假设有两个Worker线程thread0、thread1,若thread0执行到标记a处时,thread1调用了shutdown方法并执行完毕,之后thread0执行到标记b处时两个条件均为true - 例2
将上面例子中的shutdown改为shutdownNow,其他不变,thread0执行到标记b处时条件1为true,条件2为false(因为shutdownNow(shutdownNow的返回值类型为List<Runnable>)中会将队列中的任务移除,返回给外部,所以调用remove时,队列为空,因此条件2会返回false)
(2)标记c处
- 例1
若自定义了一个TF,重写的newThread返回null,用该TF初始化TPE,调用execute会执行到这里且条件成立,不过因为重写的newThread返回null,所以标记d处仍会失败
2.7 addWorker
// execute中标记e和标记f两处调用addWorker时传入的firstTask
// 不为null,其余调用addWorker的地方传入的firstTask均为null。
// firstTask不为null时,Worker会先执行firstTask,再从队列中获取任务并执行;
// firstTask为null时,Worker直接从队列中获取任务并执行。
// Worker中firstTask字段存在的意义:
// 对标记a处:核心线程是不依赖队列的,若没有firstTask,任务就没有地方存
// 对标记b处:因为队列已满,若没有firstTask,任务也没地方存
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
// 外层循环:检查线程池运行状态
int rs = runStateOf(c);
// 为便于理解,将if中的整个条件取反:
// 即 当 rs == RUNNING ||
// (rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())
// 成立时,if中的条件不成立,继续向后执行
// 要使上面条件2(即||后面的整个条件)成立,firstTask要为null,因为线程池处于
// SHUTDOWN状态时是不接受新任务的(firstTask不为null时,表示传进来的是新任务)
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 内层循环:检查workerCount
int wc = workerCountOf(c);
// 判断wc是否超过对应线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 以CAS方式将ctl.value加1
if (compareAndIncrementWorkerCount(c))
// 加1成功则跳出整个循环
break retry;
// 加1失败,说明ctl.value被其他线程"抢先"修改,获取最新的ctl.value
c = ctl.get();
// 检查线程池运行状态
if (runStateOf(c) != rs)
// 成功说明加1失败是线程池运行状态被修改,跳至外层循环,处理线程池运行状态
continue retry;
// 到这里说明线程池运行状态不变,而是workerCount改变,继续内层循环,处理workerCount
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 创建Worker
w = new Worker(firstTask);
final Thread t = w.thread;
// 若自定义了一个TF,重写的newThread返回null,用
// 该TF实例化TPE,调用execute会执行到这里时t为null
if (t != null) { // 标记a
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 重新获取runState
int rs = runStateOf(ctl.get());
// 跳出上面循环体后,在获取到锁之前runState可能改变,这里重新检查
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // 标记b
// 若自定义了一个TF,重写的newThread中启动
// 了创建的线程且正在运行,则这里条件成立
if (t.isAlive()) // 标记c
throw new IllegalThreadStateException();
// 将新建的Worker添加到线程池中
workers.add(w);
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
// workerAdded为true表示添加成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 添加成功,启动线程
if (workerAdded) {
// 会调用到Worker.run方法
t.start();
// workerStarted为true表示启动成功
workerStarted = true;
}
}
} finally {
// 上面标记a、标记b处不成立,标记c处
// 成立时都会导致workerStarted为false
if (! workerStarted)
// 回滚Worker的添加过程
addWorkerFailed(w);
}
return workerStarted;
}
2.8 addWorkerFailed
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
// 将w从线程池中移除
workers.remove(w);
// 以CAS+重试的方式将ctl.value减1,直到成功为止
decrementWorkerCount();
// 尝试终止(后面会介绍)
tryTerminate();
} finally {
mainLock.unlock();
}
}
2.9 runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 标记a
// w的初始状态为-1,调用unlock将
// 状态设置为0,表示线程已启动
w.unlock();
// 标记b
// completedAbruptly用来标识执行任务时是否发生异常
boolean completedAbruptly = true;
try {
// task不为null则先执行自己的task
// 之后不断调用getTask()从队列中获取任务,直到返回null为止
while (task != null || (task = getTask()) != null) {
// 将w的状态从0设置为1,表示正在执行任务
w.lock();
// 这里的逻辑是要保证:
// 当线程池处于STOP状态时,线程的中断标志位要被设置,
// 否则要确保中断标志位被清除,以免影响后面任务的执行。
// 在执行Thread.interrupted期间,其他线程可能调用了
// shutdownNow,因此调用runStateAtLeast重新进行检查
// 标记c
if ((runStateAtLeast(ctl.get(), STOP) || // 标记d
(Thread.interrupted() && // 标记e
runStateAtLeast(ctl.get(), STOP))) && // 标记f
!wt.isInterrupted()) // 标记g
wt.interrupt(); // 标记h
try {
// 钩子方法
// 若该方法抛出异常,就不会将completedAbruptly更改为false,这种情况
// 下,processWorkerExit中传入的completedAbruptly为true,表示发生异常
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
// 直接调用run方法
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 钩子方法
// 将task.run抛出的异常信息通过thrown传入afterExecute
// 进行处理,该方法中也可能抛出异常,会导致线程退出
afterExecute(task, thrown);
}
} finally {
task = null;
// w完成的任务数+1
w.completedTasks++;
w.unlock();
}
}
// completedAbruptly为false表示未抛出异常
completedAbruptly = false;
} finally {
// 退出Worker
processWorkerExit(w, completedAbruptly);
}
}
(1)标记c处
- 例1
假设有两个线程thread0、thread1,若thread0执行到标记a处时,thread1调用了shutdownNow并执行完毕,之后thread0会执行标记d(true)、标记g(true)、标记h - 例2
假设有两个线程thread0、thread1,若thread0执行到标记b处时,thread1调用了shutdownNow并执行完毕,之后thread0会执行标记d(false)、标记e(true)、标记 f(true)、标记g(true)、标记h - 例3
假设有两个线程thread0、thread1,若thread0执行到标记b处时,thread1调用了shutdown并执行完毕,之后thread0会执行标记d(false)、标记e(true)、标记f(false)
2.10 getTask
private Runnable getTask() {
// timedOut用来标记是否超时
boolean timedOut = false;
for (;;) {
// 获取最新的ctl.value
int c = ctl.get();
int rs = runStateOf(c);
// 整个if中的条件可转化为:
// rs >= STOP || (rs == SHUTDOWN && workQueue.isEmpty())
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 以CAS+重试的方式将ctl.value减,直到成功为止
decrementWorkerCount();
// 不再处理任务,返回null
return null;
}
int wc = workerCountOf(c);
// 当外部调用了allowCoreThreadTimeOut(true)或wc超过核心线程数时,timed为true
// 对于条件2,当核心线程(即通过addWorker(command, true)创建的线程)先到这里且
// wc > corePoolSize和下面的(wc > 1 || workQueue.isEmpty())成立,那么退出的
// 将是核心线程,所以说核心线程和非核心线程并没有区别,只是逻辑上的叫法不同,
// 仅通过数目对二者进行限制,而不存在与核心线程或非核心线程对应的明确的类型
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 对各个条件进行解释:
// 条件1:
// 条件1.1:当外部调用setMaximumPoolSize方法后,可能使wc > maximumPoolSize成立
// 条件1.2:timed由上一行语句决定,timedOut在下面进行设置
// 条件2:
// 条件2.1 :wc > 1表示若目前线程池中的线程数超过1个,则当前线程就可以退出(即
// 只要线程池中还有一个线程,那么当前线程就可以退出,不管队列是否为空)
// 条件2.2:只要队列为空就可以退出
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 标记a
// 以CAS方式尝试将ctl.value减1,可能会失败
if (compareAndDecrementWorkerCount(c)) // 标记b
return null; // 成功 标记c
// 失败后继续循环判断
continue;
}
try {
// 从队列中获取任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 阻塞指定时长
workQueue.take(); // 无限期阻塞
if (r != null)
return r;
// 到这里说明r为null,调用了上面的poll,且在keepAliveTime时长内未获取到任务r就为null
// timedOut为true表示等待任务超时
timedOut = true;
} catch (InterruptedException retry) {
// 若调用了poll进入阻塞状态且在keepAliveTime内
// 被中断或调用了take在阻塞时被中断都会来到这里
// 在这里将timedOut还原为false
timedOut = false;
}
}
}
(1)execute的标记c处
- 例1 (初始:corePoolSize为1、allowCoreThreadTimeOut为true)
假设有两个线程thread0、thread1,thread0先调用execute,最终会执行到getTask的标记a处,之后thread1调用execute,会执行到execute的标记a处,之后thread0继续执行标记b、标记c,之后thread1执行到标记c处时条件会成立(因为thread0在标记b处将workerCount减1后,workerCount变为0),之后执行标记d处也会添加Worker成功
2.11 processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly为true表示发生异常
// 创建TPE的子类,重写beforeExecute或afterExecute,并在重写的方法中抛出异常,
// 这里的completedAbruptly就为true,直接在下面标记e处添加新的Woker
if (completedAbruptly)
// 以CAS+重试的方式将ctl.value减1,直到成功为止
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加完成的任务数
completedTaskCount += w.completedTasks;
// 将w从线程池中移除
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试终止
tryTerminate(); // 标记a
// 获取最新的ctl.value
int c = ctl.get();
// runState为RUNNING或SHUTDOWN状态时条件成立
if (runStateLessThan(c, STOP)) { // 标记b
// 未发生异常
if (!completedAbruptly) { // 标记c
//可以说min是当前线程池可持有的最少线程数量
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
// 队列不为空时至少得有1个Worker线程
min = 1;
// 当前workerCount不小于min则无需添加Worker
if (workerCountOf(c) >= min) // 标记d
return;
}
// 若发生异常或workerCount小于min,则添加一个新的Worker
addWorker(null, false); // 标记e
}
}
(1)针对processWorkerExit方法的两个例子(初始:corePoolSize为2)
- 例1
假设有两个线程thread0、thread1,若thread0执行到runWorker的标记a处时,thread1调用了shutdown并执行完毕,线程池的状态被改为SHUTDOWN,之后getTask会返回null,thread0会执行到processWorkerExit的标记a处时会将线程池的状态从SHUTDOWN改为TERMINATED,之后执行到标记c处时条件为false,方法结束 - 例2
假设有三个线程thread0、thread1、thread2,若thread0、thread1执行到runWorker的标记a处时,thread1调用了shutdown并执行完毕,线程池的状态被改为SHUTDOWN,之后thread0先执行进入processWorkerExit中,会执行标记b(true)、标记c(true)、标记d(false),标记e
2.12 shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 若runState < SHUTDOWN,则将runState改为SHUTDOWN,否则runState不变
advanceRunState(SHUTDOWN);
// 中断所有已启动但空闲着的线程,即中断所有状态(也就是
// AQS.state的值)为0的Woker线程,使多余的空闲线程迅速退出
interruptIdleWorkers();
// 为ScheduledThreadPoolExecutor设置的钩子方法
onShutdown();
} finally {
mainLock.unlock();
}
// 尝试终止
tryTerminate();
}
2.13 shutdownNow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 权限检查
checkShutdownAccess();
// 若runState < STOP,则将runState改为STOP,否则runState不变
advanceRunState(STOP);
// 中断所有已启动线程,即中断所有状态
// (也就是AQS.state的值)为0或1的Woker线程
interruptWorkers();
// 将队列中的任务移除,返回给外部
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试终止
tryTerminate();
return tasks;
}
2.14 tryTerminate
// 该方法负责将线程池的状态设置为TERMINATED
// 该方法必须在所有可能导致线程池终止的方法中调用,如addWorkerFailed、processWorkerExit、
// shutdown、shutdownNow等方法中都会调用tryTerminate尝试终止线程池
final void tryTerminate() {
for (;;) {
// 获取最新的ctl.value
int c = ctl.get();
if (isRunning(c) || // 标记a
runStateAtLeast(c, TIDYING) || // 标记b
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 标记c
return;
// workerCount不为0
if (workerCountOf(c) != 0) { // 标记d
// 这里仅中断一个空闲的Worker线程,防止所有线程都处于阻塞状
// 态,被中断的线程也会在退出时调用tryTerminate传播终止信号
interruptIdleWorkers(ONLY_ONE);
return;
}
// 到这里有两种情况:
// 情况1:runState是SHUTDOWN、且队列为空、且workerCount是0
// 情况2:runState是STOP、且workerCount为0
final ReentrantLock mainLock = this.mainLock;
// 标记e处要调用signalAll(调用signalAll的线程必须持有锁),所以这里必须上锁
mainLock.lock();
try {
// 将ctl.value设置为TIDYING,线程池进入TIDYING状态
// 因为有锁,所以各个线程会轮流到这里,但只有第一个到这里的会执行成功
// (这里就是复用了一下compareAndSet的代码,并不是为了保证原子性)
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 钩子方法,交由子类实现
terminated();
} finally {
// 将ctl.value设置为TERMINATED,线程池进入TERMINATED状态
// 只有一个线程会到这里,直接设置即可
ctl.set(ctlOf(TERMINATED, 0));
// "唤醒"所有调用了awaitTermination等待线程池进入TERMINATED状态的线程
termination.signalAll(); // 标记e
}
return;
}
} finally {
mainLock.unlock();
}
// CAS失败说明其他线程已成功将线程池设置为TIDYING,当前线程会在标记b处跳出
}
}
(1)标记a处
- 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
假设有1个线程thread0,调用execute后最终会通过processWorkerExit执行到tryTerminate的标记a处,此时条件成立
(2)标记b处
- 例1
说明其他线程已经将runState设置为TIDYING或TERMINATED,当前线程就没必要再进行设置了,return即可
(3)标记c处
- 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
假设有3个线程thread0、thread1、thread2,thread0先调用execute执行到getTask的标记a处,之后thread1调用execute,执行到它的标记a处,之后thread0执行getTask的标记b、标记c。此时workerCount为0,队列中有1个任务。之后thread2调用shutdown执行到tryTerminate的标记c处,runState为SHUTDOWN且队列不为空,条件成立
(4)标记d处
- 例1 (初始:corePoolSize为1,allowCoreThreadTimeOut为true)
假设两个线程thread0、thread1,若thread0调用execute执行到getTask的标记a处,之后thread1调用shutdown执行到tryTerminate的标记d处,此时条件成立