1. Server startup example (based on Netty 4.0.31.Final)
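import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
// InitHandler and IOHandler are user-defined handlers that are not shown in this article.
public class Server {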
private ServerBootstrap serverBootstrap;
private NioEventLoopGroup bossGroup;
private NioEventLoopGroup workGroup;
public static void main(String[] args) throws InterruptedException {
System.out.println("Server starting");
Server server = new Server();
server.start();
}
private void start() throws InterruptedException {
try {
serverBootstrap=new ServerBootstrap();
bossGroup = new NioEventLoopGroup();
workGroup = new NioEventLoopGroup(4);
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new InitHandler())
.childHandler(new IOChannelInitialize());
ChannelFuture future = serverBootstrap.bind(8802).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
private class IOChannelInitialize extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new IdleStateHandler(1000, 0, 0));
ch.pipeline().addLast(new IOHandler());
}
}
}
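Step-by-step explanation
1.1 Create a ServerBootstrap instance. It is Netty's bootstrap helper class and provides a set of methods for configuring server startup parameters. Under the hood it uses the facade pattern to abstract and encapsulate the various capabilities, so that users rarely need to deal with low-level APIs, which lowers the development effort.
1.2 NioEventLoopGroup is Netty's Reactor thread pool: bossGroup listens for and accepts client connections, while workGroup handles I/O and encoding/decoding.
1.3 Specify NioServerSocketChannel as the server-side channel.
1.4 Set some options.
1.5 Initialize the pipeline and attach handlers. The pipeline is a chain of responsibility for network events that manages and executes ChannelHandlers; here it receives Netty's built-in IdleStateHandler and the custom IOHandler.
1.6 serverBootstrap.bind(8802) is where the server is actually started and the port is bound.
1.7 future.channel().closeFuture().sync(); blocks until the server channel is closed.
1.8 Graceful shutdown.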
2. Source code analysis
2.1 NioEventLoopGroup
NioEventLoopGroup is more than a pool of I/O threads: besides I/O reads and writes, its threads also run system tasks and scheduled tasks.
public NioEventLoopGroup(int nThreads) {
this(nThreads, null);
}
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {
this(nThreads, threadFactory, SelectorProvider.provider());
}
public NioEventLoopGroup(
int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {
super(nThreads, threadFactory, selectorProvider);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}
Continuing up the class hierarchy; the code below is abridged.
protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
...
if (threadFactory == null) {
threadFactory = newDefaultThreadFactory();
}
children = new SingleThreadEventExecutor[nThreads];
if (isPowerOfTwo(children.length)) {
chooser = new PowerOfTwoEventExecutorChooser();
} else {
chooser = new GenericEventExecutorChooser();
}
for (int i = 0; i < nThreads; i ++) {
...
children[i] = newChild(threadFactory, args);
...
}
...
}
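The chooser selection in the constructor above can be illustrated with a small JDK-only sketch. It assumes the two choosers differ only in how they map an ever-increasing counter to an array index (a bit mask for power-of-two sizes, a modulo otherwise). RoundRobinChooser and the string "children" are hypothetical names used for illustration, not Netty classes.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two chooser classes; not Netty's actual code.
class RoundRobinChooser {
    private final AtomicInteger childIndex = new AtomicInteger();
    private final String[] children;
    private final boolean powerOfTwo;

    RoundRobinChooser(String... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children;
        // A positive int is a power of two when it has exactly one bit set.
        this.powerOfTwo = (children.length & -children.length) == children.length;
    }

    String next() {
        int i = childIndex.getAndIncrement();
        if (powerOfTwo) {
            // Masking equals modulo for power-of-two lengths and stays
            // non-negative even after the counter overflows.
            return children[i & (children.length - 1)];
        }
        // Generic path: modulo, with abs() guarding against a negative counter.
        return children[Math.abs(i % children.length)];
    }

    public static void main(String[] args) {
        RoundRobinChooser chooser = new RoundRobinChooser("loop-0", "loop-1", "loop-2");
        for (int i = 0; i < 4; i++) {
            System.out.println(chooser.next()); // loop-0, loop-1, loop-2, loop-0
        }
    }
}
```

With workGroup = new NioEventLoopGroup(4) from the example in section 1, the power-of-two branch would be the one taken.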
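MultithreadEventExecutorGroup takes care of creating the executors and of choosing among them. Let's look at the newChild method (overridden in NioEventLoopGroup), which instantiates each child executor: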
@Override
protected EventExecutor newChild(
ThreadFactory threadFactory, Object... args) throws Exception {
return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}
It creates a NioEventLoop:
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
super(parent, threadFactory, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
Following the super call:
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
super(parent, threadFactory, addTaskWakesUp);
}
Continuing (abridged):
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
SingleThreadEventExecutor.this.run();
}
});
}
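A thread is instantiated here, and its run() calls SingleThreadEventExecutor's run() method. Where is this thread actually started? Read on.
Summary:
NioEventLoopGroup is in effect the Reactor thread pool. It schedules and executes client connection acceptance, network read/write event handling, user-defined tasks, and scheduled tasks.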
2.2 ServerBootstrap
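ServerBootstrap is the server-side bootstrap helper class. Its parent class is AbstractBootstrap, and the corresponding client-side helper is Bootstrap. The fields listed below are declared in AbstractBootstrap (note the type parameter C) and inherited by ServerBootstrap:
// public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel>
public abstract class AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel> implements Cloneable {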
volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
}
2.2.1 Setting the boss and worker thread pools
bossGroup is passed to the parent class, and workGroup is assigned to ServerBootstrap's childGroup field.
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
2.2.2 Setting NioServerSocketChannel to handle connection requests
serverBootstrap.channel(NioServerSocketChannel.class)
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
Following new BootstrapChannelFactory:
private static final class BootstrapChannelFactory<T extends Channel> implements ChannelFactory<T> {
private final Class<? extends T> clazz;
BootstrapChannelFactory(Class<? extends T> clazz) {
this.clazz = clazz;
}
@Override
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
}
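BootstrapChannelFactory is a nested class that implements the ChannelFactory interface. As the name suggests, it is a channel factory: its newChannel method creates the NioServerSocketChannel instance via reflection. Where it gets called is shown later, in step (3.1).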
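As a rough, JDK-only analogue of this reflective factory, the sketch below stores a Class object and instantiates it on demand. ReflectiveFactory is a hypothetical name, and the sketch uses getDeclaredConstructor().newInstance() because Class.newInstance() has been deprecated since Java 9; the Netty 4.0.31 code shown above still calls clazz.newInstance().

```java
// Hypothetical JDK-only analogue of a reflective channel factory.
class ReflectiveFactory<T> {
    private final Class<? extends T> clazz;

    ReflectiveFactory(Class<? extends T> clazz) {
        if (clazz == null) {
            throw new NullPointerException("clazz");
        }
        this.clazz = clazz;
    }

    // Creates a fresh instance through the public no-argument constructor.
    T newInstance() {
        try {
            return clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create instance of " + clazz, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<CharSequence> factory =
                new ReflectiveFactory<CharSequence>(StringBuilder.class);
        System.out.println(factory.newInstance().getClass().getName());
    }
}
```

The consequence of this design is that the chosen class needs an accessible no-argument constructor, which is why only the class object is passed to channel().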
2.2.3 Setting channel options
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128)
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
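This option method belongs to the parent class AbstractBootstrap. options is a LinkedHashMap, which preserves insertion order but is not thread-safe, so every modification is made while holding a lock on it. Passing a null value removes the option.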
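The locking and null-removal behaviour described above can be reproduced with plain JDK collections. This is a minimal sketch: OptionStore and its String keys are hypothetical simplifications standing in for the bootstrap and ChannelOption.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical simplification of the option bookkeeping; keys are plain strings here.
class OptionStore {
    private final Map<String, Object> options = new LinkedHashMap<String, Object>();

    // A null value removes the option; any other value adds or overwrites it.
    OptionStore option(String name, Object value) {
        if (name == null) {
            throw new NullPointerException("name");
        }
        synchronized (options) {
            if (value == null) {
                options.remove(name);
            } else {
                options.put(name, value);
            }
        }
        return this;
    }

    // Copies the map under the same lock so callers get a stable, ordered view.
    Map<String, Object> snapshot() {
        synchronized (options) {
            return new LinkedHashMap<String, Object>(options);
        }
    }

    public static void main(String[] args) {
        OptionStore store = new OptionStore()
                .option("SO_BACKLOG", 128)
                .option("SO_KEEPALIVE", true)
                .option("SO_BACKLOG", null);
        System.out.println(store.snapshot()); // {SO_KEEPALIVE=true}
    }
}
```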
2.2.4 serverBootstrap.childOption
public <T> ServerBootstrap childOption(ChannelOption<T> childOption, T value) {
if (childOption == null) {
throw new NullPointerException("childOption");
}
if (value == null) {
synchronized (childOptions) {
childOptions.remove(childOption);
}
} else {
synchronized (childOptions) {
childOptions.put(childOption, value);
}
}
return this;
}
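childOption is a method of the subclass ServerBootstrap.
Difference between childOption and option:
option: sets options on the ServerChannel itself.
childOption: sets options on the child channels accepted by the ServerChannel. In other words, option applies to the channel served by the boss thread, while childOption applies to the channels served by the worker thread pool.
2.2.5 Setting the handler for the server-side NioServerSocketChannel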
serverBootstrap.handler(new InitHandler())
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
2.2.6 serverBootstrap.childHandler()
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
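Difference between handler and childHandler:
handler belongs to the server-side NioServerSocketChannel and is set up only once. childHandler belongs to each newly created NioSocketChannel and is invoked every time a new connection comes in.
2.2.7 The real startup happens here: the bind() method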
serverBootstrap.bind(8802).sync()
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
- (1) An InetSocketAddress is created from the port number, then bind continues
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
- (2) validate() checks some parameters; we go straight to doBind()
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.executor = channel.eventLoop();
}
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
- (3.1) First look at initAndRegister (in AbstractBootstrap), with the less important parts removed
final ChannelFuture initAndRegister() {
final Channel channel = channelFactory().newChannel();
init(channel);
ChannelFuture regFuture = group().register(channel);
return regFuture;
}
channelFactory was created by the serverBootstrap.channel() call; this is where it creates the NioServerSocketChannel instance via reflection.
- (3.2.1) Next, the init(channel) method (in ServerBootstrap)
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
options() returns the options member of AbstractBootstrap (a LinkedHashMap) that serverBootstrap.option() populated; here options and attrs are applied to the channel.
p.addLast() adds a new handler to the NioServerSocketChannel's pipeline. The pipeline is similar to a Servlet filter chain and manages all handlers.
- (3.2.2) Next, the group().register() method
Here group is bossGroup (NioEventLoopGroup → MultithreadEventLoopGroup); after several hops the call reaches SingleThreadEventLoop's register() method.
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
if (channel == null) {
throw new NullPointerException("channel");
}
if (promise == null) {
throw new NullPointerException("promise");
}
channel.unsafe().register(this, promise);
return promise;
}
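- (3.2.3) With the less important code removed, this is where the actual registration happens (AbstractUnsafe, an inner class of AbstractChannel)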
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
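eventLoop.inEventLoop() checks whether the calling thread is the event loop's own thread. If it is, the loop is necessarily already running and register0() can be called directly. If it is not, either the loop's thread has not been started yet or the caller is simply a different thread, so the registration is submitted as a task.
- (3.2.4) Here the caller is the main thread and the loop thread has not been started yet, so eventLoop.execute() is taken. This execute() method belongs to SingleThreadEventExecutor.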
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- (3.2.5) Starting the thread
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
thread.start();
}
}
}
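The lazy start in execute() and startThread() can be reproduced with JDK classes alone. The following is a minimal sketch under simplifying assumptions: LazyLoop is a hypothetical class, an AtomicInteger replaces the STATE_UPDATER field updater, and a blocking queue replaces the selector loop. It shows that the thread is started by the first execute() call from outside the loop, and only once.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical single-thread executor that starts its thread on first use.
class LazyLoop {
    private static final int ST_NOT_STARTED = 1;
    private static final int ST_STARTED = 2;

    private final AtomicInteger state = new AtomicInteger(ST_NOT_STARTED);
    private final AtomicInteger starts = new AtomicInteger();
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
    private final Thread thread;

    LazyLoop() {
        // The thread is created here but NOT started, as in the constructor from 2.1.
        thread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (;;) {
                        tasks.take().run();
                    }
                } catch (InterruptedException e) {
                    // Interrupted: let the loop thread exit.
                }
            }
        }, "lazy-loop");
        thread.setDaemon(true);
    }

    boolean inEventLoop() {
        return Thread.currentThread() == thread;
    }

    void execute(Runnable task) {
        if (!inEventLoop()) {
            startThread();
        }
        tasks.add(task);
    }

    private void startThread() {
        // The CAS guarantees that exactly one caller wins and starts the thread.
        if (state.get() == ST_NOT_STARTED
                && state.compareAndSet(ST_NOT_STARTED, ST_STARTED)) {
            starts.incrementAndGet();
            thread.start();
        }
    }

    // Submits two tasks; returns {number of thread starts, tasks that ran on the loop thread}.
    static int[] demo() {
        final LazyLoop loop = new LazyLoop();
        final CountDownLatch done = new CountDownLatch(2);
        final AtomicInteger ranOnLoop = new AtomicInteger();
        Runnable task = new Runnable() {
            @Override
            public void run() {
                if (loop.inEventLoop()) {
                    ranOnLoop.incrementAndGet();
                }
                done.countDown();
            }
        };
        loop.execute(task);
        loop.execute(task);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { loop.starts.get(), ranOnLoop.get() };
    }

    public static void main(String[] args) {
        int[] result = demo();
        System.out.println("starts=" + result[0] + ", ranOnLoop=" + result[1]);
    }
}
```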
The thread created in the SingleThreadEventExecutor constructor back in section 2.1 is started here. Let's return to that constructor:
protected SingleThreadEventExecutor(
EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
thread = threadFactory.newThread(new Runnable() {
@Override
public void run() {
SingleThreadEventExecutor.this.run();
}
});
}
- (3.2.6) SingleThreadEventExecutor.this.run() resolves to the implementation in NioEventLoop:
@Override
protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select(oldWakenUp);
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
processSelectedKeys();
runAllTasks();
} else {
final long ioStartTime = System.nanoTime();
processSelectedKeys();
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
}
}
}
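This loop runs asynchronously on the event loop thread: it repeatedly selects for ready I/O events (for the boss loop, client accepts), processes the selected keys, and then runs the queued tasks via runAllTasks().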
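The time budget passed to runAllTasks(...) in the loop above is simple arithmetic on ioRatio. The sketch below isolates that expression; IoRatioBudget and taskBudgetNanos are hypothetical names, and the range check is an assumption of this sketch (the ioRatio == 100 case is handled by the separate unbounded branch in the original loop).

```java
// Hypothetical helper isolating the expression ioTime * (100 - ioRatio) / ioRatio.
class IoRatioBudget {
    // Returns how long tasks may run, given how long I/O processing just took.
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio >= 100) {
            throw new IllegalArgumentException("ioRatio must be in 1..99 for this branch: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1000000L; // 1 ms spent in processSelectedKeys()
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000: tasks get as long as I/O took
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000: tasks get a quarter of the I/O time
    }
}
```

A higher ioRatio therefore shrinks the share of each loop iteration that queued tasks may consume.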
- (3.3) Now back to this line from (3.1):
ChannelFuture regFuture = group().register(channel); which ends up in SingleThreadEventLoop's register method
@Override
public ChannelFuture register(Channel channel) {
...
channel.unsafe().register(this, promise);
return promise;
}
The following is abridged code (in the AbstractUnsafe inner class of AbstractChannel)
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
...
eventLoop.execute(new OneTimeTask() {
@Override
public void run() {
register0(promise);
}
});
...
}
private void register0(ChannelPromise promise) {
...
doRegister();
...
if (firstRegistration && isActive()) {
pipeline.fireChannelActive();
}
...
}
Continuing (in AbstractNioChannel)
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
...
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
...
}
}
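This registers the NioServerSocketChannel with the Selector of a NioEventLoop in the boss thread pool.
One would expect OP_ACCEPT (16) to be registered with the multiplexer here, yet the interest set passed is 0.
Reasons for registering 0:
(1) The registration method is polymorphic: it is used both by NioServerSocketChannel to listen for incoming client connections and by SocketChannel to listen for network reads or writes.
(2) The interest set can easily be changed later through SelectionKey's interestOps(int ops) method.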
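The same two-step pattern, registering with an empty interest set and enabling the operation afterwards, can be tried with the JDK's NIO API directly. This is only an illustrative sketch; RegisterThenInterest is a hypothetical class name and the string attachment stands in for the Netty channel that the original code attaches.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo: register with interest ops 0, then switch on OP_ACCEPT later.
class RegisterThenInterest {
    // Returns {interest ops right after register, interest ops after enabling accept}.
    static int[] demo() {
        try (Selector selector = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false); // selectable channels must be non-blocking
            Object attachment = "server-channel"; // stand-in for the Netty channel object
            SelectionKey key = server.register(selector, 0, attachment);
            int before = key.interestOps();
            // Same shape as doBeginRead(): OR the wanted operation into the interest set.
            key.interestOps(before | SelectionKey.OP_ACCEPT);
            int after = key.interestOps();
            return new int[] { before, after };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        int[] ops = demo();
        System.out.println(ops[0] + " -> " + ops[1]); // 0 -> 16
    }
}
```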
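Next, look at pipeline.fireChannelActive(). (For the server channel, isActive() is still false inside register0() because the socket has not been bound yet, so in this startup path fireChannelActive() is actually triggered once the bind in step (5) completes.)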
@Override
public ChannelPipeline fireChannelActive() {
head.fireChannelActive();
if (channel.config().isAutoRead()) {
channel.read();
}
return this;
}
@Override
public Channel read() {
pipeline.read();
return this;
}
@Override
public ChannelPipeline read() {
tail.read();
return this;
}
@Override
public ChannelHandlerContext read() {
...
next.invokeRead();
...
}
private void invokeRead() {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
This leads to HeadContext's read:
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
@Override
public final void beginRead() {
...
doBeginRead();
...
}
@Override
protected void doBeginRead() throws Exception {
if (inputShutdown) {
return;
}
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
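In the end, readInterestOp is added to the selectionKey's interest set here. For NioServerSocketChannel, readInterestOp is OP_ACCEPT (passed in by its constructor); for a NioSocketChannel it is OP_READ.
- (4) Next, the doBind0() method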
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
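The bind is submitted as a task to the Reactor thread's task queue. It first checks whether registration succeeded and only then proceeds with bind.
- (5) Executing the bind() method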
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return tail.bind(localAddress, promise);
}
@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
...
next.invokeBind(localAddress, promise);
...
}
Because bind is an outbound event, the pipeline looks for the next outbound handler and calls its invokeBind() method
private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
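The outbound lookup done by findContextOutbound() above can be pictured as a backward walk over a doubly linked list of contexts, starting at the tail. The sketch below is a simplified model, not Netty's code: MiniPipeline, Ctx, and the boolean outbound flag are hypothetical, and it assumes the head context handles outbound operations while the tail is inbound-only.

```java
// Hypothetical model of a pipeline as a doubly linked list of contexts.
class MiniPipeline {
    static class Ctx {
        final String name;
        final boolean outbound;
        Ctx prev;
        Ctx next;

        Ctx(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final Ctx head = new Ctx("head", true);  // assumed to handle outbound operations
    private final Ctx tail = new Ctx("tail", false); // assumed to be inbound-only

    MiniPipeline() {
        head.next = tail;
        tail.prev = head;
    }

    MiniPipeline addLast(String name, boolean outbound) {
        Ctx ctx = new Ctx(name, outbound);
        Ctx prev = tail.prev;
        ctx.prev = prev;
        ctx.next = tail;
        prev.next = ctx;
        tail.prev = ctx;
        return this;
    }

    // Walks backwards from the given context to the nearest outbound one.
    static Ctx findContextOutbound(Ctx from) {
        Ctx ctx = from;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }

    // Names of the contexts an outbound event started at the tail would visit.
    String outboundPath() {
        StringBuilder path = new StringBuilder();
        Ctx ctx = tail;
        while (ctx != head) {
            ctx = findContextOutbound(ctx);
            if (path.length() > 0) {
                path.append(" -> ");
            }
            path.append(ctx.name);
        }
        return path.toString();
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast("duplex", true)
                .addLast("inboundOnly", false);
        System.out.println(pipeline.outboundPath()); // duplex -> head
    }
}
```

In a pipeline that contains only inbound handlers, the walk goes straight from the tail to the head, which matches the next snippet where the bind call lands in the head's bind method.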
@Override
public void bind(
ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
throws Exception {
unsafe.bind(localAddress, promise);
}
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
...
doBind(localAddress);
...
}
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
After several layers of bind calls, we can see that it ultimately calls Java's underlying NIO to bind the socket.
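For comparison, this is roughly what that final call looks like when written against the JDK directly. It is a sketch only: PlainNioBind is a hypothetical class, and it binds to an ephemeral loopback port (0) rather than 8802 so that it does not collide with a running server.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// Hypothetical demo of the plain JDK bind that the Netty call chain ends in.
class PlainNioBind {
    // Binds with the given backlog and returns the port the OS assigned.
    static int bindAndReportPort(int backlog) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.configureBlocking(false);
            InetSocketAddress address =
                    new InetSocketAddress(InetAddress.getLoopbackAddress(), 0);
            // Same call shape as javaChannel().socket().bind(localAddress, backlog).
            server.socket().bind(address, backlog);
            return server.socket().getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // 128 matches the SO_BACKLOG value configured in the example from section 1.
        System.out.println("bound to port " + bindAndReportPort(128));
    }
}
```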
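That completes the walkthrough of the server startup flow. To summarize:
① The ServerBootstrap helper class is configured with the Reactor thread pools, the server-side Channel, some options, and the handlers for accepted client connections
② The values held by ServerBootstrap are applied during initialization, and the channel is registered with the multiplexer so that it can listen for OP_ACCEPT
③ The Reactor threads are started and loop continuously, listening for connections and running tasks
④ The port is bound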