一、为什么要用线程池?
Java 中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在开发过程中,合理地使用线程池能够带来 3个好处:
1)降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2)提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。 如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。线程池技术正是关注如何缩短或调整 T1,T3 时间的技术,从而提高服务器程序性能的。它把 T1,T3 分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有 T1,T3 的开销了。
3)提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。假设一个服务器一天要处理 50000 个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于 50000。所以利用线程池的服务器程序不会为了创建50000 而在处理请求时浪费时间,从而提高效率。
二、ThreadPoolExecutor 的类关系
Executor 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开来。
ExecutorService 接口继承了 Executor,在其上做了一些 shutdown()、submit()的扩展,可以说是真正的线程池接口;
AbstractExecutorService 抽象类实现了 ExecutorService 接口中的大部分方法;
ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
ScheduledExecutorService 接口继承了 ExecutorService 接口,提供了带"周期执行"功能 ExecutorService;
ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor 比 Timer 更灵活,功能更强大。
三、线程池的创建各个参数含义
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long
keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory
threadFactory,RejectedExecutionHandler handler)
1、corePoolSize
线程池中的核心线程数,当提交一个任务时,线程池创建一个新线程执行任务,直到当前线程数等于 corePoolSize;
如果当前线程数为 corePoolSize,继续提交的任务被保存到阻塞队列中,等待被执行;
如果执行了线程池的 prestartAllCoreThreads()方法,线程池会提前创建并启动所有核心线程。
2、maximumPoolSize
线程池中允许的最大线程数。如果当前阻塞队列满了,且继续提交任务,则创建新的线程执行任务,前提是当前线程数小于 maximumPoolSize
3、keepAliveTime
线程空闲时的存活时间,即当线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于 corePoolSize 时才有用
TimeUnit
keepAliveTime 的时间单位
workQueue
workQueue 必须是 BlockingQueue 阻塞队列。当线程池中的线程数超过它的corePoolSize 的时候,线程会进入阻塞队列进行阻塞等待。通过 workQueue,线程池实现了阻塞功能
4、workQueue
用于保存等待执行的任务的阻塞队列,一般来说,我们应该尽量使用有界队列,因为使用无界队列作为工作队列会对线程池带来如下影响。
1)当线程池中的线程数达到 corePoolSize 后,新任务将在无界队列中等待,因此线程池中的线程数不会超过 corePoolSize。
2)由于 1,使用无界队列时 maximumPoolSize 将是一个无效参数。
3)由于 1 和 2,使用无界队列时 keepAliveTime 将是一个无效参数。
4)更重要的,使用无界 queue 可能会耗尽系统资源,有界队列则有助于防止资源耗尽,同时即使使用有界队列,也要尽量控制队列的大小在一个合适的范围。
所以我们一般会使用,ArrayBlockingQueue、LinkedBlockingQueue、
SynchronousQueue、PriorityBlockingQueue。
5、threadFactory
创建线程的工厂,通过自定义的线程工厂可以给每个新建的线程设置一个具有识别度的线程名,当然还可以更加自由的对线程做更多的设置,比如设置所有的线程为守护线程。参见代码 cn.enjoyedu.ch6. ThreadPoolAdv
Executors 静态工厂里默认的 threadFactory,线程的命名规则是“pool-数字-thread-数字”。
6、RejectedExecutionHandler
线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了 4 种策略:
(1)AbortPolicy:直接抛出异常,默认策略;
(2)CallerRunsPolicy:用调用者所在的线程来执行任务;
(3)DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
(4)DiscardPolicy:直接丢弃任务;
当然也可以根据应用场景实现 RejectedExecutionHandler 接口,自定义饱和策略,如记录日志或持久化存储不能处理的任务。
四、扩展线程池
能扩展线程池的功能吗?比如在任务执行的前后做一点我们自己的业务工作?实际上,JDK 的线程池已经为我们预留的接口,在线程池核心方法中,有 2个方法是空的,就是给我们预留的。还有一个线程池退出时会调用的方法。参见代码 cn.enjoyedu.ch6. ThreadPoolExt可以看到,每个任务执行前后都会调用 beforeExecute 和 afterExecute 方法。相当于执行了一个切面。而在调用 shutdown 方法后则会调用 terminated 方法。
五、线程池的工作机制
1)如果当前运行的线程少于 corePoolSize,则创建新线程来执行任务(注意,执行这一步骤需要获取全局锁)。
2)如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
3)如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务。
4)如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution()方法。
六、提交任务
execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。submit()方法用于提交需要返回值的任务。线程池会返回一个 future 类型的对象,通过这个 future 对象可以判断任务是否执行成功,并且可以通过 future的 get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
七、关闭线程池
可以通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。它们的原理是遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。但是它们存在一定的区别,shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表,而shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。只要调用了这两个关闭方法中的任意一个,isShutdown 方法就会返回 true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminaed 方法会返回 true。至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用 shutdown 方法来关闭线程池,如果任务不一定要执行完,则可以调用 shutdownNow 方法。
八、合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。
•任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
•任务的优先级:高、中和低。
•任务的执行时间:长、中和短。
•任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
CPU 密集型任务应配置尽可能小的线程,如配置 Ncpu+1 个线程的线程池。
由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如
2*Ncpu。
混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO
密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐
量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行
分解。可以通过 Runtime.getRuntime().availableProcessors()方法获得当前设备的
CPU 个数。
对于 IO 型的任务的最佳线程数,有个公式可以计算
Nthreads = NCPU * UCPU * (1 + W/C)
其中:
❑NCPU 是处理器的核的数目
❑UCPU 是期望的 CPU 利用率(该值应该介于 0 和 1 之间)
❑W/C 是等待时间与计算时间的比率
等待时间与计算时间我们在 Linux 下使用相关的 vmstat 命令或者 top 命令查
看。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可
以让优先级高的任务先执行。
执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先
级队列,让执行时间短的任务先执行。
依赖数据库连接池的任务,因为线程提交 SQL 后需要等待数据库返回结果,
等待的时间越长,则 CPU 空闲时间就越长,那么线程数应该设置得越大,这样
才能更好地利用 CPU。
建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需
要设大一点儿,比如几千。
假设,我们现在有一个 Web 系统,里面使用了线程池来处理业务,在某些
情况下,系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异
常,通过排查发现是数据库出现了问题,导致执行 SQL 变得非常缓慢,因为后台
任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的
工作线程全部阻塞,任务积压在线程池里。
如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会
撑满内存,导致整个系统不可用,而不只是后台任务出现问题。
九、预定义线程池
FixedThreadPool 详解
创建使用固定线程数的 FixedThreadPool 的 API。适用于为了满足资源管理的
需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。
FixedThreadPool 的 corePoolSize 和 maximumPoolSize 都被设置为创建
FixedThreadPool 时指定的参数 nThreads。
当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程
等待新任务的
最长时间,超过这个时间后多余的线程将被终止。这里把 keepAliveTime 设
置为 0L,意味着多余的空闲线程会被立即终止。
FixedThreadPool 使用有界队列 LinkedBlockingQueue 作为线程池的工作队列
(队列的容量为 Integer.MAX_VALUE)。
SingleThreadExecutor
创建使用单个线程的 SingleThread-Executor 的 API,于需要保证顺序地执行
各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。
corePoolSize 和 maximumPoolSize 被设置为 1。其他参数与 FixedThreadPool
相同。SingleThreadExecutor 使用有界队列 LinkedBlockingQueue 作为线程池的工
作队列(队列的容量为 Integer.MAX_VALUE)。
CachedThreadPool
创建一个会根据需要创建新线程的 CachedThreadPool 的 API。大小无界的线
程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。
corePoolSize 被设置为 0,即 corePool 为空;maximumPoolSize 被设置为
Integer.MAX_VALUE。这里把 keepAliveTime 设置为 60L,意味着 CachedThreadPool
中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool 和 SingleThreadExecutor 使用有界队列 LinkedBlockingQueue
作为线程池的工作队列。CachedThreadPool 使用没有容量的 SynchronousQueue
作为线程池的工作队列,但 CachedThreadPool 的 maximumPool 是无界的。这意
味着,如果主线程提交任务的速度高于 maximumPool 中线程处理任务的速度时,
CachedThreadPool 会不断创建新线程。极端情况下,CachedThreadPool 会因为创
建过多线程而耗尽 CPU 和内存资源。 WorkStealingPool
利用所有运行的处理器数目来创建一个工作窃取的线程池,使用 forkjoin 实
现
ScheduledThreadPoolExecutor
使用工厂类 Executors 来创建。Executors 可以创建 2 种类型的
ScheduledThreadPoolExecutor,如下。
•ScheduledThreadPoolExecutor。包含若干个线程的
ScheduledThreadPoolExecutor。
•SingleThreadScheduledExecutor。只包含一个线程的
ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor 适用于需要多个后台线程执行周期任务,同时
为了满足资源管理的需求而需要限制后台线程的数量的应用场景。
SingleThreadScheduledExecutor 适用于需要单个后台线程执行周期任务,同
时需要保证顺序地执行各个任务的应用场景。
提交定时任务
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit
unit)
//向定时任务线程池提交一个延时 Runnable 任务(仅执行一次)
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
//向定时任务线程池提交一个延时的 Callable 任务(仅执行一次)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long
initialDelay, long period, TimeUnit unit)
//向定时任务线程池提交一个固定时间间隔执行的任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long
initialDelay, long delay, TimeUnit unit);
//向定时任务线程池提交一个固定延时间隔执行的任务
固定时间间隔的任务不论每次任务花费多少时间,下次任务开始执行时间从
理论上讲是确定的,当然执行任务的时间不能超过执行周期。
固定延时间隔的任务是指每次执行完任务以后都延时一个固定的时间。由于
操作系统调度以及每次任务执行的语句可能不同,所以每次任务执行所花费的时
间是不确定的,也就导致了每次任务的执行周期存在一定的波动。
定时任务超时问题
scheduleAtFixedRate 中,若任务处理时长超出设置的定时频率时长,本次任
务执行完才开始下次任务,下次任务已经处于超时状态,会马上开始执行。
若任务处理时长小于定时频率时长,任务执行完后,定时器等待,下次任务
会在定时器等待频率时长后执行。
如下例子:
设置定时任务每 60s 执行一次,那么从理论上应该第一次任务在第 0s 开始, 第二次任务在第 60s 开始,第三次任务在 120s 开始,但实际运行时第一次任务
时长 80s,第二次任务时长 30s,第三次任务时长 50s,则实际运行结果为:
第一次任务第 0s 开始,第 80s 结束;
第二次任务第 80s 开始,第 110s 结束(上次任务已超时,本次不会再等待 60s, 会马上开始);
第三次任务第 120s 开始,第 170s 结束. 第四次任务第 180s 开始..... 具体代码,参见 cn.enjoyedu.ch6.schd. ScheduleWorkerTime
十、CompletionService
CompletionService 实际上可以看做是 Executor 和 BlockingQueue 的结合体。
CompletionService 在接收到要执行的任务时,通过类似 BlockingQueue 的 put 和
take 获得任务执行的结果。
CompletionService 的一个实现是 ExecutorCompletionService,
ExecutorCompletionService 把具体的计算任务交给 Executor 完成。
在实现上,ExecutorCompletionService 在构造函数中会创建一个
BlockingQueue(使用的基于链表的 LinkedBlockingQueue),该 BlockingQueue 的
作用是保存 Executor 执行的结果。
当提交一个任务到 ExecutorCompletionService 时,首先将任务包装成
QueueingFuture,它是 FutureTask 的一个子类,然后改写 FutureTask 的 done 方
法,之后把 Executor 执行的计算结果放入 BlockingQueue 中。
与 ExecutorService 最主要的区别在于 submit 的 task 不一定是按照加入时的
顺序完成的。CompletionService 对 ExecutorService 进行了包装,内部维护一个保
存 Future 对象的 BlockingQueue。只有当这个 Future 对象状态是结束的时候,才
会加入到这个 Queue 中,take()方法其实就是 Producer-Consumer 中的 Consumer。
它会从 Queue 中取出 Future 对象,如果 Queue 是空的,就会阻塞在那里,直到
有完成的 Future 对象加入到 Queue 中。所以,先完成的必定先被取出。这样就
减少了不必要的等待时间。
参见代码 cn.enjoyedu.ch6.comps. CompletionCase,我们可以得出结论:
使用方法一,自己创建一个集合来保存 Future 存根并循环调用其返回结果
的时候,主线程并不能保证首先获得的是最先完成任务的线程返回值。它只是按
加入线程池的顺序返回。因为 take 方法是阻塞方法,后面的任务完成了,前面
的任务却没有完成,主程序就那样等待在那儿,只到前面的完成了,它才知道原
来后面的也完成了。
使用方法二,使用 CompletionService 来维护处理线程不的返回结果时,主
线程总是能够拿到最先完成的任务的返回值,而不管它们加入线程池的顺序。