前言
说到Netty,不得不提ChannelPipeline
,使用一种拦截过滤链模式
的设计,来处理或拦截Channel
的入站事件
以及出站操作
利用这种设计模式,能够让用户完全控制事件
应该被如何处理
以及在pipeline内各ChannelHandler之间如何相互交互
PipeLine如何使用
1. 如何创建一个PipeLine
每个
Channel
都有属于自己的PipeLine
,当我们在new 一个Channel
的时候将自动创建
一个其对应的Pipeline实例
,默认是一个DefaultChannelPipeline
如何自动创建
每个AbstractChannel的子类
(例如NioServerSocketChannel
,NioSocketChannel
)在调用各自构造函数实例化自身时,都会逐级向上调用其抽象父类AbstractChannel
的protected修饰的构造方法
以NioServerSocketChannel
为例,注意ServerSocketChannel
是JDK nio包
下提供的可被选择的Channel
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里逐级向上调用直至AbstractChannel抽象类的受保护构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
// 为属性pipeline赋值一个DefaultChannelPipeline实例
pipeline = newChannelPipeline();
}
// new一个ChannelPipeline实例
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
可以看到这里是调用newChannelPipeline方法,new了一个DefaultChannelPipeline实例
,同时我们也需要注意到每个Channel实例都有一个pipeline属性,而每个pipeline实例也有一个channel属性
,后面分析DefaultChannelPipeline的类结构会细说
2. 事件如何在pipe中流转
官方注释给出的一张图很形象的描述了I/O事件是如何在pipeline中被各个handler处理的,这里直接贴出来
一个I/O事件要么被一个
ChannelInboundHandler(入站处理器
)处理,要么被一个
ChannelOutboundHandler(出站处理器
)处理,然后通过调用一个被定义
在ChannelHandlerContext
(处理器上下文
)中的事件传递方法
(如fireChannelRead(msg)
或write(msg)`等)将I/O事件继续转发给离这个处理器最近的一个处理器处理,这就是整个I/O事件在pipeline中流转的核心
流转核心描述中引出了入站处理器
和出站处理器
以及处理器上下文
的概念
3. 入站/出站处理器
说到处理器有入站和出站的分别,那么为什么需要分开呢?原因是
因为我们的I/O事件有入站事件和出站事件
导致的
正因为IO事件有入站和出站之分,才有了与其对应的处理器,即入站处理器只会处理入站事件,出站处理器只处理出站事件
- Inbound event 入站事件
入站事件
被各个入站处理器
处理,采用自底而上
的方法,即上图的左边部分
一个入站处理器通常处理来自I/O线程(NioEventLoop线程)生成的入站数据,而入站数据
通常是从远程实际的输入操作
(如SocketChannel的read(ByteBuffer))读取来的
我们可以这样理解,当某个NioSocketChannel的读事件就绪
时,Netty的NioEventLoop
事件循环会触发去该事件对应的Channel
中读取数据
(实际上也是发起操作系统的recvfrom系统调用
,将底层socket读缓冲区
的数据拷贝
到应用程序的内存空间
中)到ByteBuf中
,数据拷贝完成
后,会开启该Channel对应的pipeline中该I/O事件的流转
开启事件流转的代码我们简单看下,至于NioEventLoop事件循环部分可以看我的Netty源码之NioEventLoop详解
这里以NioSocketChannel为例,实际被触发的读操作是其抽象父类 AbstractNioByteChannel
中定义的子Unsafe类(NioByteUnsafe
)操作的读
@Override
public final void read() {
// ...省略其他代码
// 1. 获取到该Channel所属的pipeline
final ChannelPipeline pipeline = pipeline();
try {
do {
// ...省略其他代码
// 2. 读取Channel数据到byteBuf中,实际上是调用系统
allocHandle.lastBytesRead(doReadBytes(byteBuf));
// ...省略其他代码
// 3. 激活pipeline的通道读流程
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
// 4. 当没有继续可读时, 触发pipeline的通道读完成
pipeline.fireChannelReadComplete();
// ...省略其他代码
} catch (Throwable t) {
// 5. 发生异常时执行这里
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// ...省略其他代码
}
}
根据源码可以看到
每当
allocHandle.continueReading()返回true
,就会触发pipeline.fireChannelRead(byteBuf)
读取完成,会触发
pipeline.fireChannelReadComplete()
如果发生
异常
会执行handleReadException(...)
方法
这里看下异常处理方法做了什么
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
// 1. 触发读完成
pipeline.fireChannelReadComplete();
// 2. 触发异常
pipeline.fireExceptionCaught(cause);
if (close || cause instanceof IOException) {
closeOnRead(pipeline);
}
}
可以看到这里发生了异常,就会触发pipeline的异常处理流程
至此,我们整个IO事件触发pipeline处理流程开始前面的部分已经分析完了,下面我会带领大家尝试设计整个pipeline,继而分析
IO事件从
激活pipeline操作到
具体某个handler是如何
处理`的流程
设计ChannelPipeline
基于以上pipeline所具有的能力,我们将其划分为三部分
添加/删除/获取Handler的能力 即自身
ChannelPipeline
接口定义激活相应入站事件的能力
ChannelInboundInvoker
接口定义激活相应出站事件的能力
ChannelOutboundInvoker
接口定义
接口ChannelInboundInvoker
通道入站事件调用者接口,该接口定义了所有入站事件的调用
- 激活通道读,并返回pipleline中的下一个调用者
ChannelInboundInvoker fireChannelRead(Object msg);
- 激活通道读完成,返回下一个调用者
ChannelInboundInvoker fireChannelReadComplete();
- 激活
通道激活
,当通道connected完成
时触发
ChannelInboundInvoker fireChannelActive();
- 激活通道注册完成, 当通道registered成功触发
ChannelInboundInvoker fireChannelRegistered();
- ...
以上即为通道入站调用者接口定义
ChannelOutboundInvoker
通道出站调用者接口,定义了一些关于请求绑定
指定端口地址,连接
指定端口地址,以及写
,冲刷
数据等方法定义
// 请求绑定端口
ChannelFuture bind(SocketAddress localAddress);
// 请求连接端口
ChannelFuture connect(SocketAddress remoteAddress);
// 请求写数据
ChannelFuture write(Object msg);
// 请求冲刷数据
ChannelOutboundInvoker flush();
// ...
ChannelFuture writeAndFlush(Object msg, ChannelPromise promise);
ChannelPipeline
public interface ChannelPipeline
extends ChannelInboundInvoker, ChannelOutboundInvoker
// 增加
ChannelPipeline addFirst(String name, ChannelHandler handler);
// 尾部增加
ChannelPipeline addLast(String name, ChannelHandler handler);
// 删除指定handler
ChannelHandler remove(String name);
// 替换指定handler
ChannelPipeline replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
// 获取
ChannelHandler first();
ChannelHandler last();
ChannelHandler get(String name);
// ...
以上接口设计分析完毕,我们进入正题,看下真实的激活通道读做了什么事情
DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
这里激活通道读事件以后,利用了AbstractChannelHandlerContext这个抽象类,并调用了其invokeChannelRead方法,并传入了pipeline的首元素head
这里简单画下pipeline图
这里的head/tail的类型都是AbstractChannelHandlerContext类型,即pipeline中并不持有具体的Handler,而是将每一个add进去的handler保证成一个handlerContext并将其追加到pipeline链上,即pipeline链上的每一个节点都是handlerContext
我们看下pipeline的addLast(...)方法就明白了
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
// 1. 封装为handlerContext
newCtx = newContext(group, filterName(name, handler), handler);
// 2. 追加到pipeline链上
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventLoop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
callHandlerAddedInEventLoop(newCtx, executor);
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
// 追加进pipeline链
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
这里就是将添加的handler封装成一个handlerContext
后再追加到pipeline的链上
追加也就是追加到tail前,真正的tail节点保持不动,始终是尾节点
ChannelHandlerContext
- UML
是一个pipeline中的节点,不仅内部持有具体的Handler处理器,而且实现了ChannelInboundInvoker以及ChannelOutboundInvoker接口,说明其具有了调用者的能力
- 子实现类
AbstractChannelHandlerContext
既然是一个节点,那么自身肯定持有前/后引用
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
// 前一个节点引用
volatile AbstractChannelHandlerContext next;
// 后一个节点引用
volatile AbstractChannelHandlerContext prev;
// 持有所属的pipeline链引用
private final DefaultChannelPipeline pipeline;
// ...
}
接下来重点分析这个AbstractChannelHandlerContext类,pipeline中的事件传递流转流程重点就是该类,下面我们看下他是怎么一个一个将事件流转下去的
上面分析到当调用pipeline的激活通道读事件后,利用了抽象类AbstractChannelHandlerContext的invokeChannelRead方法并传入了pipeline的首元素节点head
直接看源码
// 执行通道读
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获取执行该通道读的执行器,使用事件循环的单线程执行
EventExecutor executor = next.executor();
// 判断此次通道读线程是否是该channel所注册的事件循环线程
// 是的话就执行利用channelContext的invoke执行,否则就以任务的方式提交通道读,从而保证一定使用事件循环线程执行方法
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
// 执行通道读
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
该方法是一个static修饰的静态方法,方法表示的是:
根据传入的是哪个context,就执行哪个context持有的handler的channelRead方法
注意该执行是一次性的,如果想让读事件继续流转,需要在你的handler中再调用一次其context的fireChannelRead方法
原因在这里
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
只有当我们调用了channelContext的fireChannelRead方法,才能触发findContextInbound找到下一个context,从而再调用invokeChannelRead方法去执行,如此循环往复,就将整个pipeline链中的所有handler都执行了一遍了,这就是pipeline链路事件流转的秘密所在
看下findContextInbound方法做了什么
private AbstractChannelHandlerContext findContextInbound(int mask) {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while ((ctx.executionMask & mask) == 0);
return ctx;
}
就是去找到当前handlerContext的下一个handlerContext
至此,整个pipeline的源码分析就结束了,其中有一个重要的一点需要注意的是:
我们pipeline链中的事件流转的各个handler的执行都是用Channel注册的那一个事件循环线程来执行的,如果我们在某个我们自定义的handler中做了阻塞的事情,会造成整个事件循环NioEventLoop单线程的阻塞
总结
最后以几点来总结下整个ChannelPipeline源码分析
每个Netty的Channel
在实例化
时自动创建与其对应的Pipeline
,二者相互持有对方的引用ChannelPipeline
的设计思想
就是拦截过滤器链
,通过其定义的相应的添加/删除/替换方法,我们可以很方便的在整个链中加入我们想要做的业务处理handler
,当有相应的IO事件到来时,会触发pipeline的各类激活方法以传递事件实际上
Pipeline并不直接持有
被添加进的各个Handler的引用,而是将每个
被添加的Handler封装成了handlerContext(一个节点),
并将其
添加到自己的链上`,其自身仅持有两个节点引用(head/tail)以及一个重要的channel引用需要注意的只要是
注册在同一个NioEventLoop
上的Channel的入站/出站处理,都是基于该NioEventLoop的单线程处理的,如果某个handler处理时间过长
(阻塞线程),会造成整个NioEventLoop的阻塞