线程池的使用和源码
相关api
创建线程池
- 第一种方式: 使用工厂类:
Executors.newFixedThreadPool()
,不推荐
- 第一种方式: 使用工厂类:
-
- 第二种方式: 实例化ThreadPoolExecutor类 参数列表:
-
int corePoolSize
, 线程池中正在执行的线程个数,空闲状态也不会退出 -
int maximumPoolSize
, 线程池中可创建的最大线程个数 -
long keepAliveTime
, 当线程数大于核心时,多于的空闲线程最多存活时间 -
TimeUnit unit
, 时间单位 -
BlockingQueue<Runnable> workQueue
, 使用线程来执行任务的任务队列,不设置大小的时候,为无限大(使用第一种工厂类的方式创建默任务队列无限大)。设置大小的时候,最好设置适合的拒绝策略 -
ThreadFactory threadFactory
, 线程工厂 -
handler
, 自定义线程池满的拒绝策略
向线程池中添加任务
调用ThreadPoolExecutor类的submit方法,给线程添加任务。submit的参数是一个函数式接口,调用方式:
- 第一种添加方式:实例化一个Thread类,
- 实例化匿名内部类
Future<String> result = executorService.submit(new Runnable() {
public void run() {
.......
}
});
- 使用Lambda 表达式
lambda表达式允许你通过表达式来代替功能接口,同时Lambda表达式只能针对函数式接口(当接口里只有一个抽象方法的时候,就是函数式接口,可以使用@FunctionalInterfac注解限定,可以有其他方法,不在这里将)使用
。
结果获取
future.get();
线程池源码
设计到的相关类:
线程池状态转变
其中AtomicInteger变量ctl:低29位表示线程池中线程数,高3位表示线程池的运行状态:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3; //29
private static final int COUNT_MASK = (1 << COUNT_BITS) - 1; // 000 1111111111111111111111111111
// runState is stored in the high-order bits
// -1的二进制1111111111111111
private static final int RUNNING = -1 << COUNT_BITS;// 111 00000000000000000000000000000 即高3位为111,该状态的线程池会接收新任务,并处理阻塞队列中的任务;
private static final int SHUTDOWN = 0 << COUNT_BITS;// 000 00000000000000000000000000000 即高3位为000,该状态的线程池不会接收新任务,但会处理阻塞队列中的任务;
private static final int STOP = 1 << COUNT_BITS;// 001 00000000000000000000000000000 即高3位为001,该状态的线程不会接收新任务,也不会处理阻塞队列中的任务,而且会中断正在运行的任务;
private static final int TIDYING = 2 << COUNT_BITS;// 010 00000000000000000000000000000 即高3位为010, 所有的任务都已经终止;
private static final int TERMINATED = 3 << COUNT_BITS;//011 00000000000000000000000000000 即高3位为011, terminated()方法已经执行完成
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~COUNT_MASK; } // ~COUNT_MASK = 111 00000000000000000000000000000。最新版本没有使用
private static int workerCountOf(int c) { return c & COUNT_MASK; } // COUNT_MASK = 000 1111111111111111111111111111
private static int ctlOf(int rs, int wc) { return rs | wc; }
任务执行的主要流程
主要函数
submit(Callable<T> task)方法
AbstractExecutorService.submit(task)实现了ExecutorService.submit(task)
execute –> addWorker –>runWorker (getTask)
- 线程池的工作线程通过Woker类实现,在ReentrantLock锁的保证下,把Woker实例插入到HashSet后,并启动Woker中的线程。
- 从Woker类的构造方法实现可以发现:线程工厂在创建线程thread时,将Woker实例本身this作为参数传入,当执行start方法启动线程thread时,本质是执行了Worker的runWorker方法。
- firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
execute(Runnable command)方法
ThreadPoolExecutor.execute(task)实现了Executor.execute(task)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1\. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2\. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3\. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
//workerCountOf获取线程池的当前线程数;小于corePoolSize,执行addWorker创建新线程执行command任务
if (addWorker(command, true))
return;
c = ctl.get();
}
// double check: c, recheck
// 线程池处于RUNNING状态,把提交的任务成功放入阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//如果线程池没有RUNNING,成功从阻塞队列中删除任务,执行reject方法处理任务
if (! isRunning(recheck) && remove(command))
reject(command);
//线程池处于running状态,但是没有线程,则创建线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 往线程池中创建新的线程失败,则reject任务
else if (!addWorker(command, false))
reject(command);
}
为什么需要double check线程池的状态? 在多线程环境下,线程池的状态时刻在变化,而ctl.get()是非原子操作,很有可能刚获取了线程池状态后线程池状态就改变了。判断是否将command加入workque是线程池之前的状态。倘若没有double check,万一线程池处于非running状态(在多线程环境下很有可能发生),那么command永远不会执行。
addWorker()方法
addWorker主要负责,创建新的线程并执行任务。 线程池创建新线程执行任务时,需要 获取全局锁:
private final ReentrantLock mainLock = new ReentrantLock();
private boolean addWorker(Runnable firstTask, boolean core) {
// CAS更新线程池数量
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN) // SHUTDOWN不会接收新任务,但会处理阻塞队列中的任务
&& (runStateAtLeast(c, STOP)
|| firstTask != null
|| workQueue.isEmpty()))
return false;
for (;;) {
if (workerCountOf(c)
>= ((core ? corePoolSize : maximumPoolSize) & COUNT_MASK))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateAtLeast(c, SHUTDOWN))
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 线程池重入锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get(); // 之前的版本用的rs = runStateOf(c); 一下使用的rs做判断
if (isRunning(c) ||
(runStateLessThan(c, STOP) && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w); // workers主要用于线程池工作线程的统计和线程池状态变更时判断使用
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 线程启动,执行任务(Worker.thread(firstTask).start());
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker类的runworker方法
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this); // 创建线程
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// ...
}
- 继承了AQS类,可以方便的实现工作线程的中止操作;
- 实现了Runnable接口,可以将自身作为一个任务在工作线程中执行;
- 当前提交的任务firstTask作为参数传入Worker的构造方法;
runWorker方法是线程池的核心:
1. 线程启动之后,通过unlock方法释放锁,设置AQS的state为0,表示运行可中断;
2. Worker执行firstTask或从workQueue中获取任务:
2.1 进行加锁操作,保证thread不被其他线程中断(除非线程池被中断)
2.2 检查线程池状态,倘若线程池处于中断状态,当前线程将中断。
2.3 执行beforeExecute
2.4 执行任务的run方法
2.5 执行afterExecute方法
2.6 解锁操作
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 先执行firstTask,再从workerQueue中取task(getTask())
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run();
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
getTask方法
通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
// Check if queue empty only if necessary.
if (runStateAtLeast(c, SHUTDOWN)
&& (runStateAtLeast(c, STOP) || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// Are workers subject to culling?
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
keepAliveTime的作用:
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
allowCoreThreadTimeOut为false,线程即使空闲也不会被销毁;倘若为ture,在keepAliveTime内仍空闲则会被销毁。
- 如果线程允许空闲等待而不被销毁timed == false,workQueue.take任务:如果阻塞队列为空,当前线程会被挂起等待;当队列中有任务加入时,线程被唤醒,take方法返回任务,并执行;
- 如果线程不允许无休止空闲timed == true, workQueue.poll任务:如果在keepAliveTime时间内,阻塞队列还是没有任务,则返回null;
提交任务
Callable负责产生结果,Future负责获取结果。
- Callable接口类似于Runnable,只是Runnable没有返回值。
- Callable任务除了返回正常结果之外,如果发生异常,该异常也会被返回,即Future可以拿到异步执行任务各种结果;
- Future.get方法会导致主线程阻塞,直到Callable任务执行完成;
submit(Callable<T> task)方法
// submit()在ExecutorService中的定义
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// submit方法在AbstractExecutorService中的实现
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
// newTaskFor返回一个FutureTask对象(class FutureTask<V> implements RunnableFuture<V>)
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;
FutureTask对象
内部状态:
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// this.state 是 volatile,对 volatile 字段的写入,存在一个 happen-before
// 关系;也就是说,`this.state = NEW` 执行完毕时,`this.callable = callable`
// 也保证已经写入
this.state = NEW; // ensure visibility of callable
}
run方法
public void run() {
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set方法和setException方法
// NEW -> COMPLETING -> NORMAL
protected void set(V v) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = v;
STATE.setRelease(this, NORMAL); // final state
finishCompletion();
}
}
// NEW -> COMPLETING -> EXCEPTIONAL
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
outcome = t;
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
内部通过awaitDone方法对主线程进行阻塞,具体实现如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
// 判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成或被取消,则直接返回;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 如果当前state等于COMPLETING,说明任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
// 如果主线程被中断,则抛出中断异常
else if (Thread.interrupted()) {
// 把节点从 waiters 列表里移除
removeWaiter(q);
throw new InterruptedException();
}
// 第一个循环,q == null,进入此处的判断证明 Callable 还未完成,所以会创建等待节点
else if (q == null) {
// 此处的 timed 传入为 false,不会在此返回
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
// 第二个循环会执行下面这个语句,把 q 入队。将上一个判断条件中新建的 q 加入到链表的首节点中,并且 queued 变成 true
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
// 挂起线程
LockSupport.parkNanos(this, parkNanos);
}
else
// 挂起线程
LockSupport.park(this);
}
}
finishCompletion方法
:当run方法里面,任务执行完成,LockSupport.unpark唤醒线程。当线程被释放后,那么在awaitDone的死循环中就会进入下一个循环,由于状态已经变成了NORMAL或者EXCEPTIONAL,将会直接跳出循环
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}