源码分析
NIOEventLoopGroup
NioEventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children [], 默认大小是处理器核数 * 2, 这样就构成了一个线程池,初始化EventExecutor时NioEventLoopGroup重载newChild方法,所以children元素的实际类型为NioEventLoop。
线程启动时调用SingleThreadEventExecutor的构造方法,执行NioEventLoop类的run方法,首先会调用hasTasks()方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法
select ( oldWakenUp) 方法解决了 Nio 中的 bug,selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow(),若触发了epoll的空轮询bug,则会反复执行selector.select(timeoutMillis),变量selectCnt 会逐渐变大,当selectCnt 达到阈值(默认512),则执行rebuildSelector方法,进行selector重建,解决cpu占用100%的bug。
rebuildSelector方法先通过openSelector方法创建一个新的selector。然后将old selector的selectionKey执行cancel。最后将old selector的channel重新注册到新的selector中。rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。
接下来调用processSelectedKeys 方法(处理I/O任务),当selectedKeys != null时,调用processSelectedKeysOptimized方法,迭代 selectedKeys 获取就绪的 IO 事件的selectkey存放在数组selectedKeys中, 然后为每个事件都调用 processSelectedKey 来处理它,processSelectedKey 中分别处理OP_READ;OP_WRITE;OP_CONNECT事件。
最后调用runAllTasks方法(非IO任务),该方法首先会调用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行,然后依次从taskQueue中取任务执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。
每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。
Outbound 事件都是请求事件, 发起者是 Channel,处理者是 unsafe,通过 Outbound 事件进行通知,传播方向是 tail到head。Inbound 事件发起者是 unsafe,事件的处理者是 Channel, 是通知事件,传播方向是从头到尾。
内存管理机制,首先会预申请一大块内存Arena,Arena由许多Chunk组成,而每个Chunk默认由2048个page组成。Chunk通过AVL树的形式组织Page,每个叶子节点表示一个Page,而中间节点表示内存区域,节点自己记录它在整个Arena中的偏移地址。当区域被分配出去后,中间节点上的标记位会被标记,这样就表示这个中间节点以下的所有节点都已被分配了。大于8k的内存分配在poolChunkList中,而PoolSubpage用于分配小于8k的内存,它会把一个page分割成多段,进行内存分配。
ByteBuf的特点:支持自动扩容(4M),保证put方法不会抛出异常、通过内置的复合缓冲类型,实现零拷贝(zero-copy);不需要调用flip()来切换读/写模式,读取和写入索引分开;方法链;引用计数基于AtomicIntegerFieldUpdater用于内存回收;PooledByteBuf采用二叉树来实现一个内存池,集中管理内存的分配和释放,不用每次使用都新建一个缓冲区对象。UnpooledHeapByteBuf每次都会新建一个缓冲区对象。
NioEventLoop与NioChannel类关系
一个NioEventLoopGroup下包含多个NioEventLoop
每个NioEventLoop中包含有一个Selector,一个taskQueue,一个delayedTaskQueue
每个NioEventLoop的Selector上可以注册监听多个AbstractNioChannel.ch
每个AbstractNioChannel只会绑定在唯一的NioEventLoop上
每个AbstractNioChannel都绑定有一个自己的DefaultChannelPipeline
NioEventLoop线程执行过程
轮询监听的IO事件
netty的轮询注册机制
netty将AbstractNioChannel内部的jdk类SelectableChannel对象注册到NioEventLoop中的jdk类Selector对象上去,并且将AbstractNioChannel作为SelectableChannel对象的一个attachment附属上。
这样在Selector轮询到某个SelectableChannel有IO事件发生时,就可以直接取出IO事件对应的AbstractNioChannel进行后续操作。循环执行阻塞selector.select(timeoutMIllis)操作直到以下条件产生
a)轮询到了IO事件(selectedKey != 0)
b)oldWakenUp参数为true
c)任务队列里面有待处理任务(hasTasks())
d)第一个定时任务即将要被执行(hasScheduledTasks())
e)用户主动唤醒(wakenUp.get()==true)解决JDK的NIO epoll bug
该bug会导致Selector一直空轮询,最终导致cpu 100%。
在每次selector.select(timeoutMillis)后,如果没有监听到就绪IO事件,会记录此次select的耗时。如果耗时不足timeoutMillis,说明select操作没有阻塞那么长时间,可能触发了空轮询,进行一次计数。
计数累积超过阈值(默认512)后,开始进行Selector重建:
a)拿到有效的selectionKey集合
b)取消该selectionKey在旧的selector上的事件注册
c)将该selectionKey对应的Channel注册到新的selector上,生成新的selectionKey
d)重新绑定Channel和新的selectionKey的关系netty优化了sun.nio.ch.SelectorImpl类中的selectedKeys和publicSelectedKeys这两个field的实现
netty通过反射将这两个filed替换掉,替换后的field采用数组实现。
这样每次在轮询到nio事件的时候,netty只需要O(1)的时间复杂度就能将SelectionKey塞到set中去,而jdk原有field底层使用的hashSet需要O(lgn)的时间复杂度。
处理IO事件
1)对于boss NioEventLoop来说,轮询到的是基本上是连接事件(OP_ACCEPT):
a)socketChannel = ch.accept();
b)将socketChannel绑定到worker NioEventLoop上;
c)socketChannel在worker NioEventLoop上创建register0任务;
d)pipeline.fireChannelReadComplete();
2)对于worker NioEventLoop来说,轮询到的基本上是IO读写事件(以OP_READ为例):
a)ByteBuffer.allocateDirect(capacity);
b)socketChannel.read(dst);
c)pipeline.fireChannelRead();
d)pipeline.fireChannelReadComplete();
处理任务队列
1)处理用户产生的普通任务
NioEventLoop中的Queue<Runnable> taskQueue被用来承载用户产生的普通Task。
taskQueue被实现为netty的mpscQueue,即多生产者单消费者队列。netty使用该队列将外部用户线程产生的Task聚集,并在reactor线程内部用单线程的方式串行执行队列中的Task。
当用户在非IO线程调用Channel的各种方法执行Channel相关的操作时,比如channel.write()、channel.flush()等,netty会将相关操作封装成一个Task并放入taskQueue中,保证相关操作在IO线程中串行执行。
2)处理用户产生的定时任务
NioEventLoop中的Queue<ScheduledFutureTask<?>> delayedTaskQueue = new PriorityQueue被用来承载用户产生的定时Task。
当用户在非IO线程需要产生定时操作时,netty将用户的定时操作封装成ScheduledFutureTask,即一个netty内部的定时Task,并将定时Task放入delayedTaskQueue中等待对应Channel的IO线程串行执行。
为了解决多线程并发写入delayedTaskQueue的问题,netty将添加ScheduledFutureTask到delayedTaskQueue中的操作封装成普通Task,放入taskQueue中,通过NioEventLoop的IO线程对delayedTaskQueue进行单线程写操作。
3)处理任务队列的逻辑
a)将已到期的定时Task从delayedTaskQueue中转移到taskQueue中
b)计算本次循环执行的截止时间
c)循环执行taskQueue中的任务,每隔64个任务检查一下是否已过截止时间,直到taskQueue中任务全部执行完或者超过执行截止时间。
Netty中Reactor线程和worker线程所处理的事件
Server端NioEventLoop处理的事件
Client端NioEventLoop处理的事件
pipeline原理与事件处理
目录
- pipeline整体关系简述
- Unsafe的作用
- 事件的分类及处理
- pipeline中节点的添加和删除
一、pipeline整体关系简述
netty中的pipeline模型
当EventLoop的selector监听到某Channel产生了就绪的IO事件,并调用socket API对就绪的IO事件进行操作后,需要将操作产生的“IO数据”或“操作结果”告知用户进行相应的业务处理。
netty将因外部IO事件导致的Channel状态变更(Channel被注册到EventLoop中,Channel状态变为可用,Channel读取到IO数据...)或Channel内部逻辑操作(添加ChannelHandler...)抽象为不同的回调事件,并定义了pipeline对Channel的回调事件进行流式的响应处理。
用户可在pipeline中添加多个事件处理器(ChannelHandler),并通过实现ChannelHandler中定义的方法,对回调事件进行定制化的业务处理。ChannelHandler也可以调用自身方法对Channel本身进行操作。
netty会保证“回调事件在ChannelHandler之间的流转”及“Channel内部IO操作”由EventLoop线程串行执行,用户也可以在ChannelHandler中使用自行构建的业务线程进行业务处理。
pipeline相关类的关系图
- DefaultChannelPipeline:
事件处理流,是一个双向链表结构,链表中节点元素为ChannelHandlerContext。
新的AbstractNioChannel创建时,会创建该Channel对应的DefaultChannelPipeline,用于处理该Channel对应的回调事件。
DefaultChannelPipeline创建时,会自动创建并向链表中添加两个ChannelhandlerContext节点——head和tail。
pipeline的fireXXX()方法:回调事件的发起方法。会产生相应回调事件并直接调用ChannelHandlerContext.invokeXXX(head)方法将回调事件传递给pipeline的head节点。
ChannelHandlerContext:
事件处理器上下文,pipeline中的实际处理节点。
每个处理节点ChannelHandlerContext中包含一个具体的事件处理器ChannelHandler,同时ChannelHandlerContext中也绑定了对应的pipeline和Channel的信息,方便ChannelHandler进行调用。
AbstractChannelHandlerContext具体实现了ChannelHandlerContext接口的功能,并进行了相应扩展。
- ChannelHandlerContext的fireXXX方法:回调事件的发起方法。会产生相应回调事件并将其交给pipeline中的下一个处理节点。此方法提供给用户实现的ChannelHandler使用,用于将回调事件向pipeline中的下一个节点传递。
- AbstractChannelHandlerContext的static invokeXXX(AbstractChannelHandlerContext next)方法:封装next.invokeXXX()的逻辑并交给EventLoop的IO线程执行。
- ChannelHandlerContext的invokeXXX()方法:回调事件执行方法。执行节点中事件处理器ChannelHandler的XXX方法,实际处理回调事件。
ChannelHandler:
ChannelHandler(事件处理器接口),由ChannelInboundHandler接口和ChannelOutboundHandler接口继承。
ChannelInboundHandler中定义了各个回调事件的回调方法,由用户进行具体实现。
ChannelOutboundHandler中定义了方法进行Channel内部IO操作(Channel发起bind/connect/close操作,Channel监听OP_READ,Channel写IO数据...),供用户在回调方法中使用。
ChannelInboundHandlerAdapter和ChannelOutboundHandlerAdapter为接口的默认实现类(其实没干什么事),用户通过继承这两个类来实现自己的业务处理逻辑
二、Unsafe的作用
概述
Unsafe是Channel的内部类,一个Channel对应一个Unsafe。
Unsafe用于处理Channel对应网络IO的底层操作。ChannelHandler处理回调事件时产生的相关网络IO操作最终也会委托给Unsafe执行。
Unsafe接口中定义了socket相关操作,包括SocketAddress获取、selector注册、网卡端口绑定、socket建连与断连、socket写数据。这些操作都和jdk底层socket相关。
NioUnsafe在Unsafe基础上增加了几个操作,包括访问jdk的SelectableChannel、socket读数据等。
NioByteUnsafe实现了与socket连接的字节数据读取相关的操作。
NioMessageUnsafe实现了与新连接建立相关的操作。
三、事件的分类及处理
在netty的pipeline中包含两种类型的事件,分别为inbound和outbound,inbound为上行事件,outbound为下行事件。
inbound事件为被动触发,在某些情况发生时自动触发;
outbound为主动触发,在需要主动执行某些操作时触发。
Inbound事件
Outbound事件
发起并处理inbound事件
对于inbound事件,因为需要进行业务逻辑处理,因此pipeline的head节点会执行fireXXX()方法将事件透传给后面的用户自己实现inbound处理节点,由用户自己实现的ChannelHandler接收事件并回调执行业务逻辑。
发起并处理outbound事件
对于outbound事件,因为和IO操作相关,最后会由pipeline中的head节点接收处理。head节点实现了ChannelHandler的事件执行方法,将实际的执行操作委托给Unsafe进行。
bind、connect、read、writeAndFlush等outbound事件的处理过程可以自己跟代码看一下。(最终由head节点委托给Unsafe类执行相关IO操作)
head、tail节点的作用
head节点:
head节点既是inBound处理节点,又是outBound处理节点。
head节点作为pipeline的头结点开始接收并传递inbound事件。并作为pipeline的最后一环最终接收处理outbound事件(委托Unsafe进行outbound事件的相关IO操作)。tail节点:
tail节点是inBound处理节点。
tail节点作为pipeline的第一环传递outbound事件,其实就是将outbound事件透传到前一个outbound处理节点。并作为pipeline的最后一环最终接收inbound事件,大部分左右是终止inbound事件的传播。
tail节点的exceptionCaught方法:若最终在用户自定义的处理节点没有捕获处理异常,则在tail节点捕获异常打印警告日志。
tail节点的channelRead方法:若Channel读入的ByteBuf在流经pipeline过程中没有被消费掉,最终流入了tail节点,则将该ByteBuf丢弃回收并打印警告日志。