Netty权威指南2-读书笔记
UNIX网络编程5种I/O模型
I/O 复用模型(最大的优势是多路复用)
Linux提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select操作上,这样select/poll可以帮我们侦测多个fd是否处于就绪状态。select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,因此它的使用受到了一些制约。Linux还提供了一个epoll系统调用,epoll使用基于事件驱动方式代替顺序扫描,因此性能更高。当有fd就绪时,立即回调函数rollbackI/O 多路复用技术
I/O 多路复用技术通过把多个I/O 的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求
-
目前支持I/O多路复用的系统调用有select、pselect、poll、epoll,在 Linux网络编程过程中,很长一段时间都使用select做轮询和网络事件通知,然而select的些固有缺陷导致了它的应用受到了很大的限制,最终Linux不得不在新的内核版本中寻找select的替代方案,最终选择了epoll
- 支持一个进程打开的socket描述符(FD ) 不受限制(仅受限于操作系统的最大文
件句柄数)- select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设
置,默认值是1024,选择修改这个宏需要重新编译内核且网络效率会下降 - cat /proc/sys/fs/file- max
- select最大的缺陷就是单个进程所打开的FD是有一定限制的,它由FD_SETSIZE设
- I/O 效率不会随着FD数目的增加而线性下降
- 由于网络延时或者链路空闲,任一时刻只有少部分的socket是 “活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。epoll不存在这个问题,它只会对“活跃”的socket进行操作
- 使用mmap加速内核与用户空间的消息传递
- 无论是select、poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必
要的内存复制就显得非常重要,epoll是通过内核和用户空间mmap同一块内存来实现的 - mmap-map files or devices into memory
- 无论是select、poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必
- epoll的API更加简单
- 支持一个进程打开的socket描述符(FD ) 不受限制(仅受限于操作系统的最大文
用来克服select/poll缺点的方法不只有epoll, epoll只是一种Linux的实现方案。在 freeBSD下有kqueue
-
从5种I/O模型来看,其实都涉及到两个阶段
- 等待数据准备就绪
- 数据从内核复制到用户空间
- 对于阻塞io,调用recvfrom,阻塞直到第二个阶段完成或者错误才返回
- 对于非阻塞io,调用recvfrom,如果缓冲区没有数据则直接返回错误,一般都对非阻塞I/O 模型进行轮询检査这个状态,看内核是不是有数据到来;数据准备后,第二个阶段也是阻塞的
- 对于I/O复用模型,第一个阶段进程阻塞在select调用,等待1个或多个套接字(多路)变为可读,而第二个阶段是阻塞的
- 这里进程是被select阻塞但不是被socket io阻塞
- java nio实现
- 是否阻塞configureBlocking(boolean block)
- selector事件到来时(只是判断是否可读/可写)->具体的读写还是由阻塞和非阻塞决定->如阻塞模式下,如果输入流不足r字节则进入阻塞状态,而非阻塞模式下则奉行能读到多少就读到多少的原则->立即返回->
- 同理写也是一样->selector只是通知可写->但是能写多少数据也是有阻塞和非阻塞决定->如阻塞模式->如果底层网络的输出缓冲区不能容纳r个字节则会进入阻塞状态->而非阻塞模式下->奉行能输出多少就输出多少的原则->立即返回
- 对于accept->阻塞模式->没有client连接时,线程会一直阻塞下去->而非阻塞时->没有客户端连接->方法立刻返回null->
- 对于信号驱动I/O模型,应用进程建立SIGIO信号处理程序后立即返回,非阻塞,数据准备就绪时,生成SIGIO信号并通过信号回调应用程序通过recvfrom来读取数据,第二个阶段也是阻塞的
- 而对于异步I/O模型来说,第二个阶段的时候内核已经通知我们数据复制完成了
-
Java NIO的核心类库多路复用器Selector就是基于epoll的多路复用技术实现
- Enhancements in JDK 6 Release
- A new java.nio.channels.SelectorProvider implementation that is based on the Linux epoll event notification facility is included. The epoll facility is available in the Linux 2.6, and newer, kernels. The new epoll-based SelectorProvider implementation is more scalable than the traditional poll-based SelectorProvider implementation when there are thousands of SelectableChannels registered with a Selector. The new SelectorProvider implementation will be used by default when the 2.6 kernel is detected. The poll-based SelectorProvider will be used when a pre-2.6 kernel is detected.
- 即JDK6版本中默认的SelectorProvider即为epoll(Linux 2.6 kernal)
- macosx-sun.nio.ch.KQueueSelectorProvider
- solaris-sun.nio.ch.DevPollSelectorProvider
- linux
- 2.6以上版本-sun.nio.ch.EPollSelectorProvider
- 以下版本-sun.nio.ch.PollSelectorProvider
- windows-sun.nio.ch.WindowsSelectorProvider
- Oracle jdk会自动选择合适的Selector,如果想设置特定的Selector
- -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
- Enhancements in JDK 6 Release
-
Netty Native transports
- Since 4.0.16, Netty provides the native socket transport for Linux using JNI. This transport has higher performance and produces less garbage
- Netty's epoll transport uses epoll edge-triggered while java's nio library uses level-triggered. Beside this the epoll transport expose configuration options that are not present with java's nio like TCP_CORK, SO_REUSEADDR and more.
- 即Netty的Linux原生传输层使用了epoll边缘触发
- 而jdk的nio类库使用的是epoll水平触发
- epoll ET(Edge Triggered) vs LT(Level Triggered)
- 简单来说就是当边缘触发时,只有 fd 变成可读或可写的那一瞬间才会返回事件。当水平触发时,只要 fd 可读或可写,一直都会返回事件
- 简单地说,如果你有数据过来了,不去取LT会一直骚扰你,提醒你去取,而ET就告诉你一次,爱取不取,除非有新数据到来,否则不再提醒
- Nginx大部分event采用epoll EPOLLET(边沿触发)的方法来触发事件,只有listen端口的读事件是EPOLLLT(水平触发).对于边沿触发,如果出现了可读事件,必须及时处理,否则可能会出现读事件不再触发,连接饿死的情况
-
Java7 NIO2
- implemented using IOCP on Windows
- WindowsAsynchronousSocketChannelImpl implements Iocp.OverlappedChannel
- 即nio2在windows的底层实现是iocp
- Linux using epoll
- UnixAsynchronousSocketChannelImpl implements Port.PollableChannel
- 即nio2在linux 2.6后的底层实现还是epoll
- 通过epoll模拟异步
- 个人认为也许linux内核本身的aio实现方案其实并不是很完善,或多或少有这样或者那样的问题,即使用了aio,也没有明显的性能优势
- Not faster than NIO (epoll) on unix systems (which is true)
- implemented using IOCP on Windows
-
Reactor/Proactor
- 两种IO设计模式
- Reactor-Dispatcher/Notifier
- Don't call us, we'll call you
- Proactor-异步io
- Reactor通过某种变形,可以将其改装为Proactor,在某些不支持异步I/O的系统上,也可以隐藏底层的实现,利于编写跨平台代码
-
参考
- javadocs-io-enhancements
- macosx-SelectorProvider
- solaris/linux-SelectorProvider
- windows-SelectorProvider
- I/O Multiplexing
- netty-native-transports
- Why native epoll support is introduced in Netty?
- Java网络编程精解笔记4
- 为什么是EPOLL ? LT还是ET?
- man-epoll
- tengine-book
- netty NIO.2 support #2515
- Java IO: BIO, NIO, AIO
- 两种高性能I/O设计模式(Reactor/Proactor)的比较
Java I/O类库的发展和改进
- BIO(blocking)
- 采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁
- 该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1: 1 的正比关系,由于线程是Java虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者值死,不能对外提供服务
- ServerSocket/Socket/输入输出流(阻塞)
- 采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁
- 伪异步I/O
- 为了改进一线程一连接模型,后来又演进出了一种通过线程池或者消息队列实现1个或者多个线程处理N个客户端的模型
- 采用线程池和任务队列可以实现一种叫做伪异步的I/O通信框架
- 当有新的客户端接入时,将客户端的Socket封装成一个Task,投递到后端的线程池中进行处理,JDK的线程池维护一个消息队列和N个活跃线程,对消息队列中的任务进行处理
- 当对方发送请求或者应答消息比较缓慢,或者网络传输较慢时,读取输入流一方的通信线程将被长时间阻塞,如果对方要60s 才能够将数据发送完成,读取一方的I/O线程也将会被同步阻塞60s, 在此期间,其他接入消息只能在消息队列中排队
- 当消息的接收方处理缓慢的时候,将不能及时地从TCP缓冲区读取数据,这将会导致发送方的TCP window size( 滑动窗口)不断减小,直到为0,双方处于Keep-Alive状态,消息发送方将不能再向TCP缓冲区写入消息
- NIO
- SocketChannel和ServerSocketChannel,支持阻塞和非阻塞两种模式
- Buffer/Channel/Selector(多路复用器,可同时轮询多个Channel)
- java.nio.ByteBuffer的几个常用方法
- flip、clear、compact、mark、rewind、hasRemaining、isDirect等
- java.nio.ByteBuffer的几个常用方法
- 客户端发起的连接操作是异步的
- SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O 通信线程就可以处理其他的链路,不需要同步等待这个链路可用
- JDK的 Selector在 Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制
- AIO
- AsynchronousServerSocketChannel、AsynchronousSocketChannel
- CompletionHandler<V,A>
- V The result type of the I/O operation
- A The type of the object attached to the I/O operation
- 既然已经接收客户端成功了,为什么还要再次调用accept方法呢?原因是这样的:调用AsynchronousServerSocketChannel的accept方法后,如果有新的客户端连接接入,系统将回调我们传入的CompletionHandler实例的completed方法,表示新的客户端已经接入成功。因为一个AsynchronousServerSocketChannel可以接收成千上万个客户端,所以需要继续调用它的accept方法,接收其他的客户端连接,最终形成一个循环。每当接收一个客户读连接成功之后,再异步接收新的客户端连接
- 不选择Java原生NIO编程的原因
- N10的类库和API繁杂,使用麻烦
- 需要具备其他的额外技能做铺垫,例如熟悉Java多线程
- 可靠性能力补齐,工作景和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等问题
- JDK NIO 的 BUG, 例如臭名昭著的epollbug, 它会导致Selector空轮询,最终导致CPU100%
- 为什么选择Netty
- 健壮性、功能、性能、可定制性和可扩展性在同类框架中都是首屈一指的,它已经得到成百上千的商用项目验证
- API使用简单
- 预置了多种编解码功能,支持多种主流协议
- 可以通过ChannelHand丨er对通信框架进行灵活地扩展
- 性能高
- Netty修复了己经发现的所有JDKNIO BUG
- 社区活跃,版本迭代周期短
- 经历了大规模的商业应用考验,质量得到验证
Netty 入门
- ServerBootstrap、EventLoopGroup(boss)、EventLoopGroup(worker)、NioServerSocketChannel、ChannelOption、ChannelInitializer、ChannelPipeline、ChannelFuture、ChannelHandlerAdapter、ChannelHandlerContext
- Bootstrap、NioSocketChannel
- try/finally、shutdownGracefully(boss、worker)
- ChannelHandlerContext的 flush方法,它的作用是将消息发送队列中的消息写入SocketChannel中发送给对方.从性能角度考虑,为了防止频繁地唤醒Selector进行消息发送,Netty的 write方法并不直接将消息写入SocketChannel中,调用write方法只是把待发送的消息放到发送缓冲数组中,再通过调用flush方法,将发送缓冲区中的消息全部写到SocketChannel中
- 基于Netty开发的都是非Web的Java应用,它的打包形态非常简单,就是一个普通的.jar 包,通常可以使用Eclipse、Ant、Ivy、Gradle等进行构建
TCP 粘包/拆包问题的解决之道
- TCP是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,它们是连成一片的,其间并没有分界线。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为 , 一个完 整的包可能会被 TCP拆分成多个包进行发送,也有可能把多个小的包封装成个大的数据包发送,这就是所谓的TCP粘包和拆包问题。
- 由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决
- 消息定长,例如每个报文的大小为固定长度200字节,如果不够,空位补空格
- 在包尾增加回车换行符进行分割,例如FTP协议
- 将消息分为消息头和消息体,消息头中包含表示消息总长度(或者消息体长度)的字段,通常设计思路为消息头的第一个字段使用int32来表示消息的总长度
- 没有考虑读半包问题,这在功能测试时往往没有问题,但是一旦压力上来,或者发送大报文之后,就会存在粘包/拆包问题,如循环发送100条消息,则可能会出现TCP粘包
- 为了解决TCP粘包/拆包导致的半包读写问题,Netty默认提供了多种编解码器用于处理半包
- LineBasedFrameDecoder
- A decoder that splits the received {@link ByteBuf}s on line endings
- StringDecoder
- A decoder that splits the received {@link ByteBuf}s on line endings
- LineBasedFrameDecoder
分隔符和定长解码器的应用
- DelimiterBasedFrameDecoder
- A decoder that splits the received {@link ByteBuf}s by one or more delimiters
- FixedLengthFrameDecoder
- A decoder that splits the received {@link ByteBuf}s by the fixed number of bytes
编解码技术
- 基于Java提供的对象输入/输出流ObjectlnputStream和 ObjectOutputStream,可以直接把Java对象作为可存储的字节数组写入文件 ,也可以传输到网络上,Java序列化的目的:
- 网络传输
- 对象持久化
- Java序列化的缺点
- 无法跨语言
- 序列化后的码流太大
- 对于字符串
- byte[] value = this.userName.getBytes();
- buffer.putInt(value.length);
- buffer.put(value);
- 对于字符串
- 序列化性能太低
- 业界主流的编解码框架
- Google的Protobuf
- Facebook的Thrift
- JBoss Marshalling
- JBoss Marshalling是一个Java对象的序列化API包,修正了 JDK自带的序列化包的很多问题,但又保持跟java.io.Serializable接口的兼容
MessagePack编解码
-
MessagePack介绍
- It's like JSON. but fast and small
- MessagePack is an efficient binary serialization format. It lets you exchange data among multiple languages like JSON. But it's faster and smaller
- http://msgpack.org/
- 提供了对多语言的支持
-
API介绍
// Create serialize objects. List<String> src = new ArrayList<String>(); src. add (,,msgpackw); src.add("kumofs"); src.add("viver">; MessagePack msgpack = new MessagePack(); // Serialize byte[] raw = msgpack.write(src); // Deserialize directly using a template L±8t<String> dstl = msgpack. read (raw, Ten 5 >lates . tList (Ten^>lates. TString));
-
MessagePack编码器和解码器开发
- MessageToByteEncoder<I
- ByteToMessageDecoder、MessageToMessageDecoder<I
- LengthFieldBasedFrameDecoder extends ByteToMessageDecoder
- A decoder that splits the received {@link ByteBuf}s dynamically by the value of the length field in the message
- public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength,int lengthAdjustment, int initialBytesToStrip, boolean failFast)
- 功能很强大,可指定消息长度字段的偏移等,而不仅仅是消息头的第一个字段就是长度
- LengthFieldPrepender extends MessageToMessageEncoder<ByteBuf>
- An encoder that prepends the length of the message.
- 可自动前面加上消息长度字段
Google Protobuf 编解码
- 主要使用了netty默认提供的关于protobuf的编解码器
- ProtobufVarint32FrameDecoder extends ByteToMessageDecoder
- A decoder that splits the received {@link ByteBuf}s dynamically by the value of the Google Protocol Buffers
- ProtobufVarint32LengthFieldPrepender extends MessageToByteEncoder<ByteBuf>
- An encoder that prepends the the Google Protocol Buffers
- ProtobufEncoder extends MessageToMessageEncoder<MessageLiteOrBuilder>
- Encodes the requested Google Protocol Buffers Message And MessageLite into a {@link ByteBuf}
- ProtobufDecoder extends MessageToMessageDecoder<ByteBuf>
- Decodes a received {@link ByteBuf} into a Google Protocol Buffers Message And MessageLite
- 注意其构造函数要传一个MessageLite对象,即协议类型,用来反序列化
- ProtobufVarint32FrameDecoder extends ByteToMessageDecoder
BEFORE DECODE (302 bytes) AFTER DECODE (300 bytes)
+--------+---------------+ +---------------+
| Length | Protobuf Data |----->| Protobuf Data |
| 0xAC02 | (300 bytes) | | (300 bytes) |
+--------+---------------+ +---------------+
BEFORE ENCODE (300 bytes) AFTER ENCODE (302 bytes)
+---------------+ +--------+---------------+
| Protobuf Data |-------------->| Length | Protobuf Data |
| (300 bytes) | | 0xAC02 | (300 bytes) |
+---------------+ +--------+---------------+
- Protobuf的使用注意事项
- ProtobufDecoder仅仅负责解码,它不支持读半包。因此,在 ProtobufDecoder前面,
一定要有能够处理读半包的解码器- 使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包消息
- 继承Netty提供的通用半包解码器LengthFieldBasedFrameDecoder
- 继承ByteToMessageDecoder类,自己处理半包消息
- ProtobufDecoder仅仅负责解码,它不支持读半包。因此,在 ProtobufDecoder前面,
JBoss Marshalling 编解码
- JBoss的Marshalling完全兼容JDK序列化
- MarshallingDecoder extends LengthFieldBasedFrameDecoder
- Decoder which MUST be used with {@link MarshallingEncoder}
- 需要传入UnmarshallerProvider和maxObjectSize
- MarshallingEncoder extends MessageToByteEncoder<Object>
- {@link MessageToByteEncoder} implementation which uses JBoss Marshalling to marshal an Object
- 需要传入MarshallerProvider
- Netty的Marshalling编解码器支持半包和粘包的处理,对于开发者而言,只需要正确地将 Marshalling编码器和解码器加入到ChannelPipeline 中,就能实现对Marshalling序列化的支持
HTTP协议开发应用
- HTTP请求消息(HttpRequest)
- HTTP请求行
- HTTP消息头
- HTTP请求正文
- HTTP响应消息(HttpResponse)
- 状态行、消息报头、响应正文
- Netty HTTP文件服务器
- HttpRequestDecoder
- HttpObjectAggregator
- HttpResponseEncoder
- ChunkedWriteHandler、ChunkedFile
- FullHttpRequest、FullHttpResponse、DefaultFullHttpResponse
- Netty HTTP+ XML协议栈开发
- 很多基于HTTP的应用都是后台应用,HTTP仅仅是承载数据交换的一个通道,是一个载体而不是Web容器
- JiBX是一款非常优秀的XML (Extensible Markup Language) 数据绑定框架
- JiBX is a tool for binding XML data to Java objects
- Unmarshal是将XML文件转换成Java对象,而 Marshal则是将Java对象编排成规范的XML文件
- xpp-XML Pull Parsing
- 过程
- 构造请求消息HttpXmlRequest(封装一个FullHttpRequest和一个Object)
- 定义请求消息编码器HttpXmlRequestEncoder,对HttpXmlRequest进行编码
- 对请求消息中的业务object通过jibx序列化为xml字符串,随后将它封装成Netty的 ByteBuf
- 构造HTTP消息头-HttpHeaders/DefaultFullHttpRequest
- 请求消息消息体不为空,也没有使用Chunk方式,所以在HTTP消息头中设置消息体的长度Content-Length
- 后续Netty的 HTTP请求编码器继续对HTTP请求消息进行编码
- 定义请求消息解码器HttpXmlRequestDecoder
- 从HTTP消息体中获取请求码流,通过JiBx框架对它进行反序列化(FullHttpRequest#content),得到请求object对象,并封装为HttpXmlRequest
- 回调业务handler, 业务得到的就是解码后的POJO对象和HTTP消息头
- 注意-decoder有一个参数是业务obj的clazz对象
- 同理封装一个应答消息HttpXmlResponse(封装一个FullHttpResponse和一个Obect)
- 定义应答消息编码器HttpXmlResponseEncoder
- 同上,对应答消息中的object通过jibx序列化xml字符串并转为ByteBuf
- 构造HTTP应答消息FullHttpResponse,这里注意因为 Netty的 DefaultFullHttpResponse没有提供动态设置消息体content的接口,只能在第一次构造的时候设置内容,同上也需要设置Content-Length,Content-Type为text/xml
- 定义应答消息解码器HttpXmlResponseDecoder
- DefaultFullHttpResponse和HTTP应答消息反序列化后的object对象构造HttpXmlResponse(DefaultFullHttpResponse#content)
- 流程
- 客户端
- HttpResponseDecoder、HttpObjectAggregator、HttpXmlResponseDecoder(传入object clazz)、HttpRequestEncoder、HttpXmlRequestEncoder
- HttpXmlClientHandle
- channelActive时构造HttpXmlRequest
- messageReceived中直接得到HttpXmlResponse
- 服务端
- HttpRequestDecoder、HttpObjectAggregator、HttpXmlRequestDecoder(传入object clazz)、HttpResponseEncoder、HttpXmlResponseEncoder
- HttpXmlServerHandler
- messageReceived中直接得到HttpXmlRequest进行业务处理,然后发送HttpXmlResponse,如果非keepAlive模式,则发送完毕候关闭链接(通过在ChannelFuture加一个Listener监听)
- 客户端
WebSocket协议开发
- WebSocket是 HTML5 开始提供的一种浏览器与服务器间进行全双工通信的网络技术
- 在 WebSocketAPI中,浏览器和服务器只需要做个握手的动作,然后,浏览器和服务器之间就形成了一条快速通道,两者就可以直接互相传送数据了。WebSocket基于TCP双向全双工进行消息传递,在同一时刻,既可以发送消息,也可以接收消息,相比HTTP的半双工协议,性能得到很大提升
- WebSocket设计出来的目的就是要取代轮询和Comet技术,使客户端浏览器具备像
C/S 架构下桌面系统一样的实时通信能力 - 为了建立一个WebSocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求和通常的HTTP请求不同.包含了一些附加头信息,其中附加头信息“Upgrade:WebSocket”表明这是个申请协议升级的HTTP请求。服务器端解析这些附加的头信息,然后生成应答信息返回给客户端,客户端和服务器端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由地传速信息,并且这个连接会持续存在直到客户端或者服务器端的某一方主动关闭连接
- 握手成功之后,服务端和客户端就可以通过“messages”的方式进行通信了,一个消息由一个或者多个帧组成
- Netty WebSocket 协议开发
- Netty内置了WebSocket协议相关的api
- WebSocketServer
- HttpServerCodec、HttpObjectAggregator、ChunkedWriteHandler
- WebSocketServerHandler
- messageReceived,第一次握手请求消息由HTTP协议承载,所以它是一个HTTP消息,执行handleHttpRequest方法来处理WebSocket握手请求,对握手请求消息进行判断,如果消息头中没有包含Upgrade字段或者它的值不是websocket, 则返回HTTP 400响应
- 握手请求简单校验通过之后,开始构造握手工厂,创建握手处理类WebSocketServerHandshaker, 通过它构造握手响应消息返回给客户端,同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码(WebSocketServerHandshaker#handshake)
- 添加WebSocket Encoder和 WebSocket Decoder之后,服务端就可以自动对WebSocket
- 消息进行编解码了,后面的业务handler可以直接对WebSocket对象进行操作(WebSocketFrame)
- 直接对控制帧进行判断并返回应答消息
- 而客户端则是嵌套在html中,由js进行websocket的相关接口开发
私有协议栈开发
- 在传统的Java应用中,通常使用以下4 种方式进行跨节点通信
- 通过RM1进行远程服务调用
- 通过Java的 Socket+Java序列化的方式进行跨节点调用
- 利用一些开源的RPC框架进行远程服务调用,例如Facebook的Thrift、 Apache
的Avro等 - 利用标准的公有协议进行跨节点服务调用,例如HTTP+XML、RESTful+JSON或
者 WebService
- 跨节点的远程服务调用,除了链路层的物理连接外,还需要对请求和响应消息进行编解码。在请求和应答消息本身以外,也需要携带一些其他控制和管理类指令,例如链路建立的握手请求和响应消息、链路检测的心跳消息等。当这些功能组合到一起之后,就会形成私有协议
- Netty协议栈功能设计
- Netty协议栈用于内部各模块之间的通信,它基于TCP/IP协议栈,是一个类HTTP协议的应用层协议栈
- 在分布式组网环境下,每个Netty节 点 (Netty进程)之间建立长连接,使用Netty协议进行通信。Netty节点并没有服务端和客户端的区分,谁首先发起连接,谁就作为客户端,另一方自然就成为服务端。一个Netty节点既可以作为客户端连接另外的Netty节点,也可以作为Netty服务端被其他Netty节点连接
- 协议栈功能描述
- 承载了业务内部各模块之间的消息交互和服务调用
- 基于Netty的NIO通信框架,提供髙性能的异步通信能力
- 提供消息的编解码框架,可以实现POJO的序列化和反序列化
- 提供基于IP地址的白名申.接入认证机制
- 链路的有效性校验机制
- 链路的断连重连机制
- 通信模型
- Netty协议栈客户端发送握手请求消息,携带节点ID等有效身份认证信息
- Netty协议栈服务端对握手请求消息进行合法性校验,包括节点ID有效性校验、节点重复登录校验和IP地址合法性校验,校验通过后,返回登录成功的握手应答消息
- 链路建立成功之后,客户端发送业务消息
- 链路成功之后,服务端发送心跳消息
- 链路建立成功之后,客户端发送心跳消息
- 链路建立成功之后,服务端发送业务消息
- 服务端退出时,服务端关闭连接,客户端感知对方关闭连接后,被动关闭客户端连接
- Netty协议通信双方链路建立成功之后,双方可以进行全双工通信,无论客户端还是服务端,都可以主动发送请求消息给对方,通信方式可以是TWOWAY或者ONE WAY, 双方之间的心跳采用Ping-Pong机制,当链路处于空闲状态时,客户端主动发送Ping消息给服务端,服务端接收到Ping消息后发送应答消息Pong给客户端,如果客户端连续发送N 条 Ping消息都没有接收到服务端返回的Pong消息,说明链路已经挂死或者对方处于异常状态,客户端主动关闭连接,间隔周期T 后发起重连操作,直到重连成功
- 消息定义
- 消息头
- crcCode int 32位 消息校验码 = OxABEF(2字节) + 主版本号(1字节) + 次版本号(1字节)
- length int 32位 消息长度= 消息头的长度+消息体长度
- sessionID long 64位 集群节点全局唯一id
- type byte 8位 消息类型(包括握手请求、应答、心跳请求、应答等)
- priority byte 8位 消息优先级
- attachment Map<String,Object> 变长 可选字段,用于扩展消息头
- 消息体
- Object 变长
- 消息头
- 链路的建立
- 考虑到安全,链路建立需要通过基于IP 地址或者号段的黑白名单安全认证机制,在实际商用项目中,安全认证机制会更加严格,如通过密钥对用户名和密码进行安全认证
- 客户端与服务端链路建立成功之后,由客户端发送握手请求消息
- 服务端接收到客户端的握手请求消息之后,如 果 IP 校验通过,返回握手成功应答消息给客户端,应用层链路建立成功
- 链路的关闭
- 当对方宕机或者重启时,会主动关闭链路,另一方读取到操作系统的通知信号,得知对方REST链路,需要关闭连接,释放自身的句柄等资源
- proc/sys/net/ipv4/tcp_retries2
- 消息读写过程中,发生了 I/O 异常,需要主动关闭连接
- 心跳消息读写过程中发生了 I/O 异常,需要主动关闭连接
- 心跳超时,需要主动关闭连接
- 发生编码异常等不可恢复错误时,需要主动关闭连接
- 当对方宕机或者重启时,会主动关闭链路,另一方读取到操作系统的通知信号,得知对方REST链路,需要关闭连接,释放自身的句柄等资源
- 可靠性设计
- Netty协议栈可能会运行在非常恶劣的网络环境中,网络超时、闪断、对方进程僵死或者处理缓慢等情况都有可能发生
- 心跳机制
- 在网络空闲时采用心跳机制来检测链路的互通性, 一旦发现网络故障,立即关闭链路,主动重连
- 当网络处于空闲状态持续时间达到T(连续周期T没有读写消息)时,客户端主动发送Ping心跳消息给服务端
- 如果在下一个周期T到来时客户端没有收到对方发送的Pong心眺应答消息或者读取到服务端发送的其他业务消息,则心跳失败计数器加1
- 每当客户端接收到服务的业务消息或者Pong应答消息时,将心跳失败计数器清零:连续N次没有接收到服务端的Pong消息或者业务消息,则关闭链路,间隔INTERVAL时间后发起重连操作
- 服务端网络空闲状态持续时间达到T后,服务端将心跳失败计数器加1;只要接收到客户端发送的Ping消息或者其他业务消息,计数器清零
- 服务端连续N次没有接收到客户端的Ping消息或者其他业务消息,则关闭链路,释放资源,等待客户端重连
- 通过Ping-Pong双向心跳机制,可以保证无论通信哪一方出现网络故障,都能被及时地检测出来。为了防止由于对方短时间内繁忙没有及时返回应答造成的误判,只有连续N次心跳检测都失败才认定链路己经损害,需要关闭链路并重建链路
- 当读或者写心跳消息发生I/O异常的时候,说明链路己经中断,此时需要立即关闭链路,如果是客户端,需要重新发起连接。如果是服务端,需要清空缓存的半包信息,等待客户端重连
- 之前的项目中则直接是客户端每30s向服务器发送一个ping消息同时服务器返回一个ping消息;如果30s左右(网络延迟),链路空闲则断开链接;偌一方出现宕机则30s后可直接检测数来--但相对比而言,作者的方法更为严谨一些
- 重连机制
- 如果链路中断,等待INTERVAL时间后,由客户端发起重连操作,如果重连失败,间隔周期INTERVAL后再次发起重连,直到重连成功
- 为了保证服务端能够有充足的时间释放句柄资源,在首次断连时客户端需要等待INTERVAL时间之后再发起重连,而不是失败后就立即重连
- 为了保证句柄资源能够及时释放,无论什么场景下的重连失败,客户端都必须保证自身的资源被及时释放
- 重连失败后,需要打印异常堆栈信息,方便后续的问题定位
- 重复登录保护
- 当客户端握手成功之后,在链路处于正常状态下,不允许客户端重复登录,以防止客户端在异常状态下反复重连导致句柄资源被耗尽
- 缓存客户端的地址列表,通过该列表检查客户端是否已登陆
- 当服务端连续N次心跳超时之后需要主动关闭链路,清空该客户端的地址缓存信息,以保证后续该客户端可以重连成功,防止被重复登录保护机制拒绝掉
- 猜测是可能会出现类似客户端认为旧链接已经logout了,尝试重新登陆;但是服务器认为旧的链路还在(如客户端宕机)等;所以还是需要心跳机制辅助
- 消息缓存重发
- 无论客户端还是服务端,当发生链路中断之后,在链路恢复之前,缓存在消息队列中待发送的消息不能丢失,等链路恢复之后,重新发送这些消息,保证链路中断期间消息不丢失
- 考虑到内存溢出的风险,建议消息缓存队列设置上限,当达到上限之后,应该拒绝继续向该队列添加新的消息
- 安全性设计
- 为了保证整个集群环境的安全,内部长连接采用基于IP 地址的安全认证机制,服务端对握手请求消息的IP 地址进行合法性校验-白名单
- 如果将Netty协议栈放到公网中使用,需要采用更加严格的安全认证机制,例如基于密钥和AES加密的用户名+密码认证机制,也可以采用SSL/TSL安全传输
- 可扩展性设计
- 通过Netty消息头中的可选附件attachment字段,业务可以方便地进行自定义扩展
- Netty协议栈架构需要具备一定的扩展能力,例如统一的消息拦截、接口日志、安全、加解密等可以被方便地添加和删除,不需要修改之前的逻辑代码,类 似 Servlet的 FilterChain和 AOP, 但考虑到性能因素,不推荐通过AOP来实现功能的扩展
- 直接通过ChannelPipeline即可
- 开发过程
- 定义协议消息,NettyMessage,包括消息头Header和消息体Object
- 消息头则包括之前说的crcCode、length等字段
- 定义编解码类
- NettyMessageEncoder extends MessageToByteEncoder<NettyMessage>
- 编码header
- 对于header中的attachment,则首先写入map的长度,然后遍历map,写key的长度,写key的bytes(String),最后通过JBoss Marshalling对value进行编码
- 注意在作者的这个示例中,自己手动写的MarshallingEncoder,其实netty已经提供了MarshallingEncoder(参数不同),通过源代码可以看到,encode核心代码部分基本相同
- 编码消息体
- 最后将消息长度重写
- NettyMessageDecoder extends LengthFieldBasedFrameDecoder
- 这里用到了Netty的LengthFieldBasedFrameDecoder解码器,它支持自动的TCP粘包和半包处理,只需要给出标识消息长度的字段偏移量和消息长度自身所占的字节数,netty就能自动实现对半包的处理
- 解码过程和编码过程恰恰相反,这里不再详述
- 握手和安全认证
- LoginAuthReqHandler,在通道激活时发起握手请求
- 客户端握手请求发送之后,按照协议规范,服务端需要返回握手应答消息(channelRead中),首先判断消息是否是握手应答消息,如果不是,直接透传给后面的 ChannelHandler 进行处理;如果是握手应答消息,则对应答结果进行判断,如果非0 , 说明认证失败,关闭链路,重新发起连接
- LoginAuthRespHandler#channelRead,对重复登陆进行判断,然后白名单判断,如果均校验成功则构造握手应答消息
- 当发生异常关闭链路的时候需要将客户端的信息从登录注册表中删除,以保证后续客户端可以重连成功
- NettyMessageEncoder extends MessageToByteEncoder<NettyMessage>
- 心跳检测机制
- 握手成功之后,由客户端主动发送心跳消息,服务端接收到心跳消息之后,返回心跳应答消息
- 当握手成功之后,握手请求Handler会继续将握手成功消息向下透传至HeartBeatReqHandler,接收到之后对消息进行判断,如果是握手成功消息,则启动无限循环
定时器用于定期发送心跳消息。由于NioEventLoop是一个Schedule, 因此它支持定时器的执行,如每5s发送一条心跳消息 - 而服务器的HeartBeatRespHandler则比较简单,接收到心跳请求消息之后,构造心跳应答消息返回即可'
- 心跳超时的实现非常简单,直接利用Netty的 ReadTimeoutHandler机制,当一定周期内 (默认值50s) 没有读取到对方任何消息时,需要主动关闭链路。如果是客户端,重新发起连接;如果是服务端,释放资源,清除客户端登录缓存信息,等待服务端重连
- 可参考ReadTimeoutHandler#readTimedOut
- 断线重连
- 当客户端感知断连事件之后,释放资源,重新发起连接
- try块代码最后一行是future.channel().closeFuture().sync(),线程等待链接关闭
- 当关闭后执行finally块代码,尝试重连
- 客户端代码
- NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthReqHandler、HeartBeatReqHandler
- 这次我们绑定了本地端口,主要用于服务端重复登录保护,另外,从产品管理角度看,一般情况下不允许系统随便使用随机端口
- 利用Netty的 ChannelPipeline和 Channe丨Handler机制,可以非常方便地实现功能解耦
和业务产品的定制。例如本例程中的心跳定时器、握手请求和后端的业务处理可以通过不同的Handler来实现,类似于AOP。通过Handler Chain的机制可以方便地实现切面拦截和定制,相比于AOP它的性能更高
- 服务端代码
- NettyMessageDecoder(1024 * 1024, 4, 4)、NettyMessageEncoder、ReadTimeoutHandler(50)、LoginAuthRespHandler、HeartBeatRespHandler
- 客户端宕机重启之后,服务端需要能够清除缓存信息,允许客户端重新登录
- 定义协议消息,NettyMessage,包括消息头Header和消息体Object
protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
if (!closed) {
ctx.fireExceptionCaught(ReadTimeoutException.INSTANCE);
ctx.close();
closed = true;
}
}
- 总结
- 当链路断连的时候,已经放入发送队列中的消息不能丢失,更加通用的做法是提供通知机制,将发送失败的消息通知给业务测,由业务做决定:是丢弃还是缓存重发
- 关于编解码的一些测试
- sendBuf.setInt(4, sendBuf.readableBytes() - 8)
- 这个是编码最后将长度值写入了,这个为啥-8呢
- NettyMessageDecoder(1024 * 1024, 4, 4))
- 可以看到解码传入的LengthFieldBasedFrameDecoder参数分别是4,4,两个4分别代码lengthFieldOffset和lengthFieldLength,即长度字段的偏移和长度字段的表示长度
- 从下面的源码注释来看,这个长度其实是指长度字段后的所有字节的长度,从解码角度来说也是如此,从源码解读亦如此,请注意这个问题
- 所以建议好好看源代码,源代码才是王道
- sendBuf.setInt(4, sendBuf.readableBytes() - 8)
lengthFieldOffset = 2
lengthFieldLength = 3
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+--------+--------+----------------+ +----------+----------+--------------
|Header 1| Length |Actual Content |----->| Header 1 | Length | Actual Content |
|0xCAFE | 0x00000C| "HELLO, WORLD"| | 0xCAFE | 0x00000C | "HELLO, WORLD"
服务端创建
- Netty服务端创建关键步骤
- 创建ServerBootstrap实例。ServerBootstrap是 Netty服务端的启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数-门面模式对各种能力进行抽象和封装
- 引入Builder模式,因为参数太多
- 设置并绑定Reactor线程池。Netty的 Reactor线程池是EventLoopGroup,它实际就是 EventLoop的数组。EventLoop的职责是处理所有注册到本线程多路复用器Selector上的Channel,Selector的轮询操作由绑定的EventLoop线程run方法驱动,在一个循环体内循环执行.
- EventLoop的职责不仅仅是处理网络I/O 事件,用户自定义的Task和定时任务Task也 统 由 EventLoop负责处理,这样线程模型就实现了统一
- 从调度层面看,也不存在从EventLoop线程中再启动其他类型的线程用于异步执行另外的任务,这样就避免了多线程并发操作和锁竞争,提升了I/O 线程的处理和调度性能
- 设置并绑定服务端Channel。作为NIO服务端,需要创建ServerSocketChannel,Netty对原生的NIO类库进行了封装,对应实现是NioServerSocketChannel
- Netty通过工厂类,利用反射创建NioServerSocketChannel对象。由于服务端监听端口往往只需要在系统启动时才会调用,因此反射对性能的影响并不大
- ServerBootstrapChannelFactory
- 链路建立的时候创建并初始化ChannelPipeline。ChannelPipeline并不是NIO服务端必需的,它本质就是一个负责处理网络事件的职责链,负责管理和执行ChannelHandler。网络事件以事件流的形式在ChannelPipeline中流转。典型的网络事件如下:
- 链路注册、链路激活、链路断开、接收到请求消息、请求消息接收并处理完毕、发送应答消息、链路发生异常、发生用户自定义事件
- 初始化 ChannelPipeline 完成之后,添加并设置 ChannelHandler是 Netty提供给用户定制和扩展的关键接口。利用ChannelHandler用户可以完成大多数的功能定制,例如消息编解码、心跳、安全认证、TSL/SSL认证、流量控制和流景整形等。
- Netty同时也提供了大量的系统ChannelHandler供用户使用 ,比较实用的系统ChannelHandler如:
- ByteToMessageCodec、LengthFieldBasedFrameDecoder、 LoggingHandler、SslHandler、 IdleStateHandler、 ChannelTrafficShapingHandler、Base64Decoder 和 Base64Encoder
- Netty同时也提供了大量的系统ChannelHandler供用户使用 ,比较实用的系统ChannelHandler如:
- 绑定并启动监听端口。在绑定监听端口之前系统会做一系列的初始化和检测工作,完成之后,会启动监听端口,并将ServerSocketChannel注册到Selector上监听客户端连接
- Selector轮询。由Reactor线程NioEventLoop负责调度和执行Selector轮询操作,选择准备就绪的Channel集合
- 参考源代码NioEventLoopGroup、NioEventLoop的源代码实现
- 会传入SelectorProvider.provider用来打开Selector
- 每一个NioEventLoop持有一个Selector
- 参考源代码NioEventLoopGroup、NioEventLoop的源代码实现
- 当轮询到准备就绪的Channel之后,就 由 Reactor线 程 NioEventLoop执ChannelPipeline的相应方法,最终调度并执行ChannelHandler
- 源码请参考ChannelPipeline
- fireXXX->DefaultChannelPipeline->AbstractChannelHandlerContext.invokeXXX(head)
- 执行Netty系统ChannelHandler和用户添加定制的ChannelHandler
- 创建ServerBootstrap实例。ServerBootstrap是 Netty服务端的启动辅助类,它提供了一系列的方法用于设置服务端启动相关的参数-门面模式对各种能力进行抽象和封装
I/O Request
* via {@link Channel} or
* {@link ChannelHandlerContext}
* |
* +---------------------------------------------------+---------------+
* | ChannelPipeline | |
* | \|/ |
* | +---------------------+ +-----------+----------+ |
* | | Inbound Handler N | | Outbound Handler 1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler N-1 | | Outbound Handler 2 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ . |
* | . . |
* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
* | [ method call] [method call] |
* | . . |
* | . \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 2 | | Outbound Handler M-1 | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* | | \|/ |
* | +----------+----------+ +-----------+----------+ |
* | | Inbound Handler 1 | | Outbound Handler M | |
* | +----------+----------+ +-----------+----------+ |
* | /|\ | |
* +---------------+-----------------------------------+---------------+
* | \|/
* +---------------+-----------------------------------+---------------+
* | | | |
* | [ Socket.read() ] [ Socket.write() ] |
* | |
* | Netty Internal I/O Threads (Transport Implementation) |
* +-------------------------------------------------------------------+
- Netty服务端创建源码分析
- EventLoopGroup acceptorGroup = new NioEventLoopGroup()、EventLoopGroup IOGroup = new NioEventLoopGroup()
- acceptor线程池
- io-processor线程池
- 并不是必须要创建两个不同的EventLoopGroup, 也可以只创建一个并共享
- TCP的backlog参数
- backlog指定了内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列:未链接队列和已连接队列
- 和tcp建立链接的三次握手相关
- backlog被规定为两个队列总和的最大值
- Netty默认的backlog为100,Lighttpd中此值达到128x8,可根据实际场景和网络状况进行灵活设置
- backlog指定了内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列:未链接队列和已连接队列
- TCP参数设置完成后,用户可以为启动辅助类和其父类分别指定Handler。两类Handler的用途不同
- AbstractBootstrap#handler,指定父类的handler
- ServerBootstrap#childHandler,指定子类的handler
- 通过看源代码,发现此书书中的描述有点不准确(或者说不直观),其实父类中的handler是添加到ServerSocketChannel的pipeline的,这个handler在server启动后就行执行,如LoggingHandler
- 子类的handler是添加导SocketChannel的pipeline的
- 最后一步,就是绑定本地端口,启动服务
- AbstractBootstrap#doBind、initAndRegister
- ServerBootstrap#init(Channel channel)
- 设置Socket参数和NioServerSocketChannel的附加属性
- 将AbstractBootstrap的Handler添加到 NioServerSocketChannel的 ChannelPipeline中
- 将用于服务端注册的 Handler ServerBootstrapAcceptor 添加到 ChannelPipeline 中
- ServerBootstrapAcceptor#channelRead
- child.pipeline().addLast(childHandler),将子handler加到了SocketChannel的pipeline中
- ServerBootstrapAcceptor#channelRead
- 当 NioServerSocketChannel 初始化完成之后,需要将它注册到Reactor线程的多路复用器上监听新客户端的接入
- AbstractChannel$AbstractUnsafe#register#register0
- AbstractNioChannel#doRegister->注册到NioEventLoop的Selector上
- pipeline.fireChannelRegistered,触发注册事件,传递给pipeline,执行父类的 handler
- 另外判断是否是NioEventLoop自身发起的操作。如果是,则不存在并发操作,直接执行Channel注册:如果由其他线程发起,则封装成一个Task放入消息队列中异步执行。此处,由于是由ServerBootstrap所在线程执行的注册操作,所以会将其封装成Task投递到NioEventLoop中执行
- ServerBootstrap#createChannel
- EventLoop eventLoop = group().next()
- 这里顺序选取了一个线程,注意这里的group是指parentGroup
- EventLoop eventLoop = group().next()
- AbstractChannel$AbstractUnsafe#register#register0
- EventLoopGroup acceptorGroup = new NioEventLoopGroup()、EventLoopGroup IOGroup = new NioEventLoopGroup()
- 客户端接入源码分析
- 负责处理网络读写、连接和客户端请求接入的Reactor线程就是NioEventLoop,当多路复用器检测到新的准备就绪的Channel时,默认执行processSelectedKeysOptimized
- NioEventLoop#run
- 由于Channel的Attachment是NioServerSocketChannel, 所以执processSelectedKey
- 根据就绪的操作位,执行不同的操作
- unsafe.read()
- NioMessageUnsafe#read
- NioServerSocketChannel#doReadMessages
- 接收新的客户端连接(调用accept)并创建NioSocketChannels
- 注意初始化new NioSocketChannel(this, childEventLoopGroup().next(), ch),即要传入childGroup中的一个线程
- 接收到新的客户端连接后,触发ChannelPipeline ChannelRead方法
- 执行headChannelHandlerContext的fireChannelRead 方法,事件在 ChannelPipeline中传递,执行ServerBootstrapAcceptor的channelRead方法
- 将启动时传入的childHandler加入到客户端SocketChannel的 ChannelPipeline中
- 设置客户端SocketChannel的TCP参数
- 注册SocketChannel到多路复用器
- NioServerSocketChannel#doReadMessages
- NioMessageUnsafe#read
- 负责处理网络读写、连接和客户端请求接入的Reactor线程就是NioEventLoop,当多路复用器检测到新的准备就绪的Channel时,默认执行processSelectedKeysOptimized
- 总结
- 源代码分析是基于netty-all-5.0.0.Alpha1版本,其实有些代码和4.x还是有一些区别的
- 对于NioServerSocketChannel和NioSocketChannel,在其构造中均会指定readInterestOp
- super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT)
- AbstractNioMessageServerChannel
- super(parent, eventLoop, ch, SelectionKey.OP_READ)
- AbstractNioChannel
- super(null, eventLoop, childGroup, newSocket(), SelectionKey.OP_ACCEPT)
客户端创建
- Bootstrap是Socket客户端创建工具类,用户通过Bootstrap可以方便地创建的客户端并发起异步 TCP连接操作
- Netty客户端创建流程分析
- 用户线程创建Bootstrap实例,通过API设置创建客户端相关的参数,异步发起客户端连接
- 创建处理客户端连接、I/O读写的Reactor线程组NioEventLoopGroup。可以通过构造函数指定I/O线程的个数,默认为CPU内核数的2倍
- 通过Bootstrap的ChannelFactory和用户指定的Channel类型创建用于客户端连接的NioSocketChannel,它的功能类似于JDK NIO类库提供的SocketChannel
- 创建默认的Channel Handler Pipeline, 用于调度和执行网络事件
- 注意这里调用的是父类的handler方法
- 异步发起TCP连接,判断连接是否成功。如果成功,则直接将NioSocketChannel注册到多路复用器上,监听读操作位,用于数据报读取和消息发送:如果没有立即连接成功,则注册连接监听位到多路复用器,等待连接结果
- 注册对应的网络监听状态位到多路复用器
- 由多路复用器在1/0现场中轮询各Channel, 处理连接结果
- 如果连接成功,设置Future结果,发送连接成功事件,触发ChannelPipeline执行
- 由ChannelPipeline调度执行系统和用户的ChannelHandler, 执行业务逻辑
- Netty客户端创建源码分析
- Bootstrap是 Netty提供的客户端连接工具类,主要用于简化客户端的创建
- 客户端相对于服务端,只需要一个处理I/O读写的线程组即可
- Bootstrap也提供了客户端TCP参数设置接口
- SO_TIMEOUT 控制读取操作将阻塞多少毫秒
- SO_SNDBUF 套接字使用的发送缓冲区大小
- SO_RCVBUF 套接字使用的接收缓冲区大小
- SO_REUSEADDR
- 用于决定如果网络上仍然有数据向旧的ServerSocket传输数据 ,是否允许新的 ServerSocket绑定到与旧的ServerSocket同样的端口上
- CONNECT_TIMEOUT_MILLIS
- 客户端连接超时时间,由于NIO原生的客户端并不提供设置连接超时的接口,因此 , Netty采用的是自定义连接超时定时器负责检测和超时控制
- TCP_N0_DELAY
- 激活或禁止TCP_NODELAY套接字选项,它决定是否使用Nagle算法。如果是时延敏感型的应用,建议关闭Nagle算法
- 对于TCP客户端连接,默认使用NioSocketChannel
- BootstrapChannelFactory利用channelClass类型信息 , 通过反射机制创建NioSocketChannel对象
- Bootstrap为了简化Handle 的编排,提供 Channellnitializer,它继承了 ChannelHandlerAdapter, 当 TCP链路注册成功之后,调用initChannel接口,用于设置用户ChannelHandler
- ChannelInitializer#channelRegistered
- initChannel
- ChannelInitializer#channelRegistered
- ChannelFuture f = b.connect(host, port).sync(),发起客户端连接
- 客户端连接操作
- Bootstrap#connect
- doConnect
- 首先要创建和初始化NioSocketChannel
- AbstractBootstrap#initAndRegister
- createChannel-channelFactory().newChannel(group().next())
- 这里的group为父类的parentGroup
- init(channel)
- 初始化Channel之后,将其注册到Selector上
- channel.unsafe().register(regFuture)
- createChannel-channelFactory().newChannel(group().next())
- AbstractBootstrap#initAndRegister
- 首先要创建和初始化NioSocketChannel
- doConnect0
- 从该操作开始,连接操作切换到了Netty的NIO线程NioEventLoop中进行,此时客户端返回,连接操作异步执行
- doConnectO最终调用HeadHandler的connect方法
- AbstractNioUnsafe#connect
- NioSocketChannel#doConnect
- javaChannel().socket().bind(localAddress)
- boolean connected = javaChannel().connect(remoteAddress)
- if (!connected) {selectionKey().interestOps(SelectionKey.OP_CONNECT)}
- 异步连接返回之后,需要判断连接结果,如果连接成功,则触发ChannelActive事件,否则注册SelectionKey.OP_CONNECT到多路复用器
- NioSocketChannel#doConnect
- AbstractNioUnsafe#connect
- doConnect
- 异步连接结果通知
- NioEventLoop的 Selector轮询客户端连接Channel,当服务端返回握手应答之后,对连接结果进行判断
- NioEventLoop#processSelectedKey
- 监听SelectionKey.OP_CONNECT
- AbstractNioUnsafe#finishConnect
- doFinishConnect
- NioSocketChannel#doFinishConnect,判断JDK的SocketChannel的连接结果
- 连接成功之后,调用fulfillConnectPromise方法,触发链路激活事件,该事件由ChannelPipeline进行传播
- doFinishConnect
- AbstractNioUnsafe#finishConnect
- NioEventLoop的 Selector轮询客户端连接Channel,当服务端返回握手应答之后,对连接结果进行判断
- Bootstrap#connect
- 客户端连接超时机制
- 在创建Netty客户端的时候,可以通过ChannelOption.CONNECT_TIMEOUT_MILLIS配置项设置连接超时时间
- 发起连接的同时,启动连接超时检测定时器
- AbstractNioUnsafe.connect
- 一旦超时定时器执行,说明客户端连接超时
- 如果在连接超时之前获取到连接结果,则删除连接超时定时器,防止其被触发
- AbstractNioUnsafe#finishConnect中finally块的处理
- AbstractNioUnsafe.connect
ByteBuf和相关辅助类
- NIO的ByteBuffer的主要缺点
- 长度固定,一旦分配完成,它的容量不能动态扩展和收缩
- 只有一个标识位置的指针position, 读写的时候需要手工调用flip和rewind等,使用者必须小心谨慎地处理这些API
- 一些高级和实用的特性不支持
- Netty的ByteBuf
- 7种 ava基础类型、byte数组、ByteBuffer (ByteBuf) 等的读写
- 缓冲区自身的copy和 slice等
- 设置网络字节序
- 构造缓冲区实例
- 操作位置指针等方法
- ByteBuf通过两个位置指针来协助缓冲区的读写操作,读操作使用readerlndex, 写操作使用 writerlndex
- 由于写操作不修改readerlndex指针,读操作不修改writerlndex指针,因此读写之间不再需要调整位置指针,这极大地简化了缓冲区的读写操作
- ByteBuf会自动进行动态扩展
- ByteBuf功能介绍
- 顺序读操作(read)
- 顺序写操作(write)
- readerlndex和writerlndex
- 调用 ByteBuf的read 操作时,从readerlndex处开始读取。readerlndex到 writerlndex之间的空间为可读的字节缓冲区;从 writerlndex到 capacity之间为可写的字节缓冲区;0到readerlndex 之间是已经读取过的缓冲区,可以调用discardReadBytes操作来重用这部分空间,以节约内存,防止ByteBuf的动态扩张
- Discardable bytes
- 将已读的字节部分丢弃
- 调用discardReadBytes会发生字节数组的内存复制,所以,频繁调用将会导致性能下降
- Readable bytes 和 Writable bytes
- 可读空间段是数据实际存储的区域,以read或者skip开头的任何操作都将会从
readerlndex开始读取或者跳过指定的数据,操作完成之后readerlndex增加了读取或者跳
过的字节数长度 - 可写空间段是尚未被使用可以填充的空闲空间,任何以write开头的操作都会从writerlndex开始向空闲空间写入字节,操作完成之后writerlndex增加了写入的字节数长度
- 可读空间段是数据实际存储的区域,以read或者skip开头的任何操作都将会从
- Clear操作
- Mark和Rest操作
- 查找操作
- Derived buffers
- duplicate 返回当前ByteBuf的复制对象,共享缓冲区,读写索引独立
- copy 复制一个新的ByteBuf对象,内容和索引都独立
- slice 可读子缓冲区(起始位置从readerlndex到 writerlndex),共享内容,读写索引独立维护
- 转换成标准的ByteBuffer
- ByteBuf#nioBuffer、nioBuffer(index,length)
- 返回后的ByteBuffer无法感知原ByteBuf的动态扩展操作
- 随机读写(set和 get)
- 源码概要分析
- 从内存分配的角度看,ByteBuf可以分为两类
- 堆内存(HeapByteBuf) 字节
- 可以被JVM 自动回收;缺点就是如果进行Socket的 I/O 读写,需要额外做一次内存复制,将堆内存对应的缓冲区复制到内核Channel中
- 直接内存(DirectByteBuf) 字节
- 区:非堆内存,它在堆外进行内存分配;但是将它写入或者从Socket Channel中读取时,由于少了一次内存复制,速度比堆内存快
- ByteBuf的最佳实践是在I/O通信线程的读写缓冲区使用DirectByteBuf, 后端业务消息的编解码模块使用HeapByteBuf, 这样组合可以达到性能最优
- 从内存回收角度看,ByteBuf也分为两类:基于对象池的ByteBuf和普通ByteBuf。两者的主要区别就是基于对象池的ByteBuf可以重用ByteBuf对象,它自己维护了一个内存池,可以循环利用创建的ByteBuf,提升内存的使用效率,降低由于高负载导致的频繁GC
- 堆内存(HeapByteBuf) 字节
- AbstractByteBuf
- ByteBuffer的一个最大的缺点就是一旦完成分配之后不能动态调整其容量。由于很多场景下我们无法预先判断需要编码和解码的POJO对象长度,因此只能根据经验数据给个估计值。如果这个值偏大,就会导致内存的浪费;如果这个值偏小,遇到大消息编码的时候就会发生缓冲区溢出异常。使用者需要自己捕获这个异常,并重新计算缓冲区的大小,将原来的内容复制到新的缓冲区中,然后重置指针。这种处理策略对用户非常不友好,而且稍有不慎,就会引入新的问题
- 采用倍增或者步进算法,动态扩张需要进行内存复制,频繁的内存复制会导致性能下降;采用先倍增后步进
- AbstractReferenceCountedByteBuf
- 引用计数
- ReferenceCounted
- A reference-counted object that requires explicit deallocation
- retain
- increases the reference count
- release
- decreases the reference count
- UnpooledHeapByteBuf
- UnpooledHeapByteBuf是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池
技术实现,这就意味着每次I/O 的读写都会创建个新的UnpooledHeapByteBuf
- UnpooledHeapByteBuf是基于堆内存进行内存分配的字节缓冲区,它没有基于对象池
- PooledByteBuf
- PoolArena,Netty的内存池实现类
- 为了集中管理内存的分配和释放,同时提高分配和释放内;时候的性能,很多框架和应用都会通过预先申请一大块内存,然后通过提供相应的分配和释放接口来使用内存。这样一来,对内存的管理就被集中到几个类或者函数中,由于不再频繁使用系统调用来申请和释放内存,应用或者系统的性能也会大大提髙。在这种设计思路下,预先申请的那一大块内存就被称为Memory Arena
- Netty的 PoolArena是由多个Chunk组成的大块内存区域,而每个Chunk则由一个或者多个Page组成
- 由于采用内存池实现,所以新创建PooledDirectByteBuf对象时不能直接new —个实例,而是从内存池中获取,然后设置引用计数器的值
- ByteBufHolder
- 相当于协议的消息体容器
- ByteBufAllocator
- responsible to allocate buffers
- CompositeByteBuf
- A virtual buffer which shows multiple buffers as a single merged buffer
- CompositeByteBuf允许将多个ByteBuf的实例组装到一起,形成一个统一的视图
- 如某个协议POJO对象包含两部分:消息头和消息体,它们都是ByteBuf对象。当需要对消息进行编码的时候需要进行整合
- ByteBufUtil
- A collection of utility methods that is related with handling ByteBuf
- encodeString、decodeString、hexDump
- 从内存分配的角度看,ByteBuf可以分为两类
Channel 和 Unsafe
- 类似于NIO的 Channel, Netty提供了自己的Channel和其子类实现,用于异步I/O操作和其他相关的操作
- Unsafe是个内部接口,聚合在Channel中协助进行网络读写相关的操作,因为它的设计初衷就是Channel的内部辅助类,不应该被Netty框架的上层使用者调用,所以被命名为Unsafe
- io.netty.channd.Channel是Netty网络操作抽象类,它聚合了一组功能,包括但不限于网路的读、写,客户端发起连接,主动关闭连接,链路关闭,获取通信双方的网络地址等。它也包含了Netty框架相关的一些功能,包括获取该Channel的EventLoop , 获取缓冲分配器 ByteBufAllocator和pipeline等
- 为什么不使用JDK NIO 原生的Channel
- JDK的SocketChannel和ServerSocketChannel没有统一的Channel接口
- JDK的SocketChannel和ServerSocketChannel的主要职责就是网络 I/O 操作,由于它们是SPI(service provider interface)类接口,由具体的虚拟机厂家来提供;直接实现SocketChannel 和 ServerSocketChannel抽象类,其工作量和重新开发一个新的 Channel 功能类是差不多的
- Netty的Channel需要能够跟 Netty的整体架构融合在一起,例如 I/O 模型、基于
ChannelPipeline 的定制模型,以及基于元数据描述配置化的 TCP参数等,这些 JDK的SocketChannel和 ServerSocketChannel都没有提供,需要重新封装 - 自定义的Channel, 功能实现更加灵活
- Channel的功能介绍
- 网络I/O操作
- read、write、flush、close、disconnect、close、connect、bind等
- ctx.close() starts to flow through the ChannelPipeline from the point of the ChannelHandlerContext while ctx.channel().close() will start from the tail of the ChannelPipeline all the time
- 从NioSocketChannel的doDisconnect实现来看,其直接调用了doClose,所以从TCP一层面上可以理解,disconnect和close一样
- 其他
- 通过eventLoop()方法可以获取到Channel注册的EventLoop
- 通过metadata()方法就可以获取当前Channel的TCP参数配置
- parent()。对于服务端Channel而言,它的父Channel为空:对于客户端Channel, 它的 Channel就是创建它的ServerSocketChannel
- 用户获取Channel标识的id
- ChannelId、DefaultChannelId
- 网络I/O操作
- Channel源码分析
- AbstractChannel
- 聚合了所有Channel使用到的能力对象,由AbstractChannel提供初始化和统一封装
- 网络读写操作会触发CharmelPipeline对应的事件方法。Netty基于事件驱动,我们也可以理解为当Chnanel进行I/O 操作时产生生对应的I/O 事件,然后驱动事件在ChannelPipeline中传播,由对应的ChannelHandler对事件进行拦截和处理
- 网络 I/O 操作直接调用 DefaultChannelPipeline 的相关方法,由DefaultChannelPipeline中对应的ChannelHandler进行具体的逻辑处理
- 提供了一些公共API具体实现 例如localAddress()和remoteAddress()
- AbstractNioChannel
- Abstract base class for {@link Channel} implementations which use a Selector based approach.
- doRegister、doBeginRead
- AbstractNioByteChannel
- {@link AbstractNioChannel} base class for {@link Channel}s that operate on bytes
- doWrite,注意处理半包
- 注意环形数组-ChannelOutboundBuffer#Entry[] buffer
- AbstractNioMessageChannel
- {@link AbstractNioChannel} base class for {@link Channel}s that operate on messages
- 一个发送的是ByteBuf或者FileRegion, 它们可以直接被发送:另一个发送的则是POJO对象
- AbstractNioMessageServerChannel
- 定义了一个 EventLoopGroup 类型的childGroup, 用于给新接入的客户端NioSocketChannel分配EventLoop
- NioServerSocketChannel
- 很多方法实现通过调用javaChannel
- doReadMessages,javaChannel().accept()
- new NioSocketChannel(this, childEventLoopGroup().next(), ch)
- NioSocketChannel
- doConnect
- javaChannel().socket().bind
- javaChannel().connect
- doWrite
- ChannelConfig#getWriteSpinCount:Returns the maximum loop count for a write operation
- doReadBytes
- Read bytes into the given ByteBuf and return the amount
- doConnect
- AbstractChannel
- Unsafe
- Unsafe接口实际上是Channel接口的辅助接口
- 实际的I/O读写操作都是由Unsafe接口负责完成的
- AbstractChannel$AbstractUnsafe
- register
- bind
- disconnect
- doDisconnect() might have closed the channel
- close
- 从disconnect和close的实现来看,close做的工作更多,如首先判断是否处于刷新状态,如果处于刷新状态说明还有消息尚未发送出去,需要等到所有消息发送完成再关闭链路等
- write
- 实际上将消息添加到环形发送数组中
- flush#flush0#doWrite
- AbstractNioUnsafe
- NioByteUnsafe
- read
- AdaptiveRecvByteBufAllocator,缓冲区大小可以动态调整的ByteBuf分配器
- Netty根据上次实际读取的码流大小对下次的接收Buffer缓冲区进行预测和调整,能够最大限度地满足不同行业的应用场景
- 其根据本次读取的实际字节数对下次接收缓冲区的容量进行动态调整
- doReadBytes
- 完成一次异步读之后,就会触发一次ChannelRead事件
- 在没有做仟何半包处理的情况下,以 ChannelRead的触发次数做计数器来进行性能分析和统计,是完全错误的
- 连续读操作做上限控制,默认值为16次,无论TCP缓冲区有多少码流需要读取,只要连续16次没有读完,都需要强制退出,等待下次selector轮询周期再执行
- 完成多路复用器本轮读操作之后,触发ChannelReadComplete事件
ChannelPipeline和ChannelHandler
- Netty的 Channel过滤器实现原理与ServletFilter机制一致,它将Channel的数据管道抽象为 ChannelPipeline,消息在 ChannelPipeline 中流动和传递。ChannelPipeline 持有 I/O
事件拦截器ChannelHandler的链表,由ChannelHandler对 I/O 事件进行拦截和处理,可以
方便地通过新增和删除ChannelHandler来实现不同的业务逻辑定制,不需要对已有的ChannelHandler进行修改,能够实现对修改封闭和对扩展的支持 - ChannelPipeline 的事件处理
- 底层的SocketChannel read方法读取 ByteBuf,触发 ChannelRead 事件,由 I/O线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg)方法,将消息
(ByteBuf) 传输到 ChannelPipeline中 - 消息依次被 HeadHandler、ChannelHandlerl、ChannelHandler2 ... TailHandler 拦
截和处理,在这个过程中,任何ChannelHandler都可以中断当前的流程,结束消息的传递 - 调用ChannelHandlerContext的 write方法发送消息,消息从TailHandler开始,途经 ChannelHandlerN ... ChannelHandlerl、HeadHandler, 最终被添加到消息发送缓冲区中等待刷新和发送,在此过程中也可以中断消息的传递
- Netty中的事件分为inbound事件和outbound事件。inbound事件通常由I/O 线程触发
- ChannelHandlerContext#fireChannelRegistered
- ChannelHandlerContext#fireChannelActive
- ChannelHandlerContext#fireChannelRead
- ChannelHandlerContext#fireChannelReadComplete
- ChannelHandlerContext#fireExceptionCaught
- ChannelHandlerContext#fireUserEventTriggered
- ChannelHandlerContext#fireChannelWritabilityChanged
- ChannelHandlerContext#fireChannelInactive
- 举例如DefaultChannelHandlerContext#fireChannelRead
- findContextInbound(MASK_CHANNEL_READ)
- Inbound
- findContextInbound(MASK_CHANNEL_READ)
- 举例如DefaultChannelHandlerContext#fireChannelRead
- Outbound事件通常是由用户主动发起的网络I/O操作
- bind、connect、write、flush、read、disconnect、close
- 举例如DefaultChannelHandlerContext#write
- findContextOutbound(MASK_WRITE)
- Outbound
- findContextOutbound(MASK_WRITE)
- 举例如DefaultChannelHandlerContext#write
- bind、connect、write、flush、read、disconnect、close
- 底层的SocketChannel read方法读取 ByteBuf,触发 ChannelRead 事件,由 I/O线程 NioEventLoop 调用 ChannelPipeline 的 fireChannelRead(Object msg)方法,将消息
- 自定义拦截器
- 通常ChannelHandler只需要继承 ChannelHandlerAdapter类覆盖自己关心的方法即可
- 构建pipeline
- 使用ServerBootstrap或者Bootstrap启动服务端或者客户端时,Netty会为每个Channel连接创建个独立的pipeline
- 对于使用者而言,只需要将自定义的拦截器加入到pipeline中即可
- 对于类似编解码这样的ChannelHandler,它存在先后顺序
- Pipeline支持指定位置添加或者删除拦截器
- ChannelPipeline源码分析
- ChannelPipeline支持运行期动态修改,线程安全
- 其内部维护里一个链表(DefaultChannelHandlerContext.next/prev)和name2ctx的Map
- ChannelPipeline 的 inbound 事件
- pipeline中以fireXXX命名的方法都是从I/O线程流向用户业务Handler的inbound事件
- head.fireXXX 调用HeadHandler对应的fireXXX方法
- DefaultChannelHandlerContext#fireXXX
- 执行相关逻辑
- head.fireXXX 调用HeadHandler对应的fireXXX方法
- pipeline中以fireXXX命名的方法都是从I/O线程流向用户业务Handler的inbound事件
- ChannelPipeline的outbound 事件
- 由用户线程或者代码发起的I/O操作被称为outbound事件
- Pipeline本身并不直接进行 I/O 操作,在前面对Channel和Unsafe的介绍中我们知道
最终都是由Unsafe和Channel 来实现真正的 I/O操作的。 Pipeline负责将 I/O 事件通TailHandler 行调度和传播,最终调用Unsafe的I/O方法进行I/O操作 - 整体由NioEventLoop进行驱动
- ChannelHandler
- 基于ChannelHandler接口,用户可以方便地进行业务逻辑定制
- ChannelHandler 支持注解,Shareble和Skip(被Skip注解的方法不会被调用)
- ChannelHandlerAdapter
- 所有方法都被加了@Skip注解,这些方法在执行的过程中会被忽略,直接跳到下一个ChannelHandler中执行对应的方法
- ByteToMessageDecoder、MessageToMessageDecoder
- LengthFieldBasedFrameDecoder
- lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip
- the offset of the length field
- the length of the length field
- the compensation value to add to the value of the length field
- the number of first bytes to strip out from the decoded frame
- lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength
- frameLength = getUnadjustedFrameLength 根据offset和fieldLen获取帧长度
- frameLength += lengthAdjustment + lengthFieldEndOffset
- int actualFrameLength = frameLengthInt - initialBytesToStrip
- 可以看到这个lengthAdjustment是个加法
- in some protocols, the length field represents the length of the whole message, including the message header
- 详见LengthFieldBasedFrameDecoder的javadoc注释和源代码
- lengthFieldOffset、lengthFieldLength、lengthAdjustment、initialBytesToStrip
- MessageToByteEncoder
- MessageToMessageEncoder
- LengthFieldPrepender
- An encoder that prepends the length of the message
- lengthIncludesLengthFieldLength,消息长度将包含长度本身占用的字节数
- length = msg.readableBytes() + lengthAdjustment
- if (lengthIncludesLengthFieldLength) length += lengthFieldLength
EventLoop和EventLoopGroup
- Reactor单线程模型,是指所有的I/O操作都在同一个NIO线程上面完成
- 由于Reactor模式使用的是异步非阻塞I/O, 所有的I/O操作都不会导致阻塞,理论上一个线程可以独立处理所有I/O相关的操作
- Reactor多线程模型与单线程模型最大的区别就是有一组NIO线程来处理I/O操作
- 有专门一个NIO线程 — Acceptor线程用于监听服务端
- 网络I/O操作读、写等由一个NIO线程池负责
- 1个NIO线程可以同时处理N条链路,但是1个链路只对应一个NIO线程
- 1个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题
- 主从Reactor多线程模型
- 服务端用于接收客户端连接的不再是一个单独的NIO线程,而是一个独立的NIO线程池
- 最佳实践
- 创建两个NioEventLoopGroup,用于逻辑隔离NIOAcceptor和 NIO线程
- 尽量不要在ChannelHandler中启动用户线程(解码后用于将POJO消息派发到后端业务线程的除外)
- 解码要放在NIO线程调用的解码Handler中进行
- 如果业务逻辑操作非常简单,没有复杂的业务逻辑计算,没有可能会导致线程被阻塞的磁盘操作、数据库操作、网路操作等,可以直接在NIO线程上完成业务逻辑编排,不需要切换到用户线程
- 如果业务逻辑处理复杂,不要在NIO线程上完成,建议将解码后的POJO消息封装成Task, 派发到业务线程池中由业务线程执行,以保证NIO线程尽快被释放,处理其他的I/O操作
- NioEventLoop 设计原理
- 并不是一个纯粹的I/O线程,它除了负责I/O的读写之外
- 通过调用 NioEventLoop 的 execute(Runnable task)方法实现,Netty 有很多系统Task, 创建它们的主要原因是:当 I/O 线程和用户线程同时操作网络资源时,为了防止并发操作导致的锁竞争,将用户线程的操作封装成Task放入消息队列中,由I/O 线程负责执行,这样就实现了局部无锁化
- 通过调用NioEventLoop的schedule处理定时任务
- NioEventLoop中的run方法是SingleThreadEventExecutor中定义的抽象方法
- NioEventLoop
- 作为NIO框架的Reactor线程,NioEventLoop需要处理网络I/O读写事件,因此它必须聚合一个多路复用器对象
- rebuildSelector
- 解决the infamous epoll 100% CPU bug
- 在某个周期(例 如 100ms) 内如果连续发生JV次空轮询,说明触发了 JDK NIO的epoll()死循环bug
- 处理完I/O事件之后,NioEventLoop需要执行非I/O操作的系统Task和定时任务
- 为了保证两者都能得到足够的CPU时间被执行,Netty提供了I/O比例供用户定制
- setIoRatio
- The default value is{@code 50}, which means the event loop will try to spend the same amount of time for I/O as for non-I/O tasks
Future 和 Promise
- Netty强烈建议直接通过添加监听器的方式获取I/O操作结果
- 当 I/O操作完成之后,I/O线程会回调ChannelFuture中 GenericFutureListener的operationComplete方法,并把ChannelFuture对象当作方法的入参
- 异步I/O 操作有两类超时:一个是TCP层面的I/O 超时,另一个是业务逻辑层面的操作超时
- Promise是可写的Future,Future自身并没有写操作相关的接口,Netty通 过Promise对Future进行扩展,用于设置I/O操作的结果
- 循环判断的原因是防止线程被意外唤醒导致的功能异常(虚假唤醒)
- 由于在I/O线程中调用Promise的await或者sync方法会导致死锁
- checkDeadLock
Netty架构剖析
- Netty采用了典型的三层网络架构进行设计和开发
- Reactor通信调度层
- 职责链ChannelPipeline
- 业务逻辑编排层(Service ChannelHandler)
- 关键架构质量属性
- 高性能
- 采用异步非阻塞的I/O类库,基于Reactor模式
- TCP接收和发送缓冲区使用直接内存代替堆内存
- 支持通过内存池的方式循环利用ByteBuf
- 可配置的I/O 线程数、TCP参数
- 采用环形数组缓冲区实现无锁化并发编程
- 合理地使用线程安全容器、原子类等
- 关键资源的处理使用单线程串行化
- 通过引用计数器及时地申请释放不再被引用的对象
- 可靠性
- 链路有效性检测(读空闲超时机制、写空闲超时机制)
- 内存保护机制
- 优雅停机
- 优雅停机往往需要设置个最大超时时间T,如果达到T后系统仍然没有退出,则通过
Kill - 9 pid强杀当前的进程
- 优雅停机往往需要设置个最大超时时间T,如果达到T后系统仍然没有退出,则通过
- 可定制性
- 责任链模式、基于接口的开发、提供了大量工厂类、提供了大量的系统参数供用户按需设置
- 可扩展性
- 可以方便地进行应用层协议定制
- 高性能
Java 多线程编程在 Netty中的应用
- Java内存模型
- 工作内存和主内存
- Java内存交互协议
- 对于SUN的 JDK,在 Windows和 Linux操作系统上采用了内核线程的实现方式
- 这种线程由内核来完成线程切换,内核通过线程调度器对线程进行调度,并负责将线程任务映射到不同的处理器上
- Netty的并发编程实践
- 对共享的可变数据进行正确的同步-synchronized
- 锁的范围需要尽可能的小
- 正确使用锁
- 始终使用wait循环来调用wait方法,永远不要在循环之外调用wait方法。这样做的原因是尽管并不满足被唤醒条件,但是由于其他线程调用notifyAlIO方法会导致被阻塞线程意外唤醒,此时执行条件并不满足
- volatile的正确使用
- 线程可见性
- 禁止指令重排序优化
- volatile最适合使用的是个线程写,其他线程读的场合
- CAS指令和原子类
- 悲观锁
- 乐观锁。简单地说,就是先进行操作,操作完成之后再判断操作是否成功,是否有并发问题,如果有则进行失败补偿,如果没有就算操作成功--CAS自旋
- sun.misc.Unsafe
- AtomicIntegerFieldUpdater
- 线程安全类的应用
- ConcurrentLinkedQueue
- JDK的线程安全容器底层采用了 CAS、volatile和 ReadWriteLock实现,相比于传统
重量级的同步锁,采用了更轻量、细粒度的锁,因此,性能会更高 - Netty对 JDK的线程池进行了封装和改造,但是,本质上仍然是利用了线程池和线程安全队列简化了多线程编程
- 读写锁的应用
- HashedWheelTimer
- 线程安全性文档说明
- 在 Netty中,对于一些关键的类库,给出了线程安全性的API DOC
- 不要依赖线程优先级
- Netty中默认的线程工厂实现类,开放了包含设置线程优先级字段的构造函数。这是
个错误的决定.实际上JDK的线程优先级是无法跨平台正确运行的
- Netty中默认的线程工厂实现类,开放了包含设置线程优先级字段的构造函数。这是
- 对共享的可变数据进行正确的同步-synchronized
高性能之道
- I/O 通信性能三原则
- 传输、协议、线程
- Netty
- 异步非阻塞通信
- 高效的Reactor线程模型
- 个人认为采用第三种所谓主从模型的话,则需要绑定多个端口,每一个端口与一个boss thread绑定
- 实际bind的时候才会创建NioServerSocketChannel
- 个人认为采用第三种所谓主从模型的话,则需要绑定多个端口,每一个端口与一个boss thread绑定
- 无锁化的串行设计
- 高效的并发编程
- 高性能的序列化框架
- 零拷贝
- Netty的接收和发送ByteBuffer采用DIRECT BUFFERS
- 第二种“零拷贝”的实现CompositeByteBuf, 它对外将多个ByteBuf封装成1个ByteBuf
- 很多操作系统直接将文件缓冲区的内容发送到目标Channel中,而不需要通过循环拷贝的方式
- 内存池
- PooledByteBufAllocator#DEFAULT#directBuffer
- 使用内存池分配器创建直接内存缓冲区
- 过 RECYCLER的 get方法循环使用ByteBuf对象,如果是非内存池实现,则直接创建 一个新的ByteBuf对象
- setRefCnt方法设置引用计数器
- Unpooled#directBuffer
- 使用非堆内存分配器创建的直接内存缓冲区
- PooledByteBufAllocator#DEFAULT#directBuffer
- 灵活的TCP参数配置能力
可靠性
- 网络通信类故障
- 客户端连接超时
- ChannelOption.CONNECT_TIMEOUT_MILLIS
- 设置完连接超时之后,Netty在发起连接的时候,会根据超时时间创建ScheduledFuture
挂载在Reactor线程上,用于定时监测是否发生连接超时- AbstractNioUnsafe#connect
- 如果在超时期限内处理完成连接操作,则取消连接超时定时任务
- 通信对端强制关闭连接
- 强制关闭客户端,服务端己经监控到客户端强制关闭了连接,释放了连接句柄
- I/O异常被统一处理,该异常向上抛,由NioByteUnsafe进行统一异常处理
- 链路关闭
- 己方或者对方主动关闭链接并不属于异常场景,因此不会产生Exception事件通知
Pipeline
- 己方或者对方主动关闭链接并不属于异常场景,因此不会产生Exception事件通知
- 强制关闭客户端,服务端己经监控到客户端强制关闭了连接,释放了连接句柄
- 定制I/O故障
- 客户端的断连重连机制
- 消息的缓存重发
- 接口日志中详细记录故障细节
- 运维相关功能,例如告警、触发邮件/短信等
- Netty的处理策略是发生I/O 异常,底层的资源由它负责释放,同时将异常堆找信息
以事件的形式通知给上层用户,由用户对异常进行定制
- Netty的处理策略是发生I/O 异常,底层的资源由它负责释放,同时将异常堆找信息
- 链路的有效性检测
- 心跳检测的目的就是确认当前链路可用,对方活着并且能够正常接收和发送消息
- 心跳检测机制
- Ping-Pong型心跳:由通信一方定时发送Ping消息,对方接收到Ping消息之后,立即返回Pong应答消息给对方,属于请求-响应型心跳
- Ping-Ping型心跳:不区分心跳请求和应答,由通信双方按照约定定时向对方发送心跳Ping消息,它属于双向心跳
- 连续N次心跳检测都没有收到对方的Pong应答消息或者Ping请求消息,则认为
链路己经发生逻辑失效,这被称作心跳超时 - 读取和发送心跳消息的时候如何直接发生了IO异常
- 连续N次心跳检测都没有收到对方的Pong应答消息或者Ping请求消息,则认为
- Netty的心跳检测实际上是利用了链路空闲检测机制实现的
- 读空闲
- 写空闲
- 读写空闲
- 链路空闲的时候并没有关闭链路,而是触发IdleStateEvem事 件 ,用户订阅IdleStateEvent事件,用于自定义逻辑处理
- Reactor线程的保护
- 循环体内一定要捕获Throwable
- 规避NIO BUG
- 内存保护
- 链路总数的控制:每条链路都包含接收和发送缓冲区,链路个数太多容易导致内
存溢出 - 单个缓冲区的上限控制
- 缓冲区内存释放
- NIO消息发送队列的长度上限控制
- 缓冲区的内存泄漏保护
- 为了提升内存的利用率,Netty提供了内存池和对象池.为了防止因为用户遗漏导致内存泄漏,Netty在 Pipeline的尾Handler中自动对内存进行释放
- 实际的商用环境中,如果遇到畸形码流攻击、协议消息编码异常、消息丢包等问题时,可能会解析到一个超长的长度字段
- 流量整形
- Netty流量整形的原理是:对每次读取到的ByteBuf可写字节数进行计算,获取当前的报文流量,然后与流量整形阈值对比。如果已经达到或者超过了阈值。则计算等待时间delay, 将当前的ByteBuf放到定时任务Task中缓存,由定时任务线程池在延迟delay之后继续处理该ByteBuf
- 用户可以通过参数设置:报文的接收速率、报文的发送速率、整形周期
- Netty也支持链路级的流量整形
- 缓冲区的内存泄漏保护
- 链路总数的控制:每条链路都包含接收和发送缓冲区,链路个数太多容易导致内
- 优雅停机接口
- 客户端连接超时
- 优化建议
- 发送队列容量上限控制
- 如果网络对方处理速度比较慢,导致TCP滑窗长时间为0 ; 或者消息发送方发送速度
过快,或者一次批量发送消息量过大,都可能会导致ChannelOutboundBuffer的内存膨胀,
这可能会导致系统的内存溢出 - 过启动项的ChannelOption设置发送队列的长度或者通过-D 启动参数配置该长度
- 如果网络对方处理速度比较慢,导致TCP滑窗长时间为0 ; 或者消息发送方发送速度
- 回推发送失败的消息
- Mina的实现,当发生链路异常之后,Mina会将尚未发送的整包消息队列封装到异常对象中,然后推送给用户Handler, 由用户来决定后续的处理策略
- 发送队列容量上限控制
安全性
* Netty通过Ssmandler提供了对SSL的支持,它支持的SSL协议类型包括:SSLV2、SSLV3 和 TLS
* SSL单向认证
* SSL双向认证
* 第三方CA认证
* Netty扩展的安全特性
* IP地址黑名单机制
* 接入认证
展望
- 属性配置
- 获取见:SystemPropertyUtil
- 如:io.netty.noUnsafe、io.netty.eventLoopThreads(默认cpu个数*2)