7.2.2 ChannelHandlerContext
7.2.2.1 AbstractChannelHandlerContext
AbstractChannelHandlerContext的类签名如下:
abstract class AbstractChannelHandlerContext extends
DefaultAttributeMap implements ChannelHandlerContext
该类作为其他ChannelHandlerContext的基类,Netty4.0中继承自DefaultAttributeMap实现属性键值对的存储和获取,由于此种用法与channel.attr()
存在混淆,故不建议使用。确有需要时,请直接使用channel.attr()
。
下面介绍其中的关键字段:
// Context形成双向链表,next和prev分别是后继节点和前驱节点
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
private final boolean inbound; // 对应处理器为InboudHandler
private final boolean outbound; // 对应处理器为OutboudHandler
private final DefaultChannelPipeline pipeline;
private final String name; // Context的名称
private final boolean ordered; // 事件顺序标记
final EventExecutor executor; // 事件执行线程
private volatile int handlerState = INIT; // 状态
其中handlerState
的字面意思容易使人误解,为此列出四种状态并加以解释:
// 初始状态
private static final int INIT = 0;
// 对应Handler的handlerAdded方法将要被调用但还未调用
private static final int ADD_PENDING = 1;
// 对应Handler的handlerAdded方法被调用
private static final int ADD_COMPLETE = 2;
// 对应Handler的handlerRemoved方法被调用
private static final int REMOVE_COMPLETE = 3;
AbstractChannelHandlerContext只有一个构造方法:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
// 只有执行线程为EventLoop或者标记为OrderedEventExecutor才是顺序的
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
由于Netty将事件抽象为入站事件和出站事件,AbstractChannelHandlerContext对事件的处理也分为两类,分别以ChannelRead和read事件为例分析。首先分析ChannelRead事件:
@Override
public ChannelHandlerContext fireChannelRead(final Object msg){
invokeChannelRead(findContextInbound(), msg);
return this;
}
回顾之前的分析,事件在ChannelPipeline中不会自动流动,入站事件需要调用fireXXX
方法将事件传递到下一个处理器,出站事件需要调用诸如write
方法传递给下一个处理器处理。查找入站处理器的代码如下:
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next; // 入站事件向后查找
} while (!ctx.inbound);
return ctx;
}
代码十分简洁,向双向链表前进方向查找到最近的入站处理器即可;可推知,出站事件则向双向链表后退方向查找最近的出站处理器。这也是出站\入站事件传播方向不同的原因。
找到事件传播的下一个处理器后,在处理器中执行处理过程的代码如下:
static void invokeChannelRead(final AbstractChannelHandlerContext next,
final Object msg) {
ObjectUtil.checkNotNull(msg, "msg");
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
// 使用当前IO线程执行用户自定义处理
next.invokeChannelRead(msg);
} else {
// 使用用户定义的线程执行处理过程
executor.execute(() -> {next.invokeChannelRead(msg);});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
// 处理器执行用户自定义的处理过程
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
Netty为用户提供了方便的线程切换处理,特别是在版本4.1后,可以直接使用JDK提供的线程池。最佳实践:在一个处理器中如果有耗时较大的业务逻辑,可以指定该处理器的处理线程Executor,以避免占用IO线程造成性能损失。
之后再看一下invokeHandler()
和notifyHandlerException(t)
方法,invokeHandler决定是否调用处理器,进行业务逻辑处理:
private boolean invokeHandler() {
// handlerState为volatile变量,存储为本地变量,以便减少volatile读
int handlerState = this.handlerState;
return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
}
可知,当一个处理器还没有调用HandlerAdded方法时,只有处理器的执行线程是非顺序线程池的实例才能执行业务处理逻辑;否则必须等待已调用handlerAdded方法,才能处理业务逻辑。这部分的处理,保证了ChannelPipeline的线程安全性,由此用户可以随意增加删除Handler。
notifyHandlerException方法对处理过程中出现的异常进行处理:
private void notifyHandlerException(Throwable cause) {
if (inExceptionCaught(cause)) {
// 处理异常的过程中出现异常
logger.warn("...)
return;
}
invokeExceptionCaught(cause);
}
private void invokeExceptionCaught(final Throwable cause) {
if (invokeHandler()) {
try {
handler().exceptionCaught(this, cause);
} catch (Throwable error) {
logger.warn("...")
}
} else {
fireExceptionCaught(cause);
}
}
invokeExceptionCaught方法可类比invokeChannelRead方法,其形式一致。由代码可知,当一个Handler的处理过程出现异常时,会调用该Handler的exceptionCaught()
方法进行处理。注意:异常事件的传播也是由用户使用fireExceptionCaught方法控制。
至此,ChannelRead入站事件的处理已分析完毕,read出站事件的处理也可类比,不同的是:出站事件需要反向找出站处理器Handler,不再赘述。
7.2.2.2 DefaultChannelHandlerContext
DefaultChannelHandlerContext是使用Netty时经常使用的事实Context类,其中的大部分功能已在AbstractChannelHandlerContext中完成,所以该类十分简单,其构造方法如下:
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,
EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
对出站\入站Handler的判别使用粗暴的instanceof:
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
实现中引入了一个新的字段:
private final ChannelHandler handler;
回忆ChannelPipeline,pipeline其中形成Context的双向链表,而处理逻辑则在Handler中。设计模式中指出,关联Handler与Context的方法有两种:组合和继承。本例中,使用的是组合,下面分析继承的方法。
7.2.2.3 HeadContext和TailContext
HeadContext和TailContext使用继承的方式关联Handler,作为ChannelPipeline双向链表的头节点和尾节点。首先分析HeadContext,类签名如下:
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler
由于既继承OutboundHandler又继承InboundHandler,可知HeadContext需要同时处理出站事件和入站事件。以read和ChannelRead事件为例,当用户调用read出站事件时,是在告诉IO线程:我需要向网络读数据做处理;当IO线程读到数据后,则使用ChannelRead事件通知用户:已读取到数据。read()
方法代码如下:
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
其中unsafe
是Netty内部实现底层IO细节的类,beginRead()
方法设定底层Selector关心read事件,如果read事件就绪,则会调用unsafe.read()
方法读取数据,然后调用channelPipe.fireChannelRead()
方法通知用户已读取到数据,可进行业务处理。 HeadContext的ChannelRead()
方法代码如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
可知HeadContext只是简单的将事件传播到下一个入站处理器。
也许你会有疑问:自己根本没使用过read出站事件,为什么数据自动读取了呢?这是因为默认设置自动读取autoRead
。通过HeadContext的channelActive
代码分析:
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead(); // 自动读取
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
当channel激活后,如果配置了自动读取,则会调用channel.read()
,注意这将在ChannelPipeline中传播出站事件read,最终传播到HeadContext的read()
方法,最后调用unsafe.beginRead()
设置关心底层read事件,从而实现激活后自动读取数据。而当读取完一组数据后,在channelReadComplete()
方法中继续下一组数据的自动读取。
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelReadComplete();
readIfIsAutoRead(); // 自动读取下一组数据
}
分析完HeadContext,再分析TailContext。首先看方法签名:
final class TailContext extends AbstractChannelHandlerContext
implements ChannelInboundHandler
TailContext只继承入站事件处理器,作为双向链表的尾节点,它对入站事件的处理极其简单,那就是:什么也不做。但有两个方法除外:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
onUnhandledInboundException(cause);
}
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
onUnhandledInboundMessage(msg);
}
而这也仅仅是做个简单提示,告知用户这两个方法必须在ChannelPipeline中有处理器处理,否则很有可能是代码出了问题:
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
如果开启了日志的Debug级别,不难相信,你已经接触过这样的提示。