线程池
使用线程池的优点:
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
Java中的线程池是用ThreadPoolExecutor类来实现的
Executor接口
此接口提供了一种将任务提交与每个任务的运行机制分离的方法,包括线程使用,调度等的详细信息。该接口中只有execute(Runnable command)方法,用来替代通常创建或启动线程的方法。例如使用Thread创建线程
Thread thread = new Thread();
thread.start();
使用execute创建运行线程,具体的线程执行会由相应的实现类去执行(jdk默认线程池execute的实现是由ThreadPoolExecutor来实现的)
Thread thread = new Thread();
executor.execute(thread);
ExecutorService接口
ExecutorService接口提供管理终止的方法和可以生成Future的方法,用于跟踪一个或多个异步任务的进度, 它继承了Executor接口,同时增加了shutDown(),shutDownNow(),invokeAll(),invokeAny()和submit()等方法。
- shutDown() : 允许之前提交的任务继续执行(执行完后shutDown,不会再接收新的任务)
- shutDownNow():立即停止正在执行的任务
- invokeAll():执行给定的任务,当所有任务完成后返回任务状态和结果的Futures列表
- invokeAny():执行给定的任务,返回已完成的任务的结果
- submit():提交线程
AbstractExecutorService类
ExecutorService接口的默认实现,同时也是线程池实现类ThreadPoolExecutor的父类。
主要看下submit()方法与invokeAll()方法
/**不管参数是Callable还是Runable, 执行方法都一样
生成一个task,然后执行task,execute方法的具体实现在ThreadPoolExecutor中**/
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
/**代码很简单,将给定的任务线程封装成Future对象,
等待所有任务执行完成,统一返回Future对象,
如果出现异常,会将未完成的任务取消**/
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
for (int i = 0, size = futures.size(); i < size; i++) {
Future<T> f = futures.get(i);
if (!f.isDone()) {
try {
/** 没有完成,阻塞**/
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done)
for (int i = 0, size = futures.size(); i < size; i++)
futures.get(i).cancel(true);
}
}
ThreadPoolExecutor类
在关注ThreadPoolExecutor之前,先来了解下线程的基本状态信息。
线程总的来说有NEW(初始)、RUNNABLE(运行)、WAITING(等待)、TIME_WAITING(超时等待)、BLOCKED(阻塞)、TERMINATED(终止)6种状态。
- NEW:初始状态,线程被构建,但是还没有调用 start 方法
- RUNNABLED:运行状态,JAVA 线程把操作系统中的就绪和运行两种状态统一称为“运行中”
- BLOCKED:阻塞状态,表示线程进入等待状态,也就是线程因为某种原因放弃了 CPU 使用权,阻塞也分为几种情况
等待阻塞:运行的线程执行 wait 方法,jvm 会把当前线程放入到等待队列
同步阻塞:运行的线程在获取对象的同步锁时,若该同步锁被其他线程锁占用了,那么 jvm 会把当前的线程放入到锁池中
其他阻塞:运行的线程执行 Thread.sleep 或者 Thread.join 方法,或者发出了 I/O请求时,JVM 会把当前线程设置为阻塞状态,当 sleep 结束、join 线程终止、io 处理完毕则线程恢复- WAITING:等待,需要主动唤醒
- TIME_WAITING:超时等待状态,超时以后自动返回.
- TERMINATED:终止状态,表示当前线程执行完毕
对于线程池而言,也有五种种不同的状态,分别为RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED
- RUNNING:运行状态,可以处理任务,并且接收任务(前提阻塞队列处于未满状态,阻塞队列一旦满了,会根据相应的饱和策略进行不同的处理)
- SHUTDOWN:关闭状态,不能接收新的任务,但是能处理队列中的任务(shutdow方法)
- STOP:停止状态,不能接收行的任务,不能处理队列中的任务并且会中断正在运行的任务(shutdownNow方法)
- TIDYING:所有的任务都终止了,workCount为0,会进入该状态,将调用terminated方法进入TERMINATED状态
- TERMINATED:terminated()方法执行完成
ThreadPoolExecutor提供了四个构造方法:
我们以最后一个构造方法(参数最多的那个),对其参数进行解释:
public ThreadPoolExecutor(int corePoolSize, // 1
int maximumPoolSize, // 2
long keepAliveTime, // 3
TimeUnit unit, // 4
BlockingQueue<Runnable> workQueue, // 5
ThreadFactory threadFactory, // 6
RejectedExecutionHandler handler ) { //7
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 核心线程池大小 |
2 | maximumPoolSize | int | 最大线程池大小 |
3 | keepAliveTime | long | 线程最大空闲时间 |
4 | unit | TimeUnit | 时间单位 |
5 | workQueue | BlockingQueue<Runnable> | 线程等待队列 |
6 | threadFactory | ThreadFactory | 线程创建工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
当在execute(Runnable)方法中提交新任务并且少于corePoolSize线程正在运行时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。
如果有多于corePoolSize但小于maximumPoolSize线程正在运行,则仅当队列已满时才会创建新线程。
通过设置corePoolSize和maximumPoolSize相同,您可以创建一个固定大小的线程池。
通过将maximumPoolSize设置为基本上无界的值,例如Integer.MAX_VALUE,您可以允许池容纳任意数量的并发任务。 通常,核心和最大池大小仅在构建时设置,但也可以使用setCorePoolSize和setMaximumPoolSize进行动态更改。
ThreadFactory 线程工厂
新线程使用ThreadFactory创建。 如果未另行指定,则使用Executors.defaultThreadFactory默认工厂,使其全部位于同一个ThreadGroup中,并且具有相同的NORM_PRIORITY优先级和非守护进程状态。Keep-alive times 线程存活时间
如果线程池当前拥有超过corePoolSize的线程,那么多余的线程在空闲时间超过keepAliveTime时会被终止 。这提供了一种在不积极使用线程池时减少资源消耗的方法。Queuing 队列
BlockingQueu用于存放提交的任务,队列的实际容量与线程池大小相关联。
1、如果当前线程池任务线程数量小于核心线程池数量,执行器总是优先创建一个任务线程,而不是从线程队列中取一个空闲线程。
2、如果当前线程池任务线程数量大于核心线程池数量,执行器总是优先从线程队列中取一个空闲线程,而不是创建一个任务线程。
3、如果当前线程池任务线程数量大于核心线程池数量,且队列中无空闲任务线程,将会创建一个任务线程,直到超出maximumPoolSize,如果超时maximumPoolSize,则任务将会被拒绝。
主要有三种队列策略:
- Direct handoffs 直接握手队列
Direct handoffs 的一个很好的默认选择是 SynchronousQueue,它将任务交给线程而不需要保留。这里,如果没有线程立即可用来运行它,那么排队任务的尝试将失败,因此将构建新的线程。
此策略在处理可能具有内部依赖关系的请求集时避免锁定。Direct handoffs 通常需要无限制的maximumPoolSizes来避免拒绝新提交的任务。 但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致线程数量会无限增长问题。- Unbounded queues 无界队列
当所有corePoolSize线程繁忙时,使用无界队列(例如,没有预定义容量的LinkedBlockingQueue)将导致新任务在队列中等待,从而导致maximumPoolSize的值没有任何作用。当每个任务互不影响,完全独立于其他任务时,这可能是合适的; 例如,在网页服务器中, 这种队列方式可以用于平滑瞬时大量请求。但得注意,当任务持续以平均提交速度大余平均处理速度时,会导致队列无限增长问题。- Bounded queues 有界队列
一个有界的队列(例如,一个ArrayBlockingQueue)和有限的maximumPoolSizes配置有助于防止资源耗尽,但是难以控制。队列大小和maximumPoolSizes需要 相互权衡:
使用大队列和较小的maximumPoolSizes可以最大限度地减少CPU使用率,操作系统资源和上下文切换开销,但会导致人为的低吞吐量。如果任务经常被阻塞(比如I/O限制),那么系统可以调度比我们允许的更多的线程。
使用小队列通常需要较大的maximumPoolSizes,这会使CPU更繁忙,但可能会遇到不可接受的调度开销,这也会降低吞吐量。
这里主要为了说明有界队列大小和maximumPoolSizes的大小控制,若何降低资源消耗的同时,提高吞吐量
-
Rejected tasks 拒绝任务
拒绝任务有两种情况:1. 线程池已经被关闭;2. 任务队列已满且maximumPoolSizes已满;
无论哪种情况,都会调用RejectedExecutionHandler的rejectedExecution方法。
预定义了四种处理策略:
- AbortPolicy:默认测策略,抛出RejectedExecutionException运行时异常;
- CallerRunsPolicy:这提供了一个简单的反馈控制机制,可以减慢提交新任务的速度;
- DiscardPolicy:直接丢弃新提交的任务;
- DiscardOldestPolicy:如果执行器没有关闭,队列头的任务将会被丢弃,然后执行器重新尝试执行任务(如果失败,则重复这一过程);
我们可以自己定义RejectedExecutionHandler,以适应特殊的容量和队列策略场景中。