原文博客:Doi技术团队
链接地址:https://blog.doiduoyi.com
初心:记录优秀的Doi技术团队学习经历
第一章节是主要是服务器启动的代码分析。章节目录有:
|---------1.1初始化NioEventLoopGroup
|---------1.2初始化NioEventLoop
|---------1.3初始化NioServerSocketChannel
|---------1.4服务器启动流程
为什么先从初始化开始了解服务器启动?
因为在我看服务器启动的相关源码的时候,有很多地方都是初始化的时候已经建立好的。所以我就从初始化的源码开始看起。这是我第一次看源码的笔记,仍有很多理解错误的地方和不解的地方。欢迎讨论。
本篇目录
- 启动服务器代码
- 代码分析
启动服务器代码
- 1 创建NioEventLoopGroup对象。上文已经介绍了NioEventLoopGroup的初始化已经内部线程NioEventLoop的初始化话。
- 2 创建ServerBootstrap对象,该类初始化主要是创建了几个
LinkedHashMap
来存储设置。例如childOptions
或者childAttrs
public void bind() {
//1
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup work = new NioEventLoopGroup();
try {
// 引导绑定和启动服务器
//2
ServerBootstrap b = new ServerBootstrap();
//3
b.group(boss, work);
// 创建NioEventLoopGroup对象来处理事件,如接受新连接、接收数据、写数据等等
//4
b.channel(NioServerSocketChannel.class);
// 设置childHandler执行所有的连接请求
//5
b.childHandler(new ChildChannelHandler());
//6
b.option(ChannelOption.SO_BACKLOG, 100);
//绑定端口
//7
ChannelFuture future = b.bind(8080).sync();
//8
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//9
boss.shutdownGracefully();
work.shutdownGracefully();
}
}
3 Group
赋值两个线程池。
1 首先第一个boss线程池是赋值到父类AbstractBootstrap
的group变量中。
2 work线程池就赋值在ServerBootstrap的childGroup变量中。
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
//1
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
4 channel()
- 首先
ReflectiveChannelFactory
类是一个工厂类,里面只有一个方法newChannel()
用来将传入的class进行无参构造生成对象的。 - 然后channelFactory()是将ReflectiveChannelFactory赋予ServerBootstrap的父类的channelFactory参数
此时channel 还没被创建,直到bind()方法中调用
initAndRegister()
方法的时候才会用该工厂类生成一个channel。下文会讲。
public B channel(Class<? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
5 childHandler()
在调用这个childHandler()
这个方法的时候,你需要自己写一个方法继承ChannelInitializer类
。而ChannelInitializer
类也是继承ChannelInboundHandlerAdapter
的。所以childHandler()
方法的参数就是我们自己写的类。然后赋值到childHandler参数。
public ServerBootstrap childHandler(ChannelHandler childHandler) {
if (childHandler == null) {
throw new NullPointerException("childHandler");
}
this.childHandler = childHandler;
return this;
}
6 Option()
将一些配置存储到options变量中,该变量是一个LinkedHashMap
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
7 bind()
之前的一些方法都只是对ServerBootstrap的配置,说白了就是用来set参数的。
bind()则是开始启动服务器了。
- 1 进行对
group
和channelFactory
两个参数进行非空验证
public ChannelFuture bind(SocketAddress localAddress) {
validate(); //1
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
doBind()
- 1 创建一个channel 并且初始化和注册。
关键部分。代码分析看下面
- 2 判断channel是否注册成功。如果已经注册成功那么就进行
doBind0()
方法。如果还没成功,那就添加一个监听器,等返回成功的时候就进行doBind0()
方法。解析看下文。
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //1
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//2
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind0()
- 1 该方法是执行在channel已经与selector注册后的。给线程添加一个任务。该任务是绑定端口的。
到这个方法。服务器启动就已经完成了
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
//1
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
下文都是对initAndRegister()
方法的代码解析
initAndRegister()
- 1 利用工厂创建一个channel对象。初始化内容看上一篇文章
初始化NioServerSocketChannel
- 2 对channel进行一个初始化,看下文。
- 3 对channel进行注册,看下文
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//1
channel = channelFactory.newChannel();
//2
init(channel);
} catch (Throwable t) {
}
//3
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
init()
- 1 对channel 的config变量里添加options属性
- 2 对channel 的config变量里添加attrs属性
- 3 获取channel 对象的pipeline管道,然后在管道里面添加一个handler,该handler作用有:添加bootstrap里的handler。对channel 添加一个任务。
- 4 添加bootstrap里的handler。对channel 添加一个任务
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
//1
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//2
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
//3
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//4
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
config().group().register(channel)
-
config().group()
是获取到线程池,该线程池是ServerBootstrap
的父类AbstractBootstrap
存储的boss线程池。是不是恍然大悟,这就是把channel放到boss线程池里的一个线程里面去执行任务啊。 - 1 是
MultithreadEventLoopGroup
类的方法 也就是NioEventLoopGroup
的父类。 该方法就是从线程池里获取一个EventLoop.然后执行EventLoop里的register()方法。
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel); //1
}
继续看下去。
- 2 是
SingleThreadEventLoop
里的方法。``DefaultChannelPromise()` 方法是给channel 与该线程 添加一个监听器。
@Override
public ChannelFuture register(Channel channel) {
//2
return register(new DefaultChannelPromise(channel, this));
}
继续往下看
- 3 继续是
SingleThreadEventLoop
里的方法。promise是监听器对象,promise.channel()
获取到channel。unsafe()
方法是实现底层的register,read或者write操作
@Override
public ChannelFuture register(final ChannelPromise promise) {
//3
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
继续往下分析,该register()方法是AbstractChannel
类的。也就是初始化NioServerSocketChannel
的时候,建立pipeline和unsafe的类。
- 4 进行参数验证
- 5 将该eventLoop线程赋值于channel参数。
- 6 eventLoop.inEventLoop() 判断,如果现在的线程是EventLoop()的 线程,那么执行任务,如果不是那么就用执行器执行任务。在这里debug,会返回false,会调用execute()方法。
下面继续探讨 - 7 下面探讨下
register0()
方法
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//4
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
//5
AbstractChannel.this.eventLoop = eventLoop;
//6
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
//7
register0(promise);
}
});
} catch (Throwable t) {
}
}
}
首先在研究register0()之前有一个小知识。
在版本4.1.x 的时候,在初始化
eventLoop
的时候,还没有创建线程,而是保存了Executor
这个变量。这个变量在4.0.x版本的时候是没有的。那么4.1.x版本在什么时候创建线程呢?在eventLoop调用execute()方法的时候创建线程。下文可以看到
- 1 继续判断是否现在的线程是EventLoop对象的线程,肯定返回false,因为EventLoop线程里的thread变量是
null
嘛。
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
- 2 在
startThread()
方法中,才创建一个线程。 - 3 在任务队列里添加一个任务。
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
//1
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//2
startThread();
//3
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
register0(ChannelPromise promise)
- 1 判断EventLoop线程是否还存活。
- 2 这个是记录是否注册过的。
neverRegistered
默认是true; - 3 这个是
核心代码
,到这里才进行将channel与selector注册在一起。
private void register0(ChannelPromise promise) {
try {
//1
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
//2
boolean firstRegistration = neverRegistered;
doRegister();//3
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
}
}