工作时候,有用过Netty写过网络库。最近想研究下RPC框架,就想着写几篇博客,梳理下Netty的源码。(研究的源码版本是4.1.x)
最简单的示例
首先还是拿Netty 官网的UserGuide做例子
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
Netty的源码还是很复杂的,所以研究时候有种无处下手的感觉。但是Netty作为一个NIO的网络框架,其底层肯定是使用了java jdk 的nio编程。所以我们可以通过对比java nio编程 与 Netty源码,来了解Netty究竟对java 的nio做了什么样的封装。
一个java nio编程的例子
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
serverSocketChannel.configureBlocking(false);
while(true){
SocketChannel socketChannel =
serverSocketChannel.accept();
if(socketChannel != null){
//do something with socketChannel...
}
}
例子是网上随便粘的,当然简化了很多很多东西,这篇博客只会分析服务端channel的创建,所以这些暂时也够了。
总结下:
1)创建ServerSocketChannel
2)绑定某个端口
3)设置非阻塞
4)监听新的连接
后面分析源码也会试着找下Netty对应的源码在哪
创建一个NioServerSocketChannel
具体的调用流程如下图
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//省略....
}
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
}
//省略...
}
ReflectiveChannelFactory
public T newChannel() {
try {
return clazz.newInstance(); //这里的clazz就是当时从ServerBootstrap.channel方法里传进去的NioServerSocketChannel
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class " + clazz, t);
}
}
代码流程还是很清晰的,总结下就是 Netty会在ServerBootstrap里,通过工厂创建一个ServerChannel
在本例中,工厂是默认的 ReflectiveChannelFactory(通过反射创建Channel),创造出的Channel类型是NioServerSocketChannel
NioServerSocketChannel 的初始化
分析完了NioServerSocketChannel接下来看看NioServerSocketChannel究竟在初始化时候做了啥。
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
/**
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
*
* See <a href="https://github.com/netty/netty/issues/2308">#2308</a>.
*/
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);
}
}
1、创建一个对应的jdk channel
通过 SelectorProvider.openServerSocketChannel() 创建了一个ServerSocketChannel 这个ServerSocketChannel就是 上文,nio编程的例子里的jdk channel
2、AbstractNioChannel构造方法
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
1)NioServerSocketChannel最终会调用父类AbstractNioChannel的构造函数
2)绑定一个 NioServerSocketChannelConfig
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
1)保存所创建的jdk channel
2)配置configureBlocking 为false
调用父类 AbstractChannel的构造方法
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId(); //绑定一个id
unsafe = newUnsafe(); //绑定一个NioSocketChannelUnsafe
pipeline = newChannelPipeline(); //绑定一个DefaultChannelPipeline
}
这一步非常重要:
1)绑定一个 id
2)绑定一个 NioSocketChannelUnsafe
3)绑定一个DefaultChannelPipeline
这几个组件,在Netty中起着非常重要的作用,后面博客里讲到了会详细分析下
总结下,ServerSocketChannel做了这样几件事
1)创建一个jdk channel,并记录下来
2)配置jdk channel 的 configureBlocking 为false
3)绑定一个NioServerSocketChannelConfig
4)绑定id、NioSocketChannelUnsafe、DefaultChannelPipeline 三个组件
初始化ServerChannel
再回到ServerBootStrap里,在创建了ServerChannel(即我们这的NioServerSocketChannel)后,下面一步,是对其进行初始化
@Override
void init(Channel channel) throws Exception {
//获取我们在client代码里配置的channel
final Map<ChannelOption<?>, Object> options = options0();
//最终调用 channel.config().setOption((ChannelOption<Object>) option, value) 即把配置设置到ChannelConfig里
synchronized (options) {
setChannelOptions(channel, options, logger);
}
//获取client里配置的attrs
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
//设置childOptions和childAttrs,最终传到 ServerBootstrapAcceptor里
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler(); //client里设置的handler被传进来了
if (handler != null) {
pipeline.addLast(handler);
}
// We add this handler via the EventLoop as the user may have used a ChannelInitializer as handler.
// In this case the initChannel(...) method will only be called after this method returns. Because
// of this we need to ensure we add our handler in a delayed fashion so all the users handler are
// placed in front of the ServerBootstrapAcceptor.
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
//currentChildGroup、currentChildHandler、currentChildOptions、currentChildAttrs
// 对应childGroup 我们配置的childHandler 配置的childOptions 配置的childAttrs
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
流程还挺长的,总结下:
1)把client端设置的option缓存到NioServerSocketChannelConfig里
2)把client端设置的attr设置到channel里
3)保存配置的childOptions,配置的childAttrs 到ServerBootstrapAcceptor里
4)往NioSocketChannel的pipeline中添加一个ServerBootstrapAcceptor
注册 ServerChannel
ServerChannel的注册流程比较长,下面一篇博客再单独介绍吧。
总结
这篇博客分析了下Netty的服务端启动流程,总结下:
1)Netty会在ServerBootstrap中创建一个ServerChannel
2)ServerChannel底层会绑定一个jdk 的channel
3)ServerChannel创建时,会绑定一个id、NioSocketChannelUnsafe、DefaultChannelPipeline、NioServerSocketChannelConfig
4)初始化ServerChannel时,会把我们自定义配置的option、attr设置进去
5)最终会往 NioSocketChannel 的pipeline里添加一个 ServerBootstrapAcceptor
6)注册ServerChannel(下面一篇博客介绍)