在Java中,使用线程来执行异步任务。Java线程的创建于销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建和销毁将消耗大量的计算资源。同时,为每一个任务创建新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。
Java的线程既是工作单元,也是执行机制。从JDK1.5开始,把工作单元与执行机制分离开来。工作单元包括Runnable和Callable,而执行机制由Executor框架提供。
10.1 Executor框架简介
Executor框架主要由3大部分组成:
(1)任务。包括被执行的任务需要实现的接口:Runnable和Callable。(2)任务的执行。包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口:ThreadPoolExecutor和ScheduledThreadPoolExecutor。
(3)异步计算的结果。包括接口Future和实现Future接口的FutureTask类。
ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务。
ScheduledThreadPoolExecutor可以在给定的延迟后运行命令,或者定期执行命令。
10.2 ThreadPoolExecutor
Executors可以创建3中不同类型的ThreadPoolExecutor:SingleThreadExecutor,FixedThreadPool和CachedThreadPool。
(1)newFixedThreadPool(int nThreads):创建使用固定线程数的线程池。它适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,适合负载较重的服务器。
(2)newSingleThreadExecutor():创建使用单个线程的线程池。适用于需要保证顺序的执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景。
(3)newCachedThreadPool():创建一个大小无界的线程池。适用于执行大量短期的异步执行的任务,适合负载较轻的服务器。由于使用SynchronousQueue,并且maximumPoolSize无限,keepAliveTime为60s,因此吞吐量最好。与FixedThreadPool不同,当新的任务到来,如果有线程空闲,那么空闲的线程会直接接受该任务,如果没有空闲的线程,线程池可以无限创建新的线程。
10.2.1 newFixedThreadPool(int nThreads)
(1)return new ThreadPoolExecutor(nThreads,nThreads,0L,TimeUtil.MILLSECONDS,new LinkedBlockingQueue<Runnable>)
(2)FixedThreadPool的corePoolSize和maximumPoolSize都被设置为nThreads,keepAliveTime被设置为0,意味着多余的空闲线程会被立刻终止。
(3)由于使用了无界队列,因此maximumPoolSize是一个无效参数。
10.2.2 SingleThreadPool
(1)SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1,其他参数和FixedThreadPool相同。
10.2.3CachedThreadPool
(1)return new ThreadPoolExecutor(0,Integer.MAX_VALUE,60L,TimeUtil.SECONDS,new SynchronousQueue<Runnable>());
(2)SynchronousQueue是一个没有容量的工作队列,而maximumPoolSize被设置为max,这意味着,如果主线程提交任务的速度高于线程池中处理任务的速度,cachedThreadPool会不断创建新的线程。
(3)对于一个新的task,首先执行Synchronous.offer(Runnable task),如果线程池中有空闲线程正在执行Synchronous.pool(timeAlive),那么主线程执行offer操作与空闲线程执行poll操作匹配成功,任务交给空闲线程执行,execute方法执行完成。
(4)当线程池为空或者线程池中没有空闲线程时,这种情况下offer操作失败,此时线程池创建新的线程执行任务,execute方法执行完成。
(5)执行任务的线程执行完毕后,会执行Synchronous.pool(timeAlive)操作,这个poll操作会让空闲线程最多等待60秒,如果60秒内没有获得新的任务执行,那么这个空闲线程将被终止。
10.3 ScheduledThreadPoolExecutor
Executors可以创建两种类型的ScheduledThreadPoolExecutor:
(1)public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建固定线程数量的ScheduledThreadPool。适用于需要多个后台线程执行周期性任务,同时为了满足资源管理的需求而限制后台的线程数量。
(2)public static ScheduledExecutorService newSingleThreadScheduledExecutor()
适用于需要单个后台线程执行周期性任务,同时需要保证顺序的执行各个任务的场景。
10.3.1 ScheduledThreadPoolExecutor详解
ScheduledThreadPoolExecutor的执行主要分为两步:
(1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue(无界阻塞队列)添加一个ScheduledFutureTask。
(2)线程池从DelayQueue中获取ScheduledFutureTask,然后执行任务。
ScheduledFutureTask主要包含3个变量
(1)long time:任务将要被执行的时间
(2)long sequenceNumber:任务ID
(3)long period:任务执行的间隔周期
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask排序,time小的在前面。
ScheduledThreadPoolExecutor的某个线程执行周期性任务的4个步骤:
(1)线程从DelayQueue中获取已到期的ScheduledFutureTask。
(2)线程执行这个task
(3)线程修改这个task的time为下次要执行的时间。
(4)把修改后的task放回DelayQueue中。
ScheduledThreadPoolExecutor的某个线程从DelayQueue中获取 ScheduledFutureTask的过程:
(1)获取Lock。
(2)如果PriorityQueue为空,则当前线程到Condition中等待。
(3)如果PriorityQueue的头元素的time比当前时间大,则在Condition中等待到time时间点。
(4)获取PriorityQueue的头元素,如果PriorityQueue不为空,则唤醒在Condition中等待的所有线程。
(5)释放Lock。
ScheduledThreadPoolExecutor向DelayQueue中添加 ScheduledFutureTask的过程:
(1)获取Lock。
(2)向PriorityQueue中添加任务。
(3)如果添加的任务是头元素,唤醒所有等待在Condition中的线程。
(4)释放Lock。