在一个应用程序中,我们需要多次使用线程,也就意味着需要多次创建并销毁线程。而创建并销毁线程的过程势必会消耗内存。而在Java中,内存资源是及其宝贵的,所以,就提出了线程池的概念
构造函数
public ThreadPoolExecutor(int corePoolSize, // 核心线程数
int maximumPoolSize, // 最大线程数(包含核心线程数量)
long keepAliveTime, // 大于核心线程数时 淘汰线程的时间
TimeUnit unit, // 时间单位
BlockingQueue<Runnable> workQueue, // 传输和保存等然任务的阻塞队列
ThreadFactory threadFactory, // 用于创建新线程
RejectedExecutionHandler handler) { // 线程的淘汰策略
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0) // 这里会对线程数量以及等待时间做一些基本校验
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
线程池中的状态
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
// 初始值 0001 1111 1111 1111 1111 1111 1111 1111
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS; // 111
private static final int SHUTDOWN = 0 << COUNT_BITS; // 000
private static final int STOP = 1 << COUNT_BITS; // 001
private static final int TIDYING = 2 << COUNT_BITS; // 010
private static final int TERMINATED = 3 << COUNT_BITS; // 011
可以看到线程池使用一个AtomicInteger来表示当前线程的情况,其中高三位表示当前线程池的状态,而低29位表示线程池的
需要注意的是 -1的二进制表示 是 1的二进制 取反在加1:
1: 0000 0000 0000 0000 0000 0000 0000 0001
取反: 1111 1111 1111 1111 1111 1111 1111 1110
加1: 1111 1111 1111 1111 1111 1111 1111 1111
线程状态流转:
运行状态 状态描述
RUNNING 接收新任务,并且也能处理阻塞队列中的任务。
SHUTDOWN 不接收新任务,但是却可以继续处理阻塞队列中的任务。
STOP 不接收新任务,同时也不处理队列任务,并且中断正在进行的任务。
TIDYING 所有任务都已终止,workercount(有效线程数)为0,线程转向 TIDYING 状态将会运行 terminated() 钩子方法。
TERMINATED terminated() 方法调用完成后变成此状态。
execute
execute 是我们提交一个线程的核心方法, 我们以该方法为入口看看线程池的实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 当前池中线程数量小于corePoolSize,基于传入的command创建新的线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 创建新线程
return;
c = ctl.get();
}
// 大于corePoolSize的话 则入队(阻塞队列的offer方法是不会阻塞的 插入失败直接返回false)
if (isRunning(c) && workQueue.offer(command)) {
// 当前线程池是running状态 并且入队成功
int recheck = ctl.get();
// recheck下如果不是running状态 删除掉这个命令
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) // 池中没有线程了 创建一个
addWorker(null, false);
}
else if (!addWorker(command, false)) // 尝试创建非核心线程
reject(command); // 创建失败 则执行reject 策略
}
addWorker
首先我们来看下 worker是个什么东西:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/**
* Thread this worker is running in. Null if factory fails.
*/
final Thread thread;
/**
* Initial task to run. Possibly null.
*/
Runnable firstTask;
/**
* Per-thread task counter
*/
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
*
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
}
可以看出, worker 本身继承Runnable,并且也是AQS的子类,所以worker本身也可以完成AQS的一些阻塞操作,这里自己继承AQS而不是使用ReentrangLock的目的就是防止重入。并且,在worker的构造方法中,会使用ThreadFactory创建一个新的线程,传入的Runnable就是自己,所以说线程池中跑的就是一个个Worker,然后通过firstTask保存用户传进来的任务,然后通过run方法,调用到runWorker,从而执行用户任务, 关于runWorker,后面解析。
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 获取线程池当前状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
整个方法可以拆分成两部分 :
第一部分:
retry: // 标识一个循环 如果有多层嵌套循环, 可以直接跳到retry标识的某个循环
for (;;) {
int c = ctl.get();
int rs = runStateOf(c); // 获取线程池当前状态
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
在外层的for循环里,有一个判断:
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty()))
return false;
转换为 :
rs >= shutdown && (rs != shutdown || firstTask != null || workQueue.isEmpty())
这个判断的意思是:
- rs > shoutdown , 也就是STOP,TIDYING,TERMINATED状态直接返回失败
- rs >= shutdown && firstTask != null,线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且worker的首个任务不为空时,添加工作线程失败,不接受新任务
- rs >= shutdown && workQueue.isEmpty:线程池状态处于 SHUTDOWN,STOP,TIDYING,TERMINATED状态且阻塞队列为空时,添加工作线程失败,不接受新任务。
所以,最外层的 for 循环是不断的校验当前的线程池状态是否能接受新任务,如果校验通过了之后才能继续往下运行。
首先做判断:
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
如果已经大于最大容量 或 根据本次操作类型 判断是否已经大于核心线程数 或 最大线程数 如果已经超过 则返回失败。
if (compareAndIncrementWorkerCount(c))
break retry;
这里会增加ctl中记录的线程数,如果增加成功,则跳出外层循环,执行下面要将的第二部分。
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
上面增加ctl线程数量失败的话,再次检查当前状态 ,从外层循环再次执行。
当上面增加ctl中记录的线程个数后,就该执行第二部分了:
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask); // 创建一个work对象
final Thread t = w.thread; // 拿到work对象中的线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); // 这边会操作workers(HashSet) 所以要加锁
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get()); // 获取线程池运行状态
if (rs < SHUTDOWN || // RUNNING状态
(rs == SHUTDOWN && firstTask == null)) { // 这里为了传入任务为空的情况也可以创建
if (t.isAlive()) // 如果线程已经被启动了 直接抛出异常
throw new IllegalThreadStateException();
workers.add(w); // 将新创建的worker添加到workersSet中
int s = workers.size();
if (s > largestPoolSize) // 维护下largestPoolSize 这个标识
largestPoolSize = s;
workerAdded = true; // 标识位设置为true
}
} finally {
mainLock.unlock();
}
if (workerAdded) {// 如果本次添加worker成功
t.start(); // 让worker跑起来
workerStarted = true; // 设置启动成功的标志位
}
}
} finally {
if (!workerStarted)
addWorkerFailed(w);
}
return workerStarted; // 返回启动标志位
这边还有个addWorkerFailed方法:
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w); // 从set 中删除该worker
decrementWorkerCount(); // cas 将ctl中记录的线程数减1
tryTerminate(); // 判断是否需要终止整个线程池 需要的话 就终止
} finally {
mainLock.unlock();
}
}
runWorker
这里我们接着上面的Worker ,看下runWroker干了什么,也就是Worker是如何从队列中获取任务执行的(执行worker.thread.start()其实就是执行了这里):
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask; // worker 一开始被启动的时候 会传入这个任务
w.firstTask = null;
//由于Worker初始化时AQS中state设置为-1,这里要先做一次解锁把state更新为 0,允许线程中断
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// task为空 或者 从队列中获取任务为空
// 注意 这里getTask就是从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
// 如果线程池运行状态是stopping, 确保线程是中断状态;
// 如果不是stopping, 确保线程是非中断状态.
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 钩子方法 方便子类做一些标识
Throwable thrown = null;
try {
task.run(); // 执行任务的run方法
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++; // 维护下总完成任务标识
w.unlock();
}
}
completedAbruptly = false;
} finally
// 如果能执行到这里 就说明 该worker是时候被废弃了
processWorkerExit(w, completedAbruptly);
}
}
// 从队列中获取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) { // 循环拿任务
int c = ctl.get();
int rs = runStateOf(c); // 得到当前线程池状态
// Check if queue empty only if necessary.
// 针对当前线程池状态判断是否要直接停止执行队列中的任务
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount(); // 将worker数量减1
return null;
}
int wc = workerCountOf(c); // 当前线程池中worker总数量
// Are workers subject to culling?
// 如果设置了allowCoreThreadTimeOut = true 那么核心线程也是可以被淘汰的
// timed 用于表示是否需要表示是否需要校验时间(是否已经大于核心线程数)
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut)) // 当前worker数量大于最大线程数量 或者 已经超时
&& (wc > 1 || workQueue.isEmpty())) { // 线程数量大于1 并且队列已经空了
if (compareAndDecrementWorkerCount(c)) // 线程数量减1 成功 返回空
return null;
continue; // 否则继续循环
}
try {
// 如果需要使用时间判断 啧使用poll 否则使用take
// 对列为空的话 take会无止境阻塞 而poll等待时间后 直接返回空
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null) // 拿到任务的话直接返回
return r;
timedOut = true; // 否则下次循环
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
submit
然后来看下submit方法:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
可以看出 不管是哪种,都是通过RunnableFuture 将其包裹起来,然后通过execute执行,最后将Future对象返回出去。
动态修改核心线程数以及最大线程数
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers(); // 当前已存在的线程数大于新设置的值的话
else if (delta > 0) { // 增大核心线程数
// We don't really know how many new threads are "needed".
// As a heuristic, prestart enough new workers (up to new
// core size) to handle the current number of tasks in
// queue, but stop if queue becomes empty while doing so.
// 防止任务很多 ,这里提前创建一些worker 并start
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty()) // 如果等待队列为空 则停止添加worker
break;
}
}
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
if (workerCountOf(ctl.get()) > maximumPoolSize) // 将最大线程数减小的话
interruptIdleWorkers();
}
private void interruptIdleWorkers() { // 中断闲置线程
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// 线程不是中断状态 并且 拿到了worker的锁
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
这里需要注意的是在worker 中含有任务并且在运行的时候,会上锁(参看runWorker方法),也就是说 ,只有目前没有执行任务的worker ,这里才能拿到锁, 进而设置线程已中断。
参考:
https://www.cnblogs.com/jajian/p/11442929.html
https://mp.weixin.qq.com/s/hduWrrK4B8x8Z3C7RnIhjw
https://mp.weixin.qq.com/s/FJQ5MhB1kMp8lP1NA6q4Vg
https://tech.meituan.com/2020/04/02/java-pooling-pratice-in-meituan.html