零、整体流程
1、不断获取accpet事件对应的socketChannel,并构建为NioSocketChannel
1)获取accept事件对应的socketChannel
SocketChannel ch = javaChannel().accept();
2)使用获取到的socketChannel初始化一个NioSocketChannel
包括初始化Channel关联的ch、pipeline、unsafe、config。
new NioSocketChannel(this, ch);
3)通过ServerSocketChannel,将NioSocketChannel childCh作为参数产生一个ChannelRead事件
2、boss线程中执行ChannelRead事件
1)将ServerBootstrap中设置的childHandler加入NioSocketChannel childCh的pipeline中
2)将childCh与ServerBootstrap的workGroup中的某个EventLoop child绑定
3)向child的任务执行队列中添加childCh的register0事件
3、SocketChannel所绑定的worker线程中执行register0事件
1)将NioSocketChannel的ch注册到NioEventLoop child的selector上
SelectionKey selectionKey = javaChannel().register(eventLoop().selector, 0, this);
2)SocketChannel childCh产生ChannelRegisted事件
pipeline.fireChannelRegistered();
会导致ChannelInitialzer.channelRegisted()执行,将用户定义的业务处理器添加到childCh的pipeline中。
3)SocketChannel childCh产生ChannelActive事件
pipeline.fireChannelActive();
HeadContext接收该事件,处理过程:在childCh上产生read()事件,最终由AbstractUnsafe.beginRead()执行read事件,给childCh添加OP_READ事件监听。
selectionKey.interestOps(OP_READ);
一、代码入口
NioEventLoop.processSelectedKey(SelectionKey k, AbstractNioChannel ch)处理监听到的IO事件时,对于OP_ACCEPT事件,将执行unsafe.read();,具体逻辑由NioMessageUnsafe.read()实现。
二、不断获取accpet事件对应的socketChannel,并构建为NioSocketChannel
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
循环执行以下逻辑直到跳出---->
1、获取accept事件对应的socketChannel
SocketChannel ch = javaChannel().accept();
2、使用socketChannel实例化一个NioSocketChannel childCh,并初始化相关field
new NioSocketChannel(this, ch);
parent = nioServerSocketChannel
unsafe = new NioSocketChannelUnsafe()
pipeline = new DefaultChannelPipeline(this) ->此处初始化一个DefaultChannelPipeline,并将pipeline和channel互相绑定
ch = ch ->同时将ch设置为非阻塞模式
readInterestOp = OP_READ
config = new NioSocketChannelConfig(this, javaChannel().socket())
3、通过ServerSocketChannel,将NioSocketChannel childCh作为参数产生一个ChannelRead事件
<---循环结束条件:a、ch.accept()无新连接产生;b、本次已accept的连接数量超过阈值(16)。
三、boss线程中执行ChannelRead事件
ChannelRead事件由ServerBootstrapAcceptor处理
1、将ServerBootstrap中设置的childHandler加入NioSocketChannel childCh的pipeline中
child.pipeline().addLast(childHandler);
2、给NioSocketChannel childCh设置attrs和options
3、将NioSocketChannel childCh注册到ServerBootstrap的workerGroup上
childGroup.register(child);
实际调用childCh的unsafe完成注册。
AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise);
- 将channel.eventLoop绑定为当前NioEventLoop child;
- 将AbstractUnsafe.register0(DefaultChannelPromise promise)任务加入EventLoop child的执行任务队列;
四、在ServerSocketChannel上产生ChannelReadComplete事件
pipeline中没有节点对该事件有实际处理
五、SocketChannel所绑定的worker线程中执行register0事件
1、将NioSocketChannel的ch注册到NioEventLoop child的selector上,同时将注册得到的SelectionKey绑定为NioSocketChannel的selectionKey
doRegister();
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
netty的轮询注册机制其实是将AbstractNioChannel内部的jdk类对象SelectableChannel ch注册到jdk类对象Selector selector上去,并且将AbstractNioChannel channel作为SelectableChannel对象ch的一个attachment附属上,这样在jdk轮询出某个SelectableChannel有IO事件发生时,就可以直接取出AbstractNioChannel进行后续操作。
2、标识ChannelPromise状态为success
3、SocketChannel childCh产生ChannelRegisted事件
pipeline.fireChannelRegistered();
会导致ChannelInitialzer.channelRegisted()执行,将用户定义的业务处理器添加到childCh的pipeline中。
4、SocketChannel childCh产生ChannelActive事件
pipeline.fireChannelActive();
HeadContext接收该事件,进行以下操作
- 将事件继续向后传递给其他处理器;
- 在childCh上产生read()事件,最终由AbstractUnsafe.beginRead()执行read事件,调用AbstractNioChannel.doBeginRead()给childCh添加OP_READ事件监听。