几种状态值的32位二进制表示(无符号向左移29位)
COUNT_BITS :Integer.SIZE - 3 --->00000000000000000000000000011101 (十进制数值为:29)
CAPACITY :(1 << COUNT_BITS) - 1--->00011111111111111111111111111111 (十进制数值为:2^29-1)
RUNNING :-1 << COUNT_BITS --->11100000000000000000000000000000 (高3位)
SHUTDOWN : 0 << COUNT_BITS --->00000000000000000000000000000000 (高3位)
STOP : 1 << COUNT_BITS --->00100000000000000000000000000000 (高3位)
TIDYING : 2 << COUNT_BITS --->01000000000000000000000000000000 (高3位)
TERMINATED : 3 << COUNT_BITS --->01100000000000000000000000000000 (高3位)
线程池一共有五种状态, 分别是:
- RUNNING :能接受新提交的任务,并且也能处理阻塞队列中的任务;
- SHUTDOWN:关闭状态,不再接受新提交的任务,但却可以继续处理阻塞队列中已保存的任务。在线程池处于 RUNNING 状态时,调用 shutdown()方法会使线程池进入到该状态。(finalize() 方法在执行过程中也会调用shutdown()方法进入该状态);
- STOP:不能接受新任务,也不处理队列中的任务,会中断正在处理任务的线程。在线程池处于 RUNNING 或 SHUTDOWN 状态时,调用 shutdownNow() 方法会使线程池进入到该状态;
- TIDYING:如果所有的任务都已终止了,workerCount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入TERMINATED 状态。
- TERMINATED:在terminated() 方法执行完后进入该状态,默认terminated()方法中什么也没有做。进入TERMINATED的条件如下:
- 线程池不是RUNNING状态;
- 线程池状态不是TIDYING状态或TERMINATED状态;
- 如果线程池状态是SHUTDOWN并且workerQueue为空;
- workerCount为0;
- 设置TIDYING状态成功。
runStateOf方法的执行逻辑
- ctl与线程数量大小取反的结果进行按位运算,实际就是只保留高3位的值。计算结果为线程池的运行状态。
workerCountOf方法的执行逻辑
- ctl与线程数量大小进行按位与运算,实际就是只保留低29位的值。计算结果为当前运行中的线程数量。
ctlOf方法的执行逻辑
- runStateOf方法的执行结果和workerCountOf方法的执行结果按位进行或运算。实际是合并高3位和低29位的值到一个int类型的对象中去。
getTask方法执行逻辑
- 获取线程池的运行状态;
- 获取当前有效线程数量;
- 默认根据当前有效线程数量大于核心线程数量来决定是否执行超时控制;
- 在执行execute方法时,如果当前线程池的线程数量超过了corePoolSize且小于maximumPoolSize,并且workQueue已满时,则可以增加额外的工作线程直接执行线程,但是线程在执行getTask方法时如果超时没有获取到任务,也就是timedOut为true的情况,说明workQueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corePoolSize数量的线程销毁掉,保持线程数量在corePoolSize即可。
- 从getTask方法的内部逻辑可以看到构建线程池时参数keepAliveTime的意义,即:线程池维护的核心线程外的线程所允许的空闲时间。只有当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime,此时如果获取任务超时返回,在下一次循环中如果还有超过核心线程的线程,则对当前有效线程减一操作,直到线程减少到核心线程的数量阻塞等待新任务。
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
从它们的具体实现来看,它们实际上也是调用ThreadPoolExecutor,只不过参数都已配置好了。
ThreadPoolExecutor类是线程池中最核心的一个类。
ThreadPoolExecutor 线程池核心源码分析注释
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* ThreadPoolExecutor 线程池核心源码分析注释
*/
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* ctl 是int型数字(4字节/32位),高3位表示线程池状态,低29位表示线程数据
* 此时ctl为二进制值为: 1010 0000 0000 0000 0000 0000 0000 0000
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
/**
* 值为29
*/
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 线程数容量,低29位表示有效的线程数
* 二进制值为:0001 1111 1111 1111 1111 1111 1111 1111
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
/**
* 入参为ctl的值
* ~CAPACITY 对CAPACITY取非的未运算
* ~CAPACITY:高3位为1低29位全为0
* 运算结果为ctl的高3位, 也就是运行状态
*
* @param c
* @return
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
* 入参为ctl的值
* CAPACITY:高3位为0低29位全为1
* 运算结果为ctl的低29位, 也就是有效的线程数量
*
* @param c
* @return
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
/**
* 入参为高3位的运行状态和低29位的有效线程数
* 或运算,结果为一个完整的32位数
*
* @param rs
* @param wc
* @return
*/
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
/**
* 运行状态比较
*
* @param c
* @param s
* @return
*/
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
/**
* 判断是否位RUNNING
* (小于SHUTDOWN的状态只有RUNNING)
*
* @param c
* @return
*/
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
/**
* 使用CAS增加一个有效线程
*
* @param expect
* @return
*/
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
/**
* 使用CAS减少一个有效线程
*
* @param expect
* @return
*/
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
/**
* 减少一个有效线程
*/
private void decrementWorkerCount() {
do {
} while (!compareAndDecrementWorkerCount(ctl.get()));
}
private final BlockingQueue<Runnable> workQueue;
private final ReentrantLock mainLock = new ReentrantLock();
/**
* Set containing all worker threads in pool. Accessed only when
* holding mainLock.
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* Wait condition to support awaitTermination
*/
private final Condition termination = mainLock.newCondition();
/**
* Tracks largest attained pool size. Accessed only under
* mainLock.
*/
private int largestPoolSize;
/**
* Counter for completed tasks. Updated only on termination of
* worker threads. Accessed only under mainLock.
*/
private long completedTaskCount;
private volatile ThreadFactory threadFactory;
/**
* Handler called when saturated or shutdown in execute.
*/
private volatile RejectedExecutionHandler handler;
/**
* Timeout in nanoseconds for idle threads waiting for work.
* Threads use this timeout when there are more than corePoolSize
* present or if allowCoreThreadTimeOut. Otherwise they wait
* forever for new work.
*/
private volatile long keepAliveTime;
/**
* If false (default), core threads stay alive even when idle.
* If true, core threads use keepAliveTime to time out waiting
* for work.
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* Core pool size is the minimum number of workers to keep alive
* (and not allow to time out etc) unless allowCoreThreadTimeOut
* is set, in which case the minimum is zero.
*/
private volatile int corePoolSize;
/**
* Maximum pool size. Note that the actual maximum is internally
* bounded by CAPACITY.
*/
private volatile int maximumPoolSize;
/**
* The default rejected execution handler
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
/**
* Worker类,每个Worker类包含了一个线程、一个初始任务、一个任务计算器
*/
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/**
* Delegates main run loop to outer runWorker
*/
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() {
acquire(1);
}
public boolean tryLock() {
return tryAcquire(1);
}
public void unlock() {
release(1);
}
public boolean isLocked() {
return isHeldExclusively();
}
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
private void advanceRunState(int targetState) {
for (; ; ) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
/**
* 尝试终止线程池
*/
final void tryTerminate() {
for (; ; ) {
int c = ctl.get();
// 只有当前状态为STOP 或者 SHUTDOWN并且队列为空,才会尝试整理并终止
// 1: 当前状态为RUNNING,则不尝试终止,直接返回
// 2: 当前状态为TIDYING或TERMINATED,代表有其他线程正在执行终止,直接返回
// 3: 当前状态为SHUTDOWN 并且 workQueue不为空,则不尝试终止,直接返回
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 走到这代表线程池可以终止(通过上面的校验)
// 如果此时有效线程数不为0, 将中断一个空闲的Worker,以确保关闭信号传播
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 使用CAS将ctl的运行状态设置为TIDYING,有效线程数设置为0
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
// 将ctl的运行状态设置为TERMINATED,有效线程数设置为0
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
/**
* Interrupts all threads, even if active. Ignores SecurityExceptions
* (in which case some threads may remain uninterrupted).
*/
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
/**
* Common form of interruptIdleWorkers, to avoid having to
* remember what the boolean argument means.
*/
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private static final boolean ONLY_ONE = true;
/*
* Misc utilities, most of which are also exported to
* ScheduledThreadPoolExecutor
*/
/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
/**
* Performs any further cleanup following run state transition on
* invocation of shutdown. A no-op here, but used by
* ScheduledThreadPoolExecutor to cancel delayed tasks.
*/
void onShutdown() {
}
/**
* State check needed by ScheduledThreadPoolExecutor to
* enable running tasks during shutdown.
*
* @param shutdownOK true if should return true if SHUTDOWN
*/
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
/**
* Drains the task queue into a new list, normally using
* drainTo. But if the queue is a DelayQueue or any other kind of
* queue for which poll or drainTo may fail to remove some
* elements, it deletes them one by one.
*/
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
/**
* 添加一个Worker对象
*
* @param firstTask
* @param core
* @return
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (; ; ) {
int c = ctl.get();//获取ctl的当前值
int rs = runStateOf(c);//获取线程池当前的运行状态
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
for (; ; ) {
int wc = workerCountOf(c);//获取有效的线程数
//判断线程数有没有超过阀值
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//使用CAS将workerCount+1, 修改成功则跳出循环,否则进入下面的状态判断
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新获取ctl的当前值
// 判断当前运行状态,如果不等于上面获取的运行状态rs,
// 说明rs被其他线程修改了,跳到retry重新校验线程池状态
if (runStateOf(c) != rs)
continue retry;
// 走到这里说明compareAndIncrementWorkerCount失败;
// 重试内部循环(状态没变,则继续内部循环,尝试使用CAS修改workerCount
}
}
boolean workerStarted = false;// Worker的线程是否启动
boolean workerAdded = false;// Worker是否成功增加
Worker w = null;
try {
w = new Worker(firstTask);// 用firstTask和当前线程创建一个Worker
final Thread t = w.thread;// 拿到Worker对应的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 加锁的情况下重新获取当前的运行状态
int rs = runStateOf(ctl.get());
// 如果当前的运行状态为RUNNING,
// 或者当前的运行状态为SHUTDOWN并且firstTask为空,则通过校验
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())// 预先校验线程是可以启动的
throw new IllegalThreadStateException();
workers.add(w);// 将刚创建的worker添加到工作者列表
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {// 如果Worker添加成功,则启动线程执行
t.start();
workerStarted = true;
}
}
} finally {
if (!workerStarted) // 如果Worker的线程没有成功启动
addWorkerFailed(w);// 则进行回滚, 移除之前添加的Worker
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate();
} finally {
mainLock.unlock();
}
}
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果Worker是异常死亡(completedAbruptly=true),则workerCount-1;
// 如果completedAbruptly为false的时候(正常超时退出),则代表task=getTask()等于null,
// getTask()方法中返回null的地方,都已经将workerCount - 1,所以此处无需再-1
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;// 该Worker完成的任务数加到总完成的任务数
workers.remove(w);// 移除该Worker
} finally {
mainLock.unlock();
}
tryTerminate();// 有Worker线程移除,可能是最后一个线程退出,需要尝试终止线程池
int c = ctl.get();
if (runStateLessThan(c, STOP)) {// 如果线程池的运行状态还没停止(RUNNING或SHUTDOWN)
if (!completedAbruptly) { // 如果Worker不是异常死亡
// min为线程池的理论最小线程数:如果允许核心线程超时则min为0,否则min为核心线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min为0,工作队列不为空,将min设置为1,确保至少有1个Worker来处理队列里的任务
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 当前有效的线程数>=min,直接返回;
if (workerCountOf(c) >= min)
return; // replacement not needed
// 如果代码走到这边,代表workerCountOf(c) < min,此时会走到下面的addWorker方法。
// 通过getTask方法我们知道,当allowCoreThreadTimeOut为false
// 并且workerCount<=corePoolSize时,是不会走到processWorkerExit方法的。
// 因此走到这边只可能是当前移除的Worker是最后一个Worker,但是此时工作
// 队列还不为空,因此min被设置成了1,所以需要在添加一个Worker来处理工作队列。
}
addWorker(null, false);
}
}
/**
* Worker从工作队列获取任务
*
* @return
*/
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (; ; ) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池运行状态为停止,或者可以停止(状态为SHUTDOWN并且队列为空)
// 则返回null,代表当前Worker需要移除
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断当前Worker是否可以被移除, 即当前Worker是否可以一直等待任务。
// 如果allowCoreThreadTimeOut为true,或者workerCount大于核心线程数,
// 则当前线程是有超时时间的(keepAliveTime),无法一直等待任务。
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 如果wc超过最大线程数 或者 当前线程会超时并且已经超时,
// 并且wc > 1 或者 工作队列为空,则返回null,代表当前Worker需要移除
if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
// 根据线程是否会超时调用相应的方法,poll为带超时的获取任务方法
// take()为不带超时的获取任务方法,会一直阻塞直到获取到任务
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
/**
* 线程池的核心方法
*
* @param command
*/
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* 线程池处理逻辑分4部分
* 1、当前正在运行的线程数小于指定的corePoolSize大小,直接创建新的线程;
* 2、当前正在运行的线程数不小于指定的corePoolSize大小,但是等待队列未满(可以放入的等待队列,则放入到等待队列等待处理);
* 3、当前正在运行的线程数不小于指定的corePoolSize大小,并且等待队列已满,则会创建新的线程,直到线程数达到最大线程数;
* 4、当前正在运行的线程数不小于指定的corePoolSize大小,并且等待队列已满,并且创建新的线程达到最大线程数,则走拒绝策略。
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (!isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} else if (!addWorker(command, false))
reject(command);
}
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(SHUTDOWN);
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
public boolean isShutdown() {
return !isRunning(ctl.get());
}
public boolean isTerminating() {
int c = ctl.get();
return !isRunning(c) && runStateLessThan(c, TERMINATED);
}
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (; ; ) {
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
/**
* Invokes {@code shutdown} when this executor is no longer
* referenced and it has no threads.
*/
protected void finalize() {
shutdown();
}
/**
* Sets the thread factory used to create new threads.
*
* @param threadFactory the new thread factory
* @throws NullPointerException if threadFactory is null
* @see #getThreadFactory
*/
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
/**
* Returns the thread factory used to create new threads.
*
* @return the current thread factory
* @see #setThreadFactory(ThreadFactory)
*/
public ThreadFactory getThreadFactory() {
return threadFactory;
}
/**
* Sets a new handler for unexecutable tasks.
*
* @param handler the new handler
* @throws NullPointerException if handler is null
* @see #getRejectedExecutionHandler
*/
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
/**
* Returns the current handler for unexecutable tasks.
*
* @return the current handler
* @see #setRejectedExecutionHandler(RejectedExecutionHandler)
*/
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
/**
* Sets the core number of threads. This overrides any value set
* in the constructor. If the new value is smaller than the
* current value, excess existing threads will be terminated when
* they next become idle. If larger, new threads will, if needed,
* be started to execute any queued tasks.
*
* @param corePoolSize the new core size
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @see #getCorePoolSize
*/
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
/**
* Returns the core number of threads.
*
* @return the core number of threads
* @see #setCorePoolSize
*/
public int getCorePoolSize() {
return corePoolSize;
}
/**
* Starts a core thread, causing it to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed. This method will return {@code false}
* if all core threads have already been started.
*
* @return {@code true} if a thread was started
*/
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
/**
* Same as prestartCoreThread except arranges that at least one
* thread is started even if corePoolSize is 0.
*/
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
/**
* Starts all core threads, causing them to idly wait for work. This
* overrides the default policy of starting core threads only when
* new tasks are executed.
*
* @return the number of threads started
*/
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
interruptIdleWorkers();
}
/**
* Returns the thread keep-alive time, which is the amount of time
* that threads in excess of the core pool size may remain
* idle before being terminated.
*
* @param unit the desired time unit of the result
* @return the time limit
* @see #setKeepAliveTime(long, TimeUnit)
*/
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
/* User-level queue utilities */
/**
* Returns the task queue used by this executor. Access to the
* task queue is intended primarily for debugging and monitoring.
* This queue may be in active use. Retrieving the task queue
* does not prevent queued tasks from executing.
*
* @return the task queue
*/
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
/**
* Removes this task from the executor's internal queue if it is
* present, thus causing it not to be run if it has not already
* started.
* <p>
* <p>This method may be useful as one part of a cancellation
* scheme. It may fail to remove tasks that have been converted
* into other forms before being placed on the internal queue. For
* example, a task entered using {@code submit} might be
* converted into a form that maintains {@code Future} status.
* However, in such cases, method {@link #purge} may be used to
* remove those Futures that have been cancelled.
*
* @param task the task to remove
* @return {@code true} if the task was removed
*/
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); // In case SHUTDOWN and now empty
return removed;
}
/**
* Tries to remove from the work queue all {@link Future}
* tasks that have been cancelled. This method can be useful as a
* storage reclamation operation, that has no other impact on
* functionality. Cancelled tasks are never executed, but may
* accumulate in work queues until worker threads can actively
* remove them. Invoking this method instead tries to remove them now.
* However, this method may fail to remove tasks in
* the presence of interference by other threads.
*/
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>) r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
/* Statistics */
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate number of threads that are actively
* executing tasks.
*
* @return the number of threads
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns the largest number of threads that have ever
* simultaneously been in the pool.
*
* @return the number of threads
*/
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have ever been
* scheduled for execution. Because the states of tasks and
* threads may change dynamically during computation, the returned
* value is only an approximation.
*
* @return the number of tasks
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* Returns the approximate total number of tasks that have
* completed execution. Because the states of tasks and threads
* may change dynamically during computation, the returned value
* is only an approximation, but one that does not ever decrease
* across successive calls.
*
* @return the number of tasks
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
/**
* Returns a string identifying this pool, as well as its state,
* including indications of run state and estimated worker and
* task counts.
*
* @return a string identifying this pool, as well as its state
*/
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
/* Extension hooks */
/**
* Method invoked prior to executing the given Runnable in the
* given thread. This method is invoked by thread {@code t} that
* will execute task {@code r}, and may be used to re-initialize
* ThreadLocals, or to perform logging.
* <p>
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.beforeExecute} at the end of
* this method.
*
* @param t the thread that will run task {@code r}
* @param r the task that will be executed
*/
protected void beforeExecute(Thread t, Runnable r) {
}
/**
* Method invoked upon completion of execution of the given Runnable.
* This method is invoked by the thread that executed the task. If
* non-null, the Throwable is the uncaught {@code RuntimeException}
* or {@code Error} that caused execution to terminate abruptly.
* <p>
* <p>This implementation does nothing, but may be customized in
* subclasses. Note: To properly nest multiple overridings, subclasses
* should generally invoke {@code super.afterExecute} at the
* beginning of this method.
* <p>
* <p><b>Note:</b> When actions are enclosed in tasks (such as
* {@link FutureTask}) either explicitly or via methods such as
* {@code submit}, these task objects catch and maintain
* computational exceptions, and so they do not cause abrupt
* termination, and the internal exceptions are <em>not</em>
* passed to this method. If you would like to trap both kinds of
* failures in this method, you can further probe for such cases,
* as in this sample subclass that prints either the direct cause
* or the underlying exception if a task has been aborted:
* <p>
* <pre> {@code
* class ExtendedExecutor extends ThreadPoolExecutor {
* // ...
* protected void afterExecute(Runnable r, Throwable t) {
* super.afterExecute(r, t);
* if (t == null && r instanceof Future<?>) {
* try {
* Object result = ((Future<?>) r).get();
* } catch (CancellationException ce) {
* t = ce;
* } catch (ExecutionException ee) {
* t = ee.getCause();
* } catch (InterruptedException ie) {
* Thread.currentThread().interrupt(); // ignore/reset
* }
* }
* if (t != null)
* System.out.println(t);
* }
* }}</pre>
*
* @param r the runnable that has completed
* @param t the exception that caused termination, or null if
* execution completed normally
*/
protected void afterExecute(Runnable r, Throwable t) {
}
/**
* Method invoked when the Executor has terminated. Default
* implementation does nothing. Note: To properly nest multiple
* overridings, subclasses should generally invoke
* {@code super.terminated} within this method.
*/
protected void terminated() {
}
/* Predefined RejectedExecutionHandlers */
/**
* A handler for rejected tasks that runs the rejected task
* directly in the calling thread of the {@code execute} method,
* unless the executor has been shut down, in which case the task
* is discarded.
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() {
}
/**
* Executes task r in the caller's thread, unless the executor
* has been shut down, in which case the task is discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() {
}
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
/**
* A handler for rejected tasks that silently discards the
* rejected task.
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() {
}
/**
* Does nothing, which has the effect of discarding task r.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* A handler for rejected tasks that discards the oldest unhandled
* request and then retries {@code execute}, unless the executor
* is shut down, in which case the task is discarded.
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() {
}
/**
* Obtains and ignores the next task that the executor
* would otherwise execute, if one is immediately available,
* and then retries execution of task r, unless the executor
* is shut down, in which case task r is instead discarded.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
注意:线程池中的每一个线程被封装成一个Worker对象,ThreadPool维护的其实就是一组Worker对象。