先上类图:
各级类功能浏览
- Executors
工厂类,负责创建各式各样的Threadpoolexecutors出来
public static ExecutorService newFixedThreadPool(int nThreads)
public static ExecutorService newSingleThreadExecutor()
public static ExecutorService newCachedThreadPool()
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
- Executor
public interface Executor {
// 一个抽象方法
void execute(Runnable command);
}
- ExecutorService
增加一写生命周期管理,任务提交增强处理等api。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
- AbstractExecutorService
提供了invokeAny, invokeAll,submit等方法的默认实现。
- ScheduledExecutorService
提供了两个叼逼定时任务方法,一看就很亲切
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
- ThreadPoolExecutor
这个叼逼在最后慢慢介绍
DelegatedExecutorService(图是扣的,这笔我不认识。。。。)
ThreadFactory
jdk提供的线程创建接口
有自己的默认实现DefaultThreadFactory,只是在创建线程时为线程添加了命名等。我们可以自己实现该接口实现自定义的任何创建线程的方法。创建线程池时我们可以自定义。
- 图中没有,不得不提一下RunnableFuture这个类
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
异步获取线程池里线程执行结果就靠他了!!
ThreadPoolExecutor规则
corePoolSize与maximumPoolSize 由于ThreadPoolExecutor 将根据 corePoolSize和 maximumPoolSize设置的边界自动调整池大小,当新任务在方法 execute(java.lang.Runnable) 中提交时:
(1)如果运行的线程少于 corePoolSize,则创建新线程来处理请求,即使其他辅助线程是空闲的;
(2)如果设置的corePoolSize 和 maximumPoolSize相同,则创建的线程池是大小固定的,如果运行的线程与corePoolSize相同,当有新请求过来时,若workQueue未满,则将请求放入workQueue中,等待有空闲的线程去从workQueue中取任务并处理
(3)如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列满时才创建新线程才创建新的线程去处理请求;
(4)如果运行的线程多于corePoolSize 并且等于maximumPoolSize,若队列已经满了,则通过handler所指定的策略来处理新请求;
(5)如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务
也就是说,处理任务的优先级为:
(1) 核心线程corePoolSize > 任务队列workQueue > 最大线程maximumPoolSize,如果三者都满了,使用handler处理被拒绝的任务。
(2)当池中的线程数大于corePoolSize的时候,多余的线程会等待keepAliveTime长的时间,如果无请求可处理就自行销毁。cache线程池使用SynchronousQueue,fixed使用LinkedBlockingQueue
区别:
a) cache,创业公司有特别多的活要干,那就有一个活来现在的人没时间认领就去招聘一个新人,后来活干完了少了,招聘进来的人闲了,闲下来超过xxx时间后又把他们开除了。
b) fixed,创业公司有特别多的活要干,但是公司就这么几个人,有任务来就排期吧,反正这几个人一个个处理。。。。队列满了之后的应对策略(rejecthanlder)
a) 直接抛弃
b) 抛出异常
c) 主线程执行
d) 将最老的线程踢出去,把这个线程放入队列。
ThreadPoolExecutor源码
1.小技巧,把线程数和线程池状态存在一个原子整数中,通过cas原子更新
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
低29位存线程数,高3位存runState(线程池生命周期),这样runState有5个值,但是出于简单就考虑以下三个值:
RUNNING状态:线程池正常运行,可以接受新的任务并处理队列中的任务;
SHUTDOWN状态:不再接受新的任务,但是会执行队列中的任务;
STOP状态:不再接受新任务,不处理队列中的任务
提供几个工具函数针对gictl这个变量操作:
/**
* 这个方法用于取出runState的值 因为CAPACITY值为:00011111111111111111111111111111
* ~为按位取反操作,则~CAPACITY值为:11100000000000000000000000000000
* 再同参数做&操作,就将低29位置0了,而高3位还是保持原先的值,也就是runState的值
*
* @param c
* 该参数为存储runState和workerCount的int值
* @return runState的值
*/
private static int runStateOf(int c) {
return c & ~CAPACITY;
}
/**
* 这个方法用于取出workerCount的值
* 因为CAPACITY值为:00011111111111111111111111111111,所以&操作将参数的高3位置0了
* 保留参数的低29位,也就是workerCount的值
*
* @param c
* ctl, 存储runState和workerCount的int值
* @return workerCount的值
*/
private static int workerCountOf(int c) {
return c & CAPACITY;
}
/**
* 将runState和workerCount存到同一个int中
* “|”运算的意思是,假设rs的值是101000,wc的值是000111,则他们位或运算的值为101111
*
* @param rs
* runState移位过后的值,负责填充返回值的高3位
* @param wc
* workerCount移位过后的值,负责填充返回值的低29位
* @return 两者或运算过后的值
*/
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
// 只有RUNNING状态会小于0
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
2. Worker
实现了互斥锁但不可重入,所以没有使用ReentrantLock。
贴重要代码:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 真正执行的线程
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
// Lock methods
//
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
return getState() != 0;
}
// 实现不可重入的互斥锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
}
3.开始从execute方法进入
活动线程小于corePoolSize的时候创建新的线程;
活动线程大于corePoolSize时都是先加入到任务队列当中;
任务队列满了再去启动新的线程,如果线程数达到最大值就拒绝任务。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 活动线程数 < corePoolSize
if (workerCountOf(c) < corePoolSize) {
// 直接启动新的线程。第二个参数true:addWorker中会重新检查workerCount是否小于corePoolSize
if (addWorker(command, true))
// 添加成功返回
return;
c = ctl.get();
}
// 活动线程数 >= corePoolSize
// runState为RUNNING && 队列未满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// double check
// 非RUNNING状态 则从workQueue中移除任务并拒绝
if (!isRunning(recheck) && remove(command))
reject(command);// 采用线程池指定的策略拒绝任务
// 线程池处于RUNNING状态 || 线程池处于非RUNNING状态但是任务移除失败
else if (workerCountOf(recheck) == 0)
// 这行代码是为了SHUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
// 添加一个null任务是因为SHUTDOWN状态下,线程池不再接受新任务
addWorker(null, false);
// 两种情况:
// 1.非RUNNING状态拒绝新的任务
// 2.队列满了启动新的线程失败(workCount > maximumPoolSize)
} else if (!addWorker(command, false))
reject(command);
}
addWorker(null, false);这一行,这要结合addWorker一起来看。 主要目的是防止HUTDOWN状态下没有活动线程了,但是队列里还有任务没执行这种特殊情况。
4.addWorker
private boolean addWorker(Runnable firstTask, boolean core) {
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);// 当前线程池状态
// Check if queue empty only if necessary.
// 这条语句等价:rs >= SHUTDOWN && (rs != SHUTDOWN || firstTask != null ||
// workQueue.isEmpty())
// 满足下列调价则直接返回false,线程创建失败:
// rs > SHUTDOWN:STOP || TIDYING || TERMINATED 此时不再接受新的任务,且所有任务执行结束
// rs = SHUTDOWN:firtTask != null 此时不再接受任务,但是仍然会执行队列中的任务
// rs = SHUTDOWN:firtTask == null见execute方法的addWorker(null,
// false),任务为null && 队列为空
// 最后一种情况也就是说SHUTDONW状态下,如果队列不为空还得接着往下执行,为什么?add一个null任务目的到底是什么?
// 看execute方法只有workCount==0的时候firstTask才会为null结合这里的条件就是线程池SHUTDOWN了不再接受新任务
// 但是此时队列不为空,那么还得创建线程把任务给执行完才行。
if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()))
return false;
// 走到这的情形:
// 1.线程池状态为RUNNING
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))// 原子操作递增workCount
break retry;// 操作成功跳出的重试的循环
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)// 如果线程池的状态发生变化则重试
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// wokerCount递增成功
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 并发的访问线程池workers对象必须加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
// RUNNING状态 || SHUTDONW状态下清理队列中剩余的任务
if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 将新启动的线程添加到线程池中
workers.add(w);
// 更新largestPoolSize
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 启动新添加的线程,这个线程首先执行firstTask,然后不停的从队列中取任务执行
// 当等待keepAlieTime还没有任务执行则该线程结束。见runWoker和getTask方法的代码。
if (workerAdded) {
t.start();// 最终执行的是ThreadPoolExecutor的runWoker方法
workerStarted = true;
}
}
} finally {
// 线程启动失败,则从wokers中移除w并递减wokerCount
if (!workerStarted)
// 递减wokerCount会触发tryTerminate方法
addWorkerFailed(w);
}
return workerStarted;
}
5.runWorker方法
任务添加成功后实际执行的是runWorker这个方法,这个方法非常重要,简单来说它做的就是:
第一次启动会执行初始化传进来的任务firstTask;
然后会从workQueue中取任务执行,如果队列为空则等待keepAliveTime这么长时间。
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
// Worker的构造函数中抑制了线程中断setState(-1)----AQS中设置了state为-1,unlock会把state重新置为0,所以这里需要unlock从而允许中断
w.unlock();
// 用于标识是否异常终止,finally中processWorkerExit的方法会有不同逻辑
// 为true的情况:1.执行任务抛出异常;2.被中断。
boolean completedAbruptly = true;
try {
// 如果getTask返回null那么getTask中会将workerCount递减,如果异常了这个递减操作会在processWorkerExit中处理
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
&& !wt.isInterrupted())
wt.interrupt();
try {
// 任务执行前可以插入一些处理,子类重载该方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();// 执行用户任务
} catch (RuntimeException x) {
thrown = x;
throw x;
} catch (Error x) {
thrown = x;
throw x;
} catch (Throwable x) {
thrown = x;
throw new Error(x);
} finally {
// 和beforeExecute一样,留给子类去重载
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
// 结束线程的一些清理工作
processWorkerExit(w, completedAbruptly);
}
}
从上面的源码上可以看出,这里叉入了两个钩子,执行前后处理,所以可以自己扩展ThreadPoolExecutor实现一些增强功能,如暂停执行(jdk8源码注释上就有这个例子):
class PausableThreadPoolExecutor extends ThreadPoolExecutor {
private boolean isPaused;
private ReentrantLock pauseLock = new ReentrantLock();
private Condition unpaused = pauseLock.newCondition();
public PausableThreadPoolExecutor(...) { super(...); }
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
pauseLock.lock();
try {
while (isPaused) unpaused.await();
} catch (InterruptedException ie) {
t.interrupt();
} finally {
pauseLock.unlock();
}
}
public void pause() {
pauseLock.lock();
try {
isPaused = true;
} finally {
pauseLock.unlock();
}
}
public void resume() {
pauseLock.lock();
try {
isPaused = false;
unpaused.signalAll();
} finally {
pauseLock.unlock();
}
}
}}
6.getTask方法(这里实现了一只获取不到任务,超时就终结掉线程)
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry: for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 1.rs > SHUTDOWN 所以rs至少等于STOP,这时不再处理队列中的任务
// 2.rs = SHUTDOWN 所以rs>=STOP肯定不成立,这时还需要处理队列中的任务除非队列为空
// 这两种情况都会返回null让runWoker退出while循环也就是当前线程结束了,所以必须要decrement
// wokerCount
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
// 递减workerCount值
decrementWorkerCount();
return null;
}
// 标记从队列中取任务时是否设置超时时间
boolean timed; // Are workers subject to culling?
// 1.RUNING状态
// 2.SHUTDOWN状态,但队列中还有任务需要执行
for (;;) {
int wc = workerCountOf(c);
// 1.core thread允许被超时,那么超过corePoolSize的的线程必定有超时
// 2.allowCoreThreadTimeOut == false && wc >
// corePoolSize时,一般都是这种情况,core thread即使空闲也不会被回收,只要超过的线程才会
timed = allowCoreThreadTimeOut || wc > corePoolSize;
// 从addWorker可以看到一般wc不会大于maximumPoolSize,所以更关心后面半句的情形:
// 1. timedOut == false 第一次执行循环, 从队列中取出任务不为null方法返回 或者
// poll出异常了重试
// 2.timeOut == true && timed ==
// false:看后面的代码workerQueue.poll超时时timeOut才为true,
// 并且timed要为false,这两个条件相悖不可能同时成立(既然有超时那么timed肯定为true)
// 所以超时不会继续执行而是return null结束线程。(重点:线程是如何超时的???)
if (wc <= maximumPoolSize && !(timedOut && timed))
break;
// workerCount递减,结束当前thread
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
// 需要重新检查线程池状态,因为上述操作过程中线程池可能被SHUTDOWN
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
// ============== 以上这一坨都是生命周期判断以及cas递减workcount。
try {
// 1.以指定的超时时间从队列中取任务
// 2.core thread没有超时
Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
if (r != null)
return r;
timedOut = true;// 超时
} catch (InterruptedException retry) {
timedOut = false;// 线程被中断重试
}
}
}
7.processWorkerExit
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 正常的话再runWorker的getTask方法workerCount已经被减一了
if (completedAbruptly)
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 累加线程的completedTasks
completedTaskCount += w.completedTasks;
// 从线程池中移除超时或者出现异常的线程
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试停止线程池
tryTerminate();
int c = ctl.get();
// runState为RUNNING或SHUTDOWN
if (runStateLessThan(c, STOP)) {
// 线程不是异常结束
if (!completedAbruptly) {
// 线程池最小空闲数,允许core thread超时就是0,否则就是corePoolSize
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
// 如果min == 0但是队列不为空要保证有1个线程来执行队列中的任务
if (min == 0 && !workQueue.isEmpty())
min = 1;
// 线程池还不为空那就不用担心了
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 1.线程异常退出
// 2.线程池为空,但是队列中还有任务没执行,看addWoker方法对这种情况的处理
addWorker(null, false);
}
}
8.tryTerminate
每个worker退出时,都会尝试终止线程池。
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 以下状态直接返回:
// 1.线程池还处于RUNNING状态
// 2.SHUTDOWN状态但是任务队列非空
// 3.runState >= TIDYING 线程池已经停止了或在停止了
if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty()))
return;
// 只能是以下情形会继续下面的逻辑:结束线程池。
// 1.SHUTDOWN状态,这时不再接受新任务而且任务队列也空了
// 2.STOP状态,当调用了shutdownNow方法
// workerCount不为0则还不能停止线程池,而且这时线程都处于空闲等待的状态
// 需要中断让线程“醒”过来,醒过来的线程才能继续处理shutdown的信号。
if (workerCountOf(c) != 0) { // Eligible to terminate
// runWoker方法中w.unlock就是为了可以被中断,getTask方法也处理了中断。
// ONLY_ONE:这里只需要中断1个线程去处理shutdown信号就可以了。
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 进入TIDYING状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 子类重载:一些资源清理工作
terminated();
} finally {
// TERMINATED状态
ctl.set(ctlOf(TERMINATED, 0));
// 继续awaitTermination
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
9.线程池的终止
- shutdown
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 线程池状态设为SHUTDOWN,如果已经至少是这个状态那么则直接返回
advanceRunState(SHUTDOWN);
// 注意这里是中断所有空闲的线程:runWorker中等待的线程被中断 → 进入processWorkerExit →
// tryTerminate方法中会保证队列中剩余的任务得到执行。
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
// w.tryLock能获取到锁,说明该线程没有在运行,因为runWorker中执行任务会先lock,
// 因此保证了中断的肯定是空闲的线程。
// Worker实现的是不可重入锁,获取锁失败就说明任务正在执行中
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
}
finally {
mainLock.unlock();
}
}
- shutdownnow
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// STOP状态:不再接受新任务且不再执行队列中的任务。
advanceRunState(STOP);
// 中断所有线程
interruptWorkers();
// 返回队列中还没有被执行的任务。
tasks = drainQueue();
}
finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
void interruptIfStarted() {
Thread t;
// 初始化时state == -1
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}