1、Linux IO模型
《Unix网络编程》把I/O模型分成五类
-
阻塞式I/O模型:整个过程都是阻塞的——BIO(java socket)
- 非阻塞式I/O模型:只有从内核空间复制数据时才是阻塞的,从磁盘到内核空间是非阻塞的——NIO
线程发起io请求后,立即返回(非阻塞io)。用户线程不阻塞等待,但是,用户线程要定时轮询检查数据是否就绪,当数据就绪后,用户线程将数据从用户空间写入socket空间,或从socket空间读取数据到用户空间(同步)。
- I/O复用:一次监控一批通道,看哪个通道有数据就返回哪个——NIO(Java)
当用户线程发起io请求后,将socket连接及关注事件注册到selector(多路复用器,os级别线程)上,selector循环遍历socket连接,看是否有关注数据就绪,如果连接有数据就绪后,就通知应用程序,建立线程进行数据读写。同BIO对比,NIO中线程处理的都是有效连接(数据就绪),且一个线程可以分管处理多个连接上的就绪数据,节省线程资源开销
selector:注册的socket事件由数组管理,长度有限制,轮询查找时需要遍历数组。
netty中创建EventLoopGroup时可以选择使用NioEventLoopGroup,NioEventLoopGroup管理的NioEventLoop中使用的就是selector
poll:注册的socket事件由链表实现,数量没有限制,遍历链表轮询查找。
-
信号驱动式I/O:从磁盘拷贝到内核空间(这个过程是非阻塞的)以后,信号通知用户进程,然后从内核空间拷贝用户空间(这个过程是阻塞的)
epoll:基于事件驱动思想,采用reactor模式,通过事件回调,无需使用某种方式主动检查socket状态,被动接收就绪事件即可。
netty中创建EventLoopGroup时可以选择使用EpollEventLoopGroup。
- 异步I/O:整个过程都是异步的,当数据已经拷贝到用户空间以后,才发信号通知进程——AIO(Java)
线程发起io请求后,立即返回(非阻塞io),当数据读写完成后,OS通知用户线程(异步)。这里数据写入socket空间,或从socket空间读取数据到用户空间由OS完成,用户线程无需介入,所以也就不会阻塞用户线程,即异步
Linux IO流程
- 内核空间等待数据准备好(Waiting for the data to be ready)
-
从内核向进程复制数据(copying the data from the kernel to the process)
各种I/O模型的比较
2、 Java BIO(Blocking I/O)
- ServerSocket
- Socket
Demo
package com.demo.io;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.channels.Channel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class MultiThreadedEchoServer {
private int port;
public MultiThreadedEchoServer(int port) {
this.port = port;
}
public void startServer() {
ServerSocket echoServer = null;
Executor executor = Executors.newFixedThreadPool(5);
int i = 0;
System.out.println("服务器在端口[" + this.port + "]等待客户请求......");
try {
echoServer = new ServerSocket(8080);
while (true) {
Socket clientRequest = echoServer.accept();
executor.execute(new ThreadedServerHandler(clientRequest, i++));
}
} catch (IOException e) {
System.out.println(e);
}
}
public static void main(String[] args) throws IOException {
new MultiThreadedEchoServer(8080).startServer();
}
}
package com.demo.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.Socket;
public class ThreadedServerHandler implements Runnable {
Socket clientSocket = null;
int clientNo = 0;
ThreadedServerHandler(Socket socket, int i) {
if (socket != null) {
clientSocket = socket;
clientNo = i;
System.out.println("创建线程为[" + clientNo + "]号客户服务...");
}
}
@Override
public void run() {
PrintStream os = null;
BufferedReader in = null;
try {
in = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
os = new PrintStream(clientSocket.getOutputStream());
String inputLine;
while ((inputLine = in.readLine()) != null) {
// 输入'Quit'退出
if (inputLine.equals("Quit")) {
System.out.println("关闭与客户端[" + clientNo + "]......" + clientNo);
os.close();
in.close();
clientSocket.close();
break;
} else {
System.out.println("来自客户端[" + clientNo + "]的输入: [" + inputLine + "]!");
os.println("来自服务器端的响应:" + inputLine);
}
}
} catch (IOException e) {
System.out.println("Stream closed");
}
}
}
package com.demo.io;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;
import java.net.UnknownHostException;
public class EchoClient {
public static void main(String[] args) {
Socket echoSocket = null;
PrintWriter out = null;
BufferedReader in = null;
try {
echoSocket = new Socket("127.0.0.1", 8080);
out = new PrintWriter(echoSocket.getOutputStream(), true);
in = new BufferedReader(new InputStreamReader(
echoSocket.getInputStream()));
System.out.println("连接到服务器......");
System.out.println("请输入消息[输入\"Quit\"]退出:");
BufferedReader stdIn = new BufferedReader(new InputStreamReader(
System.in));
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println(in.readLine());
if (userInput.equals("Quit")) {
System.out.println("关闭客户端......");
out.close();
in.close();
stdIn.close();
echoSocket.close();
System.exit(1);
}
System.out.println("请输入消息[输入\"Quit\"]退出:");
}
} catch (UnknownHostException e) {
System.err.println("Don't know about host");
System.exit(1);
} catch (IOException e) {
System.err.println("Couldn't get I/O for "
+ "the connection ");
System.exit(1);
}
}
}
3、Java NIO简介
1、变迁
- NIO = New I/O
- NIO 1: JSR 51
- JDK 1.4引入
- http://jcp.org/en/jsr/detail?id=051
- NIO 2: JSR203
2、 Java IO vs NIO
4、 Java NIO组件之Buffer
Java NIO Buffer:一个Buffer本质上是内存中的一块,可以将数据写入这块内存,从这块内存中获取数据
java.nio定义了以下几种Buffer的实现
1、Buffer中有三个主要概念:
-
capacity:
- 代表这个缓冲区的容量,一旦设定就不可以更改;比如capacity为1024的IntBuffer,代表其一次可以存放1024个int类型的值;
- 一旦buffer的容量达到capacity,需要清空buffer,才能重新写入值
-
position:位置索引,表示当前正在操作(读写)时的位置
- 从写操作模式到读操作模式切换的时候(flip),position 都会归零,这样就可以从头开始读写了;
-
limit:读写模式下,可操作(读写)的限制;
- 写操作模式下,limit 代表的是最大能写入的数据,这个时候 limit 等于 capacity;
- 读操作模式下,limit 等于 Buffer 中实际的数据大小,因为 Buffer 不一定被写满了。
2、Direct ByteBuffer VS. non-direct ByteBuffer
- Non-direct ByteBuffer
-> HeapByteBuffer,标准的java类
-> 维护一份byte[]在JVM堆上
-> 创建开销小 - Direct ByteBuffer
-> DirectByteBuffer,底层存储在非JVM堆上,通过native代码操作
-> -XX:MaxDirectMemorySize=<size>
-> 创建开销大
3、Buffer API:
- buffer创建
- allocate/allocateDirect
//创建一个容量为10的byte缓冲区 ByteBuffer buff = ByteBuffer.allocate(10); //创建一个容量为10的char缓冲区 CharBuffer buff = CharBuffer.allocate(10);
- wrap
//使用一个指定数组作为缓冲区的存储器, //缓冲区的数据会存放在bytes数组中, //bytes数组或buff缓冲区任何一方中数据的改动都会影响另一方 byte[] bytes = new byte[10]; ByteBuffer buff = ByteBuffer.wrap(bytes); //创建指定初始位置(position)和上界(limit)的缓冲区: byte[] bytes = new byte[10]; ByteBuffer buff = ByteBuffer.wrap(bytes, 3, 8);
- buffer读取
- put/get:使用get()从缓冲区中取数据,使用put()向缓冲区中存数据
- flip:将一个处于存数据(put)状态的缓冲区变为一个处于准备取数(get)的状态
- mark/reset:使用mark记住当前位置(标记),之后可以使用reset将位置恢复到标记处
- compact:压缩,将已读取了的数据丢弃,保留未读取的数据并将保留的数据重新填充到缓冲区的顶部,然后继续向缓冲区写入数据;
- rewind/clear : 设置读写position为0;
public final Buffer rewind() {
position = 0;
mark = -1; //取消标记
return this;
}
public final Buffer clear(){
position = 0; //重置当前读写位置
limit = capacity;
mark = -1; //取消标记
return this;
}
package com.demo.nio.buffers;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
public class BufferAccess {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(10);
printBuffer(buffer);
buffer.put((byte)'H').put((byte)'e').put((byte)'l').put((byte)'l').put((byte)'o');
printBuffer(buffer);
// 翻转缓冲区
buffer.flip();
printBuffer(buffer);
//取buffer
System.out.println("" + (char) buffer.get() + (char) buffer.get());
printBuffer(buffer);
buffer.mark();
printBuffer(buffer);
//读取两个元素后,恢复到之前mark的位置处
System.out.println("" + (char) buffer.get() + (char) buffer.get());
printBuffer(buffer);
buffer.reset();
//buffer.rewind();
printBuffer(buffer);
//压缩,将已读取了的数据丢弃,
// 保留未读取的数据并将保留的数据重新填充到缓冲区的顶部,然后继续向缓冲区写入数据
buffer.compact();
printBuffer(buffer);
buffer.clear();
printBuffer(buffer);
}
private static void printBuffer(Buffer buffer) {
System.out.println("[limit=" + buffer.limit()
+", position = " + buffer.position()
+", capacity = " + buffer.capacity()
+", array = " + new String((byte[]) buffer.array()) +"]");
}
}
- buffer复制(浅复制)
- duplicate():复制一个可读可写的缓冲区
- asReadOnlyBuffer():复制一个只读缓冲区
- slice():复制一个从源缓冲position到limit的新缓冲区
5、 Java NIO组件之 Channel
所有的 NIO 操作始于通道,通道是数据来源或数据写入的目的地
java.nio 包中主要实现的以下几个 Channel:
FileChannel:文件通道,用于文件的读和写;
DatagramChannel:用于 UDP 连接的接收和发送;
SocketChannel:TCP 连接通道,简单理解就是 TCP 客户端;
ServerSocketChannel:TCP 对应的服务端,用于监听某个端口进来的请求;
6、 Java NIO组件之 Selector
java.nio.channels.Selector,支持IO多路复用的抽象实体;
用于检查一个或多个NIO Channel的状态是否处于可读、可写;
这样的话,可以实现单线程管理多个channels,也就是可以管理多个网络链接,如下图;
- 创建Selector
Selector selector = Selector.open();
- 注册Channel到Selector上
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
//register的第二个参数,这个参数是一个“关注集合”,代表关注的channel状态,
//有四种基础类型可供监听, 用SelectionKey中的常量表示如下:
//SelectionKey.OP_CONNECT
//SelectionKey.OP_ACCEPT
//SelectionKey.OP_READ
//SelectionKey.OP_WRITE
- 从Selector中选择channel
Selector.select 更新所有就绪的SelectionKey的状态,并返回就绪的channel个数;
一旦向Selector注册了一个或多个channel后,就可以调用select来获取channel,select方法会返回所有处于就绪状态的channel,
select()方法的返回值是一个int,代表有多少channel处于就绪了。也就是自上一次select后有多少channel进入就绪
int select()
int select(long timeout)
int selectNow()
迭代Selected Key集合并处理就绪channel
在调用select并返回了有channel就绪之后,可以通过选中的key集合来获取channel,这个操作通过调用selectedKeys()方法:
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
SelectionKey key = keyIterator.next();
if(key.isAcceptable()) {
// a connection was accepted by a ServerSocketChannel.
} else if (key.isConnectable()) {
// a connection was established with a remote server.
} else if (key.isReadable()) {
// a channel is ready for reading
} else if (key.isWritable()) {
// a channel is ready for writing
}
keyIterator.remove();
7、Demo
package com.demo.nio.demo;
import java.io.IOException;
public class NIOEchoServer {
public static void main(String[] args) throws IOException {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
// 采用默认值
}
}
EchoHandler timeServer = new EchoHandler(port);
new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();
}
}
package com.demo.nio.demo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class EchoHandler implements Runnable {
private Selector selector;
private ServerSocketChannel servChannel;
private volatile boolean stop;
private int num = 0;
public EchoHandler(int port) {
try {
//创建Selector
selector = Selector.open();
//创建ServerSocketChannel并注册到Selector上,关注Accept事件
servChannel = ServerSocketChannel.open();
servChannel.configureBlocking(false);
servChannel.socket().bind(new InetSocketAddress(port), 1024);
servChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服务器在端口[" + port + "]等待客户请求......");
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
public void stop() {
this.stop = true;
}
@Override
public void run() {
while (!stop) {
try {
//更新所有就绪的SelectionKey的状态,并返回就绪的channel个数
if(selector.select(1000)==0)
continue;
//迭代Selected Key集合并处理就绪channel
//事件处理循环:遍历就绪的channel,分别进行处理
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Throwable t) {
t.printStackTrace();
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null)
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 处理新接入的请求消息,Accept事件
if (key.isAcceptable()) {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = ssc.accept(); // Non blocking, never null
socketChannel.configureBlocking(false);
SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_READ);
sk.attach(num++);
}
//处理read事件
if (key.isReadable()) {
// 读取数据
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println("来自客户端[" + key.attachment() + "]的输入: [" + body.trim() + "]!");
if (body.trim().equals("Quit")) {
System.out.println("关闭与客户端[" + key.attachment() + "]......");
key.cancel();
sc.close();
} else {
String response = "来自服务器端的响应:" + body;
doWrite(sc, response);
}
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else {
}
}
}
}
private void doWrite(SocketChannel channel, String response) throws IOException {
if (response != null && response.trim().length() > 0) {
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
channel.write(writeBuffer);
}
}
}
package com.demo.nio.demo;
public class NIOEchoClient {
public static void main(String[] args) {
int port = 8080;
if (args != null && args.length > 0) {
try {
port = Integer.valueOf(args[0]);
} catch (NumberFormatException e) {
}
}
new Thread(new NIOEchoClientHandler("127.0.0.1", port), "NIOEchoClient-001").start();
}
}
package com.demo.nio.demo;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NIOEchoClientHandler implements Runnable {
private String host;
private int port;
private Selector selector;
private SocketChannel socketChannel;
private ExecutorService executorService;
private volatile boolean stop;
public NIOEchoClientHandler(String host, int port) {
this.host = host == null ? "127.0.0.1" : host;
this.port = port;
this.executorService= Executors.newSingleThreadExecutor();
try {
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
}
@Override
public void run() {
try {
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress(host, port));
} catch (IOException e) {
e.printStackTrace();
System.exit(1);
}
while (!stop) {
try {
selector.select(1000);
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = it.next();
it.remove();
try {
handleInput(key);
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null)
key.channel().close();
}
}
}
} catch (Exception e) {
e.printStackTrace();
System.exit(1);
}
}
// 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
if (selector != null) {
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(executorService != null) {
executorService.shutdown();
}
}
private void handleInput(SelectionKey key) throws IOException {
if (key.isValid()) {
// 判断是否连接成功
SocketChannel sc = (SocketChannel) key.channel();
if (key.isConnectable()) {
if (sc.finishConnect()) {
System.out.println("连接到服务器......");
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("请输入消息[输入\"Quit\"]退出:");
executorService.submit(() -> {
while(true) {
try {
buffer.clear();
InputStreamReader input = new InputStreamReader(System.in);
BufferedReader br = new BufferedReader(input);
String msg = br.readLine();
if (msg.equals("Quit")) {
System.out.println("关闭客户端......");
key.cancel();
sc.close();
this.stop = true;
break;
}
buffer.put(msg.getBytes());
buffer.flip();
sc.write(buffer);
System.out.println("请输入消息[输入\"Quit\"]退出:");
} catch (Exception ex) {
ex.printStackTrace();
}
}
});
sc.register(selector, SelectionKey.OP_READ);
} else {
System.exit(1); // 连接失败,进程退出
}
}
if (key.isReadable()) {
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
int readBytes = sc.read(readBuffer);
if (readBytes > 0) {
readBuffer.flip();
byte[] bytes = new byte[readBuffer.remaining()];
readBuffer.get(bytes);
String body = new String(bytes, "UTF-8");
System.out.println(body);
if(body.equals("Quit"))
{
this.stop = true;
}
} else if (readBytes < 0) {
// 对端链路关闭
key.cancel();
sc.close();
} else
; // 读到0字节,忽略
}
if(key.isWritable()){
System.out.println("The key is writable");
}
}
}
private void doWrite(SocketChannel sc) throws IOException {
/* System.out.println("请输入消息[输入\"Quit\"]退出:");
BufferedReader stdIn = new BufferedReader(new InputStreamReader(
System.in));
String userInput;
while ((userInput = stdIn.readLine()) != null) {
out.println(userInput);
System.out.println(in.readLine());*/
byte[] req = "QUERY TIME ORDER".getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
sc.write(writeBuffer);
if (!writeBuffer.hasRemaining())
System.out.println("Send order 2 server succeed.");
/*
if (userInput.equals("Quit")) {
System.out.println("Closing client");
out.close();
in.close();
stdIn.close();
echoSocket.close();
System.exit(1);
}
System.out.println("请输入消息[输入\"Quit\"]退出:");
}*/
}
}
8、NIO错误认识
1、使用NIO = 高性能
其实并不一定,在一些场景下,使用NIO并不一定更快,比如
- 客户端应用
- 连接数<1000
- 并发程度不高
- 局域网环境下
2、NIO完全屏蔽了平台差异
NO,NIO仍然是基于各个OS平台的IO系统实现的,差异仍然存在
3、使用NIO做网络编程很容易
- 离散的事件驱动模型,编程困难
- 陷阱重重
(其实,真正简化网络编程的是Netty)