ByteBuf相关源码解析

bytebuf的总结
1.首先区分heap和direct,两者最本质的区别就是承载数据的对象分别是数组和directbytebuffer,同时netty进行了优化
通过unsafe获取我们数组或者directbytebuffer的对象地址进行操作,速度更快。若发现不支持cleaner,则通过反射调用directbytebuffer的不包含cleaner的构造函数,释放堆外内存的时候,直接使用unsafe。
2.再来区分pool和unpool:首先unpool的时候调用的InstrumentedxxxxByteBuf对象实质上都是UnpooledxxxByteBuf的子类。而对于pool分配对象的时候是先尝试获取PoolThreadCache中的PoolArena,如果获取成功就依靠PoolArena分配对象,其根据是direct还是heap 选择directbytebuf或者是字节数组,如果没有PoolArena。则只能new一个UnpooledxxxByteBuf

ByteBuf的创建和release(direct heap pool unpool composite)

CompositeByteBuf 其offset是上一个component的endoffset 自己的endoffset是自身的offset+readableByte

创建

    public CompositeByteBuf(ByteBufAllocator alloc, boolean direct, int maxNumComponents) {
        super(AbstractByteBufAllocator.DEFAULT_MAX_CAPACITY);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
我们创建bytebuf对象采用alloc
        this.alloc = alloc;
        this.direct = direct;
最大的components
        this.maxNumComponents = maxNumComponents;
存储bytebuf的集合继承arraylist
        components = newList(0, maxNumComponents);
    }

    protected AbstractReferenceCountedByteBuf(int maxCapacity) {
设置单个bytebuf的内存最大值
        super(maxCapacity);
采用AtomicIntegerFieldUpdater--无锁化设置refCnt为1,
        refCntUpdater.set(this, 1);
    }

   private static ComponentList newList(int initComponents, int maxNumComponents) {
获取合理的componentlist
        int capacityGuess = Math.min(AbstractByteBufAllocator.DEFAULT_MAX_COMPONENTS, maxNumComponents);
创建数组对象大小
        return new ComponentList(Math.max(initComponents, capacityGuess));
    }

添加

  public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers) {
添加bytebuf,increaseWriterIndex必须为true,如果不为true 那么我永远无法获取添加的bytebuf
        addComponents0(increaseWriterIndex, components.size(), buffers, 0, buffers.length);
是否需要调整componet
        consolidateIfNeeded();
        return this;
    }

 private int addComponents0(boolean increaseWriterIndex, int cIndex, ByteBuf[] buffers, int offset, int len) {
        checkNotNull(buffers, "buffers");
        int i = offset;
        try {
检测当前componet的index,这边检测到异常会直接抛出去,然后释放这些
bytebuf
            checkComponentIndex(cIndex);

   如果当前的其实位置小于
            while (i < len) {
      
                ByteBuf b = buffers[i++];
                if (b == null) {
                    break;
                }
存放
                cIndex = addComponent0(increaseWriterIndex, cIndex, b) + 1;
                int size = components.size();
因为add方法有可能失败所以这边要跟真正的size进行比较
                if (cIndex > size) {
                    cIndex = size;
                }
            }
            return cIndex;
        } finally {
如果发生异常就释放着剩余的bytebuf
            for (; i < len; ++i) {
                ByteBuf b = buffers[i];
                if (b != null) {
                    try {
                        b.release();
                    } catch (Throwable ignored) {
                        // ignore
                    }
                }
            }
        }
    }

private int addComponent0(boolean increaseWriterIndex, int cIndex, ByteBuf buffer) {
        assert buffer != null;
标识是否添加成功
        boolean wasAdded = false;
        try {
如果发生异常直接抛出
            checkComponentIndex(cIndex);

            int readableBytes = buffer.readableBytes();

这段代码的意思是复制一个bytebuf
            Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
如果正好相等
            if (cIndex == components.size()) {
添加
                wasAdded = components.add(c);
如果cindex等于0,设置endoffset,标识整个componet的尾部offset
                if (cIndex == 0) {
                    c.endOffset = readableBytes;
                } else {
更新endOffset
                    Component prev = components.get(cIndex - 1);
                    c.offset = prev.endOffset;
                    c.endOffset = c.offset + readableBytes;
                }
            } else {
如果不等于尾部,说明是插入到中介
                components.add(cIndex, c);
                wasAdded = true;
                if (readableBytes != 0) {
更新offset
                    updateComponentOffsets(cIndex);
                }
            }
更新写索引
            if (increaseWriterIndex) {
                writerIndex(writerIndex() + buffer.readableBytes());
            }
            return cIndex;
        } finally {
如果没有添加成功直接release
            if (!wasAdded) {
                buffer.release();
            }
        }
    }

 public void add(int index, E element) {
        rangeCheckForAdd(index);
 确保真实的存放bytebuf的list的length够大
        ensureCapacityInternal(size + 1);  // Increments modCount!!
这边通过复制来避免遍历插入(因为不是插入最后一位的,所以需要遍历)
        System.arraycopy(elementData, index, elementData, index + 1,
                         size - index);
修改指定位置的元素
        elementData[index] = element;
        size++;
    }
更新offset,因为是插入不是直接放在尾部,writeindex标识我们增加到哪了
readIndex标识我们读到哪了。
 private void updateComponentOffsets(int cIndex) {
如果size小于cIndex 说明是直接插入的,我们不需要更新
        int size = components.size();
        if (size <= cIndex) {
            return;
        }
或这个cindex的Component 
        Component c = components.get(cIndex);
        if (cIndex == 0) {
更新该Component的offset
            c.offset = 0;
            c.endOffset = c.length;
            cIndex ++;
        }
更新剩余的component的offset和endOffset

        for (int i = cIndex; i < size; i ++) {
            Component prev = components.get(i - 1);
            Component cur = components.get(i);
            cur.offset = prev.endOffset;
            cur.endOffset = cur.offset + cur.length;
        }
    }
如果已经超过最大者了,那么把所有的component元素取出来,尝试塞入一个bytebuf中然后依次释放其他的
 private void consolidateIfNeeded() {
   
        final int numComponents = components.size();

        if (numComponents > maxNumComponents) {
            final int capacity = components.get(numComponents - 1).endOffset;

            ByteBuf consolidated = allocBuffer(capacity);

            // We're not using foreach to avoid creating an iterator.
            for (int i = 0; i < numComponents; i ++) {
                Component c = components.get(i);
                ByteBuf b = c.buf;
                consolidated.writeBytes(b);
                c.freeIfNecessary();
            }
            Component c = new Component(consolidated);
            c.endOffset = c.length;
            components.clear();
            components.add(c);
        }
    }

 public ByteBuf writeByte(int value) {
确保可写,且如果添加的字节长度超过了目前最后一个component的endoffset 则添加了一个新的component,并更新offset,否则就是从componentlist中寻找可以容下这么大值的component并更新,默认是从后往前找
        ensureWritable0(1);
添加的value在writeIndex++
        _setByte(writerIndex++, value);
        return this;
    }

public CompositeByteBuf setByte(int index, int value) {
根据这个index找到是哪个component
        Component c = findComponent(index);
index标识在这个Component写到哪了,然后在offset写下内容,具体的看底层是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
        c.buf.setByte(index - c.offset, value);
        return this;
    }

  public byte readByte() {
        checkReadableBytes0(1);
更新readerIndex
        int i = readerIndex;
底层还是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
        byte b = _getByte(i);
更新readerIndex
        readerIndex = i + 1;
        return b;
    }

  protected byte _getByte(int index) {
从component获取,这边不需要更新offset,只需要,更新readIndex即可
        Component c = findComponent(index);
底层还是依赖pooldirect和unpooldirect或者poolheap和unpoolheap
        return c.buf.getByte(index - c.offset);
    }
pooldirect和unpooldirect

创建,读和写

pooldirect --创建
 protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
尝试从threadCache获取
        PoolThreadCache cache = threadCache.get();
获取directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
如果不为空则依靠directArena创建
        if (directArena != null) {
这边留下面分析
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
创建一个新的为unpool的。。这是为什么
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
包装下检测内存是否泄漏
        return toLeakAwareBuffer(buf);
    }

  static UnpooledUnsafeDirectByteBuf newUnsafeDirectByteBuf(
            ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
UnpooledUnsafeNoCleanerDirectByteBuf,选择一个不需要cleaner的构造通过反射生成directByteBuffer其回收内存是依靠UNSAFE.freeMemory(address),而 newUnsafeDirectByteBuf还是会生成cleaner去回收,cleaner是启动一个线程去调用。UNSAFE.freeMemory(address),前者关闭的更快,后者需要等gc把我们堆内的directbytebuffer对象回收后,才会回收堆外内存
        if (PlatformDependent.useDirectBufferNoCleaner()) {
            return new UnpooledUnsafeNoCleanerDirectByteBuf(alloc, initialCapacity, maxCapacity);
        }
        return new UnpooledUnsafeDirectByteBuf(alloc, initialCapacity, maxCapacity);
    }

  public UnpooledDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        if (initialCapacity < 0) {
            throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
        }
        if (maxCapacity < 0) {
            throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
        }
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(ByteBuffer.allocateDirect(initialCapacity));
    }


    private void setByteBuffer(ByteBuffer buffer) {
如果之前该对象存在bytebuff
        ByteBuffer oldBuffer = this.buffer;
        if (oldBuffer != null) {
            if (doNotFree) {
等待下一次是否
                doNotFree = false;
            } else {
是否
                freeDirect(oldBuffer);
            }
        }

        this.buffer = buffer;
tmpNioBuf置为空,我们写入的数据的时候。采用tmpNioBuf,这样不会更改原始的position
        tmpNioBuf = null;
        capacity = buffer.remaining();
    }


 DirectByteBuffer(int cap) {                  

        super(-1, 0, cap, cap);
        boolean pa = VM.isDirectMemoryPageAligned();
获取pagesize的大小
        int ps = Bits.pageSize();
size是分配的内存大小,cap是实际需要的大小
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
计算是否有足够的内存,没有的话会调用gc,最后实在还是没有,则会抛出异常
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
分配内存
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        if (pa && (base % ps != 0)) {
     内存的地址
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
创建cleaner
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;

    }
写
   public ByteBuf writeByte(int value) {
确保可写
        ensureWritable0(1);
根据底层的不同实现分别采用数组或者directbytebuffer去写入
        _setByte(writerIndex++, value);
        return this;
    }
pool扩容
  public final ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

       是否为unpooled,如果是则
        if (chunk.unpooled) {
如果长度正好等于length直接返回 ,否则重新分配
            if (newCapacity == length) {
                return this;
            }
        } else {
pool的时候,当newCapacity 小于最大值时候直接返回
            if (newCapacity > length) {
                if (newCapacity <= maxLength) {
                    length = newCapacity;
                    return this;
                }
            } else if (newCapacity < length) {
                if (newCapacity > maxLength >>> 1) {
                    if (maxLength <= 512) {
                        if (newCapacity > maxLength - 16) {
                            length = newCapacity;
重新分配writeIndex和readerIndex
                            setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                            return this;
                        }
                    } else { // > 512 (i.e. >= 1024)
                        length = newCapacity;
重新分配writeIndex和readerIndex
                        setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
                        return this;
                    }
                }
            } else {
                return this;
            }
        }

重新分配
        chunk.arena.reallocate(this, newCapacity, true);
        return this;
    }
unpool扩容

   public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int readerIndex = readerIndex();
        int writerIndex = writerIndex();

        int oldCapacity = capacity;
        if (newCapacity > oldCapacity) {
            ByteBuffer oldBuffer = buffer;
分配新的directbytebuffer
            ByteBuffer newBuffer = allocateDirect(newCapacity);
调整position和limit
            oldBuffer.position(0).limit(oldBuffer.capacity());
            newBuffer.position(0).limit(oldBuffer.capacity());
复制
            newBuffer.put(oldBuffer);
调整position为0 limit 为capacity
            newBuffer.clear();
替换原始的bytebuffer
            setByteBuffer(newBuffer);
        } else if (newCapacity < oldCapacity) {
            ByteBuffer oldBuffer = buffer;
因为是unpool 所以重新new一个 然后 调整大小
            ByteBuffer newBuffer = allocateDirect(newCapacity);
如果readerIndex 小于newCapacity说明 还有一部分空间可读
            if (readerIndex < newCapacity) {
如果writerIndex大于newCapacity 那么调整下writerIndex
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
设置最终可写的空间
                oldBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.position(readerIndex).limit(writerIndex);
                newBuffer.put(oldBuffer);
                newBuffer.clear();
            } else {
readerIndex 和writeindex都是一样 没有可读的空间了 但是可以继续写
                setIndex(newCapacity, newCapacity);
            }
            setByteBuffer(newBuffer);
        }
        return this;
    }

poolDirect,memory==directByteBuffer
 protected void _setByte(int index, int value) {
        memory.put(idx(index), (byte) value);
    }

poolheap和unpoolheap 于direct一样 只是底层采用数组,也可以借助于unsafe 更快的操作数组
poolHeap,memory==字节数组
protected void _setByte(int index, int value) {
        HeapByteBufUtil.setByte(memory, idx(index), value);
    }

参考Hypercube的简书,介绍如何实现pool的思想

简介:Netty的PooledByteBuf采用与jemalloc一致的内存分配算法。可用这样的情景类比,想像一下当前电商的配送流程。当顾客采购小件商品(比如书籍)时,直接从同城仓库送出;当顾客采购大件商品(比如电视)时,从区域仓库送出;当顾客采购超大件商品(比如汽车)时,则从全国仓库送出。Netty的分配算法与此相似,稍有不同的是:在Netty中,小件商品和大件商品都首先从同城仓库(ThreadCache-tcache)送出;如果同城仓库没有,则会从区域仓库(Arena)送出,可参见下图:


微信截图_20181106130506.png

对于商品分类,Netty根据每次请求分配内存的大小,将请求分为如下几类.


请求的大小.png

注意以下几点:

内存分配的最小单位为16B。
< 512B的请求为Tiny,< 8KB(PageSize)的请求为Small,<= 16MB(ChunkSize)的请求为Normal,> 16MB(ChunkSize)的请求为Huge。
< 512B的请求以16B为起点每次增加16B;>= 512B的请求则每次加倍。
不在表格中的请求大小,将向上规范化到表格中的数据,比如:请求分配511B、512B、513B,将依次规范化为512B、512B、1KB。

poolArena
为了提高内存分配效率并减少内部碎片,jemalloc算法将Arena切分为小块Chunk,根据每块的内存使用率又将小块组合为以下几种状态:QINIT,Q0,Q25,Q50,Q75,Q100。Chunk块可以在这几种状态间随着内存使用率的变化进行转移。
QINIT的内存使用率为[0,25)、Q0为(0,50)、Q100为[100,100]。
Chunk块的初始状态为QINIT,当使用率达到25时转移到Q0状态,再次达到50时转移到Q25,依次类推直到Q100;当内存释放时又从Q100转移到Q75,直到Q0状态且内存使用率为0时,该Chunk从Arena中删除。注意极端情况下,Chunk可能从QINIT转移到Q0再释放全部内存,然后从Arena中删除。
qInit, q000, q025, q050, q075, q100
0-0.25 0-0.5 0.5-0.75 0.50-1 0.75-1 100

Chunk和Page
虽然已将Arena切分为小块Chunk,但实际上Chunk是相当大的内存块,在jemalloc中建议为4MB,Netty默认使用16MB。为了进一步提高内存利用率并减少内部碎片,需要继续将Chunk切分为小的块Page。一个典型的切分将Chunk切分为2048块,Netty正是如此,可知Page的大小为:16MB/2048=8KB。一个好的内存分配算法,应使得已分配内存块尽可能保持连续,这将大大减少内部碎片,由此jemalloc使用伙伴分配算法尽可能提高连续性。伙伴分配算法的示意图如下:


伙伴分配算法.png

图中最底层表示一个被切分为2048个Page的Chunk块。自底向上,每一层节点作为上一层的子节点构造出一棵满二叉树,然后按层分配满足要求的内存块。以待分配序列8KB、16KB、8KB为例分析分配过程(每个Page大小8KB):

8KB--需要一个Page,第11层满足要求,故分配2048节点即Page0;
16KB--需要两个Page,故需要在第10层进行分配,而1024的子节点2048已分配,从左到右找到满足要求的1025节点,故分配节点1025即Page2和Page3;
8KB--需要一个Page,第11层满足要求,2048已分配,从左到右找到2049节点即Page1进行分配。
分配结束后,已分配连续的Page0-Page3,这样的连续内存块,大大减少内部碎片并提高内存使用率。

SubPage
Netty中每个Page的默认大小为8KB,在实际使用中,很多业务需要分配更小的内存块比如16B、32B、64B等。为了应对这种需求,需要进一步切分Page成更小的SubPage。SubPage是jemalloc中内存分配的最小单位,不能再进行切分。SubPage切分的单位并不固定,以第一次请求分配的大小为单位(最小切分单位为16B)。比如,第一次请求分配32B,则Page按照32B均等切分为256块;第一次请求16B,则Page按照16B均等切分为512块。为了便于内存分配和管理,根据SubPage的切分单位进行分组,每组使用双向链表组合,示意图如下


subpage.png

红线标出的就是一个完整的page节点
其中每组的头结点head只用来标记该组的大小,之后的节点才是实际分配的SubPage节点。需要注意的是,这些节点正是上一节中满二叉树的叶子节点即一个Page。

poolArena

pool类型的bytebuf 分配内存的核心逻辑如下:

      以pool堆外内存为例 接受内存分配的大概逻辑
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    通过PoolThreadLocalCache获取PoolThreadCache
        PoolThreadCache cache = threadCache.get();
        因为是堆外内存所以使用directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
        使用directArena分配内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
        内存是否泄漏的检测
        return toLeakAwareBuffer(buf);
    }
    
     PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
     从recycle获取PooledByteBuf
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        去获取堆外内存,即diirectbytebuffer
        allocate(cache, buf, reqCapacity);
        return buf;
    }
    
          protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
          根据是否支持unsafe,采用不同的方式生成directbytebuffer对象
            if (HAS_UNSAFE) {
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }
        
        
            static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
            RECYCLER是对象池
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        分配最大内存
        buf.reuse(maxCapacity);
        return buf;
    }

    核心的分配内存池的逻辑
     private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
       规范化请求容量
        final int normCapacity = normalizeCapacity(reqCapacity);
        是属于tiny和small 说明小于page,除了huge即超过chunksize的 是不在PoolThreadCache里面的,其他的逻辑都是
        先尝试从PoolThreadCache分配 分配失败说明cache中的内存不够,那么在尝试从arena分配
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) {
            从cache分配tiny
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                获取tiny的subpag的head节点
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                获取small的subpag的head节点
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
            获取 PoolSubpag的head节点
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    分配内存(因为netty做的限制,即会规范内存所以我们的内存要么就是tiny 要么是small)
                    s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
                    更新分配记录
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            说明PoolSubpage还未初始化好 ,我们只能normal分配
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }
更新分配记录
            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

总结一下内存分配过程:

对于Tiny/Small、Normal大小的请求,优先从线程缓存中分配。
没有从缓存中得到分配的Tiny/Small请求,会从以第一次请求大小为基准进行分组的Subpage双向链表中进行分配;如果双向链表还没初始化,则会使用Normal请求分配Chunk块中的一个Page,Page以请求大小为基准进行切分并分配第一块内存,然后加入到双向链表中。
没有从缓存中得到分配的Normal请求,则会使用伙伴算法分配满足要求的连续Page块。
对于Huge请求,则直接使用Unpooled直接分配。
内存分配过程分析完毕,接着分析内存释放,内存释放的时候根据是否支持pool以及内存块的大小,来判断是否保存在poolthreadlocalcache 还是直接释放

PoolThreadCache

不分析了,累了

ResourceLeakTracker的原理

我们创建一个虚引用的leak,然后把该leak和bytebuf包装成SimpleLeakAwareByteBuf或者AdvancedLeakAwareByteBuf
当我们release的时候,如果引用计数器正常操作(比如开发没有操作失误),那么我们会自动的把bytebuf底层的数据承载给置空等待bytebuf对象被gc,在release之间 我们还会调用leak的clsoe 清除referent 并且删除allleak当中该对象,如果发生内存泄漏则 代表bytebuf 被回收了 但是其内部数组或者directbytebuffer 还存在,这就导致了可能存在内存泄漏。即我们没用正确的调用bytebuf的release方法

  protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
        ResourceLeakTracker<ByteBuf> leak;
        根据检测类别来选择不同的方法
        switch (ResourceLeakDetector.getLevel()) {
            case SIMPLE:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new SimpleLeakAwareByteBuf(buf, leak);
                }
                break;
            case ADVANCED:
            case PARANOID:
                leak = AbstractByteBuf.leakDetector.track(buf);
                if (leak != null) {
                    buf = new AdvancedLeakAwareByteBuf(buf, leak);
                }
                break;
            default:
                break;
        }
        return buf;
    }
    
        private DefaultResourceLeak track0(T obj) {
        Level level = ResourceLeakDetector.level;
        如果是禁止的直接返回
        if (level == Level.DISABLED) {
            return null;
        }
如果是 抽样级别的
        if (level.ordinal() < Level.PARANOID.ordinal()) {
        随机下
            if ((PlatformDependent.threadLocalRandom().nextInt(samplingInterval)) == 0) {
            如果有泄漏就是打印日志
                reportLeak();
                把该对象加入到集合中并保障成对象返回
                return new DefaultResourceLeak(obj, refQueue, allLeaks);
            }
            return null;
        }
        reportLeak();
        return new DefaultResourceLeak(obj, refQueue, allLeaks);
    }
    
    
     private void reportLeak() {
     如果日志不给用,说明无法警告了 直接清除队列
        if (!logger.isErrorEnabled()) {
            clearRefQueue();
            return;
        }

        // Detect and report previous leaks.
        for (;;) {
            @SuppressWarnings("unchecked")
            DefaultResourceLeak ref = (DefaultResourceLeak) refQueue.poll();
            如果ref==null 说明队列已经没有元素
            if (ref == null) {
                break;
            }
            如果ref注销失败,说明存在内存泄漏,即release方法少调用,导致我们的bytebuf对象被gc了 但是其内部的字节数组或者directbytebuf还没有释放,而这部分内存根本没人用
            if (!ref.dispose()) {
                continue;
            }
            打印日志
            String records = ref.toString();
            if (reportedLeaks.putIfAbsent(records, Boolean.TRUE) == null) {
            如果records为空 只打印resourceType
                if (records.isEmpty()) {
                    reportUntracedLeak(resourceType);
                } else {
                    reportTracedLeak(resourceType, records);
                }
            }
        }
    }
    
    
    创建一个虚引用
    DefaultResourceLeak(
                Object referent,
                ReferenceQueue<Object> refQueue,
                ConcurrentMap<DefaultResourceLeak<?>, LeakEntry> allLeaks) {
传递一个我们要监控的对象和公共队列
            super(referent, refQueue);

            assert referent != null;

        获取hash只
            trackedHash = System.identityHashCode(referent);
存放在allLeaks表中
            allLeaks.put(this, LeakEntry.INSTANCE);
更改record属性
            headUpdater.set(this, new Record(Record.BOTTOM));
            this.allLeaks = allLeaks;
        }
        
          Reference(T referent, ReferenceQueue<? super T> queue) {
        this.referent = referent;
        this.queue = (queue == null) ? ReferenceQueue.NULL : queue;
    }

DirectByteBuffer内存回收(jdk,普通 ,有cleaner和无cleaner),注意这边的cleaner是netty的一个接口不是jdk的原生cleaner类,我们通过这个接口进行反射调用unsafe的freeMemory,而cleaner 只有jdk6和9支持

DirectByteBuffer:原生的jdk对象,与堆外内存交互,如果我们直接使用该对象,那么堆外内存会在,堆内对象DirectByteBuffer被gc后,依靠cleaner调用其内部的线程执行unsafe的freeMemory来回收内存
具体逻辑如下:创建directBytebuffer 会创建一个cleaner,cleaner内部包含这个directBytebuffer
同时会把该cleaner加入到一个cleaner链表的头部
当directBytebuffer 被gc cleaner 加入到队列里面
然后调用cleaner自身的clean方法
该方法先调用remove 吧cleaner从链表中删除
然后在调用cleaner携带的一个线程的run方法
该run方法内部通过unsafe是否堆外内存

UnpooledDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate-->CLEANER--->unsafe进行释放

UnpooledUnsafeNoCleanerDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate--->unsafe进行释放

UnpooledUnsafeDirectByteBuf:netty对象,内部持有DirectByteBuffer,释放内存通过调用release-->dellocate-->CLEANER--->unsafe进行释放

recycle---参考了这篇博客链接:https://www.jianshu.com/p/4eab8450560c,來源:简书

首先看图
recycler.png
Recycle

提供的方法:

Recycler主要提供了3个方法:
get():获取一个对象。
recycle(T, Handle):回收一个对象,T为对象泛型。
newObject(Handle):当没有可用对象时创建对象的实现方法。

核心的四个类或接口
DefaultHandle:对象的包装类,在Recycler中缓存的对象都会包装成DefaultHandle类。
Stack:存储本线程回收的对象。对象的获取和回收对应Stack的pop和push,即获取对象时从Stack中pop出1个DefaultHandle,回收对象时将对象包装成DefaultHandle push到Stack中。Stack会与线程绑定,即每个用到Recycler的线程都会拥有1个Stack,在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。
WeakOrderQueue:存储其它线程回收到本线程stack的对象,当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。每个线程的Stack拥有1个WeakOrderQueue链表,链表每个节点对应1个其它线程的WeakOrderQueue,其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。
Link: WeakOrderQueue中包含1个Link链表,回收对象存储在链表某个Link节点里,当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。

----总结:
1.Link属于WeakOrderQueue的元素,Link本身持有是一个Defaulthandler数组
2.WeakHashMap,每个线程都一个,key是stack value是WeakOrderQueue
3.加入A线程的stack被线程b,c,d 拿到,因为thread(A的线程) == currentThread(b,c,d的线程),所以通过该b,c,d的WeakHashMap看看A的stack是否有对应的WeakOrderQueue,如果没有,就给他创建。然后b,c,d依次把对象归还到各自WeakHashMap对应A的stack的WeakOrderQueue中。所以加入有5个线程,那么每个线程中的最多可以包含四个WeakOrderQueue。
4.scavenge每次都是从head节点获取,如果获取不到就next,这些队列并不删除,jvm会自动在内存不够的时候删除

recycle 取得时候不需要加锁 因为其是从线程变量里面取 还的时候却有可能多线程还 所以采用多个队列 避免 加锁 同时队列可以无限长吗 什么时候消减,同时采用了弱引用避免队列过大,造成gc压力
源码如下:
get

 public final T get() {
 如果maxCapacityPerThread==0 说明不需要经过stack缓存,直接new对象,NOOP_HANDLE后期不会回收defaulthandle
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        获取当前线程的stack,如果没有就创建 且和当前线程绑定
        Stack<T> stack = threadLocal.get();
        从当前线程的stack拿DefaultHandle
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
        如果为空说明 利用该stack生成一个handle,这个handler回收对象的时候会把value存放到stack中
            handle = stack.newHandle();
            然后生成一个对象,这个对象就是我们返回给调用者一个包装对象,内部有调用者想要的对象和对应的handle
            
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }
        这边就是newObject一个实现类,可以看到对应的handle被这个对象给绑定 从而可以得到这个stack
   private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
            return new PooledUnsafeDirectByteBuf(handle, 0);
        }
    };

   DefaultHandle<T> pop() {
            int size = this.size;
            等于0代表当前stack的数据已经用完了
            if (size == 0) {
            从WeakOrderQueue获取
                if (!scavenge()) {
                    return null;
                }
                size = this.size;
            }
            拿到数据从数组中删除
            size --;
            DefaultHandle ret = elements[size];
            elements[size] = null;
            如果不相等说明被多次回收了 抛出异常
            if (ret.lastRecycledId != ret.recycleId) {
                throw new IllegalStateException("recycled multiple times");
            }
            ret.recycleId = 0;
            ret.lastRecycledId = 0;
            this.size = size;
            return ret;
        }

recycle

  public final boolean recycle(T o, Handle<T> handle) {
        if (handle == NOOP_HANDLE) {
            return false;
        }

        DefaultHandle<T> h = (DefaultHandle<T>) handle;
        if (h.stack.parent != this) {
            return false;
        }

        h.recycle(o);
        return true;
    }

   public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }
            stack.push(this);
        }

    void push(DefaultHandle<?> item) {
            Thread currentThread = Thread.currentThread();
            if (threadRef.get() == currentThread) {
                // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
                pushNow(item);
            } else {
                // The current Thread is not the one that belongs to the Stack
                // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
                // happens later.
                pushLater(item, currentThread);
            }
        }

    private void pushNow(DefaultHandle<?> item) {
            if ((item.recycleId | item.lastRecycledId) != 0) {
                throw new IllegalStateException("recycled already");
            }
            item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

            int size = this.size;
            if (size >= maxCapacity || dropHandle(item)) {
                // Hit the maximum capacity or should drop - drop the possibly youngest object.
                return;
            }
            if (size == elements.length) {
                elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
            }

            elements[size] = item;
            this.size = size + 1;
        }

  private void pushLater(DefaultHandle<?> item, Thread thread) {
            这边是获取这个线程的map,持有的stack却是其他线程的
            Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
            WeakOrderQueue queue = delayedRecycled.get(this);
            if (queue == null) {
                if (delayedRecycled.size() >= maxDelayedQueues) {
                    // Add a dummy queue so we know we should drop the object
                    delayedRecycled.put(this, WeakOrderQueue.DUMMY);
                    return;
                }
                // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
                if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
                    // drop object
                    return;
                }
                delayedRecycled.put(this, queue);
            } else if (queue == WeakOrderQueue.DUMMY) {
                // drop object
                return;
            }

            queue.add(item);
        }

scavenge--每次都是从head节点获取,如果获取不到就next,这些队列并不删除,jvm会自动在内存不够的时候删除

    boolean scavenge() {
            // continue an existing scavenge, if any
            if (scavengeSome()) {
                return true;
            }

            // reset our scavenge cursor
            prev = null;
            cursor = head;
            return false;
        }
    
   boolean scavengeSome() {
        如果是第一次调用则cursor 和prev 都是null
    WeakOrderQueue prev;
            WeakOrderQueue cursor = this.cursor;
            if (cursor == null) {
                prev = null;
                把cursor移到head
                cursor = head;
                如何还是为空代表整个WeakOrderQueue队列为空
                if (cursor == null) {
                    return false;
                }
            } else {
        cursor不为null 就是代表head节点不为null,  prev代表光标cursor的前一个
                prev = this.prev;
            }

            boolean success = false;
            do {
            将head节点中的数据转移到stack中
                if (cursor.transfer(this)) {
                    success = true;
                    break;
                }
                转移失败找head节点的下一个
                WeakOrderQueue next = cursor.next;
                cursor.owner.get() == null 代表已经被内存回收
                if (cursor.owner.get() == null) {
                    // If the thread associated with the queue is gone, unlink it, after
                    // performing a volatile read to confirm there is no data left to collect.
                    // We never unlink the first queue, as we don't want to synchronize on updating the head.
                    if (cursor.hasFinalData()) {
                        for (;;) {
                            if (cursor.transfer(this)) {
                                success = true;
                            } else {
                                break;
                            }
                        }
                    }

                    if (prev != null) {
                        prev.setNext(next);
                    }
                } else {
                    prev = cursor;
                }

                cursor = next;

            } while (cursor != null && !success);

            this.prev = prev;
            this.cursor = cursor;
            return success;
        }

  boolean transfer(Stack<?> dst) {
            Link head = this.head.link;
            if (head == null) {
                return false;
            }

            if (head.readIndex == LINK_CAPACITY) {
                if (head.next == null) {
                    return false;
                }
                this.head.link = head = head.next;
            }

            final int srcStart = head.readIndex;
            int srcEnd = head.get();
            final int srcSize = srcEnd - srcStart;
            if (srcSize == 0) {
                return false;
            }

            final int dstSize = dst.size;
            final int expectedCapacity = dstSize + srcSize;

            if (expectedCapacity > dst.elements.length) {
                final int actualCapacity = dst.increaseCapacity(expectedCapacity);
                srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
            }

            if (srcStart != srcEnd) {
                final DefaultHandle[] srcElems = head.elements;
                final DefaultHandle[] dstElems = dst.elements;
                int newDstSize = dstSize;
                for (int i = srcStart; i < srcEnd; i++) {
                    DefaultHandle element = srcElems[i];
                    if (element.recycleId == 0) {
                        element.recycleId = element.lastRecycledId;
                    } else if (element.recycleId != element.lastRecycledId) {
                        throw new IllegalStateException("recycled already");
                    }
                    srcElems[i] = null;

                    if (dst.dropHandle(element)) {
                        // Drop the object.
                        continue;
                    }
                    element.stack = dst;
                    dstElems[newDstSize ++] = element;
                }

                if (srcEnd == LINK_CAPACITY && head.next != null) {
                    // Add capacity back as the Link is GCed.
                    this.head.reclaimSpace(LINK_CAPACITY);
                    this.head.link = head.next;
                }

                head.readIndex = srcEnd;
                if (dst.size == newDstSize) {
                    return false;
                }
                dst.size = newDstSize;
                return true;
            } else {
                // The destination stack is full already.
                return false;
            }
        }
    }
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容