Netty 源码解析(四): Netty 的 ChannelPipeline

今天是猿灯塔“365篇原创计划”第四篇。

接下来的时间灯塔君持续更新Netty系列一共九篇

Netty 源码解析(一): 开始

Netty 源码解析(二): Netty 的 Channel

Netty 源码解析(三): Netty 的 Future 和 Promise

当前:Netty 源码解析(四): Netty 的 ChannelPipeline

Netty 源码解析(五): Netty 的线程池分析

Netty 源码解析(六): Channel 的 register 操作

Netty 源码解析(七): NioEventLoop 工作流程

Netty 源码解析(八): 回到 Channel 的 register 操作

Netty 源码解析(九): connect 过程和 bind 过程分析

今天呢!灯塔君跟大家讲:

Netty 的 ChannelPipeline

ChannelPipeline和Inbound、Outbound

我想很多读者应该或多或少都有 Netty 中 pipeline 的概念。前面我们说了,使用 Netty 的时候,我们通常就只要写一些自定义的 handler 就可以了,我们定义的这些 handler 会组成一个 pipeline,用于处理 IO 事件,这个和我们平时接触的 Filter 或 Interceptor 表达的差不多是一个意思。

每个 Channel 内部都有一个 pipeline,pipeline 由多个 handler 组成,handler 之间的顺序是很重要的,因为 IO 事件将按照顺序顺次经过 pipeline 上的 handler,这样每个 handler 可以专注于做一点点小事,由多个 handler 组合来完成一些复杂的逻辑。

从图中,我们知道这是一个双向链表。

首先,我们看两个重要的概念:InboundOutbound。在 Netty 中,IO 事件被分为 Inbound 事件和 Outbound 事件。

Outboundout 指的是 出去,有哪些 IO 事件属于此类呢?比如 connect、write、flush 这些 IO 操作是往外部方向进行的,它们就属于 Outbound 事件。

其他的,诸如 accept、read 这种就属于 Inbound 事件。

比如客户端在发起请求的时候,需要 1️⃣connect 到服务器,然后 2️⃣write 数据传到服务器,再然后 3️⃣read 服务器返回的数据,前面的 connect 和 write 就是 out 事件,后面的 read 就是 in 事件。

比如很多初学者看不懂下面的这段代码,这段代码用于服务端的 childHandler 中:

pipeline.addLast(new StringDecoder());

pipeline.addLast(new StringEncoder());

pipeline.addLast(new BizHandler());

初学者肯定都纳闷,以为这个顺序写错了,应该是先 decode 客户端过来的数据,然后用 BizHandler 处理业务逻辑,最后再 encode 数据然后返回给客户端,所以添加的顺序应该是 1 -> 3 -> 2 才对。

其实这里的三个 handler 是分组的,分为 Inbound(1 和 3) 和 Outbound(2):

1. pipeline.addLast(new StringDecoder());

2. pipeline.addLast(new StringEncoder());

3. pipeline.addLast(new BizHandler());

客户端连接进来的时候,读取(read)客户端请求数据的操作是 Inbound 的,e 操作是 Outbound 的,此时使用的是 2。

处理完数据后,返回给客户端数据的 write 操作是 Outbound 的,此时使用的是 2。

所以虽然添加顺序有点怪,但是执行顺序其实是按照 1 -> 3 -> 2 进行的。

如果我们在上面的基础上,加上下面的第四行,这是一个 OutboundHandler:

4. pipeline.addLast(new OutboundHandlerA());

那么执行顺序是不是就是 1 -> 3 -> 2 -> 4 呢?答案是:不是的。

对于 Inbound 操作,按照添加顺序执行每个 Inbound 类型的 handler;而对于 Outbound 操作,是反着来的,从后往前,顺次执行 Outbound 类型的 handler。

所以,上面的顺序应该是先 1 后 3,它们是 Inbound 的,然后是 4,最后才是 2,它们两个是 Outbound 的。说实话,这种组织方式对新手应该很是头疼。

那我们在开发的时候怎么写呢?其实也很简单,从最外层开始写,一步步写到业务处理层,把 Inbound 和 Outbound 混写在一起。比如 encode 和 decode 是属于最外层的处理逻辑,先写它们。假设 decode 以后是字符串,那再进来一层应该可以写进来和出去的日志。再进来一层可以写 字符串 <=> 对象 的相互转换。然后就应该写业务层了。

到这里,我想大家应该都知道 Inbound 和 Outbound 了吧?下面我们来介绍它们的接口使用。

定义处理 Inbound 事件的 handler 需要实现 ChannelInboundHandler,定义处理 Outbound 事件的 handler 需要实现 ChannelOutboundHandler。最下面的三个类,是 Netty 提供的适配器,特别的,如果我们希望定义一个 handler 能同时处理 Inbound 和 Outbound 事件,可以通过继承中间的 ChannelDuplexHandler 的方式,比如 LoggingHandler 这种既可以用来处理 Inbound 也可以用来处理 Outbound 事件的 handler。

有了 Inbound 和 Outbound 的概念以后,我们来开始介绍 Pipeline 的源码。

我们说过,一个 Channel 关联一个 pipeline,NioSocketChannel 和 NioServerSocketChannel 在执行构造方法的时候,都会走到它们的父类 AbstractChannel 的构造方法中:

protected AbstractChannel(Channel parent) {

    this.parent = parent;

    // 给每个 channel 分配一个唯一 id

    id = newId();

    // 每个 channel 内部需要一个 Unsafe 的实例

    unsafe = newUnsafe();

    // 每个 channel 内部都会创建一个 pipeline

    pipeline = newChannelPipeline();

}

上面的三行代码中,id 比较不重要,Netty 中的 Unsafe 实例其实挺重要的,这里简单介绍一下。

在 JDK 的源码中,sun.misc.Unsafe 类提供了一些底层操作的能力,它设计出来是给 JDK 中的源码使用的,比如 AQS、ConcurrentHashMap 等,我们在之前的并发包的源码分析中也看到了很多它们使用 Unsafe 的场景,这个 Unsafe 类不是给我们的代码使用的,是给 JDK 源码使用的(需要的话,我们也是可以获取它的实例的)。

Unsafe 类的构造方法是 private 的,但是它提供了 getUnsafe() 这个静态方法:

Unsafe unsafe = Unsafe.getUnsafe();

大家可以试一下,上面这行代码编译没有问题,但是执行的时候会抛 java.lang.SecurityException 异常,因为它就不是给我们的代码用的。

但是如果你就是想获取 Unsafe 的实例,可以通过下面这个代码获取到:

Field f = Unsafe.class.getDeclaredField("theUnsafe");

f.setAccessible(true);

Unsafe unsafe = (Unsafe) f.get(null);

Netty 中的 Unsafe 也是同样的意思,它封装了 Netty 中会使用到的 JDK 提供的 NIO 接口,比如将 channel 注册到 selector 上,比如 bind 操作,比如 connect 操作等,这些操作都是稍微偏底层一些。Netty 同样也是不希望我们的业务代码使用 Unsafe 的实例,它是提供给 Netty 中的源码使用的。

不过,对于我们源码分析来说,我们还是会有很多时候需要分析 Unsafe 中的源码的

关于 Unsafe,我们后面用到了再说,这里只要知道,它封装了大部分需要访问 JDK 的 NIO 接口的操作就好了。这里我们继续将焦点放在实例化 pipeline 上:

protected DefaultChannelPipeline newChannelPipeline() {

    return new DefaultChannelPipeline(this);

}

这里开始调用 DefaultChannelPipeline 的构造方法,并把当前 channel 的引用传入:

protected DefaultChannelPipeline(Channel channel) {

    this.channel = ObjectUtil.checkNotNull(channel, "channel");

    succeededFuture = new SucceededChannelFuture(channel, null);

    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);

    head = new HeadContext(this);

    head.next = tail;

    tail.prev = head;

}

这里实例化了 tail 和 head 这两个 handler。tail 实现了 ChannelInboundHandler 接口,

而 head 实现了 ChannelOutboundHandler 和 ChannelInboundHandler 两个接口,

并且最后两行代码将 tail 和 head 连接起来:复制代码

注意,在不同的版本中,源码也略有差异,head 不一定是 in + out,大家知道这点就好了。

还有,从上面的 head 和 tail 我们也可以看到,其实 pipeline 中的每个元素是 ChannelHandlerContext 的实例,而不是 ChannelHandler 的实例,context 包装了一下 handler,但是,后面我们都会用 handler 来描述一个 pipeline 上的节点,而不是使用 context,希望读者知道这一点。

这里只是构造了 pipeline,并且添加了两个固定的 handler 到其中(head + tail),还不涉及到自定义的 handler 代码执行。我们回过头来看下面这段代码:

我们说过 childHandler 中指定的 handler 不是给 NioServerSocketChannel 使用的,是给 NioSocketChannel 使用的,所以这里我们不看它。

这里调用 handler(…) 方法指定了一个 LoggingHandler 的实例,然后我们再进去下面的 bind(…) 方法中看看这个 LoggingHandler 实例是怎么进入到我们之前构造的 pipeline 内的。

顺着 bind() 一直往前走,bind() -> doBind() -> initAndRegister():

final ChannelFuture initAndRegister() {

    Channel channel = null;

    try {

        // 1. 构造 channel 实例,同时会构造 pipeline 实例,

        // 现在 pipeline 中有 head 和 tail 两个 handler 了

        channel = channelFactory.newChannel();

        // 2. 看这里

        init(channel);

    } catch (Throwable t) {

    ......

}

上面的两行代码,第一行实现了构造 channel 和 channel 内部的 pipeline,我们来看第二行 init 代码:

// ServerBootstrap:

  @Override

void init(Channel channel) throws Exception {

    ......

    // 拿到刚刚创建的 channel 内部的 pipeline 实例

    ChannelPipeline p = channel.pipeline();

    ...

    // 开始往 pipeline 中添加一个 handler,这个 handler 是 ChannelInitializer 的实例

    p.addLast(new ChannelInitializer<Channel>() {

        // 我们以后会看到,下面这个 initChannel 方法何时会被调用

        @Override

        public void initChannel(final Channel ch) throws Exception {

            final ChannelPipeline pipeline = ch.pipeline();

            // 这个方法返回我们最开始指定的 LoggingHandler 实例

            ChannelHandler handler = config.handler();

            if (handler != null) {

                // 添加 LoggingHandler

                pipeline.addLast(handler);

            }

            // 先不用管这里的 eventLoop

            ch.eventLoop().execute(new Runnable() {

                @Override

                public void run() {

                    // 添加一个 handler 到 pipeline 中:ServerBootstrapAcceptor

                    // 从名字可以看到,这个 handler 的目的是用于接收客户端请求

                    pipeline.addLast(new ServerBootstrapAcceptor(

                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));

                }

            });

        }

    });

}

这里涉及到 pipeline 中的辅助类 ChannelInitializer,我们看到,它本身是一个 handler(Inbound 类型),但是它的作用和普通 handler 有点不一样,它纯碎是用来辅助将其他的 handler 加入到 pipeline 中的。

大家可以稍微看一下 ChannelInitializer 的 initChannel 方法,有个简单的认识就好,此时的 pipeline 应该是这样的:

ChannelInitializer 的 initChannel(channel) 方法被调用的时候,会往 pipeline 中添加我们最开始指定的 LoggingHandler 和添加一个 ServerBootstrapAcceptor。但是我们现在还不知道这个 initChannel 方法何时会被调用。

上面我们说的是作为服务端的 NioServerSocketChannel 的 pipeline,NioSocketChannel 也是差不多的,我们可以看一下 Bootstrap 类的 init(channel) 方法:

void init(Channel channel) throws Exception {

    ChannelPipeline p = channel.pipeline();

    p.addLast(config.handler());

    ...

}

它和服务端 ServerBootstrap 要添加 ServerBootstrapAcceptor 不一样,它只需要将 EchoClient 类中的 ChannelInitializer 实例加进来就可以了,它的 ChannelInitializer 中添加了两个 handler,LoggingHandler 和 EchoClientHandler:

很显然,我们需要的是像 LoggingHandler 和 EchoClientHandler 这样的 handler,但是,它们现在还不在 pipeline 中,那么它们什么时候会真正进入到 pipeline 中呢?以后我们再揭晓。

还有,为什么 Server 端我们指定的是一个 handler 实例,而 Client 指定的是一个 ChannelInitializer 实例?其实它们是可以随意搭配使用的,你甚至可以在 ChannelInitializer 实例中添加 ChannelInitializer 的实例。

非常抱歉,这里又要断了,下面要先介绍线程池了,大家要记住 pipeline 现在的样子,head + channelInitializer + tail

本节没有介绍 handler 的向后传播,就是一个 handler 处理完了以后,怎么传递给下一个 handler 来处理?比如我们熟悉的 JavaEE 中的 Filter 是采用在一个 Filter 实例中调用 chain.doFilter(request, response) 来传递给下一个 Filter 这种方式的。

我们用下面这张图结束本节。下图展示了传播的方法,但我其实是更想让大家看一下,哪些事件是 Inbound 类型的,哪些是 Outbound 类型的:

Outbound 类型的几个事件大家应该比较好认,注意 bind 也是 Outbound 类型的。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,772评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,458评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,610评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,640评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,657评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,590评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,962评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,631评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,870评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,611评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,704评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,386评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,969评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,944评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,179评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,742评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,440评论 2 342