前言
LinkedBlockingDeque
是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。
本文源码: 本文源码下载
实现思路
无并发的情况
由于需要在队列的两端都可以插入和移除元素. 加之需要用链表实现, 所以需要一个双向链表来实现.
关于deque
如果不了解的话可以去做一下这道题 Design Circular Deque.
并发的情况
并发的情况下需要多线程操作同一个队列(也就是这个双向链表),
LinkedBlockingQueue
采用的是两个锁, 而LinkedBlockingDeque
则是采用了一个锁,在写操作的过程中需要加锁保证同步.
源码
属性
/** 双向链表 节点类 */
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
/**
* Pointer to first node.
* Invariant: (first == null && last == null) ||
* (first.prev == null && first.item != null)
*/
transient Node<E> first;
/**
* Pointer to last node.
* Invariant: (first == null && last == null) ||
* (last.next == null && last.item != null)
*/
transient Node<E> last;
/** deque中元素的个数 */
private transient int count;
/** 容量 */
private final int capacity;
/** Main lock guarding all access */
final ReentrantLock lock = new ReentrantLock();
/** Condition for waiting takes */
private final Condition notEmpty = lock.newCondition();
/** Condition for waiting puts */
private final Condition notFull = lock.newCondition();
有以下属性
Node
: 节点类
first
: 双向链表的头节点. (里面会存有真正的值, 与LinkedBlockingQueue
的head
不同,只是个虚设的作用.)
last
: 双向链表的尾节点.
capacity
: 链表最大容量.
lock
: 可重入锁.
notEmpty
: 当链表为空让线程等待的作用.
notFull
: 当链表满的时候让线程等待的作用.
构造方法
/**
* Creates a {@code LinkedBlockingDeque} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
/**
* Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
*
* @param capacity the capacity of this deque
* @throws IllegalArgumentException if {@code capacity} is less than 1
*/
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
可以看到初始化的过程中初始化了
capacity
,但是根本没有初始化first
和last
节点.
辅助方法
一些链表的常见操作包括在两端插入和删除元素, 这些方法只有在获得锁的时候才可以被调用.
/**
* 将新节点node加入到双向链表的头部
* 如果成功加入 返回true
* 否则 返回false (超过了所允许的最大容量)
*/
private boolean linkFirst(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> f = first;
node.next = f;
first = node;
if (last == null)
last = node;
else
f.prev = node;
++count;
notEmpty.signal();
return true;
}
/**
* 将新节点node加入到双向链表的尾部
* 如果成功加入 返回true
* 否则 返回false (超过了所允许的最大容量)
*
*/
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
/**
* 从链表头中删除一个节点并返回该节点的元素
* 另外给一个因为链表容量满而休眠的线程信号
* 如果链表为空则返回null
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
/**
* 从链表尾中删除一个节点并返回该节点的元素
* 另外给一个因为链表容量满而休眠的线程信号
* 如果链表为空则返回null
*/
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last;
if (l == null)
return null;
Node<E> p = l.prev;
E item = l.item;
l.item = null;
l.prev = l; // help GC
last = p;
if (p == null)
first = null;
else
p.next = null;
--count;
notFull.signal();
return item;
}
/**
* Unlinks x.
*/
void unlink(Node<E> x) {
// assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) { // 如果是头节点
unlinkFirst();
} else if (n == null) { // 如果是尾节点
unlinkLast();
} else { // 如果既不是头节点也不是尾节点
p.next = n;
n.prev = p;
x.item = null;
// Don't mess with x's links. They may still be in use by
// an iterator.
--count;
notFull.signal();
}
}
BlockingDeque的一些方法
插入元素
/**
* 往头部加入节点e
* 如果deque已满 则让该线程休眠等待
*
* 有两种情况会加入不成功:
* 1. e为null 抛出运行时异常NullPointerException
* 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
*
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(node))
notFull.await();
} finally {
lock.unlock();
}
}
/**
* 往尾部加入节点e
* 如果deque已满 则让该线程休眠等待
*
* 有两种情况会加入不成功:
* 1. e为null 抛出运行时异常NullPointerException
* 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
可以看到
putFirst
和putLast
分别从头尾两端插入元素.(必须先获得锁)
如果元素满的时候会一直等待, 另外有两种情况下插入元素不会成功.
1. e为null 抛出运行时异常NullPointerException.
2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
另外需要注意的是如果元素满的时候会调用
notFull.await()
导致该线程休眠,那什么时候会signal
呢,在上面的辅助方法中unlink
中会有notFull
的signal
,同理在link
的时候也有notEmpty
的signal
.
关于
offer
和add
等方法就不多介绍了,原理基本一样.
消费元素
/**
* 取链表的头节点
* 如果链表为空 会一直等待
*
* 如果等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常
* @return
* @throws InterruptedException
*/
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
/**
* 取链表的尾节点
* 如果链表为空 会一直等待
*
* 如果等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常
* @return
* @throws InterruptedException
*/
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
可以看到
takeFirst
和takeLast
是分别从队列两端消费元素,如果链表为空,则会一直等待. 有一种情况下该线程是没有成功取得元素的.
1. 等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常.
与
put
对应的就是take
,与offer
对应的就是poll
. 基本原理差不多, 就不多说了.
删除元素
/**
* 删除从链表头开始第一个出现的o
* true 删除成功
* false 删除失败 (o为null删除失败)
* @param o
* @return
*/
public boolean removeFirstOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = first; p != null; p = p.next) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
/**
* 删除从链表尾到头开始第一个出现的o
* true 删除成功
* false 删除失败 (o为null删除失败)
* @param o
* @return
*/
public boolean removeLastOccurrence(Object o) {
if (o == null) return false;
final ReentrantLock lock = this.lock;
lock.lock();
try {
for (Node<E> p = last; p != null; p = p.prev) {
if (o.equals(p.item)) {
unlink(p);
return true;
}
}
return false;
} finally {
lock.unlock();
}
}
removeFirstOccurrence
和removeLastOccurrence
分别是从头尾两端开始找到第一个符合的元素进行删除.
true
表示删除成功.
false
表示删除失败.
以上是
BlockingDeque
的方法, 如果你想把它当做一个blockingqueue
来用也是可以的,那就从尾部加入元素,从头部消费数据即可.
BlockingQueue methods 阻塞队列的方法
/**
* 调用blockingDeque的尾部加入方法
* 有两种情况会加入不成功:
* 1. e为null 抛出运行时异常NullPointerException
* 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
* @throws NullPointerException {@inheritDoc}
* @throws InterruptedException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
putLast(e);
}
/**
* 调用blockingDeque的头部消费方法
* 在等待的过程中如果该线程被其他的线程中断 则抛出InterruptedException异常
* @return
* @throws InterruptedException
*/
public E take() throws InterruptedException {
return takeFirst();
}
实现了
BlockingQueue
的所有接口方法包括put
,take
等等.
这里只给了take
,put
方法, 其余的方法基本上也是调用BlockingDeque
上面的方法实现的.
Stack methods 栈的方法
同样的也可以实现一个栈的基本功能, 在头部加入在头部消费.
/**
* @throws IllegalStateException if this deque is full
* @throws NullPointerException {@inheritDoc}
*/
public void push(E e) {
addFirst(e);
}
/**
* @throws NoSuchElementException {@inheritDoc}
*/
public E pop() {
return removeFirst();
}
遍历元素
由于该队列是需要两端都可以加入和消费,意味着遍历的话也是需要从两端都可以遍历. 因此有两个
Iterator
实现类.Itr
(从头到尾遍历) 和DescendingItr
从尾到头遍历. 这两个类都继承于AbstractItr
(该类实现了大部分的方法).
private abstract class AbstractItr implements Iterator<E> {
/**
* 在next()中要返回的节点
*/
Node<E> next;
/**
* 在next()中要返回的元素
*/
E nextItem;
/**
* 调用remove方法删除的就是该lastRet节点 (上一个被返回的节点)
*/
private Node<E> lastRet;
// 留给子类实现
abstract Node<E> firstNode();
abstract Node<E> nextNode(Node<E> n);
AbstractItr() {
// set to initial position
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// 获得next和nextItem
next = firstNode();
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}
/**
* 下一个节点
*/
private Node<E> succ(Node<E> n) {
// Chains of deleted nodes ending in null or self-links
// are possible if multiple interior nodes are removed.
for (;;) {
Node<E> s = nextNode(n);
if (s == null)
return null;
else if (s.item != null)
return s;
else if (s == n) // 表示该节点已经被删除
return firstNode();
else
n = s;
}
}
/**
* 计算出next和nextItem
*/
void advance() {
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
// assert next != null;
next = succ(next);
nextItem = (next == null) ? null : next.item;
} finally {
lock.unlock();
}
}
public boolean hasNext() {
return next != null;
}
public E next() {
if (next == null)
throw new NoSuchElementException();
lastRet = next; // 更新lastRet
E x = nextItem;
advance(); // 获得next和nextItem
return x;
}
public void remove() {
Node<E> n = lastRet;
if (n == null)
throw new IllegalStateException();
lastRet = null;
final ReentrantLock lock = LinkedBlockingDeque.this.lock;
lock.lock();
try {
if (n.item != null)
unlink(n);
} finally {
lock.unlock();
}
}
}
/** Forward iterator */
private class Itr extends AbstractItr {
Node<E> firstNode() { return first; }
Node<E> nextNode(Node<E> n) { return n.next; }
}
/** Descending iterator */
private class DescendingItr extends AbstractItr {
Node<E> firstNode() { return last; }
Node<E> nextNode(Node<E> n) { return n.prev; }
}
这里看个例子
package com.linkedblockingdeque;
import java.util.Iterator;
public class Test01 {
public static void main(String[] args) {
LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<>();
lbd.addLast(5);
Iterator iterator = lbd.iterator();
System.out.println(iterator.hasNext());
lbd.remove(5);
System.out.println(iterator.next());
Iterator iterator1 = lbd.iterator();
System.out.println(iterator1.hasNext());
}
}
输出如下: 可以看到我们在删除
5
这个节点后居然用这个iterator
还可以输出5
这个节点. 用下面这个图来说明一下在此双链表中删除一个节点会让该节点指向自己.
true
5
false
疑问
最后留个疑问
为什么LinkedBlockingQueue
是用两个锁来实现,而LinkedBlockingDeque
是用一个锁来实现.
参考
1.
Java1.8
源码.
2.Java
并发编程的艺术