Java 线程池的使用,是面试必问的。下面我们来从使用到源码整理一下。
1、构造线程池
- 通过Executors来构造线程池
1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同,
同时使用了一个无界LinkedBlockingQueue存放阻塞任务,因此多余的任务将存在阻塞队列,
不会由RejectedExecutionHandler处理
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
2、构造一个缓冲功能的线程池,配置corePoolSize=0,maximumPoolSize=Integer.MAX_VALUE,
keepAliveTime=60s,以及一个无容量的阻塞队列 SynchronousQueue,因此任务提交之后,
将会创建新的线程执行;线程空闲超过60s将会销毁
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
3、构造一个只支持一个线程的线程池,配置corePoolSize=maximumPoolSize=1,
无界阻塞队列LinkedBlockingQueue;保证任务由一个线程串行执行
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
4、构造有定时/延时功能的线程池,配置corePoolSize,无界延迟阻塞队列DelayedWorkQueue;
有意思的是:maximumPoolSize=Integer.MAX_VALUE,由于DelayedWorkQueue是无界队列,
所以这个值是没有意义的
对于一些不能及时处理,需要延时处理的操作,用ScheduledExecutorService处理很方便,
比如我们在某些条件下需要清理redis/mysql中数据时,但是可能当前有些地方还需要用到(并发),这时用ScheduledExecutorService处理非常合适,
虽然也可以用定时任务处理,但是定时任务会一直执行,而这里的场景是满足一定条件去执行,而执行的机会又很少。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
注:阻塞队列与普通队列的区别在于,当队列是空的时,从队列中获取元素的操作将会被阻塞,
或者当队列是满时,往队列里添加元素的操作会被阻塞。
试图从空的阻塞队列中获取元素的线程将会被阻塞,直到其他的线程往空的队列插入新的元素。
同样,试图往已满的阻塞队列中添加新元素的线程同样也会被阻塞,直到其他的线程使队列重新变得空闲起来,
如从队列中移除一个或者多个元素,或者完全清空队列.
- 通过ThreadPoolExecutor自定义线程池
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler )
* corePoolSize 核心线程池大小----1
* maximumPoolSize 最大线程池大小----3
* keepAliveTime 线程池中超过corePoolSize数目的空闲线程最大存活时间----30
* keepAliveTime时间单位----TimeUnit.MINUTES
* workQueue 阻塞队列----new ArrayBlockingQueue<Runnable>(5)----阻塞队列的容量是5
* threadFactory 新建线程工厂----new CustomThreadFactory()----定制的线程工厂
* rejectedExecutionHandler 当提交任务数超过maxmumPoolSize+workQueue之和(3+5),
即当提交第9个任务时(前面线程都没有执行完,此测试方法中用 sleep(30),
任务会交给RejectedExecutionHandler来处理
new ThreadPoolExecutor(
1,
3,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
package com.vendor.control.web.device;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* Created by zhangkai on 2019/8/12.
*/
public class CustomThreadPoolExecutor
{
private ThreadPoolExecutor pool = null;
public void init() {
pool = new ThreadPoolExecutor(
1,
3,
30,
TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(5),
new CustomThreadFactory(),
new CustomRejectedExecutionHandler());
}
public void destory() {
if(pool != null) {
pool.shutdownNow();
}
}
public ExecutorService getCustomThreadPoolExecutor() {
return this.pool;
}
private class CustomThreadFactory implements ThreadFactory
{
private AtomicInteger count = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
String threadName = CustomThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
System.out.println(threadName);
t.setName(threadName);
return t;
}
}
private class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
// 当使用blockingqueue的offer插入数据时,如果队列已满,那么阻塞指定时间等待队列可用,
//等待期间如果被中断,那么抛出InterruptedException。
// 如果插入成功,那么返回true,如果在达到指定时间后仍然队列不可用,
//那么返回false。===========超时退出
// 使用put 时,插入数据时,如果队列已满,那么阻塞等待队列可用,等待期间如果被中断,
//那么抛出InterruptedException。 ============= 一直阻塞:
System.out.println("拒绝任务");
executor.getQueue().offer(r); //会有任务线程不执行
//executor.getQueue().put(r); //不会有任务线程不执行
} catch (Exception e) {
e.printStackTrace();
}
}
}
// 测试构造的线程池
public static void main(String[] args) {
CustomThreadPoolExecutor exec = new CustomThreadPoolExecutor();
// 1.初始化
exec.init();
ExecutorService pool = exec.getCustomThreadPoolExecutor();
for(int i=1; i<=10; i++) {
System.out.println("提交第" + i + "个任务!");
pool.execute(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(30);
System.out.println(">>>task is running=====");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
// 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
// exec.destory();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
2、线程池执行流程
源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1、工作线程 < 核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 2、运行态,并尝试将任务加入队列;如果能加入,说明队列没满
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
} // 3、工作线程 < 核心线程,并且队列满了,那么继续新建线程,尝试使用最大线程运行
else if (!addWorker(command, false))
reject(command);
}
从上面这个图中可以看出,在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,并且工作线程数<核心线程数时,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
在核心线程数创建以后,就不会再关闭了,这个创建过程类似懒加载,只有需要用的时候才去创建
当核心线程数执行完其第一个任务以后,就会阻塞,等待从队列中获取任务(getTask),获取到的话,线程就继续执行任务。见下面runWorker源码。
ThreadPoolExecutor执行顺序总结:
当线程数小于核心线程数时,创建线程。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务
简单的说,
- addWorker(command, true): 创建核心线程执行任务;
- addWorker(command, false):创建非核心线程执行任务;
- addWorker(null, false): 创建非核心线程,当前任务为空;
- addWorker(null,true) : 预先创建corePoolSize个线程;
addWorker源码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//启动线程,执行任务,这里调用的其实是worker的run方法,见下面Worker构造方法
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
//Worker构造方法,thread变量构造的线程是它本身,即当调用Worker中thread.start()时,
//最终执行的是Worker类的run方法
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
//执行任务时,最后调用的方法
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//第一次执行task时,task肯定不为空,当firstTask执行完以后,while循环等待,
//指导从队列中获取到task,即getTask()不为空时,getTask就是从队列中获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
3、拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
- ThreadPoolExecutor.DiscardPolicy:丢弃任务,但是不抛出异常。
- ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新提交被拒绝的任务
- ThreadPoolExecutor.CallerRunsPolicy:由调用线程(提交任务的线程)处理该任务
线程池的默认拒绝策略为AbortPolicy,即丢弃任务并抛出RejectedExecutionException异常。
4、线程池优雅关闭
从源码中可以看到,有两种关闭方式,shutdown和shutdownNow。
- executorService.shutdown():线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//设置线程池状态为SHUTDOWN,之后就不能再向线程池提交任务了
advanceRunState(SHUTDOWN);
//遍历所有未执行任务的线程,对其设置中断
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//判断所有任务是否都已退出,如果退出则设置标记 “TERMINATED”
//在这里会死循环一直等到所有线程都执行完任务后,再次中断线程
tryTerminate();
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
//中断时,先获取锁,runWorker中执行任务时,会先lock加锁(见上面runWorker源码)
//所以,这里其实只会对中断空闲线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
}
} finally {
mainLock.unlock();
}
}
从上面的源码中可以看出,当我们调用线程池的shuwdown方法时,如果线程正在执行线程池里的任务,即便任务处于阻塞状态,线程也不会被中断,而是继续执行(因为有加锁,所以interruptIdleWorkers中worker获取不到锁,所以执行不了中断)。
如果线程池阻塞等待从队列里读取任务getTask(),则会被唤醒,但是会继续判断队列是否为空,如果不为空会继续从队列里读取任务,为空则线程退出。
- executorService.shutdownNow():线程池拒接收新提交的任务,同时立马关闭线程池,线程池里的任务不再执行。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//修改线程池的状态为STOP状态
advanceRunState(STOP);
//遍历线程池里的所有工作线程,然后调用线程的interrupt方法
interruptWorkers();
//将队列里还没有执行的任务放到列表里,返回给调用方
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
//直接中断
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
//移除工作队列中任务,同时把其返回给调用方
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
当我们调用线程池的shutdownNow时,如果线程正在getTask方法中执行,则会通过for循环进入到if语句,于是getTask返回null,从而线程退出(getTask源码中会先判断线程状态,而上一步已经把线程状态修改为STOP了)。不管线程池里是否有未完成的任务。
如果线程因为执行提交到线程池里的任务而处于阻塞状态,则会导致报错(如果任务里没有捕获InterruptedException异常),否则线程会执行完当前任务,然后通过getTask方法返回为null来退出。
总结:调用完shutdownNow和shuwdown方法后,并不代表线程池已经完成关闭操作,它只是异步的通知线程池进行关闭处理。如果要同步等待线程池彻底关闭后才继续往下执行,需要调用awaitTermination方法进行同步等待。
5、ThreadPoolExecutor参数设置
5.1 默认值
- corePoolSize=1
- queueCapacity=Integer.MAX_VALUE
- maxPoolSize=Integer.MAX_VALUE
- keepAliveTime=60s
- allowCoreThreadTimeout=false
- rejectedExecutionHandler=AbortPolicy()
5.2 自定义线程池参数的合理设置
为了说明合理设置的条件,我们首先确定有以下⼏个相关参数:
- 1.tasks,程序每秒需要处理的最⼤任务数量(假设系统每秒任务数为100~1000)
- 2.tasktime,单线程处理⼀个任务所需要的时间(每个任务耗时0.1秒)
- 3.responsetime,系统允许任务最⼤的响应时间(每个任务的响应时间不得超过2秒)
corePoolSize:核心线程数
每个任务需要tasktime秒处理,则每个线程每秒可处理1/tasktime个任务。系统每秒有tasks个任务需要处理,则需要的线程数为:tasks/(1/tasktime),即tasks*tasktime个线程数。
假设系统每秒任务数为100到1000之间,每个任务耗时0.1秒,则需要100x0.1⾄1000x0.1,即10到100个线程。
那么corePoolSize应该设置为大于10。具体数字最好根据8020原则,即80%情况下系统每秒任务数,若系统80%的情况下任务数小于200,最多时为1000,则corePoolSize可设置为20。
queueCapacity:任务队列的长度:
任务队列的长度要根据核心线程数,以及系统对任务响应时间的要求有关。队列长度可以设置为(corePoolSize/tasktime) * responsetime=(20/0.1) * 2=400,即队列长度可设置为400。
如果队列长度设置过⼤,会导致任务响应时间过长,如以下写法:
LinkedBlockingQueue queue = new LinkedBlockingQueue();
这实际上是将队列长度设置为Integer.MAX_VALUE,将会导致线程数量永远为corePoolSize,再也不会增加,当任务数量陡增时,任务响应时间也将随之陡增。
maxPoolSize:最大线程数
当系统负载达到最⼤值时,核心线程数已无法按时处理完所有任务,这时就需要增加线程。
每秒200个任务需要20个线程,那么当每秒达到1000个任务时,则需要(1000-queueCapacity) * 0.1,即60个线程,可将maxPoolSize设置为60。
keepAliveTime:
线程数量只增加不减少也不⾏。当负载降低时,可减少线程数量,如果⼀个线程空闲时间达到keepAliveTiime,该线程就退出。默认情况下线程池最少会保持corePoolSize个线程。keepAliveTiime设定值可根据任务峰值持续时间来设定。
rejectedExecutionHandler:
根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
以上关于线程数量的计算并没有考虑CPU的情况。若结合CPU的情况,比如,当线程数量达到50时,CPU达到100%,则将maxPoolSize设置为60也不合适,此时若系统负载长时间维持在每秒1000个任务,则超出线程池处理能⼒,应设法降低每个任务的处理时间(tasktime)。
补充:线程中断
在程序中,我们是不能随便中断一个线程的,因为这是极其不安全的操作,我们无法知道这个线程正运行在什么状态,它可能持有某把锁,强行中断可能导致锁不能释放的问题;或者线程可能在操作数据库,强行中断导致数据不一致混乱的问题。正因此,JAVA里将Thread的stop方法设置为过时,以禁止大家使用。
一个线程什么时候可以退出呢?当然只有线程自己才能知道。
所以我们这里要说的Thread的interrrupt方法,本质不是用来中断一个线程。是将线程设置一个中断状态。
当我们调用线程的interrupt方法,它有两个作用:
- 1、如果此线程处于阻塞状态(比如调用了wait方法,io等待),则会立马退出阻塞,并抛出InterruptedException异常,线程就可以通过捕获InterruptedException来做一定的处理,然后让线程退出。
- 2、如果此线程正处于运行之中,则线程不受任何影响,继续运行,仅仅是线程的中断标记被设置为true。所以线程要在适当的位置通过调用isInterrupted方法来查看自己是否被中断,并做退出操作。
如果线程的interrupt方法先被调用,然后线程调用阻塞方法进入阻塞状态,InterruptedException异常依旧会抛出。
如果线程捕获InterruptedException异常后,继续调用阻塞方法,将不再触发InterruptedException异常。
参考:
https://www.jianshu.com/p/f030aa5d7a28
https://www.jianshu.com/p/23cb8b903d2c
https://www.cnblogs.com/zedosu/p/6665306.html
https://blog.csdn.net/mayongzhan_csdn/article/details/80790966