1、概述
Java中的关于线程池的核心类是Executor,Executor是一个接口,真正的线程池的实现为ThreadPoolExecutor,ThreadPoolExecutor提供了一系列参数来配置线程池,通过不同的参数可以创建不同的线程池。线程池的优点在于:
① 复用线程池中的线程,避免因为线程的创建和销毁所带来的性能开销。
② 能够有效的控制线程池的最大并发数,避免大量的线程之间因互相抢占系统资源而导致的阻塞现象。
③ 能够对线程进行简单的管理,并提供定时执行以及指定间隔循环执行等功能。
2、构造方法及参数
线程池有多种构造方法,其中最复杂的一个是含有7个参数的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
线程池构造方法
① CorePoolSize
线程的核心线程数。默认情况下,核心线程数会在线程中一直存活,即使它们处于闲置状态。如果将ThreadPoolExecutor的allowCoreThreadTimeOut属性设置为true,那么核心线程就会存在超时策略,这个时间间隔有keepAliveTime所决定,当等待时间超过keepAliveTime所指定的时长后,核心线程就会被停止。
② maximumPoolSize
线程池所能容纳的最大线程数。当活动线程数达到这个数值后,后续的新任务将会被阻塞。
③ keepAliveTime
非核心线程闲置时的超时时长,超过这个时长,非核心线程就会被回收,当ThreadPoolExector的allowCoreThreadTimeOut属性设置为True时,keepAliveTime同样会作用于核心线程。
④ unit
用于指定keepAliveTime参数的时间单位,这是一个枚举,常用的有TimeUnit.MILLISECONDS(毫秒)、TimeUnit.SECONDS(秒)以及TimeUnit.MINUTES(分钟)等。
TimeUnit.NANOSECONDS 纳秒
TimeUnit.MICROSECONDS 微秒
TimeUnit.MILLISECONDS 毫秒
TimeUnit.SECONDS 秒
TimeUnit.MINUTES 分钟
TimeUnit.HOURS 小时
TimeUnit.DAYS 天
⑤ workQueue
线程池中的任务队列,通过线程池execute方法提交的Runnable对象会存储在这个参数中。这个任务队列是BlockQueue类型,属于阻塞队列,就是当队列为空的时候,此时取出任务的操作会被阻塞,等待任务加入队列中不为空的时候,才能进行取出操作,而在满队列的时候,添加操作同样被阻塞。
⑥ threadFactory
线程工厂,为线程池提供创建新线程的功能。ThreadFactory是一个接口,它只有一个方法,newThread(Runnable r),用来创建线程。
ThreadFactory factory =new ThreadFactory() {
//线程安全的Integer操作类
private final AtomicInteger mCount =new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "new Thread #" + mCount.getAndIncrement());
}
};
⑦ 饱和策略,当任务队列和线程池都满了时所采取的应对策略,有已下几种:
AbordPolicy: 抛出RejectedExecutionExcepton异常,该策略是默认策略。
CallerRunsPolicy: 用调用者所在的线程来处理任务。此策略提供简单的反馈控制,能够减缓新任务提交速度。
DiscardPolicy: 直接将该任务舍弃。
DiscardOldestPolicy: 丢弃最老的任务,并执行当前任务。
3、线程池执行逻辑
线程池的执行逻辑如上图所示,每当有一个任务需要由线程池执行,先判断线程池中线程数是否小于核心线程数,是的话开一个核心线程,并在该线程中执行。如果已经大于核心线程数,则判断任务队列是否已满,如果未满的话线程任务入队。如果任务队列已满,则判断线程池中线程数是否大于最大线程数,如果不是的话就开一个非核心线程处理任务。如果已经开到最大线程数,则调用饱和策略。当线程池中数量大于核心线程数时,若某非核心线程数的空闲时间大于闲置超时时长,则回收该空闲线程。
4、常见线程池
4.1 FixedThreadPool
创建方法:
public static ExecutorService newFixedThreadPool(int var0) {
return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
该线程池从构造方法可以看出,其核心线程池数量和最大线程池数量一致,故其线程都是核心线程,所以不会被回收。同时它的队列是LinkedBlockingQueue,这个队列详细信息之后会介绍,它有一个特点就是其队列的大小没有限制,也就是说超出的线程任务都会在队列中排队等待。也正是这个特点,可以使它的请求任务小于最大线程数时可以更快的响应外界的请求。
4.2 ScheduledThreadPool
创建方法:
public static ScheduledExecutorService newScheduledThreadPool(int var0) {
return new ScheduledThreadPoolExecutor(var0);
}
public ScheduledThreadPoolExecutor(int var1) {
super(var1, 2147483647, 0L, TimeUnit.NANOSECONDS, new ScheduledThreadPoolExecutor.DelayedWorkQueue());
}
该线程池有自定义的核心线程数,但是其非核心线程数设置的是int的最大值,可以理解为对非核心线程数没有限制。同时keepAliveTime为0,意味着非核心线程一旦执行完毕就会立刻回收。这个线程池适合执行定时任务和具有固定周期的重复任务。
4.3 CachedThreadPool
创建方法:
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
该线程池没有核心线程,并且不限制非核心线程数量,它的keepAliveTime为60秒,超过这个时间就会回收线程。其队列为SynchronousQueue,这个队列的特点是不做数据缓冲,可以粗略理解为这个队列的大小为0,应用到这个线程池里也就是说即一旦有任务过来,会立刻开一个线程执行任务。这个线程池的一般用来执行大量的低耗时任务,当整个线程池闲置,其所有线程都会关闭,从而处于一个低占用资源的状态。
4.4 SingleThreadExecutor
创建方法:
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()));
}
该线程池只有且只有一个核心线程,而队列采用的是刚才所说的LinkedBlockingQueue。所以该线程池的线程是一个个执行的,后来的线程都加入到队列中等待。该线程池的意义在于将所有任务都控制在一个线程中,并按照一定顺序执行,这样就不用处理这些任务的同步问题。
5、阻塞队列
上一节已经提到了两个阻塞队列LinkedBlockingQueue和SynchronousQueue了,这一节就来详细介绍一下。
阻塞队列一般用于有生产者和消费者的场景,也就是说生产者往队列里面添加数据,而消费者则从队列中消费数据。这边往往会遇到两种情况:
① 队列中没有数据时,消费者端会被自动阻塞(挂起),直到有数据放入队列,再被唤醒。
② 当队列中填满数据时,生产者端会被自动阻塞(挂起),直到队列中有空位置时,再被唤醒。
阻塞队列提供了四种处理方法:
方法\处理方式 | 抛出异常 | 返回特殊值 | 一直阻塞 | 超时退出 |
---|---|---|---|---|
插入方法 | add(e) | offer(e) | put(e) | offer(e,time,unit) |
移除方法 | remove() | poll() | take() | poll(time,unit) |
检查方法 | element() | peek() | 不可用 | 不可用 |
抛出异常:是指当阻塞队列满时候,再往队列里插入元素,会抛出IllegalStateException(“Queue full”)异常。当队列为空时,从队列里获取元素时会抛出NoSuchElementException异常 。
返回特殊值:插入方法会返回是否成功,成功则返回true。移除方法,则是从队列里拿出一个元素,如果没有则返回null
一直阻塞 :当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到拿到数据,或者响应中断退出。当队列空时,消费者线程试图从队列里take元素,队列也会阻塞消费者线程,直到队列可用。
超时退出:当阻塞队列满时,队列会阻塞生产者线程一段时间,如果超过一定的时间,生产者线程就会退出。
接下来介绍一下java中的阻塞队列。
5.1 ArrayBlockingQueue
一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。一旦指定了队列的长度,则队列的大小不能被改变。在生产者消费者例子中,如果生产者生产实体放入队列超过了队列的长度,则在offer(或者put,add)的时候会被阻塞,直到队列的实体数量小于队列的初始size为止。不过可以设置超时时间,超时后队列还未空出位置,则offer失败。如果消费者发现队列里没有可被消费的实体时也会被阻塞,直到有实体被生产出来放入队列位置,不过可以设置等待的超时时间,超过时间后会返回null。
5.2 DelayQueue
一个使用优先级队列实现的无界阻塞队列。队列中的元素必须实现Delayed接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
5.3 LinkedBlockingQueue
一个用链表实现的有界阻塞队列。可以通过构造方法设置capacity来使得阻塞队列是有界的,也可以不设置,则为无界队列。此队列按照先进先出的原则对元素进行排序。
5.4 PriorityBlockingQueue
无界限队列,相当于PriorityQueue 和 BlockingQueue的结合体。插入的对象必须是可比较的,或者通过构造方法实现插入对象的比较器Comparator。队列里的元素按Comparator comparator比较结果排序。
5.5 SynchronousQueue
无内部容量的阻塞队列,put必须等待take,同样take必须等待put。比较适合两个线程间的数据传递。异步转同步的场景不太适用,因为对于异步线程来说在处理完事务后进行put,但是必须等待put的值被取走。SynchronousQueue的吞吐量高于LinkedBlockingQueue 和 ArrayBlockingQueue。
6、线程池execute()方法分析
线程池中最常用的方法就是其void execute(Runnable command) 方法了,去看一下其源码实现:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
官方对该方法是这么注释的:该方法给线程池提交一个将来要执行的任务。这个任务可能在一个新开的线程中执行,或者在一个已经在线程池中的线程执行。如果任务无法提交执行,无论是因为该执行程序已关闭或因其容量已达到,该任务将由当前的RejectedExecutionHandler处理。
上面都是官方的注释,其实已经大致说明白了这个方法的执行流程。和我再3、线程池执行逻辑里面说的差不多。从上面官方注释也可以看出,线程池执行任务时间不是一调用execute()就执行任务的。线程池会根据各项指标状况来决定是唤醒线程池中一个已有的阻塞线程来执行还是new一个Thread来执行任务。具体这些策略和我们创建线程池适合传入的参数有关。前面几节已经说明了。
6.1 execute()处理三步
在execute()函数内官方注释将其分为了3步来进行:
① 如果运行的线程数少于核心线程,开启一个新线程给这个command。调用addWorker()方法,并用原子性的方式检查runState和workerCount,如果addWorker()返回true,则代表任务执行成功,直接返回。否则进入下一步。对应如下代码:
// void execute(Runnable command)
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
② 这一步将任务入队,如果任务能成功入队,此时任然应该要仔细检查是否应该添加一个线程(因为在上次检查到这次检查之间,可能有线程死亡)或者在进入这个方法时候线程池已经关闭了。所以需要检查状态并且如果有必要的话回滚队列,或者如果没有线程的话新开始一个线程。对应如下代码:
// void execute(Runnable command)
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
③ 如果入队失败,那么我们尝试添加一个新线程。 如果失败,我们知道线程池已关闭或饱和,因此拒绝任务。对应如下代码:
// void execute(Runnable command)
else if (!addWorker(command, false))
reject(command);
6.2 ctl成员变量
ctl是控制线程池状态的变量,由两部分组成,runState(高位)和workerCount(低28位),这种处理方法类似于Android里测量阶段要用到的MeasureSpec变量。runStateOf(int c) 方法用于获取线程池状态,workerCountOf(int c)方法用于获取线程池正在运行线程数。这样在execute()中调用的方法isRunning(recheck)作用就很好判断了,其作用是判断线程是否处在运行状态。
6.3 addWorker(Runnable firstTask,boolean core)
我们在execute()中多次看到了addWorker()函数,看一下该函数做了什么:
// ThreadPoolExecutor.class
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如有必要检查队列是否为空,如果空直接返回false
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // 重新获取ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 保持锁定时重新检查。
// 线程生成失败或者在获取锁之前线程池关闭则退出
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // 再次检查线程是启动的
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
整个方法的作用大致流程就是检查是否可以根据当前池状态和给定的界限(核心线程数或最大线程数)添加并创建新的Worker(Worker内部封装了一个线程)。 如果可以,则相应地更新线程中Worker数量,并且创建并开始新的Worker,运行firstTask作为其第一个任务。 如果线程池停止或被关闭,此方法返回false。 如果线程工厂在被询问时未能创建线程,它也会返回false。 如果线程创建失败,或者由于线程工厂返回null,或者由于异常(通常是Thread.start()中的OutOfMemoryError),则会调用 addWorkerFailed(w) 干净地回滚,将新创建的Worker回收。
Tips: addWorker()中调用的方法compareAndIncrementWorkerCount(c),其作用是比较原子变量并加一,如果改原子变量加一前值没有变就返回true,不然就返回false。