引言
JDK中除了上文提到的各种并发容器,还提供了丰富的阻塞队列。阻塞队列统一实现了BlockingQueue 接口,BlockingQueue 接口在java.util包Queue 接口的基础上提供了put(e)以及take()两个阻塞方法。他的主要使用场景就是多线程下的生产者消费者模式,生产者线程通过put(e)方法将生产元素,消费者线程通过take()消费元素。除了阻塞功能,BlockingQueue 接口还定义了定时的offer以及poll,以及一次性移除方法drainTo。
//插入元素,队列满后会抛出异常booleanadd(E e);//移除元素,队列为空时会抛出异常Eremove();//插入元素,成功反会truebooleanoffer(E e);//移除元素Epoll();//插入元素,队列满后会阻塞voidput(E e)throwsInterruptedException;//移除元素,队列空后会阻塞Etake()throwsInterruptedException;//限时插入booleanoffer(E e,longtimeout, TimeUnit unit)//限时移除Epoll(longtimeout, TimeUnit unit);//获取所有元素到Collection中intdrainTo(Collection c);
JDK1.8中的阻塞队列实现共有7个,分别是ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、SynchronousQueue、LinkedTransferQueue以及LinkedBlockingDeque,下面就来一一对他们进行一个简单的分析。
ArrayBlockingQueue
ArrayBlockingQueue是一个底层用数组实现的有界阻塞队列,有界是指他的容量大小是固定的,不能扩充容量,在初始化时就必须确定队列大小。它通过可重入的独占锁ReentrantLock来控制并发,Condition来实现阻塞。
//通过数组来存储队列中的元素final Object[] items;//初始化一个固定的数组大小,默认使用非公平锁来控制并发publicArrayBlockingQueue(intcapacity){this(capacity,false);}//初始化固定的items数组大小,初始化notEmpty以及notFull两个Condition来控制生产消费publicArrayBlockingQueue(intcapacity, boolean fair){if(capacity <=0)thrownewIllegalArgumentException();this.items =newObject[capacity];lock=newReentrantLock(fair);//通过ReentrantLock来控制并发notEmpty =lock.newCondition(); notFull =lock.newCondition();}
可以看到ArrayBlockingQueue初始化了一个ReentrantLock以及两个Condition,用来控制并发下队列的生产消费。这里重点看下阻塞的put以及take方法:
//插入元素到队列中publicvoidput(E e) throws InterruptedException{ checkNotNull(e); final ReentrantLocklock=this.lock;lock.lockInterruptibly();//获取独占锁try{while(count == items.length)//如果队列已满则通过await阻塞put方法notFull.await(); enqueue(e);//插入元素}finally{lock.unlock(); }}privatevoidenqueue(E x){// assert lock.getHoldCount() == 1;// assert items[putIndex] == null;final Object[] items =this.items; items[putIndex] = x;if(++putIndex == items.length)//插入元素后将putIndex+1,当队列使用完后重置为0putIndex =0; count++; notEmpty.signal();//队列添加元素后唤醒因notEmpty等待的消费线程}//移除队列中的元素publicEtake() throws InterruptedException{ final ReentrantLocklock=this.lock;lock.lockInterruptibly();//获取独占锁try{while(count ==0)//如果队列已空则通过await阻塞take方法notEmpty.await();returndequeue();//移除元素}finally{lock.unlock(); }}privateEdequeue(){// assert lock.getHoldCount() == 1;// assert items[takeIndex] != null;final Object[] items =this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] =null;if(++takeIndex == items.length)//移除元素后将takeIndex+1,当队列使用完后重置为0takeIndex =0; count--;if(itrs !=null) itrs.elementDequeued(); notFull.signal();//队列消费元素后唤醒因notFull等待的消费线程returnx;}
在队列添加和移除元素的过程中使用putIndex、takeIndex以及count三个变量来控制生产消费元素的过程,putIndex负责记录下一个可添加元素的下标,takeIndex负责记录下一个可移除元素的下标,count记录了队列中的元素总量。队列满后通过notFull.await()来阻塞生产者线程,消费元素后通过notFull.signal()来唤醒阻塞的生产者线程。队列为空后通过notEmpty.await()来阻塞消费者线程,生产元素后通过notEmpty.signal()唤醒阻塞的消费者线程。
限时插入以及移除方法在ArrayBlockingQueue中通过awaitNanos来实现,在给定的时间过后如果线程未被唤醒则直接返回。
publicbooleanoffer(E e,longtimeout, TimeUnit unit) throws InterruptedException{ checkNotNull(e);longnanos = unit.toNanos(timeout);//获取定时时长final ReentrantLocklock=this.lock;lock.lockInterruptibly();try{while(count == items.length) {if(nanos <=0)//指定时长过后,线程仍然未被唤醒则返回falsereturnfalse; nanos = notFull.awaitNanos(nanos);//指定时长内阻塞线程} enqueue(e);returntrue; }finally{lock.unlock(); }}
还有一个比较重要的方法:drainTo,drainTo方法可以一次性获取队列中所有的元素,它减少了锁定队列的次数,使用得当在某些场景下对性能有不错的提升。
publicintdrainTo(Collection c,intmaxElements){ checkNotNull(c);if(c ==this)thrownewIllegalArgumentException();if(maxElements <=0)return0; final Object[] items =this.items; final ReentrantLocklock=this.lock;//仅获取一次锁lock.lock();try{intn = Math.min(maxElements, count);//获取队列中所有元素inttake = takeIndex;inti =0;try{while(i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x);//循环插入元素items[take] =null;if(++take == items.length) take =0; i++; }returnn; }finally{// Restore invariants even if c.add() threwif(i >0) { count -= i; takeIndex = take;if(itrs !=null) {if(count ==0) itrs.queueIsEmpty();elseif(i > take) itrs.takeIndexWrapped(); }for(; i >0&&lock.hasWaiters(notFull); i--) notFull.signal();//唤醒等待的生产者线程} } }finally{lock.unlock(); }}
LinkedBlockingQueue
LinkedBlockingQueue是一个底层用单向链表实现的有界阻塞队列,和ArrayBlockingQueue一样,采用ReentrantLock来控制并发,不同的是它使用了两个独占锁来控制消费和生产。put以及take方法源码如下:
publicvoidput(E e)throwsInterruptedException {intc = -1; Node node =newNode(e);finalReentrantLock putLock =this.putLock;//因为使用了双锁,需要使用AtomicInteger计算元素总量,避免并发计算不准确finalAtomicIntegercount=this.count; putLock.lockInterruptibly();try{while(count.get() == capacity) { notFull.await();//队列已满,阻塞生产线程} enqueue(node);//插入元素到队列尾部c =count.getAndIncrement();//count + 1if(c +1< capacity)//如果+1后队列还未满,通过其他生产线程继续生产notFull.signal(); }finally{ putLock.unlock(); }if(c ==0)//只有当之前是空时,消费队列才会阻塞,否则是不需要通知的signalNotEmpty(); }privatevoidenqueue(Node node) {//将新元素添加到链表末尾,然后将last指向尾部元素last = last.next= node;}publicE take()throwsInterruptedException { E x;intc = -1;finalAtomicIntegercount=this.count;finalReentrantLock takeLock =this.takeLock; takeLock.lockInterruptibly();try{while(count.get() ==0) { notEmpty.await();//队列为空,阻塞消费线程} x = dequeue();//消费一个元素c =count.getAndDecrement();//count - 1if(c >1)// 通知其他等待的消费线程继续消费notEmpty.signal(); }finally{ takeLock.unlock(); }if(c == capacity)//只有当之前是满的,生产队列才会阻塞,否则是不需要通知的signalNotFull();returnx;}//消费队列头部的下一个元素,同时将新头部置空privateE dequeue() { Node h = head; Node first = h.next; h.next= h;// help GChead = first; E x = first.item; first.item =null;returnx;}
可以看到LinkedBlockingQueue通过takeLock和putLock两个锁来控制生产和消费,互不干扰,只要队列未满,生产线程可以一直生产,只要队列不为空,消费线程可以一直消费,不会相互因为独占锁而阻塞。
看过了LinkedBlockingQueue以及ArrayBlockingQueue的底层实现,会发现一个问题,正常来说消费者和生产者可以并发执行对队列的吞吐量会有比较大的提升,那么为什么ArrayBlockingQueue中不使用双锁来实现队列的生产和消费呢?我的理解是ArrayBlockingQueue也能使用双锁来实现功能,但由于它底层使用了数组这种简单结构,相当于一个共享变量,如果通过两个锁,需要更加精确的锁控制,这也是为什么JDK1.7中的ConcurrentHashMap使用了分段锁来实现,将一个数组分为多个数组来提高并发量。LinkedBlockingQueue不存在这个问题,链表这种数据结构头尾节点都相对独立,存储上也不连续,双锁控制不存在复杂性。这是我的理解,如果你有更好的结论,请留言探讨。
PriorityBlockingQueue
PriorityBlockingQueue是一个底层由数组实现的无界队列,并带有排序功能,同样采用ReentrantLock来控制并发。由于是无界的,所以插入元素时不会阻塞,没有队列满的状态,只有队列为空的状态。通过这两点特征其实可以猜测它应该是有一个独占锁(底层数组)和一个Condition(只通知消费)来实现的。put以及take方法源码分析如下:
publicvoidput(E e){ offer(e);}publicbooleanoffer(E e){if(e ==null)thrownewNullPointerException(); final ReentrantLocklock=this.lock;lock.lock();intn, cap; Object[] array;//无界队列,队列长度不够时会扩容while((n = size) >= (cap = (array = queue).length)) tryGrow(array, cap);try{//通过comparator来实现优先级排序Comparator cmp = comparator;if(cmp ==null) siftUpComparable(n, e, array);elsesiftUpUsingComparator(n, e, array, cmp); size = n +1; notEmpty.signal();//和ArrayBlockingQueue一样,每次添加元素后通知消费线程}finally{lock.unlock(); }returntrue;}publicEtake() throws InterruptedException{ final ReentrantLocklock=this.lock;lock.lockInterruptibly(); E result;try{while( (result = dequeue()) ==null) notEmpty.await();//队列为空,阻塞消费线程}finally{lock.unlock(); }returnresult;}
DelayQueue
DelayQueue也是一个无界队列,它是在PriorityQueue基础上实现的,先按延迟优先级排序,延迟时间短的排在前面。和PriorityBlockingQueue相似,底层也是数组,采用一个ReentrantLock来控制并发。由于是无界的,所以插入元素时不会阻塞,没有队列满的状态。能想到的最简单的使用场景一般有两个:一个是缓存过期,一个是定时执行的任务。但由于是无界的,缓存过期上一般使用的并不多。简单来看下put以及take方法:
privatefinal transient ReentrantLocklock=newReentrantLock();privatefinal PriorityQueue q =newPriorityQueue();//优先级队列publicvoidput(E e){ offer(e);}publicbooleanoffer(E e){ final ReentrantLocklock=this.lock;lock.lock();try{ q.offer(e);//插入元素到优先级队列if(q.peek() == e) {//如果插入的元素在队列头部leader =null; available.signal();//通知消费线程}returntrue; }finally{lock.unlock(); }}publicEtake() throws InterruptedException{ final ReentrantLocklock=this.lock;lock.lockInterruptibly();try{for(;;) { E first = q.peek();//获取头部元素if(first ==null) available.await();//空队列阻塞else{longdelay = first.getDelay(NANOSECONDS);//检查元素是否延迟到期if(delay <=0)returnq.poll();//到期则弹出元素first =null;// don't retain ref while waitingif(leader !=null) available.await();else{ Thread thisThread = Thread.currentThread(); leader = thisThread;try{ available.awaitNanos(delay);//阻塞未到期的时间}finally{if(leader == thisThread) leader =null; } } } } }finally{if(leader ==null&& q.peek() !=null) available.signal();lock.unlock(); }}
SynchronousQueue
SynchronousQueue相比较之前的4个队列就比较特殊了,它是一个没有容量的队列,也就是说它内部时不会对数据进行存储,每进行一次put之后必须要进行一次take,否则相同线程继续put会阻塞。这种特性很适合做一些传递性的工作,一个线程生产,一个线程消费。内部分为公平和非公平访问两种模式,默认使用非公平,未使用锁,全部通过CAS操作来实现并发,吞吐量非常高。这里只对它的非公平实现下的take和put方法做下简单分析:
//非公平情况下调用内部类TransferStack的transfer方法putpublicvoidput(E e)throwsInterruptedException {if(e ==null)thrownewNullPointerException();if(transferer.transfer(e,false,0) ==null) { Thread.interrupted();thrownewInterruptedException(); }}//非公平情况下调用内部类TransferStack的transfer方法takepublicE take()throwsInterruptedException { E e = transferer.transfer(null,false,0);if(e !=null)returne; Thread.interrupted();thrownewInterruptedException();}//具体的put以及take方法,只有E的区别,通过E来区别REQUEST还是DATA模式E transfer(E e,booleantimed,longnanos) { SNode s =null;// constructed/reused as neededintmode = (e ==null) ? REQUEST : DATA;for(;;) { SNode h = head;//栈无元素或者元素和插入的元素模式相匹配,也就是说都是插入元素if(h ==null|| h.mode == mode) {//有时间限制并且超时if(timed && nanos <=0) {if(h !=null&& h.isCancelled()) casHead(h, h.next);// 重新设置头节点elsereturnnull; }//未超时cas操作尝试设置头节点elseif(casHead(h, s = snode(s, e, h, mode))) {//自旋一段时间后未消费元素则挂起put线程SNode m = awaitFulfill(s, timed, nanos);if(m == s) {// wait was cancelledclean(s);returnnull; }if((h = head) !=null&& h.next== s) casHead(h, s.next);// help s's fulfillerreturn(E) ((mode == REQUEST) ? m.item : s.item); } }//栈不为空并且和头节点模式不匹配,存在元素则消费元素并重新设置head节点elseif(!isFulfilling(h.mode)) {// try to fulfillif(h.isCancelled())// already cancelledcasHead(h, h.next);// pop and retryelseif(casHead(h, s=snode(s, e, h, FULFILLING|mode))) {for(;;) {// loop until matched or waiters disappearSNode m = s.next;// m is s's matchif(m ==null) {// all waiters are gonecasHead(s,null);// pop fulfill nodes =null;// use new node next timebreak;// restart main loop} SNode mn = m.next;if(m.tryMatch(s)) { casHead(s, mn);// pop both s and mreturn(E) ((mode == REQUEST) ? m.item : s.item); }else// lost matchs.casNext(m, mn);// help unlink} } }//节点正在匹配阶段 else{// help a fulfillerSNode m = h.next;// m is h's matchif(m ==null)// waiter is gonecasHead(h,null);// pop fulfilling nodeelse{ SNode mn = m.next;if(m.tryMatch(h))// help matchcasHead(h, mn);// pop both h and melse// lost matchh.casNext(m, mn);// help unlink} } }}//先自旋后挂起的核心方法SNode awaitFulfill(SNode s,booleantimed,longnanos) {finallongdeadline = timed ? System.nanoTime() + nanos :0L; Thread w = Thread.currentThread();//计算自旋的次数intspins = (shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) :0);for(;;) {if(w.isInterrupted()) s.tryCancel(); SNode m = s.match;//匹配成功过返回节点if(m !=null)returnm;//超时控制if(timed) { nanos = deadline - System.nanoTime();if(nanos <=0L) { s.tryCancel();continue; } }//自旋检查,是否进行下一次自旋if(spins >0) spins = shouldSpin(s) ? (spins-1) :0;elseif(s.waiter ==null) s.waiter = w;// establish waiter so can park next iterelseif(!timed) LockSupport.park(this);//在这里挂起线程elseif(nanos > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanos); }}
代码非常复杂,这里说下我所理解的核心逻辑。代码中可以看到put以及take方法都是通过调用transfer方法来实现的,然后通过参数mode来区别,在生产元素时如果是同一个线程多次put则会采取自旋的方式多次尝试put元素,可能自旋过程中元素会被消费,这样可以及时put,降低线程挂起的性能损耗,高吞吐量的核心也在这里,消费线程一样,空栈时也会先自旋,自旋失败然后通过线程的LockSupport.park方法挂起。
LinkedTransferQueue
LinkedTransferQueue是一个无界的阻塞队列,底层由链表实现。虽然和LinkedBlockingQueue一样也是链表实现的,但并发控制的实现上却很不一样,和SynchronousQueue类似,采用了大量的CAS操作,没有使用锁,由于是无界的,所以不会put生产线程不会阻塞,只会在take时阻塞消费线程,消费线程挂起时同样使用LockSupport.park方法。
LinkedTransferQueue相比于以上的队列还提供了一些额外的功能,它实现了TransferQueue接口,有两个关键方法transfer(E e)和tryTransfer(E e)方法,transfer在没有消费时会阻塞,tryTransfer在没有消费时不会插入到队列中,也不会等待,直接返回false。
privatestaticfinalintNOW =0;// for untimed poll, tryTransferprivatestaticfinalintASYNC =1;// for offer, put, addprivatestaticfinalintSYNC =2;// for transfer, takeprivatestaticfinalintTIMED =3;// for timed poll, tryTransfer//通过SYNC状态来实现生产阻塞publicvoidtransfer(E e)throwsInterruptedException{if(xfer(e,true, SYNC,0) !=null) { Thread.interrupted();// failure possible only due to interruptthrownewInterruptedException(); }}//通过NOW状态跳过添加元素以及阻塞publicbooleantryTransfer(E e){returnxfer(e,true, NOW,0) ==null;}//通过ASYNC状态跳过阻塞publicvoidput(E e){ xfer(e,true, ASYNC,0);}//通过SYNC状态来实现消费阻塞publicEtake()throwsInterruptedException{ E e = xfer(null,false, SYNC,0);if(e !=null)returne; Thread.interrupted();thrownewInterruptedException();}//生产消费调用同一个方法,通过e是否为空,haveData,how等参数来区分具体逻辑privateExfer(E e,booleanhaveData,inthow,longnanos){if(haveData && (e ==null))thrownewNullPointerException(); Node s =null;// the node to append, if neededretry:for(;;) {// restart on append race//找出第一个可用节点for(Node h = head, p = h; p !=null;) {// find & match first nodebooleanisData = p.isData; Object item = p.item;//队列为空时直接跳过if(item != p && (item !=null) == isData) {// unmatched//节点类型相同,跳过if(isData == haveData)// can't matchbreak;if(p.casItem(item, e)) {// matchfor(Node q = p; q != h;) { Node n = q.next;// update by 2 unless singletonif(head == h && casHead(h, n ==null? q : n)) { h.forgetNext();break; }// advance and retryif((h = head) ==null|| (q = h.next) ==null|| !q.isMatched())break;// unless slack < 2} LockSupport.unpark(p.waiter);returnLinkedTransferQueue.cast(item); } } Node n = p.next; p = (p != n) ? n : (h = head);// Use head if p offlist}//插入节点或移除节点具体逻辑//tryTransfer方法会直接跳过并返回结果if(how != NOW) {// No matches availableif(s ==null) s =newNode(e, haveData); Node pred = tryAppend(s, haveData);//加入节点if(pred ==null)continueretry;// lost race vs opposite modeif(how != ASYNC)//自旋以及阻塞消费线程逻辑,和SynchronousQueue类似,先尝试自选,失败后挂起线程//transfer方法在没有消费线程时也会阻塞在这里returnawaitMatch(s, pred, e, (how == TIMED), nanos); }returne;// not waiting}}
LinkedBlockingDeque
LinkedBlockingDeque是一个有界的双端队列,底层采用一个双向的链表来实现,在LinkedBlockingQeque的Node 实现多了指向前一个节点的变量prev。并发控制上和ArrayBlockingQueue类似,采用单个ReentrantLock来控制并发,这里是因为双端队列头尾都可以消费和生产,所以使用了一个共享锁。它实现了BlockingDeque接口,继承自BlockingQueue接口,多了addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast等方法,用来头尾生产和消费。LinkedBlockingDeque的实现代码比较简单,基本就是综合了LinkedBlockingQeque和ArrayBlockingQueue的代码逻辑,这里就不做分析了。
总结
文章对JDK1.8中的7种阻塞队列都做了简单分析,帮助大家大致梳理的这7个队列的基本原理。总的来说每种阻塞队列都有它自己的应用场景,使用时可以先根据有界还是无界,然后在根据各自的特性来进行选择。
有界阻塞队列包括:ArrayBlockingQueue、LinkedBlockingQueue以及LinkedBlockingDeque三种,LinkedBlockingDeque应用场景很少,一般用在“工作窃取”模式下。ArrayBlockingQueue和LinkedBlockingQueue基本就是数组和链表的区别。无界队列包括PriorityBlockingQueue、DelayQueue和LinkedTransferQueue。PriorityBlockingQueue用在需要排序的队列中。DelayQueue可以用来做一些定时任务或者缓存过期的场景。LinkedTransferQueue则相比较其他队列多了transfer功能。最后剩下一个不存储元素的队列SynchronousQueue,用来处理一些高效的传递性场景。