1. 网络NIO
Java NIO是New IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于java标准IO的操作机制。严格来说,NIO和并发没直接关系,但使用NIO可以大大提高线程的使用效率。
Java NIO中涉及的基础内容包括通道(Channel)和缓冲区(Buffer)、文件IO和网络IO。
1.1 基于Socket的服务端的多线程模式
在学习socket编程时,写一个简单的Echo服务器是一个很好的开端,算是一个hello world了。其具体过程是Echo服务器读取客户端的一个输入,并将这个输入原封不动地返回给客户端。
服务端使用多线程处理客户端信息示意图如下所示。
服务器会为每一个客户端连接启动一个线程专为该客户端处理请求。
- 服务端代码
public class Server {
private static ExecutorService executor = Executors.newFixedThreadPool(10);
static class EchoHandler implements Runnable{
Socket client;
BufferedReader br = null;
PrintWriter pw = null;
//服务处理线程类构造方法接受一个Socket对象
public EchoHandler(Socket client){
this.client = client;
}
@Override
public void run() {
try {
br = new BufferedReader(new InputStreamReader(client.getInputStream()));
pw = new PrintWriter(client.getOutputStream(),true);
String data;
while((data = br.readLine()) != null){
System.out.println(data);
pw.println(data + " " + Thread.currentThread().getId());
}
} catch (IOException e) {
e.printStackTrace();
}finally {
try {
br.close();
client.close();
} catch (IOException e) {
e.printStackTrace();
}
if(pw != null) pw.close();
}
}
}
public static void main(String[] args) {
ServerSocket ss = null;
Socket client;
try {
//监听8000端口
ss = new ServerSocket(8000);
} catch (IOException e) {
e.printStackTrace();
}
while(true){
try {
//接受客户端的连接请求
client = ss.accept();
System.out.println("处理 " + client.getRemoteSocketAddress());
//和一个客户端连接后,创建一个线程用于执行该客户端的请求
executor.execute(new EchoHandler(client));
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
- 客户端代码
public class Client {
public static void main(String[] args) throws IOException {
Socket client;
PrintWriter pw = null;
BufferedReader br = null;
client = new Socket();
try {
client.connect(new InetSocketAddress("localhost",8000));
pw = new PrintWriter(client.getOutputStream(),true);
pw.println("hello");
pw.flush();
br = new BufferedReader(new InputStreamReader(client.getInputStream()));
System.out.println(br.readLine());
} catch (IOException e) {
e.printStackTrace();
}finally {
br.close();
pw.close();
client.close();
}
}
}
多线程的服务器开发模式有效解决了多客户端请求的问题,但是该模式有个重大弱点是——它倾向于让CPU进行IO等待。
可以想象,如果客户端慢悠悠地发送数据,服务器端只能一直等待,不能干其他事情。所以我们希望将网络IO的等待时间从线程中分离出来,于是NIO出现了。
1.2 使用NIO进行网络编程
NIO的三个关键组件:通道(Channel)、缓存(Buffer)和选择器(Selector)。
-
通道(Channel)
Channel类似于流,一个channel可以和文件或者网络Socket对应。如果Channel和Socket对应,那个往channel中写入数据就相当于向socket中写入数据。 -
缓存(Buffer)
可以简单将Buffer理解为一个内存区域或者byte数组。数据需要包装成Buffer的形式才能和Channel进行交互。 -
选择器(Selector)
在Channel的实现中,有一个SelectableChannel实现,表示可被选择的通道。任何一个SelectableChannel都可将自己注册到Selector中,由Selector管理,一个Selector可以管理多个Channel。Channel的数据准备好后,就会通知Selector,就会得到已经准备好的数据。
可以看到,一个Selector由一个线程管理,而一个Selector管理着多个Channel,每个Channel表示一个客户端的连接。如此,极少数的线程就可以处理多个客户端的请求。当与客户端连接的数据没有准备好,selector就会处于等待状态,一旦某个channel的数据准备好了,就会通知selector拿到数据进行处理。 - NIO版服务端代码
public class NioServer {
private Selector selector;
private ExecutorService tp = Executors.newFixedThreadPool(10);
private void startServer() throws IOException {
selector = SelectorProvider.provider().openSelector();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);//将channel初始化为非阻塞模式
//将channel绑定到8000端口
ssc.socket().bind(new InetSocketAddress("localhost",8000));
//将ssc注册到selector中,并注册感兴趣事件为SelectionKey.OP_ACCEPT(接受连接请求)
//SelectionKey是连接channel和selector的一个契约
SelectionKey acceptKey = ssc.register(selector,SelectionKey.OP_ACCEPT);
while(true){
//只有当该事件到达时,Selector.select()会返回,否则一直阻塞。
selector.select();
//表示一组selectedKeys准备好了,接下来selector会对各个key对应的channel进行处理
Set readyKeys = selector.selectedKeys();
Iterator itr = readyKeys.iterator();
while(itr.hasNext()){
SelectionKey key = (SelectionKey) itr.next();
itr.remove();
if(key.isAcceptable()){
doAccept(key);
}else if(key.isReadable()){
doRead(key);
}else if(key.isWritable()){
doWrite(key);
}
}
}
}
/**
* 处理写操作
* @param key
* @throws IOException
*/
private void doWrite(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
EchoClient echoClient = (EchoClient) key.attachment();
LinkedList<ByteBuffer> outq = echoClient.getOutq();
ByteBuffer bb = outq.getLast();
int len = channel.write(bb);
if(bb.remaining() == 0){
outq.removeLast();
}
if(outq.size() == 0){
key.interestOps(SelectionKey.OP_READ);
}
}
/**
* 处理读操作
* @param key
*/
private void doRead(SelectionKey key) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer bb = ByteBuffer.allocate(8192);
int len = 0;
try {
len = channel.read(bb);
if(len < 0) return;
} catch (IOException e) {
e.printStackTrace();
}
bb.flip();
//多线程处理数据,防止任务派发线程被阻塞
tp.execute(new HandleMsg(key,bb));
}
/**
* 处理连接操作
* @param key
*/
private void doAccept(SelectionKey key) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client;
try {
client = server.accept();
client.configureBlocking(false);
//服务端由SelectionKey.OP_ACCEPT转为SelectionKey.OP_READ,等待客户端发信息
SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ);
EchoClient echoClient = new EchoClient();
clientKey.attach(echoClient);
InetAddress clientAddress = client.socket().getInetAddress();
System.out.println("Accepted connection from " + clientAddress);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
new NioServer().startServer();
}
}
EchoClient.java代码
public class EchoClient {
private LinkedList<ByteBuffer> outq;
public EchoClient(){
outq = new LinkedList<>();
}
public LinkedList<ByteBuffer> getOutq() {
return outq;
}
public void enqueue(ByteBuffer bb){
outq.addFirst(bb);
}
}
HandleMsg.java代码
public class HandleMsg implements Runnable {
SelectionKey key;
ByteBuffer bb;
public HandleMsg(SelectionKey key, ByteBuffer bb) {
this.key = key;
this.bb = bb;
}
@Override
public void run() {
EchoClient echoClient = (EchoClient) key.attachment();
echoClient.enqueue(bb);
if(key.isValid())
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
else{
key.cancel();
}
key.selector().wakeup();
}
}
整个服务端的代码实现都是围绕着channel、selector和SelectionKey进行各种操作。
同样,客户端也可以用NIO来实现。
- NIO版客户端代码
public class NioClient {
private Selector selector;
/**
* 初始化channel
* @param ip
* @param port
* @throws IOException
*/
public void init(String ip, int port) throws IOException {
selector = SelectorProvider.provider().openSelector();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(ip,port));
channel.register(selector, SelectionKey.OP_CONNECT);
}
public void working() throws IOException {
while(true){
if(!selector.isOpen()){
break;
}
selector.select();
Set readyKeys = selector.selectedKeys();
Iterator itr = readyKeys.iterator();
while(itr.hasNext()){
SelectionKey key = (SelectionKey) itr.next();
itr.remove();
if(key.isConnectable()){
connect(key);
}else if(key.isReadable()){
read(key);
}
}
}
}
private void read(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(100);
channel.read(buffer);
byte[] data = buffer.array();
String msg = new String(data).trim();
System.out.println("Client receive data : " + msg);
channel.close();
key.selector().close();
}
private void connect(SelectionKey key) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
if(channel.isConnectionPending()){
channel.finishConnect();
}
channel.configureBlocking(false);
channel.write(ByteBuffer.wrap(new String("Hello Server!").getBytes()));
channel.register(selector, SelectionKey.OP_READ);
}
public static void main(String[] args) throws IOException {
NioClient nioClient = new NioClient();
nioClient.init("localhost",8000);
nioClient.working();
}
}
2. 总结
NIO其实又有人称之为Non-blocking IO,顾名思义,它是同步非阻塞的IO。
同步是指线程不断轮询IO事件是否准备就绪,非阻塞是指线程在等待IO的时候可以去做其他的任务。
NIO同步的核心是selector,selector管理着很多channel,它负责轮询各个channel是否准备就绪,减少了很多不必要的线程消耗;非阻塞的核心则是通道和缓冲区,当channel准备就绪,数据就会被写到缓冲区,而无需线程阻塞地等待IO,这是和BIO的最大区别。