此篇文章讲解一下Netty中的任务队列.这里说的任务队列是Netty中的IO线程对应的任务队列.
在Netty中NioEventLoopGroup这个类相当于线程池,而由它创建的每个NioEventLoop相当于池中的线程,因为每个NioEventLoop都是和唯一的一个线程绑定的,而这个线程只负责IO相关的工作,因此称作IO线程.
在创建NioEventLoop的时候会创建一个与之关联的任务队列(Queue taskQueue).这个任务队列用于'装载'其他非IO线程向IO线程提交的任务,比如业务线程(即非IO线程)需要向对端写数据,那么业务线程会把写数据这个操作封装成一个任务'丢到'任务队列中,由IO线程将数据写到网络中.
private void write(Object msg, boolean flush, ChannelPromise promise) {
...
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
// 业务线程将写操作封装成Task
final WriteTask task = WriteTask.newInstance(next, m, promise, flush);
// 提交任务到IO线程对应的任务队列中
if (!safeExecute(executor, task, promise, m, !flush)) {
task.cancel();
}
}
}
private static boolean safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg, boolean lazy) {
// 将任务提交到IO线程的任务队列中
executor.execute(runnable);
}
首先看下这个taskQueue是由谁实现的
// 实例化NioEventLoop
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
// newTaskQueue(queueFactory)会实现具体的任务队列
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
this.provider = ObjectUtil.checkNotNull(selectorProvider, "selectorProvider");
this.selectStrategy = ObjectUtil.checkNotNull(strategy, "selectStrategy");
final SelectorTuple selectorTuple = openSelector();
this.selector = selectorTuple.selector;
this.unwrappedSelector = selectorTuple.unwrappedSelector;
}
private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory
if (queueFactory == null) {
// 流程会走到这里
return newTaskQueue0(DEFAULT_MAX_PENDING_TASKS);
}
return queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
}
private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
// 流程会继续调用 PlatformDependent.<Runnable>newMpscQueue()
return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue()
: PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks);
}
public static <T> Queue<T> newMpscQueue() {
return Mpsc.newMpscQueue();
}
static <T> Queue<T> newMpscQueue() {
// 流程会执行第一个new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
// MPSC_CHUNK_SIZE=1024
return USE_MPSC_CHUNKED_ARRAY_QUEUE ? new MpscUnboundedArrayQueue<T>(MPSC_CHUNK_SIZE)
: new MpscUnboundedAtomicArrayQueue<T>(MPSC_CHUNK_SIZE);
}
上面说了这么多,只是想说明taskQueue具体的实现
Queue<Runnable> taskQueue = new MpscUnboundedArrayQueue<T>(1024)
接下来具体分析下MpscUnboundedArrayQueue
org.jctools.queues.MpscUnboundedArrayQueue
在Netty之前的版本中,taskQueue是Netty自身实现它的.但是后面版本就将这个taskQueue的实现'交由'JCTools下的类来实现了.
<dependency>
<groupId>org.jctools</groupId>
<artifactId>jctools-core</artifactId>
<version>3.1.0</version>
</dependency>
在Netty中,多个Netty客户端连接Netty服务端的时候,Netty服务端中的一个IO线程会负责处理多个客户端.
如上图所示,IO线程-1负责处理Netty客户端-1和Netty客户端-2的读写请求.当多个业务线程需要向对端写数据的时候,会把写操作封装成任务'丢到'IO线程-1的任务队列中.
Netty中的线程有个特别的地方,就是一个IO线程会对应多个业务线程,业务线程就是生产者,IO线程就是消费者,它消费业务线程'生产'的任务.属于单消费者多生产者模式.通过类的名称MpscUnboundedArrayQueue可以看出来,这个类就是为多生产者(MultiProducer)单消费者(SingleConsumer)设计的.
MpscUnboundedArrayQueue的底层使用数组的形式存储元素.
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
// 转成2^n
int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
long mask = (p2capacity - 1) << 1;
// 存储元素的底层数组
E[] buffer = allocateRefArray(p2capacity + 1);
producerBuffer = buffer;
producerMask = mask;
consumerBuffer = buffer;
consumerMask = mask;
soProducerLimit(mask);
}
从全局的角度看下MpscUnboundedArrayQueue的底层结构,这里假设initialCapacity=4
虽然设置的初始容量大小=4,但是当存放的元素大于4的时候,就会新创建一个与之前同等大小的数组,然后'挂接'到之前的数组. 当再次'装载'不了新放入的元素时候,会再次新创建一个数组'挂接'到之前的数组,以此类推.最后形成一个数组+链表的结构.
结合源码分析下.以下假设初始容量大小initialCapacity=4
// MpscUnboundedArrayQueue继承BaseMpscLinkedArrayQueue
public BaseMpscLinkedArrayQueue(final int initialCapacity) {
// p2capacity = 4
int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
// 转成二进制mask=110
long mask = (p2capacity - 1) << 1;
// 存储元素的底层数组大小=4+1=5
E[] buffer = allocateRefArray(p2capacity + 1);
// 指向生产者的数组
producerBuffer = buffer;
producerMask = mask;
// 指向消费者的数组
consumerBuffer = buffer;
consumerMask = mask;
// producerLimit=mask=110
soProducerLimit(mask);
}
构造方法中,虽然设置的初始容量=4,但是在初始化底层数组的时候,分配的大小=5.从上面的图中可以看出,上一个数组为了指向下一个数组,因此数组在设计的时候就必须要多出来一个元素,用于指向下一个数组.
当提交元素的时候,代码如下所示,调用offer方法
MpscUnboundedArrayQueue<Integer> queue = new MpscUnboundedArrayQueue<>(4);
queue.offer(1);
public boolean offer(final E e) {
long mask;
E[] buffer;
long pIndex;
while (true) {
long producerLimit = lvProducerLimit();
pIndex = lvProducerIndex();
// 表示正在扩容
if ((pIndex & 1) == 1) {
continue;
}
mask = this.producerMask;
buffer = this.producerBuffer;
// 如果提交的元素即将超过容量
if (producerLimit <= pIndex) {
int result = offerSlowPath(mask, pIndex, producerLimit);
switch (result) {
case CONTINUE_TO_P_INDEX_CAS:
break;
case RETRY:
continue;
case QUEUE_FULL:
return false;
case QUEUE_RESIZE:
// 扩容
resize(mask, buffer, pIndex, e, null);
return true;
}
}
// +2
if (casProducerIndex(pIndex, pIndex + 2)) {
break;
}
}
// 计算元素在数组中的偏移地址
final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
soRefElement(buffer, offset, e);
return true;
}
首先要明确一点的是,producerIndex(即代码中的pIndex)记录生产者添加元素指向的位置,而且这个位置并不是在数组中的实际下标.
每添加一个元素,producerIndex就会+2.并不是+1.
通过构造方法初始化时,producerLimit=110.
当添加第一个元素之后,pIndex=010
当添加第二个元素之后,pIndex=100
当添加第三个元素之后,pIndex=110
根据上面第16行代码producerLimit <= pIndex满足条件.进入下面的代码
private int offerSlowPath(long mask, long pIndex, long producerLimit) {
final long cIndex = lvConsumerIndex();
long bufferCapacity = getCurrentBufferCapacity(mask);
if (cIndex + bufferCapacity > pIndex) {
if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
return RETRY;
} else {
return CONTINUE_TO_P_INDEX_CAS;
}
} else if (availableInQueue(pIndex, cIndex) <= 0) {
return QUEUE_FULL;
}
// pIndex + 1
else if (casProducerIndex(pIndex, pIndex + 1)) {
// 需要扩容
return QUEUE_RESIZE;
} else {
return RETRY;
}
}
能走到上面的代码,说明此时容器马上满了,需要扩容了,会将pIndex+1.之后就会进入到扩容逻辑.
resize(mask, buffer, pIndex, e, null);
之前的pIndex=110,加1之后,变成pIndex=111.这个时候,其他生产者线程根据(pIndex & 1) == 1判断成立,说明有一个生产者线程正在扩容容器,当前生产者线程需要重试.
也就是说根据最后一个字节,控制是否有生产者线程正在扩容.
public boolean offer(final E e) {
long mask;
E[] buffer;
long pIndex;
while (true) {
long producerLimit = lvProducerLimit();
pIndex = lvProducerIndex();
// 表示有其他线程正在扩容
if ((pIndex & 1) == 1) {
// 重试
continue;
}
...
}
...
}
扩容的线程会重新创建一个新的数组
private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
int newBufferLength = getNextBufferSize(oldBuffer);
final E[] newBuffer;
try {
// 创建新数组
newBuffer = allocateRefArray(newBufferLength);
} catch (OutOfMemoryError oom) {
assert lvProducerIndex() == pIndex + 1;
soProducerIndex(pIndex);
throw oom;
}
producerBuffer = newBuffer;
final int newMask = (newBufferLength - 2) << 1;
producerMask = newMask;
final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);
soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);
// 新老数组进行连接
soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);
final long cIndex = lvConsumerIndex();
final long availableInQueue = availableInQueue(pIndex, cIndex);
soProducerLimit(pIndex + Math.min(newMask, availableInQueue));
// +2之后,保证其他生产者线程可以继续增加元素了
soProducerIndex(pIndex + 2);
// 添加一个JUMP元素
soRefElement(oldBuffer, offsetInOld, JUMP);
}
在扩容的时候,会添加一个JUMP元素,这个元素是用来告诉消费者,当消费到这类元素的时候,需要跳到下一个数组继续消费.
假设向容器中依次添加1-9这9个元素,它的结构如下
消费者也会按照1-9进行消费.(即添加顺序和消费顺序一致)
在向容器中添加元素的时候,采用如下方式. 根据起始地址+偏移地址,提高添加元素的速度.
static long modifiedCalcCircularRefElementOffset(long index, long mask) {
return REF_ARRAY_BASE + ((index & mask) << (REF_ELEMENT_SHIFT - 1));
}
public static final long REF_ARRAY_BASE;
public static final int REF_ELEMENT_SHIFT;
static {
// 数组中一个元素占用的大小
final int scale = UnsafeAccess.UNSAFE.arrayIndexScale(Object[].class);
if (4 == scale) {
REF_ELEMENT_SHIFT = 2;
} else if (8 == scale) {
REF_ELEMENT_SHIFT = 3;
} else {
throw new IllegalStateException("Unknown pointer size: " + scale);
}
// 数组中第一个元素的偏移地址
REF_ARRAY_BASE = UnsafeAccess.UNSAFE.arrayBaseOffset(Object[].class);
}
此篇简单介绍下Netty中如何使用JCTools中的类在并发场景下提交元素,以及它的底层数据结构. 这种是与传统直接创建一个2倍的数组处理方式不同的.