简介
LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。
和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。
探索LinkedBlockingQueue
1.主要属性
static class Node<E> {
//数据存储
E item;
//后继节点
Node<E> next;
Node(E x) { item = x; }
}
//最大容量,默认为Integer.MAX_VALUE
private final int capacity;
//当前队列中的元素数量,与ArrayBlockingQueue不同
private final AtomicInteger count = new AtomicInteger(0);
//头节点,head.item恒为null
private transient Node<E> head;
//尾节点,last.next恒为null
private transient Node<E> last;
//出列锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件
private final Condition notEmpty = takeLock.newCondition();
//入列锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件
private final Condition notFull = putLock.newCondition();
从上面的属性可以看出,LinkedBlockingQueue主要以链表的方式进行数据存储,出列与入列通过不同的锁进行线程安全控制。
2.初始化
接着看构造方法。
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);//默认最大容量为Integer.MAX_VALUE
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化收尾节点
last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
//加入列锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
int n = 0;
//遍历入列
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
//处理尾节点
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}
再看enqueue()方法:可读性不是很好,分析后,可以等价于last.next = node; last = node;即将node链接到当前队列的末尾,然后将末节点设置为当前node。
private void enqueue(Node<E> node) {
last = last.next = node;//等价于 last.next = node; last = node;
}
3.入列
入列方法有offer(),和put()。
offer()方法:作用是将元素e插入到阻塞队列的尾部,如果队满,返回false,即插入失败。否则,插入元素。
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
//获取当前元素数量
final AtomicInteger count = this.count;
//等于队列的最大容量,返回false
if (count.get() == capacity)
return false;
int c = -1; //-1标识入列失败
Node<E> node = new Node(e);
//加锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
//判断当前数量是否小于最大容量
if (count.get() < capacity) {
//尾节点处理
enqueue(node);
//当前容量+1
c = count.getAndIncrement();
//判断当前容量是否大于最大容量,如果小于最大容量,唤醒其他入列线程。
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// c!=-1即已经走了上面的代码的逻辑,所以此时队列长度不为0
// 唤醒等待在NotEmpty条件的线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
再看 signalNotEmpty()方法:主要是唤醒等待在notEmpty线程的出列操作。
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
offer()同样有重载一个延迟等待方法,可以看到该方法只是多了等待时间,如果超时就返回false,其他与offer()的逻辑一样。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
if (e == null) throw new NullPointerException();
//获取等待时间(纳秒)
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//设置线程可中断
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
//设置最长等待时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}
再看put()方法:与offer(E e, long timeout, TimeUnit unit)不同的地方在于,当队满时无限等待,这里不做多解释。
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
//当前队列满,则持续等待。
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
4.出列
出列的方法有take(),poll()等,由于逻辑基本相似,这里只分析take()方法。
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//加可中断锁
takeLock.lockInterruptibly();
try {
//当前队列长度为0,线程等待
while (count.get() == 0) {
notEmpty.await();
}
//出列操作
x = dequeue();
//队列长度减一
c = count.getAndDecrement();
//队列长度还大于1,唤醒其他出队线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c!=-1 说明已经执行上面的逻辑,此时队列长度可能小于最大长度,唤醒等待在NotFull入队线程
if (c == capacity)
signalNotFull();
return x;
}
再看dequeue()方法:处理头结点的方法,获取第一个元素后,断开链表,让第一个结点作为头结点以及将其的数据部分设置为空,同时让原来的头结点自关联帮助GC。
这里我觉得有点奇怪,为什么不直接把first结点断开,然后h.next = first.next. 最后将first.next=null?
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // 将头结点的后继结点指向自己,有助于GC
head = first; //将头结点
E x = first.item;
first.item = null; //头结点具有一致性:即数据为null,后继结点才是真实数据结点
return x;
}
5.其他方法
remainingCapacity()查看剩余的长度;
size() () 查看当前队列长度;
clear() 清除当前队列所有数据;
....
ArrayBlockingQueue和LinkedBlockingQueue比较
1.ArrayBlockingQueue一个有界的阻塞队列,由于其底层基于数组,初始化后就会立即在内存分配固定大小容量的数组元素;而LinkedBlockingQueue配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的,默认大小为Integer.MAX_VALUE的容量,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。
2.LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,而ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而故LinkedBlockingQueue的吞吐量要高于后者。