阅读源码前需要大概了解的基础知识
Java8 源码阅读 - AbstractQueuedSynchronizer
Java8源码阅读 - FutureTask
Java8源码阅读 - Executor、ExecutorService、ExecutorCompletionService
特性
线程池核心所解决的问题就是提供了一种绑定和资源管理的方式,通过控制一些参数达到优化执行每个任务的开销,通常在执行大量异步任务时有明显的性能提升,线程池还维护了一些基本的统计信息,帮助来了解池中的基本状态;
ThreadPoolExecutor提供了许多可调参数和可扩展的钩子来应对在不同场景下的广泛应用;
corePoolSize和maximumPoolSize
当在方法execute
中提交一个新任务,并且运行的线程小于corePoolSize
时,即使其他工作线程处于空闲状态,将创建一个新线程来处理请求新的任务。如果有超过corePoolSize
但小于maximumPoolSize
的线程在运行,则只有在工作队列已满时才会创建新线程。通过将corePoolSize
和maximumPoolSize
设置为相同达到创建一个固定大小的线程池。通过将maximumPoolSize
设置为一个没有上限的值,例如Integer.MAX_VALUE
,则允许池容纳任意数量的并发任务。corePoolSize
和maximumPoolSize
仅在构建时设置,但也可以使用setCorePoolSize
和setMaximumPoolSize
动态更改它们;默认情况下,即使是核心线程也只是在新任务到达时才被创建和启动,但是可以使用方法
prestartCoreThread
或prestartAllCoreThreads
动态地覆盖这一点,如果使用非空队列构造池,则可能需要预启动线程。新线程是使用
ThreadFactory
创建的。如果没有另外指定,默认是Executors#defaultThreadFactory
,它创建的线程都位于相同的线程组中,并且具有相同的优先级和非守护状态。通过提供不同的ThreadFactory
,可以更改线程的名称、线程组、优先级、守护进程状态等。如果ThreadFactory#newThread
返回null则表示创建线程失败,执行程序可能将继续执行,但可能无法执行任何任务。Keep Alive Time
如果当前池中有超过corePoolSize
的线程,那么如果空闲时间超过keepAliveTime
,多余的线程将被终止。这提供了一种在线程池中未被积极使用时减少资源消耗的方法。如果线程池未来变得更加活跃时就会构造新的线程。默认情况下,该策略仅适用于拥有多于corePoolSize
线程的情况。Queuing
使用BlockingQueue
来传输和保存提交的任务。这个队列中如果运行的线程小于corePoolSize
,遇到新任务则总是希望添加新线程而不是排队。如果正在运行的任务大于corePoolSize
,执行程序总是希望对请求进行排队,而不是添加新线程。如果不能对请求进行排队,将创建一个新线程,除非该线程超过maximumPoolSize
,在这种情况下,任务将被拒绝。无界队列
当所有corePoolSize大小的线程都处于繁忙状态时,使用无界队列,例如没有定义大小的LinkedBlockingQueue
,将导致新任务在队列中等待。创建线程不会超过corePoolSize
的大小,因此设置maximumPoolSize
的值不会有任何效果。当每个任务完全独立于其他任务时,无界队列的用法可能是合适的,因为排队任务不会影响其他任务的执行;例如在web页面服务器中,尽管这种类型的排队在平滑瞬时请求爆发的状况下很有用,但在任务持续以快于处理任务的速度到达时,可能会出现无限制的工作队列增长。有界队列
有界队列(例如ArrayBlockingQueue
)在使用有限的maximumPoolSize
时有助于防止资源耗尽,但是调优和控制可能更困难。队列大小和最大池的大小可以相互交换,使用大队列和容量小的池可以最小化CPU的使用、操作系统资源和上下文切换开销,但是可能会导致更低的吞吐量。如果任务经常阻塞(例如大量I/O等待场景),系统可能会为比设置所允许的更多线程安排时间。使用小队列通常需要更大容量的池子,这会让cpu更忙,也会降低吞吐量。拒绝任务
当池子或者使用有限的工作队列达到饱和时,我们提供的任务会返回RejectedExecutionHandler
异常,ThreadPoolExecutor
提供4个内置处理策略;分别是AbortPolicy
,CallerRunsPolicy
,DiscardPolicy
和DiscardOldestPolicy
,当然也是允许自己扩展实现;钩子函数
ThreadPoolExecutor
提供了任务执行前beforeExecute
和任务执行后afterExecute
的钩子,当然还有terminated
来执行线程池关闭的一些特殊处理;Finalization
如果程序不再引用线程池,那么线程池可能会自动被关闭,如果想要确保忘记调用关闭也能回收未引用的池子,可以通过设置keepAliveTime
、使用allowCoreThreadTimeOut
或者设置0个核心线程corePoolSize
;
细节分析
构造器
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
构造函数设置一些初始的设置项,相关参数的含义上面都有,而平常使用较多的Executors.newFixedThreadPool
,默认使用的是无界队列LinkedBlockingQueue
;
线程池状态
线程池状态由一个原子整数ctl储存,包括了以下两个含义
- workerCount:表示有效线程运行的数目
- runState:表示线程池的运行状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
为了将两个状态塞进一个int类型,将workerCount
限制为上限是229-1(大约5亿)个线程,而不是231-1个;
runState的主要生命周期有以下几个:
- RUNNING:运行中,允许接受新任务并处理排队的任务;
- SHUTDOWN:不接受新任务,但处理排队的任务;
- STOP:不接受新任务、不处理排队的任务和尝试中断正在进行的任务;
- TIDYING:所有任务都已终止,
workerCount
为0,过渡到TIDYING状态并将运行terminate()
钩子方法; - TERMINATED:当
terminate()
方法运行完成;
// 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0000 0001 1101
private static final int COUNT_BITS = Integer.SIZE - 3;
// 0000 0000 0000 0000 0000 0000 0000 0000 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 1111 1111 1111 1111 1111 1111 1111 1111 1110 0000 0000 0000 0000 0000 0000 0000
private static final int RUNNING = -1 << COUNT_BITS;
// 0
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000 0010 0000 0000 0000 0000 0000 0000 0000
private static final int STOP = 1 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000 0100 0000 0000 0000 0000 0000 0000 0000
private static final int TIDYING = 2 << COUNT_BITS;
// 0000 0000 0000 0000 0000 0000 0000 0000 0110 0000 0000 0000 0000 0000 0000 0000
private static final int TERMINATED = 3 << COUNT_BITS;
状态之间的转换如下:
- RUNNING -> SHUTDOWN
调用shutdown()
后,可能会隐式地调用finalize()
; - (RUNNING or SHUTDOWN) -> STOP
调用shutdownNow()
后; - SHUTDOWN -> TIDYING
当等待队列和线程池是空的; - STOP -> TIDYING
当线程池是空的; - TIDYING -> TERMINATED
当terminate()
方法运行完成;
根据文档上的阐述,一个Integer
要表示两个含义,这里采用的方法就是在32位长的二进制中用低29位来表达workerCount
,剩下的高3位来代表runState
状态;
private static int ctlOf(int rs, int wc) { return rs | wc; }
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
通过ctlOf
来拼接runState
和workerCount
,比如状态为RUNNING
,个数为0个,通过ctlOf(RUNNING, 1)
计算出来的数值就是
1110 0000 0000 0000 0000 0000 0000 0000
|
0000 0000 0000 0000 0000 0000 0000 0001
=
1110 0000 0000 0000 0000 0000 0000 0001
那么runState
状态的计算方法就是
1110 0000 0000 0000 0000 0000 0000 0001
&
~(0001 1111 1111 1111 1111 1111 1111 1111)
->
1110 0000 0000 0000 0000 0000 0000 0000
=
1110 0000 0000 0000 0000 0000 0000 0001
只取高3位,而workerCount
就是取低29位;
前置知识
要掌握整个流程,先需要了解一些源码之中出现的属性或类;
// 保持活动线程的最小数量,最小值为0,allowCoreThreadTimeOut设置后corePoolSize就无效了;
private volatile int corePoolSize;
// 线程池容量的最大值
private volatile int maximumPoolSize;
corePoolSize
和maximumPoolSize
的说明参数上面;
线程工厂
private volatile ThreadFactory threadFactory;
所有线程都是通过该工厂创建的,默认使用的是DefaultThreadFactory
,创建线程时可能会遇到创建失败的情况,比如OutOfMemoryError
,所以说调用者需要处理启动线程失败的现象,比如进行清理并关闭线程池,让其正确的退出;
Worker
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
/** 每个worker都对应一个线程,如果为null则表示创建线程失败,通常是异常如OOM */
final Thread thread;
/** Worker捆绑的FutureTask,可能是null */
Runnable firstTask;
/** 完成的任务的计数器 */
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // 禁止中断,直到调用runWorker函数
this.firstTask = firstTask;
// 将自己(worker实现Runnable接口)传递给thread
this.thread = getThreadFactory().newThread(this);
}
// Thread.run的委托
public void run() { runWorker(this); }
...
}
Worker主要是维护线程的中断状态,对
AbstractQueuedSynchronizer
类进行了扩展,实现了一个简单的不可重入互斥锁,以简化获取和释放每个任务执行的锁。不是使用ReentrantLock
是因为不希望worker在调用诸如setCorePoolSize
之类的池控制方法时能够重新获得锁。同时为了在线程真正开始运行任务之前抑制中断事件,将锁状态初始化为负值直到在runWorker
中清除;
这是文档中对于Worker类的解释,提到的以下关键的几点
- 实现了一个简单的不可重入互斥锁,以简化获取和释放每个任务执行的锁;
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
// 加锁失败
// 重入锁的话这里会执行重入的逻辑,这里就简单的返回fail
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
省去了ReentrantLock
中复杂的重入、排队等待逻辑,加锁成功就成功,失败就失败;
- 为了在线程真正开始运行任务之前抑制中断事件,将锁状态初始化为负值;
Worker(Runnable firstTask) {
setState(-1); // 禁止中断,直到调用runWorker函数
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
在初始化Worker时将state设置为-1,表示不响应中断事件,具体就是执行worker#tryLock
失败,在开启任务runWorker
后才允许线程响应中断;
- 不是使用重入锁是因为不希望worker在调用诸如
setCorePoolSize
之类的池控制方法时能够重新获得锁;
这个光看字面意思会较难以理解,具体就是interruptIdleWorkers
处理中断有关,后面看到再分析;
Workers
private final HashSet<Worker> workers = new HashSet<Worker>();
Worker是保存在一个HashSet集合中,遍历、添加和删除操作之前需要持有下面这个锁才能操作;
private final ReentrantLock mainLock = new ReentrantLock();
看下文档中对这个的解释:
虽然我们可以使用某种类型的并发集,但事实证明使用锁通常更好。其中一个原因是,它串行化了
interruptIdleWorkers
操作,从而避免了不必要的中断风暴,尤其是在调用shutdown
。否则,退出的线程将同时中断那些尚未中断的线程。它还简化了一些相关的统计数据如最大池大小等。我们同时也在shutdown
和shutdownNow
中持有锁;
假如说worker集合使用的是并发安全的集合比如ConcurrentHashMap
,后者在遍历时做了很多功夫保证线程安全,但是依赖消耗额外的空间,在interruptIdleWorkers
中需要对worker集合进行遍历,使用重入锁可以很简单保证每个worker中线程中断状态的正确性;
private int largestPoolSize;
保存线程池的最大值,也是在持有mainLock
下才能访问;
private final BlockingQueue<Runnable> workQueue;
workQueue
用来储存排队的任务,当当前运行的任务数量大于corePoolSize
时,会暂时保存到该等待队列中,默认是无界队列LinkedBlockingQueue
;
添加worker
/*
core:如果为真则使用corePoolSize作为界,否则使用maximumPoolSize;
firstTask:新线程应该首先运行的任务(如果没有则为null)。当队列小于corePoolSize的线程
(在这种情况下会启动一个线程)或队列已满时,使用一个初始的第一个任务创建Workers来绕过队列。
最初,空闲线程通常是通过prestartCoreThread创建的,或者用来替换其他即将完成的worker。
*/
private boolean addWorker(Runnable firstTask, boolean core) {
// firstTask不为null意味着worker
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
// 意味着连接池调用了shutdown或者已经被关闭
// && workQueue不为空
// 或者firstTask为null
// 或者连接池调用了shutdown,正处于SHUTDOWN状态,此时不接受新任务
return false;
// 准备添加新任务
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 根据core参数决定采用哪个值
return false;
// 更新workerCount
if (compareAndIncrementWorkerCount(c))
// cas成功,更新workerCount=workerCount+1
break retry;
// cas失败,被其他线程抢占资源,重试
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
// runState状态不再是RUNNING
continue retry;
// CAS因workerCount变更而失效;重试该循环
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 构建worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 持有锁时重新检查。
// 退出当ThreadFactory故障,或者在获取锁之前关闭线程池。
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) { // rs处于SHUTDOWN状态
// runState是RUNNING或者runState是SHUTDOWN
// SHUTDOWN不允许接受新的任务,但是还会处理在等待队列中的任务,所以firstTask需要为null
if (t.isAlive())
// 为什么要有这个判断呢
// 如果对一个线程调用两次start,那么第二次会抛出该异常
// 这里是防止workers集合添加worker后但是后面的t.start()却启动失败
// 符合fast-fail思想
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
// 更新最大池数目
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 失败处理
addWorkerFailed(w);
}
return workerStarted;
}
addWorker
主要是分为两个阶段,第一个阶段是判断连接池状态是否处于运行RUNNING
状态,如果状态处于SHUTDOWN
,此时连接池是不接受新的任务,如果状态处于可接受新任务的状态且当前的worker
数量小于预设条件,使用CAS更新workerCount
;第二阶段就是在成功更新workerCount
后,将任务包装成Worker
类后添加到workers
合集中;
也就是说这个方法的核心功能就是将FutureTask包装成Worker
并添加到workers
合集中,并更新workerCount
;该方法返回失败的条件如下:
- 线程池状态不是RUNNING且,连接池调用了shutdown,正处于SHUTDOWN状态,或者调用
addWorker
时传的firstTask
参数为null,或者任务等待队列不是空的; -
core
为true条件下workerCount
小于corePoolSize
或者core
为false条件下workerCount
小于maximumPoolSize
; - 启动线程时失败
提交任务
public <T> Future<T> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
默认使用的是FutureTask
来包装任务,看到protected
关键字就意味着我们可以自己来扩展newTaskFor
的返回结果,比如ForkJoinPool
;
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
// 当前运行线程小于corePoolSize
if (addWorker(command, true))
// 添加worker成功并将thread启动,直接返回
return;
// addWorker失败,常见的情况就是调用shutdown或者
// 因为其他线程添加成功导致现在wc大于corePoolSize
c = ctl.get(); // 重新获取runState和workerCount
}
// 条件:
// 当前运行线程大于等于corePoolSize,或者addWorker失败;
// 如果线程池没有被关闭,向等待队列中添加任务
if (isRunning(c) && workQueue.offer(command)) {
// 进入排队队列成功
int recheck = ctl.get(); // 重新获取runState和workerCount
if (! isRunning(recheck) && remove(command)) // 检查线程是否还处于RUNNING状态,不是的话进行回滚
reject(command); // 拒绝任务
else if (workerCountOf(recheck) == 0)
// runState=RUNNING,workerCount=0,重新添加worker到workerSet
addWorker(null, false);
}
// runState不再是RUNNING,或者向等待队列中入队失败,即等待队列以及满了
else if (!addWorker(command, false)) // 再次尝试添加worker,但是由maximumPoolSize决定上限
reject(command);
}
// 回滚任务
public boolean remove(Runnable task) {
// 移除排队队列的任务
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
提交任务时大概会经历这几个过程:
- 如果运行的线程小于corePoolSize,则尝试启动一个新线程执行任务。对
addWorker
失败的线程重新检查runState和workerCount; - 如果一个任务可以成功地进入排队队列,如果发现当前的工作线程数量为0时会再次尝试添加一个
Worker
线程,因为自上次检查以来已有线程退出(之前发现workerCount
大于corePoolSize
,现在workerCount
为0,意味着有Worker
已经完成退出),注意这里addWorker(null, false)
,传进去的任务是null,因为这时候任务刚刚进入排队队列里面;
或者这时候线程池后调用了shutdown
关闭,会将在等待队列中的任务回滚; - 如果排队队列已经满了而导致不能入队,则再次尝试添加新线程。如果添加失败,意味着线程池已经饱和,因此拒绝任务;
如果addWorker
返回成功,那么这时候线程已经开始执行相应的任务了;
执行任务
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // 可能为null
w.firstTask = null;
w.unlock(); // // 开始允许worker响应中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
// 获取到了任务
w.lock(); // 加非重入锁,保证每个线程同一时刻只能执行一个任务
// 如果线程池处于STOP状态通常就是调用了shutdownNow,确保所有worker被中断;
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // hook
Throwable thrown = null;
try {
// 注意这个try-catch的对象是FutureTask#run
task.run(); // 最终执行任务的调用起点,调用的就是FutureTask#run
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // hook
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
firstTask
也是有空的可能性,比如上面提到的刚开始添加addWorker
时发现workerCount
大于corePoolSize
,再进入排队队列后发现当前workerCount
为0,意味着一些Worker已经完成退出空出了空闲容量,那么会重新执行addWorker(null, false)
,也就是firstTask
为null的原因之一;
整个方法的大概执行流程:
firstTask
不为空的情况下,那么就从该初始任务开始,否则需要通过getTask
从等待队列获取,一般情况下只要线程池在运行中,就可以获得任务,但是有可能会返回null,原因下面有总结;如果获取到了任务,需要确保线程池处于可响应任务的状态(RUNNING或者SHUTDOWN),否则线程池被关闭了要确保中断事件通知到每个
Worker
关联的线程;每个任务运行之前都有一个对
beforeExecute
钩子的调用,但这也可能会抛出一个异常,在这种情况下我们在不处理任务直接退出worker
;假设
beforeExecute
正常完成,就会开始执行这个任务,并储存它可能抛出的任何异常并发送给afterExecute
,注意这里的try-catch
的对象是FutureTask#run
,但是FutureTask#run
内部也是有个try-catch
,那个try-catch
的对象才是我们执行的任务(就是下面这个匿名内部类);在afterExecute
中看到的任何异常都会导致线程死亡和Worker
退出;
ExecutorService executorService = new Executors.newFixedThreadPool(1);
try {
Future future = executorService.submit(() -> {
// 这个内部类的异常由FutureTask捕获并储存
Object obj = null;
System.out.println(obj.toString());
});
future.get(); // 获取FutureTask的结果或者异常
} catch (Exception e) {
e.printStackTrace();
}
- 如果获取不到任务或者执行过程中遇到异常而导致的非正常退出,最后都是通过
processWorkerExit
关闭Worker
;
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 仅在必要时检查队列是否为空
// ArrayBlockingQueue#isEmpty是要加锁的
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// runState处于SHUTDOWN状态时不再接受新的任务,
// runState处于STOP及以上时不会处理排队的任务,同时等待队列为空了也是如此(因为没有排队任务了)
decrementWorkerCount(); // workerCount--
return null; // 没有任务可以处理了
}
int wc = workerCountOf(c);
// 判断worker允不允许超时,allowCoreThreadTimeOut为true或者是非核心的worker都会超时
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 满足条件:
// 1.当前的workerCount已经超过了maximumPoolSize所设置的最大容量,或者(worker允许超时 || 先前已经获取任务失败)
// 2.等待队列不为空,或者workerCount > 1
if (compareAndDecrementWorkerCount(c)) // workerCount--
return null;
continue; // cas失败,重新来一遍检查
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : // 可超时的阻塞等待
workQueue.take(); // 阻塞直到有任务到来
// 一般来说workerCount < corePoolSize 就会一直阻塞等待任务;
// 如果设置了allowCoreThreadTimeOut 或者 corePoolSize < workerCount < maximumPoolSize
// 就是进行可超时的等待任务,搭配keepAliveTime熟悉来处理
if (r != null)
return r;
timedOut = true; // 没有获取到任务
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask
可能返回空的场景如下:
- 有超出
maximumPoolSize
大小的worker进来; - 线程池已经被关闭了,即状态不为
RUNNING
和SHUTDOWN
; - 线程池处于
SHUTDOWN
状态,即调用了shutdown()
,且等待队列为空; - worker从等待队列中获取任务超时,即满足
allowCoreThreadTimeOut || workerCount > corePoolSize
条件;
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// completedAbruptly为true意味着在worker执行过程中因为异常而导致非正常退出
if (completedAbruptly) // 如果abrupt为true, 需要调整workerCount
decrementWorkerCount(); // workerCount--
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
tryTerminate(); // 尝试进入TERMINATE状态
int c = ctl.get();
if (runStateLessThan(c, STOP)) { // runState处于RUNNING或者SHUTDOWN状态
if (!completedAbruptly) {
// worker正常退出
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
// 等待队列还有任务的情况下,至少要保持一个常驻worker存在
min = 1;
if (workerCountOf(c) >= min)
// 如果当前worker数量过多,则不需要创建一个新的worker
return;
}
// worker非正常退出,需要重新创建一个
addWorker(null, false);
}
}
无论是正常还是非正常退出,每个Worker
退出后都会执行这个方法,首先是将其从worker
集合中移除,然后判断是否能进入TERMINATE
状态,最后通过corePoolSize
来判断是否需要保持最低限度的worker
存活来创建worker
;
final void tryTerminate() {
for (;;) {
int c = ctl.get(); // 获取workerCount和runState
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
// 还不能转换到TERMINATEDe状态的情况:
// 1. 线程池状态为RUNNING
// 2. 线程池状态为TIDYING或者已经是TERMINATED
// 3. 线程池处于SHUTDOWN状态,但是等待队列中还有任务需要处理
return;
// 到这里意味着满足切换到终止状态的条件了
if (workerCountOf(c) != 0) {
// 如果workerCount非零,则中断空闲的worker以确保关闭信号传播
interruptIdleWorkers(ONLY_ONE);
// 每个worker退出时都会重新走到tryTerminate方法
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
// 切换到TIDYING状态成功
try {
terminated(); // hook
} finally {
ctl.set(ctlOf(TERMINATED, 0));
// 切换到TERMINATED状态
termination.signalAll(); // 通知所有等待termination事件的线程
}
return; // 线程池终止成功
}
} finally {
mainLock.unlock();
}
// CAS失败,重试
}
}
在一些地方都会有这个判断方法tryTerminate
,该方法主要是尝试将符合条件的线程池的状态变成TERMINATED
,当然不符合状态就不会发生什么事情;
小结
到这里线程池基本的运行流程已经清晰了,重新梳理一下;
首先是corePoolSize
和maximumPoolSize
的关系,
无界队列
如果我们在构造器中使用的是无界队列(或者说容量很大的有界队列),每一个submit
的任务会被封装在一个FutureTask
里面,然后判断当前任务数是否少于corePoolSize
,如果是则创建Worker
开始执行;如果不是会加入等待队列中;
无界队列的特点就是只创建和corePoolSize
数目相同的Worker
执行,其他多余的任务将排队等待,这种模式的适合于CPU密集型任务,将corePoolSize
设置为CPU数目一样或者几倍,可以减少线程切换时带来的上下文切换等消耗,但是缺点就是提交任务的速度大于执行任务的速度时会造成任务累积,同时大量占用内存;
有界队列
如果我们在构造器中使用的是有界队列(容量不大),分为几个步骤:
同样每一个
submit
的任务会被封装在一个FutureTask
里面,然后判断当前任务数是否少于corePoolSize
,如果是则创建Worker
开始执行;如果当前任务数大于
corePoolSize
,就会尝试加入等待队列,如果添加失败,则会根据maximumPoolSize
决定是否要拒绝任务添加,如果小于当前运行的worker
小于maximumPoolSize
,会创建一个新的worker
执行任务,如果大于maximumPoolSize
就会拒绝该任务;
这种模式的特点就是worker
的数量会动态增减,最低会保持corePoolSize
数量个worker
存活,最高允许创建maximumPoolSize
个任务(corePoolSize
和maximumPoolSize
当然也可以一样);适用的场景比如IO密集型场景,可能大量的时间都被阻塞在等待IO,这时候大容量的线程池能允许处理更多的任务,更加充分的利用CPU;
关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查权限是否允许执行shutdown
advanceRunState(SHUTDOWN); // 将状态修改成SHUTDOWN
interruptIdleWorkers(); // 中断所有可中断的worker
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate(); // 将状态变成TERMINATED
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
// worker.tryLock成功意味着Worker处于可中断状态,
// 即调用了Worker#runWorker
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
调用shutdown
关闭线程池,先是将状态变成SHUTDOWN
,然后中断所有worker
,这里的interruptIdleWorkers
中断的可中断的Worker
,即调用了runWorker
且不是在执行任务过程的worker
,因为worker
执行任务前会加锁;
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess(); // 检查权限是否允许执行shutdown
advanceRunState(STOP); // 将状态修改成STOP
interruptWorkers(); // 尝试中断所有允许中断的worker
tasks = drainQueue(); // 取出所有等待的任务
} finally {
mainLock.unlock();
}
tryTerminate(); // 将线程池状态修改成Terminate
return tasks;
}
// 中断所有活跃的workers
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt(); // 中断任务,但是如果不响应中断异常的线程可能不会发生什么,所以并不一定保证中断成功
} catch (SecurityException ignore) {
}
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList); // 一次性取出所有任务
if (!q.isEmpty()) { // 如果队列是DelayQueue,可能drainTo没法取出所有元素
for (Runnable r : q.toArray(new Runnable[0])) { // 轮询遍历
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
shutdownNow
会尝试停止所有正在执行的任务,并停止等待任务的处理,同时返回正在等待执行的任务列表。interruptWorkers
中会尝试中断所有活跃的worker
,但是也只是尝试,如果任务不能响应中断任务,都可能永远不会终止该线程;
参数调整
内置的一些方法允许动态调整线程池的核心参数;
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize; // delta>0,增加核心核数,delta<0,减少核心核数
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
// 如果当前的工作线程数量大于新的corePoolSize
interruptIdleWorkers(); // 中断空闲的worker
else if (delta > 0) {
int k = Math.min(delta, workQueue.size());
// 由于不确定是否需要那么多线程
// 所以启动和当前等待队列任务一样大小的worker,如果队列在此过程中变为空,则停止。
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
指定一个corePoolSize
,如果是将该值调小,则会即刻尝试中断空闲中的线程;如果是将该值调大,会从新corePoolSize
和等待队列的大小中选一个最小值,然后创建与最小值一样的worker,注意这里的调整不是一触而就的,而是慢慢的将以前的worker退出或者慢慢的增加worker,直到满足新corePoolSize
值;
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
同理设置maximumPoolSize
时,当前worker
数量大于新值时,同样会尝试中断空闲线程,让worker
数量慢慢将到新的maximumPoolSize
;
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
设置线程在终止之前保持空闲状态的时间限制,如果当前池中线程的数量超过了核心数量,那么多余的线程将被终止。
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// 如果在遍历过程中遇到干扰,选择慢速路径。
// 为遍历创建副本,并为已取消的项调用remove。
// 慢路径更可能是O(N*N)
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate();
}
尝试从工作队列中删除所有已被取消的任务,这种方法可以用作存储回收操作;被取消的任务永远不会执行,但可能会累积在工作队列中,直到工作线程可以主动删除它们。现在可以调用这个purge
方法尝试删除它们。
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
如果当前的核心线程没有达到设置的corePoolSize
,则启动一个worker,这是一种预加载机制,在某些场景下可能有用,比如刚刚把corePoolSize
的数量从1调整到n>1,在任务来临前可以快速启动一个常驻worker;
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
当然还有一次性启动corePoolSize
个worker的方法可供选择;
总结
日常开发中无时无刻不在接触线程池或连接池,通过学习ThreadPoolExecutor,帮助我们更好的理解内部的机理,从而衍生到比如阿帕奇的httpcomponents中PoolingHttpClientConnectionManager
,或者NGINX的连接池,或者Rabbitmq的连接池,看看是不是有异曲同工之妙,总之看一个积累一个,日积月累总会有所突破;