读LinkedBlockingQueue源码记录

简介

    LinkedBlockingQueue阻塞队列大小的配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的。说是无边界,其实是采用了默认大小为Integer.MAX_VALUE的容量 。它的内部实现是一个链表。

  和ArrayBlockingQueue一样,LinkedBlockingQueue 也是以先进先出的方式存储数据,最新插入的对象是尾部,最新移出的对象是头部。

探索LinkedBlockingQueue

1.主要属性

static class Node<E> {
    //数据存储
    E item; 
    //后继节点
    Node<E> next;   
    Node(E x) { item = x; }
}
//最大容量,默认为Integer.MAX_VALUE
private final int capacity;
//当前队列中的元素数量,与ArrayBlockingQueue不同
private final AtomicInteger count = new AtomicInteger(0);
//头节点,head.item恒为null 
private transient Node<E> head;
//尾节点,last.next恒为null
private transient Node<E> last;
//出列锁
private final ReentrantLock takeLock = new ReentrantLock();
//非空条件
private final Condition notEmpty = takeLock.newCondition();
//入列锁
private final ReentrantLock putLock = new ReentrantLock();
//非满条件
private final Condition notFull = putLock.newCondition();

从上面的属性可以看出,LinkedBlockingQueue主要以链表的方式进行数据存储,出列与入列通过不同的锁进行线程安全控制。

2.初始化

接着看构造方法。

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);//默认最大容量为Integer.MAX_VALUE
}
public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化收尾节点
    last = head = new Node<E>(null);
}
public LinkedBlockingQueue(Collection<? extends E> c) {
    this(Integer.MAX_VALUE);
    //加入列锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock(); 
    try {
        int n = 0;
        //遍历入列
        for (E e : c) {
            if (e == null)
                throw new NullPointerException();
            if (n == capacity)
                throw new IllegalStateException("Queue full");
            //处理尾节点
            enqueue(new Node<E>(e));
            ++n;
        }
        count.set(n);
    } finally {
        putLock.unlock();
    }
}

再看enqueue()方法:可读性不是很好,分析后,可以等价于last.next = node; last = node;即将node链接到当前队列的末尾,然后将末节点设置为当前node。

private void enqueue(Node<E> node) {
    last = last.next = node;//等价于 last.next = node; last = node;
}

3.入列

入列方法有offer(),和put()。

offer()方法:作用是将元素e插入到阻塞队列的尾部,如果队满,返回false,即插入失败。否则,插入元素。


public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    //获取当前元素数量
    final AtomicInteger count = this.count;
    //等于队列的最大容量,返回false
    if (count.get() == capacity)
        return false;
    int c = -1; //-1标识入列失败
    Node<E> node = new Node(e);
    //加锁
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        //判断当前数量是否小于最大容量
        if (count.get() < capacity) {
            //尾节点处理
            enqueue(node);
            //当前容量+1
            c = count.getAndIncrement();
            //判断当前容量是否大于最大容量,如果小于最大容量,唤醒其他入列线程。
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    // c!=-1即已经走了上面的代码的逻辑,所以此时队列长度不为0
    // 唤醒等待在NotEmpty条件的线程
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

再看 signalNotEmpty()方法:主要是唤醒等待在notEmpty线程的出列操作。

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

offer()同样有重载一个延迟等待方法,可以看到该方法只是多了等待时间,如果超时就返回false,其他与offer()的逻辑一样。

public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    //获取等待时间(纳秒)
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    //设置线程可中断
    putLock.lockInterruptibly();
    try {
        while (count.get() == capacity) {
            if (nanos <= 0)
                return false;
            //设置最长等待时间
            nanos = notFull.awaitNanos(nanos);
        }
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return true;
}

再看put()方法:与offer(E e, long timeout, TimeUnit unit)不同的地方在于,当队满时无限等待,这里不做多解释。

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        //当前队列满,则持续等待。
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node);
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

4.出列

出列的方法有take(),poll()等,由于逻辑基本相似,这里只分析take()方法。

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    //加可中断锁
    takeLock.lockInterruptibly();
    try {
        //当前队列长度为0,线程等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        //出列操作
        x = dequeue();
        //队列长度减一
        c = count.getAndDecrement();
        //队列长度还大于1,唤醒其他出队线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c!=-1 说明已经执行上面的逻辑,此时队列长度可能小于最大长度,唤醒等待在NotFull入队线程
    if (c == capacity)
        signalNotFull();
    return x;
}

再看dequeue()方法:处理头结点的方法,获取第一个元素后,断开链表,让第一个结点作为头结点以及将其的数据部分设置为空,同时让原来的头结点自关联帮助GC。

这里我觉得有点奇怪,为什么不直接把first结点断开,然后h.next = first.next. 最后将first.next=null?

private E dequeue() {
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // 将头结点的后继结点指向自己,有助于GC
    head = first;   //将头结点
    E x = first.item;
    first.item = null;  //头结点具有一致性:即数据为null,后继结点才是真实数据结点
    return x;
}

5.其他方法

remainingCapacity()查看剩余的长度;

size() () 查看当前队列长度;

clear() 清除当前队列所有数据;

....

ArrayBlockingQueue和LinkedBlockingQueue比较

1.ArrayBlockingQueue一个有界的阻塞队列,由于其底层基于数组,初始化后就会立即在内存分配固定大小容量的数组元素;而LinkedBlockingQueue配置是可选的,如果我们初始化时指定一个大小,它就是有边界的,如果不指定,它就是无边界的,默认大小为Integer.MAX_VALUE的容量,由于其节点的创建都是动态创建,并且在节点出队列后可以被GC所回收,因此其具有灵活的伸缩性。

2.LinkedBlockingQueue的读取和插入操作所使用的锁是两个不同的lock,它们之间的操作互相不受干扰,因此两种操作可以并行完成,而ArrayBlockingQueue中在入队列和出队列操作过程中,使用的是同一个lock,所以即使在多核CPU的情况下,其读取和操作的都无法做到并行,而故LinkedBlockingQueue的吞吐量要高于后者。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342