1 概述
本文主要介绍Netty的服务端启动过程。
2 服务端的典型编码
我们首先看下如果使用Netty作为通信框架,服务端源码一般长啥样,下面先列一下简单的代码示例:
public class TimeServer {
public void bind(int port) throws Exception {
//定义两个EventLoopGroup,一个负责用户接收并
//建立客户端连接,另一个负责处理已经建立连接的客户端
//事件处理,当然也可以只用一个group
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
//因为这里是服务端,所以使用的channel类型
//为NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//设置一些客户端channel的选项,后续接受客户端
//连接时会用到
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置一些客户端channel属性,后续接受客户端
//连接时会用到
.childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
//设置服务端channel选项,服务端channel实例化后会用到
.option(ChannelOption.AUTO_READ, true)
//设置服务端channel属性,服务端channel实例化后会用到
.attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
//设置客户端pipeline的handler,客户端建立连接之后会
//在其pipeline中添加该handler
//在有事件继续时会调用该handler进行处理。
.childHandler(new ChildChannelHandler())
//设置服务端的handler,服务端相关事件就绪之后会调用该
//handler
.handler(new ServerLoggerHandler());
//绑定端口启动服务端
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
} finally {
//优雅停机
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimeServer().bind(8080);
}
}
//服务端中每个客户端channel使用的handler,ChannelInitializer类型的
//handler一般用于在通道初始化之后注册进行实际工作的handler
class ChildChannelHandler extends ChannelInitializer<Channel> {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TimerServerHandler());
}
}
//服务端中每个客户端channel使用的handler
class TimerServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server channelRead");
ctx.fireChannelRead(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
}
}
//服务端channel使用的handler
class ServerLoggerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//do some log operation
ctx.fireChannelRead(msg);
}
}
3 一些配置函数
在上面的示例代码中,我们连缀调用了许多类ServerBootstrap
的配置函数,如下所示,类ServerBootstrap
如其名字所示,负责服务端的配置和启动,相应的还有一个类Bootstrap
负责客户端的配置和启动,下面我们分别进行讲解:
b.group(bossGroup, workerGroup)
//因为这里是服务端,所以使用的channel类型
//为NioServerSocketChannel
.channel(NioServerSocketChannel.class)
//设置一些客户端channel的选项,后续接受客户端
//连接时会用到
.childOption(ChannelOption.SO_KEEPALIVE, true)
//设置服务端channel选项,服务端channel实例化后会用到
.option(ChannelOption.AUTO_READ, true)
//设置一些客户端channel属性,后续接受客户端
//连接时会用到
.childAttr(AttributeKey.valueOf("childChannelAttr"), "attrValue")
//设置服务端channel属性,服务端channel实例化后会用到
.attr(AttributeKey.valueOf("serverChannelAttr"), "attrValue")
//设置客户端pipeline的handler,客户端建立连接之后会
//在其pipeline中添加该handler
//在有事件继续时会调用该handler进行处理。
.childHandler(new ChildChannelHandler())
//设置服务端的handler,服务端相关事件就绪之后会调用该
//handler
.handler(new ServerLoggerHandler());
group
//ServerBootstrap
/**
* Set the {@link EventLoopGroup} for the parent (acceptor) and the child (client). These
* {@link EventLoopGroup}'s are used to handle all the events and IO for {@link ServerChannel} and
* {@link Channel}'s.
*/
//在ServerBootstrap中有两个线程group,分别为group和childGroup,
//每个group对应一个线程数组,每一个线程(其实就是NioEventLoop类)
//对应一个Selector,负责进行select,服务端如果绑定了多个端口,
//那么每个端口会绑定到group线程组中的某个线程上,
//负责端口接受客户端连接,接受到的客户端
//channel则会绑定到childGroup中的某个线程上,负责select客户端事件
//通过group和childGroup的不同配置,可以实现不同的线程模型,也就是
//Netty实现的不同类型的Reactor模式,具体可自行查询
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
channel
//
/**
* The {@link Class} which is used to create {@link Channel} instances from.
* You either use this or {@link #channelFactory(io.netty.channel.ChannelFactory)} if your
* {@link Channel} implementation has no no-args constructor.
*/
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
/**
* {@link io.netty.channel.ChannelFactory} which is used to create {@link Channel} instances from
* when calling {@link #bind()}. This method is usually only used if {@link #channel(Class)}
* is not working for you because of some more complex needs. If your {@link Channel} implementation
* has a no-args constructor, its highly recommend to just use {@link #channel(Class)} for
* simplify your code.
*/
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory<? extends C> channelFactory) {
return channelFactory((ChannelFactory<C>) channelFactory);
}
/**
* @deprecated Use {@link #channelFactory(io.netty.channel.ChannelFactory)} instead.
*/
@Deprecated
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
//配置channel会构造一个channelFactory负责实例化配置的channel
this.channelFactory = channelFactory;
return self();
}
ServerBootstrap.channel
则负责配置服务端对应的channel,Netty对Java NIO的原生channel进行了封装,使其使用更加统一,这里我们配置的是NioServerSocketChannel
childOption
//ServerBootstrap
/**
* Allow to specify a {@link ChannelOption} which is used for the {@link Channel} instances once they get created
* (after the acceptor accepted the {@link Channel}). Use a value of {@code null} to remove a previous set
* {@link ChannelOption}.
*/
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
childOption
主要负责在后续接受到客户端连接并建立了客户端channel时,对channel的配置,可配置的一些选项都定义在ChannelOption
类的常量中,其中我们常见的一些配置如下:
//ChannelOption
public static final ChannelOption<Boolean> SO_BROADCAST = valueOf("SO_BROADCAST");
public static final ChannelOption<Boolean> SO_KEEPALIVE = valueOf("SO_KEEPALIVE");
public static final ChannelOption<Integer> SO_SNDBUF = valueOf("SO_SNDBUF");
public static final ChannelOption<Integer> SO_RCVBUF = valueOf("SO_RCVBUF");
public static final ChannelOption<Boolean> SO_REUSEADDR = valueOf("SO_REUSEADDR");
public static final ChannelOption<Integer> SO_LINGER = valueOf("SO_LINGER");
public static final ChannelOption<Integer> SO_BACKLOG = valueOf("SO_BACKLOG");
public static final ChannelOption<Integer> SO_TIMEOUT = valueOf("SO_TIMEOUT");
这些配置看着很眼熟,其实对应了java对socket的配置参数,接口java.net.SocketOptions
中如下图所示,这里不再介绍,源码中有比较详细的介绍,可自行翻阅其源码,childOption
指定的配置会保存在Map childOptions
中。
option
option
和childOption
同理,但是childOption
主要负责对建立的客户端连接channel进行配置,而option
则负责对服务端的channel进行配置,通过option
方法指定的服务端channel配置选项会保存在Mapoptions
中。childAttr
//ServerBootstrap
/**
* Set the specific {@link AttributeKey} with the given value on every child {@link Channel}. If the value is
* {@code null} the {@link AttributeKey} is removed
*/
public <T> ServerBootstrap childAttr(AttributeKey<T> childKey, T value) {
if (childKey == null) {
throw new NullPointerException("childKey");
}
if (value == null) {
childAttrs.remove(childKey);
} else {
childAttrs.put(childKey, value);
}
return this;
}
childAttr
方法用于为建立的客户端连接的channel指定一些属性,主要是应用自己指定的一些属性,比如打上标签,然后后续进行分类等。保存在childAttrs
中。
attr
attr
和childAttr
方法类似,但是attr
则用于为server channel指定一些属性,保存在attrs
中。childHandler
//ServerBootstrap
/**
* Set the {@link ChannelHandler} which is used to serve the request for the {@link Channel}'s.
*/
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
在文章Netty源码-ChannelPipeline和ChannelHandler我们介绍了每个channel都会有一个pipeline,childHandler
就是指定在服务端接收客户端连接并建立了客户端连接对应的channel之后,该channel pipeline中对应的handler,常规编码中我们一般会指定一个ChannelInitializer
对象,在客户端channel注册之后,向其pipeline中注册多个handler,比如编码handler、解码handler、实际业务handler等。ChannelInitializer
handler和其他handler不同的是,在其channelRegistered
方法调用完之后其会从pipeline中移除自己。
handler
//AbstractBootstrap
/**
* the {@link ChannelHandler} to use for serving the requests.
*/
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return self();
}
handler
方法负责指定服务端channel pipeline中的handler,其实在ServerBootstrap
初始化方法init
中会向服务端channel注册一个ChannelInitializer
,这里通过handler
方法指定的handler会在ChannelInitializer
的initChannel
被添加到pipeline中,除此之外,还会添加一个默认的ServerBootstrapAcceptor
,负责处理客户端连接的相关逻辑,其实这个handler
方法一般可以不同调用配置的。
上面还有一个知识点,设置通道选项ChannelOption
或者属性AttributeKey
时都使用了常量池ConstantPool
,可见笔者文章Netty源码-常量池ConstantPool对其的简单介绍。
上面介绍了常用的配置方法,下面我们具体看服务端的启动过程。
4 服务端启动
服务端的启动通过ServerBootstrap.bind
方法触发,其定义在ServerBootstrap
父类AbstractBootstrap
中:
//AbstractBootstrap
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
/**
* Create a new {@link Channel} and bind it.
*/
public ChannelFuture bind(SocketAddress localAddress) {
//对配置进行验证
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
//调用实际的绑定方法
return doBind(localAddress);
}
/**
* Validate all the parameters. Sub-classes may override this, but should
* call the super method in that case.
*/
public B validate() {
//首先验证group不能为空
if (group == null) {
throw new IllegalStateException("group not set");
}
//验证channelFactory不能为空,channelFactory会在上面调用
//channel配置通道时被初始化,主要用于生成配置的channel实例
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return self();
}
//doBind主要有三个步骤,第一是实例化channel对象实例,第二是向group
//中的某个NioEventLoop注册自己,即向selector注册channel,第三是在上面
//两件事完成之后,向channel绑定端口
private ChannelFuture doBind(final SocketAddress localAddress) {
//这里的方法完成第一和第二件个步骤,初始化并注册自己
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//注册完成之后,则进行第三件步骤,绑定端口
//如果一二没有完成,则注册一个回调Listener,
//在一二完成之后子再自动调用Listener进行端口绑定
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
4.1 channel初始化和注册
AbstractBootstrap.initAndRegister
负责通道初始化和注册:
//AbstractBootstrap
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//通过channelFactory实例化我们配置的服务端channel
//NioServerSocketChannel
channel = channelFactory.newChannel();
//进行初始化
init(channel);
} catch (Throwable t) {
//异常处理
if (channel != null) {
// channel can be null if newChannel crashed (eg SocketException("too many open files"))
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//向group中的某个NioEventLoop注册自己
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now because the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
AbstractBootstrap.init
初始化方法根据服务端和客户端有不同的定义,服务端定义在其子类ServerBootstrap
中定义如下:
//ServerBootstrap
@Override
void init(Channel channel) throws Exception {
//用上面介绍的option方法指定的选项对channel进行配置
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//将上面介绍使用attr方法指定的属性附加到channel上
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//获取该服务端channel的pipeline
ChannelPipeline p = channel.pipeline();
//下面获取childGroup/childHandler/childOptions以及
//childAtts
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//向服务端channel的pipeline中注册ChannelInitializer
//负责初始化其pipeline
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
//首先添加通过ServerBootstrap.handler方法配置的
//handler
if (handler != null) {
pipeline.addLast(handler);
}
//然后如果在该channel注册的NioEventLoop自己的线程中
//则表示该线程已启动,此时则向服务端channel的pipeline
//中注册负责处理客户端连接的ServerBootstrapAcceptor
//传入上面通过child*方法配置的childGroup/
//childHandler/childOptions以及childAtts
//其实这里还引申出编码时应该注意的一个问题,因为
//这里向多个客户端channel的pipeline注册的childHandler
//为同一个,所以需要多个线程修改数据的可能,比如解码器
//所以我们一般使用ChannelInitializer注册handler,然后在
//初始化通道时通过addLast(new *Handler())的方法为每个通道
//注册一个新的对象实例,避免共享同一个对象实例
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
初始化的逻辑比较简单,到这里已经介绍完毕,下面看server channel是如何进行注册的,在AbstractBootstrap.initAndRegister
方法中通过config().group().register(channel)
进行注册。其中config
返回该channel的默认配置类ServerBootstrapConfig
,ServerBootstrapConfig.group
则ServerBootstrap.group
线程组,所以实际调用的是NioEventLoopGroup.register
方法,在其父类MultithreadEventLoopGroup
定义如下:
//MultithreadEventLoopGroup
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
每个NioEventLoopGroup
在其构造函数会实例化多个NioEventLoop
对象实例,并通过数组持有这些实例,next
方法则使用配置的EventExecutorChooserFactory.EventExecutorChooser
选择器从这些EventLoop中选出一个进行注册,默认的选择器工厂类实现DefaultEventExecutorChooserFactory
会根据EventLoop数组长度是否为2的指数次方,决定默认的选择器为PowerOfTwoEventExecutorChooser
还是GenericEventExecutorChooser
,二者目前都采用轮询方式从NioEventLoopGroup
中选择NioEventLoop
,不同的是如果长度为2的指数次方,则在轮询求余时可以采用& length
进行优化,具体可查看二者实现源码,限于篇幅,这里不进行介绍。所以最终调用的是NioEventLoop.register
方法,在其父类SingleThreadEventLoop
中定义如下:
//SingleThreadEventLoop
@Override
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
可见,在SingleThreadEventLoop.register
中最终调用了Unsafe.register
方法,在其实现类AbstractUnsafe
定义如下:
//AbstractUnsafe
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//如果eventLoop为空,则抛错
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
//如果已经注册了则返回失败
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
//判断该eventloop是否兼容,也就是group里的eventLoop是否
//和该通道的类型兼容,比如nio类型的通道必须对应nio类型
//的eventloop
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
//如果当前线程就是该eventLoop对应的线程,则直接调用register0进行
//实际注册操作,否则将该任务加入到该eventLoop的任务队列中,如果是第
//一次向其任务队列加入任务,也会同时初始化并启动该eventLoop关联的
//线程
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
//进行实际的注册工作
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
//真正的注册工作是放在AbstractUnsafe外部类AbstractChannel
//中定义的
doRegister();
neverRegistered = false;
//将是否注册过的标识置为true,上面有判断该通道是否已经注册过
//就是通过该变量标识的
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
//注册完成,触发handler的channelRegistered方法
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
//如果通道已经处于激活状态,则触发
//handler的channelActive方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
//向eventLoop的selector注册通道时并没有设置
//感兴趣的事件,这里调用此方法注册感兴趣的事件
//比如OP_ACCEPT等
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
AbstractChannel.doRegister
方法,我们看其在子类AbstractNioChannel
的定义:
@Override
protected void doRegister() throws Exception {
boolean selected = false;
//通过多次循环,如果没有抛错,则持续注册,直到注册成功
for (;;) {
try {
//这里也看到了熟悉的Java NIO的注册操作,将该Netty channel
//关联的java channel注册到该eventLoop持有的selector中
//注册时,并没有指定感兴趣的事件
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
//如果抛出异常,则尝试调用selectNow看是否能再次注册
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
上面提到AbstractUnsafe.register0
注册成功之后调用beginRead
方法,其实现比较简单,可自行查看源码,主要是向方法doRegister
注册之后返回的SelectionKey
注册感兴趣的对象。Java NIO编程中,事件注册有两种方式,一种是在向Selector
注册channel时同时指定感兴趣的事件,另一种是设置返回的SelectionKey
感兴趣的事件。这里的beginRead
方法调用还需要配置ChannelOption.AUTO_READ
为true,否则可以在自己定义的Handler中调用ChannelHandlerContext.read
方法,这个方法的调用最终也会调用beginRead
注册OP_ACCEPT
事件。
至此,我们已经介绍了服务端启动过程中三个步骤中的前两个:初始化和注册,下面介绍绑定。
4.2 绑定
我们在第4节的开始介绍ServerBootstrap.doBind
方法时,介绍了服务端启动主要完成三个步骤:初始化、注册和绑定,初始化和注册完成之后,会进行绑定操作。绑定操作在doBind0
方法中完成,其在AbstractBootstrap
定义如下:
//AbstractBootstrap
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
//在通道中绑定
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
channel.bind
方法定义如下:
//AbstractChannel
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
调用pipeline.bind
方法进行绑定,可见最终是通过调用pipeline中注册的handler进行绑定的,因为绑定属于outbound事件(关于pipeline、handler、outbound事件可见笔者文章Netty源码-ChannelPipeline和ChannelHandler),所以最后调用HeadContext.bind
:
//HeadContext
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
//调用Unsafe.bind进行绑定
unsafe.bind(localAddress, promise);
}
//AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
// Warn a user about the fact that a non-root user can't receive a
// broadcast packet on *nix if the socket is bound on non-wildcard address.
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
//调用实际的绑定工作,doBind
//是AbstractUnsafe外部类AbstractChannel的方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
AbstractChannel.doBind
我们看下其在子类NioServerSocketChannel
中的实现:
//NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
//根据Java版本号,在java channel上进行绑定
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
Netty服务端的启动大概就是这些内容,这里主要介绍了主要的启动流程,其中的具体细节比如Promise
、NioEventLoop
实例化等没有详细介绍。