最近面试一些公司,被问到的关于Java NIO编程的问题,以及自己总结的回答。
谈谈对Java IO的认识。
- 对于I/O操作来说, 其根本的作用在于传输数据。输入和输出指的仅是数据的流向,实际传输是通过某些具体的媒介来完成的,其中最主要的是文件系统和网络连接;
- 早期的java.io包把I/O操作抽象成数据的流动,进而有了流的概念;在Java NIO中,则把I/O操作抽象成端到端的一个数据连接,这就有了通道(channel)的概念;
- Java中最基本的流是在字节这个层次上进行操作的;在read方法的调用是阻塞的,这可能会成为应用中的瓶颈(可以通过available方法获取在不阻塞的情况下可以获取到的字节数);流无法重新使用,BufferedInputStream通过mark和reset操作可以实现流中部分内容的重复读取;另外一种重用输入流的方式是把它转换成数据来使用;
- 输出流是通过write方法把数据存放在缓冲区(缓冲区满了会自动执行写入),使用flush方法强制进行实际的写入操作;
- 其他常用流:FileInput(Output)Stream、ByteArrayInput(Output)Stream、字符流(new BufferedReader(new InputStreamReader(inputStream)));
介绍一下Java NIO中的Buffer、Channel和Selector的概念和作用。
- Java NIO的缓冲区:使用数组的方式不够灵活且性能差,Java NIO的缓冲区功能更加强大;容量(capacity)表示缓冲区的额定大小,需要在创建时指定(allocate静态方法);读写限制(limit)表示缓冲区在进行读写操作时的最大允许位置;读写位置(position)表示当前进行读写操作时的位置;缓冲区的很多操作(clear、flip、rewind)都是操作limit和position的值来实现重复读写;
- Java NIO的通道:channel表示为一个已经建立好的到支持I/O操作的实体(如文件和网络)的连接,在此连接上进行数据的读写操作,使用的是缓冲区来实现读写;
public void openAndWrite() throws IOException {
FileChannel channel = FileChannel.open(Paths.get("my.txt"), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
ByteBuffer buffer = ByteBuffer.allocate(64);
buffer.putChar('A').flip();
channel.write(buffer);
}
- Socket和ServerSocket类中提供的建立连接和数据传输相关的方法都是阻塞式的;对服务端通常使用线程池的方式来调用ServerSocket.accept方法来监听连接请求;Java NIO提供了非阻塞式和多路复用的套接字连接;
public void startSimpleServer() throws IOException{
ServerSocketChannel channel = ServerSocketChannel.open();
channel.bind(new InetSocketAddress("localhost", 10800));
while(true){
try(SocketChannel sc = channel.accept()){
sc.write(ByteBuffer.wrap("Hello".getBytes("UTF-8")));
}
}
}
- 套接字通道的多路复用的思想比较简单,通过一个专门的选择器(Selector)来同时对多个套接字通道进行监听;当其中的某些套接字通道上有它感兴趣的事件发生时,这些通道就会变为可用状态,可以在选择器的选择操作中被选中;可用通道的选择一般是通过操作系统提供的底层操作系统调用来实现的,性能也比较高;
public class LoadWebPageUseSelector {
// 通过Selector同时下载多个网页的内容
public void load(Set<URL> urls) throws IOException {
Map<SocketAddress, String> mapping = urlToSocketAddress(urls);
// 1. 创建Selector
Selector selector = Selector.open();
// 2. 将套接字Channel注册到Selector上
for (SocketAddress address : mapping.keySet()) {
register(selector, address);
}
int finished = 0;
int total = mapping.size();
ByteBuffer buffer = ByteBuffer.allocate(32 * 1024);
int len = -1;
while (finished < total) {
// 3. 调用select方法进行通道选择,该方法会阻塞,直到至少有一个他们所感兴趣的事件发生,然后可以通过selectedKeys获取被选中的通道的对象集合
selector.select();
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isValid() && key.isConnectable()) {
SocketChannel channel = (SocketChannel) key.channel();
// 4. 如果连接成功,则发送HTTP请求;失败则取消该连接;
boolean success = channel.finishConnect();
if (!success) {
finished++;
key.cancel();
} else {
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
String path = mapping.get(address);
String request = "GET" + path + "HTTP/1.0\r\n\r\nHost:" + address.getHostString() + "\r\n\r\n";
ByteBuffer header = ByteBuffer.wrap(request.getBytes("UTF-8"));
channel.write(header);
}
} else if (key.isValid() && key.isReadable()) {
// 5. 当channel处于可读时则读取channel的数据并写入文件
SocketChannel channel = (SocketChannel) key.channel();
InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress();
String filename = address.getHostName() + ".txt";
FileChannel destChannel = FileChannel.open(Paths.get(filename), StandardOpenOption.APPEND, StandardOpenOption.CREATE);
buffer.clear();
// 6. 当返回0时表示本次没有数据可读不需要操作;如果为-1则表示所有数据亿级读取完毕,可以关闭;
while ((len = channel.read(buffer)) > 0 || buffer.position() != 0) {
buffer.flip();
destChannel.write(buffer);
buffer.compact();
}
if (len == -1) {
finished++;
key.cancel();
}
}
}
}
}
private void register(Selector selector, SocketAddress address) throws IOException {
SocketChannel channel = SocketChannel.open();
// 设置为非阻塞模式
channel.configureBlocking(false);
channel.connect(address);
// 注册时需要指定感兴趣的事件类型
channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
}
private Map<SocketAddress, String> urlToSocketAddress(Set<URL> urls) {
Map<SocketAddress, String> mapping = new HashMap<>();
for (URL url : urls) {
int port = url.getPort() != -1 ? url.getPort() : url.getDefaultPort();
SocketAddress address = new InetSocketAddress(url.getHost(), port);
String path = url.getPath();
if (url.getQuery() != null) {
path = path + "?" + url.getQuery();
}
mapping.put(address, path);
}
return mapping;
}
}
Java 7的版本对Java NIO有哪些增强?
- Java 7中的NIO.2进一步增强,主要包括文件系统访问和异步I/O通道;
- 引入Path接口作为文件系统中路径的一种抽象,来代替之前字符串处理的方式,更加语义化;引入DirectoryStream来支持目录下子目录和文件的遍历,它的优势在于它渐进式地遍历,每次只读取一定数量的内容,从而可以降低遍历时的开销(DirectoryStream<Path> stream = Files.newDirectoryStream(path, "*.java"));如果要递归地遍历子目录下的子目录,对整个目录树进行遍历,可以使用FileVisitor;通过引入文件视图FileAttributeView来获取和设置文件的各种属性;另外还提供了新的目录监视服务,当指定目录下的子目录或文件被创建、更新或删除时可以得到事件通知;Files工具类提供了一系列静态方法可以满足常见的需求;
public void calculate() throws IOException, InterruptedException {
WatchService service = FileSystems.getDefault().newWatchService();
Path path = Paths.get("").toAbsolutePath();
path.register(service, StandardWatchEventKinds.ENTRY_CREATE);
while (true) {
WatchKey watchKey = service.take();
for (WatchEvent<?> event : watchKey.pollEvents()) {
Path createdPath = (Path) event.context();
createdPath = path.resolve(createdPath);
long size = Files.size(createdPath);
System.out.println(createdPath + "=>" + size);
}
watchKey.reset();
}
}
public void manipulateFiles() throws IOException {
Path newFile = Files.createFile(Paths.get("new.txt").toAbsolutePath());
List<String> content = Arrays.asList("Hello", "World");
Files.write(newFile, content, Charset.forName("UTF-8"));
Files.size(newFile);
byte[] bytes = Files.readAllBytes(newFile);
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
Files.copy(newFile, outputStream);
Files.delete(newFile);
}
- 异步I/O通道一般提供两种使用方式:一会走是通过Future类的对象来表示异步操作的结果,另外一种是在执行操作时传入一个CompletionHandler接口的实现对象作为操作完成时的回调方法;异步文件通道由AsynchronousFileChannel类表示,它没有当前读写位置的概念。
public void asyncWrite() throws IOException, ExecutionException, InterruptedException {
AsynchronousFileChannel channel = AsynchronousFileChannel.open(Paths.get("large.bin"),
StandardOpenOption.CREATE, StandardOpenOption.WRITE);
ByteBuffer buffer = ByteBuffer.allocate(32 * 1024 * 1024);
Future<Integer> result = channel.write(buffer, 0);
Integer len = result.get();
}
public void startAsyncSimpleServer() throws IOException {
AsynchronousChannelGroup group = AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
final AsynchronousServerSocketChannel serverChannel = AsynchronousServerSocketChannel.open(group).bind(new InetSocketAddress(10080));
serverChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) {
serverChannel.accept(null, this);
// 使用clientChannel
}
@Override
public void failed(Throwable exc, Void attachment) {
// 错误处理
}
});
}