基于网络的应用程序都需要将接收到的数据先放入缓冲区,等一个数据包完整接收到了再传递给应用层。 大家都知道TCP是面向字节流的,发送方 send 了 n 字节,但接收方并不知道一次 read 操作收到了多少字节,可能是1,可能是n, 也可能是n-x 或 n+x (x 未知)。
发送数据也是一样,一个数据包可能只发送了一部分,剩余的放在缓冲区中在 socket 端口可写时通过 on_write 回调函数中继续发送。
这里缓冲区的设计就很有讲究,尽量避免不必要的内存分配和复制,以提高性能。简而言之,它可以是一个字节数据队列:
- 发送方缓冲:从队尾追加数据,从队头取出数据发送到 socket
- 接收方缓冲:从队头取出数据,从队尾接收从 socket 中的数据
最简单的方法就是开辟一块内存,比如一个大数组为缓冲区,设置一个读指针 readIndex,从readIndex 位置开始读一直读到 writeIndex, 一个写指针writeIndex,数据从writeIndex 开始写一直写到 capacity。
著名的 C++ 网络编程框架 ACE 中就有 ACE_Message_Block 的设计
ACE_Message_Block 主要有读指针,写指针,数据块(ACE_Data_Block), 和连接指针(指向下一个消息体),这样就会将收到的数据串成一个链表。
再以 Netty 中的 ByteBuf 为例详细了解一下其设计思想
1)0 ~ readIndex 为无效区域
2)readIndex ~ writeIndex 为可读区域
3)writeIndex ~ capacity 为可写区域
4)capacity ~ maxCapacity 为可扩容区域
具体实现类为 AbstractByteBuf 的各个子类,主要区别在于是不是使用了内存池,是不是在堆内
内存区域主要分两类:
- 堆内内存: heap 堆内存
- 堆外内存: direct 或 native 内存
ByteBuf 实现 | 内存池中? | 安全? | 堆内? |
---|---|---|---|
PooledHeapByteBuf | Y | Y | Y |
PooledUnsafeHeapByteBuf | Y | N | Y |
PooledDirectByteBuf | Y | Y | N |
PooledUnsafeDirectByteBuf | Y | N | N |
UnpooledHeapByteBuf | N | Y | Y |
UnpooledUnsafeHeapByteBuf | N | N | Y |
UnpooledDirectByteBuf | N | Y | N |
UnpooledUnsafeDirectByteBuf | N | N | N |
主要方法有
方法 | 说明 |
---|---|
capacity() | 容量=废弃的字节数+可读字节数+可写字节数 |
maxCapacity() | ByteBuf 最大所能容纳的最大字节数 |
isWritable() | ByteBuf 是否可写, capacity() > writerIndex |
writeBytes(byte[] src) | 写入字节 |
isReadable() | ByteBuf 是否可写, writerIndex > readerIndex |
readBytes(byte[] dst) | 读取字节 |
内存的分配是交由 ByteBufAllocator 来分配的
写段代码演示一下
public static void printBufferIndex(ByteBuf buffer, String message) {
log.info("# {} -> buffer: {}, readableTypes {}, writableBytes: {}, capacity: {}",
message, buffer, buffer.readableBytes(), buffer.writableBytes(), buffer.capacity());
}
@Test
public void testByteBuf() {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(12, 16);
buffer.writeBytes(new byte[] { 1 , 2, 3, 4, 5, 6});
printBufferIndex(buffer, "write 6 bytes");
assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 6);
buffer.writeBytes(new byte[] { 7, 8, 9, 10, 11 ,12, 13, 14, 15, 16});
printBufferIndex(buffer, "write 12 bytes");
assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 16);
int size = buffer.readableBytes();
byte[] output = new byte[size];
buffer.readBytes(output);
printBufferIndex(buffer, String.format("read %d bytes", size));
assertTrue(buffer.readerIndex() == 16 && buffer.writerIndex() == 16);
buffer.discardReadBytes();
printBufferIndex(buffer, "discardReadBytess");
assertTrue(buffer.readerIndex() == 0 && buffer.writerIndex() == 0);
}
执行结果如下
# 先写6个字节,readIndex = 0, writeIndex = 6
write 6 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 12/16), readableTypes 6, writableBytes: 6, capacity: 12
# 再写10个字节,readIndex = 0, writeIndex = 16
write 12 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 16, cap: 16/16), readableTypes 16, writableBytes: 0, capacity: 16
# 再读16个字节,readIndex = 16, writeIndex = 16
read 16 bytes -> buffer: PooledUnsafeDirectByteBuf(ridx: 16, widx: 16, cap: 16/16), readableTypes 0, writableBytes: 0, capacity: 16
# 已经读过的字节丢弃掉,readIndex = 0, writeIndex = 0
discardReadBytess -> buffer: PooledUnsafeDirectByteBuf(ridx: 0, widx: 0, cap: 16/16), readableTypes 0, writableBytes: 16, capacity: 16
零拷贝
除了通过读写指针来减少内存的复制,Netty 还应用了如下的技术来提高性能
Netty 接收及发送 ByteBuffer 用 DirectBuffer, 使用堆外直接内存进行 socket 读写,不需要进行字节缓冲区的二次拷贝
Netty 使用 ComposeByteBuffer ,可以聚合多个 ByteBuffer 对象,不需要通过内存拷贝的方式来合并几个小的 ByteBuffer 到一个大的 ByteBuffer
Netty 对于文件传输采用了 transferTo 方法,可以直接将文件缓冲区的数据发送到目标 Channel, 避免了通过循环 write() 的方式进行内存拷贝