推荐阅读:美团技术团队对NIO的浅析
众所周知,Netty是基于JAVA NIO 而封装的网络通讯框架。
官网介绍:Netty is an asynchronous event-driven network application framework
for rapid development of maintainable high performance protocol servers & clients.
本人打着想重新梳理一遍Netty的知识体系的心态,来写Netty挖掘机这一系列的文章。我相信一步步深入到源码后,会有收获的!
什么是阻塞和非阻塞、同步和异步IO
一个IO操作分为先发起IO请求、再进行实际的IO操作
- 阻塞、非阻塞IO:<font color=red>发起IO请求</font>时线程是否会阻塞直到完成
- 同步、异步IO:<font color=red>真正IO实际读写操作</font> 过程 是否会阻塞请求线程
同步需要等待(线程阻塞时)或轮询(线程非阻塞时)内核完成IO操作
异步是系统等内核完成IO操作后(被调用者)再主动通知程序(通知 或者 回调)
同步阻塞:发起一个IO请求时,线程阻塞,直到等待实际的内核IO读写操作完成才放开线程
同步非阻塞:发起一个IO请求时,线程非阻塞,但是线程需要去定时轮询内核IO操作是否完成
异步非阻塞:发起一个IO请求时,线程非阻塞,不等待内核IO操作完成,也不用进程花费CPU资源去轮询,而是系统(被调用者)主动通知程序(调用者)
为什么说NIO是同步非阻塞?
NIO它的实际内核I/O操作(读read、写write、接受accept)会阻塞请求线程,所以是同步的;但是它的发起请求这一步,不会导致线程阻塞,而会通过多路复用器select进行轮询获取实际内核I/O操作完成的信息
什么是BIO?
即面向流的同步阻塞IO, Client、Server基于输入流、输出流进行通信
单线程BIO
多线程版BIO
典型的C/S模型。它是由Server开启accept线程进行监听Client的连接请求,一个请求对应创建一个新的线程进行处理,并通过输出流返回给客户端。
优点:支持同一时间内多个客户端并行请求服务端
缺点:不具备弹性伸缩能力。当面对海量连接时,意味着线程数膨胀,与其同时造成的是系统性能的急剧下降(因为线程也是宝贵的系统资源),进而会发生句柄溢出、线程堆栈溢出,甚至造成服务器宕机。
代码示例:
public void start(int port) throws IOException {
ServerSocket server = new ServerSocket(port);
System.out.println("# Server start,the port:" + port);
System.out.println("### start listener client connect...");
Socket client = server.accept();
System.out.println("### now accept one client connect, start handler...");
new Thread(() -> {
handler(client);
}).start();
}
private void handler(Socket client) {
try {
InputStream stream = client.getInputStream();
while (true) {
byte[] bytes = new byte[1024];
int len = stream.read(bytes);
...
}
} catch (IOException e) {
e.printStackTrace();
}
}
线程池版BIO
在多线程的基础上,增加了线程池的概念,可以避免线程频繁创建。
线程池本身就是一个天然的漏斗,可以在特殊情况下解决一些系统无法处理的问题
然而还是逃不开一个事实:把线程当做命根子。。。
public void start(int port) {
// 创建容量为100的线程池
ExecutorService executor = Executors.newFixedThreadPool(100);
try {
ServerSocket server = new ServerSocket(port);
System.out.println("# Server start,the port:" + port);
while (! Thread.currentThread().isInterrupted()) {// 循环等待新连接
Socket client = server.accept();
executor.submit(new Thread(() -> {
handler(client);
}));
}
} catch (Exception e) {
e.printStackTrace();
}
}
private void handler(Socket client) {
try {
InputStream stream = client.getInputStream();
while (true) {
byte[] bytes = new byte[1024];
int len = stream.read(bytes);
...
}
} catch (IOException e) {
e.printStackTrace();
}
}
什么是NIO
即面向缓冲区的同步非阻塞IO。
从jdk1.4开始,增加了与原标准IO API 不同使用方式的NIO(NO BLOCK IO)
jdk1.5_update10 版本使用 epoll 替代了传统的 select/poll,极大的提升了 NIO 通信的性能。
传输方式
在BIO中使用字节流or字符流来传输数据;而在NIO中使用通道和缓冲区来传输数据
Channel将数据读入Buffer, Buffer将数据写入Channel
核心模块
Channel
JAVA NIO Channel 主要有以下几种即数据通道,不同于单向io(读和写需要不同的通道),这里的通道支持双向,即可从通道内读写数据。
实际Channel与Buffer结合使用:从通道读数据到缓冲区,缓冲区向通道写入数据
- FileChannel 读写文件时用的通道(仅它无法设置非阻塞模式,默认阻塞)
- SocketChannel 客户端传输TCP连接数据时的通道,与JAVA IO中的Socket对应
- DatagramChannel 传输UDP连接数据时的通道,与Java IO中的DatagramSocket对应
- ServerSocketChannel 服务端监听进入的TCP连接的通道,与Java IO中的ServerSocket对应
以下是使用FileChannel 读取文件的Demo
public static void main(String[] args) {
StringBuffer json = null;
try(FileInputStream fis = new FileInputStream("test.json");
FileChannel fc = fis.getChannel()) {
json = new StringBuffer();
ByteBuffer buf = ByteBuffer.allocate(1024);
int br = fc.read(buf);// 读入缓冲区
while (br != -1) {
buf.flip();// 缓冲区读取就绪
while(buf.hasRemaining()) {
json.append((char)buf.get());
}
buf.clear();
br = fc.read(buf);
}
} catch (IOException e) {
e.printStackTrace();
}
System.out.println(json.toString());
}
Buffer
即缓冲区。本质上是一块 可以从中读取数据,或写入数据 的内存,在JAVA NIO中提供了具体的方法来访问内存中的数据
Channel将数据读入Buffer, Buffer将数据写入Channel
Channel读入Buffer:
FileChannel fc = fis.getChannel()
ByteBuffer buf = ByteBuffer.allocate(1024);
int bread = fc.read(buf);
Buffer写入Channel:
// 需要有写入权限,否则会抛异常
// RandomAccessFile fc = new RandomAccessFile("test.json", "rw")
FileChannel fc = fis.getChannel()
ByteBuffer buf = ByteBuffer.allocate(1024);
int bread = fc.write(buf);
主要属性
- Capacity(容量)
- Limit(限制)
- Position(位置)
主要方法
allocate(int capacity) 分配指定容量(字节)的缓冲区.
allocateDirect(int capacity) 分配指定容量的直接缓冲区
flip() 翻转缓冲区,使得limit变为当前位置position,position变为0,常用于Channel读入Buffer或Buffer写入Channel操作后
-
rewind() 倒带缓冲区,将当前位置position设置为0,limit不变,常用于通道读入缓冲区后,要读取缓冲区的数据
举个栗子
StringBuffer json = null; try(FileInputStream fis = new FileInputStream("test.json"); FileChannel fc = fis.getChannel()) { json = new StringBuffer(); long size = fc.size(); ByteBuffer buf = ByteBuffer.allocate((int) size); fc.read(buf); buf.rewind();// 倒带buffer,将position设置为0 for(int i = 0; i < size; i++) {// 遍历buffer 的数据 json.append((char)buf.get()); } } catch (IOException e) { e.printStackTrace(); } System.out.println(json.toString());
clear() 清除缓冲区,将缓冲区置于写入模式
compact() 压缩缓冲区,将缓冲区未读部分复制到缓冲区索引为0的区域,且下一次读入缓冲区时从未读部分的索引+1开始
Selector(多路复用)
即选择器
在java nio中,可以通过Selector用单个线程来管理多个通道。它能够检测n个Nio通道的状态(连接就绪、接收就绪、读就绪、写就绪)
现代操作系统的多任务处理上,开销貌似变得越来越小,但程序上仍要控制使用线程的频率,因为线程的切换开销是很昂贵的
事件模型
我们说NIO 本身是基于事件驱动思想来处理IO的。这里的事件即我们通过Selector注册对应的事件在Channel上。它的事件主要有
连接就绪 SelectionKey.OP_ACCEPT
读就绪 SelectionKey.OP_READ
写就绪 SelectionKey.OP_WRITE
以下文字是摘自美团技术团队 《Java Nio浅析》中的一段
我们首先需要注册当这几个事件到来的时候所对应的处理器。然后在合适的时机告诉事件选择器:我对这个事件感兴趣。对于写操作,就是写不出去的时候对写事件感兴趣;对于读操作,就是完成连接和系统没有办法承载新读入的数据的时;对于accept,一般是服务器刚启动的时候;而对于connect,一般是connect失败需要重连或者直接异步调用connect的时候。
其次,用一个死循环选择就绪的事件,会执行系统调用(Linux 2.6之前是select、poll,2.6之后是epoll,Windows是IOCP),还会阻塞的等待新事件的到来。新事件到来的时候,会在selector上注册标记位,标示可读、可写或者有连接到来。
注意,select是阻塞的,无论是通过操作系统的通知(epoll)还是不停的轮询(select,poll),这个函数是阻塞的。所以你可以放心大胆地在一个while(true)里面调用这个函数而不用担心CPU空转。
流程
-
创建
Selector selector = Selector.open();
-
注册Channel
要注意的是使用的Channel必须设置非阻塞模式,否则会抛出IllegalBlockingModeException异常。
why?
翻看了javadoc 后,找到这么一句:Non-blocking mode is most useful in conjunction with selector-based multiplexing,意思是认为非阻塞模式与基于选择器的多路复用结合使用最为有用。想想当Channel是阻塞模式的话,那多路复用其实也没啥意思了...
以ServerSocketChannel来举栗。
// 打开通道 ServerSocketChannel server = ServerSocketChannel.open(); // 绑定端口 server.bind(new InetSocketAddress(port)); // 设置阻塞模式,false:nio style server.configureBlocking(false); // 创建选择器 selector = Selector.open(); // 使用给定的选择器注册此通道 server.register(selector, SelectionKey.OP_CONNECT);
-
轮询获取channel状态
通过调用某个select方法,获取就绪的channel。
int select():获取就绪的key个数,阻塞直到获取至少一个就绪的channel
int select(long timeout):获取就绪的key个数,阻塞直到时间达到指定timeout
int selectNow():非阻塞获取就绪的key个数
通过以上可得到就绪的key个数,再调用selectedKeys获取所有的事件key,迭代获取key对应的通道即可
完整代码如下:
Selector selector;
public static void main(String[] args) throws IOException {
new NioServer(8080).listener();
}
public NioServer(int port) {
try {
// 打开通道
ServerSocketChannel server = ServerSocketChannel.open();
// 绑定端口
server.bind(new InetSocketAddress(port));
// 设置阻塞模式,false:nio style.true:oio style.
server.configureBlocking(false);
// 创建选择器
selector = Selector.open();
// 使用给定的选择器注册此通道
server.register(selector, SelectionKey.OP_CONNECT);
System.out.println("服务端已启动,端口" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listener() throws IOException {
for(;;) {
// 阻塞,获取已就绪的key个数
int wait = selector.select();
if(wait == 0) continue;
// 获取所有的事件key
Set<SelectionKey> readykeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = readykeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
// 移除已处理数据
iterator.remove();
// 处理逻辑
process(key);
}
}
}
/**
* 业务逻辑方法
* @param key
*/
public void process(SelectionKey key) throws IOException {
// 判断客户端是否已确认连接并且可交互
if(key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
// 设置阻塞模式
client.configureBlocking(false);// 非阻塞
// 注册选择器,设置读模式,告诉client端下次进来便要读取数据
client.register(selector, SelectionKey.OP_READ, ByteBuffer.wrap("HI!\r\n".getBytes("utf-8")));
// // 将此对应的channel设置为准备接受其他客户端的连接请求
// key.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("接受连接来自 "+client);
}
// 处理数据读取请求
if(key.isReadable()) {
// 取数据
// 返回该读key 对应的channel
SocketChannel client = (SocketChannel) key.channel();
// 获取channel内的数据
ByteBuffer buffer = ByteBuffer.allocate(1024);
StringBuilder content = new StringBuilder();
try {
while (true) {
if (!(client.read(buffer) > 0)) break;
// 翻转缓冲区
buffer.flip();
content.append(buffer.toString());
}
// 将channel设置为准备下一次读取
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
key.cancel();
if(key.channel() != null) key.channel().close();
}
}
}
-
scatter/gather I/O (也叫Vectored I/O)
Channel内置scatter/gather I/O功能
-
scatter read:分散读入
-
```java
Charset charset = Charset.forName("UTF-8");
try(FileInputStream fis = new FileInputStream("test.json");
FileChannel fc = fis.getChannel()) {
ByteBuffer headBuf = ByteBuffer.allocate(8);// 头部缓冲区固定大小8字节
long size = fc.size();
ByteBuffer bodyBuf = ByteBuffer.allocate((int) size);// 剩下的归为body
ByteBuffer[] byteBuffers = new ByteBuffer[]{headBuf, bodyBuf};
fc.read(byteBuffers);
headBuf.flip();
System.out.println("head buffer data " + charset.decode(headBuf));
bodyBuf.flip();
System.out.println("body buffer data " + charset.decode(bodyBuf));
} catch (IOException e) {
e.printStackTrace();
}
```
-
gather write:聚集写入
try(RandomAccessFile accessFile = new RandomAccessFile("test.json", "rw"); FileChannel fc = accessFile.getChannel()) { ByteBuffer headBuf = ByteBuffer.allocate(5);// 头部缓冲区固定大小8字节 headBuf.put("hello".getBytes()); ByteBuffer bodyBuf = ByteBuffer.allocate(5);// 剩下的归为body bodyBuf.put("Jerry".getBytes()); ByteBuffer[] byteBuffers = new ByteBuffer[]{headBuf, bodyBuf}; headBuf.flip(); bodyBuf.flip(); long length = fc.write(byteBuffers); System.out.println("buffer data write to channel length " + length); } catch (IOException e) { e.printStackTrace(); }
分散/聚集 IO支持,针对的是通道和buffer间的交互
通道可以将数据分散读入多个缓冲区,多个缓冲区可以将数据聚集写入单个通道
说明 | 特性 |
---|---|
连接输出 | 在内存中写入非顺序放置数据的应用程序可以在一个向量I / O操作中执行此操作。 |
效率 | 一个向量I / O读取或写入可以替换许多普通读取或写入,从而节省系统调用所涉及的开销 |
拆分输入 | 需要单独处理传输数据的各个部分的情况。例如,如果消息由标题和正文组成,则可以将标题和正文保留在单独的缓冲区中。这样做可以使您更容易分别使用标题和正文 |
总结
为什么NIO会替代BIO?
可以从两个角度出发
-
jdk序列化(BIO面向流传输)
- 不支持跨语言工作。即当跨应用调服务时,A应用(Python),B应用(JAVA),此时A应用发送到B应用的<font color='#35b998'>序列化</font>对象,B应用无法对其<font color='#35b998'>反序列化</font>
- 就算是同语言工作。效率低下,即<font color='#35b998'>码流太大</font>,无论是网络传输or持久化到磁盘,会导致额外的系统占用。实际测试远低于ByteBufffer(java.nio包)
- 性能差。占用CPU资源高
-
线程(BIO严重依赖线程工作)
- 创建和销毁的成本高。在Linux系统中,线程本质上是一个轻量级的进程,这种系统级别的开销是挺大的
- 吃内存
- 线程切换的成本高。
- 易造成生产>消费。即外部网络请求的频率远大于线程的创建及销毁。每个TCP连接即占用一个线程,此时当遇到IO读写阻塞导致线程无法及时释放等情况时,会导致性能下降,甚至宕机。