本文基于上一篇Netty Server端实现,以解读源码的方式带领大家理解Server端是如何启动的,主要包括以下几点
EventLoopGroup
类的作用是什么?为什么要new两个EventLoopGroup实例
ServerBootstrap
类的作用是什么?以及它的build模式
中如group()
方法,channel()
方法,childHandler()
方法的分别做了哪些事?ServerBootstrap的
bind方法
做了哪些事?ChannelFuture
接口的作用是什么?
接下来我们围绕这几个点一一去分析,先贴一下整个Server启动的代码,方便我们分析
public class Server {
// 监听端口
private int port;
private Server(int port) {
this.port = port;
}
// 启动一个Server服务器
private void start() throws InterruptedException {
// 1.
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 2.
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
// 3.
.channel(NioServerSocketChannel.class)
// 4.
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new ServerInboundHandler());
}
})
// 5.
.option(ChannelOption.SO_BACKLOG, 128)
// 6.
.childOption(ChannelOption.SO_KEEPALIVE, true);
System.out.println("Server is started");
// 7.
ChannelFuture f = serverBootstrap.bind(port).sync();
// 8.
f.channel().closeFuture().sync();
}finally {
// 9.
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws InterruptedException {
// 利用vm参数传递端口号,不传则默认8081
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}else{
port = 8081;
}
// 启动server
System.out.println("Server to starting... the port is: " + port);
new Server(port).start();
}
}
启动分析
1. NioEventLoopGroup
- UML类图结构
我们跟踪的构造函数,该类提供了多个重载的构造函数,最后的一个构造函数会调用super(...)的构造函数
这里直接看其抽象父类类
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
这里简单介绍一下该类有一个私有的静态常量 DEFAULT_EVENT_LOOP_THREADS
,还有一个static静态块用于初始化该常量值,即(在不指定系统参数io.netty.eventLoopThreads
的情况下)
当构造函数的参数nThreads值为0时就会取该常量值作为该EventLoop事件循环的线程数
继续往下由调用了其抽象父类的构造方法
/**
* Create a new instance.
*
* @param nThreads the number of threads that will be used by this instance.
* @param executor the Executor to use, or {@code null} if the default should be used.
* @param args arguments which will passed to each {@link #newChild(Executor, Object...)} call
*/
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
接下来重点分析该类的this(...)函数
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
if (nThreads <= 0) {
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// 重点是这个children属性
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// 调用newChild() 方法初始化每一个children数组的元素
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) {
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
}
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
}
} catch (InterruptedException interrupted) {
// Let the caller handle the interruption.
Thread.currentThread().interrupt();
break;
}
}
}
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
接下来看newChild(...)
方法,该方法为抽象方法,具体的执行为NioEventLoopGroup中类重写的方法
protected abstract EventExecutor newChild(Executor executor, Object... args) throws Exception;
@Override
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
return new NioEventLoop(this, executor, (SelectorProvider) args[0],
((SelectStrategyFactory) args[1]).newSelectStrategy(), (RejectedExecutionHandler) args[2]);
}
可见这里的children数组的元素在实例话的时候实际上是一个个的 对象,重点来了,我们重点分析这个NioEventLoop对象
另外一点就是会为chooser属性赋值一个EventExecutorChooser事件循环选择器,该选择器的有一个next方法,作用是当有channel注册时,具体选择哪个事件循环EventExecutor(NioEventLoop)去注册
- DefaultEventExecutorChooserFactory
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
可以看到这里根据每个事件循环group中事件循环的个数是否是2的次方,分别实例化不同的事件执行选择器,默认两个group的选择器都是PowerOfTwoEventExecutorChooser实例
到这里先总结一下文章开头的第一个问题:
Server端启动代码前new了两个NioEventLoopGroup对象,每个NioEventLoopGroup对象都有一个EventExecutor数组类型的children属性(实际上new的是NioEventLoop对象),而每个children数组的size即是在new NioEventLoopGroup时传入的参数
2. NioEventLoop
重点介绍这个类, 该类是Netty底层的核心类,继承了抽象类SingleThreadEventLoop(单线程事件循环),注册Channel到Selector,在事件循环中实现IO多路复用
这里说下个人的理解,NioEventLoop一个人负责了Java NIO多路复用中的while(true)循环
以及Selector的相关工作
,包括register,cancel,select等工作,其实现基础是也是基于JDK的Selector实现的,也所以称之为事件循环
- UML 类图
分析其源码之前我们先带着问题入手
NioEventLoop如何实现循环?
NioEventLoop如何实现Selector选择器相关功能?
- Nio Selector相关属性
private Selector selector
private Selector unwrappedSelector
private SelectedSelectionKeySet selectedKeys
private final SelectorProvider provider
看到这里是不是豁然开朗?结合JDK NIO的源码我们发现NioEventLoop内部持有的就是JDK NIO的Selector,也就是利用它们实现了事件register, select
- NioEventLoop 自身相关属性
执行事件select时的策略器,即提供了一种能力去控制事件循环的行为,比如一个正在阻塞的select操作能被延迟或跳过,如果有事件需要被立即处理的话
默认情况下属性被赋值为DefaultSelectStrategy实例
private final SelectStrategy selectStrategy
private volatile int ioRatio = 50
private int cancelledKeys
private boolean needsToSelectAgain
ioRatio属性控制IO任务执行的时间占比
cancelledKeys属性表示取消注册的Key集合
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
实例化NioEventLoop完成以下:
调用父类构造完善父类属性
parent
等,指向其group赋值selectorProvider至
provider
属性openSelector()就是利用1.中的provider调用openSelector() 打开一个Selector,并将其赋值给
unwrappedSelector
属性以及赋值selectedKeys
属性等赋值
selectStrategy
属性
至此我们明白了NioEventLoop 通过持有JDK的Selector从而实现select相关功能,那循环又是如何实现的呢?
NioEvent'Loop重写了SinglethreadEventExecutor中的抽象run方法,该方法即时循环的关键实现
@Override
protected void run() {
// 死循环实现
for (;;) {
try {
try {
// 每一次循环都会计算select调用策略,如果taskQueue有任务,即直接执行selectNow(),不阻塞
// 若taskQueue没有任务,即执行select() 默认阻塞1秒
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
// 不论阻塞调用select还是非阻塞调用,都会执行以下
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
// 当前时间
final long ioStartTime = System.nanoTime();
try {
处理io事件
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
// io事件处理结束,处理taskQueue中的任务,并指定非io任务超时时间,按ioRatio比例计算出来
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
至此我们知道了Nio如何实现事件循环的大体流程,详细的事件循环我会单独放在一篇文章中讲解NioEventLoop事件循环详解
2. ServerBootstrap
- UML类图
- 属性
volatile EventLoopGroup group
即bossGroup
private volatile EventLoopGroup childGroup;
即workerGroup
private volatile ChannelHandler childHandler;
即处理请求channel的处理器
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>()
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
持有Channel反射工厂
private volatile ChannelFactory<? extends C> channelFactory
- group方法
设置用于bossGroup 和workerGroup,bossGroup的事件循环用于处理serverChannel的accept,而workerGroup里的事件循环则用于处理所有channel的IO
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
// 调用父类方法赋值group属性
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
// 赋值childGroup属性
this.childGroup = childGroup;
return this;
}
- channel方法
channel 就是根据传入的channel 的class类型去创建一个
ReflectiveChannelFactory
反射Channel工厂,并赋值给channelFactory
属性
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
- ReflectiveChannelFactory
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
// 根据clazz类型获取其构造器
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
// 利用反射调用构造器new对应的channel实例
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
- childHandler方法
就是赋值childHandler属性
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
- option方法
就是向属性options中添加元素,该Map内的元素会
用在Channel实例被创建时
,调用native方法为底层socket设置相关属性
,若想移除内核默认socket的某个属性值,只要将参数value设置null即可
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return self();
}
- childOption方法
同理该方法即是用在channel被创建时指定底层socket属性,
若想移除内核默认socket的某个属性值,只要将参数value设置null即可
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
至此ServerBootstrap基本属性都通过了build的方式赋值完毕,接下来看下关键方法bind方法做了
bind方法定义在了其抽象父类AbstractBootstrap中
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
new了一个InetSocketAddress对象作为参数继续调用了重载的bind方法
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
validate方法做了一些基础校验,包括group属性即bossGroup是否为null以及channelFactory是否为null等
接下来继续调用了doBind方法,重点来了
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
首先我们看下该方法的返回值是一个ChannelFuture对象
,至于ChannelFuture是一个通道异步IO操作结果
,因为Netty中所有IO操作都是以异步的方式,后面我会专门一篇文章来分析ChannelFuture
Netty源码之ChannelFuture
doBind方法首先调用了initAndRegister方法
,从方法名上我们知道这是一个初始化和注册的方法,初始化谁?注册谁?
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 1. 利用通道工厂创建一个新的channel,这里内部就是利用NioServerSocketChannel构造反射一个实例,并利用默认的SelectorProvider open了一个NioServerSocketChannel实例
channel = channelFactory.newChannel();
// 2. 紧接着调用init方法,该方法为抽象方法,实际的init实现在ServerBootstrap中,该方法主要处理该channel的ChannelOption属性,attr属性以及pipeline以及相应的handler等信息
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 3. 注册该channel到serverBootstrap的bossGroup事件循环组上,具体的注册需要由事件循环组`选择`一个事件循环(`NioEventLoop`)来注册
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
- 继续向下看ReflectiveChannelFactory的
newChannel方法
做了什么
public T newChannel() {
try {
return constructor.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
这里由于之前赋值了属性constructor为NioServerSocketChannel的构造器,故反射调用实例化
NioServerSocketChannel
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
利用默认的SelectorProvider 去openServerSocketChannel,在继续调用this()方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
这里可以看到ServerSocketChannel 实例构建以后再去调用父类方法
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
即赋值id属性,以及new一个该Channel所属的pipeline
至此ServerSocketChannel实例化完成
- init(channel)方法做了什么?
init方法是AbstractBootstrap定义的抽象方法,具体的实现是由ServerBootstrap实现的,我们直接子类的实现
void init(Channel channel) throws Exception {
// 1. 获取options集合中,将其设置到底层ServerSocket上
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
// 2. 获取attrs集合,设置到channel上
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
// 3. 获取channel所属的pipeline
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
// 4. 重点是这里为ServerSocketChannel添加了一个入站处理器ServerBootstrapAcceptor,该处理器会当ServerSocketChannel有Accept事件时负责将
socketChannel注册到currentChildGroup,并设置currentChildHandler等工作,即ServerBootstrapAcceptor是一个桥梁,联通了bossGroup和workerGroup
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
-
config().group().register(channel)
注册
当ServerSocketChannel实例化后,并init完成,但是此时该channle还没有注册到eventLoop上,接下来就会完成注册动作
config().group()方法就是获取当前bossGroup事件循环组实例,即NioEventLoopGroup,我们直接看它的register方法做了什么
NioEventLoopGroup的register方法并没有覆写其抽象父类MultithreadEventLoopGroup的方法
MultithreadEventLoopGroup
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
这里调用了内部的next方法,返回一个EventLoop实例,这里大家思考以下next方法的作用是什么?
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
这里调用了其父类MultithreadEventExecutorGroup的next方法,继续
@Override
public EventExecutor next() {
return chooser.next();
}
到了这里大家是否明白next方法的作用了呢?文章开头部分讲述了在new NioEventLoopGroup对象的时候有一个属性是chooser的赋值,该属性是一个eventLoop选择器,因为我们的NioEventLoopGroup对象内有一个eventLoop数组,当我们在注册某个channel的时候到底是注册到哪个eventLoop上呢?这个工作由这个选择器来完成
当时实例化的是PowerOfTwoEventExecutorChooser选择器,选择的规则就是用一个自增的AtomicInteger类型的idx值去取模eventLoop数组的length - 1,就得到了channel需要注册到的eventLoop数组的下标从而取出对应的eventLoop去注册
AbstractChannel
类的注册方法
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
// 赋值该channel所要注册到的事件循环是哪个
AbstractChannel.this.eventLoop = eventLoop;
// 判断当前执行线程是否是该eventLoop内部的单线程,当前线程是Main,此时的eventLoop 内的单线程为null,还没有启动过
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
// 这里调用eventLoop的execute提交任务执行register0注册
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
继续向下看NioEventLoop的execute方法,该方法定义在其抽象父类
SingleThreadEventExecutor
内,NioEventLoop并未重写该方法
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 判断当前执行线程是否是该EventLoop自身线程,这里是Main方法调用的,且该EventLoop自身线程还未启动(`thread属性仍为null`),返回false
boolean inEventLoop = inEventLoop();
// 将需要执行的任务丢进taskQueue任务队列中
addTask(task);
if (!inEventLoop) {
// 这里会新建该EventLoop的唯一单线程,并调用其run方法启动事件循环
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
startThread方法
启动EventLoop的执行线程
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
可以看到这里利用了CAS实现保证了只启动一个线程,再看doStartThread方法内部
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 调用自身的run方法,该方法由NioEventLoop覆写了
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
if (logger.isErrorEnabled()) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
"be called before run() implementation terminates.");
}
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
// Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
// the future. The user may block on the future and once it unblocks the JVM may terminate
// and start unloading classes.
// See https://github.com/netty/netty/issues/6596.
FastThreadLocal.removeAll();
STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
threadLock.release();
if (!taskQueue.isEmpty()) {
if (logger.isWarnEnabled()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
这里的executor是当时创建NioEvnetLoopGroup时new的ThreadPerTaskExecutor
任务执行器,包含了一个默认的线程工厂(含有“nioEventLoopGroup-2-”
前缀属性,即一个事件循环组对应一个默认线程工厂)
ThreadPerTaskExecutor类
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
这里就是调用线程工厂去new一个线程,并把任务传进去执行该任务,注意这里的command已经交给了新建的线程执行了
因此thread = Thread.currentThread()
被赋予了当前执行的新线程即以“nioEventLoopGroup-2-”
前缀命名的EventLoop线程
这里的SingleThreadEventExecutor.this.run(),EventLoop线程正式启动了NioEventLoop的事件循环方法,而在其run方法中会利用for死循环不断执行该EventLoop的IO任务以及非IO任务(此处的注册任务就属于非IO任务)
initAndRegister
方法执行完成
了,断点一下,我们看看此时ServerSocketChannel的属性状态
可以看到registered状态为true表示已注册,我们通过cmd查看一下本机端口占用情况
此时的端口8081还未被监听,也就是说ServerSocketChannel还没有bind到8081端口并启动监听,继续往下看代码
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
可以看到这里就是通过获取ServerSocketChannel注册到的那个EventLoop实例并提交一个任务,任务就是将ServerSocketChannel 绑定到指定的端口上
bind实际上是调用了NioServerSocketChannel
的doBind方法
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
这里的javaChannel()
的bind方法实际上是调用了ServerSocketChannelImpl实例
自身的bind方法
,注意这里用到了config里的backlog属性值,如果在ServerBootstrap的option方法不指定的话,默认windows下该值默认为200,其他情况下为128
The SOMAXCONN value of the current machine. If failed to get the value, {@code 200} is used as a default value for Windows or {@code 128} for others.
这里有一篇fasionchan博主
发布的关于内核backlog参数的叙述
深入理解Linux TCP backlog
public ServerSocketChannel bind(SocketAddress var1, int var2) throws IOException {
synchronized(this.lock) {
if (!this.isOpen()) {
throw new ClosedChannelException();
} else if (this.isBound()) {
throw new AlreadyBoundException();
} else {
InetSocketAddress var4 = var1 == null ? new InetSocketAddress(0) : Net.checkAddress(var1);
SecurityManager var5 = System.getSecurityManager();
if (var5 != null) {
var5.checkListen(var4.getPort());
}
NetHooks.beforeTcpBind(this.fd, var4.getAddress(), var4.getPort());
Net.bind(this.fd, var4.getAddress(), var4.getPort());
Net.listen(this.fd, var2 < 1 ? 50 : var2);
synchronized(this.stateLock) {
this.localAddress = Net.localAddress(this.fd);
}
return this;
}
}
}
最终这里调用了bind方法和listen方法,即完成了端口的绑定和监听工作,等待Client端的connect请求
至此整个Netty Server端启动完成,下面用一张流程图表示整个Netty Server启动流程方便记忆
总结
这里回复一下文章开头的几个问题
EventLoopGroup的作用说白了就是个Selector事件循环组,当有channel需要注册时,他会提供选择某个NioEventLoop去注册的功能
至于有两个group的原因:bossGroup内提供的NioEventLoop数组是用来handle客户端的Accept连接请求的,而workGroup内的NioEventLoop则是处理客户端连接之后的事件循环(IO任务和非IO任务),一般bossGroup的NioEventLoop数组数量设为1, 而workGroup的数量默认为当前CPU数量的2被ServerBootstrap类的作用是一个简化Netty Server程序启动的启动类,其中包含了很多贯穿整个Netty程序需要用到的属性,比如channel方法指定了用于监听client socket连接的ServerSocket 的class类型,childHandler方法则用于当有client连接事件准备好后,并创建了对应的SocketChannel后,该SocketChannel对应的pipeline中需要添加进去哪些handler
ServerBootstrap的bind方法是Netty Server启动的最后关键一步,前期的相关方法都可以认为是相关静态属性赋值(底层Selector初始化除外),通过bind方法,Netty实例化了NioServerSocketChannle实例,并为其init了该channel相应的option属性以及pipeline内的相关handler,这里add了一个重要的handler就是ServerBootstrap的内部类
ServerBootstrapAcceptor
,该类继承了ChannelInboundHandlerAdapter
入站处理器,client连接以后,ServerSocket 的pipeline最有一个处理器就是该处理器,会在该处理器中进行socketChannel的相关init工作(option属性配置,pipeline添加childHandler以及调用workGroup发起注册OP_READ动作等),最有bind方法会进行该NioServerSocketChannle的内核底层绑定端口,并监听端口,等待client的请求连接最后说下
ChannelFuture
的作用,Netty中所有的异步channel的IO操作都是异步的方式
,意味着所有的IO操作都将会立即返回
,即无法保证所请求的I/O操作在调用结束时已完成,代替的是将会得到一个ChannelFuture的实例
,通过这个实例我们将可以得到IO操作的结果和状态
而获取结果的方式是给这个ChannelFuture的实例增加一个Listener
,当该ChannelFuture实例isDone为true的时候,会通知该监听器的operationComplete方法,而我们会在该方法内部再编写逻辑,根据IO操作的结果是成功还是失败而做出不同的处理
以上就是Netty Server端启动的所有分析,因水平有限,如有错误的地方还望不吝指出,共同进步...