阻塞队列

一、阻塞队列中常用的方法

  • 第一组:add、remove、element
    1. add 方法
      add 方法是往队列里添加一个元素,如果队列满了,就会抛出异常来提示队列已满
    2. remove 方法
      remove 方法的作用是删除元素,如果我们删除的队列是空的,由于里面什么都没有,所以也无法删除任何元素,那么 remove 方法就会抛出异常。
    3. element 方法
      element 方法是返回队列的头部节点,但是并不删除。和 remove 方法一样,如果我们用这个方法去操作一个空队列,想获取队列的头结点,可是由于队列是空的,我们什么都获取不到,会抛出和前面 remove 方法一样的异常:NoSuchElementException
  • 第二组:offer、poll、peek
    1. offer 方法
      offer 方法用来插入一个元素,并用返回值来提示插入是否成功。如果添加成功会返回 true,而如果队列已经满了,此时继续调用 offer 方法的话,它不会抛出异常,只会返回一个错误提示:false
    2. 带超时时间的 offer 方法offer(E e, long timeout, TimeUnit unit)
      它有三个参数,分别是元素、超时时长和时间单位。通常情况下,这个方法会插入成功并返回 true;如果队列满了导致插入不成功,在调用带超时时间重载方法的 offer 的时候,则会等待指定的超时时间,如果时间到了依然没有插入成功,就会返回 false
    3. poll 方法
      poll 方法和第一组的 remove 方法是对应的,作用也是移除并返回队列的头节点。但是如果当队列里面是空的,没有任何东西可以移除的时候,便会返回 null 作为提示。正因如此,我们是不允许往队列中插入 null 的,否则我们没有办法区分返回的 null 是一个提示还是一个真正的元素
    4. 带超时时间的 poll 方法poll(long timeout, TimeUnit unit)
      如果能够移除,便会立刻返回这个节点的内容;如果队列是空的就会进行等待,等待时间正是我们指定的时间,直到超时时间到了,如果队列里依然没有元素可供移除,便会返回 null 作为提示
    5. peek 方法
      peek 方法和第一组的 element 方法是对应的,意思是返回队列的头元素但并不删除。如果队列里面是空的,它便会返回 null 作为提示
  • 第三组:put、take
    1. put 方法
      put 方法的作用是插入元素。通常在队列没满的时候是正常的插入,但是如果队列已满就无法继续插入,这时它既不会立刻返回 false 也不会抛出异常,而是让插入的线程陷入阻塞状态,直到队列里有了空闲空间,此时队列就会让之前的线程解除阻塞状态,并把刚才那个元素添加进去。
    2. take 方法
      take 方法的作用是获取并移除队列的头结点。通常在队列里有数据的时候会正常取出数据并删除;但是如果执行 take 的时候队列里无数据,则阻塞,直到队列里有数据;一旦队列里有数据了,就会立刻解除阻塞状态,并且取到数据。


二、有哪几种常见的阻塞队列

  • ArrayBlockingQueue
    1. ArrayBlockingQueue 是最典型的有界队列,其内部是用数组存储元素的,利用 ReentrantLock 实现线程安全。
    2. 我们在创建它的时候就需要指定它的容量,之后也不可以再扩容了,在构造函数中我们同样可以指定是否是公平的,代码如下:ArrayBlockingQueue(int capacity, boolean fair)
    3. 第一个参数是容量,第二个参数是是否公平。
    4. 正如 ReentrantLock 一样,如果 ArrayBlockingQueue 被设置为非公平的,那么就存在插队的可能;如果设置为公平的,那么等待了最长时间的线程会被优先处理,其他线程不允许插队,不过这样的公平策略同时会带来一定的性能损耗,因为非公平的吞吐量通常会高于公平的情况。
  • LinkedBlockingQueue
    正如名字所示,这是一个内部用链表实现的 BlockingQueue。如果我们不指定它的初始容量,那么它容量默认就为整型的最大值 Integer.MAX_VALUE,由于这个数非常大,我们通常不可能放入这么多的数据,所以 LinkedBlockingQueue 也被称作无界队列,代表它几乎没有界限。
  • SynchronousQueue
    1. SynchronousQueue 最大的不同之处在于,它的容量为 0,所以没有一个地方来暂存元素,导致每次取数据都要先阻塞,直到有数据被放入;同理,每次放数据的时候也会阻塞,直到有消费者来取。
    2. 需要注意的是,SynchronousQueue 的容量不是 1 而是 0,因为 SynchronousQueue 不需要去持有元素,它所做的就是直接传递,由于每当需要传递的时候,SynchronousQueue 会把元素直接从生产者传给消费者,在此期间并不需要做存储,所以如果运用得当,它的效率是很高的。
    3. 由于它的容量为 0,所以相比于一般的阻塞队列,SynchronousQueue 的很多方法的实现是很有意思的,我们来举几个例子:
      • element 始终会抛出 NoSuchElementException 异常
      • peek 方法永远返回 null
      • size 方法始终返回 0
  • PriorityBlockingQueue
    1. ArrayBlockingQueue 和 LinkedBlockingQueue 都是采用先进先出的顺序进行排序,可是如果有的时候我们需要自定义排序怎么办呢?这时就需要使用 PriorityBlockingQueue。
    2. PriorityBlockingQueue 是一个支持优先级的无界阻塞队列,可以通过自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化时通过构造器参数 Comparator 来指定排序规则。同时,插入队列的对象必须是可比较大小的,也就是 Comparable 的,否则会抛出 ClassCastException 异常。
    3. 它的 take 方法在队列为空的时候会阻塞,但是正因为它是无界队列,而且会自动扩容,所以它的队列永远不会满,所以它的 put 方法永远不会阻塞,添加操作始终都会成功,也正因为如此,它的成员变量里只有一个 Condition:private final Condition notEmpty;
  • DelayQueue
    1. DelayQueue 这个队列比较特殊,具有“延迟”的功能。我们可以设定让队列中的任务延迟多久之后执行,比如 10 秒钟之后执行,这在例如“30 分钟后未付款自动取消订单”等需要延迟执行的场景中被大量使用。
    2. 它是无界队列,放入的元素必须实现 Delayed 接口,而 Delayed 接口又继承了 Comparable 接口,所以自然就拥有了比较和排序的能力,代码如下:
      public interface Delayed extends Comparable<Delayed> {
        long getDelay(TimeUnit unit);
      }
      
    3. 可以看出这个 Delayed 接口继承自 Comparable,里面有一个需要实现的方法,就是 getDelay。这里的 getDelay 方法返回的是“还剩下多长的延迟时间才会被执行”,如果返回 0 或者负数则代表任务已过期。
    4. 元素会根据延迟时间的长短被放到队列的不同位置,越靠近队列头代表越早过期。

三、源码分析

  • ArrayBlockingQueue 源码分析
    1. ArrayBlockingQueue 有以下几个重要的属性
      // 第一个就是最核心的、用于存储元素的 Object 类型的数组;
      // 然后它还会有两个位置变量,分别是 takeIndex 和 putIndex
      // 这两个变量就是用来标明下一次读取和写入位置的;
      // 另外还有一个 count 用来计数,它所记录的就是队列中的元素个数。
      
      // 用于存放元素的数组
      final Object[] items;
      // 下一次读取操作的位置
      int takeIndex;
      // 下一次写入操作的位置
      int putIndex;
      // 队列中的元素数量
      int count;
      
    2. 三个变量
      // 实现并发同步的原理就是利用 ReentrantLock 和它的两个 Condition
      // 读操作和写操作都需要先获取到 ReentrantLock 独占锁才能进行下一步操作
      // 进行读操作时如果队列为空,线程就会进入到读线程专属的 notEmpty 的 Condition 的队列中去排队,等待写线程写入新的元素
      // 同理,如果队列已满,这个时候写操作的线程会进入到写线程专属的 notFull 队列中去排队,等待读线程将队列元素移除并腾出空间      
      
      final ReentrantLock lock;
      private final Condition notEmpty;
      private final Condition notFull;
      
    3. put 方法
      public void put(E e) throws InterruptedException {
          // 检查插入的元素是不是 null
          checkNotNull(e);
          // 如果不是 null,我们会用 ReentrantLock 上锁
          // 并且上锁方法是 lock.lockInterruptibly()
          final ReentrantLock lock = this.lock;
          lock.lockInterruptibly();
          try {
              // 检查当前队列是不是已经满了,也就是 count 是否等于数组的长度。
              // 如果等于就代表已经满了,于是我们便会进行等待
              // 直到有空余的时候,我们才会执行下一步操作,调用 enqueue 方法让元素进入队列
              while (count == items.length)
              notFull.await();
              enqueue(e);
          } finally {
              // 解锁
              lock.unlock();
          }
      }
      
  • 非阻塞队列ConcurrentLinkedQueue
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                if (p != t)
                    casTail(t, newNode);
                return true;
            }
        }
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

在检查完空判断之后,可以看到它整个是一个大的 for 循环,而且是一个非常明显的死循环。在这个循环中有一个非常亮眼的 p.casNext 方法,这个方法正是利用了 CAS 来操作的,而且这个死循环去配合 CAS 也就是典型的乐观锁的思想。我们就来看一下 p.casNext 方法的具体实现,其方法代码如下:

boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val;
}

这里运用了 UNSAFE.compareAndSwapObject 方法来完成 CAS 操作,而 compareAndSwapObject 是一个 native 方法,最终会利用 CPU 的 CAS 指令保证其不可中断。
可以看出,非阻塞队列 ConcurrentLinkedQueue 使用 CAS 非阻塞算法 + 不停重试,来实现线程安全,适合用在不需要阻塞功能,且并发不是特别剧烈的场景。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
禁止转载,如需转载请通过简信或评论联系作者。
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,445评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,889评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,047评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,760评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,745评论 5 367
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,638评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,011评论 3 398
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,669评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,923评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,655评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,740评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,406评论 4 320
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,995评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,961评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,023评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,483评论 2 342

推荐阅读更多精彩内容