该系列统一使用java8的源码进行讲解
上一篇中对ThreadPoolExecutor的源码以及工作原理进行了讲解。今天来讲解一下在使用的过程中我们应该注意哪些问题
一. 参数冲突问题
在上一篇文章中,我们对ThreadPoolExecutor的构造函数的参数进行了讲解。其中
- 当corePoolSize==maximumPoolSize,且核心线程数不允许超时时,设置keepAliveTime与unit是没有意义的(因为没有需要超时的Worker)
- 当workQueue为无界队列时,其永远也放不满,这种情况maximumPoolSize就不会生效
二. Exectors 提供的三种线程池能不能用
2.1. 固定大小的线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
看实现可知,线程数固定,且采用的是无界阻塞队列。当任务提交的速度快与Worker消耗任务的速度时,任务会在workQueue中不断挤压,最终撑垮整个进程的内存空间,造成OOM,因此线上谨慎使用
2.2 单个线程的线程池
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
与1相比都是固定大小的线程池,唯一不同的是该线程数为1,存在于1一样的问题。
2.3 Cached线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
与固定大小的线程池正好相反,核心线程数为0,阻塞队列容量为1,有任务就开辟新的Worker执行,空闲时Worker会自动销毁。如果任务提交的任务非常的多,应用进程达到开辟线程的最大上限开辟不了线程抛出OOM,或者因为线程占用太多的内存空间,导致内存被耗尽抛出OOM(一个线程栈的默认大小为1m),或者因为线程太多,线程上下文切换的开销不可忽略,导致任务执行缓存从而拖垮整个应用进程。
综上:上面三种线程池都不推荐使用,而是应该由我们自己制定线程池参数,保证线程个数与任务队列的长度都在可控的范围内,而不是任由其无限制增长,在线上应用中,是不推荐存在这种不可控的情况出现的。资源都可控制了,那就延伸出另一种问题,达到了线程池的最大处理能力了怎么办?
三. 拒绝策略 RejectedExecutionHandler
当线程池达到自己最大处理能力时(任务队列满,每个Worker都在不停工作),再添加新任务时就会触发拒绝策略。JDK提供了4中拒绝策略
3.1. AbortPolicy 默认
直接拒绝,就是会抛出异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
注意是谁抛出异常,是执行提交任务的线程抛出异常。如果提交任务的线程正在循环的处理一批数据,且没有对异常进行捕获处理,那么在执行拒绝策略抛出异常后,后面的任务都不会被执行。如果提交任务的线程是一个用户请求线程,且业务中没有捕获该异常进行处理,用户请求会因为该异常而请求失败。并且这是默认的拒绝策略,初学者很容易掉进这个坑
3.2 DiscardPolicy
直接丢弃,实现就是什么都不做
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
针对不能对任务的场景是不能使用的
3.3 DiscardOldestPolicy
丢弃最老的一个任务
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
先从workQueue中弹出一个任务,然后再提交任务。同样会丢失数据,在不允许丢数据的场景不能使用
3.4 CallerRunsPolicy
调用者线程执行,谁提交任务,谁执行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
通过直接调用任务run方法的方式自己执行,虽然会延长线程的执行时间,但是这样不会丢失数据
四. 线程池如何关闭
4.1 shutdown
将线程池的状态修改为SHUTDOWN状态,在该状态不允许提交新任务
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池的状态
advanceRunState(SHUTDOWN);
//终端空闲的worker
interruptIdleWorkers();
//空方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
首先将线程池状态修改为shutdown状态,然后中断所有空闲的Worker。为什么这里需要中断线程呢,因为调用线程的interrupt方法,能够让阻塞在getTask中的take方法与poll方法上的Worker唤醒,唤醒之后,重新检查线程池的状态为shutdown状态,getTash方法就会返回null,从而Thread的run方法,线程销毁。最后调用tryTerminate尝试终止,只要有任务没有执行完成,有Worker在运行就不会转为terminate状态。看下tryTerimate的源码:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// stop 状态 或者shutdown状态,且workQueue没有内容
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// stop状态 或者shutdown状态,worker都已经销毁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//将状态修改为tidying状态
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
//将状态修改为terminate状态
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
- 首先判断状态,如果是running, tidying,terminate或者shutdown状态且workQueue不为空,就直接返回。总结就是:只有状态为stop 或者 shutdown且workQueue为空了,才能往下走。
- 如果Worker还存在,则中断一个空闲的worker,然后返回。
- 将状态修改为tidying状态,接着修改为terminate状态,在ThreadPoolExecutor中由于terminated方法时空方法,所以这两个状态可以认为是一样的。
什么样的状态才能转化为tidying状态, 首先前提是Worker都关闭了,其次是 stop,或者 shutdown且队列中没有任务了
4.2 shutdownNow
不同于shutdown的是,这里将状态修改为stop状态
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
首先将线程池状态修改为stop状态,然后唤醒所有的worker,然后调用tryTerminate方法,该方法已经讲过了。shutdownNow方法会返回阻塞队列中所有的任务。这些任务都没有被执行。
首先我们了解了shutdown与shutdownNow两个方法一个将状态修改为shutdown,一个将状态修改为stop,一个没有返回值,一直返回了workQueue中的任务列表。为了能够更加清晰的认清这两个方法,我们还是有必要再看一次getTask方法对于shutdown状态与stop状态有什么不同的处理操作。
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
如果是shutdown状态,且任务队列中有任务,则继续执行。如果是stop状态,直接返回null,销毁worker。
总结:两个方法修改任务状态后都不允许再提交新的任务,shutdown方法将线程池的状态修改为shutdown状态,会等待所有已经提交的任务全部执行完成然后各个worker自动退出,shutdownNow不会执行完成所有的任务,但是会将没有执行完成的任务列表返回,其在getTask中会直接返回null也会引导各个worker自动退出。最终Worker全部销毁,整个线程池关闭
4.3 线程池都是事后自行退出,那是谁在修改线程池的状态到最终terminate状态?
这个问题的答案隐藏在Worker的run方法中,run方法调用了runWorker方法,在runWorker方法中,退出循环后会执行finnaly语句,在这里会调用processWorkerExit方法,而在这个方法中会调用tryTerminate方法,也就是说每有一个Worker执行完成,都会去执行下tryTerminate方法,尝试将线程池的状态修改为terminate状态。代码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
... 省略
tryTerminate();
... 省略
}
在我们调用shutdown或者shutdownNow方法后,可以调用awaitTerminate方法,等待线程池关闭。
总结
针对ThreadPoolExecutor的使用事项,就先总结到这,大家如果有好的意见也可以提出来,我们一起探讨。