1. 前言
有一些概念总是Java I/O一块出现,比如同步与异步,阻塞与非阻塞,这些概念往往也是非常难以区分。在介绍Java I/O之前,本文先通俗地介绍一下这两组概念的区别:
- 同步和异步: 同步和异步的区分点在消息的通知机制。
如果是程序主动获取消息,为同步,程序被动获取消息,为异步。例如烧水,如果我们时不时去看看水是否烧开,则为同步。而如果水壶是会响笛的水壶,我们听见响笛则认为水烧开了,则为异步。而对于程序,如果是程序轮询结果或者直接等待结果,为同步。如果程序调用了然后立刻返回,结果等待被调用方通知,或者回调,则为异步。重点在获取调用的结果的方式。 - 阻塞和非阻塞: 区分点则在等待程序调用结果时,程序所处的状态。
例如烧水,如果在烧水的过程中,我们一直等着,啥事都不干,则为阻塞。如果我们在烧水的过程中,继续干着别的事,则为非阻塞。重点在获取程序调用结果的,程序所处于的状态。
总的来说,对于一个程序中调用过程来说,获取调用结果的方式,决定了程序是同步(主动)还是异步(被动)。而在获取调用结果的过程中,程序所处的状态,决定了程序是阻塞(挂起)还是非阻塞(处理其他的事情)
2. Java I/O的发展历程
Java I/O的发展一般来说主要是分为三个阶段:
- 第一个阶段:在JDK 1.0到JDK 1.3中,Java的I/O类库是非常简单的,很多UNIX网络编程中的概念或者接口在Java I/O类库中都没有体现。通常,我们这种类型的I/O为BIO,即Blocking I/O。
- 第二个阶段:在JDK 1.4中,java 新增加了java.nio包,正式引入了NIO(Non-blocking I/O),提供了异步开发I/O的API和类库。Java NIO主要由Selector,ByteBuffer和Channel三个核心部分组成。
- 第三个阶段:JDK1.7正式发布,java对NIO进行了升级,被称为NIO2.0,也称为AIO,支持文件的异步I/O以及网络的异步操作。
3. BIO、NIO以及AIO
网络Socket编程的一般类型是Server/client类型的,即两个进程之间的通信。Server端通过绑定端口号建立Socket监听连接,而Client端通过指定Ip地址和端口号通过三次握手建立双方的连接,如果连接成功,双方就通过Socket进行通信。
在第2部分,简单的介绍了BIO,NIO以及AIO的概念,在本部分主要通过实例代码来展示三个的关键点。
1.BIO
当Server和Client端采用BIO形式,双方通过输入流和输出流通过同步阻塞的方式进行通信。
采用BIO通信方式的Server端,一般由一个单独的Acceptor线程来监听客户端的连接请求,Server接收到Client端的连接后,就为每一个client建立新的线程,进行链路处理,通过输入流发送响应到客户端,销毁线程,整个socket通信流程结束。这是典型的一请求一应答模式的。
以下示例的socket通信流程,模拟客户端发送http请求给服务器端,如果请求的路径为登录地址,则返回已经登录,如果请求路径不为登录地址,则返回还未登录。
Server端监听8080端口通过轮询的方式不断监听client的连接请求,等待client的连接(serverSocket.accept())。当没有client连接server的时候,线程阻塞在accept()处。当Server接收到client的连接请求,新建一个线程进行链路业务处理。
服务器端Acceptor线程(主线程)代码如下:
public class InfoServer {
public static void main(String args[]){
try{
ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = null;
while(true){
System.out.println("socket listening");
socket = serverSocket.accept(); //线程阻塞在此处
new Thread(new LoginCheckThread(socket)).start();
System.out.println("socket accepted");
}
}catch(Exception e){
e.printStackTrace();
}
}
}
在链路处理线程中,代码流程为:
- 首先根据socket的输入流产生字符流BufferReader对象;
- 通过BufferReader对象的readline()方法,读取client端传过来的数据。readline()方法当读取到换行符'\n'或'\r'时才返回。
- 最后,从socket的输出流中产生了字符流PrintWriter对象,通过PrintWriter对象发送响应内容。
链路处理线程为:
public class LoginCheckThread implements Runnable {
private Socket socket = null;
public LoginCheckThread(Socket socket){
this.socket = socket;
}
public void run(){
try{
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);
String content = bufferedReader.readLine();
System.out.println(Thread.currentThread().getName()+" "+content);
//获取访问的域名,如果是登录请求,则返回已经登录,否则提示没有登录
if(content.split(" ")[1].equals("/a/login")){
printWriter.println("you are not login in this system!");
}else{
printWriter.println("you have login in this system!");
}
socket.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
客户端的代码:
客户端通过指定IP地址和端口,尝试连接server端口,连接上Server端后,通过PrinterWriter对象想Server端发送请求数据。发送完请求数据之后,client通过BufferReader的read(Char[])方法来读取Server端的响应数据。需要注意到是read(char[])会产生阻塞,read(char[])方法只有在以下三种情况下才会返回:
- 读取到足够多的字节;
- 读取输入流的终止符;
- 发生IO异常
public class Client {
public static void main(String args[]){
try{
Socket socket = new Socket("127.0.0.1",8080);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
printWriter.println();
printWriter.flush();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] reponseChar = new char[1024];
//read方法是阻塞方法,程序在此处会产生阻塞
bufferedReader.read(reponseChar);
System.out.println("repose:"+new String(reponseChar));
socket.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
2.NIO
NIO即非阻塞I/O(Non-blocking I/O),NIO类库提供了 对应的ServerSocketChannel和SocketChannel两种不同套接字的通道实现,ServerSocketChannel和Socketchannel分别于BIO中的ServerSocket和Socket对应。这两种新增的通道都支持阻塞和非阻塞模式,阻塞模式使用起来更加的简单,但是性能和可靠性上都不好,非阻塞模式却正好相反。
Java NIO由Channel、Buffer、Selector三个核心部分组成。Channel和Buffer与操作系统的IO方式更加接近,所以性能上会比传统的AIO要好。
在NIO中,基本上所有的IO中都是从一个Channel开始。Channel译为通道,与流不同的是,通道同时读和写。数据可以从Channel中读到Buffer中,也可以从Buffer中写到Channel中。Selector(选择器)是能够检测一个到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。通过Selector,一个单独的线程可以管理多个Channel,从而管理多个网络连接。
如何使用NIO来进行网络编程:
- 创建Selector:通过Selector.open()可以创建Selector对象;
- 创建Channel:Channel分为ServerSocketChannel和SocketChannel。在Server端,通过ServerSocketChannel.open()可以创建Server端监听通道ServerSocketChannel,在Client端可以通过SocketChannel.open()可以打开连接通道SocketChannel对象;
-
向Selector中注册Channel及感兴趣的事件(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT);
- ServerSocketChannel可以向Selector注册OP_ACCEPT,而SocketChannel可以向Seletor注册OP_READ,OP_WRTIE,OP_CONNECT;
- 轮询Selector,获取就绪的Channel(通过Selector.select()及其他重载方法)
- 针对特定的Channel进行业务上的处理。
下面通过具体的代码来说明如何进行NIO编程
InfoServer类为Server端的启动类,通过新建线程的形式启动Server端的监听线程。
public class InfoServer {
public static void main(String args[]){
new Thread(new LoginCheckTask(8080)).start();
}
}
LoginCheckTask类实现了网络监听、网络连接及请求处理的操作。在构造函数对Server端进行了网络初始化,包括获得Selector对象、ServersocketChannel对象、设置Socket参数,最后还向Selector注册了当前ServerSocketChannel通道的OP_ACCERT事件。
在NIO中,通道Channel要么从缓冲器获得数据,要么向缓冲器发送数据。唯一直接与通道交互的缓冲器是ByteBuffer。
当server端通道中监听到client端的连接后,建立与Client端连接的SocketChannel,并向该Socketchannel中注册OP_READ事件。
当多路复用器Selector检测到OP_READ事件就绪后,就从该SocketChannel的的缓冲器ByteBuffer中读取请求内容。当Server端有数据需要向client端发送响应时,首先需要将响应的字节数据写入到ByteBuffer中,然后通过SocketChannel的write方法向client端发送响应的。
public class LoginCheckTask implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public LoginCheckTask(int port){
try{
//获取Selector对象
selector = Selector.open();
//获取ServerSocketChannel对象
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//设置监听参数
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//向selector注册OP_ACCEEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println(" server listening");
}catch (Exception e){
e.printStackTrace();
}
}
public void stop(){
this.stop = true;
}
public void run(){
while (!stop){
try{
//获取通道就绪的网络事件,该方法会阻塞1s
selector.select(1000);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeySet.iterator();
SelectionKey key = null;
//分别处理就绪的网络事件
while(it.hasNext()){
key = it.next();
it.remove();
try{
if(key.isValid()){
//处理OP_ACCEPT事件
if(key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//将与client端建立的socketChannel,向Selector注册OP_READ事件;
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println("server accepted");
}
//处理OP_READ事件
if(key.isReadable()){
System.out.println("client is readable");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SocketChannel sc = (SocketChannel) key.channel();
int readBytes = sc.read(readBuffer);
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBytes];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
String response;
//获取访问的域名,如果是登录请求,则返回已经登录,否则提示没有登录
if(body.split(" ")[1].equals("/a/login")){
response = "you are not login in this system!/n";
}else{
response = "you have login in this system!/n";
}
byte[] reponseBytes = response.getBytes();
ByteBuffer reposeByteBuffer = ByteBuffer.allocate(1024);
reposeByteBuffer.put(reponseBytes);
reposeByteBuffer.flip();
Thread.currentThread().sleep(1000);
sc.write(reposeByteBuffer);
}
}
}
}catch(Exception e){
e.printStackTrace();
key.cancel();
if(key.channel()!= null){
key.channel().close();
}
}
}
}catch(Exception e){
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
}catch(Exception e){
e.printStackTrace();;
}
}
}
}
在NIO的Client端,程序的基本执行流程为:
- 初始化时需要产生Selector对象和SocketChannel,配置SocketChannel的阻塞模式为非阻塞模式。
- client尝试与Server建立的Socket连接,如果直接连接成功,则注册OP_READ事件并发送请求数据,否则注册OP_CONNECT事件。
- 与Server端类似,client也是通过循环不断探测多路复用器Selector的就绪事件
- 如果事件是连接成功事件,则注册OP_READ事件,并向Server服务端发送请求数据
- 如果事件为可读时间,则通过缓冲器ByteBuffer从SocketChannel中读取Server端的响应数据。
public class Client {
private Selector selector;
private SocketChannel socketChannel;
public static void main(String args[]){
new Client().connect();
}
public void connect(){
try{
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}catch(Exception e){
e.printStackTrace();
}
try {
//非阻塞模式,如果直接连接成功,则注册读,否则注册连接事件
if(socketChannel.connect(new InetSocketAddress("127.0.0.1",8080))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}catch(Exception e){
e.printStackTrace();
}
boolean loop= true;
while(loop){
try{
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isValid()){
if(key.isConnectable()){
if(socketChannel.finishConnect()){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
System.exit(1);
}
}
//读取服务器端返回数据
if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if(readBytes > 0){
byteBuffer.flip();
byte[] responseByte = new byte[byteBuffer.remaining()];
byteBuffer.get(responseByte);
System.out.println(new String(responseByte,"UTF-8"));
loop= false;
}
}
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
/**
* 如果连接成功,则向服务器发送数据
* @param sc
*/
public void doWrite(SocketChannel sc)throws Exception{
String request = "GET /a/index HTTP/1.1 ";
byte[] requestBytes = request.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(requestBytes);
byteBuffer.flip();
sc.write(byteBuffer);
}
}
在整个NIOSocket通信流程中,只有在Selector.select(1000)处阻塞1s,其他的读写操作由于都是通过缓冲器来操作Channel,所以均为非阻塞操作。
3.AIO
NIO 2.0(即AIO)引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。异步通道提供以下两种方式获取操作结果:
- 通过java.util.concurrent.Future类表示异步操作的结果;
- 在执行异步的时候传入一个java.nio.channels;
CompletionHandler的实现类作为操作完成的回调。
NIO2.0的异步套接字通道是真正的异步非阻塞I/O,对应于unix网络编程的事件驱动I/O。它不需要通过多路复用器Selector对注册的通道进行轮询,即可实现异步读写,从而简化了NIO的编程模型。
Server端代码:
public class InfoServer {
public static void main(String args[]){
new Thread(new LoginCheckHandler(8080)).start();
}
}
和之前类似,在构造函数中完成Server端Socket的初始化,主要完成获取AsynchronousServerSocketChannel对象,监听端口设置。
在run方法中,我们通过AsynchronousServerSocketChannel的异步方法accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)来接收client端的连接,并指定了连接完成后的回调函数AcceptCompletionHandler对象(AcceptCompletionHandler是ComplettionHandler的实现类)
public class LoginCheckHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public LoginCheckHandler(int port){
try{
//打开异步通道
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//监听8080端口
asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("服务器正在监听8080端口中");
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
//latch的作用是在完成一组正在执行的操作前之前,允许当前的线程一直阻塞。在这里我们是为了防止服务器执行完成退出。
latch = new CountDownLatch(1);
//异步ServerSocketchannel
asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler());
try{
latch.await();
}catch(Exception e){
e.printStackTrace();
}
}
}
AcceptCompletionHandler是Server端与Client端完成Socket通信过程建立后的回调函数,当通信过程成功建立,则调用completed()方法,否则调用failed()方法。
需要特别注意,在completed方法中,还需要再次调用AsyncrhonousSocketChannel的accept方法,因为一个Server端可以接收多个client端的连接,所以需要继续调用accept方法继续接收其他client的连接,最终形成一个循环。每当一个client端连接进来后,再异步接收新的连接。
在completed方法中,通过asynchronousSocketChannel的read方法来异步读取客户端的请求内容,读操作也是异步的。ReadCompletedHandler类是读操作完成后的回调函数。
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,LoginCheckHandler> {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, LoginCheckHandler attachment){
System.out.println("服务器接收到连接");
attachment.asynchronousServerSocketChannel.accept(attachment,this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(buffer,buffer,new ReadCompletedHandler(asynchronousSocketChannel));
}
@Override
public void failed(Throwable t,LoginCheckHandler attachment){
t.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletedHandler也是CompletionHandler的实现类,如果是读操作的回调函数,java类库已经明确规定了,其泛型的参数类型为
CompletionHandler<Integer,ByteBuffer>
其中Integer主要是为了记录读取client端的请求数据的大小。
在completed方法内部,通过读取缓冲区ByteBuffer获取请求数据,并且完成业务处理,返回响应内容。从doWrite方法内部,可以看到写操作也是异步的。通过匿名内部类的方式来指定了写操作完成后的回调函数。
public class ReadCompletedHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel asynchronousSocketChannel;
public ReadCompletedHandler(AsynchronousSocketChannel channel){
if( this.asynchronousSocketChannel == null){
this.asynchronousSocketChannel = channel;
}
}
public void completed(Integer result,ByteBuffer attachment){
attachment.flip();
byte[] reponse = new byte[attachment.remaining()];
attachment.get(reponse);
try{
String req = new String(reponse,"UTF-8");
System.out.println("the server received: "+req);
//获取访问的域名,如果是登录请求,则返回已经登录,否则提示没有登录
if(req.split(" ")[1].equals("/a/login")){
doWrite("you are not login in this system!");
}else{
doWrite("you have login in this system!");
}
}catch(Exception e){
e.printStackTrace();
}
}
public void doWrite(String response){
try{
Thread.currentThread().sleep(5000);
System.out.println("读取到数据,等待5s");
}catch(Exception e){
e.printStackTrace();
}
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(bytes);
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//只要有剩余的内容没有发送完就继续发送数据
if(attachment.hasRemaining()){
asynchronousSocketChannel.write(attachment,attachment,this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
public void failed(Throwable t,ByteBuffer attachment){
try{
this.asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
AIO client端代码:
public class Client {
public static void main(String args[]){
new Thread(new ClientHandler("localhost",8080)).start();
}
}
对于Client端的代码,特别要注意AsynchronousSocketChannel的connect方法已经指定了连接成功后的回调函数的泛型为
CompletionHandler<Void,ClientHandler>
在ClientHandler内部,完成了连接成功后的回调函数(ClientHandler),写操作完成后的回调函数(匿名内部类),读操作完成后的回调函数(匿名内部类)。
public class ClientHandler implements CompletionHandler<Void,ClientHandler>,Runnable {
private AsynchronousSocketChannel asynchronousSocketChannel;
private String host;
private int port;
private CountDownLatch countDownLatch;
public ClientHandler(String host,int port){
this.host = host;
this.port = port;
try{
asynchronousSocketChannel = AsynchronousSocketChannel.open();
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
countDownLatch = new CountDownLatch(1);
asynchronousSocketChannel.connect(new InetSocketAddress(host,port),this,this);
try{
countDownLatch.await();
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void completed(Void result, ClientHandler attachment) {
byte[] req = "GET /a/index HTTP/1.1 ".getBytes();
final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(req);
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer,ByteBuffer>() {
@Override
public void completed(Integer result,ByteBuffer attachment){
if(attachment.hasRemaining()){
asynchronousSocketChannel.write(attachment,attachment,this);
}else{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body;
try{
body = new String(bytes,"UTF-8");
System.out.println(body);
countDownLatch.countDown();
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
asynchronousSocketChannel.close();
countDownLatch.countDown();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
}
public void failed(Throwable t,ByteBuffer attachment){
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ClientHandler attachment) {
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
异步SocketChannel为被动执行对象,编程人员不需要像NIO一样,编写独立的I/O线程来处理读写操作。对于AsynchrousServerSocketChannel和AsynchrousSocketChannel,它们都由JDK底层的线程池负责回调并驱动读写操作。也正是因为如此AIO编程模型比NIO编程模型更加简单。
4.BIO、NIO、AIO对比
类型 | 同步阻塞BIO | 同步非阻塞NIO | 异步非阻塞AIO |
---|---|---|---|
客户端个数:(I/O 线程) | 1:1 | M:1(一个I/O线程处理多个客户端连接) | M:0(不需要额外的启动线程,被动回调) |
I/O类型(阻塞) | 阻塞I/O | 非阻塞I/O | 非阻塞I/O |
I/O类型(同步) | 同步I/O | 同步I/O | 异步I/O |
API使用难度 | 简单 | 非常复杂 | 复杂 |
调试难度 | 简单 | 复杂 | 复杂 |
可靠性 | 非常差 | 高 | 高 |
吞吐量 | 低 | 高 | 高 |
上表对于三种类型的I/O模型进行了对比,具体应该选用哪个编程模型,完全基于实际的业务场景进行技术选型。一般情况低负载、低并发的应用程序可以选用阻塞IO,以降低编程的复杂度。而高负载、高并发的网络应用可以采用NIO的非阻塞模式,提供应用程序的性能。
参考:李林锋《netty权威指南》