这篇文章同样先总结介绍框架,然后分开 介绍源码,初始化等。
Netty的Channel过滤器实现原理与ServletFilter机制一致,它将Channel的数据管道抽象为ChannelPipeline, 消息在ChannelPipeline中流动和传递。ChannelPipeline 持有I/O事件拦截器ChannelHandler的链表,由ChannelHandler对I/O事件进行拦截和处理,可以方便地通过新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持。
1. 整体概况
下图展示了具体的事件传播流程图:
- (1)底层的SocketChannel read()方法读取ByteBuf, 触发ChannelRead事件,由I/O线程NioEventLoop 调用ChannelPipeline 的fireChannelRead(Object msg)方法,将消息( ByteBuf)传输到ChannelPipeline中:
- (2)消息依次被HeadHandler、ChannelHandlerl 、ChannelHandler2-.. ... TailHandler拦截和处理,在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递;
- (3)调用ChannelHandlerContext的write方法发送消息,消息从TailHandler开始,途经ChannelHanderN.....ChannelHandlerl. HeadHandler, 最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递,例如当编码失败时,就需要中断流程,构造异常的Future返回。
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
先理清一个关系:
一个Channel 包含了一个ChannelPipeline , 而ChannelPipeline 中又维护了一个由ChannelHandlerContext 组成的双向链表。这个链表的头是HeadContext,链表的尾是TailContext,并且每个ChannelHandlerContext 中又关联着一个ChannelHandler(这个channelHandlerContext只是channelHandler的上下文环境,用来保存上下文环境以及和前后channelHandler做一个桥梁沟通),如下图:
2.ChannelPipeline
通过接口可以发现,channelPipeline主要有两个功能:
- 第一个管理channelHandler的crud,比如first,last,addxxx等方法
- 第二个IO事件的传播处理,比如源码中带firexxx的表示触发下一个handler事件
继承图如下:
在前我们已经知道了一个Channel 的初始化的基本过程,下面我们再回顾一下。下面的代码是AbstractChannel 构造器:
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
2.1 HeadContext和TailContext
HeadContext 实现了ChannelInboundHandler,而tail 实现了
ChannelOutboundHandler 接口,并且它们都实现了ChannelHandlerContext 接口, 因此可以说head 和tail 即是一个ChannelHandler,又是一个ChannelHandlerContext。接着看HeadContext 构造器中的代码:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
它调用了父类AbstractChannelHandlerContext 的构造器,并传入参数inbound = false,outbound = true。而TailContext 的构造器与HeadContext 正好相反,它调用了父类AbstractChannelHandlerContext 的构造器,并传入参数inbound = true,outbound = false。也就是说header 是一个OutBoundHandler,而tail 是一个InboundHandler。
AbstractChannel 有一个pipeline 字段,在构造器中会初始化它为DefaultChannelPipeline 的实例。这里的代码就印证了一点:每个Channel 都有一个ChannelPipeline。接着我们跟踪一下DefaultChannelPipeline 的初始化过程,首先进入到DefaultChannelPipeline 构造器中:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
在DefaultChannelPipeline 构造器中, 首先将与之关联的Channel 保存到字段channel 中。然后实例化两个
ChannelHandlerContext:一个是HeadContext 实例head,另一个是TailContext 实例tail。接着将head 和tail 互相指向, 构成一个双向链表。
特别注意的是:我们在开始的示意图中head 和tail 并没有包含ChannelHandler,这是因为HeadContext 和TailContext继承于AbstractChannelHandlerContext 的同时也实现了ChannelHandler 接口了,因此它们有Context 和Handler的双重属性。
2.2 PipeLine的事件传播机制
在 AbstractChannelHandlerContext 中有inbound 和outbound 两个boolean 变量,分别用于标识Context 所对应的handler 的类型,即:
1、inbound 为true 是,表示其对应的ChannelHandler 是ChannelInboundHandler 的子类。
2、outbound 为true 时,表示对应的ChannelHandler 是ChannelOutboundHandler 的子类。 这里大家肯定还有很多疑惑,不知道这两个字段到底有什么作用? 这还要从ChannelPipeline 的事件传播类型说起。
Netty 中的传播事件可以分为两种:Inbound 事件和Outbound 事件。如下是从Netty 官网针对这两个事件的说明:
再次看看图:
从上图可以看出,inbound 事件和outbound 事件的流向是不一样的:
-
inbound 事件的流行是从下至上,并且inbound 的传递方式是通过调用相应的ChannelHandlerContext.fireIN_EVT()方法
- 例如:ChannelHandlerContext的fireChannelRegistered()调用会发送一个ChannelRegistered 的inbound 给下一个ChannelHandlerContext
-
outbound刚好相反,是从上到下。,而outbound 方法的的传递方式是通过调用ChannelHandlerContext.OUT_EVT()方法。
- 例如:ChannelHandlerContext 的bind()方法调用时会发送一个bind 的outbound 事件给下一个ChannelHandlerContext。
2.2.1 Outbound 事件传播
outbound 类似于主动触发(发起请求的事件);
Outbound 事件都是请求事件(request event),即请求某件事情的发生,然后通过Outbound 事件进行通知。Outbound 事件的传播方向是tail -> customContext -> head。
ChannelOutboundHandler方法有:
public interface ChannelOutboundHandler extends ChannelHandler {
void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;
void connect(
ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception;
void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;
void read(ChannelHandlerContext ctx) throws Exception;
void flush(ChannelHandlerContext ctx) throws Exception;
}
public class MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise)
throws Exception {
System.out.println("客户端关闭");
ctx.close(promise);
}
2.2.2 连接事件 案例讲解
我们接下来以connect 事件为例,分析一下Outbound 事件的传播机制。
首先,当用户调用了Bootstrap 的connect()方法时,就会触发一个Connect 请求事件,此调用会触发如下调用链:
继续跟踪,我们就发现AbstractChannel 的connect()其实由调用了DefaultChannelPipeline 的connect()方法:
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return pipeline.connect(remoteAddress, promise);
}
而pipeline.connect()方法的实现如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
可以看到,当outbound 事件(这里是connect 事件)传递到Pipeline 后,它其实是以tail 为起点开始传播的。 而tail.connect()其实调用的是AbstractChannelHandlerContext 的connect()方法:
public ChannelFuture connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
//此处省略N 句
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
next.invokeConnect(remoteAddress, localAddress, promise);
//此处省略N 句
return promise;
}
findContextOutbound()方法顾名思义,它的作用是以当前Context 为起点,向Pipeline 中的Context 双向链表的前端寻找第一个outbound 属性为true 的Context(即关联ChannelOutboundHandler 的Context),然后返回。 findContextOutbound()方法代码实现如下:
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
当我们找到了一个outbound 的Context 后,就调用它的invokeConnect()方法,这个方法中会调用Context 其关联的ChannelHandler 的connect()方法:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
如果用户没有重写ChannelHandler 的connect()方法,那么会调用ChannelOutboundHandlerAdapter 的connect()实现:
public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
SocketAddress localAddress, ChannelPromise promise) throws Exception {
ctx.connect(remoteAddress, localAddress, promise);
}
我们看到,ChannelOutboundHandlerAdapter 的connect()仅仅调用了ctx.connect(),而这个调用又回到了:Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect这样的循环中,直到connect 事件传递到DefaultChannelPipeline 的双向链表的头节点,即head 中。为什么会传递到head 中呢?回想一下,head 实现了ChannelOutboundHandler,因此它的outbound 属性是true。因为head 本身既是一个ChannelHandlerContext,又实现了ChannelOutboundHandler 接口,因此当connect()消息传递到head 后,会将消息转递到对应的ChannelHandler 中处理,而head 的handler()方法返回的就是head 本身:
因此最终connect()事件是在head 中被处理。head 的connect()事件处理逻辑如下:
public void connect(ChannelHandlerContext ctx,SocketAddress remoteAddress, SocketAddress localAddress,ChannelPromise promise) throws Exception {
//最终调用这里
unsafe.connect(remoteAddress, localAddress, promise);
}
到这里, 整个connect()请求事件就结束了。下图中描述了整个connect()请求事件的处理过程:
5.2 Inbound 事件传播
inbound 类似于是事件回调(响应请求的事件),
Inbound 的特点是它传播方向是head -> customContext -> tail。
ChannelInboundHandler方法有:
public interface ChannelInboundHandler extends ChannelHandler {
void channelRegistered(ChannelHandlerContext ctx) throws Exception;
void channelUnregistered(ChannelHandlerContext ctx) throws Exception;
void channelActive(ChannelHandlerContext ctx) throws Exception;
void channelInactive(ChannelHandlerContext ctx) throws Exception;
void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;
void channelReadComplete(ChannelHandlerContext ctx) throws Exception;
void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;
void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;
void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
5.2.1 使用fireXXX方法继续传播事件
注意,如果我们捕获了一个事件,并且想让这个事件继续传递下去,那么需要调用Context 对应的传播方法fireXXX。
如下面的示例代码:MyInboundHandler 收到了一个channelActive 事件,它在处理后,如果希望将事件继续传播下去那么需要接着调用ctx.fireChannelActive()方法
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接成功");
ctx.fireChannelActive();
}
}
5.1.2 激活事件 案例讲解
Inbound 事件和Outbound 事件的处理过程是类似的,只是传播方向不同。 Inbound 事件是一个通知事件,即某件事已经发生了,然后通过Inbound 事件进行通知。Inbound 通常发生在Channel的状态的改变或IO 事件就绪。
Inbound 的特点是它传播方向是head -> customContext -> tail。
上面我们分析了connect()这个Outbound 事件,那么接着分析connect()事件后会发生什么Inbound 事件,并最终找到Outbound 和Inbound 事件之间的联系。当connect()这个Outbound 传播到unsafe 后,其实是在AbstractNioUnsafe的connect()方法中进行处理的:
public final void connect(final SocketAddress remoteAddress,
final SocketAddress localAddress, final ChannelPromise promise) {
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
...
}
}
在AbstractNioUnsafe 的connect()方法中,首先调用doConnect()方法进行实际上的Socket 连接,当连接上后会调用fulfillConnectPromise()方法:
private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
if (!wasActive && active) {
pipeline().fireChannelActive();
}
}
我们看到,在fulfillConnectPromise()中,会通过调用pipeline().fireChannelActive()方法将通道激活的消息(即Socket 连接成功)发送出去。而这里,当调用pipeline.fireXXX 后,就是Inbound 事件的起点。因此当调用 pipeline().fireChannelActive()后,就产生了一个ChannelActive Inbound 事件,我们就从这里开始看看这个Inbound事件是怎么传播的?
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
果然, 在fireChannelActive()方法中,调用的是head.invokeChannelActive(),因此可以证明Inbound 事件在Pipeline中传输的起点是head。那么,在head.invokeChannelActive()中又做了什么呢?
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
上面的代码应该很熟悉了。回想一下在Outbound 事件(例如connect()事件)的传输过程中时,我们也有类似的操作:
1、首先调用findContextInbound(),从Pipeline 的双向链表中中找到第一个属性inbound 为true 的Context,然后将其返回。
-
2、调用Context 的invokeChannelActive()方法,invokeChannelActive()方法源码如下:
-
private void invokeChannelActive() { if (invokeHandler()) { try { ((ChannelInboundHandler) handler()).channelActive(this); } catch (Throwable t) { notifyHandlerException(t); } } else { fireChannelActive(); } }
这个方法和Outbound 的对应方法(如:invokeConnect()方法)如出一辙。与Outbound 一样,如果用户没有重写channelActive() 方法,那就会调用ChannelInboundHandlerAdapter 的channelActive()方法:
-
同样地, 在ChannelInboundHandlerAdapter 的channelActive()中,仅仅调用了ctx.fireChannelActive()方法,因此就会进入Context.fireChannelActive() -> Connect.findContextInbound() ->nextContext.invokeChannelActive() ->nextHandler.channelActive() -> nextContext.fireChannelActive()这样的循环中。同理,tail 本身既实现了ChannelInboundHandler 接口,又实现了ChannelHandlerContext 接口,因此当channelActive()消息传递到tail 后,会将消息转递到对应的ChannelHandler 中处理,而tail 的handler()返回的就是tail 本身:
TailContext 的channelActive()方法是空的。如果大家自行查看TailContext 的Inbound 处理方法时就会发现,它们的实现都是空的。可见,如果是Inbound,当用户没有实现自定义的处理器时,那么默认是不处理的。下图描述了Inbound事件的传输过程:
5.3 事件总结
5.3.1 Outbound 事件总结
1、Outbound 事件是请求事件(由connect()发起一个请求,并最终由unsafe 处理这个请求)。
2、Outbound 事件的发起者是Channel。
3、Outbound 事件的处理者是unsafe。
4、Outbound 事件在Pipeline 中的传输方向是tail -> head,这里指ContextHandler
5、在ChannelHandler 中处理事件时,如果这个Handler 不是最后一个Handler,则需要调用ctx 的方法(如: ctx.connect()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。
6、Outbound 事件流:Context.OUT_EVT() -> Connect.findContextOutbound() -> nextContext.invokeOUT_EVT()-> nextHandler.OUT_EVT() -> nextContext.OUT_EVT()
5.3.2 Inbound 事件总结
1、Inbound 事件是通知事件,当某件事情已经就绪后,通知上层。
2、Inbound 事件发起者是unsafe。
3、Inbound 事件的处理者是Channel,如果用户没有实现自定义的处理方法,那么Inbound 事件默认的处理者是 TailContext,并且其处理方法是空实现。
4、Inbound 事件在Pipeline 中传输方向是head -> tail。
5、在ChannelHandler 中处理事件时,如果这个Handler 不是最后一个Handler,则需要调用ctx.fireIN_EVT()事 件(如:ctx.fireChannelActive()方法)将此事件继续传播下去。如果不这样做,那么此事件的传播会提前终止。
6、Outbound 事件流:Context.fireIN_EVT() -> Connect.findContextInbound() -> nextContext.invokeIN_EVT() ->nextHandler.IN_EVT() -> nextContext.fireIN_EVT().outbound 和inbound 事件设计上十分相似,并且Context 与Handler 直接的调用关系也容易混淆,因此我们在阅读这里的源码时,需要特别的注意。
3.ChannelHandler
ChannelHandler类似于Servlet的Filter 过滤器,负责对I/O事件或者I/O操作进行拦
截和处理,它可以选择性地拦截和处理自己感兴趣的事件,也可以透传和终止事件的传递。
基于ChannelHandler接口,用户可以方便地进行业务逻辑定制,例如打印日志、统一
封装异常信息、性能统计和消息编解码等。
ChannelHandler支持注解,目前支持的注解有两种。
- Sharable: 多个ChannelPipeline共用同一个ChannelHandler;
- Skip:被Skip注解的方法不会被调用,直接被忽略。
3.1 ChannelHandlerContext
每个ChannelHandler 被添加到ChannelPipeline 后,都会创建一个ChannelHandlerContext 并与之创建的 ChannelHandler 关联绑定。ChannelHandlerContext 允许ChannelHandler 与其他的ChannelHandler 实现进行交互。 ChannelHandlerContext 不会改变添加到其中的ChannelHandler,因此它是安全的。下图描述了 ChannelHandlerContext、ChannelHandler、ChannelPipeline 的关系:
3.2 Netty提供的ChannelHandler
netty提供了部分handler,开发者可以选择继承实现部分功能
编解码器:ByteToMessageDecoder....
半包处理:DelimiterBasedFrameDecoder,FixedLengthFrameDecoder...
参考:《Netty权威指南》