前言
如果有人问我:“你了解Java线程池吗”,我不打算回答Java中常用的几种线程池,也记不住。从线程池的上层API来看,再多种的线程池,无非是参数的不同,让它们呈现出了不同的特性,那么这些特性到底依赖什么样的原理实现,就更值得去深究,也是本文的目的。
试着回答以下几个问题:
- 线程池如何实现
- 非核心线程延迟死亡,如何做到
- 核心线程为什么不会死
- 如何释放核心线程
- 非核心线程能成为核心线程吗
- Runnable在线程池里如何执行
- 线程数如何做选择
- 常见的不同类型的线程池的功效如何做到
如果以上问题回答不出一二三,可以借鉴本文。
基础知识
要了解线程池,必然涉及到ThreadPoolExecutor。ThreadPoolExecutors实现了线程池所需的最小功能集,已能hold住很多场景。常见的线程池类型,通过Executors提供的API,屏蔽了构造参数细节来创建ThreadPoolExecutors,因为不了解具体参数含义的话,可能拿到的线程池与设想的会有偏差。
构造参数与对象成员变量
- corePoolSize:核心线程数,期望保持的并发状态
- maximumPoolSize:最大线程数,允许超载,虽然期望将并发状态保持在一定范围,但是在任务过多时,增加非核心线程来处理任务。非核心线程数 = maximumPoolSize - corePoolSize
- workQueue:阻塞队列,存储线程任务Runnable
- keepAliveTime:在没有任务时,线程存活时间
- threadFactory:用来构建线程
- handler:当任务已满,并且无法再增加线程数时,或拒绝添加任务时,所执行的策略
Worker
线程池中的工作线程以Worker作为体现,真正工作的线程为Worker的成员变量,Worker即是Runnable,又是同步器。Worker从工作队列中取出任务来执行,并能通过Worker控制任务状态。
ctl
ctl用来控制线程池的状态,并用来表示线程池线程数量。在线程池中,有以下五种状态
- RUNNABLE:运行状态,接受新任务,持续处理任务队列里的任务
- SHUTDOWN:不再接受新任务,但要处理任务队列里的任务
- STOP:不接受新任务,不再处理任务队列里的任务,中断正在进行中的任务
- TIDYING:表示线程池正在停止运作,中止所有任务,销毁所有工作线程
- TERMINATED:表示线程池已停止运作,所有工作线程已被销毁,所有任务已被清空或执行完毕
状态转换关系如下图
ctl类型为AtomicInteger,那用一个基础如何表示以上五种状态以及线程池工作线程数量呢?int型变量占用4字节,共32位,因此采用位表示,可以解决上述问题。5种状态使用5种数值进行表示,需要占用3位,余下的29位就可以用来表示线程数。因此,高三位表示进程状态,低29位为线程数量,代码如下:
// 值为29
private static final int COUNT_BITS = Integer.SIZE - 3;
// 高三位全为0,低29位全为1,因此线程数量的表示范围为 0 ~ 2^29
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
因为ctl分位来表示状态和数量,下面几个状态仅看有效位的值
*/
// 有效值为 111
private static final int RUNNING = -1 << COUNT_BITS;
// 有效值为 000
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 有效值为 001
private static final int STOP = 1 << COUNT_BITS;
// 有效值为 010
private static final int TIDYING = 2 << COUNT_BITS;
// 有效值为 011
private static final int TERMINATED = 3 << COUNT_BITS;
// 默认状态为RUNNING,线程数量为0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
既然采用了int分位表示线程池状态和线程数量,那么线程池自然提供了方法来获取状态与数量
- runStateOf(): 获取线程池状态
- workerCountOf(): 获取工作线程数量
两函数均为二进制操作,代码不贴,可用下图说明:
线程池实现
添加任务
线程池可以通过submit()、execute()提交线程任务,其中,submit()可以通过Future拿到执行结果,内部也是通过execute()向线程池提交线程任务.
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
// 获取当前ctl值
int c = ctl.get();
// 当前线程数少于最大核心线程数
if (workerCountOf(c) < corePoolSize) {
// 添加核心线程,添加线程任务
if (addWorker(command, true))
return;
// 上面的过程期间,ctl可能已被更改,获取最新值
c = ctl.get();
}
// 线程池状态为RUNNABLE,向工作队列添加任务
if (isRunning(c) && workQueue.offer(command)) {
// 再次检查用
int recheck = ctl.get();
// 线程不处于RUNNABLE状态,移除任务
if (! isRunning(recheck) && remove(command))
// 执行拒绝任务策略
reject(command);
else if (workerCountOf(recheck) == 0)
// 执行到这里说明已没有可用的工作线程,创建新的工作现线程
// ,并从任务队列里取任务。因为在这个时刻,存在所有工作线程
// 都被释放的可能,为了应对这个线程池“假死”的情况,所以创建
// 了新的工作线程
addWorker(null, false);
}
// 添加非核心队列来执行线程任务
else if (!addWorker(command, false))
// 说明线程池达到饱和,或者线程池shut down,执行拒绝策略
reject(command);
}
当有任务到来时,按照如下策略进行:
- 如果当前核心线程数量没达到最大值corePoolSize,创建新线程来执行此任务
- 如果当前核心线程到达最大,向阻塞队列添加任务
- 如果核心线程已满,阻塞队列已满,尝试开启非核心线程来执行任务
- 如果线程池不处于RUNNABLE状态,或者处于饱和状态,执行任务拒绝策略
线程池是按照上面123的顺序来处理新进的任务的,并且在每一个过程中,会检查ctl的最新值有效性,因为在处理过程中线程池的各种状态随时可能发生了改变。
不过是通过添加核心或是通过添加非核心线程来执行任务,都是通过addWorker()来完成,下面是代码
private boolean addWorker(Runnable firstTask, boolean core) {
// 这个是类似 goto 的语法,代码有效片段是下面第一for循环
retry:
for (;;) {
int c = ctl.get();
// 获取程序状态
int rs = runStateOf(c);
/**
这一个条件需要仔细理解。
1. 当线程处于STOP、TIDYING、TERMINATED时,线程池是拒绝执行任务的
因此不需要任务,也不添加线程
2. 当线程处于SHUTDOWN状态时,线程池需要把任务处理完,才会到达后面的
TIDYING、TERMINATED状态。因此,如果阻塞队列还有任务的话,继续添加
线程来加快处理。
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
// 获取线程数
int wc = workerCountOf(c);
// 线程数超过或等于能表示的上限
// 或 比较 核心线程数达到上限,或比较线程池允许的最大线程数,取决于core
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS操作增加线程数,跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 上面的CAS操作没成功,检查线程池状态与开始是否一致,
// 如果一致,继续执行此for循环,否则重新执行retry代码块,
// 自旋以期CAS成功,后续才能添加线程
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 将线程任务加入Worker,新增了Worker,就是新增了线程
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// 再次检查线程池状态
// 1. 处于RUNNABLE状态,继续添加线程执行任务
// 2. 处于SHUTDOWN状态,到这里说明队列里还有任务要执行
// 增加线程期望让任务执行快一点
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 这里说明发生了意外状况,新建的线程不可用
if (t.isAlive())
throw new IllegalThreadStateException();
// 添加worker进集合
workers.add(w);
int s = workers.size();
// largestPoolSize可以表示线程池达到的最大并发
if (s > largestPoolSize)
largestPoolSize = s;
// 添加线程成功
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 启动新添加的线程
t.start();
// 线程启动成功
workerStarted = true;
}
}
} finally {
if (! workerStarted)
// 线程启动失败,移除work,销毁线程
addWorkerFailed(w);
}
return workerStarted;
}
以上代码做了如下几件事:
- 线程池处于 RUNNBALE 或者处于 SHUTDOWN 并在阻塞队列里还有任务时,需要添加新线程。自旋确保 CAS 成功,然后添加新线程
- 线程存于Worker,线程池存有Worker信息,就能访问线程
- 线程启动失败,则移除Worker,销毁线程
addWorkerFailed()操作就不进去看了,首先是将Worker移除,然后通过CAS操作更新ctl,最后调用tryTerminate()操作尝试中止线程池。
执行任务
之前的代码开启了新线程并让线程执行,但是没有看到有Runnable提交。之前说过Worker本身为Runnable,并且存有为Thread类型的成员变量。线程池执行的任务的线程,也就是Workder里的Thread。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
Worker(Runnable firstTask) {
setState(-1);
// firstTask就是addWorker()带来的Runnable
this.firstTask = firstTask;
// 通过ThreadFactory创建线程,将自己作为Runnable提交
this.thread = getThreadFactory().newThread(this);
}
......
}
因此线程执行后,执行的是Worker.run(),run()则调用了ThreadPoolExecutor.runWorker()
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock();
boolean completedAbruptly = true;
try {
// task一开始是firstTask, 后面就通过getTask()从阻塞队列里拿任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 线程池状态检查
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 这个方法子类可以重写,在任务执行前有回调
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 这个方法子类可以重写,在任务执行后有回调
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 线程池已没有任务了,工作线程达到了可退出的状态,Worker退出
processWorkerExit(w, completedAbruptly);
}
}
线程首个任务为firstTask,之后通过getTask()就从阻塞队列里任务。线程池提供了beforeExecute()和afterExecute()通知子类任务执行前后的回调,让子类有时机能执行自己的事情。如果线程池已没有任务了,工作线程达到了可退出的状态,则将线程退出。
主要看getTask() 和 processWorkerExit()
private Runnable getTask() {
// 超时标志
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 检查线程池和阻塞队列状态
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 减少线程数
decrementWorkerCount();
return null;
}
// 获取线程数
int wc = workerCountOf(c);
// 线程等待方式标志位判断依据
// allowCoreThreadTimeOut代表核心线程是不是能退出,如果核心线程能退出,就更别说非核心线程了
// 另一个则是看是否存在非核心线程
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 超时,或者并且线程超标超标,返回null,让上一层函数退出线程
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 如果timed为true,则使用poll等待最多keepAliveTime时间获取任务
// 如果timed为false,使用take()获取任务,阻塞线程,直到可以从阻塞队列拿到任务
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 超时
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
线程池里的线程从阻塞队列里拿任务,如果存在非核心线程,假设阻塞队列里没有任务,那么非核心线程也要在等到keepAliveTime时间后才会释放。如果当前仅有核心线程存在,如果允许释放核心线程的话,也就和非核线程的处理方式一样,反之,则通过take()一直阻塞直到拿到任务,这也就是线程池里的核心线程为什么不死的原因。
从之前的代码一直看到这,并没有发现有明显的标志来标志核心线程与非核心线程,而是以线程数来表达线程身份。0 ~ corePoolSize 表示线程池里只有核心线程,corePoolSize ~ maximumPoolSize 表示线程池里核心线程满,存在非核心线程。然后,根据区间状态做有差异的处理。可以大胆猜测,线程池实际并不区分核心线程与非核心线程,是根据当前的总体并发状态来决定怎样处理线程任务。corePoolSize是线程池希望达到并保持的并发状态,而corePoolSize ~ maximumPoolSize则是线程池允许的并发的超载状态,不希望长期保持。
释放线程
在线程没有拿到任务后,退出线程,通过processWorkerExit()可以证实上述所言。
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 到这里说明线程中断,先通过decrementWorkerCount()减少线程数值
// 否则,说明是线程没有从阻塞队列获取到线程
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// completedTaskCount记录线程池总共完成的任务
// w.completedTasks则是线程完成的任务数
completedTaskCount += w.completedTasks;
// 移除Worker
workers.remove(w);
} finally {
mainLock.unlock();
}
// 线程池状态改变,尝试中止线程池
tryTerminate();
int c = ctl.get();
// 检查线程池状态,线程池处于RUNNABLE或者SHUTDOWN则进入
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// 线程池最小数量,取决于是否能释放核心线程
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果任务队列还有线程,最起码都要有一个线程来处理任务
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return;
}
// 因为线程中断,可能导致没有线程来执行阻塞队列里的任务
// 因此尝试创建线程去执行任务
addWorker(null, false);
}
}
释放工作线程也并没有区分核心与非核心,也是随机进行的。所谓随机,就是在前面所说的区间范围内,根据释放策略,哪个线程先达到获取不到任务的状态,就释放哪个线程。
文中多次出现tryTerminate(),但不深入去看了。里边最主要的操作是,发现可以中止线程池时,中止,并调用terminated()进行通知。如果线程池处于RUNNABLE状态,什么也不做,否则尝试中断一个线程。 中断线程则是通过interruptIdleWorker()操作,就不展开了。
到这里就能能明白线程池的原理的,如下图
线程池里有容纳一定的Worker,Worker中的线程就是线程池中用来执行任务的线程。当有任务加入线程时,根据线程池状态的不同,有不同的步骤。当核心线程未满时,创建新线程来执行;否则将任务加入到阻塞队列;否则创建非核心线程来执行。而线程获取任务的方式有两种,根据线程池容量区间,以及是否可以释放核心线程来使用take()或者poll()来获取任务,其中poll()在一定时间内获取不到任务,则当前线程会被释放。
当然,在addWorker()方法来有任务添加失败的策略,也就是RejectedExecutionHandler。ThreadPoolExecutor实现了四种策略来进行处理,简单了解即可:
- CallerRunsPolicy: 如果线程池没有SHUTODOWN的话,直接执行任务
- AbortPolicy: 抛出异常,说明当前情况的线程池不希望得到接收不了任务的状态
- DiscardOldestPolicy: 丢弃阻塞队列最旧的任务
- DiscardPolicy: 什么也不做
需要注意的是,默认情况下策略为AbortPolicy。
总结
做个总结:
- 线程池倾向于使用核心线程来处理任务,从任务的添加策略可以看出,先考虑创建核心线程处理,再考虑放到阻塞队列,再考虑创建非核心线程处理。以上都不行,则使用任务拒绝策略
- 通过向阻塞队列取任务的不同操作,能确保线程的存活,take()保证核心线程不死,poll()保证非核心线程存活等待一定时间
- 线程池不区分核心线程和非核心线程,线程池是期望达到corePoolSize的并发状态,并允许在不得已情况下超载,达到corePoolSize ~ maximumPoolSize 的并发状态
- 线程池状态和线程数量用ctl表示,高三位为状态,低29位为当前线程池数量
- 线程池对状态的检测非常苛刻,几乎在所有稍微耗时或影响下一步操作正确性的代码前都校验ctl
线程池中有很多值得学习的东西,线程容量调整的设计、ctl的设计、任务调度的设计等。也有需要更深的储备才能看懂的实现,这里点出,以备近一步学习,如同步器的使用,并发场景的考虑与应用等。
下面回答开篇提出的问题。
问答
线程池如何实现
总结就是这个问题的答案
非核心线程延迟死亡,如何实现
通过阻塞队列poll(),让线程阻塞等待一段时间,如果没有取到任务,则线程死亡
核心线程为什么不死
通过阻塞队列take(),让线程一直等待,直到获取到任务
如何释放核心线程
将allowCoreThreadTimeOut设置为true。可用下面代码实验
// 伪代码
{
// 允许释放核心线程,等待时间为100毫秒
es.allowCoreThreadTimeOut(true);
for(......){
// 向线程池里添加任务,任务内容为打印当前线程池线程数
Thread.currentThread().sleep(200);
}
}
线程数会一直为1。 如果allowCoreThreadTimeOut为false,线程数会逐渐达到饱和,然后大家一起阻塞等待。
非核心线程能成为核心线程吗
线程池不区分核心线程于非核心线程,只是根据当前线程池容量状态做不同的处理来进行调整,因此看起来像是有核心线程于非核心线程,实际上是满足线程池期望达到的并发状态。
Runnable在线程池里如何执行
线程执行Worker,Worker不断从阻塞队列里获取任务来执行。如果任务加入线程池失败,则在拒绝策略里,还有处理机会。
线程数如何做选择
这就要看任务类型是计算密集型任务还是IO密集型任务了,区别在于CPU占用率。计算密集型任务涉及内存数据的存取,CPU处于忙绿状态,因此并发数相应要低一些。而IO密集型任务,因为外部设备速度不匹配问题,CPU更多是处于等待状态,因此可以把时间片分给其他线程,因此并发数可以高一些。
常见的不同类型的线程池的功效如何做到
常见的线程池有:
- CachedThreadPool:适合异步任务多,但周期短的场景
- FixedThreadPool: 适合有一定异步任务,周期较长的场景,能达到有效的并发状态
- SingleThreadExecutor: 适合任务串行的场景
- ScheduledThreadPool: 适合周期性执行任务的场景
对于如何选择线程池就要看具体的场景,其中的差异通过构造参数可以到达效果,通过之前的分析,就能知道参数的具体作用以及为什么能达到效果。取FixedThreadPool来看,抛砖引玉。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
nThreads个数量核心线程持续并发任务,没有非核心线程,如果没有任务,则通过take()阻塞等待,不允许核心线程死亡。并且阻塞队列为LinkedBlockingQueue,容量为Integer.MAX_VALUE,可以视为无界队列,更难走到拒绝添加线程逻辑。
参考
线程池原理
彻底理解Java线程池原理篇
Java线程池---ThreadPoolExecutor中的ctl变量
JUC锁框架_AbstractQueuedSynchronizer详细分析