PriorityBlockingQueue优先级阻塞队列

PriorityBlockingQueue

1、带优先级的无界阻塞队列,每次出队列都返回优先级最高或者最低的元素

2、内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不保证有序。

3、默认使用对象的compareTo方法比较,支持自定义comparators

类图结构

PrioripyBlockingQueue类图

​ PriorityBlockingQueue使用数组用来存储队列中的元素,元素在队列中根据坐标index呈现平衡二叉树的分布,以第i个元素为例,它的左孩子为2i+1,右孩子为2i+2,在类中常用位运算计算某个节点x的父节点(公式为: (x-1)>>1)。

元素存储关系

​ 在实际情况中,由于内部维护最小堆,使用平衡二叉树实现,直接遍历队列元素不保证有序。如下图情况,在数组中元素并非严格按序存放,需要按右侧逻辑结构理解最小堆的思想。

遍历数组不保证有序

源码分析

PriorityBlockingQueue

添加元素

​ 由上图可知添加元素有add、put和offer方法。

public void put(E e) {
    offer(e); // never need to block
}
public boolean add(E e) {
        return offer(e);
}
public boolean offer(E e) {
    // (1) 判断元素是否为null,null抛出异常
    if (e == null)
        throw new NullPointerException();
    // (2) 获取独占锁,注意一个队列共用一个独占锁
    final ReentrantLock lock = this.lock;
    lock.lock();
    int n, cap;
    Object[] array;
    // (3)如果容量已经达到峰值,需要扩容
    while ((n = size) >= (cap = (array = queue).length))
        tryGrow(array, cap);
  
    try {
        Comparator<? super E> cmp = comparator;
        // (4)根据是否有自定义比较器选择方法
        if (cmp == null)
            siftUpComparable(n, e, array);
        else
            siftUpUsingComparator(n, e, array, cmp);
        size = n + 1;
        // (5)添加元素成功,唤醒notEmpty等待的消费线程
        notEmpty.signal();
    } finally {
        // (6) 释放独占锁
        lock.unlock();
    }
    return true;
}

​ 而从源码易知put、add方法内部调用offer实现。所以添加元素的源码重点在于offer方法。从图中容易发现,与ArrayBlockingQueue不同,虽然同样使用数组作为存储结构,但是PriorityBlockingQueue永远从“结尾“添加元素(但不一定放在结尾),从前面出队列(出队列必取第一个元素)。

tryGrow(Object[] array, int oldCap)

private void tryGrow(Object[] array, int oldCap) {
    // (7)释放独占锁
    lock.unlock(); // must release and then re-acquire main lock
    Object[] newArray = null;
    // (8)申请扩容锁 CAS乐观锁
    if (allocationSpinLock == 0 &&
        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
                                 0, 1)) {
        try {
            // (9)扩容容量算法
            int newCap = oldCap + ((oldCap < 64) ?
                                   (oldCap + 2) : // grow faster if small
                                   (oldCap >> 1));
            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow
                int minCap = oldCap + 1;
                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
                    throw new OutOfMemoryError();
                newCap = MAX_ARRAY_SIZE;
            }
            if (newCap > oldCap && queue == array)
                newArray = new Object[newCap];
        } finally {
            allocationSpinLock = 0;
        }
    }
    // (10)
    if (newArray == null) // back off if another thread is allocating
        Thread.yield();
    lock.lock();
    if (newArray != null && queue == array) {
        queue = newArray;
        System.arraycopy(array, 0, newArray, 0, oldCap);
    }
   // (11)
}

​ 我们发现一进到扩容方法后,(7)处就调用了lock.unlock()释放独占锁,这样的好处是,在进行扩容时,其它线程可以获取到独占锁进行入队列、出队列的操作(因为出队列后,实际容量已经小于最大值,所以存在入队列的可能,但不妨碍之前扩容的线程继续扩容)。由于采取了CAS乐观锁的机制,也可以保证某时刻只有一个线程在进行扩容操作。而其它线程由于竞争失败,不断的自旋(自旋路线为(8)->(10)->(11)->(3)->(8))。

​ 在(10)处,竞争扩容锁失败的线程,会调用Thread.yield();尝试让出CPU时间片,不一定有效。

siftUpComparable(int k, T x, Object[] array)

源码注释:该方法用于将元素x尝试插入到队列k中,如果发现当前位置的父节点值大于元素x,则将元素x往上层提,直到x大于等于它的父节点,或者成为root根节点。

private static <T> void siftUpComparable(int k, T x, Object[] array) {
    // (0) key为目标元素的对应比较器
    Comparable<? super T> key = (Comparable<? super T>) x;
    while (k > 0) {
        //(1)获取父节点位置和父节点元素的值
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        // (2)如果x的值更大,说明找到了合适的位置
        if (key.compareTo((T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = key;
}

​ 用一个容易理解的例子来尝试理解这段代码,当一个年轻人来到公司,一开始处于组织架构树的末端,但是这个年轻人不讲武德,就找到所属小组的老领导偷袭一下,偷袭输了(key.compareTo((T) e) >= 0),老老实实呆在原位。如果偷袭成功(key.compareTo((T) e) < 0),让这个老领导担任年轻人原来的位置 array[k] = e;,然后新人取缔位置k = parent;继续偷袭,偷袭它的老领导的领导(k = parent, int parent = (k - 1) >>> 1,不断往上找父节点),以此类推,直到这个年轻人偷袭成功所有人,成为了大BOSS(root节点),或者被某个领导防御成功了(key.compareTo((T) e) >= 0),就在偷袭到的最高位置坐下。

siftUpUsingComparator

siftUpUsingComparator方法相比siftUpComparable只是比较器采用了自定义比较器,其它没有区别

private static <T> void siftUpUsingComparator(int k, T x, Object[] array,
                                   Comparator<? super T> cmp) {
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = array[parent];
        if (cmp.compare(x, (T) e) >= 0)
            break;
        array[k] = e;
        k = parent;
    }
    array[k] = x;
}

移除元素

同样的从前面的图中我们可以看到移除元素的方法为poll、peek(不会移除)、take方法。

peek

由前文可知,PriorityBlockingQueue由一个独占锁维护,在peek调用之前会先获取独占锁。确认队列中是否有元素,有则返回index为0的元素,否则返回null。peek方法不会移除元素,只做查询

public E peek() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        return (size == 0) ? null : (E) queue[0];
    } finally {
        lock.unlock();
    }
}

poll

public E poll() {
  final ReentrantLock lock = this.lock;
  lock.lock();
  try {
    return dequeue();
  } finally {
    lock.unlock();
  }
}
  • dequeue
private E dequeue() {
    int n = size - 1;
    if (n < 0)
        return null;
    else {
        Object[] array = queue;
        E result = (E) array[0];
        E x = (E) array[n];
        array[n] = null;
        Comparator<? super E> cmp = comparator;
        if (cmp == null)
            siftDownComparable(0, x, array, n);
        else
            siftDownUsingComparator(0, x, array, n, cmp);
        size = n;
        return result;
    }
}
  • siftDownComparable

核心思想是从根节点开始总左右孩子中找到一个最小的节点,成为新的根节点,

Object c = array[child]; // 左孩子

int right = child + 1; // 右孩子索引

private static <T> void siftDownComparable(int k, T x, Object[] array,
                                           int n) {
    if (n > 0) {
        Comparable<? super T> key = (Comparable<? super T>)x;
        int half = n >>> 1;           // loop while a non-leaf
        // 如果k>=half已经超出到x原有的位置,没有必要
        while (k < half) {
            int child = (k << 1) + 1; // assume left child is least
            Object c = array[child]; // 左孩子,c默认为左孩子节点
            int right = child + 1; // 右孩子索引
            // 右孩子索引在有效范围内,即是否存在 && 右孩子更小
            if (right < n &&
                ((Comparable<? super T>) c).compareTo((T) array[right]) > 0)
                // c = 右孩子节点
                c = array[child = right];
            // 如果x小于等于于左(右)孩子,x成为根节点,这样变动最小
            // 如果x大于左(右)孩子,让左(右)孩子成为根节点,x取代他们原来的位置
            if (key.compareTo((T) c) <= 0)
                break;
            array[k] = c;
            k = child;
        }
        array[k] = key;
    }
}

take

take方法获取独占锁支持中断,

如果队列中没有元素,等待唤醒。 while循环避免虚假唤醒

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    E result;
    try {
        // 队列中没有元素,等待唤醒。 while循环避免虚假唤醒
        while ( (result = dequeue()) == null)
            notEmpty.await();
    } finally {
        lock.unlock();
    }
    return result;
}

参考

《Java并发编程实战》

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容