SynchronousQueue是一个无存储空间的阻塞队列,是实现newFixedThreadPool的核心,它也分为公平和非公平,因为是无存储空间的,所以与其他阻塞队列实现不同的是,这个阻塞peek方法直接返回null,无任何其他操作,其他的方法与阻塞队列的其他方法一致。这个队列的特点是,必须先调用take或者poll方法,才能使用off,add方法,下面看源码级实现。
从构造函数来看,底层是一个Transfer对象,公平的阻塞队列使用的是TransferQueue实现,非公平使用的是TransferStack实现。
这个队列方法的实现基本都是调用Transfer的实现的方法。
这里面request是代表消费者,data是代表生产者,而FULFILING是拥来判断当前队列是否有request,如果有则生产者数据进来才能被消费掉,不然直接返回null
先来看看take和poll方法,里面调用的是transferer方法,第一个参数决定了是生产者还是消费者,第二个参数用于判断是否阻塞一端时间,第三个参数阻塞时长。如果是消费者,则会返回消费者数据本身,如果返回的是null,则直接通过抛出异常方式来终止线程
一进来首先通过第一个参数用来判断是生产者还是消费者。用来阻塞的对象是Snode这个对象
SNode对象里面第一个是链指向下一个snode,match则保存的是与之匹配的对方角色,比如如果snode是生产者,则会保存一个消费者,然后这两个凑成一对返回。第三个线程是保存的当前线程,用来控制park和unpark的,因为我们知道,park和unpark是基于一个线程的,也就是说对线程a调用了park后,只能通过对线程a调用unpark才能释放掉a线程,第四个对象item保存的是数据,第五个Mode是拥来保存区别是生产者还是消费者的
首先拿到head,这里的head对象代表着栈顶的一个等待消费者,如果为null,或者如果不为null,则判断数据身份是否一致(是否为生产者或者消费者),如果一致则继续判断是否需要阻塞等待,如果不阻塞等待,或者阻塞等待,但是阻塞时间还没有到,则先把当前请求的对象包装为一个snode,然后通过CAS压入栈顶。这里封装的时候,第一个参数用来判断当前s是否被包装,如果已经被包装则直接执行替换操作,如果未被包装则直接new一个Snode,然后在进行赋值,这里主要是赋值是生产者还是消费者,第二个赋值是指向当前栈顶的元素,然后返回,用CAS压入栈顶。接下来调用awaitFulfill方法进行自旋,这里自旋会根据系统CPU核数,如果是2核以上,则自旋16*32,如果不是则直接调用park把当前线程挂起。
如下图所示,不停调用take或者poll方法入栈是下面这样一个过程
下面是awaitFull方法的源码,通过shouldSpin来进行判断是否需要自旋,自旋的条件是这个node在当前的栈顶。或者不在栈顶判断是否可以请求或者消费数据。
自旋计数是通过下面的spins=shouldSpin(s)?(spins-1):0来完成的。如果自旋结束了最后先保存当前线程到snode对象里面去,然后调用park命令挂起。这样就完成了消费者入队。
下面我们看请求数据,offer方法,这下面也是调用了transfer方法
代码和上面take方法调用的是一个方法。这里通过第二行就完成了判断是一个消费者。然后如果栈里没有等待的消费者,则直接就返回了Null,而如果有等待的生产者,则会跳到下面的判断调用isFulfilling方法,这个方法通过比较mode来判断是否可以消费。
如果可以,则把当前对象包装为一个snode,然后压入栈顶,然后如果压入的生产者下一个节点为null,说明没有消费者,然后就直接把所有的重新置为null,中断循环。如果有的话,则继续拿到消费者的下一个消费者。举例比如栈中有1,2,3消费者,然后新来的生产者s(a)压入栈顶,然后a的下一个指向(m)1,这时候呢(mn)指向了2,这是因为生产者a和消费者1要同步出栈,然后就把2推到了栈顶。这里调用(m)1的tryMatch,并传入了s(a),
这里很简单,一进来时候进来的是(m)1的对象内部,这时候match肯定是null,之前没有设置过,这时候就把这个match替换为生产者(s),然后拿到阻塞的线程,调用unpark方法释放线程。然后返回true。如果不为Null则判断这个消费者里面保存的生产者是否是传入进来的生产者。
如果匹配成功,则直接把(mn)2推到栈顶,然后返回请求的数据。如果不匹配则向栈的下层去寻找,直到找到为止。
如下所示,a代表生产者,1 2 3代表消费者,这样就完成了一次请求。