自顶向下深入分析Netty(七)--ChannelHandlerContext源码实现

7.2.2 ChannelHandlerContext

7.2.2.1 AbstractChannelHandlerContext

AbstractChannelHandlerContext的类签名如下:

    abstract class AbstractChannelHandlerContext extends
              DefaultAttributeMap implements ChannelHandlerContext

该类作为其他ChannelHandlerContext的基类,Netty4.0中继承自DefaultAttributeMap实现属性键值对的存储和获取,由于此种用法与channel.attr()存在混淆,故不建议使用。确有需要时,请直接使用channel.attr()
下面介绍其中的关键字段:

    // Context形成双向链表,next和prev分别是后继节点和前驱节点
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
    
    private final boolean inbound;  // 对应处理器为InboudHandler
    private final boolean outbound; // 对应处理器为OutboudHandler
    private final DefaultChannelPipeline pipeline;  
    private final String name;  // Context的名称
    private final boolean ordered;  // 事件顺序标记
    
    final EventExecutor executor; // 事件执行线程
    
    private volatile int handlerState = INIT;   // 状态

其中handlerState的字面意思容易使人误解,为此列出四种状态并加以解释:

    // 初始状态
    private static final int INIT = 0; 
    // 对应Handler的handlerAdded方法将要被调用但还未调用
    private static final int ADD_PENDING = 1;
    // 对应Handler的handlerAdded方法被调用
    private static final int ADD_COMPLETE = 2;
    // 对应Handler的handlerRemoved方法被调用
    private static final int REMOVE_COMPLETE = 3;

AbstractChannelHandlerContext只有一个构造方法:

    AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, 
              boolean inbound, boolean outbound) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.inbound = inbound;
        this.outbound = outbound;
        
        // 只有执行线程为EventLoop或者标记为OrderedEventExecutor才是顺序的
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

由于Netty将事件抽象为入站事件和出站事件,AbstractChannelHandlerContext对事件的处理也分为两类,分别以ChannelRead和read事件为例分析。首先分析ChannelRead事件:

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg){
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

回顾之前的分析,事件在ChannelPipeline中不会自动流动,入站事件需要调用fireXXX方法将事件传递到下一个处理器,出站事件需要调用诸如write方法传递给下一个处理器处理。查找入站处理器的代码如下:

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next; // 入站事件向后查找
        } while (!ctx.inbound);
        return ctx;
    }

代码十分简洁,向双向链表前进方向查找到最近的入站处理器即可;可推知,出站事件则向双向链表后退方向查找最近的出站处理器。这也是出站\入站事件传播方向不同的原因。
找到事件传播的下一个处理器后,在处理器中执行处理过程的代码如下:

    static void invokeChannelRead(final AbstractChannelHandlerContext next, 
              final Object msg) {
        ObjectUtil.checkNotNull(msg, "msg");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            // 使用当前IO线程执行用户自定义处理
            next.invokeChannelRead(msg);
        } else {
            // 使用用户定义的线程执行处理过程
            executor.execute(() -> {next.invokeChannelRead(msg);});
        }
    }

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {   
                // 处理器执行用户自定义的处理过程
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }

Netty为用户提供了方便的线程切换处理,特别是在版本4.1后,可以直接使用JDK提供的线程池。最佳实践:在一个处理器中如果有耗时较大的业务逻辑,可以指定该处理器的处理线程Executor,以避免占用IO线程造成性能损失。
之后再看一下invokeHandler()notifyHandlerException(t)方法,invokeHandler决定是否调用处理器,进行业务逻辑处理:

    private boolean invokeHandler() {
        // handlerState为volatile变量,存储为本地变量,以便减少volatile读
        int handlerState = this.handlerState;
        return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
    }

可知,当一个处理器还没有调用HandlerAdded方法时,只有处理器的执行线程是非顺序线程池的实例才能执行业务处理逻辑;否则必须等待已调用handlerAdded方法,才能处理业务逻辑。这部分的处理,保证了ChannelPipeline的线程安全性,由此用户可以随意增加删除Handler。
notifyHandlerException方法对处理过程中出现的异常进行处理:

    private void notifyHandlerException(Throwable cause) {
        if (inExceptionCaught(cause)) {
            // 处理异常的过程中出现异常
            logger.warn("...)
            return;
        }
        invokeExceptionCaught(cause);
    }
    
    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                logger.warn("...")
            }
        } else {
            fireExceptionCaught(cause);
        }
    }

invokeExceptionCaught方法可类比invokeChannelRead方法,其形式一致。由代码可知,当一个Handler的处理过程出现异常时,会调用该Handler的exceptionCaught()方法进行处理。注意:异常事件的传播也是由用户使用fireExceptionCaught方法控制。
至此,ChannelRead入站事件的处理已分析完毕,read出站事件的处理也可类比,不同的是:出站事件需要反向找出站处理器Handler,不再赘述。

7.2.2.2 DefaultChannelHandlerContext

DefaultChannelHandlerContext是使用Netty时经常使用的事实Context类,其中的大部分功能已在AbstractChannelHandlerContext中完成,所以该类十分简单,其构造方法如下:

    DefaultChannelHandlerContext(DefaultChannelPipeline pipeline, 
              EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

对出站\入站Handler的判别使用粗暴的instanceof:

    private static boolean isInbound(ChannelHandler handler) {
        return handler instanceof ChannelInboundHandler;
    }

    private static boolean isOutbound(ChannelHandler handler) {
        return handler instanceof ChannelOutboundHandler;
    }

实现中引入了一个新的字段:

    private final ChannelHandler handler;

回忆ChannelPipeline,pipeline其中形成Context的双向链表,而处理逻辑则在Handler中。设计模式中指出,关联Handler与Context的方法有两种:组合继承。本例中,使用的是组合,下面分析继承的方法。

7.2.2.3 HeadContext和TailContext

HeadContext和TailContext使用继承的方式关联Handler,作为ChannelPipeline双向链表的头节点和尾节点。首先分析HeadContext,类签名如下:

    final class HeadContext extends AbstractChannelHandlerContext
                 implements ChannelOutboundHandler, ChannelInboundHandler

由于既继承OutboundHandler又继承InboundHandler,可知HeadContext需要同时处理出站事件和入站事件。以read和ChannelRead事件为例,当用户调用read出站事件时,是在告诉IO线程:我需要向网络读数据做处理;当IO线程读到数据后,则使用ChannelRead事件通知用户:已读取到数据。read()方法代码如下:

    public void read(ChannelHandlerContext ctx) {
        unsafe.beginRead();
    }

其中unsafe是Netty内部实现底层IO细节的类,beginRead()方法设定底层Selector关心read事件,如果read事件就绪,则会调用unsafe.read()方法读取数据,然后调用channelPipe.fireChannelRead()方法通知用户已读取到数据,可进行业务处理。 HeadContext的ChannelRead()方法代码如下:

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

可知HeadContext只是简单的将事件传播到下一个入站处理器。
也许你会有疑问:自己根本没使用过read出站事件,为什么数据自动读取了呢?这是因为默认设置自动读取autoRead。通过HeadContext的channelActive代码分析:

    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
        readIfIsAutoRead(); // 自动读取
    }
        
    private void readIfIsAutoRead() {
        if (channel.config().isAutoRead()) {
            channel.read();
        }
    }

当channel激活后,如果配置了自动读取,则会调用channel.read(),注意这将在ChannelPipeline中传播出站事件read,最终传播到HeadContext的read()方法,最后调用unsafe.beginRead()设置关心底层read事件,从而实现激活后自动读取数据。而当读取完一组数据后,在channelReadComplete()方法中继续下一组数据的自动读取。

    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
        readIfIsAutoRead(); // 自动读取下一组数据
    }

分析完HeadContext,再分析TailContext。首先看方法签名:

    final class TailContext extends AbstractChannelHandlerContext 
            implements ChannelInboundHandler

TailContext只继承入站事件处理器,作为双向链表的尾节点,它对入站事件的处理极其简单,那就是:什么也不做。但有两个方法除外:

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
            throws Exception {
        onUnhandledInboundException(cause);
    }

    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        onUnhandledInboundMessage(msg);
    }

而这也仅仅是做个简单提示,告知用户这两个方法必须在ChannelPipeline中有处理器处理,否则很有可能是代码出了问题:

    protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
              "Discarded inbound message {} that reached at the tail of the pipeline. " +
              "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

如果开启了日志的Debug级别,不难相信,你已经接触过这样的提示。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容