20220119_netty服务端ServerBootstrap如何接受连接并后续处理读写事件学习笔记
1概述
非阻塞NIO,异步Future,高性能(单线程无锁),零拷贝(ByteBuf),Netty是基于NIO,它封装了JDK的NIO。可通过ChannelHandler对通讯框架进行灵活扩展。
本节主要学习一下netty服务端如何接受连接并后续处理读写事件
1.1父事件循环线程组的启动时序(略)
1.2接收连接后续处理时序
1.2.1父事件循环线程组
我们知道,服务端绑定成功后,会创建一个NioServerSocketChannel(即连接监听类型的通道),同时启动一个NioEventLoop线程,不断循环遍历selector中的有效的就绪事件OP_ACCEPT,可以接收成千上万个socket传输类型的通道。
另外,处理IO读写的通道(数据传输类型的通道)
1.2.1.1processSelectedKeys获取有效的事件
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\nio\NioEventLoop.java
private SelectedSelectionKeySet selectedKeys;
@Override
protected void run() {
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\nio\NioEventLoop.java
@Override
protected void run() {
}
private void processSelectedKeys() {
if (selectedKeys != null) { // 就绪事件到来
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
// null out entry in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.keys[i] = null;
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a); // 服务端的NioSocketChannel,程序跑到这
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
// null out entries in the array to allow to have it GC'ed once the Channel close
// See https://github.com/netty/netty/issues/2363
selectedKeys.reset(i + 1);
selectAgain();
i = -1;
}
}
}
public static final int OP_ACCEPT = 1 << 4;
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); // 可访问底层 SelectableChannel
......
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// readyOps==OP_ACCEPT
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\nio\AbstractNioMessageChannel.java
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i)); // 触发父线程组通道流水线Dispatch分发fireChannelRead
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
}
1.2.1.2父通道流水线最后一个内置nettyHandler.ServerBootstrapAcceptor
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
[图片上传失败...(image-dcf362-1654176742543)]
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\nio\AbstractNioMessageChannel.java
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\socket\nio\NioServerSocketChannel.java
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
// Accepts a connection made to this channel's socket
// class sun.nio.ch.SocketChannelImpl
SocketChannel ch = SocketUtils.accept(javaChannel());
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\nio\AbstractNioMessageChannel.java
private final class NioMessageUnsafe extends AbstractNioUnsafe {
private final List<Object> readBuf = new ArrayList<Object>();
@Override
public void read() {
// 回调 channelRead
pipeline.fireChannelRead(readBuf.get(i));
1.2.2是如何转到子事件循环组的呢
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\DefaultChannelPipeline.java
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\AbstractChannelHandlerContext.java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\DefaultChannelPipeline.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.fireChannelRead(msg);
}
// netty-all-4.1.67.Final-sources.jar!\io\netty\channel\AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(MASK_CHANNEL_READ), msg);
return this;
}
// 第二次:
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
1.2.2.1ServerBootstrapAcceptor.channelRead
里面内置 子事件循环组,private final EventLoopGroup childGroup;
// netty-all-4.1.67.Final-sources.jar!\io\netty\bootstrap\ServerBootstrap.java
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 刚刚创建的数据传输子通道:
// class io.netty.channel.socket.nio.NioSocketChannel
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
setAttributes(child, childAttrs);
try {
//
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
// 连接失败,则强制关闭通道
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: {}", child, t);
}
[图片上传失败...(image-3d1913-1654176742544)]
1.2.2.2childGroup.register
// E:\workdirectory\mavenRep\LocalMavenRep\io\netty\netty-all\4.1.67.Final\netty-all-4.1.67.Final-sources.jar!\io\netty\channel\MultithreadEventLoopGroup.java
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
// E:\workdirectory\mavenRep\LocalMavenRep\io\netty\netty-all\4.1.67.Final\netty-all-4.1.67.Final-sources.jar!\io\netty\channel\SingleThreadEventLoop.java
@Override
public ChannelFuture register(Channel channel) {
// 封装成 DefaultChannelPromise
return register(new DefaultChannelPromise(channel, this));
}
// E:\workdirectory\mavenRep\LocalMavenRep\io\netty\netty-all\4.1.67.Final\netty-all-4.1.67.Final-sources.jar!\io\netty\channel\AbstractChannel.java
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() { // 这里,是由ServerBootstrapAcceptor线程推送,所以是不同线程
@Override
public void run() {
register0(promise); // 注册通道的读写事件 OP_READ、OR_WRITE
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
// 这里的pipeLine就是子线程组中的NioSocektChannel通道流水线
pipeline.invokeHandlerAddedIfNeeded();
注意:ChannelInitializer本身也是一个入站处理器。
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
// E:\workdirectory\mavenRep\LocalMavenRep\io\netty\netty-all\4.1.67.Final\netty-all-4.1.67.Final-sources.jar!\io\netty\channel\ChannelInitializer.java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.add(ctx)) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) { // finally,移除 ChannelInitializer 自己 add by kikop
pipeline.remove(this);
}
}
return true;
}
return false;
}
public void initChannel(SocketChannel ch) {
// 此时的通道流水线只有 headHandler、tailHandler
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
// 我们再次增加自己的业务handler
p.addLast(new DiscardServerHandler());
}
1.2.2.3子通道移除前后流水线对比
移除前:
[图片上传失败...(image-3ec2e5-1654176742544)]
移除后:
[图片上传失败...(image-ef4441-1654176742544)]
2代码示例
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Discards any incoming data.
*/
public final class DiscardServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8009"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
p.addLast(new DiscardServerHandler());
}
});
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}