4-netty源码分析之Pipeline
先用一张图来描叙下netty的piepline
由图可以看得出来,netty的Pipeline其实是由ChannelHandlerContext组成的一个双向链表,ChannelHandlerContext这个又是什么呢,由名字就可以知道是ChannelHandler的context,也就可以理解为channel handler执行的上下文,提供给handler执行时所需要的资源。
那么进一步理解,其实netty 的 Pipeline其实是一个由Handler组成的双向链表。那么我们一步步分析下这个Pipeline。
1.Pipeline初始化
其实这里很简单,Pipeline是什么时候初始话的呢?初始化的时候又是个什么样子呢?
其实在介绍server时就讲到了,先看一段代码[AbstractChannel]:
protected AbstractChannel(Channel parent) {
this.parent = parent;
/** AbstractNioByteChannel.NioByteUnsafe 内部类,newUnsafe()具体的子类实现*/
unsafe = newUnsafe();
/** Each channel has its own pipeline and it is created automatically when a new channel is created. */
pipeline = newChannelPipeline();
}
这个就是在server启动的时初始化channel所要执行的代码。其中就初始化了ChannelPipeline
pipeline = newChannelPipeline();
注释也很清楚
Each channel has its own pipeline and it is created automatically when a new channel is created.
我们继续:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
/** 维护了一个以 AbstractChannelHandlerContext 为节点的双向链表 */
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
可以看到这里创建的是DefaultChannelPipeline,也看到了我们想要看到的双向链表的“指针”。
DefaultChannelPipeline 实现于 ChannelPipeline,可以看到ChannelPipeline的注释中画的很清楚:
* I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
这里InBound和OutBound前面有讲过,代表pipeline中handler的流向,基本分为从socket中读数据 到 ByteBuf 与 将ByteBuf中的数据写入Socket。
我们继续回到上面的构造器:
- 1.先将channel赋值
- 2.创造tail节点
- 3.创建head节点
- 4.将头尾节点先后串起来形成双向链表
那么刚刚初始化好的Pipeline是这个样子的:
那么可以猜想,链表既然已经形成,我们的业务有需要很多各种Handler,那么这里是哪里取添加的呢?这个问题后面再解释,但大概已经知道添加无非就是形成一个HandlerContext,然后在head与tail之间改变prev与next指针的指向了。
那么我们在细看下HeadContext与TailContext
从图中就首先关注下Unsafe成员,前面讲过,netty底层的所有相关操作都是由Unsafe去完成的,那么自然可以猜想,当pipeline中处理器流向head时最终会调用Unsafe去处理底层相关的操作,那么究竟是不是这样的呢?
我们从业务角度解释一下这个猜想:
Pipeline是所有Handler的链路汇总,业务在写自己的Handler时一般只与业务相关,比如编解码、序列化等,那么数据最终会向下一个handler传递,最终会落到head或者tail 节点,那么这里自然会将数据写出或者读入了,因此这里的Unsafe存在自然就很合适了。
那么head作为第一个节点,数据首次流入自然第一个经过head了,我们跟着上篇的NioEventLoop进行debug一下,还记得处理事件的入口:
processSelectedKeys();
/** boos reactor处理新的连接 或者 worker reactor 处理 已存在的连接有数据可读 */
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
/** AbstractNioByteChannel中实现,重点 */
unsafe.read();
}
前面讲过NioMessageUnsafe处理连接相关事件,NioByteUnsafe处理后续的读写事件,那么我们跟踪数据读取:
@Override
public final void read() {
final ChannelConfig config = config();
if (!config.isAutoRead() && !isReadPending()) {
// ChannelConfig.setAutoRead(false) was called in the meantime
removeReadOp();
return;
}
final ChannelPipeline pipeline = pipeline();
/** 创建ByteBuf分配器 */
final ByteBufAllocator allocator = config.getAllocator();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
if (allocHandle == null) {
this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
}
ByteBuf byteBuf = null;
int messages = 0;
boolean close = false;
try {
int totalReadAmount = 0;
boolean readPendingReset = false;
do {
/** 分配一个ByteBuf */
byteBuf = allocHandle.allocate(allocator);
int writable = byteBuf.writableBytes();
/** 委托到外部类NioSocketChannel读, 将数据读取到分配的ByteBuf中去 */
int localReadAmount = doReadBytes(byteBuf);
if (localReadAmount <= 0) {
// not was read release the buffer
byteBuf.release();
byteBuf = null;
close = localReadAmount < 0;
if (close) {
// There is nothing left to read as we received an EOF.
setReadPending(false);
}
break;
}
if (!readPendingReset) {
readPendingReset = true;
setReadPending(false);
}
/**
* pipeline.fireChannelRead 正好 ChannelPipeline 中的 inbound 事件起点.
* 当调用了 pipeline.fireIN_EVT() 后, 那么就产生了一个 inbound 事件, 此事件会以 head -> customContext -> tail 的方向依次流经 ChannelPipeline 中的各个 handler.
* 调用了 pipeline.fireChannelRead 后, 就是 ChannelPipeline 中所需要做的工作了
*/
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
// Avoid overflow.
totalReadAmount = Integer.MAX_VALUE;
break;
}
totalReadAmount += localReadAmount;
// stop reading
if (!config.isAutoRead()) {
break;
}
if (localReadAmount < writable) {
break;
}
} while (++ messages < maxMessagesPerRead);
/**
* @see DefaultChannelPipeline#fireChannelRead
*/
pipeline.fireChannelReadComplete();
allocHandle.record(totalReadAmount);
if (close) {
closeOnRead(pipeline);
close = false;
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close);
} finally {
...
}
}
划重点:
pipeline.fireChannelRead(byteBuf);
pipeline传递读事件开始
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
可以看到事件第一流转的head节点。看看head做了什么?
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
此时head也就是简单的传递下读事件,没做其他事情。
继续回到主代码中:
pipeline.fireChannelReadComplete();
经过一些列的传递之后:
/** 继续向reactor线程注册读事件 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
上面代码以调用链形式按顺序贴出,可以看到read开始从tail节点传递,传递完事件之后,进行了读注册,也就代表当前channel对读事件感兴趣,紧接着开启自动读取模式的,然后channel只要是活着,就可以连续读数据了。可以看到事件最后传递到head节点,最终会通过UnSafe#beginRead进行实际的读操作。
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
可以看到这里doBeginRead也就是将readInterestOp在有必要的时候加上。
head介绍到这里,其实tail也类似。这里比较简单,至于如何传播的细节,我们后面会记录
2.pipeline中handler的添加或移除
其实明白了pipeline的结构,就应该清楚添加或者移除节点的内部原理,那么何时触发?
我们以启动server为例看一下:
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler());
}
});
这段demo启动时,在initChannel中有个
p.addLast(new EchoServerHandler());
这段逻辑触发时间前面有讲过,就是连接进入的时候,启动workGroup时机会进入这段逻辑。
我们重点跟踪下本次的核心,添加节点。
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
找到核心代码:
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
/** 检查此 handler 是否有重复的名字 */
checkMultiplicity(handler);
/** 为这个 Handler 创建一个对应的 DefaultChannelHandlerContext 实例, 并与之关联起来 */
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
/** 开始回调用户代码 */
callHandlerAdded0(newCtx);
return this;
}
顺序分解:
- 1.检查是否重复添加
- 2.检查是否重名,并生成一个名字
- 2.new 一个context,将本次handler给context
- 3.将context add 进piepline
- 4.回调用户扩展代码
逻辑再简单不过了,详细如下:
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
/**
* 如果当前要添加的Handler是非共享的,并且已经添加过,那就抛出异常,否则,标识该handler已经添加
* 一个Handler如果是sharable的,就可以无限次被添加到pipeline中,我们客户端代码如果要让一个Handler被共用,只需要加一个@Sharable标注即可,注解见:
* @see ChannelHandlerAdapter#isSharable
*/
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
如果想重复添加,只需要标识改handler是共享的,直接@Sharable即可。
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
/**
* 如果用户代码在添加Handler的时候指定了一个name,那么要做到事仅仅为检查一下是否有重复
*/
checkDuplicateName(name);
return name;
}
private String generateName(ChannelHandler handler) {
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid
// any name conflicts. Note that we don't cache the names generated here.
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.
for (int i = 1;; i ++) {
String newName = baseName + i;
/**
* 检查name是否和已有的name有冲突,调用context0(),查找pipeline里面有没有对应的context
* 如果有,则一直往上生成name,比如:"类名#1","类名#2",...
*/
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
在生成context之前,先检查重名,若没有传入名字直接生成一个名字,eg;类名#0,
但如果冲突,,就会一直往下生成,eg:类名#1...
上面context方法也简单,就是遍历双向链表,看看是否名字有冲突。
如果有指定名字,那就直接检查一下是否名字冲突就OK。
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
最后add进链表,跟之前讲的一样,无非是该表prev跟next的指向罢了。此时的piepiline就是这样
移除原理类似,这里不多记录。
可以看到add跟remove是可以动态调用的,也就是这个Piepline是可以动态编排的,这一点即简单又强大。
3.pipeline事件传播机制
记录完了piepline的初始化跟后期维护,那么接下来就是piepline事件传播的原理了。
依然从下面的这行代码出发:
pipeline.fireChannelReadComplete();
其实在netty整个个代码里面,fireXXX之类的方法基本就属于触发事件的传递了,我们简单跟踪下逻辑。
@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
static void invokeChannelReadComplete(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelReadComplete();
} else {
Runnable task = next.invokeChannelReadCompleteTask;
if (task == null) {
next.invokeChannelReadCompleteTask = task = new Runnable() {
@Override
public void run() {
next.invokeChannelReadComplete();
}
};
}
executor.execute(task);
}
}
private void invokeChannelReadComplete() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelReadComplete(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelReadComplete();
}
}
可以看到上叙逻辑第一个就找到了head节点,然后判断是否是EventLoop线程,如果是直接调用head的invokeChannelReadComplete,否则建立一个任务,扔进任务队列,等待EventLoop去执行处理。
那直接到head中看看执行的逻辑:
/** 继续向reactor线程注册读事件 */
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead();
}
可以看到这里就是简单的传递事件,也就是说此时head不做任何处理,仅仅是传递Pipeline事件的开始,那么核心逻辑就在AbstractChannelHandlerContext里了,我们看看这里怎么去找下一个节点的。
@Override
public ChannelHandlerContext fireChannelReadComplete() {
invokeChannelReadComplete(findContextInbound());
return this;
}
主体逻辑递归回来了,这里只要记得递归处理就好,那么我们看看findContextInbound()找到的是哪个节点:
/**
* 从 head 开始遍历 Pipeline 的双向链表, 然后找到第一个属性 inbound 为 true 的 ChannelHandlerContext 实例 --> ChannelInitializer
*/
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
此处逻辑就是循环遍历双向链表,找到第一个inbound节点,找到之后继续执行如下逻辑:
next.invokeChannelReadComplete();
如此反复递归,找到知道实际处理的节点,执行真正的读写操作。
因此事件传播在netty中还是比较容易理解的。
其实异常的传递也是其中一个重点,道理类似,可以线下详细跟踪一番。