【并发编程系列9】阻塞队列之PriorityBlockingQueue,DelayQueue原理分析

前言

前面我们介绍了ArrayBlockingQueue,LinkedBlockingQueue,LinkedBlockingDeque三种阻塞队列,今天继续介绍PriorityBlockingQueue和DelayQueue两个阻塞队列,在介绍这两个阻塞队列之前,需要先了解一种数据结构:二叉堆。因为PriorityBlockingQueue内部使用了最小二叉堆算法来保证每次弹出的元素是最小元素,而DelayQueue又依赖于PriorityBlockingQueue。

二叉堆

堆的数据结构是一颗完全二叉树,完全二叉树指的是除了最后一层,其余层均有左右子节点。二叉堆又可以分为最大二叉堆和最小二叉堆。

最大二叉堆的某个结点的值最多与其父结点一样大,最小堆则是某个结点的值最多与其父结点一样小。所以最大堆中最大的结点永远是根结点,最小堆中最小的结点永远是根节点。

用数组(下标从1开始算)来存储二叉堆的话有以下几个特性:

  • 第n个位置的子节点分别在index[2n]和index[2n+1]。如index[1]位置的子节点在index[2]和index[3],而index[2]位置的子节点为index[4]和[5],以此类推。
  • 叶子节点的下标为index[n/2+1]到index[n]。如一个长度为9的数组中,index[5]到index[9]位置的元素均属于叶子节点。
  • 第n个位置(非根节点)的父节点为:n/2

如下图就是一个二叉堆(圆圈内的数字表示数组下标,并不表示真实元素的值):


在这里插入图片描述

比如说4这个位置,他的左右子节点就是24和24+1,即8和9
n/2+1=5,说明从5到n即9都是叶子节点。

堆有三种基本操作:初始化,上沉,下浮(下面以最小二叉堆为例来说明):

  • 初始化:将一个无序的数组初始化成堆。从最后一个非叶子结点开始,将父节点和子节点进行比较,如果父节点大于子节点,则将父节点和子节点替换,确保父节点<=子节点
  • 上沉:用于插入元素。在数组的末尾插入新的元素,然后和父节点比较,如果比父节点大,则插入完成,如果比父节点小,则交换位置,并以此类推。
  • 下浮:用于移除元素。移除头节点元素,此时会将数组末尾的数据拿过来先放到头节点,然后和子节点进行比较,如果比子节点大,则交换位置,以此类推。

注意:二叉堆和二叉树不一样,二叉堆并不保证左右节点的大小

PriorityBlockingQueue

PriorityBlockingQueue是一个支持优先级的无界阻塞队列(大小受限于内存)。和前面介绍的三种有界队列相比,无界队列的最大区别是即使初始化的时候指定了长度,那么当队列元素达到上限后队列也会自动进行扩容,所以PriorityBlockingQueue在添加元素的时候不会发生阻塞,而如果扩容后的大小超过了内存限制,会抛出OutOfMemoryError错误。

默认情况下PriorityBlockingQueue队列元素采取自然顺序升序排列。也可以自定义类实现compareTo()方法来指定元素排序规则,或者在初始化时,可以指定构造参数Comparator来对元素进行排序。
注意:PriorityBlockingQueue不能保证相同优先级元素的顺序(即两个值排序一样时,不保证顺序)。

下面还是先来看看PriorityBlockingQueue类图:


在这里插入图片描述

可以看到提供了4个 构造器:

  • PriorityBlockingQueue():
    初始化一个默认大小(11)长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int):
    初始化一个指定大小的长度的队列,并使用默认自然排序。
  • PriorityBlockingQueue(int,Comparator):
    初始化一个指定大小的队列,并按照指定比较器进行排序。
  • PriorityBlockingQueue(Collection):
    根据传入的集合进行初始化并堆化,如果当前集合是SortedSet或者PriorityBlockingQueue类型,则保持原有顺序,否则使用自然排序进行堆化。

初始化

前面两个构造器最后都会调用第三个构造器去初始化一个队列:


在这里插入图片描述

我们看到只有一个Condition队列,这个是用来阻塞出队线程的,入队线程不会被阻塞。
接下来我们主要看看第4个构造器,是如何初始化一个队列的:

public PriorityBlockingQueue(Collection<? extends E> c) {
        this.lock = new ReentrantLock();
        this.notEmpty = lock.newCondition();
        boolean heapify = true; //true表示需要堆化即需要重排序
        boolean screen = true;  //true表示需要筛选空值
        if (c instanceof SortedSet<?>) {
            SortedSet<? extends E> ss = (SortedSet<? extends E>) c;
            this.comparator = (Comparator<? super E>) ss.comparator();
            heapify = false;//如果比较器是SortedSet类型则不需要堆化
        }
        else if (c instanceof PriorityBlockingQueue<?>) {
            PriorityBlockingQueue<? extends E> pq =
                (PriorityBlockingQueue<? extends E>) c;
            this.comparator = (Comparator<? super E>) pq.comparator();
            screen = false;//如果比较器是PriorityBlockingQueue类型则不需要筛选空值
            if (pq.getClass() == PriorityBlockingQueue.class) // exact match
                heapify = false;//如果pq就是一个PriorityBlockingQueue则不需要堆化
        }
        Object[] a = c.toArray();
        int n = a.length;
        // If c.toArray incorrectly doesn't return Object[], copy it.
        if (a.getClass() != Object[].class)
            a = Arrays.copyOf(a, n, Object[].class);//如果c.torray()失败,重新复制一个数组
        if (screen && (n == 1 || this.comparator != null)) {//??
            for (int i = 0; i < n; ++i)
                if (a[i] == null)
                    throw new NullPointerException();
        }
        this.queue = a;
        this.size = n;
        if (heapify)
            heapify();//堆化(排序)
    }

上面标注了?的if判断,没想到什么场景会发生,如果有知道的,恳请留言告知,非常感谢!

这一段代码看起来很长,实际上就是将指定的集合赋值给队列,并确认排序规则,如果需要排序,则调用heapify()方法,这个初始化排序才是关键:


在这里插入图片描述

下图就是一个数组[8,5,2,7,6,4,1,9,3]的二叉堆表现形式:

在这里插入图片描述

首先看到代码437行,从二叉堆的特性知道,二叉堆的初始化会从最后一个非叶子节点开始,也就是n/2开始,但是因为这种算法是基于元素从1开始算的,而数组是从0开始,所以这里需要减1,也就是从下图中的位置3(元素7)开始往前面循环。

后面就是两个排序规则判断,代码逻辑是一样的,我们进入siftDownComparable方法,这个方法主要就是完成元素的下沉操作


在这里插入图片描述

主要逻辑为:

  1. 将当前循环节点的左右子节点比较,确保拿到最小子节点的下标child
  2. 再将child对应的元素和父节点比较,确保父节点<最小子节点
  3. 最后会再次确认当前元素与最小子节点(可能是左也可能是右)的子节点(如果有的话)进行大小比较,依此类推,完成元素下沉。

注意:除了阻塞队列,我还分享了最新Java架构项目实战教程+大厂面试题库,有兴趣的点击此处免费获取,没基础勿进!

第一次下沉

从元素7开始循环,首先将元素9和元素3比较,发现9>3,临时变量替换一下,
然后将元素7和元素3比较,发现7>3,所以直接将父节点和右子节点替换,完成了第一次循环(因为子节点已经是叶子节点了,所以不满足二次循环条件)。


在这里插入图片描述

第二次下沉

第二次循环就到了下标2的位置,也就是元素2,和第一次循环类似,因为子节点是叶子节点,所以也是一次循环就结束,直接完成了父节点和最小子节点的替换,升级过程如下:


在这里插入图片描述

第三次下沉

第三次循环就到了下标1的位置,也就是元素5,这时候因为左子节点本来就小于右子节点,所以不需要临时替换,直接比较左子节点和父子节点,注意这里图2是一个临时过程,因为是首先将左子节点赋值给父节点,然后发现左子节点下面还有子节点会再进行一次循环,直到通过break跳出循环之后才会将5赋值给左子节点,完成替换:


在这里插入图片描述

第四次下沉

第四次循环就到了下标0的位置,也就是元素8,首先完成1和3的替换,然后完成3和8的替换:


在这里插入图片描述

这时候因为最小子节点的下标是2,2<half,所以会再次循环(注意再次循环的时候还是拿最开始的元素8来和左右子节点进行比较),然后又会将8和2进行替换,将元素2赋值到下标2的位置,然后这时候不满足循环条件了,结束循环,这时候才正式将元素8赋值到下标6的位置:


在这里插入图片描述

上图中两个流程可以看到,元素8会一路下沉到最后。

到这里完成了初始化排序,最终数组由:[8,5,2,7,6,4,1,9,3]变为[1,3,2,5,6,4,8,9,7]。

添加元素(生产者)

put(E)方法会调用offer(E)方法,上一篇阻塞队列的文章中,我们知道,offer(E)方法是不阻塞的,而这里是无界数组也不会阻塞,所以直接调用offer(E)方法就可以了:

在这里插入图片描述

这里逻辑比较简单,首先看有没有越界,越界了就先进行扩容,扩容放在后面讲。
然后添加元素主要就是进行上浮过程,进入默认的排序规则上浮方法siftUpComparable:


在这里插入图片描述

还是用上面排序后的二叉堆,假如我们现在添加一个元素4,会得到下面这样一个二叉堆:


在这里插入图片描述

这时候为了确保新添加的元素按照排序规则不会比根节点小,需要将新添加的元素进行上浮操作。

第一次上浮

发现4<6,所以将6放到队尾,注意这时候4并不会赋值到队列中,因为4还需要继续上浮确认放在哪个位置


在这里插入图片描述

第二次上浮

第二次上浮会发现4<3,不满足所以会跳出循环,确认将4放在了下标4的位置,完成插入元素操作


在这里插入图片描述

获取元素(消费者)

调用take()方法获取元素


在这里插入图片描述

主要看dequeue()方法:


在这里插入图片描述

这个方法的主要逻辑为:
1、先拿到第一个元素(需要返回)和最后一个元素
2、然后将最后一个元素置为空
3、用存好的最后一个元素的值从头开始下沉
最后一步下沉操作和初始化的最后一步下沉操作是一样的处理方式,直到完成下沉就会诞生一个最小的元素重新放到头节点

扩容

最后我们来分析下扩容tryGrow方法

    private void tryGrow(Object[] array, int oldCap) {
        lock.unlock(); //扩容前先释放锁(扩容可能会费时,先让出锁,让出队线程可以正常操作)
        Object[] newArray = null;
        if (allocationSpinLock == 0 &&
            UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                     0, 1)) {//通过CAS操作确保只有一个线程可以扩容
            try {
                int newCap = oldCap + ((oldCap < 64) ?
                                       (oldCap + 2) : // grow faster if small
                                       (oldCap >> 1));
                if (newCap - MAX_ARRAY_SIZE > 0) {//大于当前最大容量则可能溢出
                    int minCap = oldCap + 1;
                    if (minCap < 0 || minCap > MAX_ARRAY_SIZE)//扩大一个元素也溢出或者超过最大容量则抛出异常
                        throw new OutOfMemoryError();
                    newCap = MAX_ARRAY_SIZE;//扩容后如果超过最大容量,则只扩大到最大容量
                }
                if (newCap > oldCap && queue == array)
                    newArray = new Object[newCap];//根据最新容量初始化一个新数组
            } finally {
                allocationSpinLock = 0;
            }
        }
        if (newArray == null) //如果是空,说明前面CAS失败,有线程在扩容,让出CPU
            Thread.yield();
        lock.lock();//这里重新加锁是确保数组复制操作只有一个线程能进行
        if (newArray != null && queue == array) {
            queue = newArray;
            System.arraycopy(array, 0, newArray, 0, oldCap);//将旧的元素复制到新数组
        }
    }

DelayQueue

DelayQueue是一个支持延时获取元素的无界阻塞队列。队列中使用PriorityQueue来实现。队列中的元素必须实现Delayed接口:


在这里插入图片描述

接口里定义了一个getDelay方法来获取当前剩余的过期时间,另外因实现了Comparable接口,所以还会有一个compareTo方法。

DelayQueue使用示例

1、新建一个对象,实现Delayed ,并重写getDelay和compareTo

package com.zwx.concurrent.queue.block.model;

import java.sql.Time;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class MyElement implements Delayed {
    private long expireTime;//过期时间(毫秒)
    private int id;

    public long getExpireTime() {
        return expireTime;
    }

    public void setExpireTime(long expireTime) {
        this.expireTime = expireTime;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public MyElement(int id, long expireTime) {
        this.id = id;
        this.expireTime = System.currentTimeMillis() + expireTime;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        //类里面接收的是毫秒,但是getDelay方法在DelayQeue里面传的是纳秒,所以这里需要进行一次单位转换
        return unit.convert(expireTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        //注意,这里的排序要确定最先到期的放在第一位,否则会阻塞住后面未到期的
        return Long.valueOf(expireTime).compareTo(((MyElement) o).expireTime);
    }
}

package com.zwx.concurrent.queue.block;

import com.zwx.concurrent.queue.block.model.MyElement;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;

public class DelayQueueDemo {
    public static void main(String[] args) {
        List<MyElement> list = new ArrayList<>();
        for (int i=1;i<=5;i++){
            MyElement myElement = new MyElement(i,i*1000);
            list.add(myElement);
        }
        DelayQueue delayQueue = new DelayQueue(list);

        while (true){
            try {
                MyElement myElement = (MyElement) delayQueue.take();
                System.out.println(myElement.getId());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

DelayQueue类图

接下来看看类图


在这里插入图片描述

只有两个构造器,第一个是空的构造器,第二个是默认初始化一个集合。

初始化

在这里插入图片描述

通过循环调用add(e)方法进行添加,然后add方法又去调用了offer(e)方法:


在这里插入图片描述

添加元素(消费者)

DelayQueue队列的元素是存在其内部维护的PriorityQueue上,所以上面调用了q.offer(e)方法。
leader表示获取到锁的线程。q.peek()==e表示当前第一个元素就是刚刚添加进去的元素,所以需要将leader设置为空,唤醒出队(消费者)线程重新争抢锁。

q.offer(e)方法的处理方式基本和上面讲的PriorityBlockingQueue中逻辑一致


在这里插入图片描述

获取元素(消费者)

take方法会依次获取元素,如果第一个元素没到期,则会一直阻塞:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null)
                    available.await();//队列为空,则阻塞
                else {
                    long delay = first.getDelay(NANOSECONDS);
                    if (delay <= 0)
                        return q.poll();//如果到期了,则调用poll方法取元素并直接返回
                    first = null; // don't retain ref while waiting
                    if (leader != null)
                        available.await();//头节点不为空,说明有线程持有锁并正在等待到期时间,所以直接阻塞
                    else {//leader==null
                        Thread thisThread = Thread.currentThread();
                        leader = thisThread;//设置头节点为当前线程,表名有线程在等待头节点元素过期
                        try {
                            available.awaitNanos(delay);//阻塞指定时间
                        } finally {
                            if (leader == thisThread)
                                leader = null;
                        }
                    }
                }
            }
        } finally {
            if (leader == null && q.peek() != null)
                available.signal();
            lock.unlock();
        }
    }

Leader-Follower线程模型

在Leader-follower线程模型中每个线程有三种模式:

  • leader:只有一个线程成为leader,如DelayQueue如果有一个线程在等待元素到期,则其他线程就会阻塞等待
  • follower:会一直尝试争抢leader,抢到leader之后才开始干活
  • processing:处理中的线程

DelayQueue队列中有一个leader属性:private Thread leader = null;用到的就是Leader-Follower线程模型。
当有一个线程持有锁,设置了leader属性,正在等待元素到期时,则成为了leader,其他线程就直接阻塞。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342