线程作为CPU执行的工作单位,具有自己的上下文环境,也会占用一定的资源。Linux下对于系统的整个线程数量有一定限制,一个进程下的线程数量也是有一定限制的,通常情况下Linux下一个进程下的可执行线程数量默认为1024。因为一台服务器的资源是有一定空间的,每一条线程都会占用一定的资源,因此对线程数量的把控也是理所应当的事情。既然线程作为一种可利用的资源存在,如何去有效的管理资源就是线程池主要做的事情。下面让我们仔细看看Java的线程池都做了哪些事情。
线程概念
Java中对于线程有自己的支持,支持用户自行创建线程,一般我们是通过实现Runnable接口来创建线程,简单步骤如下:
static class DemoThread implements Runnable{
public void run() {
System.out.println("My Name Is :" + Thread.currentThread().getName());
}
}
public static void main(String[] args) {
int i = 0;
while ((i++) < 10) {
Thread t = new Thread(new DemoThread());
t.start();
}
}
在这里我创建了十个线程并且分别打印出了线程的名字,实现非常简单。只是通过实现一个Runnable接口然后作为Thread的构造函数的参数传进去即可,这样我们就创建了十个线程出来。
线程池
Java中我们一般通过Executors工具类来创建线程,我们日常使用的线程池主要是四类:
- newFixedThreadPool 固定线程数量的线程池,也就是说如果即使有空闲的线程在线程池中也不会进行回收操作。他的内部实现其实就是令corePoolSize = maximumPoolSize。但是它的任务队列主要是通过LinkedBlockingQueue来实现的,这个队列是无界队列,所以在使用的时候一定要注意,防止队列无线增长撑爆内存。
- newCachedThreadPool 一个带有缓存特性的线程池,如果需要执行任务的时候就会创建出来一个线程,当线程空闲的时候也会回收线程。这个线程池主要适用于执行很多短时间的异步任务。默认的回收时间是60s,线程池的最大线程数量为Integer.MAX_VALUE,它的内部阻塞队列为SynchronousQueue。
- newSingleThreadExecutor 一个单线程的线程池,这个线程池与普通的单个线程的区别主要是他可以被重复利用,当这个线程出现异常时候系统会自从创建一个新的线程来代替他执行任务,这个线程池的阻塞队列也是LinkedBlockingQueue,也是无界队列。因为LinkedBlockingQueue是有顺序的队列,因此该线程池能保证任务的提交顺序与执行顺序一致。
- newScheduledThreadPool 这是个执行定时任务的线程池,在实际的使用过程中用的并不多,一般的定时任务都会使用特殊的框架来执行。
当我们看到这些线程池创建的时候,往往都是创建一个ThreadPoolExecutor类出来,通过参数的配置来区分这些线程池的类型。这个类正是线程池的核心类,下面让我们看看这个ThreadPoolExecutor类的具体构造:
在明确线程池构建之前,需要了解这几个重要的参数:
- corePoolSize 线程池的核心线程数量 当我们往线程池中提交一个任务时,如果线程池中的线程数量小于这个值的话就会创建新的线程出来(尽管这时候线程池中仍然有空闲线程),否则就将新添加的任务放到阻塞队列中去等待执行。
- maximumPoolSize 线程池中的最大线程数量 当阻塞队列添加满(有界队列)的情况下,如果继续提交任务的话就会继续增加线程知道线程的数量达到maximumPoolSize为止。
- Handler 当线程池的阻塞队列满了并且线程数量达到了maximumPoolSize,这时候线程已经没有办法处理更多的任务,那么这时候的处理策略就是特殊的Handler,通常有以下几种处理策略:
- AbortPolicy 默认的是丢弃策略,即直接抛出异常。
- CallerRunsPolicy 用调用者自己的线程来执行任务(例如主线程)
- DiscardPolicy 不抛出异常,直接丢弃新增加的任务。
- DiscardOldestPolicy 丢掉最老的还未执行的任务。
- keepAliveTime 空闲线程的存活时间,超出该时间的话线程被回收。一般情况下只有在线程数量超过corePoolSize的时候该参数才会生效,如果配置了allowCoreThreadTimeOut(true)的话核心线程也会被回收。
- unit 跟keepAliveTime配合使用的时间单位,通常使用的是秒。
- workQueue 用来保存待执行任务的任务队列,我们在上面看到了LinkedBlockingQueue的使用,其实还可以使用ArrayBlockingQueue(有界的数组阻塞队列,推荐);SynchronousQuene是一个不存储任何元素的阻塞队列,通常只有在一个线程调用移除操作之后另外一个线程才能够进行插入操作。
- threadFactory 创建线程池的工厂类,这个工厂类的好处就是可以对线程进行定制化,例如给每一个线程都添加固定的属性值等等。
以上的属性就是线程池的核心属性了,理解了上面的属性值,我们就一起来看看骑内部的具体实现:
具体实现
首先来看一下重要的属性
//ctl作为重要的状态字段,其通过高三位作为线程状态的描述,低29位作为线程数量的统计功能
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //1110 0000 0000。。。
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1; //0001 1111 1111。。。
//运行状态被存储在高三位中
private static final int RUNNING = -1 << COUNT_BITS; //1110 0000。。 可以接受新的任务,可以处理队列任务
private static final int SHUTDOWN = 0 << COUNT_BITS; //0000 0000。。 不可以接受新的任务,但是会继续处理队列任务
private static final int STOP = 1 << COUNT_BITS; //0010 0000。。不接受新任务,不处理队列中的任务,中断正在执行的任务
private static final int TIDYING = 2 << COUNT_BITS; //0100 0000。。所有的任务已经结束,任务线程为0
private static final int TERMINATED = 3 << COUNT_BITS; //0110 0000。。线程池处于关闭状态
//对于线程状态的转换这里也需要介绍一下:
// RUNNING -> SHUTDOWN(调用shudown方法)
// (RUNNING or SHUTDOWN) -> STOP(调用shutdownNow方法)
// SHUTDOWN -> TIDYING(当任务队列和线程池都为空时)
// STOP -> TIDYING(当线程池为空)
// TIDYING -> TERMINATED(terminated方法执行完)
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
private final BlockingQueue<Runnable> workQueue;//上文提到的阻塞队列
private final ReentrantLock mainLock = new ReentrantLock();//控制接近工作线程的锁
private final HashSet<Worker> workers = new HashSet<Worker>();//工作线程的集合,只有在持有mainLock的情况下才允许操作这个集合
private long completedTaskCount;//完成的任务数量
private volatile ThreadFactory threadFactory;
private volatile RejectedExecutionHandler handler;
private volatile long keepAliveTime;
private volatile boolean allowCoreThreadTimeOut; //在控制失效线程时候是否应该考虑coreThread
private volatile int corePoolSize;
private volatile int maximumPoolSize;
通常情况下我们通过线程池执行任务的入口是execute和submit,其中execute方法没有返回值,而submit方法的执行是否返回值的。下面我们通过execute方法切入到具体的线程执行过程中去讲解线程池:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//如果当前线程数小于corePoolSize则创建新的线程执行任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//如果当前线程池正处于运行状态并且阻塞队列没有满则将任务添加到阻塞队列中
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//再次检查一下当前线程池的状态,如果不是可运行状态就将这个任务移除
if (! isRunning(recheck) && remove(command))
reject(command);
//如果是可运行状态但是线程池中没有线程的时候就新增加一个线程(因为在二次检查过程中线程可能全部消亡,需要保证线程池中必须有执行任务的线程才可以)
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//否则就新增线程直到maximumPoolSize为止
else if (!addWorker(command, false))
reject(command);
}
从上面的代码解读中我们看到多次出现方法addWorker,接下来就看看addWorker方法的具体实现:
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果状态值和workQueue不满足条件就直接返回false,然后什么也不做
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//如果worker数量大于线程最大容量就直接返回,否则就增加一个工作线程
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新获取ctl状态,如果前后状态不一致的话就再次重试整个操作
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// 获取线程失败或者线程池在此期间被关闭都有可能导致状态改变,因此需要多次检查
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 如果线程t的状态是正在运行的状态就直接抛出异常
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted) //线程可能启动失败,例如在执行任务时线程抛出异常
addWorkerFailed(w);
}
return workerStarted;
}
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
workers.remove(w);
decrementWorkerCount();
tryTerminate(); //主要用于触发terminated状态后的钩子函数
} finally {
mainLock.unlock();
}
}
从上面可以看出来这个Worker类就是线程池中处理线程的主要类,他相当于对Thread的一层包装,下面看看Worker的具体实现。
//Work类既继承了AQS,可以非常方便的处理锁操作;也实现了Runnable接口,可以当做一个任务使用
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
/** Worker内部的真正线程*/
final Thread thread;
/** 初始任务,可有而无 */
Runnable firstTask;
/** 线程完成的任务数量 */
volatile long completedTasks;
public void run() {
runWorker(this);
}
//核心的任务调用流程就在这个方法了里面
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
//处理中断状态,因为如果线程池是STOP状态的话,会立即停止正在执行的所有任务在,这个时候应该是可以中断的。或者如果执行任务的线程wt是不可中断的并且线程状态>=STOP状态
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);//钩子函数,由子类实现
Throwable thrown = null;
try { //记录线程执行过程中可能出现的异常信息
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally { //执行钩子函数
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly); //处理线程的一些收尾工作
}
}
下面继续看一下getTask()是如何工作的:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池目前不能接受新任务并且任务队列是空或者线程池不再接受新任务,就减少当前的工作线程
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; //从队列中取任务的时候是否会等待一段时间过期
for (;;) {
int wc = workerCountOf(c);
//allowCoreThreadTimeOut作为一项线程池的配置,允许核心线程死亡
timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果线程数量小于最大要求数量并且不设置核心线程过期就什么都不做,这里break的话就不会减少线程,因为现在线程池中没有任务了
if (wc <= maximumPoolSize && ! (timedOut && timed))
break;
if (compareAndDecrementWorkerCount(c)) //如果可以减少线程的话,减少之后就直接返回
return null;
c = ctl.get(); // 多次读取状态值,因为线程池在运作期间可能会发生状态改变,如果状态改变了就要重新操作
if (runStateOf(c) != rs)
continue retry;
// 如果任务能够走到这里,说明在运作过程中线程的状态还是被改变了,需要重新运行for循环。
}
try {
// 如果可以等待过期了就直接调用Pull,否则调用take。调用take时候如果队列中没有任务的话就会一直阻塞知道有任务进来,这里不占用CPU
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true; //一次poll操作可能因为没有数据导致过期
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
从这里我们就知道为什么线程池中的线程可以一直执行任务,因为在没有任务的时候任务队列会为空,而在这是调用阻塞队列的take方法时候回发生阻塞,这时候线程就被挂起来了。当下次有任务进来的时候线程池,take方法立马被唤醒,然后继续上面的循环操作指导下一次take被阻塞!
以上是关于Execute操作的一些内容,下面介绍一下另一种提交任务的方式:submit。
我们常用的线程池操作除了execute之外还有submit操作,对于submit操作会返回一个结果值。我们可以在之后的操作里从这个返回的结果值里面取到线程运行的结果。
//submit操作会返回一个Future对象,我们可以从Future对象中取到这次任务的运行结果
<T> Future<T> submit(Callable<T> task);
实际上在线程提交过后会被封装成一个FutureTask对象。
FutureTask的主要实现又是依托于内部的一个类Sync,下面我们看看这个Sync:
private final class Sync extends AbstractQueuedSynchronize //通过继承AQS从而拥有一定的线程状态控制力
/**0表示该任务可以运行 */
private static final int READY = 0;
/**1表示该任务正在运行*/
private static final int RUNNING = 1;
/**2表示该任务已经运行过了*/
private static final int RAN = 2;
/**4表示该任务已经被取消*/
private static final int CANCELLED = 4;
private final Callable<V> callable;
/**get()方法但会的结果值 */
private V result;
/**任务执行过程中可能产生的异常信息*/
private Throwable exception;
/**
* The thread running task. When nulled after set/cancel, this
* indicates that the results are accessible. Must be
* volatile, to ensure visibility upon completion.(这里感觉英文翻译的比较完整)
*/
private volatile Thread runner;
然后我们从Submit动作的开始来分析线程池是如何一步一步走下去的:
// 1.封装任务 2.执行任务 3.返回结果
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 1.封装任务
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result)); //生成callable的适配器
}
所以最终还是返回了一个RunnableFuture对象,只不过在这个对象上做了进一步的包装.
// 2.执行任务
什么?竟然又回归到了execute()方法上去,这些在上文中都有讲过。所以实际上最后运行的方法还是Sync中定义的run方法:
void innerRun() {
//设置线程状态
if (!compareAndSetState(READY, RUNNING))
return;
runner = Thread.currentThread();
if (getState() == RUNNING) { // 多次检查
V result;
try {
result = callable.call();
} catch (Throwable ex) {
setException(ex);
return;
}
set(result); //设置结果
} else {
releaseShared(0); // 试图设置状态来反映共享模式下的一个释放
}
}
//set方法的具体内容
void innerSet(V v) {
for (;;) {
int s = getState();
if (s == RAN) //任务已经运行过了表示已经被set过了
return;
if (s == CANCELLED) {
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
result = v;
releaseShared(0);
done();
return;
}
}
}
注意:releaseShared是AQS里面的方法,我对这个类不熟悉,所以在此不进行分析,以免误人子弟。