Netty快速入门(11)EventLoop源码解析

EventLoop的类层图

我们来简单讨论一下Netty线程模型的源码。学习一下EventLoopGroup的原理。首先看一下EventLoop的类层结构图: 

上面是一个简化的版本,完整的是非常复杂的。从整体上来看,netty针对EventLoop定义了两种类型的接口,一个是EventExecutor和EventExecutorGroup,另一个是EventLoop和EventLoopGroup。

通过这些接口,可以总结出两大功能,第一个就是它是一个事件执行器(EventExecutor),它能够执行各种事件,也能够向它提交事件,它能够执行。第二它是一个EventLoop,能够向它注册handler,进行循环处理。相当于nio中的selector加上循环操作。

对于每个NioEventLoop,它继承了SingleThreadEventLoop,同时SingleThreadEventLoop继承了SingleThreadEventExecutor。

对于每个NioEventLoopGroup,它继承了MultithreadEventLoopGroup,同时MultithreadEventLoopGroup继承了MultithreadEventExecutorGroup。





EventExecutor结构

EventExecutor是一个事件执行器,提供了很多执行任务的方法:

比如submit方法,这些接口都继承了ScheduleExecutorService接口,因此它们能够执行定时任务,能够提交任务,还包括schedule方法,其中最重要的方法是next,上面是EventExecutorLoop里面的一些方法。

下面的EventExecutor里面的方法也是一样,next返回自己的类型,表示下一个执行,parent()方法表示它是属于哪一个EventExecutorGroup,因为每个EventExecutor都属于一个EventExecutorGroup,parent()方法能判断它属于哪个组。inEventLoop判断当前方法是不是在当前线程里面执行,还有其他方法等等。

EventExecutor和EventExecutorGroup可以总结如下:

1.EventExecutorGroup里面有一个EventExecutor数组,保存了多个EventExecutor;

2.EventExecutorGroup是不干什么事情的,当收到一个请后,他就调用next()获得一个它里面的EventExecutor,再调用这个executor的方法;

3.EventExecutorChooser.next()定义选择下一个EventExecutor的策略;

我们来看一下选择下一个EventExecutor的策略的具体实现,

查看DefaultEventExecutorChooserFactory的源码,

算法很简单,它有两中选择方式,一种是PowerOfTwoEventExecutorChooser,另一种是GenericEventExecutorChooser。它们的算法很简单,其实就是轮询(round robin),如果是2的指数,就用位来做(PowerOfTwoEventExecutorChooser),如果不是2的指数就取模(GenericEventExecutorChooser),这两种效果是一模一样的。

再来看一下具体的内容,来看MultithreadEventExecutorGroup类方法:

其中有个children属性,就是一个EventExecutor[]数组,有个chooser属性,就是EventExecutorChooser选择器,所有的方法都是通过next派发给SingleThreadEventExecutor去处理:

就是在这里具体处理事情。它里面实现的一个重要的execute(Runnable )方法,就是做线程提交的,一定要关注这个方法。所有的方法都派发给SingleThreadEventExecutor后,它自己也不干,它交给它里面的Executor属性去做,具体的实现一般是ThreadPerTaskExecutor:

上面可以理解为都是中介,真正干活的就是ThreadPerTaskExecutor。





EventLoop结构

前面的EventExecutor就是执行,EventLoop主要是注册:

EventLoopGroup也是啥都不干,让下面的SingleThreadEventLoop来干。

EventLoop除了注册事件,还包括事件循环,因为netty有很多EventLoop和EventLoopGroup,比如nio对应的是nio的一套,bio对应的是bio的一套,其它非nio的也各自有一套,我们这里只讨论nio这一套。它的事件循环肯定是放到具体的run方法里面的:

NioEventLoop中的run方法实现的是SingleThreadEventExecutor中的run方法,每个EventLoopGroup都包括一系列的EventExecutor,看下图:

上图左侧表示一个EventLoopGroup,同样包含了一系列的EventExecutor,刚才在源码中看到,它是一个数组,通过EventExecutorChooser进行选择,每一个都指向一个具体的NioEventLoop。对于每个NioEventLoop,它内部有Selector,有很多通道注册,同时还有个事件循环去处理这些事情(上图右上侧部分)。对于每个通道,它都有一个ChannelPipeline(上图右下侧),里面有个handler链,它默认有两个,一个是头(HeaderContext),一个是尾(TailContext),这是netty自己加的,中间我们可以插入自己的处理器。

上面可以理解为我们平时用的workerGroup的结构,对于bossGroup,大致上和workderGroup差不多,只不过注册的时候,只有一个接收事件:

上面Selector对应的只有一个NioServerSocketChannel,里面也是一个事件循环,处理接收事件,它的ChannelPipeline也大致上一样,只不过最尾部的一个东西,肯定是ServerBootstrapAccept,专门负责accept事件。





NioEventLoop创建分析

上面了解了NioEventLoop的类结构,下面来了解一下它的创建过程。我们使用debug的形式查看创建过程,首先在 new 地方打一个断点:

然后debug启动项目,首先进入的是NioEventLoopGroup中的方法:

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

我们在workerGroup处打断点发现,最终进入的也是这个方法,所以在两个地方打断点都可以。这个方法除了传入一个线程数量,还传入了一个Executor对象,前面讨论过,Executor就是最终干活的对象,最终实现是ThreadPerTaskExecutor,这里传的是null,说明我们是不需要传入的,由netty自己创建即可,我们继续往下走:

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

这个方法中又多了一个参数,SelectorProvider定义了创建selector、ServerSocketChannel、SocketChannel等方法,采用Java的 Service Provider Interface (SPI) 方式实现,

从代码可以知道,创建过程为

如果provider已经创建,直接返回;

如果定义了java.nio.channels.spi.SelectorProvider属性,则采用该属性定义的类创建SelectorProvider并返回;如果失败,则继续;

采用SPI方法创建SelectorProvider并返回;如果实现,则通过DefaultSelectorProvider创建;

我们继续往下走,

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)

这里多了一个参数,继续往下走:

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)

4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)

这里多了一个拒绝处理参数,继续往下走:

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)

4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)

5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)

从这里可以看到,线程数如果没有定义会使用默认的数值,默认的值就是系统cup*2:

继续往下走,

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)

4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)

5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)

6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)

这里增加了一个我们上面讨论过的参数类型,DefaultEventExecutorChooserFactory,也就是EventExecutor的选择器,继续往下走:

1、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int)

2、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor)

3、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider)

4、 io.netty.channel.nio.NioEventLoopGroup#NioEventLoopGroup(int, java.util.concurrent.Executor, java.nio.channels.spi.SelectorProvider, io.netty.channel.SelectStrategyFactory)

5、 io.netty.channel.MultithreadEventLoopGroup#MultithreadEventLoopGroup(int, java.util.concurrent.Executor, java.lang.Object...)

6、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, java.lang.Object...)

7、 io.netty.util.concurrent.MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, java.util.concurrent.Executor, io.netty.util.concurrent.EventExecutorChooserFactory, java.lang.Object...)

通过上面七步,我们组合齐了创建需要的所有参数,进入了这里,这里是真正创建开始的地方,从代码中看到,前面两个判断,一个是线程数量,另一个是对Executor的创建,使用的是ThreadPerTaskExecutor,

里面的execute方法很简单,每次执行的时候都会创建一个新的线程执行命令。

下面的children,就是我们前面提到的MultithreadEventExecutorGroup中的EventExecutor数组,我们指定了多少个线程数,就会创建多少个EventExecutor,来看一下每个EventExecutor的创建过程:

创建的时候调用的是new Child去创建,这个方法真正实现的地方是:

io.netty.channel.nio.NioEventLoopGroup#newChild

里面new 了一个NioEventLoop,具体的内容如下:

我们看到里面创建了一个selector,通过这样children数组就new 完了。

接下来就是创建chooser,作用是选择下一个EventExecutor,前面介绍过根据是否是2的指数的倍数进行选择,这里的断点打在了bossGroup上面,只创建了一个,因此是2的0次方,我们通过调试可以看到它选择的策略:

最下面的几行是对children的一些操作,这样整个new过程就结束了。我们再来回顾这张图:

创建的过程,EventExecutor数组创建好了,每个NioEventLoop的selector也创建了,但是没有看到事件循环线程启动,也没有看到事件注册等内容。所以在这个阶段,无论是boss还是worker都是一样的。从上面可以看出,创建流程是比较简单的。





NioEventLoop启动流程分析

netty是用ServerBootstrap进行启动,启动的代码是这一行:

1、 ChannelFuture f = server.bind(port).sync()

我们可以在这里打断点进行启动流程分析:

这里一种调用了两个方法,一个是bind方法,一个是sync方法,我们先从bind开始。

1、 ChannelFuture f = server.bind(port).sync()

2、 io.netty.bootstrap.AbstractBootstrap#bind(int)

继续往下:

1、 ChannelFuture f = server.bind(port).sync()

2、 io.netty.bootstrap.AbstractBootstrap#bind(int)

3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)

validate()主要是验证是否为空的:

继续往下走:

1、 ChannelFuture f = server.bind(port).sync()

2、 io.netty.bootstrap.AbstractBootstrap#bind(int)

3、 io.netty.bootstrap.AbstractBootstrap#bind(java.net.SocketAddress)

4、 io.netty.bootstrap.AbstractBootstrap#doBind

这个方法内容比较多,来看第一行代码,执行了一个initAndRegister方法,回想我们自己写nio程序的时候,启动第一件事是把ServerSocketChannel初始化并注册到selector中,initAndRegister方法中也是做了同样的事情:

来看这个方法,首先通过 channel =channelFactory.newChannel(); 创建一个channel,我们配置ServerBootstrap的时候,channel传入的是 NioServerSocketChannel,这里通过一个工厂创建它的实例,当然这里用的是反射技术。具体实现代码如下:

io.netty.channel.ReflectiveChannelFactory#newChannel

反射执行的时候,最终会走到下面的方法:

io.netty.channel.AbstractChannel#AbstractChannel(io.netty.channel.Channel)

对于每个channel,都有这些属性,parent,id(自动生成的),unsafe,另外一个非常重要的就是pipeline属性,就是前面我们说的ChannelPipeline。刚开始创建的时候,啥都没有,就有默认的HeaderContext和TailContext:

io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

接下来回到initAndRegister方法,创建完了,我们来看init方法,

io.netty.bootstrap.ServerBootstrap#init

前面是设置options,设置属性的一些内容,接下来, ChannelPipeline p = channel.pipeline(); 就是把pipeline拿出来,然后把childGroup和childHandler都拿出来,接下来是往pipeline里面加东西,

就是把我们在ServerBootstrap中配置的handler加进去。这里面先配置了ChannelPipeline,接下来就是ServerBootstrapAcceptor,这里面会把childGroup和childHandler都传进去,之所以会传这两个,是因为这里的ServerBootstrapAcceptor是要把接收到的NioSocketChannel的连接注册上去,所以这里需要有childGroup,而对于每个NioSocketChannel中的pipeline里面有什么东西,这里需要childHandler来看的。做完这些事情,init方法就完成了。

再次回到initAndRegister方法,前面的newChannel和init方法都完成了,下面是注册方法:

ChannelFuture regFuture = config().group().register(channel);

这里先调用config和group方法,返回的就是我们配置的bossGroup,这里如果bossGroup配置多个,也会从中选一个,所以配置多个也是没有用处的(具体理由参考前面一篇文章)。对于boss来说,这里只会初始化注册一次,选择一个。获取group后,调用的是register方法,最终到的方法是:

io.netty.channel.AbstractChannel.AbstractUnsafe#register

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345