JDK 原生 NIO 程序的问题
JDK 原生也有一套网络应用程序 API,但是存在一系列问题,主要如下:
1、NIO 的类库和 API 繁杂,使用麻烦。你需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。
2、需要具备其他的额外技能做铺垫。例如熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的 NIO 程序。
3、可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等。
4、NIO 编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大。
5、JDK NIO 的 Bug。例如臭名昭著的 Epoll Bug,它会导致 Selector 空轮询,最终导致 CPU 100%。
Netty 的特点
Netty 对 JDK 自带的 NIO 的 API 进行封装,解决上述问题,主要特点有:
- 1、设计优雅,适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池;真正的无连接数据报套接字支持(自 3.1 起)。
- 2、使用方便,详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。
- 3、高性能,吞吐量更高,延迟更低;减少资源消耗;最小化不必要的内存复制。
- 4、安全,完整的 SSL/TLS 和 StartTLS 支持。
- 5、社区活跃,不断更新,社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入。
Netty 常见使用场景
Netty 常见的使用场景如下:
1、互联网行业。在分布式系统中,各个节点之间需要远程服务调用,高性能的 RPC 框架必不可少,Netty 作为异步高性能的通信框架,往往作为基础通信组件被这些 RPC 框架使用。
典型的应用有:阿里分布式服务框架 Dubbo 的 RPC 框架使用 Dubbo 协议进行节点间通信,Dubbo 协议默认使用 Netty 作为基础通信组件,用于实现各进程节点之间的内部通信。2、游戏行业。无论是手游服务端还是大型的网络游戏,Java 语言得到了越来越广泛的应用。Netty 作为高性能的基础通信组件,它本身提供了 TCP/UDP 和 HTTP 协议栈。
非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过 Netty 进行高性能的通信。3、大数据领域。经典的 Hadoop 的高性能通信和序列化组件 Avro 的 RPC 框架,默认采用 Netty 进行跨界点通信,它的 Netty Service 基于 Netty 框架二次封装实现。
Netty 线程模型
Netty 主要基于主从 Reactors 多线程模型(如下图)做了一定的修改,其中主从 Reactor 多线程模型有多个 Reactor:
- 1、MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor。
- 2、SubReactor 负责相应通道的 IO 读写请求。
- 3、非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
这里引用 Doug Lee 大神的 Reactor 介绍:Scalable IO in Java 里面关于主从 Reactor 多线程模型的图:
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
上面代码中的 bossGroup 和 workerGroup 是 Bootstrap 构造方法中传入的两个对象,这两个 group 均是线程池:
- bossGroup 线程池则只是在 Bind 某个端口后,获得其中一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 Boss 线程。
- workerGroup 线程池会被各个 SubReactor 和 Worker 线程充分利用。
异步处理
异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
- 1、通过 isDone 方法来判断当前操作是否完成。
- 2、通过 isSuccess 方法来判断已完成的当前操作是否成功。
- 3、通过 getCause 方法来获取已完成的当前操作失败的原因。
- 4、通过 isCancelled 方法来判断已完成的当前操作是否被取消。
- 5、通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
例如下面的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。
serverBootstrap.bind(port).addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
} else {
System.err.println("端口[" + port + "]绑定失败!");
}
});
相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
Netty 架构设计
前面介绍完 Netty 相关一些理论,下面从功能特性、模块组件、运作过程来介绍 Netty 的架构设计。
功能特性
Netty 功能特性如下:
- 1、传输服务,支持 BIO 和 NIO。
- 2、容器集成,支持 OSGI、JBossMC、Spring、Guice 容器。
- 3、协议支持,HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议。
- 4、Core 核心,可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象。
模块组件
Bootstrap、ServerBootstrap
Bootstrap 意思是引导,一个 Netty 应用通常由一个 Bootstrap 开始,主要作用是配置整个 Netty 程序,串联各个组件,Netty 中 Bootstrap 类是客户端程序的启动引导类,ServerBootstrap 是服务端启动引导类。
Future、ChannelFuture
正如前面介绍,在 Netty 中所有的 IO 操作都是异步的,不能立刻得知消息是否被正确处理。
但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过 Future 和 ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。
Channel
Netty 网络通信的组件,能够用于执行网络 I/O 操作。Channel 为用户提供:
- 当前网络连接的通道的状态(例如是否打开?是否已连接?)
- 网络连接的配置参数 (例如接收缓冲区大小)
- 提供异步的网络 I/O 操作(如建立连接,读写,绑定端口),异步调用意味着任何 I/O 调用都将立即返回,并且不保证在调用结束时所请求的 I/O 操作已完成。
调用立即返回一个 ChannelFuture 实例,通过注册监听器到 ChannelFuture 上,可以 I/O 操作成功、失败或取消时回调通知调用方。
- 支持关联 I/O 操作与对应的处理程序。
不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应。下面是一些常用的 Channel 类型:
- NioSocketChannel,异步的客户端 TCP Socket 连接。
- NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
- NioDatagramChannel,异步的 UDP 连接。
- NioSctpChannel,异步的客户端 Sctp 连接。
- NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
Selector
Netty 基于 Selector 对象实现 I/O 多路复用,通过 Selector 一个线程可以监听多个连接的 Channel 事件。
当向一个 Selector 中注册 Channel 后,Selector 内部的机制就可以自动不断地查询(Select) 这些注册的 Channel 是否有已就绪的 I/O 事件(例如可读,可写,网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。
NioEventLoop
NioEventLoop 中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用 NioEventLoop 的 run 方法,执行 I/O 任务和非 I/O 任务:
I/O 任务,即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法触发。
非 IO 任务,添加到 taskQueue 中的任务,如 register0、bind0 等任务,由 runAllTasks 方法触发。
两种任务的执行时间比由变量 ioRatio 控制,默认为 50,则表示允许非 IO 任务执行的时间与 IO 任务的执行时间相等。
NioEventLoopGroup
NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个 Channel 上的事件,而一个 Channel 只对应于一个线程。
ChannelHandler
ChannelHandler 是一个接口,处理 I/O 事件或拦截 I/O 操作,并将其转发到其 ChannelPipeline(业务处理链)中的下一个处理程序。
ChannelHandler 本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:
- ChannelInboundHandler 用于处理入站 I/O 事件。
- ChannelOutboundHandler 用于处理出站 I/O 操作。
或者使用以下适配器类:
- ChannelInboundHandlerAdapter 用于处理入站 I/O 事件。
- ChannelOutboundHandlerAdapter 用于处理出站 I/O 操作。
- ChannelDuplexHandler 用于处理入站和出站事件。
ChannelHandlerContext
保存 Channel 相关的所有上下文信息,同时关联一个 ChannelHandler 对象。
ChannelPipline
保存 ChannelHandler 的 List,用于处理或拦截 Channel 的入站事件和出站操作。
ChannelPipeline 实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及 Channel 中各个的 ChannelHandler 如何相互交互。
下图引用 Netty 的 Javadoc 4.1 中 ChannelPipeline 的说明,描述了 ChannelPipeline 中 ChannelHandler 通常如何处理 I/O 事件。
I/O 事件由 ChannelInboundHandler 或 ChannelOutboundHandler 处理,并通过调用 ChannelHandlerContext 中定义的事件传播方法。
例如 ChannelHandlerContext.fireChannelRead(Object)和 ChannelOutboundInvoker.write(Object)转发到其最近的处理程序。
入站事件由自下而上方向的入站处理程序处理,如图左侧所示。入站 Handler 处理程序通常处理由图底部的 I/O 线程生成的入站数据。
通常通过实际输入操作(例如 SocketChannel.read(ByteBuffer))从远程读取入站数据。
出站事件由上下方向处理,如图右侧所示。出站 Handler 处理程序通常会生成或转换出站传输,例如 write 请求。
I/O 线程通常执行实际的输出操作,例如 SocketChannel.write(ByteBuffer)。
在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应,它们的组成关系如下:
一个 Channel 包含了一个 ChannelPipeline,而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表,并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。
入站事件和出站事件在一个双向链表中,入站事件会从链表 head 往后传递到最后一个入站的 handler,出站事件会从链表 tail 往前传递到最前一个出站的 handler,两种类型的 handler 互不干扰。
Netty Reactor 工作架构图
1、Server 端包含 1 个 Boss NioEventLoopGroup 和 1 个 Worker NioEventLoopGroup。
2、NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。
-
3、每个 Boss NioEventLoop 循环执行的任务包含 3 步:
- 1)轮询 Accept 事件。
- 2)处理 Accept I/O 事件,与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上。
- 3)处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务。
-
4、每个 Worker NioEventLoop 循环执行的任务包含 3 步:
- 1)轮询 Read、Write 事件。
- 2)处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理。
- 3)处理任务队列中的任务,runAllTasks。
5、每个Worker NIOEventLoop 处理业务时,会使用pipeline(管道), pipeline 中包含了 channel , 即通过pipeline 可以获取到对应通道, 管道中维护了很多的 处理器
其中任务队列中的 Task 有 3 种典型使用场景。
- 1、用户程序自定义的普通任务
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(读取客户端发送的消息)
* @param ctx 上下文对象,可以获取管道pipeline,通道channel,地址
* @param msg 客户端发送的数据,默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果这里有一个非常耗时长的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的TaskQueue中
//解决方案1:用户自定义的普通任务
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(10 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵2~", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
- 2、用户自定义定时任务
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 读取数据事件(读取客户端发送的消息)
* @param ctx 上下文对象,可以获取管道pipeline,通道channel,地址
* @param msg 客户端发送的数据,默认Object
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果这里有一个非常耗时长的业务 -> 异步执行 -> 提交该channel对应的NioEventLoop的TaskQueue中
//解决方案2:用户自定义的定时任务 -> 该任务提交到scheduleTaskQueue中
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(5 * 1000);
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~喵3~", CharsetUtil.UTF_8));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},5, TimeUnit.SECONDS);
}
}
- 3、非当前 Reactor 线程调用 Channel 的各种方法
例如在推送系统的业务线程里面,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中后被异步消费。
方案再说明
1、Netty 抽象出两组线程池,BossGroup 专门负责接收客户端连接,WorkerGroup 专门负责网络读写操作。
2、NioEventLoop 表示一个不断循环执行处理任务的线程,每个 NioEventLoop 都有一个 selector,用于监听绑定在其上的 socket 网络通道。
-
3、NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责。
- 1、NioEventLoopGroup 下包含多个 NioEventLoop
- 2、每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue。
- 3、每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel。
- 4、每个 NioChannel 只会绑定在唯一的 NioEventLoop 上。
- 5、每个 NioChannel 都绑定有一个自己的 ChannelPipeline。