Netty源码分析:1.4服务器启动流程

原文博客:Doi技术团队
链接地址:https://blog.doiduoyi.com
初心:记录优秀的Doi技术团队学习经历

第一章节是主要是服务器启动的代码分析。章节目录有:
|---------1.1初始化NioEventLoopGroup
|---------1.2初始化NioEventLoop
|---------1.3初始化NioServerSocketChannel
|---------1.4服务器启动流程
为什么先从初始化开始了解服务器启动?
因为在我看服务器启动的相关源码的时候,有很多地方都是初始化的时候已经建立好的。所以我就从初始化的源码开始看起。这是我第一次看源码的笔记,仍有很多理解错误的地方和不解的地方。欢迎讨论。

本篇目录

  • 启动服务器代码
  • 代码分析

启动服务器代码

  • 1 创建NioEventLoopGroup对象。上文已经介绍了NioEventLoopGroup的初始化已经内部线程NioEventLoop的初始化话。
  • 2 创建ServerBootstrap对象,该类初始化主要是创建了几个LinkedHashMap来存储设置。例如childOptions或者childAttrs
 public void bind() {
 
         //1 
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup work = new NioEventLoopGroup();
        try {
//            引导绑定和启动服务器
            //2
             ServerBootstrap b = new ServerBootstrap();
             //3
             b.group(boss, work);
//            创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等
            //4
             b.channel(NioServerSocketChannel.class);
//            设置childHandler执行所有的连接请求
            //5
             b.childHandler(new ChildChannelHandler());
             //6
             b.option(ChannelOption.SO_BACKLOG, 100);
             
            //绑定端口
            //7
            ChannelFuture future = b.bind(8080).sync();

            //8
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            //9
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }

    }

3 Group

赋值两个线程池。
1 首先第一个boss线程池是赋值到父类AbstractBootstrap的group变量中。
2 work线程池就赋值在ServerBootstrap的childGroup变量中。

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
        //1
        super.group(parentGroup);
        if (childGroup == null) {
            throw new NullPointerException("childGroup");
        }
        if (this.childGroup != null) {
            throw new IllegalStateException("childGroup set already");
        }
        this.childGroup = childGroup;
        return this;
    }

4 channel()

  • 首先ReflectiveChannelFactory类是一个工厂类,里面只有一个方法newChannel()用来将传入的class进行无参构造生成对象的。
  • 然后channelFactory()是将ReflectiveChannelFactory赋予ServerBootstrap的父类的channelFactory参数

此时channel 还没被创建,直到bind()方法中调用initAndRegister()方法的时候才会用该工厂类生成一个channel。下文会讲。

 public B channel(Class<? extends C> channelClass) {
        if (channelClass == null) {
            throw new NullPointerException("channelClass");
        }
        return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
    }

5 childHandler()

在调用这个childHandler()这个方法的时候,你需要自己写一个方法继承ChannelInitializer类。而ChannelInitializer类也是继承ChannelInboundHandlerAdapter的。所以childHandler()方法的参数就是我们自己写的类。然后赋值到childHandler参数。

public ServerBootstrap childHandler(ChannelHandler childHandler) {
        if (childHandler == null) {
            throw new NullPointerException("childHandler");
        }
        this.childHandler = childHandler;
        return this;
    }

6 Option()

将一些配置存储到options变量中,该变量是一个LinkedHashMap

public <T> B option(ChannelOption<T> option, T value) {
        if (option == null) {
            throw new NullPointerException("option");
        }
        if (value == null) {
            synchronized (options) {
                options.remove(option);
            }
        } else {
            synchronized (options) {
                options.put(option, value);
            }
        }
        return (B) this;
    }

7 bind()

image.png

之前的一些方法都只是对ServerBootstrap的配置,说白了就是用来set参数的。
bind()则是开始启动服务器了。

  • 1 进行对groupchannelFactory两个参数进行非空验证
public ChannelFuture bind(SocketAddress localAddress) {
        validate(); //1 
        if (localAddress == null) {
            throw new NullPointerException("localAddress");
        }
        return doBind(localAddress);
    }

doBind()

  • 1 创建一个channel 并且初始化和注册。关键部分。代码分析看下面
  • 2 判断channel是否注册成功。如果已经注册成功那么就进行doBind0()方法。如果还没成功,那就添加一个监听器,等返回成功的时候就进行doBind0()方法。解析看下文。
 private ChannelFuture doBind(final SocketAddress localAddress) {
        final ChannelFuture regFuture = initAndRegister(); //1
        final Channel channel = regFuture.channel();
        if (regFuture.cause() != null) {
            return regFuture;
        }
        //2
        if (regFuture.isDone()) {
            ChannelPromise promise = channel.newPromise();
            doBind0(regFuture, channel, localAddress, promise);
            return promise;
        } else { 
            final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
            regFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    Throwable cause = future.cause();
                    if (cause != null) {
                       
                        promise.setFailure(cause);
                    } else {
                      
                        promise.registered();

                        doBind0(regFuture, channel, localAddress, promise);
                    }
                }
            });
            return promise;
        }
    }


doBind0()

  • 1 该方法是执行在channel已经与selector注册后的。给线程添加一个任务。该任务是绑定端口的。

到这个方法。服务器启动就已经完成了

 private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {
        //1
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

下文都是对initAndRegister()方法的代码解析

initAndRegister()

  • 1 利用工厂创建一个channel对象。初始化内容看上一篇文章初始化NioServerSocketChannel
  • 2 对channel进行一个初始化,看下文。
  • 3 对channel进行注册,看下文
final ChannelFuture initAndRegister() {
        Channel channel = null;
        try {
           //1
            channel = channelFactory.newChannel();
            //2
            init(channel);
        } catch (Throwable t) {
          
        }
        //3
        ChannelFuture regFuture = config().group().register(channel);
        if (regFuture.cause() != null) {
            if (channel.isRegistered()) {
                channel.close();
            } else {
                channel.unsafe().closeForcibly();
            }
        }
        return regFuture;
    }

init()

  • 1 对channel 的config变量里添加options属性
  • 2 对channel 的config变量里添加attrs属性
  • 3 获取channel 对象的pipeline管道,然后在管道里面添加一个handler,该handler作用有:添加bootstrap里的handler。对channel 添加一个任务。
  • 4 添加bootstrap里的handler。对channel 添加一个任务
void init(Channel channel) throws Exception {
        final Map<ChannelOption<?>, Object> options = options0();
        //1 
        synchronized (options) {
            setChannelOptions(channel, options, logger);
        }

        //2
        final Map<AttributeKey<?>, Object> attrs = attrs0();
        synchronized (attrs) {
            for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
                @SuppressWarnings("unchecked")
                AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
                channel.attr(key).set(e.getValue());
            }
        }
        
        //3
        ChannelPipeline p = channel.pipeline();

        final EventLoopGroup currentChildGroup = childGroup;
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions;
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
        synchronized (childOptions) {
            currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
        }
        synchronized (childAttrs) {
            currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
        }
        
        //4
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }

                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

config().group().register(channel)

  • config().group()是获取到线程池,该线程池是ServerBootstrap的父类AbstractBootstrap存储的boss线程池。是不是恍然大悟,这就是把channel放到boss线程池里的一个线程里面去执行任务啊。
  • 1 是MultithreadEventLoopGroup类的方法 也就是NioEventLoopGroup的父类。 该方法就是从线程池里获取一个EventLoop.然后执行EventLoop里的register()方法。
 @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
@Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel); //1
    }

继续看下去。

  • 2 是SingleThreadEventLoop里的方法。``DefaultChannelPromise()` 方法是给channel 与该线程 添加一个监听器。
@Override
    public ChannelFuture register(Channel channel) {
       //2
        return register(new DefaultChannelPromise(channel, this));
    }

继续往下看

  • 3 继续是SingleThreadEventLoop里的方法。promise是监听器对象,promise.channel()获取到channel。unsafe()方法是实现底层的register,read或者write操作
 @Override
    public ChannelFuture register(final ChannelPromise promise) {
        //3
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

继续往下分析,该register()方法是AbstractChannel类的。也就是初始化NioServerSocketChannel的时候,建立pipeline和unsafe的类。

  • 4 进行参数验证
  • 5 将该eventLoop线程赋值于channel参数。
  • 6 eventLoop.inEventLoop() 判断,如果现在的线程是EventLoop()的 线程,那么执行任务,如果不是那么就用执行器执行任务。在这里debug,会返回false,会调用execute()方法。
    下面继续探讨
  • 7 下面探讨下register0()方法
 @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            //4
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }
            //5
            AbstractChannel.this.eventLoop = eventLoop;
            //6
            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                        //7
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                  
                }
            }
        }

首先在研究register0()之前有一个小知识。

在版本4.1.x 的时候,在初始化eventLoop的时候,还没有创建线程,而是保存了Executor这个变量。这个变量在4.0.x版本的时候是没有的。那么4.1.x版本在什么时候创建线程呢?在eventLoop调用execute()方法的时候创建线程。下文可以看到

  • 1 继续判断是否现在的线程是EventLoop对象的线程,肯定返回false,因为EventLoop线程里的thread变量是null嘛。
 @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
image.png
  • 2 在startThread()方法中,才创建一个线程。
  • 3 在任务队列里添加一个任务。
public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        //1
        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
             //2
            startThread();
            //3
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }
        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

register0(ChannelPromise promise)

  • 1 判断EventLoop线程是否还存活。
  • 2 这个是记录是否注册过的。neverRegistered默认是true;
  • 3 这个是核心代码,到这里才进行将channel与selector注册在一起。
 private void register0(ChannelPromise promise) {
            try {
                //1
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                //2
                boolean firstRegistration = neverRegistered;
                doRegister();//3
                neverRegistered = false;
                registered = true;

      
                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
            
                if (isActive()) {
                    if (firstRegistration) {
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                     
                        beginRead();
                    }
                }
            } catch (Throwable t) {
            
            }
        }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,311评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,339评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,671评论 0 342
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,252评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,253评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,031评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,340评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,973评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,466评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,937评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,039评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,701评论 4 323
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,254评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,259评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,485评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,497评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,786评论 2 345