ScheduledThreadPoolExecutor是一个可以指定一定延迟时间后或者定时进行任务调度执行的线程池,继承于ThreadPoolExecutor,关于ThreadPoolExecutor的介绍请查看前面文章。因为继承于ThreadPoolExecutor,所以ScheduledThreadPoolExecutor内部也有如下参数:
corePoolSize:线程池核心线程个数。
maximumPoolSize:线程池最大线程个数。
keepAliveTime:存活时间,当非核心线程处于闲置状态,这些闲置的非核心线程能存活的最大时间。
unit:存活时间的时间单位。
workQueue:任务队列,用来保存等待执行任务的阻塞队列。
threadFactory:创建线程的工厂。
handler:饱和策略。当线程个数已达到最大线程个数,并且任务队列也已满,继续添加任务则会指定该饱和策略,比如:AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用则所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)和DiscardPolicy(默默丢弃,不抛异常)。
不同的是ScheduledThreadPoolExecutor无法指定maximumPoolSize最大线程数,默认使用Integer.MAX_VALUE,也就相当于没有线程个数的限制。线程池的队列是DelayWorkQueue,DelayWorkQueue是一种以定时过期时间进行比较的优先级阻塞队列,最快要过期的元素放到队首。从ScheduledThreadPoolExecutor的构造函数就能看出差异,
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
下面从提交任务的入口函数进行讲解ScheduledThreadPoolExecutor的内部原理。
public ScheduledFuture schedule(Runnable command, long delay,TimeUnit unit)
该方法是三种提交任务的其中一种,任务从提交时间算起延迟单位为unit的delay时间后开始执行,提交的任务为非周期性任务,任务只执行一次。
public ScheduledFuture<?> schedule(Runnable command, long delay,TimeUnit unit) {
//(1)参数校验。
if (command == null || unit == null)
throw new NullPointerException();
//(2)任务转换。 triggerTime函数将延迟时间转换为触发的时间戳。
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
//(3)添加任务到延迟队列。
delayedExecute(t);
return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task)
周期性和非周期性任务的执行主入口。
private void delayedExecute(RunnableScheduledFuture<?> task) {
//(1)如果线程池shutdown状态,则通过指定的饱和策略进行拒绝任务。
if (isShutdown())
reject(task);
else {
//(2)将任务添加到阻塞队列。
super.getQueue().add(task);
//(3)再次执行检查,如果线程池shutdown状态,则将task从阻塞队列中移除,而task有可能已经执行,所以还需要取消task。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//(4)创建一个工作线程。注意,在ThreadPoolExecutor中添加一个工作线程的时候会将当前task作为新增工作线程的第一个任务进行执行, 而这里并没有将新增的task作为参数传入作为新增工作线程的第一个工作线程,这是因为当前新增的task可能没有达到过期时间并不能执行。
ensurePrestart();
}
}
上面说完了如何提交非周期性任务,下面来看看线程池里的工作线程如何获取并执行任务。在ThreadPoolExecutor讲解时说过,工作线程执行run方法,持续从阻塞队列中获取task,并调用task的run方法进行执行任务。所以下面看看ScheduledFutureTask。
ScheduledFutureTask继承FutureTask,内部有如下7种状态:
private static final int NEW =0; //初始状态
private static final int COMPLETING =1; //执行中状态
private static final int NORMAL =2; //正常运行结束状态
private static final int EXCEPTIONAL =3; //运行中异常
private static final int CANCELLED =4; //任务被取消
private static final int INTERRUPTING =5; //任务正在被中断
private static final int INTERRUPTED =6; //任务已经被中断
重写compareTo方法,在加入元素到延迟队里中让最快过期的元素放到队列。由于工作线程是调用ScheduledFutureTask的run方法进行执行task的,所以下面看看run方法。
public void run()
执行task任务。
public void run() {
//(1)判断是否是周期行任务。
boolean periodic = isPeriodic();
//(2)判断线程池shutdown时是否需要取消任务。
if (!canRunInCurrentRunState(periodic))
cancel(false);
//(3)执行非周期行任务,正常执行会将状态从NEW-> COMPLETING ->NORMAL 执行失败会将状态从NEW-> COMPLETING -> EXCEPTIONAL 。
else if (!periodic)
ScheduledFutureTask.super.run();
//(4)执行周期行任务,只是执行任务并不会将状态进行改变,为了让任务成为可重复的执行任务。
else if (ScheduledFutureTask.super.runAndReset()) {
//(5)设置下一次执行的时间。
setNextRunTime();
//(6)将任务重新添加到延迟队列中。
reExecutePeriodic(outerTask);
}
}
public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit)
另一种提交任务的方式,任务从提交时间算起延迟单位为unit的initialDelay时间后开始执行,提交的任务为周期性任务,当任务执行完之后,让其延迟固定单位为unit的delay时间后再次运行任务。任务会一直重复直到任务运行中抛出了异常、被取消或关闭线程池。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit) {
//(1)检查参数。
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
//(2)任务转换,注意这里period= -delay<0,说明该任务为固定延迟的周期性任务,与非周期性任务不同的是,在ScheduledFutureTask的run方法中调用的是 runAndReset。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//(3)添加任务到队列。
delayedExecute(t);
return t;
}
public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit)
第三种提交任务方式,任务从提交时间算起延迟单位为unit的initialDelay时间后开始执行,然后从initialDelay+period时间点再次执行。任务会一直重复直到任务运行中抛出了异常、被取消或关闭线程池。如果当前任务还没有执行完,下一次要执行任务的时间到了,并不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) {
//(1)检查参数。
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
//(2)任务转换,注意这里period= >0,说明该任务为固定频率的周期性任务,与非周期性任务不同的是,在ScheduledFutureTask的run方法中调用的是 runAndReset。
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
//(3)添加任务到队列。
delayedExecute(t);
return t;
}
ScheduledThreadPoolExecutor内部使用DelayQueue来存储任务,任务在队列内部以过期时间进行排序,即最快要过期的元素放在队首,任务分为三种,非周期任务、固定延迟的周期任务和固定频率的周期任务。任务类型使用period的值来区分。
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。