多线程并发编程14-PriorityBlockingQueue源码剖析

    PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。其内部使用平衡二叉树堆实现的,所以遍历队列元素不能保证有序性。默认使用对象的compareTo方法进行比较,也可以自定义comparators。

    PriorityBlockingQueue内部有一个数组用来存放队列元素,在前面介绍的ArrayBlockingQueue类中也有一个数组存放队列元素,为什么ArrayBlockingQueue是有界队列而PriorityBlockingQueue是无界队列呢?因为在PriorityBlockingQueue内部会对存放队列元素的数据进行扩容,扩容要保证只能一个线程进行,所以PriorityBlockingQueue内部有一个自旋锁 allocationSpinLock,其使用CAS操作来保证只有一个线程可以扩容。   PriorityBlockingQueue类中还有一个ReentrantLock对象锁,队列的读写操作需要获取该对象。由于是无界队列生成元素并不受限制,但是队列为空时消费数据会被限制(阻塞),所以PriorityBlockingQueue内部只有一个条件变量来实现消费模式。

    PriorityBlockingQueue内部主要的成员变量:

private transient Object[]queue;

private transient volatile int allocationSpinLock;

private final ReentrantLocklock;

private final ConditionnotEmpty;

    下面对主要函数原理进行讲解。

offer(E e)

    offer方法向队列中插入一个元素,由于是无界队列,所以插入操作总是返回true。

public boolean offer(E e) {

    if (e == null)

        throw new NullPointerException();

//(1)尝试获取独占锁对象。

    final ReentrantLock lock = this.lock;

    lock.lock();

    int n, cap;

    Object[] array;

//(2)判断队列是否需要进行扩容。

    while ((n = size) >= (cap = (array = queue).length))

        tryGrow(array, cap);

    try {

        Comparator<? super E> cmp = comparator;

//(3)使用默认对比器或自定义对比器进行建二叉树堆

        if (cmp == null)

            siftUpComparable(n, e, array);

        else

            siftUpUsingComparator(n, e, array, cmp);

        size = n + 1;

//(4)通知因为队列为空而阻塞的消费者可以进行获取数据。

        notEmpty.signal();

    } finally {

//(5)释放锁。

        lock.unlock();

    }

    return true;

}

tryGrow(Object[] array, int oldCap)

    对队列进行扩容,使用自旋锁和CAS算法保证只有一个线程能进行扩容。

private void tryGrow(Object[] array, int oldCap) {

//(1)释放获取的独占锁,在进行扩容的过程让别的线程也可以获取到该锁。

    lock.unlock(); 

    Object[] newArray = null;

//(2)CAS成功则进行扩容。

    if (allocationSpinLock == 0 &&

        UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,

                                0, 1)) {

        try {

//(3)进行扩容,oldGap<64则增加oldCap+2,否则增加oldCap的一半,并且容量最大值为MAX_ARRAY_SIZE。从这来看虽然队列会进行扩容,但也不是无限扩容,严格来说也应该算是有界的。

            int newCap = oldCap + ((oldCap < 64) ?

                                  (oldCap + 2) : // grow faster if small

                                  (oldCap >> 1));

            if (newCap - MAX_ARRAY_SIZE > 0) {    // possible overflow

                int minCap = oldCap + 1;

                if (minCap < 0 || minCap > MAX_ARRAY_SIZE)

                    throw new OutOfMemoryError();

                newCap = MAX_ARRAY_SIZE;

            }

            if (newCap > oldCap && queue == array)

                newArray = new Object[newCap];

        } finally {

    //(4)扩完容之后将自旋锁allocationSpinLock  设置为0,允许下次进行扩容。

            allocationSpinLock = 0;

        }

    }
//(5)第一个线程CAS成功后,第二个线程会进入这段代码,然后第二个线程会让出cpu,尽量让第一个线程获取锁,但这不保证一定可以。

    if (newArray == null) // back off if another thread is allocating

        Thread.yield();

//(6)获取锁,将原来队列中的元素拷贝到扩容后的队列中。

    lock.lock();

    if (newArray != null && queue == array) {

        queue = newArray;

        System.arraycopy(array, 0, newArray, 0, oldCap);

    }

}

poll()

    获取队列内部堆树的根节点元素,如果队列为空则返回null。

public E poll() {

//(1)尝试获取锁。

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

//(2)获取队列的第一个元素,并整理二叉树堆。

        return dequeue();

    } finally {

//(3)释放锁。

        lock.unlock();

    }

}

put()

    由于是无界队列,不需要阻塞,put方法内部调用的offer方法,这就不进行赘述了。

public void put(E e) {

    offer(e); // never need to block

}

take()

    获取队列内部堆树的根节点元素,如果队列为空则阻塞。

public E take() throws InterruptedException {

//(1)尝试获取锁对象,调用的是lockInterruptibly方法,所以在当其他线程设置了中断标志,该线程会抛出InterruptedException异常。

    final ReentrantLock lock = this.lock;

    lock.lockInterruptibly();

    E result;

    try {

//(2)如果队列为空,则阻塞,停止消费。

        while ( (result = dequeue()) == null)

            notEmpty.await();

    } finally {

//(3)释放锁。

        lock.unlock();

    }

    return result;

}

size()

    size方法需要获取锁,因为本类中的size变量没有volatile变量修饰无法保证内存的可见性。

public int size() {

    final ReentrantLock lock = this.lock;

    lock.lock();

    try {

        return size;

    } finally {

        lock.unlock();

    }

}

    PriorityBlockingQueue队列在内部使用二叉树堆维护元素的优先级,使用数组作为元素的存储的数据结构,该数组可进行扩容,但是容量也是有限制的,使用CAS来保证扩容时的唯一性。

     今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,802评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,109评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,683评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,458评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,452评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,505评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,901评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,550评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,763评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,556评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,629评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,330评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,898评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,897评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,140评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,807评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,339评论 2 342

推荐阅读更多精彩内容