EventLoopGroup
(如果使用到的是 NIO, 那么通常是 NioEventLoopGroup), 那么这个 NioEventLoopGroup 在 Netty 中到底扮演着什么角色呢?
NIO 的Reactor 模型
- 补充多线程的reactor模式
Reactor 多线程模型 有如下特点:
有专门一个线程, 即 Acceptor 线程用于监听客户端的TCP连接请求.
客户端连接的 IO 操作都是由一个特定的 NIO 线程池负责. 每个客户端连接都与一个特定的 NIO 线程绑定, 因此在这个客户端连接中的所有 IO 操作都是在同一个线程中完成的.
客户端连接有很多, 但是 NIO 线程数是比较少的, 因此一个 NIO 线程可以同时绑定到多个客户端连接中.
Netty 是 Reactor 模型与NIO的Reactor 本质上区别不是很大。那么和nio中的实现有哪些不同的。下面我们分析:
reactor 一般是服务端用的最多,这里我们以EchoServer分析
单线程模式:
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup)
.channel(NioServerSocketChannel.class)
...
多线程模式
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
...
- 上面两端代码,区别其实就是单线程重载方法group。
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
接下来分析reactor的核心NioEventLoopGroup,来确定这是个什么玩意,为什么它能充当一个线程组
类图如下:
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
//调用下面
public NioEventLoopGroup(int nThreads, Executor executor) {
this(nThreads, executor, SelectorProvider.provider());
}
//调用下面
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
this(nThreads, threadFactory, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
///调用下面
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory,
final SelectorProvider selectorProvider, final SelectStrategyFactory selectStrategyFactory) {
super(nThreads, threadFactory, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
//继续调用父类MultithreadEventLoopGroup
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
//在次调用父类的父类MultithreadEventExecutorGroup
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
注意:这里我们初始话executor为null那么后续我们猜测应该netty会为我们创建默认的executor。SelectorProvider.provider()这个方法前面介绍过,会根据当前系统来选择核实的io多路复用(select、poll、epoll)。DefaultSelectStrategy默认策略 。Execution的拒绝策略reject(线程池的拒绝策略)
最后在父类的父类MultithreadEventExecutorGroup构造器中
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
//创建一个大小为 nThreads 的 EventExecutor数组
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//newChild的实现类在NioEventLoopGroup中,返回NioEventLoop
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
//从DefaultEventExecutorChooserFactory工厂实现类中的newChooser方法: 根据线程数在children 数组中选出一个合适的 EventExecutor 实例
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
- 已知children是一个EventExecutor数组, 而ThreadPerTaskExecutor是Executor,最后使用newChild方法将ThreadPerTaskExecutor封装成EventLoop放到数组中
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
newChild方法将ThreadPerTaskExecutor封装成EventLoop放到数组中
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
综上所述,我们可以先猜测这个EventLoop的作用,可能是客户端一旦和服务端accept后会将task丢到从EventExecutor数组取出一个EventLoop来执行,那么会是这样吗?我们来继续
简要分析下NioEventLoop:
NioEventLoop的继承很多,这里我们只需了解他的父类SingleThreadEventExecutor 构造器中, 通过 threadFactory.newThread 创建了一个新的 Java 线程. 在这个线程中所做的事情主要就是调用 SingleThreadEventExecutor.this.run() 方法, 而因为 NioEventLoop 实现了这个方法, 因此根据多态性, 其实调用的是 NioEventLoop.run() 方法.
接下来我们追踪这个NioEventLoop是在哪里器作用的,需要注意的是我们使用了两个NioEventLoopGroup,一个是bossGroup一个是workerGroup
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
- 作为服务端我们肯定是要从启动的bind入手分析:
根据之前服务端的分析,我们一路找到ServerBootstrap父类AbstractBootstrap中doBind0这个方法
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
根据前面的分析channel.eventLoop()取得为bossgroup,也就是应该accept的线程,正好 channel.bind也同时印证了我们的猜想。那么接下来workgroup从哪里来呢
- 想一下处理io阻塞事件在netty中一般是一何种形式处理的呢,对了就是handler,一般在ServerBootstrapAcceptor这handler和客户端连接后就会交个后面的handler处理,在哪里处理就是在childgroup线程组中处理
回想一下,在分析server端的是我们有介绍过ServerBootstrap实现的init初始化handler,这里出现过childGroup,正是我们苦苦寻找的workgroup
//这里初始化的为nioserverchannel
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//这里从config获取的handler为parent handler
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
//currentChildGroup、currentChildHandler客户端的连接的 IO 交互
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
简要分析下,像pipeline中添加ChannelInitializer,前面分析pipeline已经知道之后再register掉initChannel方法。添加的ServerBootstrapAcceptor这个handler
在它抄写了channelread 事件,然后交给childgroup线程处理自定义handler
ServerBootstrapAcceptor中channelRead方法
//inbound事件到来时,这里就是客户端和
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
//将操作io的handler绑定到childGroup,执行完成后断开childchannel
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
基本上就暂时分析enveloop作为netty的reactor模式的核心。bossgroup、workgroup等作用