基于Netty源代码版本:netty-all-4.1.33.Final
前言
通过Netty的ServerBootstrap的实例入手对其进行一个简单的分析。
来先看看服务端的代码:
public class MyServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
//初始化一个服务端引导类
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup)//设置线程组
.channel(NioServerSocketChannel.class)//设置ServerSocketChannel的IO模型 分为epoll与Nio
.childHandler(new MyServerInitializer())//这个handler 将会在每个客户端连接的时候调用。供 SocketChannel 使用。
.handler(new SimpleServerHandler())//这个hanlder 只专属于 ServerSocketChannel 而不是 SocketChannel。
.option(ChannelOption.SO_BACKLOG, 128)//设置option参数,保存成一个LinkedHashMap<ChannelOption<?>, Object>()
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
本文将从源码的角度分析ChannelFuture channelFuture = serverBootstrap.bind(8899).sync();的内部实现。这样就完成了Netty服务器端启动过程的源码分析。
我们首先要知道,netty服务的启动其实可以分为以下四步:
- 创建服务端Channel
- 初始化服务端Channel
- 注册Selector
- 端口绑定
一、创建服务端Channel
1、服务端Channel的创建,主要为以下流程
AbstractBootstrap.java
/**
* 创建一个新的Channel并绑定端口
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
我们接着看重载的bind
public ChannelFuture bind(SocketAddress localAddress) {
validate(); //相关参数的检查
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);//主要实现方法
}
该函数主要看两点:validate()和doBind(localAddress)
validate()方法
//函数功能:检查相关参数是否设置了
public B validate() {
if (group == null) {//这里的group指的是:serverBootstrap.group(bossGroup, workerGroup)代码中的bossGroup
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
该方法主要检查了两个参数,一个是group,一个是channelFactory,在这里可以想一想这两个参数是在哪里以及何时被赋值的?答案是在如下代码块中被赋值的,其中是将bossGroup赋值给了group,将BootstrapChannelFactory赋值给了channelFactory.
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
doBind(localAddress)方法
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();// 初始化并创建 NioServerSocketChannel
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind这个函数是我们要分析的重点,这个函数的主要工作有如下几点:
- 1、通过initAndRegister()方法得到一个ChannelFuture的实例regFuture。
- 2、通过regFuture.cause()方法判断是否在执行initAndRegister方法时产生来异常。如果产生来异常,则直接返回,如果没有产生异常则进行第3步。
- 3、通过regFuture.isDone()来判断initAndRegister方法是否执行完毕,如果执行完毕来返回true,然后调用doBind0进行socket绑定。如果没有执行完毕则返回false进行第4步。
- 4、regFuture会添加一个ChannelFutureListener监听,当initAndRegister执行完成时,调用operationComplete方法并执行doBind0进行socket绑定。
第3、4点想干的事就是一个:调用doBind0方法进行socket绑定。
下面将分成4部分对每行代码具体做了哪些工作进行详细分析。
initAndRegister()
该方法的具体代码如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//结论:这里的channel为一个NioServerSocketChannel对象,具体分析见后面
channel = channelFactory.newChannel();// 通过 反射工厂创建一个 NioServerSocketChannel
init(channel);
} catch (Throwable t) {
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
通过函数名以及内部调用的函数可以猜测该函数干了两件事情:
1、初始化一个Channel,要想初始化,肯定要先得到一个Channel。
Channel channel = channelFactory.newChannel();
init(channel);
2、将Channel进行注册。
ChannelFuture regFuture = config().group().register(channel);
下面我们将分析这几行代码内部干来些什么。
Channel channel = channelFactory.newChannel();
在上一篇文章中Netty源码分析--ServerBootstrap分析中,我们知道serverBootstrap.channel(NioServerSocketChannel.class)的功能为:设置父类属性channelFactory 为: ReflectiveChannelFactory类的对象。其中这里ReflectiveChannelFactory对象中包括一个clazz属性为:NioServerSocketChannel.class
因此,final Channel channel = channelFactory().newChannel();就是调用的ReflectiveChannelFactory类中的newChannel()方法,该方法的具体内容为:
public class ReflectiveChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) +
" does not have a public non-arg constructor", e);
}
}
@Override
public T newChannel() {
try {
return constructor.newInstance();//反射创建
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t);
}
}
@Override
public String toString() {
return StringUtil.simpleClassName(ReflectiveChannelFactory.class) +
'(' + StringUtil.simpleClassName(constructor.getDeclaringClass()) + ".class)";
}
}
看到这个类,我们可以得到的结论:final Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例。
NioServerSocketChannel构造器
下面将看下NioServerSocketChannel类的构造函数做了哪些工作。
NioServerSocketChannel类的继承体系结构如下:
其无参构造函数如下:
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));//传入默认的SelectorProvider
}
无参构造函数中private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
函数newSocket的功能为:利用SelectorProvider产生一个ServerSocketChannel对象。
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();//可以看到创建的是jdk底层的ServerSocketChannel
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
无参构造函数通过newSocket函数产生了一个ServerSocketChannel对象
然后调用了如下构造函数,我们继续看
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);//调用父类构造函数,传入创建的channel
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//父类AbstractNioMessageChannel的构造函数
protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent, ch, readInterestOp);
}
//父类 AbstractNioChannel的构造函数
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;//这个ch就是传入的通过jdk创建的Channel
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
try {
ch.configureBlocking(false);//设置当前的ServerSocketChannel为非阻塞的
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//父类AbstractChannel的构造函数
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();//创建Channel唯一标识
unsafe = newUnsafe();//netty封装的TCP 相关操作类
pipeline = newChannelPipeline();//逻辑链
}
ReflectiveChannelFactory类中的newChannel()方法反射产生一个NioServerSocketChannel实例对象时,调用上面这么多构造函数主要干了两件事情:
- 1、产生来一个ServerSocketChannel类的实例,设置到ch属性中,并设置为非阻塞的。
- 2、调用AbstractChannel这个抽象类的构造函数设置Channel的id(每个Channel都有一个id,唯一标识),unsafe(tcp相关底层操作),pipeline(逻辑链)等,而不管是服务的Channel还是客户端的Channel都继承自这个抽象类,他们也都会有上述相应的属性
this.ch = ch;//ch为一个ServerSocketChannel
ch.configureBlocking(false);
- 2、设置了config属性
ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket());
- 3、设置SelectionKey.OP_ACCEPT事件
this.readInterestOp = readInterestOp;//SelectionKey.OP_ACCEPT
- 4、设置unsafe属性
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioByteUnsafe();
}
主要作用为:用来负责底层的connect、register、read和write等操作。
- 5、设置pipeline属性
pipeline = newChannelPipeline();
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
每个Channel都有自己的pipeline,当有请求事件发生时,pipeline负责调用相应的hander进行处理。
这些属性在后面都会用到,至于NioServerSocketChannel 对象中的unsafe、pipeline属性的具体实现后面进行分析。
结论:
Channel channel = channelFactory().newChannel();这行代码的作用为通过反射产生来一个NioServerSocketChannel类的实例,其中这个NioServerSocketChannel类对象有这样几个属性:SocketChannel、NioServerSocketChannelConfig 、SelectionKey.OP_ACCEPT事件、NioMessageUnsafe、DefaultChannelPipeline
二、初始化服务端创建的Channel
init(channel);// 初始化这个 NioServerSocketChannel
我们首先列举下init(channel)中具体都做了哪了些功能:
- 设置ChannelOptions、ChannelAttrs ,配置服务端Channel的相关属性
- 设置ChildOptions、ChildAttrs,配置每个新连接的Channel的相关属性
- Config handler,配置服务端pipeline
-
add ServerBootstrapAcceptor,添加连接器,对accpet接受到的新连接进行处理,添加一个nio线程
init方法的具体代码如下:
@Override
void init(Channel channel) throws Exception {
//1、设置新接入channel的option
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);//设置NioServerSocketChannel相应的TCP参数,其实这一步就是把options设置到channel的config中
}
//2、设置新接入channel的attr
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3、设置handler到pipeline上
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//可以看到两个都是局部变量,会在下面设置pipeline时用到
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(0));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(0));
}
//p.addLast()向ChannelPipeline添加一个ChannelInitializer并重写initChannel()方法
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
//这里的handler()返回的就是serverBootstrap.handler(new SimpleServerHandler())所设置的SimpleServerHandler
ChannelHandler handler = config.handler();
if (handler != null) {
//如果serverBootstrap设置了handler()那么将handler()方法的值设置到pipeline的最后
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//在这里会把我们自定义的ChildGroup、ChildHandler、ChildOptions、ChildAttrs相关配置传入到ServerBootstrapAcceptor接收器的构造函数中,并绑定到新的连接上
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
该函数的功能为:
- 1、设置channel的options
如果没有设置,则options为空,该属性在ServerBootstrap类中的定义如下
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
options可能如下:DefaultChannelConfig中的setOption()
public <T> boolean setOption(ChannelOption<T> option, T value) {
validate(option, value);
if (option == CONNECT_TIMEOUT_MILLIS) {
setConnectTimeoutMillis((Integer) value);
} else if (option == MAX_MESSAGES_PER_READ) {
setMaxMessagesPerRead((Integer) value);
} else if (option == WRITE_SPIN_COUNT) {
setWriteSpinCount((Integer) value);
} else if (option == ALLOCATOR) {
setAllocator((ByteBufAllocator) value);
} else if (option == RCVBUF_ALLOCATOR) {
setRecvByteBufAllocator((RecvByteBufAllocator) value);
} else if (option == AUTO_READ) {
setAutoRead((Boolean) value);
} else if (option == AUTO_CLOSE) {
setAutoClose((Boolean) value);
} else if (option == WRITE_BUFFER_HIGH_WATER_MARK) {
setWriteBufferHighWaterMark((Integer) value);
} else if (option == WRITE_BUFFER_LOW_WATER_MARK) {
setWriteBufferLowWaterMark((Integer) value);
} else if (option == WRITE_BUFFER_WATER_MARK) {
setWriteBufferWaterMark((WriteBufferWaterMark) value);
} else if (option == MESSAGE_SIZE_ESTIMATOR) {
setMessageSizeEstimator((MessageSizeEstimator) value);
} else if (option == SINGLE_EVENTEXECUTOR_PER_GROUP) {
setPinEventExecutorPerGroup((Boolean) value);
} else {
return false;
}
return true;
}
- 2、设置channel的attrs
如果没有设置,则attrs为空,该属性在ServerBootstrap类中的定义如下
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
- 3、如果serverBootstrap.handler()有值,那么将handler的参数设置到channel的pipeline上。
- 4、在pipeline上添加来一个ChannelInitializer对象,其中重写来initChannel方法。该方法通过p.addLast()向serverChannel的流水线处理器中加入了一个 ServerBootstrapAcceptor,
从名字上就可以看出来,这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器
看到这里,我们发现其实init只是初始化了一些基本的配置和属性,以及在pipeline上加入了一个接入器,用来专门接受新连接,并没有启动服务。
三、注册Selector
一个服务端的Channel创建完毕后,下一步就是要把它注册到一个事件轮询器Selector上,在initAndRegister()中我们把上面初始化的Channel进行注册
ChannelFuture regFuture = config().group().register(channel);//注册我们已经初始化过的Channel
回到 initAndRegister 方法中,继续看 config().group().register(channel) 这行代码,config 方法返回了 ServerBootstrapConfig,这个 ServerBootstrapConfig 调用了 group 方法,实际上就是 bossGroup。bossGroup 调用了 register 方法。
前面的分析我们知道group为:NioEvenLoopGroup,其继承MultithreadEventLoopGroup,该类中的register方法如下:
@Override
public ChannelFuture register(Channel channel) {
//调用了NioEvenLoop对象中的register方法,NioEventLoop extends SingleThreadEventLoop
return next().register(channel);
}
next()方法的代码如下,其功能为选择下一个NioEventLoop对象。
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
......
@Override
public EventExecutor next() {
return chooser.next();
}
......
}
根据线程个数nThreads是否为2的幂次方来选择chooser,其中这两个chooser为: PowerOfTwoEventExecutorChooser、GenericEventExecutorChooser,这两个chooser功能都是一样,只是求余的方式不一样。
next()方法返回的是一个NioEvenLoop对象
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
结论:由于NioEventLoopGroup中维护着多个NioEventLoop,next方法回调用chooser策略找到下一个NioEventLoop,并执行该对象的register方法进行注册。
由于NioEventLoop extends SingleThreadEventLoop,NioEventLoop没有重写该方法,因此看 SingleThreadEventLoop类中的register方法
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
@Deprecated
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
在本文NioServerSocketChannel实例化中设置来unsafe属性,具体是调用如下的方法来设置的,因此这里的channel.unsafe()就是NioMessageUnsafe实例。
public abstract class AbstractNioMessageChannel extends AbstractNioChannel {
......
@Override
protected AbstractNioUnsafe newUnsafe() {
return new NioMessageUnsafe();
}
......
}
channel.unsafe().register(this, promise)这行代码调用的是AbstractUnsafe类中的register方法,具体代码如下:
/**
* 1、先是一系列的判断。
* 2、判断当前线程是否是给定的 eventLoop 线程。注意:这点很重要,Netty 线程模型的高性能取决于对于当前执行的Thread 的身份的确定。如果不在当前线程,那么就需要很多同步措施(比如加锁),上下文切换等耗费性能的操作。
* 3、异步(因为我们这里直到现在还是 main 线程在执行,不属于当前线程)的执行 register0 方法。
*/
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//判断该channel是否已经被注册到EventLoop中
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//1 将eventLoop设置在NioServerSocketChannel上
AbstractChannel.this.eventLoop = eventLoop;
//判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,则添加一个任务到该线程中
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {//重点
@Override
public void run() {
register0(promise);//实际的注册过程
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
上面的重点是register0(promise)方法。基本逻辑为:
- 1、通过调用eventLoop.inEventLoop()方法判断当前线程是否为该EventLoop中拥有的线程,如果是,则直接注册,如果不是,说明该EventLoop在等待并没有执行权,则进行第二步。
对整个注册的流程做一个梳理
AbstractEventExecutor
public abstract class AbstractEventExecutor extends AbstractExecutorService implements EventExecutor {
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
}
SingleThreadEventExecutor
public abstract class SingleThreadEventExecutor extends AbstractScheduledEventExecutor implements OrderedEventExecutor {
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
}
- 2、既然该EventLoop中的线程此时没有执行权,但是我们可以提交一个任务到该线程中,等该EventLoop的线程有执行权的时候就自然而然的会执行此任务,而该任务负责调用register0方法,这样也就达到了调用register0方法的目的。
下面看register0这个方法,具体代码如下:
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
//当寄存器调用在事件循环之外时,检查通道是否仍然打开,因为它可以在同一时间关闭
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();//jdk channel的底层注册
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 触发绑定的handler事件
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//执行完,控制台输出:channelRegistered,最终会调用到ChannelInboundHandlerAdapter实现类的channelRegistered()
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
在上面的代码中,是通过调用AbstractNioChannel中doRegister()的具体实现就是把jdk底层的channel绑定到eventLoop的selecor上
public abstract class AbstractNioChannel extends AbstractChannel {
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
//把channel注册到eventLoop上的selector上
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
protected SelectableChannel javaChannel() {
return ch;
}
}
在本文的NioServerSocketChannel的实例化分析中,我们知道这里的javaChannel()方法返回的ch为实例化NioServerSocketChannel时产生的一个SocketChannelImpl类的实例,并设置为非阻塞的。
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);就完成了ServerSocketChannel注册到Selector中。
ServerSocketChannel注册完之后,接着执行pipeline.fireChannelRegistered方法。
@Override
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
看next.invokeChannelRegistered();
private void invokeChannelRegistered() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRegistered(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRegistered();
}
}
接着看看this.handler(),实际上就是head的handler()
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public ChannelHandler handler() {
return this;
}
}
返回的是this,那接着看((ChannelInboundHandler) handler()).channelRegistered(this);
public class DefaultChannelPipeline implements ChannelPipeline {
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
invokeHandlerAddedIfNeeded();
ctx.fireChannelRegistered();
}
}
继续看ctx.fireChannelRegistered();
abstract class AbstractChannelHandlerContext extends DefaultAttributeMap implements ChannelHandlerContext, ResourceLeakHint {
@Override
public ChannelHandlerContext fireChannelRegistered() {
invokeChannelRegistered(findContextInbound());
return this;
}
}
我们看看findContextInbound()
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
我们看到 ctx = ctx.next; 实际上是从head开始找,找到第一个 inbound 的hander
static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRegistered();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRegistered();
}
});
}
}
最后执行next.invokeChannelRegistered();
pipeline中维护了handler链表,还记得之前.handler(new SimpleServerHandler())初始化的handler中介绍了此handler被添加到此pipeline中了,通过遍历链表,执行InBound类型handler的channelRegistered方法
因此执行到这里,我们的控制台就回输出:channelRegistered,这行信息。
到这里,我们就将doBind方法final ChannelFuture regFuture = initAndRegister();给分析完了,得到的结论如下:
- 1、通过反射产生了一个NioServerSocketChannle对象。
- 2、完成了初始化
- 3、将NioServerSocketChannel进行了注册。
四、端口绑定
首先我们梳理下netty中服务端口绑定的流程
接下来我们分析doBind方法的剩余部分代码主要做了什么,
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind0(regFuture, channel, localAddress, promise);
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
该函数主要是提交了一个Runnable任务到NioEventLoop线程中来进行处理。,这里先看一下NioEventLoop类的execute方法
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//判断当前线程是否为该NioEventLoop所关联的线程,如果是,则添加任务到任务队列中,
//如果不是,则先启动线程,然后添加任务到任务队列中去
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
startThread();
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// The task queue does not support removal so the best thing we can do is to just move on and
// hope we will be able to pick-up the task before its completely terminated.
// In worst case we will log on termination.
}
if (reject) {
reject();
}
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
当提交的任务被线程执行后,则会执行channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);这行代码,这行代码完成的功能为:实现channel与端口的绑定。
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
}
在该方法中直接调用了pipeline的bind方法,这里的pipeline时DefaultChannelPipeline的实例。
tail = new TailContext(this);
@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
在上面方法中直接调用了TailContext实例tail的bind方法,tail在下一篇博文中有详细的介绍。继续看tail实例的bind方法
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeBind(localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeBind(localAddress, promise);
}
}, promise, null);
}
return promise;
}
此上面bind函数中的这行代码:final AbstractChannelHandlerContext next = findContextOutbound();所完成的任务就是在pipeline所持有的以AbstractChannelHandlerContext为节点的双向链表中从尾节点tail开始向前寻找第一个outbound=true的handler节点。
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
在 DefaultChannelPipeline 的构造器中, 会实例化两个对象: head 和 tail, 并形成了双向链表的头和尾。 head 是 HeadContext 的实例, 它实现了 ChannelOutboundHandler 接口和ChannelInboundHandler 接口, 并且它的 outbound 字段为 true.而tail 是 TailContext 的实例,它实现了ChannelInboundHandler 接口,并且其outbound 字段为 false,inbound 字段为true。 基于此在如上的bind函数中调用了 findContextOutbound方法 找到的 AbstractChannelHandlerContext 对象其实就是 head.
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
继续看,在pipelie的双向链表中找到第一个outbound=true的AbstractChannelHandlerContext节点head后,然后调用此节点的invokeConnect方法,该方法的代码如下:
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
bind(localAddress, promise);
}
}
HeadContext类中的handler()方法代码如下:
@Override
public ChannelHandler handler() {
return this;
}
该方法返回的是其本身,这是因为HeadContext由于其继承AbstractChannelHandlerContext以及实现了ChannelHandler接口使其具有Context和Handler双重特性。
继续看,看HeadContext类中的bind方法,代码如下:
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
unsafe这个字段是在HeadContext构造函数中被初始化的,如下:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
而此构造函数中的pipeline.channel().unsafe()这行代码返回的就是在本博文前面研究NioServerSocketChannel这个类的构造函数中所初始化的一个实例,如下:
unsafe = newUnsafe();//newUnsafe()方法返回的是NioMessageUnsafe对象。
接下来看NioMessageUnsafe类中的bind方法(准确来说:该方法在AbstractUnsafe中),该类bind具体方法代码如下:
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();//判断绑定是否完成
try {
doBind(localAddress);//底层jdk绑定端口
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();//触发ChannelActive事件
}
});
}
safeSetSuccess(promise);
}
上面的核心代码就是:doBind(localAddress);需要注意的是,此doBind方法是在NioServerSocketChannel类中的doBind方法,不是其他类中的。
在doBind(localAddress, config.getBacklog())中netty实现了jdk底层端口的绑定
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
上面方法中javaChannel()方法返回的是NioServerSocketChannel实例初始化时所产生的Java NIO ServerSocketChannel实例(更具体点为ServerSocketChannelImple实例)。 等价于serverSocketChannel.socket().bind(localAddress)完成了指定端口的绑定,这样就开始监听此端口。绑定端口成功后,是这里调用了我们自定义handler的channelActive方法,在绑定之前,isActive()方法返回false,绑定之后返回true。
在 pipeline.fireChannelActive()中会触发pipeline中的channelActive()方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
在channelActive中首先会把ChannelActive事件往下传播,然后调用readIfIsAutoRead()方法出触发channel的read事件,而它最终调用AbstractNioChannel中的doBeginRead()方法
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);//readInterestOp为 SelectionKey.OP_ACCEPT
}
}
在doBeginRead()方法,netty会把accept事件注册到Selector上。
到此我们对netty服务端的启动流程有了一个大致的了解,整体可以概括为下面四步:
- 1、channelFactory.newChannel(),其实就是创建jdk底层channel,并初始化id、piepline等属性。
- 2、init(channel),初始化NioServerSocketChannel,设置option、attr等属性,并添加ServerBootstrapAcceptor连接器,并触发addHandler事件。
- 3、config().group().register(channel),通过 ServerBootstrap 的 bossGroup 根据group长度取模得到NioEventLoop ,将 NioServerSocketChannel 注册到 NioEventLoop 中的 selector 上,然后触发 channelRegistered事件。
- 4、doBind0(regFuture, channel, localAddress, promise),完成服务端端口的监听,并把accept事件注册到selector上。