[Java源码][并发J.U.C]---阻塞队列LinkedBlockingDeque

前言

LinkedBlockingDeque是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。

本文源码: 本文源码下载

实现思路

无并发的情况

由于需要在队列的两端都可以插入和移除元素. 加之需要用链表实现, 所以需要一个双向链表来实现.
关于deque如果不了解的话可以去做一下这道题 Design Circular Deque.

并发的情况

并发的情况下需要多线程操作同一个队列(也就是这个双向链表), LinkedBlockingQueue采用的是两个锁, 而LinkedBlockingDeque则是采用了一个锁,在写操作的过程中需要加锁保证同步.

源码

属性

    /** 双向链表 节点类 */
    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;
        Node(E x) {
            item = x;
        }
    }

    /**
     * Pointer to first node.
     * Invariant: (first == null && last == null) ||
     *            (first.prev == null && first.item != null)
     */
    transient Node<E> first;

    /**
     * Pointer to last node.
     * Invariant: (first == null && last == null) ||
     *            (last.next == null && last.item != null)
     */
    transient Node<E> last;

    /** deque中元素的个数 */
    private transient int count;

    /** 容量 */
    private final int capacity;

    /** Main lock guarding all access */
    final ReentrantLock lock = new ReentrantLock();

    /** Condition for waiting takes */
    private final Condition notEmpty = lock.newCondition();

    /** Condition for waiting puts */
    private final Condition notFull = lock.newCondition();

有以下属性
Node: 节点类
first: 双向链表的头节点. (里面会存有真正的值, 与LinkedBlockingQueuehead不同,只是个虚设的作用.)
last: 双向链表的尾节点.
capacity: 链表最大容量.
lock: 可重入锁.
notEmpty: 当链表为空让线程等待的作用.
notFull: 当链表满的时候让线程等待的作用.

构造方法

/**
     * Creates a {@code LinkedBlockingDeque} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingDeque} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this deque
     * @throws IllegalArgumentException if {@code capacity} is less than 1
     */
    public LinkedBlockingDeque(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

可以看到初始化的过程中初始化了capacity,但是根本没有初始化firstlast节点.

辅助方法

一些链表的常见操作包括在两端插入和删除元素, 这些方法只有在获得锁的时候才可以被调用.

/**
     * 将新节点node加入到双向链表的头部
     * 如果成功加入 返回true
     * 否则 返回false (超过了所允许的最大容量)
     */
    private boolean linkFirst(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> f = first;
        node.next = f;
        first = node;
        if (last == null)
            last = node;
        else
            f.prev = node;
        ++count;
        notEmpty.signal();
        return true;
    }

    /**
     * 将新节点node加入到双向链表的尾部
     * 如果成功加入 返回true
     * 否则 返回false (超过了所允许的最大容量)
     *
     */
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }

    /**
     * 从链表头中删除一个节点并返回该节点的元素
     * 另外给一个因为链表容量满而休眠的线程信号
     * 如果链表为空则返回null
     */
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }

    /**
     * 从链表尾中删除一个节点并返回该节点的元素
     * 另外给一个因为链表容量满而休眠的线程信号
     * 如果链表为空则返回null
     */
    private E unlinkLast() {
        // assert lock.isHeldByCurrentThread();
        Node<E> l = last;
        if (l == null)
            return null;
        Node<E> p = l.prev;
        E item = l.item;
        l.item = null;
        l.prev = l; // help GC
        last = p;
        if (p == null)
            first = null;
        else
            p.next = null;
        --count;
        notFull.signal();
        return item;
    }

    /**
     * Unlinks x.
     */
    void unlink(Node<E> x) {
        // assert lock.isHeldByCurrentThread();
        Node<E> p = x.prev;
        Node<E> n = x.next;
        if (p == null) { // 如果是头节点
            unlinkFirst();
        } else if (n == null) {  // 如果是尾节点
            unlinkLast();
        } else { // 如果既不是头节点也不是尾节点
            p.next = n;
            n.prev = p;
            x.item = null;
            // Don't mess with x's links.  They may still be in use by
            // an iterator.
            --count;
            notFull.signal();
        }
    }

BlockingDeque的一些方法

插入元素
/**
     * 往头部加入节点e
     * 如果deque已满 则让该线程休眠等待
     *
     * 有两种情况会加入不成功:
     * 1. e为null 抛出运行时异常NullPointerException
     * 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
     *
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException {@inheritDoc}
     */
    public void putFirst(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkFirst(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

    /**
     * 往尾部加入节点e
     * 如果deque已满 则让该线程休眠等待
     *
     * 有两种情况会加入不成功:
     * 1. e为null 抛出运行时异常NullPointerException
     * 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException {@inheritDoc}
     */
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }

可以看到putFirstputLast分别从头尾两端插入元素.(必须先获得锁)
如果元素满的时候会一直等待, 另外有两种情况下插入元素不会成功.
1. e为null 抛出运行时异常NullPointerException.
2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常

另外需要注意的是如果元素满的时候会调用notFull.await()导致该线程休眠,那什么时候会signal呢,在上面的辅助方法中unlink中会有notFullsignal,同理在link的时候也有notEmptysignal.

关于offeradd等方法就不多介绍了,原理基本一样.

消费元素
/**
     *  取链表的头节点
     *  如果链表为空 会一直等待
     *
     *  如果等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常
     * @return
     * @throws InterruptedException
     */
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

    /**
     *  取链表的尾节点
     *  如果链表为空 会一直等待
     *
     *  如果等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常
     * @return
     * @throws InterruptedException
     */
    public E takeLast() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkLast()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }

可以看到takeFirsttakeLast是分别从队列两端消费元素,如果链表为空,则会一直等待. 有一种情况下该线程是没有成功取得元素的.
1. 等待的过程中该线程被其他的线程中断 则抛出InterruptedException异常.

put对应的就是take,与offer对应的就是poll. 基本原理差不多, 就不多说了.

删除元素
/**
     * 删除从链表头开始第一个出现的o
     * true 删除成功
     * false 删除失败 (o为null删除失败)
     * @param o
     * @return
     */
    public boolean removeFirstOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Node<E> p = first; p != null; p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * 删除从链表尾到头开始第一个出现的o
     * true 删除成功
     * false 删除失败 (o为null删除失败)
     * @param o
     * @return
     */
    public boolean removeLastOccurrence(Object o) {
        if (o == null) return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            for (Node<E> p = last; p != null; p = p.prev) {
                if (o.equals(p.item)) {
                    unlink(p);
                    return true;
                }
            }
            return false;
        } finally {
            lock.unlock();
        }
    }

removeFirstOccurrenceremoveLastOccurrence 分别是从头尾两端开始找到第一个符合的元素进行删除.
true 表示删除成功.
false 表示删除失败.

以上是BlockingDeque的方法, 如果你想把它当做一个blockingqueue来用也是可以的,那就从尾部加入元素,从头部消费数据即可.

BlockingQueue methods 阻塞队列的方法

/**
     * 调用blockingDeque的尾部加入方法
     * 有两种情况会加入不成功:
     * 1. e为null 抛出运行时异常NullPointerException
     * 2. 线程休眠过程中被别的线程中断 抛出InterruptedException异常
     * @throws NullPointerException {@inheritDoc}
     * @throws InterruptedException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        putLast(e);
    }
/**
     * 调用blockingDeque的头部消费方法
     * 在等待的过程中如果该线程被其他的线程中断 则抛出InterruptedException异常
     * @return
     * @throws InterruptedException
     */
    public E take() throws InterruptedException {
        return takeFirst();
    }

实现了BlockingQueue的所有接口方法包括put,take等等.
这里只给了take,put方法, 其余的方法基本上也是调用BlockingDeque上面的方法实现的.

Stack methods 栈的方法

同样的也可以实现一个栈的基本功能, 在头部加入在头部消费.

/**
     * @throws IllegalStateException if this deque is full
     * @throws NullPointerException {@inheritDoc}
     */
    public void push(E e) {
        addFirst(e);
    }

    /**
     * @throws NoSuchElementException {@inheritDoc}
     */
    public E pop() {
        return removeFirst();
    }

遍历元素

由于该队列是需要两端都可以加入和消费,意味着遍历的话也是需要从两端都可以遍历. 因此有两个Iterator实现类. Itr (从头到尾遍历) 和 DescendingItr从尾到头遍历. 这两个类都继承于AbstractItr(该类实现了大部分的方法).

private abstract class AbstractItr implements Iterator<E> {
        /**
         * 在next()中要返回的节点
         */
        Node<E> next;

        /**
         * 在next()中要返回的元素
         */
        E nextItem;

        /**
         * 调用remove方法删除的就是该lastRet节点 (上一个被返回的节点)
         */
        private Node<E> lastRet;

        // 留给子类实现
        abstract Node<E> firstNode(); 
        abstract Node<E> nextNode(Node<E> n);

        AbstractItr() {
            // set to initial position
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                // 获得next和nextItem
                next = firstNode();
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        /**
         * 下一个节点
         */
        private Node<E> succ(Node<E> n) {
            // Chains of deleted nodes ending in null or self-links
            // are possible if multiple interior nodes are removed.
            for (;;) {
                Node<E> s = nextNode(n);
                if (s == null)
                    return null;
                else if (s.item != null)
                    return s;
                else if (s == n)  // 表示该节点已经被删除
                    return firstNode();
                else
                    n = s;
            }
        }

        /**
         * 计算出next和nextItem
         */
        void advance() {
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                // assert next != null;
                next = succ(next);
                nextItem = (next == null) ? null : next.item;
            } finally {
                lock.unlock();
            }
        }

        public boolean hasNext() {
            return next != null;
        }

        public E next() {
            if (next == null)
                throw new NoSuchElementException();
            lastRet = next; // 更新lastRet
            E x = nextItem;
            advance(); // 获得next和nextItem
            return x;
        }

        public void remove() {
            Node<E> n = lastRet;
            if (n == null)
                throw new IllegalStateException();
            lastRet = null;
            final ReentrantLock lock = LinkedBlockingDeque.this.lock;
            lock.lock();
            try {
                if (n.item != null)
                    unlink(n);
            } finally {
                lock.unlock();
            }
        }
    }

    /** Forward iterator */
    private class Itr extends AbstractItr {
        Node<E> firstNode() { return first; }
        Node<E> nextNode(Node<E> n) { return n.next; }
    }

    /** Descending iterator */
    private class DescendingItr extends AbstractItr {
        Node<E> firstNode() { return last; }
        Node<E> nextNode(Node<E> n) { return n.prev; }
    }

这里看个例子

package com.linkedblockingdeque;

import java.util.Iterator;

public class Test01 {
    public static void main(String[] args) {
        LinkedBlockingDeque<Integer> lbd = new LinkedBlockingDeque<>();
        lbd.addLast(5);
        Iterator iterator = lbd.iterator();
        System.out.println(iterator.hasNext());
        lbd.remove(5);
        System.out.println(iterator.next());

        Iterator iterator1 = lbd.iterator();
        System.out.println(iterator1.hasNext());
    }
}

输出如下: 可以看到我们在删除5这个节点后居然用这个iterator还可以输出5这个节点. 用下面这个图来说明一下在此双链表中删除一个节点会让该节点指向自己.

true
5
false
图片.png

疑问

最后留个疑问
为什么LinkedBlockingQueue是用两个锁来实现,而LinkedBlockingDeque是用一个锁来实现.

参考

1. Java1.8 源码.
2. Java并发编程的艺术

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容