ThreadPoolExecutor如何实现任务的提交和执行的呢?
首先,看一下ThreadPoolExecutor的Worker内部类。
Worker
ThreadPoolExecutor定义了内部类Worker来表征线程池中的工作线程:
// 继承了AQS,并实现了Runnable接口
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
// 工作线程
final Thread thread;
// 待执行的任务
Runnable firstTask;
// 当前线程已执行的任务数
volatile long completedTasks;
// 构造函数
Worker(Runnable firstTask) {
// 调用AQS的setState方法将锁状态设置为-1
setState(-1);
this.firstTask = firstTask;
// 通过线程工厂创建线程
// 注意: 创建线程时会将当前worker传入,worker本身也是一个runnable
this.thread = getThreadFactory().newThread(this);
}
// 定义启动函数
// addWorker()-->t.start()-->t.run()-->worker.run()
public void run() {
runWorker(this);
}
// 0代表无锁状态
// 1代表有锁状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 实现AQS的tryAcquire方法
protected boolean tryAcquire(int unused) {
// CAS将状态值由0更新为1
if (compareAndSetState(0, 1)) {
// 若成功,则将当前线程设置为锁独占线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 实现AQS的tryRelease方法
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
// 将状态值为0
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
// 中断已启动线程
void interruptIfStarted() {
Thread t;
// getState() >= 0 代表线程处于非Running状态
// (t = thread) != null 代表工作线程不为null
// !t.isInterrupted() 代表当前线程未被中断过
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
为何将线程包装成worker呢?其实主要为了实现工作线程和空闲线程的识别。
- 正在执行任务的线程为工作线程;
- 未执行任务的线程为空闲线程。
Worker继承了AQS,并定义了tryAcquire和tryRelease方法。线程需要获取锁才可以执行任务,任务执行完毕后释放锁。
当检测到线程有锁时,则说明该线程为工作线程;反之,当检测到线程无锁时,则说明该线程为空闲线程。
下面从线程执行方法开始跟一下源码:
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// worker数目小于corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 将command作为firstTask创建1个核心worker
if (addWorker(command, true))
return;
c = ctl.get();
}
// 此时核心线程数已满,尝试创建非核心线程处理command任务
// 如果线程池状态为running且将当前任务添加到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 如果线程池状态不为running且当前任务已成功移除出阻塞队列,则执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果线程池工作线程数目为0,则添加1个非核心工作线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 如果线程池状态不为running
// 或者线程池状态为running且当前任务添加到阻塞队列失败(阻塞队列已满),则尝试添加非核心工作线程并处理当前任务
// 若失败,则执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
归纳一下任务提交流程:
- 若当前工作线程数目小于corePoolSize,则创建新的核心线程,并将command任务提交给该新建的核心线程执行;
- 若当前工作线程数目已等于corePoolSize,则将command任务添加到阻塞队列;
- 若command任务未添加到阻塞队列(阻塞队列已满),则创建新的非核心线程,并将command任务提交给该新建的非核心线程执行。
可以看到,execute方法主要落脚在addWorker方法上。
addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层循环,负责判断线程池状态
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 第一种情况: rs >= SHUTDOWN,即线程池的状态为SHUTDOWN、STOP、TIDYING、TERMINATED,此时没必要添加工作线程
// 第二种情况: 下列3种情况只要满足1种,则没必要要添加工作线程
// (1) rs != SHUTDOWN(隐含rs >= SHUTDOWN),即线程池状态为STOP、TIDYING、TERMINATED
// (2) firstTask != null(隐含rs == SHUTDOWN),当线程池状态为SHUTDOWN时,如果firstTask != null,此时添加任务会被拒绝
// (3) workQueue.isEmpty()(隐含rs == SHUTDOWN && firstTask == null),如果此时任务队列为空,则没必要添加工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 内层循环,将Worker数目+1
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// CAS将worker数目+1,成功则跳出retry循环
if (compareAndIncrementWorkerCount(c))
break retry;
// CAS将worker数目+1失败,再次读取ctl
c = ctl.get(); // Re-read ctl
// 如果线程池状态发生改变,则跳出内层循环,继续外层循环
if (runStateOf(c) != rs)
continue retry;
}
}
// 1. 将线程添加到Workers Set集合
// 2. 启动线程
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
// rs < SHUTDOWN,表明线程池状态为RUNNING
// rs == SHUTDOWN && firstTask == null,当线程池状态为SHUTDOWN,且Worker的初始任务为null,但workQueue中可能有未执行完的任务,此时仍需添加worker
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 若线程添加成功,则启动该线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
概括一下上述流程:
- 判断线程池当前是否为可以添加worker线程的状态,可以则继续进行下一步,不可以则return false;
- 线程池状态>shutdown,可能为stop、tinying、terminated,不能添加worker线程;
- 线程池状态为shutdown,且firstTask不为空,不能添加worker线程,因为shutdown状态的线程池不接收新任务;
- 线程池状态为shutdown,firstTask为空,且workQueue也为空,不能添加worker线程,因为firstTask为空是为了添加一个没有任务的线程再从workQueue获取Task,而workQueue为空,说明添加无任务线程已经没有意义。
- 线程池当前线程数量是否超过上限(corePoolSize或maximumPoolSize),超过了return false,没超过则对workerCount+1,继续下一步;
- 在线程池ReentrantLock保证下,向Workers Set中添加新创建的worker实例,添加完成后解锁;
- 当worker添加成功后,则启动该线程。
t.start()方法很有意思,因为t为worker持有的线程,t初始化时传入的runnable又为worker本身。
t.start()本质上会调用到Thread的run方法:
@Override
public void run() {
if (target != null) {
target.run();
}
}
Thread的run()方法又会调用到runnable的run()方法,worker继承了Runnable接口,并覆写了run方法:
public void run() {
runWorker(this);
}
本质上调用的是ThreadPoolExecutor的runWorker方法:
runWorker
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// 因为初始化后的worker的state值为-1,需要通过unlock()方法将state值置为0,保证worker可执行lock()操作
w.unlock();
boolean completedAbruptly = true;
try {
// 阻塞执行(直到task为空才退出)
while (task != null || (task = getTask()) != null) {
// 需要获取worker独占锁,且不重入
// 执行任务前,获取worker锁,任务执行完毕后,才释放worker锁
// 只要检测到worker为已获取锁状态,则证明该worker为active状态
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 处理worker退出
processWorkerExit(w, completedAbruptly);
}
}
概括一下上述流程:
- 首先通过unlock()方法将state值置为0(初始化后的worker的state值为-1,无法成功执行lock()操作),保证worker后续可以获取锁以便执行任务;
- 阻塞获取worker自身持有的task及阻塞队列中的task,然后执行;
- 当获取不到task时,释放掉worker锁变为空闲线程;
- 最后执行processWorkerExit方法处理空闲线程。
接着看一下getTask和processWorkerExit方法。
getTask
private Runnable getTask() {
// poll获取任务是否超时
boolean timedOut = false;
for (;;) { ①
int c = ctl.get();
int rs = runStateOf(c);
// 第一种情况: rs的状态为stop、tinying、terminated
// 第二种情况: rs的状态为shutdown,且workQueue.isEmpty
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { ②
// 循环CAS减少worker数量,直到成功
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否有超时机制timed
// 1. allowCoreThreadTimeOut允许核心线程空闲超时后回收
// 2. wc > corePoolSize代表非核心线程空闲均会超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; ③
// 1. wc > maximumPoolSize且wc > 1
// 2. (timed && timedOut)且wc > 1
// 3. wc > maximumPoolSize且workQueue.isEmpty()
// 4. (timed && timedOut)且workQueue.isEmpty()
if ((wc > maximumPoolSize || (timed && timedOut)) ④
&& (wc > 1 || workQueue.isEmpty())) { ⑤
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ? ⑥
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
timed标识当前线程是否具有超时机制:
- wc > corePoolSize时,代表当前worker线程为非核心线程,timed恒等于true,说明非核心线程均具有超时机制;
- wc <= corePoolSize时,代表当前worker线程为核心线程,核心线程默认不具有超时机制(allowCoreThreadTimeOut默认为false),仅有allowCoreThreadTimeOut配置为true时,核心线程才具有线程机制。
换句话说。只要timed为true,当前worker线程必然具有超时机制。
(1)核心线程
假设某核心线程已将task执行完,且workQueue也为空,线程在runWorker()方法里继续阻塞执行getTask()方法,因为allowCoreThreadTimeOut默认为false,且wc <= corePoolSize,故timed为false。
此时,第一个判断:(wc > maximumPoolSize || (timed && timedOut)为false,直接执行⑥处代码,等待keepAliveTime后,因为workQueue为空,所以超时之后返回null,并将timeOut设置为true,接着继续执行①处循环。
由于timed一直等于false,所以该空闲的核心线程会一直阻塞在①处。
(2)非核心线程
假设某非核心线程已将task执行完,且workQueue也为空,线程在runWorker()方法里继续阻塞执行getTask()方法,因为allowCoreThreadTimeOut默认为false,但此时wc > corePoolSize,故timed为true。
此时,第一个判断:(wc > maximumPoolSize || (timed && timedOut)为false,直接执行⑥处代码,等待keepAliveTime后,因为workQueue为空,所以超时之后返回null,并将timeOut设置为true,接着继续执行①处循环。
继续执行到第一个判断,此时(wc > maximumPoolSize || (timed && timedOut)为true,继续执行第二个判断: (wc > 1 || workQueue.isEmpty()),此时第二个判断为true,尝试将工作线程数减1,若成功,则直接返回null,若失败,则继续执行①处循环,直到工作线程数减1操作成功。
(3)wc > maximumPoolSize
正常情况下,wc不会大于maximumPoolSize,因为添加worker时,会先判断线程数是否超过maximumPoolSize,若超过则不执行addWorker操作。之所以出现wc > maximumPoolSize,可能是某线程执行了setMaximumPoolSize操作,新设置的maximumPoolSize低于现有worker数。
此时当前worker执行getTask操作时,由于wc > maximumPoolSize,循环执行compareAndDecrementWorkerCount操作,直到成功返回null。接着跳出addWorker的while循环,继续执行finally代码块的processWorkerExit操作。
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly)
decrementWorkerCount();
// 从Workers Set中移除worker
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 任何对线程池有负效益的操作时,都需要尝试终止线程池
tryTerminate();
int c = ctl.get();
// 线程池状态为running或shutdown时,如果不是突然终止的,但当前线程数量少于需要维护的线程数量,则addWorker()
// 如果corePoolSize为0且workQueue不为空,则创建1个线程逐渐消耗掉workQueue中的任务
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
// allowCoreThreadTimeOut默认为false,min为核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min为0,即不需要维持核心线程数
// 但workQueue不为空,至少保持一个线程
if (min == 0 && ! workQueue.isEmpty())
min = 1;
// 如果线程数量大于最小数量,直接返回
// 否则下面至少要addWorker一个
if (workerCountOf(c) >= min)
return;
}
addWorker(null, false);
}
}
如果线程是突然终止的,说明是task执行时出现异常导致的,即run()方法执行时发生异常,那正在工作的线程数量需要减1。
如果不是突然终止的,说明是worker线程没有task可执行,不用减1,因为getTask()方法中已经减1了。