I/O vs NIO vs NIO.2
Does the 'N' in NIO stand for New or Non-Blocking?
IO
Here we implement an EchoServer twice, once with blocking IO and once with non-blocking IO:
Blocking IO
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class PlainEchoServer {
    public void serve(int port) throws IOException {
        final ServerSocket socket = new ServerSocket(port); //#1
        try {
            while (true) {
                final Socket clientSocket = socket.accept(); //#2
                System.out.println("Accepted connection from " + clientSocket);
                new Thread(new Runnable() { //#3
                    @Override
                    public void run() {
                        try {
                            BufferedReader reader = new BufferedReader(new InputStreamReader(clientSocket.getInputStream()));
                            PrintWriter writer = new PrintWriter(clientSocket.getOutputStream(), true);
                            String line;
                            // readLine() returns null once the client closes the connection
                            while ((line = reader.readLine()) != null) { //#4
                                writer.println(line); // auto-flush is enabled above
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            // release the socket on normal end of stream as well as on error
                            try {
                                clientSocket.close();
                            } catch (IOException ex) {
                                // ignore on close
                            }
                        }
                    }
                }).start(); //#5
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            socket.close();
        }
    }

    public static void main(String[] args) throws IOException {
        new PlainEchoServer().serve(8989);
    }
}
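- 1 : Bind the server to the port
- 2 : Block until the next client connects
- 3 : Create a new thread to handle the client connection
- 4 : Read data from the client and write it back
- 5 : Start the thread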
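This approach needs a dedicated thread for every client connection. A thread pool can cap the number of threads, but it does not fundamentally solve the problem: each connection still occupies a thread for as long as it stays open, even while it is idle.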
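The thread-pool variant mentioned above can be sketched roughly as follows. This is only an illustration, not code from the original example: the class name PooledEchoServer, the helper echoLines, and the pool size of 4 are all assumptions made for this sketch. With a fixed pool, a fifth client would be accepted but would wait in the queue until one of the four workers finishes, which is the limitation described above.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PooledEchoServer {
    // Hypothetical pool size, chosen only for illustration.
    private static final int POOL_SIZE = 4;

    /** Echoes lines until end of stream and returns how many lines were echoed. */
    public static int echoLines(InputStream in, OutputStream out) {
        BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        PrintWriter writer = new PrintWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8));
        int count = 0;
        try {
            String line;
            // readLine() blocks the worker thread while the client is idle
            while ((line = reader.readLine()) != null) {
                writer.print(line);
                writer.print('\n');
                writer.flush();
                count++;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return count;
    }

    public void serve(int port) throws IOException {
        ExecutorService pool = Executors.newFixedThreadPool(POOL_SIZE);
        try (ServerSocket server = new ServerSocket(port)) {
            while (true) {
                Socket client = server.accept();
                // The task holds one pool thread for the whole lifetime of the connection.
                pool.execute(() -> {
                    try (Socket c = client) {
                        echoLines(c.getInputStream(), c.getOutputStream());
                    } catch (IOException | UncheckedIOException e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws IOException {
        new PooledEchoServer().serve(8989);
    }
}
```

The pool bounds resource usage, but the number of clients served at the same time can never exceed the number of threads.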
NIO
Non-Blocking IO
An EchoServer was already implemented in the earlier post "NIO-异步IO" (NIO: Asynchronous IO), which can serve as a reference.
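import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class PlainNioEchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        ServerSocket ss = serverChannel.socket();
        InetSocketAddress address = new InetSocketAddress(port);
        ss.bind(address); //#1
        serverChannel.configureBlocking(false);
        Selector selector = Selector.open();
        serverChannel.register(selector, SelectionKey.OP_ACCEPT); //#2
        while (true) {
            try {
                selector.select(); //#3
            } catch (IOException ex) {
                ex.printStackTrace();
                // handle in a proper way
                break;
            }
            Set<SelectionKey> readyKeys = selector.selectedKeys(); //#4
            Iterator<SelectionKey> iterator = readyKeys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove(); //#5
                try {
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel client = server.accept(); //#6
                        if (client != null) { // accept() may return null in non-blocking mode
                            System.out.println("Accepted connection from : " + client);
                            client.configureBlocking(false);
                            // Start with OP_READ only: a socket is almost always writable, so a
                            // permanent OP_WRITE interest would make select() return in a busy loop.
                            client.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(100)); //#7
                        }
                    }
                    if (key.isReadable()) { //#8
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        if (client.read(output) == -1) { //#9
                            // the client closed the connection
                            key.cancel();
                            client.close();
                            continue;
                        }
                        // there may be data to echo: also watch for writability
                        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                    }
                    if (key.isWritable()) { //#10
                        SocketChannel client = (SocketChannel) key.channel();
                        ByteBuffer output = (ByteBuffer) key.attachment();
                        output.flip();
                        client.write(output); //#11
                        if (!output.hasRemaining()) {
                            // everything was echoed: stop watching for writability
                            key.interestOps(SelectionKey.OP_READ);
                        }
                        output.compact();
                    }
                } catch (IOException ex) {
                    key.cancel();
                    try {
                        key.channel().close();
                    } catch (IOException cex) {
                        // ignore on close
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new PlainNioEchoServer().serve(8989);
    }
}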
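- 1 : Bind the server to the port
- 2 : Register the server channel with the selector for accept events, so that new connections are noticed
- 3 : Block until at least one registered channel is ready
- 4 : Get the SelectionKey instances of all ready channels
- 5 : Remove the key from the selected-key set so that it is not processed again (the channel stays registered with the selector)
- 6 : Accept the client connection
- 7 : Register the client connection with the selector, attaching a ByteBuffer
- 8 : Check whether the channel is ready for reading
- 9 : Read data from the channel into the buffer
- 10: Check whether the channel is ready for writing
- 11: Write data from the buffer to the channel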
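Steps 10 and 11 depend on how ByteBuffer switches between filling and draining via flip() and compact(). The sketch below isolates that behavior without any sockets. The class BufferFlipCompactDemo and its drain method are hypothetical names for illustration only; buffer.get(...) stands in for a client.write(output) call that manages to send only part of the data.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class BufferFlipCompactDemo {
    /**
     * Simulates a partial write: drains at most maxBytes from the buffer,
     * then compacts it so the unsent bytes survive for the next round.
     */
    public static String drain(ByteBuffer buffer, int maxBytes) {
        buffer.flip();                      // draining mode: limit = position, position = 0
        int n = Math.min(maxBytes, buffer.remaining());
        byte[] sent = new byte[n];
        buffer.get(sent);                   // stands in for a write that sends only n bytes
        buffer.compact();                   // move unsent bytes to the front, back to filling mode
        return new String(sent, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(100);
        buffer.put("hello".getBytes(StandardCharsets.US_ASCII)); // stands in for client.read(output)
        System.out.println(drain(buffer, 3));   // prints "hel"
        System.out.println(buffer.position());  // prints 2, because "lo" is still pending
        buffer.put("!".getBytes(StandardCharsets.US_ASCII));
        System.out.println(drain(buffer, 10));  // prints "lo!"
        System.out.println(buffer.position());  // prints 0, nothing left to send
    }
}
```

Because compact() keeps the unsent remainder, a later read appends behind it and the echo preserves byte order even when a write is only partial.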
NIO.2
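NIO.2 provides the CompletionHandler interface. The handler is called back when an operation (accept, read, or write) completes, so there is no need to check for readiness ourselves as in the NIO example.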
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

public class PlainNio2EchoServer {
    public void serve(int port) throws IOException {
        System.out.println("Listening for connections on port " + port);
        final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(port);
        serverChannel.bind(address); //#1
        final CountDownLatch latch = new CountDownLatch(1);
        serverChannel.accept(
            null,
            new CompletionHandler<AsynchronousSocketChannel, Object>() { //#2
                @Override
                public void completed(final AsynchronousSocketChannel channel, Object attachment) {
                    serverChannel.accept(null, this); //#3
                    ByteBuffer buffer = ByteBuffer.allocate(100);
                    channel.read(buffer, buffer, new EchoCompletionHandler(channel)); //#4
                }

                @Override
                public void failed(Throwable throwable, Object attachment) {
                    try {
                        serverChannel.close(); //#5
                    } catch (IOException e) {
                        // ignore on close
                    } finally {
                        latch.countDown();
                    }
                }
            }
        );
        // keep the calling thread alive until the accept handler fails
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private final class EchoCompletionHandler implements CompletionHandler<Integer, ByteBuffer> {
        private final AsynchronousSocketChannel channel;

        EchoCompletionHandler(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if (result < 0) {
                // the client closed the connection: stop echoing
                try {
                    channel.close();
                } catch (IOException e) {
                    // ignore on close
                }
                return;
            }
            buffer.flip();
            channel.write(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { //#6
                @Override
                public void completed(Integer result, ByteBuffer buffer) {
                    if (buffer.hasRemaining()) {
                        channel.write(buffer, buffer, this); //#7
                    } else {
                        buffer.compact();
                        // hand the next read back to the outer handler, not to this write handler
                        channel.read(buffer, buffer, EchoCompletionHandler.this); //#8
                    }
                }

                @Override
                public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        // ignore on close
                    }
                }
            });
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            try {
                channel.close();
            } catch (IOException e) {
                // ignore on close
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new PlainNio2EchoServer().serve(8989);
    }
}
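- 1 : Bind the server to the port
- 2 : Start accepting client connections; the handler is called back as soon as a connection arrives
- 3 : Reuse the same handler to accept the next client connection
- 4 : Trigger a read on the channel and register an EchoCompletionHandler as the callback
- 5 : On error, close the channel
- 6 : Trigger a write on the channel and register a CompletionHandler that is called back when the write completes
- 7 : If the buffer still holds data, trigger another write
- 8 : Trigger a read on the channel and register the current EchoCompletionHandler as the callback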
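For contrast with the callback style above, the same asynchronous channels also have overloads of accept, connect, read, and write that return a Future instead of taking a CompletionHandler. The sketch below is an assumption-laden illustration rather than part of the original example: the class FutureEchoDemo and the method echoOnce are hypothetical, it runs client and server in one thread over the loopback interface, and it is only suitable for short messages that fit in the socket buffers.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class FutureEchoDemo {
    /** Sends a short message through a one-shot loopback echo and returns the reply. */
    public static String echoOnce(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        try (AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
             AsynchronousSocketChannel client = AsynchronousSocketChannel.open()) {
            // Port 0 lets the OS pick a free port; loopback keeps the demo local.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            Future<AsynchronousSocketChannel> pendingAccept = server.accept(); // returns immediately
            client.connect(server.getLocalAddress()).get();                    // get() waits for completion

            try (AsynchronousSocketChannel peer = pendingAccept.get()) {
                // Client side: send the whole payload.
                ByteBuffer out = ByteBuffer.wrap(payload);
                while (out.hasRemaining()) {
                    client.write(out).get();
                }
                // Server side: read the payload, then echo it back.
                ByteBuffer buffer = ByteBuffer.allocate(payload.length);
                while (buffer.hasRemaining()) {
                    if (peer.read(buffer).get() < 0) {
                        break; // peer closed early
                    }
                }
                buffer.flip();
                while (buffer.hasRemaining()) {
                    peer.write(buffer).get();
                }
                // Client side: read the echoed bytes.
                ByteBuffer reply = ByteBuffer.allocate(payload.length);
                while (reply.hasRemaining()) {
                    if (client.read(reply).get() < 0) {
                        break;
                    }
                }
                reply.flip();
                return StandardCharsets.UTF_8.decode(reply).toString();
            }
        } catch (IOException | ExecutionException e) {
            throw new IllegalStateException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(echoOnce("hello")); // prints "hello"
    }
}
```

Calling get() on each Future blocks the caller, so this style gives up most of the benefit of asynchronous IO; it is mainly convenient for tests and simple clients, while the CompletionHandler style suits a server.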