该系列统一使用java8的源码进行讲解。
由于线程的创建于销毁是存在开销的,为了避免频繁的创建与销毁线程,Java采用了池化技术来管理线程资源。只要涉及到多线程、异步的场景,基本就会有线程池的存在。因此掌握好线程池实现原理对程序员来说非常的重要,也是通往高级程序员以及架构师的必经之路。 本文主要从以下几个方面对线程池技术进行讲解。
- 剖析线程池的源码实现
- 讲解使用线程池的注意事项
- 线程池的变异使用方式(Tomcat与Netty如何使用线程池)
- 面试中的线程池问答
一. 源码剖析
为了使线程池可以适用于多种场景,对于线程池的创建提供了多个参数,进行控制。各个参数的含义必须要非常的明确。
1.1 构造方法
- corePoolSize 核心线程数
- maximumPoolSize 最大线程数
- keepAliveTime 保活时间
- unit 保活时间的单位
- workQueue 任务队列
- threadFactory 线程工厂
- handler 拒绝策略
结合参数描述一下线程的工作原理,以新来一个任务为例:
1. 新来任务后,如果线程数<corePoolSize,则创建线程执行(即便存在空闲的线程),否则执行2
2. 如果workQueue没有达到最大值则扔进阻塞队列,否则执行3
3. 如果线程数<maximumPoolSize,则创建线程执行,否则执行4
4. 按照指定的拒绝策略handler处理新来的任务
除了上面步骤提到的参数外,还有
- keepAliveTime, unit 保活时间,如果Worker阻塞在从workQueue中获取任务的时间超过该时间,且线程数>corePoolSize,那么就会对该Worker进行销毁,避免过多的线程阻塞,浪费资源。
- threadFactory 线程工厂,用于创建线程对象
连接了各个参数的含义,看下构造函数的源码:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
源码中只是进行参数取值范围控制,并赋值。
1.2 execute 提交任务
创建好线程池之后,我们就需要往线程池中提交任务,提交任务有两个方法(低级的面试也会问这两个方法有什么区别):
- submit() 有返回值,返回Future对象(Future后面再将)
- execute() 无返回值
其中 submit也只是任务包装成Future之后,调用execute,所以这里我们只需要看execute方法的实现即可。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//线程数小于核心线程数则新增worker执行
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//否则,扔到阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//扔进阻塞队列后判断状态,如果线程池状态处于非运行状态,则执行拒绝策略handler
if (! isRunning(recheck) && remove(command))
reject(command);
//如果运行着,但是没有worker,那么新增worker执行,为什么会出现这种情况?
//因为有参数可以控制核心线程数也可以在超时的情况下被销毁:allowCoreThreadTimeOut这个参数控制
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 队列慢,则新增worker执行任务
else if (!addWorker(command, false))
//worker也达到上限,则执行拒绝策略
reject(command);
}
其中,ctl一个线程安全的AtomicInteger变量,用一个整数来记录了线程池的状态(高三位)和目前线程池中线程(Worker)的个数(低29位)
举例说明:ctl的值为:
1000 0000 0000 0000 0000 0000 0000 0001 高三位100代表线程池处于运行状态,低29位为1,说明目前线程池中只有1个线程。
workerCountOf(c) 返回的就是低29位表示的数,即线程个数
isRuning(c) 就是判断高3位是否为100,100位运行状态
然后上面的代码逻辑就是我们一开始整理的新来一个任务时,线程池的执行逻辑。非常的重要,几乎每次面试都会被问。
1.3 Worker 线程池中的工作者
线程池中的工作者是Worker,Worker不仅对Thread进行了包装,还继承了AbstractQueuedSynchronizer(AQS相关的知识简单讲,后面会有文章细讲)实现了Runnable,下面我们就带着问题一起来认识下Worker。
1.3.1. Worker为什么要实现Runnable接口?
Worker中封装了Thread,也就是在构造Worker的时候,会创建Thread对象,Thread对象又要关联一个任务去执行,那这个任务就是Worker自己本身。也就是说:Worker中的线程对象Thread执行的是Worker的run方法。这样的话,thread一旦执行,执行的就是Worker的run方法,看下Worker的构造方法:
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
从构造方法中即可以看出,thread一旦启动,调用的就是Worker的run方法。
1.3.2 Worker为什么还实现了AbstractQueuedSynchronizer
这里主要是为了实现Worker的中断。从1.3.1 Worker的构造函数中可以看到,设置状态为-1, 相当于给Worker加了一把锁。那什么时候会解锁呢?简单看下runWork方法(也就是Worker的run方法),代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
//...省略
}
其中unlock()方法就是解锁,unlock方法会调用Worker的release方法,将state的值+1,这样state值就为0了。因为Worker创建并不代表Thread执行,只有Thread线程真正执行了,才会响应中断。此外,在执行每一个task的过程中也不允许中断。响应中断的方法如下:
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
首先会判断getState(), 这个state就是AQS的值,当Thread线程开始执行后,该值就会变为0,那么在这个中断方法中就可以进入进行中断了。
1.3.3 Worker线程都做了哪些事情
这就要看runWork方法了,代码如下:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//首次task不为null执行自己的任务,此后从workQueue中去任务
while (task != null || (task = getTask()) != null) {
//上锁,不允许中断
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
当worker创建时,firstTask是被赋值了的,所以先执行自己的任务,此后所有的任务都是通过getTask()从workQueue中获取。拿到任务后先lock加锁,然后通过调用task.run方法执行任务,执行完成后,解锁。
从这里可以看出来,在一个任务任务的执行过程中是不需要中断的
通过getTask方法,如果返回的是null,那么就要执行processWorkerExit,对该Worker进行退出
1.3.4 getTask只是从workerQueue中获取任务吗?
getTask除了从workerQueue中获取任务外,还会对worker的等待时间进行判断,释放掉多余的worker。
看下getTask的实现:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池关闭状态下,如果workQueue空,则减少Worker
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 判断是否需要因为worker数>corePoolSize 而销毁worker
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超时,且要多余1个线程,且目前没有任务需要处理,则进行销毁Worker
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//仅仅从数量上-1,销毁Worker的事情让runWork方法去做
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//去队列中获取数据,如果需要考虑超时,则按照超时返回的策略去获取任务
//如果不需要考虑超时,则直接使用take方法阻塞在workQueue上
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
//任务存在,直接将任务返回,执行任务
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
getTask方法会根据当前线程池的状态,去判断该Worker是否需要有限超时从workQueue中获取任务,这样可以让getTask提前退出,销毁多余的Worker。从这里也可以看出来并不会说先创建的线程就是核心线程,线程池只关心线程的数量,不关心哪些线程是因为<corePoolSize创建的,哪些是因为>=corePoolSize创建的,在销毁的时候是随机销毁的。
1.4 Worker何时被启动的
当一个新任务被提交到线程池后,有三种情况会创建新的worker并启动worker
- 线程数<corePoolSize时
- 线程数>=corePoolSize,且workQueue满时
- 任务添加到阻塞队列后,发现线程数为0时
会调用addWorker方法完成Worker的新增,代码如下:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//先通过死循环,保证在ctl上把worker数加上
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//构造一个worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//保证线程安全,只能有一个线程执行这段代码
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//添加成功后,通过线程启动worker
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker做了几件事情
- 在死循环中完成对ctl数值+1,这里为什么不用加锁?因为这里使用的是cas操作,属于乐观锁,不需要加锁也能保证线程安全的修改ctl
- 创建worker,并加锁将worker放到workers列表中,然后通过执行线程的start方法,调用Worker的run方法,然后执行runWork方法,Worker就开始工作了
到此,关于线程池的核心源码部分就基本完成了,关于更细致的源码剖析,线程池各个状态的转换细节可以参考我的另一篇简书上的文章 https://www.jianshu.com/p/a52f438c16be,有关线程池相关的剩余部分限于篇幅问题,放在下一篇中继续剖析。如有问题欢迎大家指正,我们一起学习,共同进步。