更多 Java 并发编程方面的文章,请参见文集《Java 并发编程》
线程组 Thread Group
线程的集合,不推荐使用。
线程池 Thread Pool 介绍
所在包:java.util.concurrent
线程池用于多线程处理中,它可以根据系统的情况,可以有效控制线程执行的数量,优化运行效果。
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量,超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。
线程池的3个优点:
- 线程复用,不需要频繁的创建和销毁线程
- 控制最大并发数,提高系统资源利用率,同时避免过多的资源竞争,避免堵塞
- 管理线程
目的:减少大量创建和销毁线程带来的时间和空间上的消耗。
线程池的组成
一般的线程池主要分为以下4个组成部分:
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中的线程
- 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
- 任务队列:用于存放待处理的任务,提供一种缓冲机制
线程池框架
Java 中的线程池是通过 Executor
框架实现的,该框架中用到了 Executor
,Executors
,ExecutorService
,ThreadPoolExecutor
,Callable
和 Future
、FutureTask
这几个类。
-
Executor
:所有线程池的接口,只有一个方法 -
Executors
:Executor
的工厂类,提供了创建各种不同线程池的方法,返回的线程池都实现了ExecutorService
接口 -
ThreadPoolExecutor
:线程池的具体实现类,一般所有的线程池都是基于这个类实现的
ThreadPoolExecutor
的构造方法如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
其中:
-
corePoolSize
:- 线程池的核心线程数,线程池中运行的线程数也永远不会超过
corePoolSize
个 - 在没有设置
allowCoreThreadTimeOut
为true
的情况下,核心线程会在线程池中一直存活,即使处于闲置状态
- 线程池的核心线程数,线程池中运行的线程数也永远不会超过
-
maximumPoolSize
:- 线程池中允许的最大线程数
- 当活动线程(核心线程+非核心线程)达到这个数值后,后续任务将会根据
RejectedExecutionHandler
来进行拒绝策略处理
-
keepAliveTime
:- 非核心线程 闲置时的超时时长。超过该时长,非核心线程就会被回收。
- 若线程池通过
allowCoreThreadTimeOut()
方法设置allowCoreThreadTimeOut
属性为true
,则该时长同样会作用于核心线程,AsyncTask
配置的线程池就是这样设置的
-
unit
:- 是一个枚举,它表示的是
keepAliveTime
的单位
- 是一个枚举,它表示的是
-
workQueue
:- 工作队列,用于存放不能被及时处理掉的任务的一个队列,通过线程池的
execute()
方法提交的Runnable
对象会存储在该队列中 - 它是一个
BlockingQueue
类型。关于BlockingQueue
,虽然它是Queue
的子接口,但是它的主要作用并不是容器,而是作为线程同步的工具,他有一个特征,当生产者试图向BlockingQueue
放入(put)元素,如果队列已满,则该线程被阻塞;当消费者试图从BlockingQueue
取出(take)元素,如果队列已空,则该线程被阻塞。
- 工作队列,用于存放不能被及时处理掉的任务的一个队列,通过线程池的
-
ThreadFactory
:- 线程工厂,功能很简单,就是为线程池提供创建新线程的功能
- 这是一个接口,可以通过自定义,做一些自定义线程名的操作
- 默认使用
DefaultThreadFactory
:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
-
RejectedExecutionHandler
:当任务无法被执行时(超过线程最大容量maximumPoolSize
并且workQueue
已经被排满了)的处理策略,这里有四种任务拒绝类型-
CallerRunsPolicy
:当线程池中的数量等于最大线程数时,重试添加当前的任务;它会自动重复调用execute()
方法。 -
AbortPolicy
: 默认策略,当线程池中的数量等于最大线程数时抛java.util.concurrent.RejectedExecutionException
异常 -
DiscardPolicy
:当线程池中的数量等于最大线程数时,默默丢弃不能执行的新加任务,不报任何异常。 -
DiscardOldestPolicy
:当线程池中的数量等于最大线程数时,抛弃线程池中工作队列头部的任务(即等待时间最久的任务),并执行新传入的任务。
-
线程池的工作过程
- 线程池刚创建时,里面没有一个线程。
- 当调用
execute()
方法添加一个任务时,线程池会做如下判断:- 如果正在运行的线程数量小于
corePoolSize
,那么马上创建 核心线程 运行这个任务; - 如果正在运行的线程数量大于或等于
corePoolSize
,那么将这个任务放入工作队列workQueue
;- 如果这时候队列满了,而且正在运行的线程数量小于
maximumPoolSize
,那么还是要创建 非核心线程 立刻运行这个任务; - 如果队列满了,而且正在运行的线程数量大于或等于
maximumPoolSize
,那么线程池会抛出异常RejectExecutionException
。
- 如果这时候队列满了,而且正在运行的线程数量小于
- 如果正在运行的线程数量小于
- 当一个线程完成任务时,它会从工作队列
workQueue
中取下一个任务来执行。 - 当一个 非核心线程 无事可做,超过一定的时间(
keepAliveTime
)时,线程池会判断,如果当前运行的线程数大于corePoolSize
,那么这个 非核心线程 就被停掉。所以线程池的所有任务完成后,它最终会收缩到corePoolSize
的大小。
常见的 Java 线程池
Executors
:提供一系列工厂方法来创建线程池,返回的线程池都实现了 ExecutorService
接口:
-
SingleThreadExecutor
- 单个线程的线程池,即线程池中每次只有一个线程在运行,单线程串行执行任务。如果这个唯一的线程因为异常结束,那么会有一个新的线程来替代它。此线程池保证所有任务的执行顺序按照任务的提交顺序执行。
-
Executors
类中方法定义:public static ExecutorService newSingleThreadExecutor()
- 实现:
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
-
FixedThreadPool
- 固定数量的线程池,只有核心线程,每提交一个任务就是一个线程,直到达到线程池的最大数量,然后后面进入等待队列,直到前面的任务完成才继续执行。线程池的大小一旦达到最大值就会保持不变,如果某个线程因为执行异常而结束,那么线程池会补充一个新线程。
-
Executors
类中方法定义:public static ExecutorService newFixedThreadPool(int nThreads)
- 实现:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
-
CachedThreadPool
-
可缓存线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。其中,
SynchronousQueue
是一个是缓冲区为1的阻塞队列。 -
Executors
类中方法定义:public static ExecutorService newCachedThreadPool()
- 实现:
-
可缓存线程池。如果线程池的大小超过了处理任务所需要的线程,那么就会回收部分空闲(60秒不执行任务)的线程,当任务数增加时,此线程池又可以智能的添加新线程来处理任务。此线程池不会对线程池大小做限制,线程池大小完全依赖于操作系统(或者说JVM)能够创建的最大线程大小。其中,
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
-
ScheduledThreadPool
-
核心线程池固定,大小无限制的线程池,支持定时和周期性的执行线程。创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在
DEFAULT_KEEPALIVEMILLIS
时间内回收。
-
核心线程池固定,大小无限制的线程池,支持定时和周期性的执行线程。创建一个周期性执行任务的线程池。如果闲置,非核心线程池会在
-
Executors
类中方法定义:public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
- 实现:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
注意事项:
- 线程池 ExecutorService 中可以使用
execute(t)
或者submit(t)
方法加入多个任务,任务的执行顺序与加入顺序无关- 关于
execute(t)
和submit(t)
的区别,请参见 Java ExecutorService
- 关于
- 如果线程池中某个线程因为执行异常而结束,线程池会自动补充一个新线程
示例:
public class ThreadPool_Test {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor();
for (int i = 0; i < 5; i++) {
es.execute(new MyThread());
}
es.shutdown();
}
static class MyThread extends Thread {
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
}
以上代码的输出如下:
因为创建的线程池中只有一个线程,因此每次只会有一个任务执行,getName()
都会得到相同的结果。
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
pool-1-thread-1
如果将 Executors.newSingleThreadExecutor()
更改为 Executors.newFixedThreadPool(2)
,创建一个有两个线程的线程池,则输出如下:
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
pool-1-thread-1
pool-1-thread-2
Thread Factory
在我们通过 Executors 来创建线程池的时候,还可以传入一个参数 ThreadFactory
,例如:
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
An object that creates new threads on demand. Using thread factories removes hardwiring of calls to new Thread.
ThreadFactory
遵循了工厂模式,提供 newThread()
方法用来创建线程。
示例:我们希望创建一个线程,它能够在产生 不能捕获的 Runtime Exception 时能自动 Logging:
public class ThreadFactory_Test {
public static void main(String[] args) {
ExecutorService es = Executors.newSingleThreadExecutor(new MyThreadFactory());
es.execute(new MyThread());
es.shutdown();
}
static class MyThread extends Thread {
public void run() {
// Runtime exception
int i = 1 / 0;
}
}
static class MyThreadFactory implements ThreadFactory {
public Thread newThread(Runnable r) {
Thread t = new Thread(r);
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.err.println(t.getName() + e.getMessage());
}
});
return t;
}
}
}
线程池的关闭
可以通过调用线程池的 shutdown
或 shutdownNow
方法来关闭线程池,但是它们的实现原理不同:
-
shutdown
的原理是只是将线程池的状态设置成SHUTDOWN
状态,然后中断所有没有正在执行任务的线程 -
shutdownNow
的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt
方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow
会首先将线程池的状态设置成STOP
,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
只要调用了这两个关闭方法的其中一个,isShutdown
方法就会返回 true
。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed
方法会返回 true
。
线程池的监控
通过线程池提供的参数进行监控。
线程池里有一些属性在监控线程池的时候可以使用:
-
public long getTaskCount()
:线程池需要执行的任务数量。 -
public long getCompletedTaskCount()
:线程池在运行过程中已完成的任务数量。小于或等于taskCount。 -
public int getPoolSize()
:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。 -
public int getLargestPoolSize()
:线程池曾经创建过的最大线程数量。通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。 -
public int getActiveCount()
:获取活动的线程数。
通过扩展线程池进行监控。通过继承线程池并重写线程池的 beforeExecute
,afterExecute
和 terminated
方法,我们可以在任务执行前,执行后和线程池关闭前干一些事情。如监控任务的平均执行时间,最大执行时间和最小执行时间等。这几个方法在线程池里是空方法。如:
protected void beforeExecute(Thread t, Runnable r) { }
合理的配置线程池
任务的性质:CPU密集型任务,IO密集型任务和混合型任务。
任务的优先级:高,中和低。
任务的执行时间:长,中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
任务性质不同的任务可以用不同规模的线程池分开处理:
- CPU密集型任务配置尽可能少的线程数量,如配置 Ncpu+1个线程的线程池。
- IO密集型任务则由于需要等待IO操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*Ncpu。
- 混合型的任务,如果可以拆分,则将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。
我们可以通过Runtime.getRuntime().availableProcessors()
方法获得当前设备的CPU个数。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue
来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者也可以使用优先级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,如果等待的时间越长 CPU 空闲时间就越长,那么线程数应该设置越大,这样才能更好的利用CPU。
引用:
ThreadFactory usage in Java
Java线程池的简介以及使用
关于线程池的执行原则及配置参数详解
聊聊并发(3):Java线程池的分析和使用