多线程并发编程18-定时线程池ScheduledThreadPoolExecutor

        ScheduledThreadPoolExecutor是一个可以指定一定延迟时间后或者定时进行任务调度执行的线程池,继承于ThreadPoolExecutor,关于ThreadPoolExecutor的介绍请查看前面文章。因为继承于ThreadPoolExecutor,所以ScheduledThreadPoolExecutor内部也有如下参数:

corePoolSize:线程池核心线程个数。

maximumPoolSize:线程池最大线程个数。

keepAliveTime:存活时间,当非核心线程处于闲置状态,这些闲置的非核心线程能存活的最大时间。

unit:存活时间的时间单位。

workQueue:任务队列,用来保存等待执行任务的阻塞队列。

threadFactory:创建线程的工厂。

handler:饱和策略。当线程个数已达到最大线程个数,并且任务队列也已满,继续添加任务则会指定该饱和策略,比如:AbortPolicy(抛出异常)、CallerRunsPolicy(使用调用则所在线程来运行任务)、DiscardOldestPolicy(调用poll丢弃一个任务,执行当前任务)和DiscardPolicy(默默丢弃,不抛异常)。

不同的是ScheduledThreadPoolExecutor无法指定maximumPoolSize最大线程数,默认使用Integer.MAX_VALUE,也就相当于没有线程个数的限制。线程池的队列是DelayWorkQueue,DelayWorkQueue是一种以定时过期时间进行比较的优先级阻塞队列,最快要过期的元素放到队首。从ScheduledThreadPoolExecutor的构造函数就能看出差异,

public ScheduledThreadPoolExecutor(int corePoolSize) {

    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,

          new DelayedWorkQueue());

}

    下面从提交任务的入口函数进行讲解ScheduledThreadPoolExecutor的内部原理。

public ScheduledFuture schedule(Runnable command,  long delay,TimeUnit unit) 

    该方法是三种提交任务的其中一种,任务从提交时间算起延迟单位为unit的delay时间后开始执行,提交的任务为非周期性任务,任务只执行一次。

public ScheduledFuture<?> schedule(Runnable command,  long delay,TimeUnit unit) {

//(1)参数校验。

    if (command == null || unit == null)

        throw new NullPointerException();

//(2)任务转换。 triggerTime函数将延迟时间转换为触发的时间戳。

    RunnableScheduledFuture<?> t = decorateTask(command,

        new ScheduledFutureTask<Void>(command, null,

                                      triggerTime(delay, unit)));

//(3)添加任务到延迟队列。

    delayedExecute(t);

    return t;

}

private void delayedExecute(RunnableScheduledFuture<?> task)

    周期性和非周期性任务的执行主入口。

private void delayedExecute(RunnableScheduledFuture<?> task) {

//(1)如果线程池shutdown状态,则通过指定的饱和策略进行拒绝任务。

    if (isShutdown())

        reject(task);

    else {

//(2)将任务添加到阻塞队列。

        super.getQueue().add(task);

//(3)再次执行检查,如果线程池shutdown状态,则将task从阻塞队列中移除,而task有可能已经执行,所以还需要取消task。

        if (isShutdown() &&

            !canRunInCurrentRunState(task.isPeriodic()) &&

            remove(task))

            task.cancel(false);

        else

//(4)创建一个工作线程。注意,在ThreadPoolExecutor中添加一个工作线程的时候会将当前task作为新增工作线程的第一个任务进行执行, 而这里并没有将新增的task作为参数传入作为新增工作线程的第一个工作线程,这是因为当前新增的task可能没有达到过期时间并不能执行。

            ensurePrestart();

    }

}

    上面说完了如何提交非周期性任务,下面来看看线程池里的工作线程如何获取并执行任务。在ThreadPoolExecutor讲解时说过,工作线程执行run方法,持续从阻塞队列中获取task,并调用task的run方法进行执行任务。所以下面看看ScheduledFutureTask。

    ScheduledFutureTask继承FutureTask,内部有如下7种状态:

private static final int NEW          =0;    //初始状态

private static final int COMPLETING  =1;    //执行中状态

private static final int NORMAL      =2;    //正常运行结束状态

private static final int EXCEPTIONAL  =3;    //运行中异常

private static final int CANCELLED    =4;    //任务被取消

private static final int INTERRUPTING =5;    //任务正在被中断

private static final int INTERRUPTED  =6;    //任务已经被中断

 重写compareTo方法,在加入元素到延迟队里中让最快过期的元素放到队列。由于工作线程是调用ScheduledFutureTask的run方法进行执行task的,所以下面看看run方法。

public void run()

    执行task任务。

public void run() {

//(1)判断是否是周期行任务。

    boolean periodic = isPeriodic();

//(2)判断线程池shutdown时是否需要取消任务。

    if (!canRunInCurrentRunState(periodic))

        cancel(false);

//(3)执行非周期行任务,正常执行会将状态从NEW-> COMPLETING ->NORMAL  执行失败会将状态从NEW-> COMPLETING -> EXCEPTIONAL 。

    else if (!periodic)

        ScheduledFutureTask.super.run();

//(4)执行周期行任务,只是执行任务并不会将状态进行改变,为了让任务成为可重复的执行任务。

    else if (ScheduledFutureTask.super.runAndReset()) {

//(5)设置下一次执行的时间

        setNextRunTime();

//(6)将任务重新添加到延迟队列中。

        reExecutePeriodic(outerTask);

    }

}

public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit) 

    另一种提交任务的方式,任务从提交时间算起延迟单位为unit的initialDelay时间后开始执行,提交的任务为周期性任务,当任务执行完之后,让其延迟固定单位为unit的delay时间后再次运行任务。任务会一直重复直到任务运行中抛出了异常、被取消或关闭线程池。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,long delay,TimeUnit unit) {

//(1)检查参数

    if (command == null || unit == null)

        throw new NullPointerException();

    if (delay <= 0)

        throw new IllegalArgumentException();

//(2)任务转换,注意这里period= -delay<0,说明该任务为固定延迟的周期性任务,与非周期性任务不同的是,在ScheduledFutureTask的run方法中调用的是 runAndReset。

    ScheduledFutureTask<Void> sft =

        new ScheduledFutureTask<Void>(command,

                                      null,

                                      triggerTime(initialDelay, unit),

                                      unit.toNanos(-delay));

    RunnableScheduledFuture<Void> t = decorateTask(command, sft);

    sft.outerTask = t;

//(3)添加任务到队列。

    delayedExecute(t);

    return t;

}

public ScheduledFuture scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) 

    第三种提交任务方式,任务从提交时间算起延迟单位为unit的initialDelay时间后开始执行,然后从initialDelay+period时间点再次执行。任务会一直重复直到任务运行中抛出了异常、被取消或关闭线程池。如果当前任务还没有执行完,下一次要执行任务的时间到了,并不会并发执行,下次要执行的任务会延迟执行,要等到当前任务执行完毕后再执行。

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay, long period,TimeUnit unit) {

//(1)检查参数。

    if (command == null || unit == null)

        throw new NullPointerException();

    if (period <= 0)

        throw new IllegalArgumentException();

//(2)任务转换,注意这里period= >0,说明该任务为固定频率的周期性任务,与非周期性任务不同的是,在ScheduledFutureTask的run方法中调用的是 runAndReset。

    ScheduledFutureTask<Void> sft =

        new ScheduledFutureTask<Void>(command,

                                      null,

                                      triggerTime(initialDelay, unit),

                                      unit.toNanos(period));

    RunnableScheduledFuture<Void> t = decorateTask(command, sft);

    sft.outerTask = t;

//(3)添加任务到队列。

    delayedExecute(t);

    return t;

}

    ScheduledThreadPoolExecutor内部使用DelayQueue来存储任务,任务在队列内部以过期时间进行排序,即最快要过期的元素放在队首,任务分为三种,非周期任务、固定延迟的周期任务和固定频率的周期任务。任务类型使用period的值来区分。

    今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容