本文是Netty文集中“Netty 源码解析”系列的文章。主要对Netty的重要流程以及类进行源码解析,以使得我们更好的去使用Netty。Netty是一个非常优秀的网络框架,对其源码解读的过程也是不断学习的过程。
预备知识
首先,我们知道JDK NIO的Selector实现了I/O多路复用。可以通过一个线程来管理多个Socket。我们可以将多个Channel(一个Channel代表了一个Socket)注册到一个Selector上,并且设置其感兴趣的事件。这样一来,在Selector.select操作时,若发现Channel有我们所感兴趣的事件发生时Selector就会将其记录下来(即,SelectedKeys),然后我们就可以对事件进行相应的处理了。更多关于JDK NIO的知识请参阅关于 NIO 你不得不知道的一些“地雷”
ServerSocketChannel的有效事件为OP_ACCEPT。
SocketChannel的有效事件为OP_CONNECT、OP_READ、OP_WRITE
SelectionKey.OP_ACCEPT 事件处理流程
当服务端收到客户端的一个连接请求时,‘SelectionKey.OP_ACCEPT’将会触发。在NioEventLoop的事件循环中会对该事件进行处理:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
我们来看看unsafe.read()的实现,在NioServerSocketChannel中unsafe是一个NioMessageUnsafe实例:
再者,这里我们主要针对接收连接的逻辑进行分析,关于allocHandle相关的分析不会进行展开,可以参阅Netty 源码解析 ——— AdaptiveRecvByteBufAllocator,这篇文章已经对AdaptiveRecvByteBufAllocator做了详细的分析了。所以此处,我们主要关注的是真实的接收客户端连接请求所涉及的流程。
① 对于ACCEPT事件,每次读循环执行一次读操作(但并没有读取任何字节数据,totalBytesRead > 0 为false)这也是符合NIO规范的,因为每次ACCEPT事件被触发时,仅表示有一个客户端向服务器端发起了连接请求。
② doReadMessages(readBuf):这步主要是通过serverSocket.accpet()来接受生成和客户端通信的socket,并将其放入到readBuf集合中。
创建一个NioSocketChannel实例,NioSocketChannel表示一个使用了NIO Selector的TCP/IP socket的实现。因此我们在构造NioSocketChannel是将上面accept()返回的SocketChannel作为构造函数的参数传入,也就是说NioSocketChannel中持有SocketChannel的引用。同时,我们也将NioServerSocketChannel作为构造函数的参数传入,作为当前NioSocketChannel的parent[一个Channel可以拥有一个父亲(parent),这取决于这个channel是如何被创建的。比如说,一个SocketChannel,它是被ServerSocketChannel所接受的,SocketChannel将ServerSocketChannel作为parent()方法的结果返回]。
而NioSocketChannel的构建和NioServerSocketChannel的构建是非常类似的,关于NioServerSocketChannel的构造我们已经在Netty 源码解析 ——— 服务端启动流程 (下)中进行了详细分析。这里我们提及重要的几点,NioSocketChannel实例的构造主要完成了:
a) 构建Unsafe实例赋值给成员变量unsafe,这里是NioByteUnsafe对象,该类用于完成Channel真实的I/O操作和传输。
b) 创建了该Channel的ChannelPipeline实例(即,DefaultChannelPipeline)赋值给成员变量pipeline。每一个Channel都有它自己的pipeline。
这里ChannelPipeline的构建是很重要的一步。每一个新的Channel被创建时都会分配一个新的ChannelPipeline。ChannelPipeline本质上就是一系列的ChannelHandlers。ChannelPipeline还提供了方法用于传播事件通过ChannelPipeline本身。
ChannelPipeline通过两个AbstractChannelHandlerContext对象本身(head、tail)来维护一个双向链表。
c) SelectionKey.OP_READ赋值给成员变量readInterestOp
设置SocketChannel.configureBlocking(false);配置SocketChannel为非阻塞模式,这步很重要。因为只有非阻塞模式Channel才能使用NIO的Selector来实现非阻塞的I/O操作。
d) 构建NioSocketChannelConfig实例赋值给成员变量config,即,这是负责维护NioSocketChannel相关配置的对象。
③ pipeline.fireChannelRead(readBuf.get(i)):这里实际上就是将第②步构建的NioSocketChannel作为ChannelRead事件的传播数据进行传播。ChannelRead是一个入站事件,它会从ChannelPipeline中的head开始传播,依次顺序回调ChannelInboundHandler的channelRead方法。
这里我们重点来看看ServerBootstrapAcceptor对channelRead事件的处理,ServerBootstrapAcceptor是服务端启动流程中Netty底层加入到NioServerSocketChannel所关联的ChannelPipeline中的一个入站处理器。
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
a) 将我们在启动流程中设置的childHandler(如,「serverBootstrap.childHandler(new MyServerInitializer())」)以及childOptions(如,「serverBootstrap.childOption(ChannelOption.ALLOW_HALF_CLOSURE, true)」)、childAttrs(如,「serverBootstrap.childAttr(AttributeKey.valueOf("userID"), UUID.randomUUID().toString())」)设置到这个NioSocketChannel中。
b) 将配置好的NioSocketChannel注册到childGroup中,并注册了一个监听器,因为注册是一个异步操作,该监听器会得以调用在注册操作完成时(完成包括,成功的完成、失败的完成、取消的完成)。该监听器实现:如果发现该注册操作失败了,则会强制关闭当前这个NioSocketChannel。
而将NioSocketChannel注册到childGroup的操作和将NioServerSocketChannel注册到parentGroup的流程也是极其类似的。详细的说明请参阅Netty 源码解析 ——— 服务端启动流程 (下)。这里做一个简单的概述。
整个注册流程:
a) 首先会通过轮询的方式从childGroup中获取一个NioEventLoop,将当前的NioSocketChannel注册到这个NioEventLoop上。
b) 将当前的SocketChannel注册到NioEventLoop中的Selector上,并将NioSocketChannel作为附加属性设置到SelectionKey中。
c) 回调我们自定义的ChannelInitializer的initChannel方法,将我们定义的一个个ChannelHandler添加到当前NioSocketChannel所关联的ChannelPipeline上,然后将ChannelInitializer本身从ChannelPipeline中移除。
d) 标记注册这个异步操作标志为成功完成。
e) 触发ChannelRegistered事件,该事件会在ChannelPipeline中得以传播。
f) 触发ChannelActive事件,该事件也是一个入站事件,它会从ChannelPipeline中的head开始传播,而head的channelActive方法除了将ChannelActive事件传播给下一个ChannelInboundHandler之外,还调用一个readIfIsAutoRead()方法。
而readIfIsAutoRead()最终会调用到「AbstractNioChannel.doBeginRead()」方法:
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
而这个方法中就完成了将readInterestOp(即,上面在说明NioSocketChannel的构建过程中已经提及,该成员变量值为SelectionKey.OP_READ)设置为感兴趣的事件。这样一来,Selector就会监听该SocketChannel的读事件了。
到目前为止,NioServerSocketChannel通过accept接受了一个客户端的连接请求的整个流程就完成了。这里NioSocketChannel的创建是由NioServerSocketChannel所在的NioEventLoop( 实际上是NioEventLoop所在的线程上 )完成的。然后将创建好的NioSocketChannel注册到childGroup中,也就是通过轮询的方式的方式从childGroup中获取一个NioEventLoop,然后将NioSocketChannel注册的其上。在注册的过程中也完成了将SocketChannel注册到Selector,并设置SelectionKey.OP_READ为感兴趣的事件,这样Selector就会开始监听这个SocketChannel的读事件了。
SelectionKey.OP_CONNECT 事件处理流程
当SelectionKey.OP_CONNECT(连接事件)准备就绪时,我们执行如下操作:
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
① 将SelectionKey.OP_CONNECT事件从SelectionKey所感兴趣的事件中移除,这样Selector就不会再去监听该连接的SelectionKey.OP_CONNECT事件了。而SelectionKey.OP_CONNECT连接事件是只需要处理一次的事件,一旦连接建立完成,就可以进行读、写操作了。
a) boolean wasActive = isActive():
public boolean isActive() {
SocketChannel ch = javaChannel();
return ch.isOpen() && ch.isConnected();
}
因为此时「SocketChannel.finishConnect()」还没调用,所以「ch.isConnected()」将返回false,因此isActive()的结果为false。
b) doFinishConnect():
protected void doFinishConnect() throws Exception {
if (!javaChannel().finishConnect()) {
throw new Error();
}
}
该方法会调用SocketChannel.finishConnect()来标识连接的完成,如果我们不调用该方法,就去调用read/write方法,则会抛出一个NotYetConnectedException异常。
注意,无论如何在connect后finishConnect()方法都是需要被调用的。调用finishConnect()的三种返回:
- 如果你在connect()后直接调用了finishConnect()( 并非在CONNECT事件中调用 ),则若finishConnect()返回了true,则表示channel连接已经建立,而且CONNECT事件也不会被触发了。
- 如果finishConnect()方法返回false,则表示连接还未建立好。那么就可以通过CONNECT事件来监听连接的完成。
-
如果finishConnect()方法抛出了一个IOException异常,则表示连接操作失败。
c) fulfillConnectPromise(connectPromise, wasActive):
II. isActive():获取当前SocketChannel的状态,因为此时「SocketChannel.finishConnect()」已经被调用过了,因此该方法会返回true。
我们来深入看看head对channelActive事件的处理:
III. boolean promiseSet = promise.trySuccess():
将当前的异步的“连接尝试”操作尝试标记为成功。如果用户取消了“连接尝试(即,调用connect操作后,用户调用了cancel来取消该操作)”的操作的话,该方法将返回false;否则返回true,并且如果有ChannelFutureListener注册到了这个ChannelFuture(即,ChannelPromise)上,那么监听器的operationComplete方法将得以回调。
IV. 无论当前的“连接尝试”操作是否被取消,channelActive()事件都将被触发,因为此时channel确实已经处于active状态了。
channelActive是一个入站事件,该事件会从ChannelPipeline的head开始传播至tail间的所有ChannelInboundHandler,并回调它们的channelActive方法。
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
因为NioSocketChannel的NioSocketChannelConfig对象的autoRead属性默认就为1,因此isAutoRead()为true。那么就会调用channel.read()操作,这将触发一个read事件在ChannelPipeline中,而read是一个出站操作。它会从ChannelPipeline的尾部开始传播至head间的每个ChannelOutboundHandler。
我们接着来看head对read事件的处理:
// DefaultChannelPipeline#HeadContext#read()
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
// AbstractChannel#AbstractUnsafe#beginRead()
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
// AbstractNioChannel#doBeginRead()
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
这里的readInterestOp是SelectionKey.OP_READ,是在构建NioSocketChannel对象时传进来的。因此,我们可以知道,此时的read事件主要是完成了,在Selector中对已经注册到其上的NioSocketChannel的OP_READ标识为感兴趣的事件。这样Selector就监控该SocketChannel的读操作了。
V. 如果用户执行了取消“连接尝试”的操作,那么就关闭channel,并触发channelInactive事件。
d) 如果成员变量connectTimeoutFuture非空,则说明该“连接尝试”操作设置了一个连接超时时间。那么,此时连接已经完成了,我们就可以取消这个连接超时检测的定时任务了。超时任务会记录本次“连接尝试”操作为失败状态,并且会将connectTimeoutFuture成员变量置为null。
比如,可能存在这样一种情况:也就是当程序执行完fulfillConnectPromise方法中的「promise.trySuccess()」之后,以及在执行finally代码块之前,“连接尝试”的已经完成,并且ChannelPromise已标记为了true。但是此时设置的连接超时时间已到并且连接超时任务被得以执行,此时超时任务发现ChannelPromise的状态已经被标识过了也就不会进行关闭channel的操作。
因此如上流程我们知道,『connectTimeoutFuture == null』有两种情况:1,如果没有设置连接超时;2,设置了连接超时,并且只因为超时或其他原因已经执行了 NioSocketChannel 的 close() 操作。『connectTimeoutFuture#operationComplete』也有两种情况:1,在超时时间内连接还没建立,则执行相应的close操作;2,连接已经建立了,但是还未执行到connectTimeoutFuture#cancel(false)操作,connectTimeoutFuture 就触发了,此时因为 promise的状态已经被表示过了,也不会进行close操作。
总的来说,OP_CONNECT事件的触发时,表示当前的socket处于了可连接的状态了,需要调用SocketChannel.finishConnect()来完成连接的后续事件。同时会触发ChannelActive事件,该事件为一个入站事件,它会在NioSocketChannel所关联的ChannelPipeline管道得以传播,即,回调head到tail之间所有的ChannelInboundHandler的channelActive方法。而head的channelActive方法中又会触发一个channel的read操作,该操作最终会在NioSocketChannel所注册的Selector中标识OP_READ为感兴趣的事件,这使得Selector会监听NioSocketChannel上是否有可读的数据准备好被读取了。
SelectionKey.OP_READ 事件处理流程
当有可读数据准备被读取时,‘SelectionKey.OP_READ’将会触发。在NioEventLoop的事件循环中会对该事件进行处理:
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
我们来看看unsafe.read()的实现,在NioSocketChannel中unsafe是一个NioByteUnsafe实例:
① 首先,在循环前将分配器处理器中累加的消息/字节计数重置。接着,进行一个读循环操作:
I. 根据给定的分配器以及预测的最优缓冲区容量大小创建一个缓冲区用于准备接受可读取的数据。在Netty中,分配器(allocator)默认为PooledByteBufAllocator实例,而PooledByteBufAllocator实现通过使用jemalloc算法来实现高效的内存分配。
II. allocHandle.lastBytesRead(doReadBytes(byteBuf)):
使用已经分配好的ByteBuf来读取数据。并在分配器处理器中记录本次读取的字节数。
III. 判断本次读操作所读取到的字节数:
a) 若‘读取到的字节数 < 0’,即为’-1’时,说明对端已经关闭了。close变量会被标记为true。因为没有读取到数据,因此调用‘byteBuf.release()’来释放bytebuf(因为,此时 bytebuf 不会在通过 channelPipeline进行传输了,也就是,这个 bytebuf 最后使用的地方就是当前方法,因此应该调用 release() 方法来声明对其的释放),然后退出读循环操作(break)。需要说明的是,因为byteBuf是通过PooledByteBufAllocator来分配的缓冲区,是一个池中的ByteBuf,因此是要通过release()方法来减小bytebud的引用计数,当bytebuf的引用计数为0时,则说明此时已经没有引用指向这个bytebuf了,那么它就会被“回收”;
b) 若‘读取到的字节数 == 0’,仅仅说明本次读操作没有读取到数据,那么就会执行同上面一样的释放bytebuf操作,即,‘byteBuf.release()’,然后退出读循环操作(break);
c) 若‘读取到的字节数 > 0’,说明本次读操作已经到达有效的数据了。那么执行:
[1]. 「allocHandle.incMessagesRead(1)」对读消息的次数进行累加。
[2]. 然后标识readPending为false,表示本次读操作已经读取到了有效数据,无需等待再一次的读操作。
[3]. 接着触发ChannelRead事件,它会在ChannelPipeline中传播,「pipeline.fireChannelRead(byteBuf)」。这是一个入站事件,它会从ChannelPipeline的head开始传播,依次顺序回调ChannelInboundHandler的channelRead()方法。这里可以看到,是读循环中每一次有效的读操作都对触发一次ChannelRead事件,并不是在所有数据都读取到之后才触发一次ChannelRead事件。因此,我们需要提供一系列的编解码器来将收到的数据分割成我们一个个的逻辑数据包,对此Netty也提供了一系列拆箱即有的编解码器为我们解决相关的问题。
IV. 根据当前的NioSocketChannel是否是自动读取的配置,以及已经读取的数据字节数,以及已经进行的读操作次数,以及最近一次读取的字节数来判断是否需要继续进行读循环操作。若需要则继续读循环操作;否则退出读循环,继续后面的流程。
② allocHandle.readComplete():在本次读循环结束后调用一次「allocHandle.readComplete()」来记录本次读循环的数据信息以用于预测下一次读事件触发时,应该分配多大的ByteBuf容量更加合理些。
③ pipeline.fireChannelReadComplete():触发ChannelReadComplete事件,用于表示当前读操作的最后一个消息已经被ChannelRead所消费。ChannelReadComplete是一个入站消息,它会从ChannelPipeline的head开始传播,依次顺序回调ChannelInboundHandler的channelReadComplete方法。
ChannelRead vs ChannelReadComplete
当Channel检测到对端有数据可以读取的时候,channelRead方法会被调用。
channelRead方法可能会被调用多次,当channelReadComplete方法被回调的时候,标识着数据已经都读取完了。也就是说,channelRead方法会被调用多次,当所有消息都读取完后channelReadComplete方法会得到一次调用。
④ 如果close被标识为了true,则说明对端已经关闭了连接。(即,读操作中读取的字节数量为-1,则表示远端已经关闭了),则执行「closeOnRead(pipeline)」
I. 若“isInputShutdown0()”返回false,则说明是远端连接已经关闭了。那么此时,如果我们的程序配置了“ChannelOption.ALLOW_HALF_CLOSURE”属性(即,可以在启动引导类时通过option(ChannelOption.ALLOW_HALF_CLOSURE, true)来启用配置),那么就会进行shutdownInput()操作,并触发一个用户自定义的ChannelInputShutdownEvent.INSTANCE事件,在ChannelPipeline中传播。该事件是一个入站事件,它会从ChannelPipeline中的head开始传播,异常顺序调用ChannelInboundHandler的userEventTriggered方法。
II. 若“isInputShutdown0()”返回false,则说明是远端连接已经关闭了。并且此时我们的程序并没有配置启动“ChannelOption.ALLOW_HALF_CLOSURE”。那么此时就会进行相应SocketChannel的关闭等相关操作。
III. 若“isInputShutdown0()”返回true,则说明是当前的NioSocketChannel自动调用了shutdownInput()方法来关闭了输入流。那么此时就会触发一个用户自定义的ChannelInputShutdownEvent.INSTANCE事件,在ChannelPipeline中传播。该事件是一个入站事件,它会从ChannelPipeline中的head开始传播,异常顺序调用ChannelINboundHandler的userEventTriggered方法。
关于「SocketChannel.shutdownInput()」:关闭一个连接的读,但不关闭这个通道。一旦关闭了读,那么在这之后调用channel的read都将返回’-1’,来表示’流的结尾’。如果往已经关闭的输入流中发送数据,都会默认被丢弃。
⑤ 如果本次读操作已经读取到有效数据(即,最近一次读操作返回的读取字节数>0),并且当前的NioSocketChannel的配置为非自动读取(disable autoRead,说明此时用户不希望Selector去监听当前SocketChannel的读事件,用户可以根据业务逻辑的需要,在希望读取数据时再去添加OP_READ事件到Selector中。并且在每次读取到数据后就将OP_READ事件从所感兴趣的事件中移除),那么此时需要将OP_READ事件从所感兴趣的事件中移除,这样Selector就不会继续监听该SocketChannel的读事件了。
removeReadOp()操作就是将OP_READ从SelectionKey的interestOps集合中移除:
PS:注意,如果在当前端主动调用「channel.shutdownInput()」方法时,需要在处理’ChannelInputShutdownReadComplete’这个用户自定义的事件时调用「channel().config().setAutoRead(false);」来将autoRead置为false。不然,OP_READ事件会一直被触发,而上的步骤’III’会一直被调用,这会导致一些问题,比如不必要的CPU消耗。
调用方式类似:
public class MyServerHandler extends SimpleChannelInboundHandler<String> {
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
Channel serverChannel = ctx.channel();
if(serverChannel instanceof NioSocketChannel) {
System.out.println("server shutdownInput...");
((NioSocketChannel) serverChannel).shutdownInput();
}
}
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof ChannelInputShutdownReadComplete) {
System.out.println("detail ChannelInputShutdownEvent event......");
ctx.channel().config().setAutoRead(false);
}
super.userEventTriggered(ctx, evt);
}
}
SelectionKey.OP_WRITE 事件处理流程
在NioEventLoop的事件循环中’SelectionKey.OP_WRITE’事件的处理流程如下:
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
关于OP_WRITE事件:
OP_WRITE事件的就绪条件并不是发生在调用channel的write方法之后,而是在当底层缓冲区有空闲空间的情况下。因为写缓冲区在绝大部分时候都是有空闲空间的,所以如果你注册了写事件,这会使得写事件一直处于就就绪,选择处理线程就会一直占用着CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
SocketChannel会在写数据时,若发现当buffer还有数据,但写缓冲区已经满的情况下,socketChannel.write(buffer)会返回已经写出去的字节数,此时为0。那么这个时候我们就需要注册OP_WRITE事件,这样当写缓冲区又有空闲空间的时候就会触发OP_WRITE事件,这样我们就可以继续将没写完的数据继续写出了。而且在写完后,一定要记得将OP_WRITE事件注销。
比如,来看看NioSocketChannel的doWrite()操作(「 ch.unsafe().forceFlush()」方法最终也就是会调用到这里):
关于写操作的具体流程分析请参见Netty 源码解析 ——— writeAndFlush流程分析
后记
本文主要对NioEventLoop中涉及到的四种NIO事件的处理流程进行了分析。四个看似简单的处理流程,深入探索后发现其实并不简单,其实可以展开的点还有很多,特别是关于写事件涉及到ChannelOutboundBuffer以及Netty默认使用的PooledByteBufAllocator实现了jemalloc算法来完成高效的内存分配等等,希望在后面的文章中能继续和大家分享我的分析以及想法。
若文章有任何错误,望大家不吝指教:)