PriorityBlockingQueue是带优先级的无界阻塞队列,每次出队都返回优先级最高或最低的元素。其内部使用平衡二叉树堆实现的,所以遍历队列元素不能保证有序性。默认使用对象的compareTo方法进行比较,也可以自定义comparators。
PriorityBlockingQueue内部有一个数组用来存放队列元素,在前面介绍的ArrayBlockingQueue类中也有一个数组存放队列元素,为什么ArrayBlockingQueue是有界队列而PriorityBlockingQueue是无界队列呢?因为在PriorityBlockingQueue内部会对存放队列元素的数据进行扩容,扩容要保证只能一个线程进行,所以PriorityBlockingQueue内部有一个自旋锁 allocationSpinLock,其使用CAS操作来保证只有一个线程可以扩容。 PriorityBlockingQueue类中还有一个ReentrantLock对象锁,队列的读写操作需要获取该对象。由于是无界队列生成元素并不受限制,但是队列为空时消费数据会被限制(阻塞),所以PriorityBlockingQueue内部只有一个条件变量来实现消费模式。
PriorityBlockingQueue内部主要的成员变量:
private transient Object[]queue;
private transient volatile int allocationSpinLock;
private final ReentrantLocklock;
private final ConditionnotEmpty;
下面对主要函数原理进行讲解。
offer(E e)
offer方法向队列中插入一个元素,由于是无界队列,所以插入操作总是返回true。
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
//(1)尝试获取独占锁对象。
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
//(2)判断队列是否需要进行扩容。
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
//(3)使用默认对比器或自定义对比器进行建二叉树堆
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
//(4)通知因为队列为空而阻塞的消费者可以进行获取数据。
notEmpty.signal();
} finally {
//(5)释放锁。
lock.unlock();
}
return true;
}
tryGrow(Object[] array, int oldCap)
对队列进行扩容,使用自旋锁和CAS算法保证只有一个线程能进行扩容。
private void tryGrow(Object[] array, int oldCap) {
//(1)释放获取的独占锁,在进行扩容的过程让别的线程也可以获取到该锁。
lock.unlock();
Object[] newArray = null;
//(2)CAS成功则进行扩容。
if (allocationSpinLock == 0 &&
UNSAFE.compareAndSwapInt(this, allocationSpinLockOffset,
0, 1)) {
try {
//(3)进行扩容,oldGap<64则增加oldCap+2,否则增加oldCap的一半,并且容量最大值为MAX_ARRAY_SIZE。从这来看虽然队列会进行扩容,但也不是无限扩容,严格来说也应该算是有界的。
int newCap = oldCap + ((oldCap < 64) ?
(oldCap + 2) : // grow faster if small
(oldCap >> 1));
if (newCap - MAX_ARRAY_SIZE > 0) { // possible overflow
int minCap = oldCap + 1;
if (minCap < 0 || minCap > MAX_ARRAY_SIZE)
throw new OutOfMemoryError();
newCap = MAX_ARRAY_SIZE;
}
if (newCap > oldCap && queue == array)
newArray = new Object[newCap];
} finally {
//(4)扩完容之后将自旋锁allocationSpinLock 设置为0,允许下次进行扩容。
allocationSpinLock = 0;
}
}
//(5)第一个线程CAS成功后,第二个线程会进入这段代码,然后第二个线程会让出cpu,尽量让第一个线程获取锁,但这不保证一定可以。if (newArray == null) // back off if another thread is allocating
Thread.yield();
//(6)获取锁,将原来队列中的元素拷贝到扩容后的队列中。
lock.lock();
if (newArray != null && queue == array) {
queue = newArray;
System.arraycopy(array, 0, newArray, 0, oldCap);
}
}
poll()
获取队列内部堆树的根节点元素,如果队列为空则返回null。
public E poll() {
//(1)尝试获取锁。
final ReentrantLock lock = this.lock;
lock.lock();
try {
//(2)获取队列的第一个元素,并整理二叉树堆。
return dequeue();
} finally {
//(3)释放锁。
lock.unlock();
}
}
put()
由于是无界队列,不需要阻塞,put方法内部调用的offer方法,这就不进行赘述了。
public void put(E e) {
offer(e); // never need to block
}
take()
获取队列内部堆树的根节点元素,如果队列为空则阻塞。
public E take() throws InterruptedException {
//(1)尝试获取锁对象,调用的是lockInterruptibly方法,所以在当其他线程设置了中断标志,该线程会抛出InterruptedException异常。
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
//(2)如果队列为空,则阻塞,停止消费。
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
//(3)释放锁。
lock.unlock();
}
return result;
}
size()
size方法需要获取锁,因为本类中的size变量没有volatile变量修饰无法保证内存的可见性。
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return size;
} finally {
lock.unlock();
}
}
PriorityBlockingQueue队列在内部使用二叉树堆维护元素的优先级,使用数组作为元素的存储的数据结构,该数组可进行扩容,但是容量也是有限制的,使用CAS来保证扩容时的唯一性。
今天的分享就到这,有看不明白的地方一定是我写的不够清楚,所有欢迎提任何问题以及改善方法。