bytebuf的总结
1.首先区分heap和direct,两者最本质的区别就是承载数据的对象分别是数组和directbytebuffer,同时netty进行了优化
通过unsafe获取我们数组或者directbytebuffer的对象地址进行操作,速度更快。若发现不支持cleaner,则通过反射调用directbytebuffer的不包含cleaner的构造函数,释放堆外内存的时候,直接使用unsafe。
2.再来区分pool和unpool:首先unpool的时候调用的InstrumentedxxxxByteBuf对象实质上都是UnpooledxxxByteBuf的子类。而对于pool分配对象的时候是先尝试获取PoolThreadCache中的PoolArena,如果获取成功就依靠PoolArena分配对象,其根据是direct还是heap 选择directbytebuf或者是字节数组,如果没有PoolArena。则只能new一个UnpooledxxxByteBuf
ByteBuf的创建和release(direct heap pool unpool composite)
CompositeByteBuf 其offset是上一个component的endoffset 自己的endoffset是自身的offset+readableByte
创建
public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
if (alloc == null) {
throw new NullPointerException("alloc");
}
我们创建bytebuf对象采用alloc
this.alloc = alloc;
this.direct = direct;
最大的components
this.maxNumComponents = maxNumComponents;
存储bytebuf的集合继承arraylist
components = newList(0, maxNumComponents);
}
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
设置单个bytebuf的内存最大值
super(maxCapacity);
采用AtomicIntegerFieldUpdater--无锁化设置refCnt为1,
refCntUpdater.set(this, 1);
}
private static ComponentList newList(int initComponents, int maxNumComponents) {
获取合理的componentlist
int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
创建数组对象大小
return new ComponentList(Math.max(initComponents, capacityGuess));
}
添加
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
添加bytebuf,increaseWriterIndex必须为true,如果不为true 那么我永远无法获取添加的bytebuf
addComponents0(increaseWriterIndex, components.size(), buffers, 0, buffers.length);
是否需要调整componet
consolidateIfNeeded();
return this;
}
private int addComponents0(boolean increaseWriterIndex, int cIndex, ByteBuf[] buffers, int offset, int len) {
checkNotNull(buffers, "buffers");
int i = offset;
try {
检测当前componet的index,这边检测到异常会直接抛出去,然后释放这些
bytebuf
checkComponentIndex(cIndex);
如果当前的其实位置小于
while (i < len) {
ByteBuf b = buffers[i++];
if (b == null) {
break;
}
存放
cIndex = addComponent0(increaseWriterIndex, cIndex, b) + 1;
int size = components.size();
因为add方法有可能失败所以这边要跟真正的size进行比较
if (cIndex > size) {
cIndex = size;
}
}
return cIndex;
} finally {
如果发生异常就释放着剩余的bytebuf
for (; i < len; ++i) {
ByteBuf b = buffers[i];
if (b != null) {
try {
b.release();
} catch (Throwable ignored) {
// ignore
}
}
}
}
}
private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
assert buffer != null;
标识是否添加成功
boolean wasAdded = false;
try {
如果发生异常直接抛出
checkComponentIndex(cIndex);
int readableBytes = buffer.readableBytes();
这段代码的意思是复制一个bytebuf
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
如果正好相等
if (cIndex == components.size()) {
添加
wasAdded = components.add(c);
如果cindex等于0,设置endoffset,标识整个componet的尾部offset
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
更新endOffset
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
如果不等于尾部,说明是插入到中介
components.add(cIndex, c);
wasAdded = true;
if (readableBytes != 0) {
更新offset
updateComponentOffsets(cIndex);
}
}
更新写索引
if (increaseWriterIndex) {
writerIndex(writerIndex() + buffer.readableBytes());
}
return cIndex;
} finally {
如果没有添加成功直接release
if (!wasAdded) {
buffer.release();
}
}
}
public void add(int index, E element) {
rangeCheckForAdd(index);
确保真实的存放bytebuf的list的length够大
ensureCapacityInternal(size + 1); // Increments modCount!!
这边通过复制来避免遍历插入(因为不是插入最后一位的,所以需要遍历)
System.arraycopy(elementData, index, elementData, index + 1,
size - index);
修改指定位置的元素
elementData[index] = element;
size++;
}
更新offset,因为是插入不是直接放在尾部,writeindex标识我们增加到哪了
readIndex标识我们读到哪了。
private void updateComponentOffsets(int cIndex) {
如果size小于cIndex 说明是直接插入的,我们不需要更新
int size = components.size();
if (size <= cIndex) {
return;
}
或这个cindex的Component
Component c = components.get(cIndex);
if (cIndex == 0) {
更新该Component的offset
c.offset = 0;
c.endOffset = c.length;
cIndex ++;
}
更新剩余的component的offset和endOffset
for (int i = cIndex; i < size; i ++) {
Component prev = components.get(i - 1);
Component cur = components.get(i);
cur.offset = prev.endOffset;
cur.endOffset = cur.offset + cur.length;
}
}
如果已经超过最大者了,那么把所有的component元素取出来,尝试塞入一个bytebuf中然后依次释放其他的
private void consolidateIfNeeded() {
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = allocBuffer(capacity);
// We're not using foreach to avoid creating an iterator.
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
}
写
public ByteBuf writeByte(int value) {
确保可写,且如果添加的字节长度超过了目前最后一个component的endoffset 则添加了一个新的component,并更新offset,否则就是从componentlist中寻找可以容下这么大值的component并更新,默认是从后往前找
ensureWritable0(1);
添加的value在writeIndex++
_setByte(writerIndex++, value);
return this;
}
public CompositeByteBuf setByte(int index, int value) {
根据这个index找到是哪个component
Component c = findComponent(index);
index标识在这个Component写到哪了,然后在offset写下内容,具体的看底层是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
c.buf.setByte(index - c.offset, value);
return this;
}
读
public byte readByte() {
checkReadableBytes0(1);
更新readerIndex
int i = readerIndex;
底层还是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
byte b = _getByte(i);
更新readerIndex
readerIndex = i + 1;
return b;
}
protected byte _getByte(int index) {
从component获取,这边不需要更新offset,只需要,更新readIndex即可
Component c = findComponent(index);
底层还是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
return c.buf.getByte(index - c.offset);
}
pooldirect和unpooldirect
创建,读和写
pooldirect --创建
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
尝试从threadCache获取
PoolThreadCache cache = threadCache.get();
获取directArena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
如果不为空则依靠directArena创建
if (directArena != null) {
这边留下面分析
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
创建一个新的为unpool的。。这是为什么
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
包装下检测内存是否泄漏
return toLeakAwareBuffer(buf);
}
static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
UnpooledUnsafeNoCleanerDirectByteBuf,选择一个不需要cleaner的构造通过反射生成directByteBuffer其回收内存是依靠UNSAFE.freeMemory(address),而 newUnsafeDirectByteBuf还是会生成cleaner去回收,cleaner是启动一个线程去调用。UNSAFE.freeMemory(address),前者关闭的更快,后者需要等gc把我们堆内的directbytebuffer对象回收后,才会回收堆外内存
if (PlatformDependent.useDirectBufferNoCleaner()) {
return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
}
public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
super(maxCapacity);
if (alloc == null) {
throw new NullPointerException("alloc");
}
if (initialCapacity < 0) {
throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
}
if (maxCapacity < 0) {
throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
}
if (initialCapacity > maxCapacity) {
throw new IllegalArgumentException(String.format(
"initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
}
this.alloc = alloc;
setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
}
private void setByteBuffer(ByteBuffer buffer) {
如果之前该对象存在bytebuff
ByteBuffer oldBuffer = this.buffer;
if (oldBuffer != null) {
if (doNotFree) {
等待下一次是否
doNotFree = false;
} else {
是否
freeDirect(oldBuffer);
}
}
this.buffer = buffer;
tmpNioBuf置为空,我们写入的数据的时候。采用tmpNioBuf,这样不会更改原始的position
tmpNioBuf = null;
capacity = buffer.remaining();
}
DirectByteBuffer(int cap) {
super(-1, 0, cap, cap);
boolean pa = VM.isDirectMemoryPageAligned();
获取pagesize的大小
int ps = Bits.pageSize();
size是分配的内存大小,cap是实际需要的大小
long size = Math.max(1L, (long)cap + (pa ? ps : 0));
计算是否有足够的内存,没有的话会调用gc,最后实在还是没有,则会抛出异常
Bits.reserveMemory(size, cap);
long base = 0;
try {
分配内存
base = unsafe.allocateMemory(size);
} catch (OutOfMemoryError x) {
Bits.unreserveMemory(size, cap);
throw x;
}
unsafe.setMemory(base, size, (byte) 0);
if (pa && (base % ps != 0)) {
内存的地址
address = base + ps - (base & (ps - 1));
} else {
address = base;
}
创建cleaner
cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
att = null;
}
写
public ByteBuf writeByte(int value) {
确保可写
ensureWritable0(1);
根据底层的不同实现分别采用数组或者directbytebuffer去写入
_setByte(writerIndex++, value);
return this;
}
pool扩容
public final ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
是否为unpooled,如果是则
if (chunk.unpooled) {
如果长度正好等于length直接返回 ,否则重新分配
if (newCapacity == length) {
return this;
}
} else {
pool的时候,当newCapacity 小于最大值时候直接返回
if (newCapacity > length) {
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity < length) {
if (newCapacity > maxLength >>> 1) {
if (maxLength <= 512) {
if (newCapacity > maxLength - 16) {
length = newCapacity;
重新分配writeIndex和readerIndex
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
重新分配writeIndex和readerIndex
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
return this;
}
}
重新分配
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
unpool扩容
public ByteBuf capacity(int newCapacity) {
checkNewCapacity(newCapacity);
int readerIndex = readerIndex();
int writerIndex = writerIndex();
int oldCapacity = capacity;
if (newCapacity > oldCapacity) {
ByteBuffer oldBuffer = buffer;
分配新的directbytebuffer
ByteBuffer newBuffer = allocateDirect(newCapacity);
调整position和limit
oldBuffer.position(0).limit(oldBuffer.capacity());
newBuffer.position(0).limit(oldBuffer.capacity());
复制
newBuffer.put(oldBuffer);
调整position为0 limit 为capacity
newBuffer.clear();
替换原始的bytebuffer
setByteBuffer(newBuffer);
} else if (newCapacity < oldCapacity) {
ByteBuffer oldBuffer = buffer;
因为是unpool 所以重新new一个 然后 调整大小
ByteBuffer newBuffer = allocateDirect(newCapacity);
如果readerIndex 小于newCapacity说明 还有一部分空间可读
if (readerIndex < newCapacity) {
如果writerIndex大于newCapacity 那么调整下writerIndex
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
设置最终可写的空间
oldBuffer.position(readerIndex).limit(writerIndex);
newBuffer.position(readerIndex).limit(writerIndex);
newBuffer.put(oldBuffer);
newBuffer.clear();
} else {
readerIndex 和writeindex都是一样 没有可读的空间了 但是可以继续写
setIndex(newCapacity, newCapacity);
}
setByteBuffer(newBuffer);
}
return this;
}
poolDirect,memory==directByteBuffer
protected void _setByte(int index, int value) {
memory.put(idx(index), (byte) value);
}
poolheap和unpoolheap 于direct一样 只是底层采用数组,也可以借助于unsafe 更快的操作数组
poolHeap,memory==字节数组
protected void _setByte(int index, int value) {
HeapByteBufUtil.setByte(memory, idx(index), value);
}
参考Hypercube的简书,介绍如何实现pool的思想
简介:Netty的PooledByteBuf采用与jemalloc一致的内存分配算法。可用这样的情景类比,想像一下当前电商的配送流程。当顾客采购小件商品(比如书籍)时,直接从同城仓库送出;当顾客采购大件商品(比如电视)时,从区域仓库送出;当顾客采购超大件商品(比如汽车)时,则从全国仓库送出。Netty的分配算法与此相似,稍有不同的是:在Netty中,小件商品和大件商品都首先从同城仓库(ThreadCache-tcache)送出;如果同城仓库没有,则会从区域仓库(Arena)送出,可参见下图:
对于商品分类,Netty根据每次请求分配内存的大小,将请求分为如下几类.
注意以下几点:
内存分配的最小单位为16B。
< 512B的请求为Tiny,< 8KB(PageSize)的请求为Small,<= 16MB(ChunkSize)的请求为Normal,> 16MB(ChunkSize)的请求为Huge。
< 512B的请求以16B为起点每次增加16B;>= 512B的请求则每次加倍。
不在表格中的请求大小,将向上规范化到表格中的数据,比如:请求分配511B、512B、513B,将依次规范化为512B、512B、1KB。
poolArena
为了提高内存分配效率并减少内部碎片,jemalloc算法将Arena切分为小块Chunk,根据每块的内存使用率又将小块组合为以下几种状态:QINIT,Q0,Q25,Q50,Q75,Q100。Chunk块可以在这几种状态间随着内存使用率的变化进行转移。
QINIT的内存使用率为[0,25)、Q0为(0,50)、Q100为[100,100]。
Chunk块的初始状态为QINIT,当使用率达到25时转移到Q0状态,再次达到50时转移到Q25,依次类推直到Q100;当内存释放时又从Q100转移到Q75,直到Q0状态且内存使用率为0时,该Chunk从Arena中删除。注意极端情况下,Chunk可能从QINIT转移到Q0再释放全部内存,然后从Arena中删除。
qInit, q000, q025, q050, q075, q100
0-0.25 0-0.5 0.5-0.75 0.50-1 0.75-1 100
Chunk和Page
虽然已将Arena切分为小块Chunk,但实际上Chunk是相当大的内存块,在jemalloc中建议为4MB,Netty默认使用16MB。为了进一步提高内存利用率并减少内部碎片,需要继续将Chunk切分为小的块Page。一个典型的切分将Chunk切分为2048块,Netty正是如此,可知Page的大小为:16MB/2048=8KB。一个好的内存分配算法,应使得已分配内存块尽可能保持连续,这将大大减少内部碎片,由此jemalloc使用伙伴分配算法尽可能提高连续性。伙伴分配算法的示意图如下:
图中最底层表示一个被切分为2048个Page的Chunk块。自底向上,每一层节点作为上一层的子节点构造出一棵满二叉树,然后按层分配满足要求的内存块。以待分配序列8KB、16KB、8KB为例分析分配过程(每个Page大小8KB):
8KB--需要一个Page,第11层满足要求,故分配2048节点即Page0;
16KB--需要两个Page,故需要在第10层进行分配,而1024的子节点2048已分配,从左到右找到满足要求的1025节点,故分配节点1025即Page2和Page3;
8KB--需要一个Page,第11层满足要求,2048已分配,从左到右找到2049节点即Page1进行分配。
分配结束后,已分配连续的Page0-Page3,这样的连续内存块,大大减少内部碎片并提高内存使用率。
SubPage
Netty中每个Page的默认大小为8KB,在实际使用中,很多业务需要分配更小的内存块比如16B、32B、64B等。为了应对这种需求,需要进一步切分Page成更小的SubPage。SubPage是jemalloc中内存分配的最小单位,不能再进行切分。SubPage切分的单位并不固定,以第一次请求分配的大小为单位(最小切分单位为16B)。比如,第一次请求分配32B,则Page按照32B均等切分为256块;第一次请求16B,则Page按照16B均等切分为512块。为了便于内存分配和管理,根据SubPage的切分单位进行分组,每组使用双向链表组合,示意图如下
红线标出的就是一个完整的page节点
其中每组的头结点head只用来标记该组的大小,之后的节点才是实际分配的SubPage节点。需要注意的是,这些节点正是上一节中满二叉树的叶子节点即一个Page。
poolArena
pool类型的bytebuf 分配内存的核心逻辑如下:
以pool堆外内存为例 接受内存分配的大概逻辑
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
通过PoolThreadLocalCache获取PoolThreadCache
PoolThreadCache cache = threadCache.get();
因为是堆外内存所以使用directArena
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
使用directArena分配内存
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
内存是否泄漏的检测
return toLeakAwareBuffer(buf);
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
从recycle获取PooledByteBuf
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
去获取堆外内存,即diirectbytebuffer
allocate(cache, buf, reqCapacity);
return buf;
}
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
根据是否支持unsafe,采用不同的方式生成directbytebuffer对象
if (HAS_UNSAFE) {
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
RECYCLER是对象池
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
分配最大内存
buf.reuse(maxCapacity);
return buf;
}
核心的分配内存池的逻辑
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
规范化请求容量
final int normCapacity = normalizeCapacity(reqCapacity);
是属于tiny和small 说明小于page,除了huge即超过chunksize的 是不在PoolThreadCache里面的,其他的逻辑都是
先尝试从PoolThreadCache分配 分配失败说明cache中的内存不够,那么在尝试从arena分配
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) {
从cache分配tiny
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
获取tiny的subpag的head节点
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
获取small的subpag的head节点
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
获取 PoolSubpag的head节点
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
分配内存(因为netty做的限制,即会规范内存所以我们的内存要么就是tiny 要么是small)
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
更新分配记录
incTinySmallAllocation(tiny);
return;
}
}
说明PoolSubpage还未初始化好 ,我们只能normal分配
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
更新分配记录
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
总结一下内存分配过程:
对于Tiny/Small、Normal大小的请求,优先从线程缓存中分配。
没有从缓存中得到分配的Tiny/Small请求,会从以第一次请求大小为基准进行分组的Subpage双向链表中进行分配;如果双向链表还没初始化,则会使用Normal请求分配Chunk块中的一个Page,Page以请求大小为基准进行切分并分配第一块内存,然后加入到双向链表中。
没有从缓存中得到分配的Normal请求,则会使用伙伴算法分配满足要求的连续Page块。
对于Huge请求,则直接使用Unpooled直接分配。
内存分配过程分析完毕,接着分析内存释放,内存释放的时候根据是否支持pool以及内存块的大小,来判断是否保存在poolthreadlocalcache 还是直接释放
PoolThreadCache
不分析了,累了
ResourceLeakTracker的原理
我们创建一个虚引用的leak,然后把该leak和bytebuf包装成SimpleLeakAwareByteBuf或者AdvancedLeakAwareByteBuf
当我们release的时候,如果引用计数器正常操作(比如开发没有操作失误),那么我们会自动的把bytebuf底层的数据承载给置空等待bytebuf对象被gc,在release之间 我们还会调用leak的clsoe 清除referent 并且删除allleak当中该对象,如果发生内存泄漏则 代表bytebuf 被回收了 但是其内部数组或者directbytebuffer 还存在,这就导致了可能存在内存泄漏。即我们没用正确的调用bytebuf的release方法
protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
ResourceLeakTracker<ByteBuf> leak;
根据检测类别来选择不同的方法
switch (ResourceLeakDetector.getLevel()) {
case SIMPLE:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new SimpleLeakAwareByteBuf(buf, leak);
}
break;
case ADVANCED:
case PARANOID:
leak = AbstractByteBuf.leakDetector.track(buf);
if (leak != null) {
buf = new AdvancedLeakAwareByteBuf(buf, leak);
}
break;
default:
break;
}
return buf;
}
private DefaultResourceLeak track0(T obj) {
Level level = ResourceLeakDetector.level;
如果是禁止的直接返回
if (level == Level.DISABLED) {
return null;
}
如果是 抽样级别的
if (level.ordinal() < Level.PARANOID.ordinal()) {
随机下
if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
如果有泄漏就是打印日志
reportLeak();
把该对象加入到集合中并保障成对象返回
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
return null;
}
reportLeak();
return new DefaultResourceLeak(obj, refQueue, allLeaks);
}
private void reportLeak() {
如果日志不给用,说明无法警告了 直接清除队列
if (!logger.isErrorEnabled()) {
clearRefQueue();
return;
}
// Detect and report previous leaks.
for (;;) {
@SuppressWarnings("unchecked")
DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
如果ref==null 说明队列已经没有元素
if (ref == null) {
break;
}
如果ref注销失败,说明存在内存泄漏,即release方法少调用,导致我们的bytebuf对象被gc了 但是其内部的字节数组或者directbytebuf还没有释放,而这部分内存根本没人用
if (!ref.dispose()) {
continue;
}
打印日志
String records = ref.toString();
if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
如果records为空 只打印resourceType
if (records.isEmpty()) {
reportUntracedLeak(resourceType);
} else {
reportTracedLeak(resourceType, records);
}
}
}
}
创建一个虚引用
DefaultResourceLeak(
Object referent,
ReferenceQueue<Object> refQueue,
ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks) {
传递一个我们要监控的对象和公共队列
super(referent, refQueue);
assert referent != null;
获取hash只
trackedHash = System.identityHashCode(referent);
存放在allLeaks表中
allLeaks.put(this, LeakEntry.INSTANCE);
更改record属性
headUpdater.set(this, new Record(Record.BOTTOM));
this.allLeaks = allLeaks;
}
Reference(T referent, ReferenceQueue<? super T> queue) {
this.referent = referent;
this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
}
DirectByteBuffer内存回收(jdk,普通 ,有cleaner和无cleaner),注意这边的cleaner是netty的一个接口不是jdk的原生cleaner类,我们通过这个接口进行反射调用unsafe的freeMemory,而cleaner 只有jdk6和9支持
DirectByteBuffer:原生的jdk对象,与堆外内存交互,如果我们直接使用该对象,那么堆外内存会在,堆内对象DirectByteBuffer被gc后,依靠cleaner调用其内部的线程执行unsafe的freeMemory来回收内存
具体逻辑如下:创建directBytebuffer 会创建一个cleaner,cleaner内部包含这个directBytebuffer
同时会把该cleaner加入到一个cleaner链表的头部
当directBytebuffer 被gc cleaner 加入到队列里面
然后调用cleaner自身的clean方法
该方法先调用remove 吧cleaner从链表中删除
然后在调用cleaner携带的一个线程的run方法
该run方法内部通过unsafe是否堆外内存
UnpooledDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate-->CLEANER--->unsafe进行释放
UnpooledUnsafeNoCleanerDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate--->unsafe进行释放
UnpooledUnsafeDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate-->CLEANER--->unsafe进行释放
recycle---参考了这篇博客链接:https://www.jianshu.com/p/4eab8450560c,來源:简书
首先看图Recycle
提供的方法:
Recycler主要提供了3个方法:
get():获取一个对象。
recycle(T, Handle):回收一个对象,T为对象泛型。
newObject(Handle):当没有可用对象时创建对象的实现方法。
核心的四个类或接口
DefaultHandle:对象的包装类,在Recycler中缓存的对象都会包装成DefaultHandle类。
Stack:存储本线程回收的对象。对象的获取和回收对应Stack的pop和push,即获取对象时从Stack中pop出1个DefaultHandle,回收对象时将对象包装成DefaultHandle push到Stack中。Stack会与线程绑定,即每个用到Recycler的线程都会拥有1个Stack,在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。
WeakOrderQueue:存储其它线程回收到本线程stack的对象,当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。每个线程的Stack拥有1个WeakOrderQueue链表,链表每个节点对应1个其它线程的WeakOrderQueue,其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。
Link: WeakOrderQueue中包含1个Link链表,回收对象存储在链表某个Link节点里,当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。
----总结:
1.Link属于WeakOrderQueue的元素,Link本身持有是一个Defaulthandler数组
2.WeakHashMap,每个线程都一个,key是stack value是WeakOrderQueue
3.加入A线程的stack被线程b,c,d 拿到,因为thread(A的线程) == currentThread(b,c,d的线程),所以通过该b,c,d的WeakHashMap看看A的stack是否有对应的WeakOrderQueue,如果没有,就给他创建。然后b,c,d依次把对象归还到各自WeakHashMap对应A的stack的WeakOrderQueue中。所以加入有5个线程,那么每个线程中的最多可以包含四个WeakOrderQueue。
4.scavenge每次都是从head节点获取,如果获取不到就next,这些队列并不删除,jvm会自动在内存不够的时候删除
recycle 取得时候不需要加锁 因为其是从线程变量里面取 还的时候却有可能多线程还 所以采用多个队列 避免 加锁 同时队列可以无限长吗 什么时候消减,同时采用了弱引用避免队列过大,造成gc压力
源码如下:
get
public final T get() {
如果maxCapacityPerThread==0 说明不需要经过stack缓存,直接new对象,NOOP_HANDLE后期不会回收defaulthandle
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
获取当前线程的stack,如果没有就创建 且和当前线程绑定
Stack<T> stack = threadLocal.get();
从当前线程的stack拿DefaultHandle
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
如果为空说明 利用该stack生成一个handle,这个handler回收对象的时候会把value存放到stack中
handle = stack.newHandle();
然后生成一个对象,这个对象就是我们返回给调用者一个包装对象,内部有调用者想要的对象和对应的handle
handle.value = newObject(handle);
}
return (T) handle.value;
}
这边就是newObject一个实现类,可以看到对应的handle被这个对象给绑定 从而可以得到这个stack
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0);
}
};
DefaultHandle<T> pop() {
int size = this.size;
等于0代表当前stack的数据已经用完了
if (size == 0) {
从WeakOrderQueue获取
if (!scavenge()) {
return null;
}
size = this.size;
}
拿到数据从数组中删除
size --;
DefaultHandle ret = elements[size];
elements[size] = null;
如果不相等说明被多次回收了 抛出异常
if (ret.lastRecycledId != ret.recycleId) {
throw new IllegalStateException("recycled multiple times");
}
ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
return ret;
}
recycle
public final boolean recycle(T o, Handle<T> handle) {
if (handle == NOOP_HANDLE) {
return false;
}
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
h.recycle(o);
return true;
}
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
private void pushNow(DefaultHandle<?> item) {
if ((item.recycleId | item.lastRecycledId) != 0) {
throw new IllegalStateException("recycled already");
}
item.recycleId = item.lastRecycledId = OWN_THREAD_ID;
int size = this.size;
if (size >= maxCapacity || dropHandle(item)) {
// Hit the maximum capacity or should drop - drop the possibly youngest object.
return;
}
if (size == elements.length) {
elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
}
elements[size] = item;
this.size = size + 1;
}
private void pushLater(DefaultHandle<?> item, Thread thread) {
这边是获取这个线程的map,持有的stack却是其他线程的
Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
WeakOrderQueue queue = delayedRecycled.get(this);
if (queue == null) {
if (delayedRecycled.size() >= maxDelayedQueues) {
// Add a dummy queue so we know we should drop the object
delayedRecycled.put(this, WeakOrderQueue.DUMMY);
return;
}
// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
// drop object
return;
}
delayedRecycled.put(this, queue);
} else if (queue == WeakOrderQueue.DUMMY) {
// drop object
return;
}
queue.add(item);
}
scavenge--每次都是从head节点获取,如果获取不到就next,这些队列并不删除,jvm会自动在内存不够的时候删除
boolean scavenge() {
// continue an existing scavenge, if any
if (scavengeSome()) {
return true;
}
// reset our scavenge cursor
prev = null;
cursor = head;
return false;
}
boolean scavengeSome() {
如果是第一次调用则cursor 和prev 都是null
WeakOrderQueue prev;
WeakOrderQueue cursor = this.cursor;
if (cursor == null) {
prev = null;
把cursor移到head
cursor = head;
如何还是为空代表整个WeakOrderQueue队列为空
if (cursor == null) {
return false;
}
} else {
cursor不为null 就是代表head节点不为null, prev代表光标cursor的前一个
prev = this.prev;
}
boolean success = false;
do {
将head节点中的数据转移到stack中
if (cursor.transfer(this)) {
success = true;
break;
}
转移失败找head节点的下一个
WeakOrderQueue next = cursor.next;
cursor.owner.get() == null 代表已经被内存回收
if (cursor.owner.get() == null) {
// If the thread associated with the queue is gone, unlink it, after
// performing a volatile read to confirm there is no data left to collect.
// We never unlink the first queue, as we don't want to synchronize on updating the head.
if (cursor.hasFinalData()) {
for (;;) {
if (cursor.transfer(this)) {
success = true;
} else {
break;
}
}
}
if (prev != null) {
prev.setNext(next);
}
} else {
prev = cursor;
}
cursor = next;
} while (cursor != null && !success);
this.prev = prev;
this.cursor = cursor;
return success;
}
boolean transfer(Stack<?> dst) {
Link head = this.head.link;
if (head == null) {
return false;
}
if (head.readIndex == LINK_CAPACITY) {
if (head.next == null) {
return false;
}
this.head.link = head = head.next;
}
final int srcStart = head.readIndex;
int srcEnd = head.get();
final int srcSize = srcEnd - srcStart;
if (srcSize == 0) {
return false;
}
final int dstSize = dst.size;
final int expectedCapacity = dstSize + srcSize;
if (expectedCapacity > dst.elements.length) {
final int actualCapacity = dst.increaseCapacity(expectedCapacity);
srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
}
if (srcStart != srcEnd) {
final DefaultHandle[] srcElems = head.elements;
final DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
for (int i = srcStart; i < srcEnd; i++) {
DefaultHandle element = srcElems[i];
if (element.recycleId == 0) {
element.recycleId = element.lastRecycledId;
} else if (element.recycleId != element.lastRecycledId) {
throw new IllegalStateException("recycled already");
}
srcElems[i] = null;
if (dst.dropHandle(element)) {
// Drop the object.
continue;
}
element.stack = dst;
dstElems[newDstSize ++] = element;
}
if (srcEnd == LINK_CAPACITY && head.next != null) {
// Add capacity back as the Link is GCed.
this.head.reclaimSpace(LINK_CAPACITY);
this.head.link = head.next;
}
head.readIndex = srcEnd;
if (dst.size == newDstSize) {
return false;
}
dst.size = newDstSize;
return true;
} else {
// The destination stack is full already.
return false;
}
}
}