多线程并发编程15-DelayQueue源码剖析

    今天来说一说DelayQueue,DelayQueue并发队列是一个无界阻塞延迟队列,队列中的每个元素都有一个过期时间,当从队列获取元素时,只有过期元素才会出队列,不允许存放null元素。队列头元素是最快要过期的元素。

    DelayQueue内部有一个PriorityQueue优先队列,存入到该队列的元素都实现Delayed接口,由于每个原始都有一个过期时间,所以要实现获取当前元素还剩多少时间就过期的接口,由于元素是存放在PriorityQueue优先队列中的,所以需要实现元素之间相互比较的接口。

    DelayQueue内部实现了一个Leader-Follower的模式,用于尽量减少不必要的线程等待。当一个线程调用队列的take方法变为leader线程后,它会调用条件变量available.awaitNanos(delay)等待delay时间,但其他线程(follower线程)则会调用available.await()进行无限等待。leader线程延迟时间过期后,会退出take方法,并通过调用available.signal()方法唤醒一个follower线程,被唤醒的follower线程被选举为leader线程。当向队列插入一个过期时间比头元素过期时间还短的元素时,leader会被重置为null。

    DelayQueue内部主要的成员变量:

private final transient ReentrantLock lock = new ReentrantLock();

private final PriorityQueue<E> q = new PriorityQueue<E>();

private Thread leader = null;

private final Condition available = lock.newCondition();

    下面对主要函数原理进行讲解。

offer(E e)

    插入元素到队列,当元素e为null时会抛出NullPointException异常,否则由于是无界队列,所以一直返回true。插入的元素需要实现Delayed接口。

public boolean offer(E e) {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)将元素放入PriorityQueue优先队列中,如果e为null则会抛出NullPointException异常。

        q.offer(e);

//(3)如果当前插入的元素成为队列中的第一个节点(即插入元素的过期时间比队列中的所有元素的过期时间都小)则将leader设置为null,并调用available.signal()方法激活available条件队列中的一个线程。

        if (q.peek() == e) {

            leader = null;

            available.signal();

        }

        return true;

    } finally {

//(4)释放锁。

        lock.unlock();

    }

}

take()

    从队列中获取并移除延迟时间过期的元素,当队列中没有元素或没有延迟时间过去的元素则会进行阻塞。

public E take() throws InterruptedException {

//(1)尝试获取独占锁 ,调用的是lockInterruptibly 方法,所以会被中断。

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    try {

        for (;;) {

//(2)如果队列中没有元素则调用available.await()进行阻塞。

            E first = q.peek();

            if (first == null)

                available.await();

            else {

//(3)如果获取队列的第一个元素的延迟时间已到期,则返回该元素并从队列中移除。

                long delay = first.getDelay(NANOSECONDS);

                if (delay <= 0)

                    return q.poll();

//(4)这里有个小细节,当要进行阻塞等待的时候将first 变量置为null不引用队列中的元素。

                first = null; 

//(5)如果当前线程不是leader线程,则调用available.await()方法持续阻塞等待,直到被唤醒。

                if (leader != null)

                    available.await();

                else {

//(6)如果当前线程是leader线程,而调用available.awaitNanos(delay)方法,阻塞等待指定的过期时间。

                    Thread thisThread = Thread.currentThread();

                    leader = thisThread;

                    try {

                        available.awaitNanos(delay);

                    } finally {

                        if (leader == thisThread)

                            leader = null;

                    }

                }

            }

        }

    } finally {

        if (leader == null && q.peek() != null)

//(7)如果代码执行到这,说明当前线程从队列中移除过期元素了,但是队列中还存在元素,则需要调用available.signal() 方法,唤醒available条件队列中阻塞的线程去获取队列中的元素。

            available.signal();

//(8)释放锁。

        lock.unlock();

    }

}

poll()

    尝试获取并移除优先级队列中的头元素,如果优先级队列为空或者头元素的延迟时间还没有过期则返回null,否则返回头元素并从优先级队列中移除。

public E poll() {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)获取优先级队列中的第一个元素。

        E first = q.peek();

//(3)如果第一个元素为null(即优先级队列中无元素)或第一个元素的延迟时间没有过期则返回null,否则返回头元素并将头元素从优先级队列中移除。

        if (first == null || first.getDelay(NANOSECONDS) > 0)

            return null;

        else

            return q.poll();

    } finally {

//(4)释放锁。

        lock.unlock();

    }

}

size()

    获取队列中的元素个数,由于在获取元素个数前需要先获取锁,所有size()方法返回的元素个数是精准的。

public int size() {

//(1)尝试获取独占锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)获取优先级队列中的元素个数。

        return q.size();

    } finally {

//(3)释放锁。

        lock.unlock();

    }

}

    DelayQueue阻塞无界延迟队列,内部使用PriorityQueue优先级队列进行存储元素,根据元素的延迟时间进行排序,使用ReentrantLock独占锁实现线程同步。

    今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342