零、本文纲要
一、NIO三大组件
- Channel
- Buffer
- Selector
二、Buffer
- 基础依赖
- ByteBuffer使用
- ByteBuffer结构
- ByteBuffer常见方法
三、Buffer使用模拟
- 情景模拟
- 模拟还原数据
一、NIO三大组件
NIO,non-blocking io 非阻塞 IO
Channel / Buffer / Selector
1. Channel
双向通道,可以从channel将数据读入buffer,也可以将buffer的数据写入channel;
与stream对比,stream是单向的,要么输入要么输出。
常见的Channel:
FileChannel / DatagramChannel / SocketChannel / ServerSocketChannel
2. Buffer
用来缓冲读写数据。
常见的Buffer:
ByteBuffer(MappedByteBuffer/DirectByteBuffer/HeapByteBuffer) /
ShortBuffer / IntBuffer / LongBuffer / FloatBuffer / DoubleBuffer / CharBuffer
3. Selector
① 多线程处理多个Socket连接
单个Thread对应单个Socket
内存占用高 / 线程上下文切换成本高 / 仅适合【连接数少】的场景
② 线程池处理多个Socket连接
单个Thread可以处理多个Socket
阻塞模式下线程只能处理一个Socket / 仅适合【短连接】的场景
③ selector配合线程处理多个Socket
selector 的作用就是配合一个线程来管理多个 channel,获取这些 channel 上发生的事件,这些 channel 工作在非阻塞模式下,不会让线程吊死在一个 channel 上。
适合连接数特别多,但流量低的场景(low traffic)。
调用 selector 的 select() 会阻塞直到 channel 发生了读写就绪事件,这些事件发生,select 方法就会返回这些事件交给 thread 来处理。
二、Buffer
0. 基础依赖
netty-all 4.1.39.Final
lombok 1.16.18
gson 2.8.5
guava 19.0
logback-classic 1.2.3
protobuf-java 3.11.3
1. ByteBuffer使用
a、向 buffer 写入数据,例如调用 channel.read(buffer)
b、调用 flip() 切换至读模式
c、从 buffer 读取数据,例如调用 buffer.get()
d、调用 clear() 或 compact() 切换至写模式
e、重复 1~4 步骤
try (RandomAccessFile file = new RandomAccessFile("src/main/resources/data.txt", "rw")) {
FileChannel channel = file.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(16);
do {
//1. 向 buffer 写入
int len = channel.read(buffer);
log.debug("读到的字节数:{}", len);
if (len == -1) {
break;
}
//2. 切换 buffer 读模式
buffer.flip();
while (buffer.hasRemaining()) {
log.debug("{}", (char) buffer.get());
}
//3. 切换 buffer 写模式
buffer.clear();
} while (true);
} catch (IOException e) {
log.info(e.getMessage());
}
2. ByteBuffer结构
// Creates a new buffer with the given mark, position, limit, capacity,
// backing array, and array offset
ByteBuffer(int mark, int pos, int lim, int cap, // package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
mark 标记位
position 当前位
limit 界限位
capacity 容量
backing array 支撑数组
array offset 数组偏移
3. ByteBuffer常见方法
① allocate方法
用来给ByteBuffer分配空间
public static ByteBuffer allocate(int capacity) {
if (capacity < 0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
HeapByteBuffer(int cap, int lim) {...} //此时容量对应limit写上线
② channel#read方法 / buffer#put方法
向 buffer 写入数据
FileChannelImpl#read → IOUtil#readIntoNativeBuffer
public final ByteBuffer put(byte[] src) {
return put(src, 0, src.length);
}
public ByteBuffer put(byte[] src, int offset, int length) {
checkBounds(offset, length, src.length);
if (length > remaining())
throw new BufferOverflowException();
int end = offset + length;
for (int i = offset; i < end; i++)
this.put(src[i]);
return this;
}
③ filp方法
切换至【读模式】,重置position、limit,可从buffer中读取数据
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}
注意:
a、filp方法将 写limit定位到读limit,position重置为0,进而读取内容。
b、另外此时mark也会被清除。
④ hasRemaining方法
判断是否仍有剩余数据
public final boolean hasRemaining() {
return position < limit;
}
⑤ buffer#get方法 / channel#write
HeapByteBuffer#get → Buffer#nextGetIndex
FileChannel#write(ByteBuffer[] srcs)
get方法注意点:
a、会使 position 读指针向后走;
b、可以使用 rewind 方法,使 position 重置,而limit不变,用来重复度;
c、调用 get(int i) 方法获取索引 i 的内容,它不会移动读指针。
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}
注意:rewind方法会重置mark标记。
对比filp与rewind:后者 rewind 没有改变 limit指针 所指向的读上限。
⑥ clear方法
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}
注意:clear方法并没有清除内容,而是改变了指针的指向,提升了效率。
⑦ compact方法
HeapByteBuffer#compact
注意:compact方法允许我们未读完,而且可以在未读的后一个位置重新开始写。
⑧ mark方法 & reset方法
public final Buffer mark() {
mark = position;
return this;
}
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}
注意:mark方法与reset方法允许我们在任意mark位置重新读,rewind方法是从头开始。
⑨ 字符串 与 buffer 互相转换
ByteBuffer buffer = StandardCharsets.UTF_8.encode("StrToBuffer");
CharBuffer charBuffer = StandardCharsets.UTF_8.decode(buffer);
三、Buffer使用模拟
1. 情景模拟
网络通信:
a、客户端发送多条数据给服务端,数据间使用"\n"分隔;
b、数据接收时为了提升效率,数据会被服务端重新组合。
模拟数据为:
a、Hello, NIO.\n
b、I`m Stone.\n
c、How are you?\n
此时,服务器将数据重组,出现ByteBuffer (黏包,半包),如下:
a、Hello, NIO.\nI`m Stone.\nHo 【24bytes】
c、w are you?\n 【11bytes】
2. 模拟还原数据
省略了buffer动态扩容与收缩的业务逻辑,实际使用时,框架内一般会有代码实现。
@Slf4j
public class BufferDemo01 {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(32);
//1. 接收到第一组数据
//1.1 模拟接收到第一组数据
buffer.put("Hello, NIO.\nI`m Stone.\nHo".getBytes(StandardCharsets.UTF_8));
//1.2 处理第一组数据
split(buffer);
//2. 接收到第二组数据
//2.1 模拟接收到第二组数据
buffer.put("w are you?\n".getBytes(StandardCharsets.UTF_8));
//2.2 处理第二组数据
split(buffer);
}
public static void split(ByteBuffer buffer) {
//1. 切换至 读模式
buffer.flip();
//2. 记录当前 读上限
int originLimit = buffer.limit();
//3. 处理当前数据
for (int i = 0; i < originLimit; i++) {
//3.1 如果读取到的数据是规定的 分隔符"\n"
if (buffer.get(i) == '\n') {
log.debug("当前分隔符所在的位置:{},buffer.position():{}。", i, buffer.position());
ByteBuffer message = ByteBuffer.allocate(i + 1 - buffer.position());
buffer.limit(i + 1); //3.2 调整当前读上限为 message 容量
message.put(buffer); //3.3 从 buffer 读,向 message 写
//debugAll(message); //该方法是打印当前 message 的方法
buffer.limit(originLimit); //3.4 调整当前读上限为原先读originLimit
}
}
//4. 如果当前数据有剩余,则将当前数据拼接至下组数据
buffer.compact();
}
}
四、结尾
以上即为Netty基础-NIO(一)的全部内容,感谢阅读。