传统IO和普通IO的区别
传统IO:面向流,阻塞IO(Blocking), selector
NIO:面向缓冲区,非阻塞IO(Non Blocking)
通道(channel负责传输)和缓冲区(Buffer负责数据的存储)。通道表示打开到IO设备的链接(文件,套接字等)。使用NIO需要获取链接IO设备的通道以及容纳数据的缓冲区,然后操作缓冲区对数据进行处理。
缓冲区
缓冲区:
在javaNIO中负责数据的存取。底层就是数组,用于存储不同数据类型的数据。根据数据类型不同,提供了相应类型的缓冲区(Boolean除外),
ByteBuffer
shortBuffer
IntBUffer
LongBuffer
FolatBuffer
DoubleBuffer
上述缓冲区的管理方式基本相同,通过allocate()获取相应的缓冲区
缓冲区核心方法:
-
put
:向缓冲区中存入数据 -
get
:从缓冲区中取出数据 -
flip
:切换模式(从写模式切换到读模式) -
rewind
:可重复读取数据,将position的值置为0 -
clear
:清空缓冲区,回到最初的状态,其中的数据并没有被清空。数据处于被遗忘的状态。 -
hasRemaining
:判断缓冲区中是否还有可以读取的数据
*remaining
:如果有可以读取的数据,看看还有多少个。
缓冲区核心属性:
-
capacity
:缓冲区中最大存储数据的容量,一旦声明不能改变。
*limit
:表示缓冲区中可以操作数据的大小,limit后面的数据是不能够进行读写的。 -
position
:表示换缓冲区中正在操作数据的位置。 -
mark:
标记,可以记录position的位置,可以通过reset恢复position的状态。
直接缓冲区和非直接缓冲区:
- 非直接缓冲区:通过
allocate()
方法获取缓冲区,将缓冲区建立在jvm的内存中。直接缓冲区只有ByteBuffer支持 - 直接缓冲区:通过
allocateDirect()
分配直接缓冲区,将换缓冲区建立在操作系统的物理内存中。
@Test
public void fun1(){
//创建指定大小缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
System.out.println("-------------------创建");
printBuffer(buffer);
System.out.println("-------------------存数据");
buffer.put("zxvbnmasdw".getBytes());
printBuffer(buffer);
System.out.println("-------------------切换模式");
buffer.flip();
printBuffer(buffer);
System.out.println("-------------------rewind");
buffer.rewind();
printBuffer(buffer);
}
private void printBuffer(Buffer buffer){
System.out.println("capacity :" + buffer.capacity());
System.out.println("limit :" + buffer.limit());
System.out.println("position :" + buffer.position());
}
通道(Channel)
用于源节点和目标节点的链接在javaNIO中负责缓冲区中数据的传输,通道本身不存储任何数据,需要配合缓冲区进行使用。
通道的主要实现类:
java.nio.channel
FileChannel
SocketChannel
ServerSocketChannel
DataGramChannel(UDP)
获取通道:
- java针对支持通道的类提供了getChannel方法
FileInputStrean FIleOutputStrean
RandomAccessFile
Socket
ServerSocket
DataGramSocket - jdk1.7中NIO2针对各个通道(FileChannel)提供了静态方法open()
- jdk1.7中NIO2的Files工具类的newByteChannel()方法
使用非直接缓冲区进行文件的复制
@Test
public void fun3() throws Exception{
FileInputStream in = new FileInputStream("src/file");
FileOutputStream out = new FileOutputStream("src/file2");
//获取相应的通道
FileChannel inChannel = in.getChannel();
FileChannel outCannel = out.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
while(inChannel.read(buffer) != -1){
buffer.flip();
outCannel.write(buffer);
buffer.clear();
}
}
使用直接缓冲区完成文件的复制(内存映射文件的方式)
@Test
public void fun4() throws IOException{
FileChannel inChannel = FileChannel.open(Paths.get("src/file"), StandardOpenOption.READ);
//StandardOpenOption.CREATE_NEW:不存在就创建,存在就报错
//StandardOpenOption.CREATE:不存在就创建,存在就覆盖
FileChannel outChannel = FileChannel.open(Paths.get("src/file3"), StandardOpenOption.WRITE, StandardOpenOption.READ,StandardOpenOption.CREATE);
//内存映射文件,和allocateDirect相同,只是获取方式不同。
ByteBuffer readBuffer = inChannel.map(MapMode.READ_ONLY, 0, inChannel.size());
ByteBuffer writeBuffer = outChannel.map(MapMode.READ_WRITE, 0, inChannel.size());
//直接对缓冲区中对数据进行操作
byte[] by = new byte[readBuffer.limit()];
readBuffer.get(by);
writeBuffer.put(by);
}
通道之间的数据传输(直接缓冲区的方式)
- transferTo
- transferFrom
@Test
public void fun5() throws Exception{
FileChannel inChannel = FileChannel.open(Paths.get("src/file"), StandardOpenOption.READ);
//StandardOpenOption.CREATE_NEW:不存在就创建,存在就报错
//StandardOpenOption.CREATE:不存在就创建,存在就覆盖
FileChannel outChannel = FileChannel.open(Paths.get("src/file4"),
StandardOpenOption.WRITE, StandardOpenOption.READ,StandardOpenOption.CREATE);
inChannel.transferTo(0, inChannel.size(), outChannel);
}
分散读取(scatter)和聚集写入(Gather)
- 分散读取:将通道中的数据按照缓存的次序依次读取缓存中(将缓冲区依次填满)
-
聚集写入:将缓存中的数据按照缓存的次序依次写入通道中
@Test
public void fun6() throws Exception{
FileInputStream in = new FileInputStream("src/file");
FileChannel inc = in.getChannel();
FileOutputStream out = new FileOutputStream("src/file5");
FileChannel outc = out.getChannel();
ByteBuffer buffer1 = ByteBuffer.allocate(100);
ByteBuffer buffer12 = ByteBuffer.allocate(100);
inc.read(new ByteBuffer[]{buffer1, buffer12});
outc.write(new ByteBuffer[]{buffer1, buffer12});
}
字符集
- 编码:将字符串装换成字节数组
- 解码:将字节数组装换成字符串
@Test
public void fun7() throws Exception{
//获取编码方式
Charset charset = Charset.forName("GBK");
//获取编码器
CharsetEncoder encoder = charset.newEncoder();
CharBuffer charBuffer = CharBuffer.allocate(1024);
CharBuffer buffer = charBuffer.put("哈哈你好");
buffer.flip();
ByteBuffer byteBuffer = encoder.encode(charBuffer);
//创建解码器
CharsetDecoder decoder = charset.newDecoder();
CharBuffer charBuffer2 = decoder.decode(byteBuffer);
System.out.println(charBuffer2.get());
}
网络通信
Socket阻塞式
当服务端接收到用户的请求,开启线程,通过accept阻塞这个线程等待客户端成功建立链接(包括客户端的数据全部发送到服务器段),成功之后才对数据进行操作。
- 通道(Channel),负责链接
java.nio.channels.Channel接口
抽象类selectorChannel(子类:SocketChannel, ServerSocketChannel,DataGramChannel。
Pipe.SinkChannel, Pipe.SourceChannel) - 2、缓冲区(Buffer),负责数据的存取
- 3、选择器(Selector),是selectorChannel的多路复用,用于检测selectorChannel的状况。
@Test
public void client() throws Exception{
//获取SocketChannel,通过此通道将客户端的数据写给服务端
SocketChannel clientChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 3000));
//创建Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//创建FIleChannel,用于将本地文件中的数据写入缓存中。
FileChannel fileChannel = new FileInputStream("src/file").getChannel();
//fileChannel.read(buffer):从文件中读取数据,往buffer中写入数据
while(fileChannel.read(buffer) != -1){
buffer.flip();
//clientChannel.write(buffer):从buffer中读取数据,往channel中写入数据
clientChannel.write(buffer);
buffer.clear();
}
clientChannel.shutdownOutput();
//接受服务端的反馈
int len = 0;
while((len = clientChannel.read(buffer)) != -1){
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
@Test
public void server() throws Exception{
ServerSocketChannel serverChannel = ServerSocketChannel.open();
//绑定链接端口号
serverChannel.bind(new InetSocketAddress(3000));
//获取客户端链接的通道
SocketChannel socketChannel = serverChannel.accept();
//接受客户端的数据保存在本地
//1、创建FIleChannel(将socket中的数据写入本地文件中)
FileChannel fileChannel = new FileOutputStream("src/file7").getChannel();
//创建buffer,
ByteBuffer buffer = ByteBuffer.allocate(1024);
while(socketChannel.read(buffer) != -1){
buffer.flip();
fileChannel.write(buffer);
buffer.clear();
}
buffer.put("请求收到".getBytes());
buffer.flip();
socketChannel.write(buffer);
}
Socket非阻塞
@Test
public void client() throws Exception{
SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 3000));
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
String str = scanner.next();
buffer.put(str.getBytes());
buffer.flip();
socketChannel.write(buffer);
buffer.clear();
}
}
@Test
public void server() throws Exception {
// 获取通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
// 设置为非阻塞
serverChannel.configureBlocking(false);
// 绑定端口
serverChannel.bind(new InetSocketAddress(3000));
// 创建连接器
Selector selector = Selector.open();
// 将连接器注册到channel,并设置监听事件(接受事件)
// SelectionKey.OP_CONNECT:链接状态
// SelectionKey.OP_READ:读状态
// SelectionKey.OP_WRITE:写状态
// SelectionKey.OP_ACCEPT:接受状态,当接受准备就绪,开始进行下一步操作
// 通过 | 进行链接可以监听多个状态
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 轮寻获取选择器上已经准备就绪的状态
while (selector.select() > 0) {
// 获取所有的监听Key
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isAcceptable()) {
// 若获取状态就绪,就获取客户端的链接
SocketChannel clientChannel = serverChannel.accept();
// 将客户端的链接设置为非阻塞状态
clientChannel.configureBlocking(false);
// 给该通道注册到选择器上,并设置状态为读就绪
clientChannel.register(selector, SelectionKey.OP_READ);
} else if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int len = 0;
while ((len = channel.read(buffer)) > 0) {
buffer.flip();
System.out.println(new String(buffer.array(), 0, len));
buffer.clear();
}
}
// 取消处理完了的选择建
iterator.remove();
}
}
}
dataGram非阻塞
@Test
public void client() throws Exception{
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(1024);
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()){
buffer.put(scanner.next().getBytes());
buffer.flip();
channel.send(buffer, new InetSocketAddress("127.0.0.1", 3000));
buffer.clear();
}
}
@Test
public void server() throws Exception{
DatagramChannel channel = DatagramChannel.open();
channel.configureBlocking(false);
channel.bind(new InetSocketAddress(3000));
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_READ);
while(selector.select() > 0){
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> iterator = keys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
if(key.isReadable()){
ByteBuffer buffer = ByteBuffer.allocate(1024);
channel.receive(buffer);
buffer.flip();
System.out.println(new String(buffer.array(), 0, buffer.limit()));
}
iterator.remove();
}
}
}
管道
/**
* 可以写入管道和读取管道放入到不同的线程中
* @throws Exception
*/
@Test
public void client() throws Exception{
//获取管道
Pipe pipe = Pipe.open();
//将缓冲区中的数据写入管道
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.put("发送数据".getBytes());
buffer.flip();
SinkChannel sink = pipe.sink();
sink.write(buffer);
buffer.clear();
//读取缓冲区中的数据
SourceChannel source = pipe.source();
int len = source.read(buffer);
System.out.println(new String(buffer.array(), 0, len));
}