初步认识 NIO

1. 网络NIO

Java NIO是New IO的简称,是一种可以替代Java IO的一套新的IO机制。它提供了一套不同于java标准IO的操作机制。严格来说,NIO和并发没直接关系,但使用NIO可以大大提高线程的使用效率。

Java NIO中涉及的基础内容包括通道(Channel)和缓冲区(Buffer)、文件IO和网络IO。

1.1 基于Socket的服务端的多线程模式

在学习socket编程时,写一个简单的Echo服务器是一个很好的开端,算是一个hello world了。其具体过程是Echo服务器读取客户端的一个输入,并将这个输入原封不动地返回给客户端。
服务端使用多线程处理客户端信息示意图如下所示。


多线程socket

服务器会为每一个客户端连接启动一个线程专为该客户端处理请求。

  • 服务端代码
public class Server {
    private static ExecutorService executor = Executors.newFixedThreadPool(10);
    static class EchoHandler implements Runnable{
        Socket client;
        BufferedReader br = null;
        PrintWriter pw = null;
       //服务处理线程类构造方法接受一个Socket对象
        public EchoHandler(Socket client){
            this.client = client;
        }
        @Override
        public void run() {
            try {
                 br = new BufferedReader(new InputStreamReader(client.getInputStream()));
                 pw = new PrintWriter(client.getOutputStream(),true);
                 String data;
                 while((data = br.readLine()) != null){
                     System.out.println(data);
                     pw.println(data + " " + Thread.currentThread().getId());
                 }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                try {
                    br.close();
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                if(pw != null) pw.close();
            }
        }
    }
    public static void main(String[] args) {
        ServerSocket ss = null;
        Socket client;
        try {
            //监听8000端口
            ss = new ServerSocket(8000);
        } catch (IOException e) {
            e.printStackTrace();
        }
        while(true){
            try {
                //接受客户端的连接请求
                client = ss.accept();
                System.out.println("处理 " + client.getRemoteSocketAddress());
                //和一个客户端连接后,创建一个线程用于执行该客户端的请求
                executor.execute(new EchoHandler(client));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
  • 客户端代码
public class Client {
    public static void main(String[] args) throws IOException {
        Socket client;
        PrintWriter pw = null;
        BufferedReader br = null;
        client = new Socket();
        try {
            client.connect(new InetSocketAddress("localhost",8000));
            pw = new PrintWriter(client.getOutputStream(),true);
            pw.println("hello");
            pw.flush();
            br = new BufferedReader(new InputStreamReader(client.getInputStream()));
            System.out.println(br.readLine());
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            br.close();
            pw.close();
            client.close();
        }
    }
}

多线程的服务器开发模式有效解决了多客户端请求的问题,但是该模式有个重大弱点是——它倾向于让CPU进行IO等待。
可以想象,如果客户端慢悠悠地发送数据,服务器端只能一直等待,不能干其他事情。所以我们希望将网络IO的等待时间从线程中分离出来,于是NIO出现了。

1.2 使用NIO进行网络编程

NIO的三个关键组件:通道(Channel)、缓存(Buffer)和选择器(Selector)。

  • 通道(Channel)
    Channel类似于流,一个channel可以和文件或者网络Socket对应。如果Channel和Socket对应,那个往channel中写入数据就相当于向socket中写入数据。
  • 缓存(Buffer)
    可以简单将Buffer理解为一个内存区域或者byte数组。数据需要包装成Buffer的形式才能和Channel进行交互。
  • 选择器(Selector)
    在Channel的实现中,有一个SelectableChannel实现,表示可被选择的通道。任何一个SelectableChannel都可将自己注册到Selector中,由Selector管理,一个Selector可以管理多个Channel。Channel的数据准备好后,就会通知Selector,就会得到已经准备好的数据。
    selector

    可以看到,一个Selector由一个线程管理,而一个Selector管理着多个Channel,每个Channel表示一个客户端的连接。如此,极少数的线程就可以处理多个客户端的请求。当与客户端连接的数据没有准备好,selector就会处于等待状态,一旦某个channel的数据准备好了,就会通知selector拿到数据进行处理。
  • NIO版服务端代码
public class NioServer {
    private Selector selector;
    private ExecutorService tp = Executors.newFixedThreadPool(10);
    private void startServer() throws IOException {
        selector = SelectorProvider.provider().openSelector();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);//将channel初始化为非阻塞模式
        //将channel绑定到8000端口
        ssc.socket().bind(new InetSocketAddress("localhost",8000));
        //将ssc注册到selector中,并注册感兴趣事件为SelectionKey.OP_ACCEPT(接受连接请求)
        //SelectionKey是连接channel和selector的一个契约
        SelectionKey acceptKey = ssc.register(selector,SelectionKey.OP_ACCEPT);
        while(true){
            //只有当该事件到达时,Selector.select()会返回,否则一直阻塞。
            selector.select();
            //表示一组selectedKeys准备好了,接下来selector会对各个key对应的channel进行处理
            Set readyKeys = selector.selectedKeys();
            Iterator itr = readyKeys.iterator();
            while(itr.hasNext()){
                SelectionKey key = (SelectionKey) itr.next();
                itr.remove();
                if(key.isAcceptable()){
                    doAccept(key);
                }else if(key.isReadable()){
                    doRead(key);
                }else if(key.isWritable()){
                    doWrite(key);
                }
                
            }
        }
    }
    /**
     * 处理写操作
     * @param key
     * @throws IOException
     */
    private void doWrite(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        EchoClient echoClient = (EchoClient) key.attachment();
        LinkedList<ByteBuffer> outq = echoClient.getOutq();
        ByteBuffer bb = outq.getLast();
        int len = channel.write(bb);
        if(bb.remaining() == 0){
            outq.removeLast();
        }
        if(outq.size() == 0){
            key.interestOps(SelectionKey.OP_READ);
        }
    }
    /**
     * 处理读操作
     * @param key
     */
    private void doRead(SelectionKey key) {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer bb = ByteBuffer.allocate(8192);
        int len = 0;
        try {
            len = channel.read(bb);
            if(len < 0) return;
        } catch (IOException e) {
            e.printStackTrace();
        }
        bb.flip();
        //多线程处理数据,防止任务派发线程被阻塞
        tp.execute(new HandleMsg(key,bb));
    }
    /**
     * 处理连接操作
     * @param key
     */
    private void doAccept(SelectionKey key) {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client;
        try {
            client = server.accept();
            client.configureBlocking(false);
            //服务端由SelectionKey.OP_ACCEPT转为SelectionKey.OP_READ,等待客户端发信息
            SelectionKey clientKey = client.register(selector, SelectionKey.OP_READ);
            EchoClient echoClient = new EchoClient();
            clientKey.attach(echoClient);
            InetAddress clientAddress = client.socket().getInetAddress();
            System.out.println("Accepted connection from " + clientAddress);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws IOException {
        new NioServer().startServer();
    }
}

EchoClient.java代码

public class EchoClient {
    private LinkedList<ByteBuffer> outq;
    public EchoClient(){
        outq = new LinkedList<>();
    }

    public LinkedList<ByteBuffer> getOutq() {
        return outq;
    }
    public void enqueue(ByteBuffer bb){
        outq.addFirst(bb);
    }
}

HandleMsg.java代码

public class HandleMsg implements Runnable {
    SelectionKey key;
    ByteBuffer bb;
    public HandleMsg(SelectionKey key, ByteBuffer bb) {
        this.key = key;
        this.bb = bb;
    }
    @Override
    public void run() {
        EchoClient echoClient = (EchoClient) key.attachment();
        echoClient.enqueue(bb);
        if(key.isValid())
        key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        else{
            key.cancel();
        }
        key.selector().wakeup();
    }
}

整个服务端的代码实现都是围绕着channel、selector和SelectionKey进行各种操作。
同样,客户端也可以用NIO来实现。

  • NIO版客户端代码
public class NioClient {
    private Selector selector;
    /**
     * 初始化channel
     * @param ip
     * @param port
     * @throws IOException
     */
    public void init(String ip, int port) throws IOException {
        selector = SelectorProvider.provider().openSelector();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress(ip,port));
        channel.register(selector, SelectionKey.OP_CONNECT);
    }
    public void working() throws IOException {
        while(true){
            if(!selector.isOpen()){
                break;
            }
            selector.select();
            Set readyKeys = selector.selectedKeys();
            Iterator itr = readyKeys.iterator();
            while(itr.hasNext()){
                SelectionKey key = (SelectionKey) itr.next();
                itr.remove();
                if(key.isConnectable()){
                    connect(key);
                }else if(key.isReadable()){
                    read(key);
                }
            }
        }
    }
    private void read(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(100);
        channel.read(buffer);
        byte[] data = buffer.array();
        String msg = new String(data).trim();
        System.out.println("Client receive data : " + msg);
        channel.close();
        key.selector().close();
    }
    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        if(channel.isConnectionPending()){
            channel.finishConnect();
        }
        channel.configureBlocking(false);
        channel.write(ByteBuffer.wrap(new String("Hello Server!").getBytes()));
        channel.register(selector, SelectionKey.OP_READ);
    }
    public static void main(String[] args) throws IOException {
        NioClient nioClient = new NioClient();
        nioClient.init("localhost",8000);
        nioClient.working();
    }
}

2. 总结

NIO其实又有人称之为Non-blocking IO,顾名思义,它是同步非阻塞的IO。
同步是指线程不断轮询IO事件是否准备就绪,非阻塞是指线程在等待IO的时候可以去做其他的任务。
NIO同步的核心是selector,selector管理着很多channel,它负责轮询各个channel是否准备就绪,减少了很多不必要的线程消耗;非阻塞的核心则是通道和缓冲区,当channel准备就绪,数据就会被写到缓冲区,而无需线程阻塞地等待IO,这是和BIO的最大区别。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342