从I/O模型到Netty(二)

Java NIO

在上一篇文章中对于I/O模型已经讲的比较清楚了,在I/O密集型应用中使用Reactor模式可以明显提高系统的性能(我们这里谈到的性能很大程度上指的是吞吐量),但是在具体的开发过程中模式还是要落地成真实的代码,使用传统的I/O库肯定是不行的,在Java中需要使用java.nio包下的库。

虽然是讲NIO的实现,但本文将不会把所有Java NIO中的主要API全部过一遍,而是通过例子理清NIO到底可以做什么事情。

本文中提到的JDK源代码都可以在%JAVA_HOME%/jre/lib/rt.jar中看到。

Java NIO最初在Java4中被引入,但是到今天还是有很大部分的开发者从来没使用过NIO的API,因为基础I/O已经能满足了我们日常的开发需求。但如果要开发I/O密集型应用的场景下,NIO可以明显的提升程序的性能,另外NIO与基础I/O有本质思想上的区别。
本文主要讲Java中的NIO,内容包含:

  1. Oracle官方对NIO的说法
  2. Java中NIO的历史进程
  3. NIO和NIO.2的区别在哪里
  4. NIO中的主要类的介绍
  5. 使用NIO的API构建一个Socket服务器

Oracle官方对NIO的说法

首先看看Oracle的官方文档中是怎么说的:

Java中对于I/O的支持主要包括java.iojava.nio两个包的内容,它们共同提供了如下特性:

  1. 通过数据流和序列化从文件系统中读取和写数据。
  2. 提供Charsets,解码器和编码器,用于在字节和Unicode字符之间的翻译。
  3. 访问文件、文件的属性、文件系统。
  4. 提供异步的或者非阻塞多路复用I/O的API,用于构建可扩展的服务器程序。

这里并没有提到网络I/O的东西,在Java1.4以前,网络I/O的API都是被放在java.net包下,在NIO中才被一起放入了java.nio包下。

Java中NIO的历史进程

  1. 最开始Java中使用I/O来访问文件系统只有通过java.io.File类来做,其中包含了一些对文件和目录基本的操作。对于开发中常碰到的I/O需求一般都能覆盖到,所以这也是日常开发工作中最常使用的I/O API。官方文档中称之为基础I/O(Basic I/O)。
    基础I/O是基于各种流的概念的,其基本模型就是上一篇中讲到的阻塞I/O。
  2. 为了进一步丰富I/O操作的API,也是为了提升在I/O密集型应用中的性能,基于Reactor模式,在Java1.4中引入了java.nio包,其中重点包含几个类:
  • java.nio.Buffer,用来存储各种缓冲数据的容器。
  • java.nio.channels.Channel,用于连接程序和I/O设备的数据通道。
  • java.nio.channels.Selector,多路复用选择器,在上一篇中讲到过。
  • java.nio.charset.Charset,用来编解码。
  1. 在Java7中引入了NIO.2,引入了一系列新的API(主要在新加入的包Java.nio.file),对于访问文件系统提供了更多的API实现,更加丰富的文件属性类,增加了一些异步I/O的API。同时,还添加了很多实用方法。

例如:以前简单的拷贝一个文件就必须要写一大堆的代码,现在实用java.nio.file.Files.copy(Path, Path, CopyOption...)就可以很轻松的做到了

NIO和NIO.2的区别在哪里

在上一节中已经简单介绍了这两个概念的不同,这里再简单罗列一下。NIO中引入的一个重要概念就是Reactor模式,而NIO.2对NIO本身不是一次升级,而是一次扩充,NIO.2中新增了很多实用方法(utilities),以支持更多的功能需求,并不是说能够提升多少的性能。主要增加了如下两点:

  1. 新的访问文件的API。
    访问文件从简单到复杂的方法

    Java.nio.file包和其子包中新增了大量的与访问文件相关的类,其中比较重要的有以下几个,更完整的更新可以在Oracle的官网文档中查看。
  • java.nio.file.Path,它可以用来取代早期的java.io.File用来访问文件。
  • java.nio.file.Files,其中包含了大量的对文件操作的API。
  1. 异步I/O的API
    在NIO原来的API的基础上,增加了对Proactor模式的支持,可以在包java.nio.channels中看到新加入的java.nio.channels.AsynchronousChanneljava.nio.channels.CompletionHandler<V, A>。使用这些类可以实现异步编程,如代码1中所示:

     //代码1
     //定义一个处理文件内容的函数式接口
     @FunctionalInterface
     static interface ProcessBuffer{
         void process(int result, ByteBuffer bb);
     }
     //递归地读取文件的全部内容
     static void readFileThrough(AsynchronousFileChannel ch, ProcessBuffer runn, int position) {
         
         ByteBuffer bb = ByteBuffer.allocate(512);
         ch.read(bb, position, null, new CompletionHandler<Integer, Object>() {
    
             @Override
             public void completed(Integer result, Object attachment) {
                 System.out.println("成功了");
                 bb.flip();
                 runn.process(result, bb);
                 bb.clear();
                 if (result == bb.capacity())
                     readFileThrough(ch, runn, position + result);
             }
    
             @Override
             public void failed(Throwable exc, Object attachment) {
                 System.err.println("失败了!!!");
             }
         });
     }
     //读取文件内容,并打印
     static void testAIOReadFile() throws IOException {
         Path p = Paths.get(fileDir, fileName);
         AsynchronousFileChannel channel = AsynchronousFileChannel.open(p, StandardOpenOption.READ);
    
         Thread daemon = new Thread(() -> {
             try {
                 System.out.println("守护");
                 Thread.sleep(10000);
             } catch (Exception e) {
             
             }
         });
     
         readFileThrough(channel, (result, bb) -> {
             if (result < bb.capacity()) {
                 System.out.println(new String(Arrays.copyOf(bb.array(), result)));
                 System.out.println("已读完。。。");
                 daemon.interrupt();
             }else {
                 System.out.print(new String(bb.array()));
             }
         }, 0);
     
         daemon.start();
     }
    

NIO中的主要类的介绍

NIO的基本思想是要构建一个Reactor模式的实现,具体落实到API,在Java中主要有以下几个类:

1. java.nio.Buffer

这是一个容器类,用来存储「基础数据类型」,所有从Channel中读取出来的数据都要使用Buffer的子类来作为存储单元,可以把它想象成一个带着很多属性的数组(和ArrayList很类似,其实它的实现机制也差不多就是这样)。

第一次看到介绍Buffer是在一本书上,书上画了好多方框和指向这些方框的属性值,看着就头晕。其实很简单,Buffer就是一个数组。

在读写交换时,必不可少的要批量地去读取并写入到目标对象,这个道理是不变的。在基础I/O中如果我们要把一个输入流写入一个输出流,可能会这么做:

//代码2
public static void copy(File src, File dest) throws IOException {
    FileInputStream in = new FileInputStream(src);
    FileOutputStream out = new FileOutputStream(dest);
    byte[] buffer = new byte[1024];
    int bytes = 0;
    while ((bytes = in.read(buffer)) > -1){
        out.write(buffer, 0, bytes);
    }
    out.close();
    in.close();
}

以上代码中使用了一个真实的数组用来做读写切换,从而达到批量(缓冲)读写的目标。
而在NIO中(如代码1),读写切换也同样是使用了一个数组进行暂存(缓冲),只不过在这个数组之上,封装了一些属性(java.nio.Buffer源码中的一些属性如代码3所示)和操作。

//代码3 - Buffer类中定义的一些属性
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;

关于Buffer类详细的继承关系和其主要方法,可以参考下图:


Buffer的继承关系

2. java.nio.channels.Channel

Channel可以看做是代码2中InputStream和OutStream的合体,在实际使用中,我们往往针对同一个I/O设备同时存在读和写的操作,在基础I/O中我们就需要针对同一个目标对象生成一个输入流和输出流的对象,可是在NIO中就可以只建立一个Channel对象了。

Channel抽象的概念是对于某个I/O设备的「连接」,可以使用这个连接进行一些I/O操作,java.nio.channels.Channel本身是一个接口,只有两个方法,但是在Java的的环境中,往往最简单的接口最烦人,因为它的实现类总是会异常的多。

//代码4 - 去除了所有注释的Channel类
package java.nio.channels;

import java.io.IOException;
import java.io.Closeable;

public interface Channel extends Closeable {

    public boolean isOpen();

    public void close() throws IOException;

}

当然,这是享受多态带来的好处的同时必须承受的。详细的Channel继承和实现关系如下:

Channel的继承和实现关系

3. java.nio.channels.Selector

如果你是使用NIO来做网络I/O,Selector是JavaNIO中最重要的类,正如它的注释里第一句说的,Selector是SelectableChannel的「多路复用器」。

SelectableChannel的实现类

多路复用,这是在上一篇介绍过的概念,在不同的操作系统也有不同的底层实现。用户也可以自己实现自己的Selector(通过类java.nio.channels.spi.SelectorProvider

//代码5 - provider构造方法
public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            //如果设置了属性java.nio.channels.spi.SelectorProvider,则会载入响应的类
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

如果你不实现自己的SelectorProvidor,在代码5中可以看到JDK会使用类sun.nio.ch.DefaultSelectorProvider来创建,这里会根据你的操作系统的类别不同而选择不同的实现类。openJDK中也有相应的实现,有兴趣的可以去GrepCode查看一下,Mac OS下是使用KQueueSelectorProvider

Selector的使用比较简单,同时要配合SelectionKey使用,它们的继承结构图也比较简单,如下:

Selector继承关系

4. 其他

其他一些类如Charset个人感觉属于实用性很强的类,但是在NIO与基础I/O的比较中就显得不那么重要了。

使用NIO的API构建一个Socket服务器

Java1.4引入的NIO中已经可以实现Reactor模式,在NIO.2中又引入了AIO的API,所以本节将分别使用两种模式来实现一个Socket服务器,这里重点介绍Java中NIO API的使用,至于NIO和基础I/O的性能对比,网上有很多,这里就不再做比较了。

首先定义一些基础类,将从Socket中获取的数据解析成TestRequest对象,然后再找到响应的Handler。看代码:

我这里为了偷懒,将很多基础类和方法定义在了一个类中,这种方法其实十分不可取。

//代码6 
/**
 * 执行计算工作的线程池
 */
private static ExecutorService workers = Executors.newFixedThreadPool(10);

/**
 * 解析出来的请求对象
 * @author lk
 *
 */
public static class TestRequest{
    
    /**
     * 根据解析到的method来获取响应的Handler
     */
    String method;
    String args;
    public static TestRequest parseFromString(String req) {
        System.out.println("收到请求:" + req);
        TestRequest request = new TestRequest();
        request.method = req.substring(0, 512);
        request.args = req.substring(512, req.length());
        return request;
    }
}


/**
 * 具体的逻辑需要实现此接口
 * @author lk
 *
 */
public static interface SockerServerHandler {
    ByteBuffer handle(TestRequest req);
}

主要的逻辑其实就是使用ServerSocketChannel的实例监听本地端口,并且设置其为非阻塞(默认为阻塞模式)。代码7中的parse()函数是一个典型的「使用Buffer读取Channel中数据」的方法,这里为了简(tou)单(lan),默认只读取1024个字节,所以并没有实际去循环读取。

//代码7
private static void useNIO() {
    Selector dispatcher = null;
    ServerSocketChannel serverChannel = null;
    try {
        dispatcher = Selector.open();
        serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        
        serverChannel.socket().setReuseAddress(true);
        serverChannel.socket().bind(LOCAL_8080);
        
        //ServerSocketChannel只支持这一种key,因为server端的socket只能去accept
        serverChannel.register(dispatcher, SelectionKey.OP_ACCEPT);
        
        while (dispatcher.select() > 0) {
            operate(dispatcher);
        }
        
    } catch (Exception e) {
        e.printStackTrace();
    }
}
/**
 * 在分发器上循环获取连接事件
 * @param dispatcher
 * @throws IOException
 */
private static void operate(Selector dispatcher) throws IOException {
    //Set<SelectionKey> keys = dispatcher.keys();
    Set<SelectionKey> keys = dispatcher.selectedKeys();
    Iterator<SelectionKey> ki = keys.iterator();
    while(ki.hasNext()) {
        SelectionKey key = ki.next();
        ki.remove();
        if (key.isAcceptable()) {
            ServerSocketChannel channel = (ServerSocketChannel) key.channel();
            //针对此socket的IO就是BIO了
            final SocketChannel socket = channel.accept();
            workers.submit(() -> {
                try {
                    
                    TestRequest request = TestRequest.parseFromString(parse(socket));
                    SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance();
                
                    socket.write(handler.handle(request));
                    
                } catch (Exception e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }   
            });
        }
    }
}

private static String parse(SocketChannel socket) throws IOException {
    String req = null;
    
    try {
        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
        byte[] bytes;
        int count = 0;
        if ((count = socket.read(buffer)) >= 0) {
            buffer.flip();
            bytes = new byte[count];
            buffer.get(bytes);
            req = new String(bytes, Charset.forName("utf-8"));
            buffer.clear();
        }
        
    } finally {
        socket.socket().shutdownInput();
    }
    return req;
}

Java的程序有个通病,写出来的程序又臭又长,同样是使用JavaNIO的API实现一个非阻塞的Socket服务器,使用NIO.2中AIO(异步I/O)的API就很简单了,但是却陷入了回调地狱(当然可以通过别的方式避免回调,但是其本质还是一样的)。和上边介绍的Reactor模式相比,简直就是拿核武器比步枪,有点降维攻击的意味了。Reactor中那么复杂的概念和逻辑所实现的功能,使用AIO的API很轻松就搞定了,而且概念比较少,逻辑更清晰。

//代码8
private static void useAIO() {
    AsynchronousServerSocketChannel server;
    try {
        server = AsynchronousServerSocketChannel.open();
        server.bind(LOCAL_8080);
        while (true) {
            Future<AsynchronousSocketChannel> socketF = server.accept();
            try {
                final AsynchronousSocketChannel socket  = socketF.get();
                workers.submit(() -> {
                    try {
                        ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                        
                        socket.read(buffer, null, new CompletionHandler<Integer, Object>() {

                            @Override
                            public void completed(Integer count, Object attachment) {
                                byte[] bytes;
                                if (count >= 0) {
                                    buffer.flip();
                                    bytes = new byte[count];
                                    buffer.get(bytes);
                                    String req = new String(bytes, Charset.forName("utf-8"));
                                    TestRequest request = TestRequest.parseFromString(req);
                                    try {
                                        SockerServerHandler handler = (SockerServerHandler) Class.forName(getClassNameForMethod(request.method)).newInstance();
                                        ByteBuffer bb = handler.handle(request);
                                        socket.write(bb, null, null);
                                    } catch (InstantiationException | IllegalAccessException
                                            | ClassNotFoundException e) {
                                        // TODO Auto-generated catch block
                                        e.printStackTrace();
                                    }
                                    buffer.clear();
                                }
                            }

                            @Override
                            public void failed(Throwable exc, Object attachment) {
                                // TODO Auto-generated method stub
                                
                            }
                        });
                        
                        
                    } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } finally {
                        
                    }       
                });
            } catch (InterruptedException | ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                break;
            }
        }
    } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

最后是测试用的客户端程序,NIO在客户端同样也可以发挥很重要的作用,这里就先略过了,代码9中客户端测试使用的是基础I/O:

//代码9
private volatile static int succ = 0;

public static void main(String[] args) throws UnknownHostException, IOException {
    CountDownLatch latch = new CountDownLatch(100);
    for (int i = 0; i < 100; i++) {
        new Thread( () -> {
            Socket soc;
            try {
                soc = new Socket("localhost", 8080);

                if (soc.isConnected()) {
                    OutputStream out = soc.getOutputStream();
                    byte[] req = "hello".getBytes("utf-8");
                    
                    out.write(Arrays.copyOf(req, 1024));
                    InputStream in = soc.getInputStream();
                    byte[] resp = new byte[1024];
                    in.read(resp, 0, 1024);
                    String result = new String(resp, "utf-8");
                    if (result.equals("haha")) {
                        succ++;
                    }
                    System.out.println(Thread.currentThread().getName() + "收到回复:" + result);
                    out.flush();
                    out.close();
                    in.close();
                    soc.close();
                }
                try {
                    System.out.println(Thread.currentThread().getName() + "去睡觉等待。。。");
                    latch.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        latch.countDown();
    }
    Runnable hook = () -> {
        System.out.println("成功个数:" + succ);
    };
    Runtime.getRuntime().addShutdownHook(new Thread(hook));
}

总结

原本只是想写一篇Netty在RPC框架中的使用,写着写着就写多了。本文从Java中引入NIO的历史讲起,梳理了Java对NIO支持的具体的API,最后通过一个典型的Socket服务器的例子具体的展示了Java中NIO相关API的使用,将Reactor模式和Proactor模式从理论落地到实际的代码。

由于作者比较懒,贴图全部都是在网上找的(代码大部分是自己写的),如侵删。下一篇将讲到比较火的一个NIO框架Netty的实现与使用。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,494评论 18 139
  • 零、写在前面 本文虽然是讲Netty,但实际更关注的是Netty中的NIO的实现,所以对于Netty中的OIO(O...
    TheAlchemist阅读 3,273评论 1 34
  • I/O是任何一个程序设计者都无法忽略的存在,很多高级编程语言都在尝试使用巧妙的设计屏蔽I/O的实际存在,减小它对程...
    TheAlchemist阅读 2,160评论 1 27
  • NIO(Non-blocking I/O,在Java领域,也称为New I/O),是一种同步非阻塞的I/O模型,也...
    闪电是只猫阅读 3,059评论 0 7
  • 六月初,淬泪殇;忧伤郁,韵月痕。 微凉的气息沁入心脾,随着呼吸蔓延全身,雨后清新混着泥土的新鲜味道让人迷醉。。 娇...
    走过岁月的朝夕阅读 163评论 0 0