作者: 一字马胡
转载标志 【2017-11-03】
更新日志
日期 | 更新内容 | 备注 |
---|---|---|
2017-11-03 | 添加转载标志 | 持续更新 |
NI/O C/S通信过程
下面分别展示了NI/O模式下的客户端/服务端编程模型:
Netty是一种基于NI/O的网络框架,网络层面的操作只是对NI/O提供的API的封装,所以,它的服务端流程和客户端流程是和NI/O的一致的,对于客户端而言,它要做的事情就是连接到服务端,然后发送/接收消息。对于服务端而言,它要bind一个端口,然后等待客户端的连接,接收客户端的连接使用的是Accept操作,一个服务端需要为多个客户端提供服务,而每次accept都会生成一个新的Channel,Channel是一个服务端和客户端之间数据传输的通道,可以向其write数据,或者从中read数据。本文将分析Netty框架的Accept流程。
Netty的Accept流程
Netty是一个较为复杂的网络框架,想要理解它的设计需要首先了解NI/O的相关知识,为了对Netty框架有一个大概的了解,你可以参考Netty线程模型及EventLoop详解,该文章详解解析了Netty中重要的事件循环处理流程,包含EventLoop的初始化和启动等相关内容。下面首先展示了Netty中的EventLoop的分配模型,Netty服务端会为每一个新建立的Channel分配一个EventLoop,并且这个EventLoop将服务于这个Channel得整个生命周期不会改变,而一个EventLoop可能会被分配给多个Channel,也就是一个EventLoop可能会服务于多个Channel的读写事件,这对于想要使用ThreadLocal的场景需要认真考虑。
在文章Netty线程模型及EventLoop详解中已经分析了EventLoop的流程,现在从事件循环的起点开始看起,也就是NioEventLoop的run方法,本文关心的是Netty的Accept事件,当在Channel上发生了事件之后,会执行processSelectedKeysPlain方法,看一下这个方法:
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
return;
}
Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (!i.hasNext()) {
break;
}
if (needsToSelectAgain) {
selectAgain();
selectedKeys = selector.selectedKeys();
// Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
break;
} else {
i = selectedKeys.iterator();
}
}
}
}
接着会执行processSelectedKey这个方法,下面是它的细节:
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
现在我们可以看到和NI/O一样的类似OP_READ和OP_ACCEPT之类的东西了,OP_ACCEPT表示的是有Accept事件发生了,需要我们处理,但是发现好像OP_READ 事件和OP_ACCEPT事件的处理都是通过一个read方法进行的,我们先来找到这个read方法:
-------------------------------------------------
AbstractNioMessageChannel.NioMessageUnsafe.read
-------------------------------------------------
public void read() {
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
}
}
}
本文中的所有代码都是已经处理过的,完整的代码参考源代码,本文为了控制篇幅去除了一些不影响阅读(影响逻辑)的代码,上面的read方法中有一个关键的方法doReadMessages,下面是它的实现:
--------------------------------------------
NioServerSocketChannel. doReadMessages
--------------------------------------------
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
这个方法就是处理accept类型的事件的,为了更好的理解上面的代码,下面展示一段在NI/O中服务端的代码:
int port = 8676;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking (false);
ServerSocket serverSocket = serverChannel.socket();
serverSocket.bind (new InetSocketAddres(port));
Selector selector = Selector.open();
serverChannel.register (selector, SelectionKey.OP_ACCEPT);
while (true) {
int n = selector.select();
Iterator it = selector.selectedKeys().iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey) it.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel = server.accept();
channel.configureBlocking (false);
channel.register (selector, SelectionKey.OP_READ);
}
if (key.isReadable( )) {
processReadEvent(key);
}
it.remove( );
}
}
可以看到,服务端accept一次就会产生一个新的Channel,Netty也是,每次Accept都会new一个新的NioSocketChannel,当然,这个Channel需要分配一个EventLoop给他才能开始事件循环,但是Netty服务端的Accept事件到此应该可以清楚流程了,下面分析这个新的Channel是怎么开始事件循环的。继续看AbstractNioMessageChannel.NioMessageUnsafe.read这个方法,其中有一个句话:
pipeline.fireChannelRead(readBuf.get(i));
现在来跟踪一下这个方法的调用链:
-> ChannelPipeline.fireChannelRead
-> DefaultChannelPipeline.fireChannelRead
-> AbstractChannelHandlerContext.invokeChannelRead
-> ChannelInboundHandler.channelRead
->ServerBootstrapAcceptor.channelRead
上面列出的是主要的调用链路,只是为了分析Accept的过程,到ServerBootstrapAcceptor.channelRead这个方法就可以看到是怎么分配EventLoop给Channel的了,下面展示了ServerBootstrapAcceptor.channelRead这个方法的细节:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler); 【1】
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); 【2】
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() { 【3】
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 【1】首先将服务端的处理逻辑赋值给新建立的这个Channel得pipeline,使得这个新建立的Channel可以得到服务端提供的服务
- 【2】属性赋值
- 【3】将这个新建立的Channel添加到EventLoopGroup中去,这里进行EventLoop的分配
下面来仔细看一下【3】这个register方法:
========================================
MultithreadEventLoopGroup.register
========================================
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
========================================
SingleThreadEventLoop.register
========================================
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
========================================
AbstractUnsafe.register
========================================
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
上面的流程展示了这个新的Channel是怎么获得一个EventLoop的,而EventLoopGroup分配EventLoop的关键在于MultithreadEventLoopGroup.register这个方法中的next方法,而这部分的分析已经在Netty线程模型及EventLoop详解中做过了,不再赘述。当一个新的连接被服务端Accept之后,会创建一个新的Channel来维持服务端与客户端之间的通信,而每个新建立的Channel都会被分配一个EventLoop来实现事件循环,本文分析了Netty服务端Accept的详细过程,至此,对于Netty的EventLoop、EventLoopGroup以及EventLoop是如何被运行起来的,以及服务端是如何Accept新的连接的这些问题应该都已经有答案了。