1-netty源码分析之Server
- 看netty源码之后进行总结的第一篇笔记,无非帮助自己对于看代码的一个总结,方便自己回顾学习;依然保持从demo出发,服务端、客户端、线程模型、管道四篇核心点记录;
一.demo出发,启动server。
public final class EchoServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
以上就是netty启动server的经典demo了,截自源码;
1.EventLoopGroup:netty框架中的reactor线程模型中"线程"就是由它提供,这个也是netty框架的核心概念之一,一个group可以包含多个EventLoop(即多个线程)。
2.ServerBootstrap:netty服务端启动引擎,也可以认为是netty启动的辅助类,以build的方式组装netty的相关配置及组件,最终串气各个组件启动服务;
3.NioServerSocketChannel:服务端持有的channel,字面翻译是“渠道”,那么它代表一个具体的与客户端的连接,或者与IO相关的操作,它会和一个特定的EventLoop(即线程)绑定用以处理相关的IO操作;
4.ChannelOption:socket相关参数,标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度
5.Handler:处理器,处理的就是整个通信过程中的相关事件或者数据,我们扩展自己的handler就可以实现通信数据的业务处理了;
6.ChannelInitializer:一种ChannelInboundHandler,看initChannel方法里,提供了ChannelPipeline用以组装各种handler,那么ChannelInitializer就是组装相关handler的作用;
7.ChannelPipeline:理解为管道,那么管道里链路管理着各种handler,数据经过管道流向各种handler节点,处理后流出或者流入;
8.ChannelFuture:jdk多线程的Future相似,异步处理的回调监听结果,即在整个netty框架的异步操作成功或者失败都会触发监听即可得到相关结果;
用一张图将以上组件进行串起来:
启动组件大致概念简单介绍如此,后面详细概念继续以debug模式讲解
1.EventLoopGroup初始化
EventLoopGroup bossGroup = new NioEventLoopGroup(3);
EventLoopGroup workerGroup = new NioEventLoopGroup();
看一下类图:
前面讲过,EventLoopGroup是netty线程模型中的线程部分,那么这里是怎么体现的呢?其实这里的父类MultithreadEventExecutorGroup里面封装了一个EventExecutor[] 数组,而EventExecutor是一个接口,找到最下层的实现结构如下:
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
private final EventExecutor[] children;
...
}
这里可以想下NioEventLoop就是一个具体的“线程”了,为啥?依照上图找到父类SingleThreadEventExecutor,可以看到里面包装了一个成员变量Thread,有次可知:SingleThreadEventExecutor即是一个线程的抽象,因而NioEventLoop可以理解为一个线程了,相关的执行操作都是委托到里面的thread去执行。只不过NioEventLoop有更多的大于线程的能力,比如schedule等,这里代码继承关系可以提现。
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
private final Thread thread;
...
}
知道了 EventLoopGroup 是一个线程数组,那么就回到初始化的地方,跟着代码debug进入看看初始化的具体干了什么。
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
/** reject:设置任务队列线程池拒绝策略,默认直接抛异常 */
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
此处连续三次调用可以看出做了一些额外操作,分别是
- 1.获取SelectorProvider
- 2.获取SelectStrategyFactory
- 3.有关任务队列的拒绝策略:RejectedExecutionHandlers.reject()
继续往下:
根据debug可以看出,这里有个线程数的默认值,根据当前处理器核心数 * 2计算得出,当然我们传的是3,因此直接使用3创建一个new SingleThreadEventExecutor[nThreads]数组;核心初始化线程如下:
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
...
/** 创建一个大小为 nThreads 的 SingleThreadEventExecutor 数组 */
children = new SingleThreadEventExecutor[nThreads];
/**
* 根据 nThreads 的大小, 创建不同的 Chooser,
* 即如果 nThreads 是 2 的幂, 则使用 PowerOfTwoEventExecutorChooser, 反之使用 GenericEventExecutorChooser.
* 不论使用哪个 Chooser, 它们的功能都是一样的, 即从 children 数组中选出一个合适的 EventExecutor 实例.
*/
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
/**
* 调用 newChhild 方法初始化 children 数组.
* 具体子类实现
*/
children[i] = newChild(threadFactory, args);
success = true;
} catch (Exception e) {
...
} finally {
if (!success) {
...
}
}
}
}
主要干了这两件事:
- 1.根据线程数创建一个ExecutorChooser,其实这里也是体现netty对于性能的追求
- 2.调用模板方法创建具体的 线程
进入子类具体的方法:NioEventLoopGroup
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider, SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, threadFactory, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
一连串的super调用之后进入核心线程创建方法:
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.parent = parent;
this.addTaskWakesUp = addTaskWakesUp;
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
boolean success = false;
updateLastExecutionTime();
try {
/** 多态,调用NioEventLoop的run方法 */
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
...
}
}
});
...
}
可以看到这里初始化了具体的线程,并且指定了run方法的调用执行为:
SingleThreadEventExecutor.this.run();
那这里的run方法就是EventLoop的核心了,干了什么后面详细说明,先看下run方法干了什么:
/**
* Netty 的事件循环机制
* 当 taskQueue 中没有任务时, 那么 Netty 可以阻塞地等待 IO 就绪事件;
* 而当 taskQueue 中有任务时, 我们自然地希望所提交的任务可以尽快地执行, 因此 Netty 会调用非阻塞的 selectNow() 方法, 以保证 taskQueue 中的任务尽快可以执行.
*
* 1.轮询IO事件
* 2.处理轮询到的事件
* 3.执行任务队列中的任务
* */
@Override
protected void run() {
for (;;) {
try {
/** 如果任务队列没有任务,则进行一次selectNow() */
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
/**
* 轮询出注册在selector上面的IO事件
*
* wakenUp 表示是否应该唤醒正在阻塞的select操作,
* 可以看到netty在进行一次新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始
*/
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
/**
* 第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步自然就是处理这些 IO 事件啦.
*/
needsToSelectAgain = false;
/**
* 此线程分配给 IO 操作所占的时间比
* 即运行 processSelectedKeys 耗时在整个循环中所占用的时间
*/
final int ioRatio = this.ioRatio;
/** 当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()、runAllTasks(); */
if (ioRatio == 100) {
try {
/** 查询就绪的 IO 事件后 进行处理 */
processSelectedKeys();
} finally {
// Ensure we always run tasks.
/** 运行 taskQueue 中的任务. */
runAllTasks();
}
}
/**
* ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操作的耗时),
* 然后根据公式, 计算出执行 task 所占用的时间, 然后以此为参数, 调用 runAllTasks().
*/
else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
...
}
}
哈哈,无限死循环,无非就是轮训,那么netty的reactor线程模型就此方法为核心点跟进即可;
EventLoopGroup初始化先讲到这里,线程模型后面再详细笔记。
二.ServerBootstrap组装组件
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
-
1.组装两个EventGroup:bossGroup, workerGroup;bossGroup用来专门负责绑定到端口监听连接事件,workerGroup用来处理每个接收到的连接
可以看到bossGroup只是简单的赋值给父类的成员变量,workerGroup赋值给ServerBootstrap的ServerBootstrap的childGroup属性,既简单的赋值而已;
-
2.组装channel
这里不是将传入的NioServerSocketChannel作为类型构造一个工厂类赋值给自己的channelFactory,后续在启动初始化时利用其构造channel;
- 3.option、handler、childHandler只是简单的赋值而已,不多解释;ChannelInitializer在前面讲过,用以封装业务handler链,后面启动时会讲解。
三.启动服务
ChannelFuture f = b.bind(PORT).sync();
可以说启动的核心逻辑就在这里,确实做了不少事情,可以想象nio多路复用、事件轮训、selector与channel的注册、channel与pipeline的绑定、channel与eventLoop的绑定等等都在这一步做的,那么详细debug走起。
进入核心方法doBind;
- initAndRegister
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
/** 初始化channel --> client 获取 NioSocketChannel; --> server 获取 ServerSocketChannel*/
channel = channelFactory().newChannel();
init(channel);
} catch (Throwable t) {
...
}
/**
* Channel 注册过程:
* 1.将 Channel 与对应的 EventLoop 关联,
* 因此这也体现了, 在 Netty 中, 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的;
*
* 2.当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中.
* 通过这两步, 就完成了 Netty Channel 的注册过程.
*
* 3.若是服务端注册,则group()返回的是bossGroup
*/
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
做了三件事:
- 1.根据之前构造的工厂new一个channel
- 2.将创建的channel进行一系列初始化动作
- 3.将channel与对应的EventLoopGroup关联,即channel绑定到指定的线程;
一个一个进行分解:
-
1.new channel
利用构造器进行instance那么找到构造器:
1.构造器中直接调用SelectorProvider打开一个ServerSocketChannel,可以看到这一步到了与nio交互了;
2.直接super父类构造中,不断super就会到AbstractChannel中核心点
看上面三行:
1.将channel进行赋值,此时为空
2.构造一个UnSafe对象,这里netty真正的读写等IO事件都是交给UnSafe去操作的,这里返回的是一个NioMessageUnsafe,服务端需要的UnSafe对象,将新的连接注册到worker线程组【netty将一个新连接的建立也当作一个io操作来处理,这里的Message的含义我们可以当作是一个SelectableChannel,读的意思就是accept一个SelectableChannel,写的意思是针对一些无连接的协议,比如UDP来操作的】
-
3.创建一个数据自己的piepline,用以后续组装handler
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); /** 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表 */ tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }
这里是上面构造piepline的方法,可以看到这里的链表结构,首先将channel绑定自己,然后构造head,tail收尾节点,同时这里对对于head,tail类型是ChannelHandlerContext,理解为一个handler的context,可以做一些额外的动作。
可以看到这里有个重要的属性inbound,这个是干啥的呢?
ChannelHandler有两个子类ChannelInboundHandler和ChannelOutboundHandler,这两个类对应了两个数据流向,如果数据是从外部流入我们的应用程序,我们就看做是inbound,相反便是outbound,因此这个代表handler的流向节点意思了; 同时head和tail都要与具体的UnSafe绑定,因为这里是数据流向的首尾节点,那么自然就是具体的非读即写数据了,此处自然交给绑定的UnSafe去操作,因此这就是绑定UnSafe的理由了。
OK,new channel介绍到这里,继续回到initAndRegister中的init方法:
-
2.init(channel)
@Override void init(Channel channel) throws Exception { ... ChannelPipeline p = channel.pipeline(); ... /** * 加入新连接处理器,用来专门接受新连接 * 初始化channel时,加入匿名ChannelHandler, 作用就是在register channel到selector时回调init方法,将boss上的handler加入pipeline中,并且ServerBootstrapAcceptor handler用以绑定childGroup和NioSocketChannel */ p.addLast(new ChannelInitializer<Channel>() { @Override public void initChannel(final Channel ch) throws Exception { final ChannelPipeline pipeline = ch.pipeline(); /** 这里的handler返回的是主.handler(new LoggingHandler(LogLevel.INFO)) 中的handler*/ ChannelHandler handler = handler(); if (handler != null) { pipeline.addLast(handler); } ch.eventLoop().execute(new Runnable() { @Override public void run() { /** 这里的 childGroup.register 就是将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */ pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }
省略非主要代码,这里拿到构造的pipeline,然后加入一个ChannelInitializer的handler,并且植入一个initChannel方法,说下initChannel的作用【该方法当然此时不会执行】:
1.获取主handler并加入piepline
2.在taskQueu中offer一个任务,具体执行的是将ServerBootstrapAcceptor加入到pieline中 ,而ServerBootstrapAcceptor主要作用是将workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联,具体如何触发在后面client发起连接时会细说。
initChannel做的事不多,继续回到initAndRegister方法
-
3.group().register(channel)
/** * Channel 注册过程: * 1.将 Channel 与对应的 EventLoop 关联, * 因此这也体现了, 在 Netty 中, 每个 Channel 都会关联一个特定的 EventLoop, 并且这个 Channel 中的所有 IO 操作都是在这个 EventLoop 中执行的; * * 2.当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中. * 通过这两步, 就完成了 Netty Channel 的注册过程. * * 3.若是服务端注册,则group()返回的是bossGroup */ ChannelFuture regFuture = group().register(channel);
其实前面讲过,每个channel都会绑定一个EventLoop用以专门处理跟此Channel相关的IO事件,看代码跟踪,这里先记一下bossGroup对象:
可以很明显的跟踪到这里group()返回的就是bossGroup,继续往下走:MultithreadEventLoopGroup的register方法体:首先解剖下next()做了什么:
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
@Override
public EventLoop next() {
/** 获取一个可用的 SingleThreadEventLoop */
return (EventLoop) super.next();
}
这里看到了chooser,也就是EventExecutorChooser,记得我们当初设置的是3,非2的幂,初始化EventLopp的时候,因此取得GenericEventExecutorChooser类型的选择器:
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
private final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}
}
看逻辑abs算法出一个主线程,用以实际的操作,其实这里有个很重要的要点明:
主线程无论初始化多少,最终执行操作的永远只有一个线程,因此这里在初始化EventLoopGroup bossGroup = new NioEventLoopGroup(3);时直接将参数设为1即可
继续回到register:
生成DefaultChannelPromise后,调用UnSafe对象进行 register进行具体的底层相关的注册操作,这就实现了注释中的第二部:
2.当关联好 Channel 和 EventLoop 后, 会继续调用底层的 Java NIO SocketChannel 的 register 方法, 将底层的 Java NIO SocketChannel 注册到指定的 selector 中.通过这两步, 就完成了 Netty Channel 的注册过程.
这里由于是用户线程,因此转成EventLoop的任务扔进队列里等待去执行,于是会有异步回调就靠这个ChannelPromise了。什么时候会执行任务,那就要先启动线程:
继续跟踪代码:
具体做了:
1.判断是不是EventLoop线程,如果是直接offer任务;
2.如果使用户线程,先启动线程EventLoop线程,再offer任务
那么很明显清楚,这里对任务进行了归一处理,猜想启动了EventLoop线程后,大概就是轮训task队列的任务了,那么继续跟踪启动的逻辑:
这里的thread到底是什么,回到EventLoop初始化的地方:
protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp, int maxPendingTasks, RejectedExecutionHandler rejectedHandler) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
try {
/** 多态,调用NioEventLoop的run方法 */
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
}
}
});
}
标注核心逻辑,这里的run就是当初的那个NioEventLoop的具体实现的run,防止走丢,再贴一次代码:
/**
* Netty 的事件循环机制
* 当 taskQueue 中没有任务时, 那么 Netty 可以阻塞地等待 IO 就绪事件;
* 而当 taskQueue 中有任务时, 我们自然地希望所提交的任务可以尽快地执行, 因此 Netty 会调用非阻塞的 selectNow() 方法, 以保证 taskQueue 中的任务尽快可以执行.
*
* 1.轮询IO事件
* 2.处理轮询到的事件
* 3.执行任务队列中的任务
* */
@Override
protected void run() {
for (;;) {
try {
/** 如果任务队列没有任务,则进行一次selectNow() */
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
/**
* 轮询出注册在selector上面的IO事件
*
* wakenUp 表示是否应该唤醒正在阻塞的select操作,
* 可以看到netty在进行一次新的loop之前,都会将wakeUp 被设置成false,标志新的一轮loop的开始
*/
select(wakenUp.getAndSet(false));
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
cancelledKeys = 0;
/**
* 第一步是通过 select/selectNow 调用查询当前是否有就绪的 IO 事件. 那么当有 IO 事件就绪时, 第二步自然就是处理这些 IO 事件啦.
*/
needsToSelectAgain = false;
/**
* 此线程分配给 IO 操作所占的时间比
* 即运行 processSelectedKeys 耗时在整个循环中所占用的时间
*/
final int ioRatio = this.ioRatio;
/** 当 ioRatio 为 100 时, Netty 就不考虑 IO 耗时的占比, 而是分别调用 processSelectedKeys()、runAllTasks(); */
if (ioRatio == 100) {
try {
/** 查询就绪的 IO 事件后 进行处理 */
processSelectedKeys();
} finally {
// Ensure we always run tasks.
/** 运行 taskQueue 中的任务. */
runAllTasks();
}
}
/**
* ioRatio 不为 100时, 则执行到 else 分支, 在这个分支中, 首先记录下 processSelectedKeys() 所执行的时间(即 IO 操作的耗时),
* 然后根据公式, 计算出执行 task 所占用的时间, 然后以此为参数, 调用 runAllTasks().
*/
else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
...
} catch (Throwable t) {
handleLoopException(t);
}
}
}
看到轮训了,那么EventLoop线程启动了,很自然就能找到当时人进去的任务,其中有个register0的逻辑,线程启动了,自认就该执行任务了,回到注册逻辑:
出现了selectionKey,这一步就将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中, 并且将当前 Channel 作为 attachment.回到register0
pipeline.invokeHandlerAddedIfNeeded()
这句是干嘛的呢? 还记得初始化channel--> init(channel)的方法吗,里面有个动作是这样的:
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
/** 这里的handler返回的是主.handler(new LoggingHandler(LogLevel.INFO)) 中的handler*/
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
/** 这里的 childGroup.register 就是将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
pipeline.addLast(new ServerBootstrapAcceptor(ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
这里的invokeHandlerAddedIfNeeded就会触发这个ChannelInitializer中的initChannel方法,然后顺其自然的将ChannelHandler加进piepline,紧接着触发ServerBootstrapAcceptor将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联;很随意的将整个线程run起来。
跟踪一下:
执行效果确实如此。
回过头看下ChildHandler是不是跟这个时候的一样,比较一下:
经对比,确实如此,可以说这里就是将childHandler加入piepline的地方了。
还记的我们在初始化ServerBootstrap时有b.group(bossGroup, workerGroup)这样一个组装线程的地方,但是会发现整个服务端启动过程都不会涉及workerGroup相关的启动,其实这里也是关键,这里先将workGroup赋值给ServerBootstrap#ServerBootstrapAcceptor的属性,在客户端发起请求时触发channelRead方法,紧接着就启动了workGroup,进而轮训处理相关的IO事件啦,看下代码:
/**
* 这里讲workerGroup绑定到channel,那么这里如何被触发呢?
* 其实当一个 client 连接到 server 时, Java 底层的 NIO ServerSocketChannel 会有一个 SelectionKey.OP_ACCEPT 就绪事件, 接着就会调用到 NioServerSocketChannel.doReadMessages
*
* 新建此连接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 对应的 pipeline 中, 并将此 channel 绑定到 workerGroup 中的某个 eventLoop 中
*/
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
/** 将msg转换成对应的channel */
final Channel child = (Channel) msg;
/** 添加用户自定义的childHandler */
child.pipeline().addLast(childHandler);
/** 设置 NioSocketChannel 对应的 attr和option */
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
/** 将 workerGroup 中的某个 EventLoop 和 NioSocketChannel 关联 */
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
register就跟bossGroup类似,调用UnSafe去完成具体的操作,中间又把相关的用户线程收冗成EventLoop进行具体任务操作,完成一系列动作。
OK,大致注册先介绍到这里
到此为止,整个channel注册过程完成,继续回到起点
- 4.doBind
经过piepline事件的传递,走到AbstractChannel#AbstractUnsafe的bind方法:
到这里,调用底层的socketChannel进行具体adress绑定工作,整个bind结束。接下来就会调用pipeline.fireChannelActive();进行下达工作了。
下面进行个总结
- 1.初始化相关的组件
- 2.设置好EventLoop,包括bossGroup及workGroup
- 3.初始化channel,并且绑定EventLoop
- 4.启动线程,将用户线程进行的动作(比如注册、绑定等)进行任务化,交给EventLoop处理
- 5.BossEventLoop轮训事件,接受客户端请求,触发WorkerEventLoop启动处理IO读写等操作
- 6.进行任务处理,主要包括将 Channel 对应的 Java NIO SockerChannel 注册到一个 eventLoop 的 Selector 中,并绑定到具体的地址进行监听。