从JDK 1.4开始支持NIO编程,虽然目前用Netty的人占大部分,但是我们还是要先了解下Java的NIO是怎么实现的,今天就来一探究竟
示例代码:https://github.com/lhj502819/VariousCases/tree/main/CaseForNetty/src/main/java/cn/znnine/netty/bio
<a name="UKITN"></a>
Buffer、Channel、Selector的关系
- 每个Channel都会对应一个Buffer
- Selector对应一个线程,一个线程对应多个Channel
- 该图反映了有三个channel注册到该selector上
- 服务端线程切换到哪个channel是由事件决定的
- selector会根据不同的事件,在各个通道上切换
- Buffer就是一个内存块,底层是一个数组
- 数据的读取、写入通过Buffer,BIO中的要么是输入流,要么是输出流,不能双向,但是NIO的Buffer既可以读也可以写,是双向的,需要flip方法切换
- channel是双向的,可以返回底层操作系统的情况,比如Linux,底层的操作系统通道就是双向的
<a name="ncuK6"></a>
缓冲区Buffer
Buffer是一个对象,里面是要写入或者要读出的数据,在NIO库中,所有的数据都是用缓冲区处理的。<br />在读取数据时,它是直接读到缓冲区中的;在写入数据时,写入到缓冲区中,任何时候访问Channel中的数据,都是通过缓冲区进行操作的。<br />缓冲区实质上是一个数组,通常是一个字节数组ByteBuffer,还有其他的:<br />
- ByteBuffer:字节缓冲区
- CharBuffer:字符缓冲区
- ShortBuffer:短整型缓冲区
- IntBuffer:整型缓冲区
- LongBuffer:长整型缓冲区
- FloatBuffer:浮点型缓冲区
- DoubleBuffer:双精度浮点型缓冲区
<a name="qmfQT"></a>
原理解析
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
- capacity,容量,Buffer能容纳的元素数量最大值,在Buffer被创建的时候赋值,永远不能被修改
Buffer分为读、写模式,如下图所示
两种模式下,position和limit的含义不同
- position:表示位置,初始值为0
- 读模式下,每往Buffer写一个值,position就+1,代表下一次写入的位置
- 写模式下,每从Buffer中读一个值,position就+1,代表下一次读的位置
- limit:表示读/写的上限
- mark:标记作用,通过
mark()
方法记录当前position,通过reset方法,恢复position为标记- 写模式下,标记上一次写的位置
- 读模式系,标记上一次读的位置
- 四个值的大小关系(源码中的注释):mark <= position <= limit <= capacity
<a name="d7mNX"></a>
创建Buffer的方式
<a name="Pb8HG"></a>
allocate(int capacity)
每个Buffer实现类都提供了allocate(int capacity)
静态方法,帮助我们快速实例化一个Buffer对象,以ByteBuffer举例,代码如下:
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
//实际创建的是基于堆内(No-Direct)内存的实现类
return new HeapByteBuffer(capacity, capacity);
}
<a name="rW2HA"></a>
wrap(array)
每个Buffer实现类都提供了wrap(array)
静态方法,帮助我们将其对应的数组包装成一个Buffer对象
public static ByteBuffer wrap(byte[] array,
int offset, int length)
{
try {
//同样是返回基于堆内(No-Direct)内存的实现类
return new HeapByteBuffer(array, offset, length);
} catch (IllegalArgumentException x) {
throw new IndexOutOfBoundsException();
}
}
<a name="fQEVo"></a>
allocateDirect(int capacity)
用来快速实例化一个Buffer对象
public static ByteBuffer allocateDirect(int capacity) {
//返回基于堆外内存的实现类
return new DirectByteBuffer(capacity);
}
<a name="VnQqA"></a>
DirectBuffer和Non-Direct Buffer的区别
<a name="M3zU4"></a>
DirectBuffer
- 所分配的内存不在JVM堆上,不受GC的管理(但是Direct Buffer的Java对象是由GC管理的,因此当发生GC,对象被回收时,Direct Buffer也会被释放)
- 因此不在JVM堆上分配,因此JVM不好统计到非JVM管理的内存,但实际还是会占用JVM的内存
- 申请和释放DirectBuffer的开销比较大,因此正确的使用方式是在初始化的时候申请一个Buffer,然后不断的复用,在程序结束后才释放
- 使用DirectBuffer时,当运行一些底层的系统IO操作时,效率会比较高,因为JVM不需要拷贝buffer中的数据到中间临时缓冲区中
<a name="MC8Zx"></a>
Non-Direct Buffer
- 直接在JVM堆上进行内存的分配,本质上是byte[]数组的封装
- 因为Non-Direct Buffer在JVM堆中,因此当进行操作系统底层IO操作时,会将此buffer中的数据复制到临时缓冲区中,因此这种情况下Non-Direct Buffer的效率就比较低
<a name="Nab6j"></a>
向Buffer写数据
通过put
方法向Buffer写数据,每个Buffer实现类都提供了此方法,由于Buffer要与Channel交互,我们需要将Channel的数据写入Buffer中,是从Channel中读出来,写到Buffer。
//该方法会返回从Channel中写入到Buffer的数据大小
int num = channel.read(buf)
通常在说NIO的读操作的时候,说的是从Channel中读数据到Buffer中,对应的是对Buffer的写操作
<a name="Q0Ki4"></a>
从Buffer读数据
通过get
方法从Buffer读数据,每个Buffer实现类都有对应实现,与写数据类似,由于Buffer要与Channel交互,我们需要将Buffer的数据写入Channel中,是从Buffer中读出来,写到Channel中。
//该方法会返回向Channel中写入Buffer的数据大小
int num = channel.write(buffer)
<a name="AsBn7"></a>
flip读写模式切换
如果要读取Buffer中的数据,需要切换模式,从写模式切换到读模式,使用#flip()
方法,源码如下
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
使用示例如下:
//Read the data
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//从Channel中读数据写到Buffer中
int readBytes = sc.read(readBuffer);
if (readBytes > 0){
//调用flip方法切换为读模式
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
}
<a name="LP7XG"></a>
rewind()
重置position
的值为0,因此可以重新读取和写入Buffer了,并不会重置limit,可以重新读和写,主要针对读模式。
/**
*与flip的区别就是少了将position赋值给limit
*/
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
<a name="PbEEF"></a>
clear()
并不会清除Buffer的内容,只是将position和limit重置为写状态,读也是可以的,但主要是针对写模式。
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
<a name="rwWci"></a>
缺点
- 长度固定,一旦分配完成,容量不能动态扩展和收缩,当需要编码的POJO对象大于ByteBuffer的容量时,会发生索引越界异常
- 只有一个标识位置的指针position,读写的时候需要手动调用flip()和rewind()等,使用者必须小心谨慎地使用这些API,否则容易导致程序处理失败
- API功能有限,一些高级和实用的特性不支持,需要使用者自己实现
<br />
<a name="d083U"></a>
通道Channel
Channel就像是自来水管,网络数据通过 Channel读取和写入,通道与流的不同之处在于通道是双向的,流只是在一个方向上移动(一个流必须是InputStream或者OutputStream的子类),而通道可以用于读、写或者二者同时进行。<br />Channel需要与Buffer配合使用,从Buffer中读数据写入Channel,从Channel读数据写入Buffer。
public interface Channel extends Closeable {
//判断通道是否处于打开状态
public boolean isOpen();
//关闭通道
public void close() throws IOException;
}
<a name="R2MUr"></a>
Channel最重要的四个实现类
- SocketChannel:一个客户端用来发起TCP的Channel
- ServerSocketChannel:一个服务端用来监听新进来的连接的TCP的Channel,对于每一个新进来的连接,都会对应创建一个SocketChannel
- DatagramChannel:通过UDP读写数据
- FileChannel:从文件中,读写数据
<a name="hM7WN"></a>
示例
我们以FileChannel举例,读取文件的内容写入到另一个文件中,代码如下:
public class NioFileChannelDemo {
public static void main(String[] args) throws Exception {
try
(FileInputStream fileInputStream = new FileInputStream("E:\\workspeace\\VariousCases\\CaseForNetty\\src\\main\\java\\cn\\znnine\\netty\\nio\\java\\demo\\1.txt");
FileOutputStream fileOutputStream = new FileOutputStream("E:\\workspeace\\VariousCases\\CaseForNetty\\src\\main\\java\\cn\\znnine\\netty\\nio\\java\\demo\\2.txt");) {
//从输入流中获取对应的Channel
FileChannel inputStreamChannel = fileInputStream.getChannel();
//从输出流中获取对应的Channel
FileChannel fileOutputStreamChannel = fileOutputStream.getChannel();
//创建一个空的Buffer,分配容量为5
ByteBuffer buffer = ByteBuffer.allocate(5);
while (true) {
//从输入Channel中读数据写入Buffer中
int read = inputStreamChannel.read(buffer);
//如果读到的长度的是-1.说明读到末尾了
if (read == -1) {
break;
}
//切换成读模式
buffer.flip();
//将Buffer的内容读出来写入到输出Channel中
fileOutputStreamChannel.write(buffer);
//每读完一批就清空一次Buffer,为下一次写做好准备,因为position是capacity,再读就读不进来了
buffer.clear();
}
} catch (IOException exception) {
exception.printStackTrace();
}
}
}
<a name="PqohT"></a>
多路复用器Selector
Selector被称为选择器,Selector会不断地轮询注册在其上的Channel
,如果某个Channel
上面发生读或者写事件,这个Channel
就处于就绪状态,会被Selector
轮询出来,然后通过SelectionKey
可以获取到就绪Channel
的集合,进行后续的I/O操作。<br />一个多路复用器Selector可以同时轮询多个Channel,JDK使用了epoll()代替传统的select实现,所以并没有最大连接句柄的限制,意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端<br />
<a name="pdbbg"></a>
优点
使用一个线程能够处理多个Channel的优点是,只需要很少的线程来处理多个Channel,当然也可以使用一个线程处理所有的Channel,如果使用多个线程的话,线程之间的切换开销很大,并且也会占用系统资源
<a name="uAxNJ"></a>
缺点
优点是使用很少的线程处理大量的Channel,那缺点肯定是处理的效率降低了,就好比一个人做一件事和一个人做N件事,肯定是一个人做一件事快。
<a name="ro4HU"></a>
如何将Channel注册到Selector上?
只有将Channel注册到Selector之后才能被Selector轮询,注册代码如下:
//创建Reactor线程,创建多路复用器并启动线程
selector = Selector.open();
//打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,是SelectableChannel(负责网络读写)的子类
serverChannel = ServerSocketChannel.open();
//设置Channel为非阻塞模式
serverChannel.configureBlocking(false);
//绑定端口
serverChannel.socket().bind(new InetSocketAddress(port), 1024);
//将Channel注册到selector上
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
serverChannel.register(selector, SelectionKey.OP_ACCEPT)
第二个参数表示一个“inerest集合”,意思是通过Selector监听Channel时,对哪些事件感兴趣,可以是多个,可以监听的事件类型如下,同时可以通过再次调用register方法来改变感兴趣的事件。
- CONNECT:连接完成事件,仅适用于客户端
- ACCEPT:接受新连接事件,仅适用于服务端
- READ:读事件,适用于两端,表示可读
- WRITE:写事件,适用于两端,表示可写
- 一个客户端Channel成功连接到另一个服务器,称为连接就绪
- 一个ServerSocketChannel准备好接收新进入的连接,称为接收就绪
- 一个有数据可读的Channel,是读就绪
- 一个等待写数据的Channel,是写就绪
<a name="ILFYl"></a>
SelectionKey
SelectionKey是一个抽象类,表示一个Channel和一个Selector的关系。
<a name="owd29"></a>
API
-
#channel
:返回绑定的Channel -
#selector
:获取绑定的Selector -
#interestOps
:获取感兴趣的事件集合 -
#readyOps
:获取就绪的事件集合
<a name="Mt8ho"></a>
获取方式
-
serverChannel.register
,将Channel注册到Selector后会返回Channel
和Selector
的SelectionKey - 通过Selector可以获取,
Selector#selectedKeys()
,可以获取到当前Selector上注册的所有的SelectionKey
<br />
<a name="CQS7G"></a>
使用示例
<br />详细代码我已经上传到Github:https://github.com/lhj502819/VariousCases/tree/main/CaseForNetty/src/main/java/cn/znnine/netty/nio/java
<a name="BYwdw"></a>
其中重要的类有MultiplexerTimeServer、JavaTimeClientHandler
,代码很容易读懂,大家有问题可以私聊问我
NIO编程的优点
- 客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞
- SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他链路,不需要同步等待这个链路可用
- 线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可用同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降。因此非常适合做高性能、高负载的网络服务器。