1_Netty源码分析之Netty服务端启动


本文均为原创,如需转载请注明出处。

[TOC]

Netty服务端创建流程分析

​ Netty为了向使用者屏蔽NIO通信的底层细节,在和用户交互的边界做了封装,母的就是为了减少用户开发工作量,降低开发难度。BootstrapSocket客户端创建工具类,用户听过Bootstrap可以方柏霓地创建Netty地客户端并发起异步TCP连接操作。

Netty服务端时序图

Netty服务端--Channel的创建

首先基于NIO的学习,思考两个问题

  • 服务端的Socket在哪里初始化?

  • 在哪里accept连接?

研究服务端是如何创建的,查看源码,首先应该从源头出发,直接调用的方法开始层层深入。

Netty服务端的入口bind()方法

​ 服务端创建的入口bind()方法

// 绑定端口,同步等待成功
ChannelFuture f = b.bind(port).sync();

进入bind()方法发现会执行一个dobind()方法

    private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister();
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }

该方法调用了一个initAndRegister()

 final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

很明显可以看出。这里会创建一个底层的channel,调用Netty维护的一个工厂类,下面的init()方法是一个抽象类,可以看看那些方法继承并实现了它呢?😊

AbstractBootstrap下的两个子类.png

所以可想而知!在调用bind()方法后,Netty会调用JDK底层初始化一个Channel,回过头来看看channelFactory.newChannel()的实现。

ChannelFactory下有一个利用反射的实现子类ReflectiveChannelFactory(),从名字就可以看得出来,是一个利用反射来初始化channel的工厂类。该类下的newChannel()方法:

    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

确实是利用了返回来进行实例化的。那么它是如何知道实话化那个对象呢?

​ 让我们回到上一层开始出现channelFactory.newChannel(),在这里可以很清楚的看出来,当前类AbstractBootstrap自身维护着该工厂对象,并且在构造函数中给该工厂类所需要的对象进行了赋值。

    AbstractBootstrap() {
        // Disallow extending from a different package.
    }

    AbstractBootstrap(AbstractBootstrap<B, C> bootstrap) {
        group = bootstrap.group;
        // 那么这里的bootstrap对象又是从哪里得到的呢?
        channelFactory = bootstrap.channelFactory;
        handler = bootstrap.handler;
        localAddress = bootstrap.localAddress;
        synchronized (bootstrap.options) {
            options.putAll(bootstrap.options);
        }
        synchronized (bootstrap.attrs) {
            attrs.putAll(bootstrap.attrs);
        }
    }

这里的AbstractBootstrap<B, C> bootstrap很直接的就可以想到我们在最外层定义的AbstractBootstrap的子类Bootstrap/ServerBootstrap中设置了

          ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChildChannelHandler());

其中设置的channel属性就是告诉底层的channelFactory来实例化该对象。

这里我们验证一下,直接从我们的代码中.channel()方法中去,可以很清楚的看出,上面说的调用的是ChannelFactory下的子类ReflectiveChannelFactory

    public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        // 调用工厂类,利用反射创建对象
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

该类的实现非常的简单:

public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {

    private final Class<? extends T> clazz;

    // 直接返回 需要初始化类的 class 对象
    public ReflectiveChannelFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // 利用反射 初始化返回实例对象
    @Override
    public T newChannel() {
        try {
            return clazz.newInstance();
        } catch (Throwable t) {
            throw new ChannelException("Unable to create Channel from class " + clazz, t);
        }
    }

    @Override
    public String toString() {
        return StringUtil.simpleClassName(clazz) + ".class";
    }
}

现在明白了channel是由谁创建的,那么到底是怎么创建出来的呢?现在进入NioServerSocketChannel.查看该对象的构造函数,做了那些事情。

通过NioServerSocketChannel加密Channel的创建

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
// 该方法 返回的对象为 java.nio.channels;下的。调用的方法也是JDK底层的实现
    private static ServerSocketChannel newSocket(SelectorProvider provider) {
        try {
            return provider.openServerSocketChannel();
        } catch (IOException e) {
            throw new ChannelException(
                    "Failed to open a server socket.", e);
        }
    }
// 原来NioServerSocketChannel 是直接调用的JDK底层的newSocket来来创建Channel 通道

其中还有一个构造

    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        // 这里是可以生成一个NiO channel配置信息
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

一次点进该方法的父类可以找到在NIO编程中不可避免地一项异步配置:ch.configureBlocking(false);,配置为异步非阻塞。

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);// 配置为异步 非阻塞 (重点)
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
Channel UML图.png

在上面继承关系中AbstractChannel维护着channel通道地内部属性


    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId(); // Channel的 唯一标识
        unsafe = newUnsafe();// 对应 底层TCP读写的相关操作
        pipeline = newChannelPipeline();// 后续研究😊
    }

在服务端channel初始化完成之后,下一步就需要将该channel注册到selector上面。

注册selector

​ 在上述代码channel初始化完成的地方为:调用了init()方法。因此应该从该地方出发查看如何将channel注册到selector上。initAndRegister初始并注册。


    final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
            // 初始化操作
            channel = channelFactory.newChannel();
            init(channel);
        } catch (Throwable t) {
            if (channel != null) {
                // channel can be null if newChannel crashed (eg SocketException("too many open files"))
                channel.unsafe().closeForcibly();
            }
            // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }

        // 注册channel
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }

其中register的实现在AbstractChannel下。

 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            // 绑定线程,简单的赋值操作
            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            // 实现注册
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

register0(promise);

private void register0(ChannelPromise promise) {
            try {
                // check if the channel is still open as it could be closed in the mean time when the register
                // call was outside of the eventLoop
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 调用Jdk底层注册
                doRegister();
                neverRegistered = false;
                registered = true;

                // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
                // user may already fire events through the pipeline in the ChannelFutureListener.
                // 触发事件
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                // 传播时间
                pipeline.fireChannelRegistered();
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // This channel was registered before and autoRead() is set. This means we need to begin read
                        // again so that we process inbound data.
                        //
                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // Close the channel directly to avoid FD leak.
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                // 调用JDK 方法实现注册
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;

注册成功后,考虑端口绑定。

端口绑定

还是根据dobind()方法可以看到里面有个bind0()方法。channel对象调用bind()方法,在AbstractChannel()下有具体实现

@Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 调用JDK底层 绑定
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // 触发事件
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

调用底层的绑定方法

    @Override
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            // 调用底层JDK的绑定方法
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

总结

​ 服务端启动核心路径总结:

服务端启动核心路径.png

首先调用服务端的newChannel()创建服务端channel,这个过程实际上就是调用JDK底层的API来创建一个JDK channel,然后Netty将其包装成自己的服务端的channel,同时会创建一些基本的组件绑定在此channel上(比喻:pipeline)。然后调用init()来初始化服务端channel,这个过程最重要的就是为服务端的channel添加一个连接处理器。随后调用register()方法注册selector,这个过程NettyJDK底层生成的channel注册到selector上,最后调用bind()方法通过jdk底层的API将端口号绑定。来实现,绑定之后,nettyselector绑定一个OP_ACCEPT事件,然后selector就可以接收绑定其他channel了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,236评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,867评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,715评论 0 340
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,899评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,895评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,733评论 1 283
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,085评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,722评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,025评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,696评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,816评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,447评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,057评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,009评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,254评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,204评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,561评论 2 343

推荐阅读更多精彩内容