[怀旧并发09]分析Java延迟与周期任务的实现原理

Java并发编程源码分析系列:

延迟或周期执行任务可以使用Timer或者ScheduledThreadPoolExecutor,前者可以抛弃,后者是今天的主角。

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,对应执行任务变成ScheduledFutureTask。本文会在前三篇分析线程池原理的基础上,分析ScheduledThreadPoolExecutor的实现原理,最后介绍下为什么不用Timer了。

ScheduledThreadPoolExecutor的创建

ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor();

public ScheduledThreadPoolExecutor(int corePoolSize,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue(), threadFactory, handler);
}

ScheduledThreadPoolExecutor的创建可以使用Executors,也可以自己传参构建。上面的构造函数是参数最全的版本,可以设置线程目标数量、线程工厂和饱和策略。至于等待队列,使用内部类DelayedWorkQueue,看后文分析。

ScheduledFutureTask

ScheduledFutureTask的构造函数没什么特别,保存了三个参数。

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
}

ScheduledFutureTask(Callable<V> callable, long ns) {
    super(callable);
    this.time = ns;
    this.period = 0;
    this.sequenceNumber = sequencer.getAndIncrement();
}
  • time:任务执行时间;
  • period:任务周期执行间隔;
  • sequenceNumber:自增的任务序号。

Callable默认period=0,表示任务不是周期执行,因为只有Runnable可以周期执行。想想也是,Callable目的是获得执行结果,没有必要重复调用。

图1

ScheduledFutureTask继承了我们熟悉的FutureTask,这个不用多说。图1是它实现的接口,比较陌生的是Delayed,而Delayed又继承了Comparable。

public long getDelay(TimeUnit unit) {
        return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

这两个接口的存在很容易理解,ScheduledFutureTask在等待队列里调度不再按照FIFO,而是按照执行时间,谁即将执行,谁就排在前面。在这里也可以看到sequenceNumber的作用,当执行时间相同时,按照序号排序。

添加延迟任务

对ScheduledThreadPoolExecutor使用通用的execute或者submit提交任务,最终调用schedule方法,默认马上执行。如果需要延迟执行,需要直接使用schedule,传递时间参数。

public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                       long delay,
                                       TimeUnit unit) {
    if (callable == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<V> t = decorateTask(callable,
        new ScheduledFutureTask<V>(callable,
                                   triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}

Runnable和Callable包装成ScheduledFutureTask实例,保存了延迟信息,然后执行delayedExecute。

private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}

如果线程池已经关闭,直接调用饱和策略,否则将任务加入等待队列。加入之后,需要再判断线程池的状态,和当前任务是否能运行。如果不能继续执行,将任务移出队列并取消任务。

canRunInCurrentRunState处理任务加入等待队列后,又未执行就发生线程池关闭的情况,它通过预设的两个变量判断任务到底能不能执行。

  • 延迟任务用executeExistingDelayedTasksAfterShutdown
  • 周期任务用continueExistingPeriodicTasksAfterShutdown
void ensurePrestart() {
    int wc = workerCountOf(ctl.get());
    if (wc < corePoolSize)
        addWorker(null, true);
    else if (wc == 0)
        addWorker(null, false);
}

最后调用到ensurePrestart,使用addWorkder增加工作线程,这在ThreadPoolExecutor解释过了

添加周期任务

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                             long initialDelay,long period,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (period <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
                                                long delay,TimeUnit unit) {
   if (command == null || unit == null)
       throw new NullPointerException();
   if (delay <= 0)
       throw new IllegalArgumentException();
   ScheduledFutureTask<Void> sft =
       new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
                                     unit.toNanos(-delay));
   RunnableScheduledFuture<Void> t = decorateTask(command, sft);
   sft.outerTask = t;
   delayedExecute(t);
   return t;
}

执行周期任务有上面两个方法,具体作用方法名写得很清楚:

  • scheduleAtFixedRate:按固定的频率执行,不受执行时长影响,到点就执行;
  • scheduleWithFixedDelay:任务执行完后,按固定的延后时间再执行。

两个方法几乎一样,不同的是构建ScheduledFutureTask时,period一个传正数,另一个传负数。不用怀疑,区分两种情况就是用正负。

等待队列

线程池的等待队列使用了内部类DelayedWorkQueue,和普通线程池等待队列最大的不同是它的任务是按照目标执行时间进行排序。

入队的offer被重写了,add和put方法也是调用offer,具体BlockingQueue的实现逻辑不在这里讨论,重点是看offer里的siftUp方法。

private void siftUp(int k, RunnableScheduledFuture<?> key) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        RunnableScheduledFuture<?> e = queue[parent];
        if (key.compareTo(e) >= 0)
            break;
        queue[k] = e;
        setIndex(e, k);
        k = parent;
    }
    queue[k] = key;
    setIndex(key, k);
}

siftUp根据任务的compareTo,将任务移动到队列中指定的位置,就是这样。

对应地,出队take方法,根据任务的delay时间,小于等于0时将任务出队,否则等待。

任务执行

当线程池从等待队列取出一个任务时,会执行它的run方法。

public void run() {
    boolean periodic = isPeriodic();
    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        ScheduledFutureTask.super.run();
    else if (ScheduledFutureTask.super.runAndReset()) {
        setNextRunTime();
        reExecutePeriodic(outerTask);
    }
}

方法有三个分支,第一个if判断任务在当前线程池状态下是否能执行,canRunInCurrentRunState已经讲解过。第二个if是判断是否周期任务,不是的话直接执行,不需要多余的操作。重点来看第三个if,也就是周期执行任务。

  1. runAndReset:任务执行完重置为初始状态,等待下一次执行;
  2. setNextRunTime:计算下次执行时间;
  3. reExecutePeriodic:再调度任务。
private void setNextRunTime() {
    long p = period;
    if (p > 0)
        time += p;
    else
        time = triggerTime(-p);
}

计算下次执行时间,period根据正负有不同的计算逻辑,负的时间也会先改正,很明显对应上文的scheduleAtFixedRate和scheduleWithFixedDelay两个方法。

void reExecutePeriodic(RunnableScheduledFuture<?> task) {
    if (canRunInCurrentRunState(true)) {
        super.getQueue().add(task);
        if (!canRunInCurrentRunState(true) && remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}

将任务重新加入等待队列,中间几个方法都解释过了。

Timer的缺陷

自从知道ScheduledThreadPoolExecutor,再没有使用Timer,因为它有几个缺陷:

  • 多任务在单线程里执行,一个任务结束,另一个任务才能开始,时间间隔不准;
  • 出现异常会导致全部任务停止;
  • 绝对时间,受系统时间影响。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);

Timer的代码很简单,主要数据结构是一个任务队列和一个执行线程。新增的任务会加入任务队列,到达时间后,由执行线程执行。只有一个线程,很容易理解上面讲的缺陷。

ScheduledThreadPoolExecutor每个任务都有对应的执行线程,时间使用相对时间计算,也就没有上面的缺陷,所以没有理由使用Timer了。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容