(*文章基于Netty4.1.22版本)
整体介绍
前面介绍了PoolSubpage,PoolChunk,PoolChunkList,这在这之上还有个PoolArena,在这里面管理了前面3个结构,结构图如下:
包含了前面说的6个PoolChunkList,还有两个PoolSubpage数组,数组中的每个元素都是一个链表的头节点,在讲PoolSubpage的时候涉及的Head就是从PoolSubpage数组里取的
在代码中,PoolArena是个抽象类,其有两个实现:HeapArena和DirectArena,分别为堆内存和对外内存的实现,而核心的分配和释放算法还是在PoolArena中的
源码分析
初始化
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
//....省略部分代码
// numTinySubpagePools = 512 >>> 4 = 32
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
// 由于一个Chunk为8KB,即8192,即2的13次方,所以pageShitfs为13。
numSmallSubpagePools = pageShifts - 9;// 13-9 = 4
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
// 初始化ChunkList
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
// 6个ChunkList连接起来
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
}
可以看到初始化比较简单,主要看下newSubpagePoolHead,初始化以一个Page作为头节点
private PoolSubpage<T> newSubpagePoolHead(int pageSize) {
PoolSubpage<T> head = new PoolSubpage<T>(pageSize);
head.prev = head;
head.next = head;
return head;
}
分配
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// newByteBuf主要由HeapArena和DirectArena实现,返回对应的内存
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
allocate(cache, buf, reqCapacity);// 核心算法
return buf;
}
标准化分配内存大小
在请求了一定的数量的大小之后,会将这个值进行一次操作,使其符合标准,主要是在normalizeCapacity中处理的,主要分两个情况:
1.大于等于512的,则保证大小为大于等于512的最小的2的N次方的值
2.小于512的,最小为16,以16为倍数增加
int normalizeCapacity(int reqCapacity) {
if (reqCapacity >= chunkSize) {
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
分配Small和Tiny的内存
在分配的时候,都会先从线程缓存中取,如果取到了,那么就不直接分配,这个后续再进行详细分析
tiny
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {// 从缓存取,后续分析
return;
}
tableIdx = tinyIdx(normCapacity);// normCapacity >>> 4
table = tinySubpagePools;
} else {
// ....
}
- 获取index的时候为什么要右移4位?
我们知道tinySubpagePools有32个元素,且小于512的大小都是按照16倍增的,那么每一个大小右移四位就能对应数组里的每个位置,即496>>>=31,480>>>4=30等等
small
// ....
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
// ...
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {//同tiny
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// ....
// ....
static int smallIdx(int normCapacity) {
int tableIdx = 0;
int i = normCapacity >>> 10;
while (i != 0) {
i >>>= 1;
tableIdx ++;
}
return tableIdx;
}
除了获取index处理和tiny不太一样,其他类似。其实获取index的结果也是类似的,就是通过一个size获取其在数组中的位置,在tiny中每个增大一个单位,其在数组中对应的位置会往后移动一个位置,那么small也是一样。small在初始化的时候有4个,且我们知道pageSize为8192,那么这个4个位置分别存放512,1024,2048和4096,结构类似下图:
通过Chunk和Page进行分配
final PoolSubpage<T> head = table[tableIdx];
synchronized (head) {
final PoolSubpage<T> s = head.next;// 获取head之后的那个Page
if (s != head) {// 如果链表中有多个Page
//....
long handle = s.allocate();//使用page进行分配
//....
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);// 使用page对应的chunk进行配内存
//....
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
//....
return;
当获取到index和table之后,就可以取到对应的Page,因为初始化的时候,默认每个也数组都有一个头节点,next和prev都指向自己。
还记得初始化Page的时候吗?会将当前创建的Page加入到头节点中,那么如果s!=head,那么证明当前数组这个位置已经初始化过一次了,可以直接拿来分配节点。
分配完成后获取到handle(这个在讲Page的时候分析过,用来唯一标志分配位置的一个long值),然后回调用Chunk的方法初始化:
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
//bitmapIdx在分析Chunk的时候分析过
initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
//获取当前在Chunk中对于节点的index
int memoryMapIdx = memoryMapIdx(handle);
// 获取对于该节点下的page
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
// 初始化byteBuf
buf.init(
this, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
buf.init方法在PooledByteBuf中实现(以堆内存为例)
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
}
void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
}
private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
this.chunk = chunk;
memory = chunk.memory;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
}
将一开始创建的ByteBuf与handle,Chunk,memory关联,还有初始化offset,这几个有什么作用呢?
- handle:在ByteBuffer释放的时候,通过handle可以知道是在哪里分配的
- Chunk:释放的主要是通过调用Arena的free方法一层层往下释放,保存Chunk可以知道是哪个Arena
- memory:每个Chunk在初始化的时候都会创建好内存存放的结构,例如堆内存分配的话这个memory就是用来写数据的byte数组
- offset:在同一个Chunk中分配的都是共用一个memory,那么如何保证写和读的位置不会冲突呢?其实就是根据这个offset来的,每次分配的结果都有一个偏移量和长度,用来限制每个ByteBuf的操作范围
上面说的offset是如何确定的?
总的来说很简单,举个例子,例如:
- 分配1000的内存,那么offset=0,maxLength=1024
- 继续分配1000的内存,那么offset=1024,maxLength=1024
- 继续分配2048内存,那么此时由于2048这个Chunk的Page大小为1024,第一个Chunk无法分配给这2048的内存,那么就把2049这个Chunk切分成4个大小为2048的内存Page,offset为8192,因为叶子节点第一个Chunk大小为相对位移为 0~ 8192,第二个为8192~16384
当initBufWithSubpage调用完成后,就return了。而如果s==head,那么有两种情况:
- 还未初始化过,初始化话后就会在这下面挂一个Page,下次直接使用
- 该位置的Page已经分配完
那么如果是上面两种情况,就走allocateNormal的流程进行分配
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
qInit.add(c);
}
这里的分配和Page的分配有点类似,Page会在数组上找是否有可用的Page,有则使用,allocateNormal则是先在ChunkList中找是否能分配成功,能则直接使用ChunkList上的Chunk进行分配,否则会创建一个新的Chunk并加入到qInit这个ChunkList中,下次如果获取大小在ChunkList返回内(normCapacity < maxCapacity)的,可以直接走ChunkList
这里说明一下,在tiny和small的情况下,也会走到normal的分配allocateNormal,并不是说走到这个方法就不是tiny和small的情况,而是因为这两者都是Subpage类型的内存,而Subpage的内存必须要先初始化Chunk,因为他是Chunk内部的一个子结构,入口是Chunk,调用Chunk初始化的时候,其内部会去觉得用什么分配,上面说的tiny和small只是在Chunk前置了一个数组,分为small和tiny,这个不要混淆了
分配非Small和tiny的内存
如果需要的大小>=8KB<=chunkSize,也是走Chunk分配,和分配tiny和small的类似,会先从ThreadCache获取,没有在初始化一个Chunk
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
allocateHuge(buf, reqCapacity);
}
可以看到这种情况也是调用allocateNormal进行分配,其实无论是走Chunk还是走Page,都是需要去创建一个新的Chunk,因为Page是需要在Chunk管理之下的,在Chunk的内部才会去判断这个是否需要走更小的Page然后觉得是否需要初始化Page(Chunk中的allocateRun和allocateSubpage方法)
最后还有个分配大于chunkSize的方法,allocateHuge,其实和分配Chunk并无不同,只不过这个Chunk不是池化内存,最后释放会直接释放,只是额外记录了其他计量的数据
private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
PoolChunk<T> chunk = newUnpooledChunk(reqCapacity);
activeBytesHuge.add(chunk.chunkSize());
buf.initUnpooled(chunk, reqCapacity);
allocationsHuge.increment();
}
释放
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {// 非池化内存,allocateHuge分配的Chunk时,该值为true
int size = chunk.chunkSize();
destroyChunk(chunk);// 如果是堆分配,那么什么事情都没做,在外部会把Chunk设为null,GC回收
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {// 放回ThreadCache
return;
}
freeChunk(chunk, handle, sizeClass);
}
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
case Normal:
++deallocationsNormal;
break;
case Small:
++deallocationsSmall;
break;
case Tiny:
++deallocationsTiny;
break;
default:
throw new Error();
}
// 调用ChunkList的free方法
destroyChunk = !chunk.parent.free(chunk, handle);
}
if (destroyChunk) {
destroyChunk(chunk);
}
}