线程池作用是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费系统资源,多了造成系统拥挤效率不高。用线程池控制线程数量,其他线程排队等候。一个任务执行完毕,再从队列中取出最前面的任务开始执行。
1. ThreadPoolExecutor
实际上,一般说的线程池接口,基本上说的就是ExecutorService。ExecutorService源码中有各种API帮助我们使用,ExecutorService接口的默认实现类为ThreadPoolExecutor。
1.1 线程池的构造函数
构造参数的含义:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1. corePoolSize:核心线程数量。
默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
核心线程默认会一直存活在线程池中,即使核心线程处于闲置状态。
如果指定ThreadPoolExecutor的allowCoreThreadTimeout=true,那么核心线程若处于闲置状态的话,超过一定的时间(KeepAliveTime),就会销毁掉。
2. maximumPoolSize:线程池最大线程数
线程池中最多能创建多少个线程。
3. keepAliveTime:非核心线程闲置超时时间
默认情况下,当线程池的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池的线程数不大于corePoolSize。
4. TimeUnit:keepAliveTime:时间单位
MILLISECONDS : 1毫秒 、SECONDS : 秒、MINUTES : 分、HOURS : 小时、DAYS : 天
5. BlockingQueue<Runnable>:阻塞队列
一个阻塞队列,用于存储等待执行的任务。一般使用LinkedBlockingQueue或者Synchronous(谁坤尼斯
)。维护着等待执行的Runnable对象。若corePoolSize满了之后,则新添加的任务会被添加到这个队列中等待处理,如果队列满了,则创建非核心线程执行任务。
6. ThreadFactory:创建线程的方式
这是一个接口,Thread newThread(Runnable r),需要实现该方法。
7. RejectedExecutionHandler:拒绝策略
当线程无法执行新任务(一般是由于线程池的线程数量已经达到最大数(maximumPoolSize)或者线程池关闭导致的)默认情况下当线程池无法处理新线程,会抛出RejectedExecutionException。
1.2 线程池的生命周期
- 当创建线程池后,初始时,线程池处于Running状态。
- 当调用shutdown()方法,则线程池处于shutdown状态,此时线程池不能接受新的任务,他会等待所有任务执行完毕。
- 当调用shutdownNow()方法,则线程处于stop状态,此时线程池不能接受新的任务,并且试图尝试终止正在执行的任务。
- 当线程池处于shutdown状态或者stop状态,并且所有工作线程已经销毁,任务缓存队列已经清空或者执行结束后,线程池被设置为terminated(终止
谭母你提的
)。
1.3 线程池任务执行流程
corePoolSize就是核心线程池的大小,maxmumPoolSize就是线程池的任务量过大时的一种补救措施。
- 当线程池中线程数量小于corePoolSize,每来一个任务,就会创建一个线程执行这个任务。
- 当前线程池线程数量大于等于corePoolSize,则每来一个任务。会尝试将其添加到任务缓存队列中,若是添加成功,则该任务会等待线程将其取出去执行;若添加失败(一般来说任务缓存队列已满),则会尝试创建新的线程执行。
- 当前线程池线程数量等于maximumPoolSize,则会采取任务拒绝策略进行处理。
- 当前线程池线程数量大于corePoolSize,如果某线程空闲时间超过keepAliveTime,线程将会被终止,直到不大于corePoolSize数量。如果允许为核心池(corePoolSize)中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
1.4 线程池中的线程初始化
默认情况下,创建线程池之后,线程池是没有线程的,需要提交任务才会创建线程。
在实际中如果需要线程池创建之后,立即创建线程,可以通过:
- java.util.concurrent.ThreadPoolExecutor#prestartCoreThread:初始化一个核心线程。
- java.util.concurrent.ThreadPoolExecutor#prestartAllCoreThreads:初始化所有核心线程。
1.5 任务缓存队列及排队策略
java.util.concurrent.BlockingQueue的特性是:当队列是空的时候,从队列中获取或删除元素将被阻塞,或者当队列是满时,往队列中添加元素会被阻塞。
阻塞队列不接受空值,当尝试向队列中添加空值时,会抛出NullPointerException。
阻塞队列都是线程安全的,所有的查询方法都是原子的,并且使用了内部锁或者其他形式的并发控制。
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出,默认大小无限大(注意内存溢出),吞吐量通常高于ArrayBlockingQueue。
- synchronousQueue:不存储元素的存储队列,每个插入操作必须等待另一个线程调用移除操作,否则该插入操作一直处于阻塞状态。吞吐量要高于LinkedBlockingQueue。
1.6 任务拒绝策略
线程池的任务缓存队列已满并且线程池中线程数量已达到maxmumPoolSize,如果还有任务的到来就会采取拒绝策略。通常有四种处理策略。
1.6.1 线程池默认的处理策略
处理策略 | 效果 |
---|---|
AbortPolicy | 丢弃任务并抛出RejectedExecutionException异常 |
DiscardPolicy | 也是丢弃任务,但是不抛出异常 |
DiscardOldestPolicy | 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程) |
CallerRunsPolicy | 只要线程池未关闭,该策略直接在调用者线程中串行运行被丢弃的任务,显然这样不会真的丢弃任务,但是可能会造成调用者性能急剧下降 |
1.6.2 自定义线程池处理策略
@Slf4j
public class PrintingPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
log.info(r.toString() + "被抛弃了!");
}
}
有一种场景是:当线程池满了之后,我们的拒绝策略:主线程保持阻塞,线程池有资源了,主线程在将任务交由线程池处理。【不丢弃任务,当线程池没资源,主线程等待线程池有资源】
因为我们一般声明线程池,使用的是ArrayBlockingQueue
有界的阻塞队列,而BlockingQueue
特点是当队列满了,线程向队列存数据,线程会被阻塞。
@Slf4j
public class BlockPolicy implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
log.error("", e);
}
}
}
1.7 线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭。
shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
1.8 线程池容量的动态调整
ThreadPoolExcutor提供了动态调整线程容量大小的方法:
setCorePoolSize:设置核心池大小。
setMaximumPoolSize:设置线程池最大能创建的线程数目大小。
1.9 创建线程池的API
Executors类中也提供了几个静态方法来创建线程池:(不推荐使用)
1. java.util.concurrent.Executors#newCachedThreadPool()
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
corePoolSize大小为0;maxmumPoolSize为无限大;任务队列为SynchronousQueue,该队列没有缓存区,即不会有任务在该队列中排队。
该线程池时为了缓存频繁用到的线程。
因为是"缓存"线程池,没有缓存可以永久有效,因此coreSizePool为0,因此任务队列的缓存区应该为0。否则即便系统有可用线程资源,当有新的任务进入时也不会立即执行,而是进入任务队列排队直到队列满,这样显然不合理。
由于队列缓冲区为空,那么每来一个任务,都会新建线程执行任务,这样可能会导致大量线程被创建,进而系统瘫痪。
2. java.util.concurrent.Executors#newSingleThreadExecutor()
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
单线程化的线程池,它只会用唯一的工作线程来执行任务。但是需要注意的是大量任务堆积在LinkedBlockingQueue中,可能会造成内存溢出。
3. java.util.concurrent.Executors#newFixedThreadPool(int)
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
创建定长线程池,可控制线程最大并发数,超过的线程会在队列中等待。
注意事项
若是线程池采用的是LinkedBlockingQueue,那么若是消费的速度小于生产的速度,内存随着时间的堆积,可能会造成内存溢出。
解决方案:可以采用有界队列,可能造成部分任务被丢弃。