线程池
什么使用使用线程池?
- 单个任务处理时间比较短
- 需要处理的任务数量很大
线程池优势
- 重用存在的线程,减少线程创建、消亡的开销,提高性能
- 提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立即执行
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以处进行统一的分配,调优和监控。
线程池的实现方式
- Runnable:实现Runnable接口的类将被Thread执行,表示一个基本的任务
- Thread
- Callable:Callable同样是任务,与Runnable接口的区别在于它接收泛型,同时它执行任务后带有返回内容
Executor
public interface Executor {
void execute(Runnable command);
}
Executor 接口是线程池框架中最基础的部分,定义了一个 void execute(Runnable command)
方法,代表提交一个任务,由线程池来执行这个任务。
ExecutorService
Executor 下有一个重要子接口 ExecutorService,其中定义了一些可以操作线程池的方法:
public interface ExecutorService extends Executor {
// 关闭线程池,已经提交的任务继续执行,不再接受新的任务
void shutdown();
// 尝试停止正在执行的任务,返回等待执行的任务列表。因为停止正在执行的线程使用 Thread.interrupt() 方法, 所以不保证能够完全停止
List<Runnable> shutdownNow();
// 当前线程池是否已经关闭
boolean isShutdown();
// 如果关闭后,所有任务都已经完成,则返回true
// 并且只有先调用 shutdown 或 shutdownNow 才会返回 true
boolean isTerminated();
// 等待请求关闭线程池后,所有的任务完成或者等待超时
// 如果所有的任务都已经完成了,则返回 true,超时的话返回 false
boolean awaitTermination(long timeout, TimeUnit unit);
// 提交一个 Callable 任务,并返回一个表示任务的挂起结果的 Future,之后可以通过 Future 的 get() 方法来获取任务成功完成后返回的结果
<T> Future submit(Callable<T> task);
// 提交一个 Runnable 任务,因为 Runable 的 run() 方法没有返回值,第二个参数会放到 Future 中作为返回值
<T> Future submit(Runnalbe task, T result);
// 提交一个 Runable 任务,因为没有指定返回值,所以之后在 Future 中获取的返回值为 null
Future<?> submit(Runnable task);
// 执行所有任务,返回 Future 类型的集合
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks);
// 执行所有任务,但设置了超时时间
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
// 执行所有的任务,只要其中一个执行结束,就可以返回那个任务的结果
<T> invokeAny(Collection<? extends Callable<T>> tasks);
// 同上一个方法,设置了超时时间
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit);
}
ThreadPoolExecutor
ThreadPoolExecutor
是 JDK 中线程池的默认实现,实现了线程池中的基本方法,可以直接使用,或者基本它扩展,来实现我们需要的功能。
线程池的创建
下面这个是 ThreadPoolExecutor
的核心构造函数,其他的构造函数最终都会走到该构造方法。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectExecutionHandler handler)
corePoolSize & maximumPoolSize
corePoolSize
是线程池中的核心线程数,maxinumPoolSize
是线程池中允许的最大线程数。
当当前正在运行的线程数小于 corePoolSize 值,新提交任务时,会创建一个新的线程来执行,即使有空闲的线程;当当前正在运行的线程数大于 corePoolSize 值,且小于 maximumPoolSize 值,新任务会提交到等待队列中,直到等待队列满了,才会创建新的线程执行任务。
默认情况下,一个新的线程池中,只有任务提交时,核心线程才会创建,但可以通过执行 prestartCoreThread()
或 prestartAllCoreThreads()
方法,提前创建并启动所有核心线程。
keepAliveTime
线程允许的最大空闲时间。且默认情况下,只会在超时后,销毁非核心线程。也可以通过调用 allowCoreThreadTimeOut(true)
方法,来将这种策略应用于核心线程。
unit
keepAliveTime 的时间单位
workQueue
workQueue
是用来保存等待被执行任务的阻塞队列,通常和线程池的大小对应调整。
threadFactory
创建线程的工厂类,用来创建新线程。
如果没有指定,默认使用 Executors.defaultThreadFactory()
来创建线程,它创建的所有线程,都处于相同的 ThreadGroup 中,并具有相同的 NORM_PRIORITY 优先级,并且是非守护线程。
通过提供不同的 ThreadFactory,可以更改线程的名称、线程组、优先级、守护进程状态等,如果 ThreadFactory 在从 newThread 返回 null 时无法创建新线程,执行器将继续执行,但可能无法执行任务。
handler
线程池的拒绝策略。
当线程池处于关闭状态,或者阻塞队列满了,而且线程数量已经达到了 maximumPoolSize,再提交任务时,就会执行当前指定的策略。
线程池的5种状态
状态分类
RUNNING
线程池处在 RUNNING 状态时,能够接收新任务,以及对已添加的任务进行处理。
该状态是线程池的初始状态,线程池一旦被创建,就处于 RUNNING 状态
SHUTDOWN
线程池处于 SHUTDOWN 状态时,不接收新任务,但能处理等待队列中的任务。
线程池在 RUNNING 状态下,调用 shutdown()
方法,会变成 SHUTDOWN 状态。
STOP
线程池处于 STOP 状态时,不接收新任务,不再处理等待队列中的任务,并且会中断正在处理的任务
线程池在 RUNNING 状态下,调用 shutdownNow()
方法,变为 STOP 状态
TIDYING
所有的任务都销毁了,工作线程数量为0,线程池的状态在转换为 TIDYING 状态时,会执行钩子方法 terminated()
线程池在 SHUTDOWN 状态时,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN 状态变为 TIDYING 状态;线程池在 STOP 状态时,线程池中执行的任务为空时,就会由 STOP 状态变为 TIDYING 。
TERMINATED
terminated()
方法执行之后,线程池彻底终止,就变成 TERMINATED 状态。
转换图
线程池中的具体实现
/**
* 存放线程池的状态和线程池的有效线程数
* 其中,高3位用于存放线程池状态(runState),低29位表示线程数(workerCount)
*/
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
/**
* 代表线程池的最大线程数
* (1 << 29) - 1 = 000 1111 1111 1111 1111 1111 1111 1111 1
*/
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
/**
* 线程池的 RUNNING 状态的代表值,3个1 + 29个0
* 111 00000000000000000000000000000
*/
private static final int RUNNING = -1 << COUNT_BITS;
/**
* 000 00000000000000000000000000000
*/
private static final int SHUTDOWN = 0 << COUNT_BITS;
/**
* 001 00000000000000000000000000000
*/
private static final int STOP = 1 << COUNT_BITS;
/**
* 010 00000000000000000000000000000
*/
private static final int TIDYING = 2 << COUNT_BITS;
/**
* 011 00000000000000000000000000000
*/
private static final int TERMINATED = 3 << COUNT_BITS;
/**
* 获取线程池的状态
* 把 c 的低 29 位改为 0
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
* 获取线程池的有效线程数
* 把 c 的高 3 位改为 0
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
核心方法
execute
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/*
* 如果当前线程数 < 核心线程数,则新建一个线程放入线程池中;
* 把当前任务作为这个线程的第一个任务
*/
if (workerCountOf(c) < corePoolSize) {
/*
* 添加一个worker,如果返回为true,则添加任务成功,当前方法执行结束
*/
if (addWorker(command, true))
return;
/*
* 如果添加失败,则重新获取 ctl 值
*/
c = ctl.get();
}
/*
* 执行到这,有两种情况:
* 1. 当前线程数 >= 核心线程数
* 2. 添加 worker 失败
*/
/*
* 检查线程池的状态,确保当前处于 running 状态,并且添加任务对队列中
*/
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
/*
* 如果线程池不处于 RUNNING 状态,就移除缓存队列中的当前任务,
* 并且执行拒绝策略
*/
reject(command);
else if (workerCountOf(recheck) == 0)
/*
* 如果线程池还是 RUNNING 状态,并且线程数量为0,那么开启新的线程
*/
addWorker(null, false);
/*
* 当非 RUNNING 状态,或者缓存队列已满,则尝试开启新的线程
* addWorker 中,重新校验了状态,所以非 RUNNING 状态,直接返回 false
*/
} else if (!addWorker(command, false))
reject(command);
}
简单来说,执行 execute()
方法,总的流程如下:
- 当 workerCount < corePoolSize,则创建并启动一个新的线程来执行新提交的任务
- 当 workerCount >= corePoolSize:
- 如果阻塞队列未满,则将任务添加到阻塞队列中;
- 如果阻塞队列已满,则创建并启动一个新的线程来执行新提交的任务来执行新提交的任务;
- 如果 workCount >= maximumPoolSize,并且线程池内的阻塞队列已满,则执行拒绝策略。
addWorker
/**
* firstTask : 提交给这个线程执行的任务
* core : 如果为 true,则使用 corePoolSize 作为创建线程的界限,也就是说,创建线程时,
* 如果线程池中的线程总数已经达到了 corePoolSize,则创建失败
* 如果是 false,代表使用 maximumPoolSize 作为界限
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 下面的判断可以概括为两部分:
* 1. 如果线程池处于STOP、TIDYING、TERMINATED状态时,线程池不允许提交任务,
* 且会中断已有的任务,所以不允许新建线程,直接返回false
* 2. 如果线程池处于SHUTDOWN状态时,线程池不允许提交任务,但会处理已有的任务,
* 所以,该状态时,需要满足两个条件,才可以新建线程:
* - 提交的任务 firstTask 为空
* - 缓存队列不为空
*/
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
/*
* 尝试增加线程数量
*/
if (compareAndIncrementWorkerCount(c))
break retry;
/*
* 如果由于并发问题,添加失败的话,重新获取 ctl
* 重新获取之后,防止其他线程将线程池状态改掉,再校验状态
*/
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
/*
* 当跳出上面循环后,目前是符合新建线程的条件,且线程数量也增加了
*/
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 根据 firstTask 来创建 Worker 对象
w = new Worker(firstTask);
// 获取 Worker 中的线程对象
final Thread t = w.thread;
if (t != null) {
/*
* 这是整个整个线程池的全局锁,当关闭一个线程池时,需要这个锁,
* 所以在开启新线程的时候,线程池不会被关闭
*/
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
/*
* 这里为了防止在获取锁的过程中,线程池的状态被修改,再次判断其状态。
* 这里,有两种情况会生成新的线程:
* 1.小于 SHUTDOWN ,即为 RUNNING 状态
* 2.如果等于 SHUTDOWN,不接受新的任务,但是会继续执行等待队列中的任务
*/
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// worker 中的线程 thread 是不能已经启动的
if (t.isAlive())
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
/*
* largestPoolSize 记录 workers 中线程数量出现的最大值
*/
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 线程添加成功,则启动该线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有启动
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
在 addWorker
方法中,提到了一个内部类 —— Worker
,在线程池中,将其中的线程包装成了 Worker(工人),就是指线程池中做任务的线程。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable {
private static final long serialVersionUID = 6138294804551838833L;
/** 真正的线程,执行任务. */
final Thread thread;
/** 创建线程时执行的任务,可以为空 */
Runnable firstTask;
/** 线程完成的任务数 */
volatile long completedTasks;
/**
* 根据给定的任务创建线程,传入的 firstTask 可以为 null
*/
Worker(Runnable firstTask) {
setState(-1); // 没有执行任务时,线程禁止被中断
this.firstTask = firstTask;
// 调用 ThreadFactory 来创建一个新的线程
this.thread = getThreadFactory().newThread(this);
}
/*
* 因为 Worker 本身继承了 Runnable 接口,也就是一个线程,所以 Worker 对象在启动的时候,会调用 run 方法
* 这里调用了外部类的 runWorker 方法
*/
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
这里需要注意的是,Worker 中使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?可以看到 tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的:
- lock 方法一旦获取了独占锁,表示当前线程正在执行任务中
- 如果正在执行任务,则不应该中断线程
- 如果该线程现在不是独占锁的状态,也就是空闲的状态,说明它没有在处理任务,这时可以对该线程进行中断;
- 线程池在执行 shutdown 方法或 tryTerminate 方法时,会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方法会使用 tryLock 方法来判断线程池中的线程是否是空闲状态;
- 之所以设置为不可重入,是因为不希望任务在调用像 setCorePoolSize 这样的线程池控制方法时重新获取锁。如果使用使用 ReentrantLock,它是可重入的,这样如果在任务重调用了如 setCorePoolSize 这类线程池控制的方法,会中断正在运行的线程。
所以,Woker 继承自 AQS,用于判断线程是否空闲以及是否可以被中断。
runWorker
Worker 在启动后,调用了 runWorker 方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 获取 worker 初始化时的第一个任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
// 表示在执行任务过程中是否出现了异常
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
/*
* 如果线程池正在停止,则要确保当前线程是中断状态;
* 如果线程池没有停止,则要确保当前线程没有被中断。
* 这里当第一次确认线程池没有停止时,调用 Thread.interrupted() 方法,
* 清除中断标记,之所以又校验了一次状态,是为了确保线程池没有在这期间,
* 调用 shutdownNow() 方法。
*/
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
// 执行任务之前的钩子函数
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
// 执行任务之后的钩子函数
afterExecute(task, thrown);
}
} finally {
// 清除当前任务,准备获取下一个任务
task = null;
// 累加完成的任务数
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 销毁线程
processWorkerExit(w, completedAbruptly);
}
}
getTask
getTask 用于从任务队列中取任务
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
/*
* 这里分为两种情况:
* 1.rs == SHUTDOWN && workQueue.isEmpty()
* 2.rs >= STOP
*/
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 当出现上面两种情况时,当前线程不需要再执行任务,则减少工作线程数
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
/*
* 当出现两种情况时,该线程可以超时关闭:
* 1.允许核心线程数内的线程回收
* 2.当前线程数超过了核心线程数
*/
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
/*
* 1.当线程数 > maximumPoolSize(这里是为了防止在此方法执行阶段同时执行了 setMaximumPoolSize 方法)或者超时
* 2.有效线程数 > 1或者阻塞队列是空的
* 当满足上面两个条件时,尝试将 workerCount 减 1
* 如果失败,则返回重试
*/
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
/*
* 如果允许超时的话,则调用workerQueue.pool方法,进行超时控制,如果在 keepAliveTime 时间内没有获取到任务,则返回 null;
* 否则,调用 workerQueue.take 方法,一直阻塞到获取到任务
*/
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
// 如果没有获取到任务,说明超时了
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 如果 completedAbruptly 为 true,则说明执行任务时出现了异常
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 统计完成的任务数
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 根据线程池状态进行判断是否结束线程池
tryTerminate();
int c = ctl.get();
/*
* 如果线程池是 RUNNING 或 SHUTDOWN 状态,且出现了异常,判断线程池的最小线程数量,如果线程池内的线程 < 最小有效线程数量,则调用 addWorker 添加一个新的线程
*/
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
addWorker(null, false);
}
}
拒绝策略
当线程池处于关闭状态,或者阻塞队列满了,而且线程数量已经达到了 maximumPoolSize,再提交任务时,就会执行相应的拒绝策略,对应创建线程池时的 handle 属性。
所有的拒绝策略都继承自 RejectedExecutionHandler
接口,在 ThreadPoolExecutor 中已经定义了四个实现类,供我们直接使用。
AbortPolicy
不管怎样,直接抛出 RejectedExecutionHandler 异常,这是线程池默认的策略。
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
CallerRunsPolicy
如果线程池没有被关闭,那么由调用者所在的线程来执行任务
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
DiscardPolicy
什么都不操作,直接丢弃任务
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
DiscardOldestPolicy
如果线程池没有被关闭,则丢弃队列中最靠前的任务,并执行当前任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}