netty源码分析(28)- PooledByteBufAllocator分析

上一节分析了UnpooledByteBufAllocator,包括了堆内堆外内存是如何分配的,底层时时如何获取数据内容的。
本节分析分析PooledByteBufAllocator,看看它是怎么做Pooled类型的内存管理的。

  • 入口PooledByteBufAllocator#newHeapBuffer()PooledByteBufAllocator#newDirectBuffer()
    堆内内存和堆外内存分配的模式都比较固定
  1. 拿到线程局部缓存PoolThreadCache
  2. 拿到不同类型的rena
  3. 使用不同类型的arena进行内存分配
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        //拿到线程局部缓存
        PoolThreadCache cache = threadCache.get();
        //拿到heapArena
        PoolArena<byte[]> heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {
            //使用heapArena分配内存
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        //拿到线程局部缓存
        PoolThreadCache cache = threadCache.get();
        //拿到directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            //使用directArena分配内存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
  • 跟踪threadCache.get()
    调用的是FastThreadLocal#get()方法。那么其实threadCache也是一个FastThreadLocal,可以看成是jdk的ThreadLocal,只不过还了一种跟家块的是西安方法。get方发住哟啊是调用了初始化方法initialize
    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        //调用初始化方法
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }
private final PoolThreadLocalCache threadCache;

initialValue()方法的逻辑如下

  1. 从预先准备好的heapArenasdirectArenas中获取最少使用的arena
  2. 使用获取到的arean为参数,实例化一个PoolThreadCache并返回
    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        private final boolean useCacheForAllThreads;

        PoolThreadLocalCache(boolean useCacheForAllThreads) {
            this.useCacheForAllThreads = useCacheForAllThreads;
        }

        @Override
        protected synchronized PoolThreadCache initialValue() {
            /**
             * arena翻译成竞技场,关于内存非配的逻辑都在这个竞技场中进行分配
             */
            //获取heapArena:从heapArenas堆内竞技场中拿出使用最少的一个arena
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            //获取directArena:从directArena堆内竞技场中拿出使用最少的一个arena
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                //创建PoolThreadCache:该Cache最终被一个线程使用
                //通过heapArena和directArena维护两大块内存:堆和堆外内存
                //通过tinyCacheSize,smallCacheSize,normalCacheSize维护ByteBuf缓存列表维护反复使用的内存块
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

      //省略代码......

      }

查看PoolThreadCache其维护了两种类型的内存分配策略,一种是上述通过持有heapArenadirectArena,另一种是通过维护tiny,small,normal对应的缓存列表来维护反复使用的内存。

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    //通过arena的方式维护内存
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //维护了tiny, small, normal三种类型的缓存列表
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;

        //通过持有heapArena和directArena,arena的方式管理内存分配
        this.heapArena = heapArena;
        this.directArena = directArena;

        //通过tinyCacheSize,smallCacheSize,normalCacheSize创建不同类型的缓存列表并保存到成员变量
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            //创建规格化缓存队列
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            //创建规格化缓存队列
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            //创建规格化缓存队列
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            //MemoryRegionCache 维护缓存的一个对象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                //每一种MemoryRegionCache(tiny,small,normal)都表示不同内存大小(不同规格)的一个队列
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            //MemoryRegionCache 维护缓存的一个对象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                //每一种MemoryRegionCache(tiny,small,normal)都表示不同内存(不同规格)大小的一个队列
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

......
}

通过查看分配缓存的方法PoolThreadCache#createSubPageCaches()可以发现具体维护的缓存列表对象MemoryRegionCache实际上时维护了一个Queue<Entry<T>> queue也就是队列。

    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            //做一个简单的规格化
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            //持有这种规格的缓存队列
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
     ......
     }
  • 关于准备好的内存竞技场heapArenadirectArenaPooledByteBufAllocator持有。在实例化分配器的时候被初始化值
    private final PoolArena<byte[]>[] heapArenas;
    private final PoolArena<ByteBuffer>[] directArenas;
    
    //三种缓存列表长度
    private final int tinyCacheSize;
    private final int smallCacheSize;
    private final int normalCacheSize;

跟踪初始化的过程可以发现,其实headArenadirectArena都是一个PoolArena[],其内部分别定义了两个内部类PoolArena.HeapArenaPoolArena.DirectArena分别表示堆内内存竞技场和堆外内存竞技场。

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        checkPositiveOrZero(nHeapArena, "nHeapArena");
        checkPositiveOrZero(nDirectArena, "nDirectArena");

        checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
        }

        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);

        //创建两种内存分配的PoolArena数组,heapArenas和directArenas
        if (nHeapArena > 0) {
            //创建heapArenas内存竞技场(其实是PoolArena[])
            //nHeapArena:数组大小
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                //堆内:PoolArena[]存放它下面的HeapArena
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }

        if (nDirectArena > 0) {
            //创建heapArenas内存竞技场(其实是PoolArena[])
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                //堆外:PoolArena[]存放它下面的DirectArena
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }
    private static <T> PoolArena<T>[] newArenaArray(int size) {
        //创建PoolArena数组
        return new PoolArena[size];
    }

初始化内存竞技场数组的大家的默认值为defaultMinNumArena,2被的cpu核心数,运行时每个线程可独享一个arena,内存分配的时候就不用加锁了

    public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }
        //2倍cpu核心数,默认创建这个数量大小的Arena数组
        // (这个数字和创建NioEventLoop数组的数量一致,每个线程都可以由一个独享的arena,这个数组中的arena其实在分配内存的时候是不用加锁的)
        final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numHeapArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                runtime.maxMemory() / defaultChunkSize / 2 / 3)));
        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numDirectArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

  • 整体分配架构,如图
    假设初始化了4个NioEventLoop也就是4个线程的数组,默认cpu核心数为2。那么内存分配器PooledByteBufAllocator持有的arena数量也是4个。创建一个ByteBuf的过程如下:
  • 首先,通过PoolThreadCache去拿到一个对应的arena对象。那么PoolThreadCache的作用就是通过ThreadLoad的方式把内存分配器PooledByteBufAllocator持有的arena数组中其中的一个arena(最少使用的)塞到PoolThreadCache的一个成员变量里面。
  • 然后,当每个线程通过它(threadCache)去调用get方法的时候,会拿到它底层的一个arena,也就是第一个线程拿到第一个,第二个线程拿到第二个以此类推。这样可以把线程和arena进行一个绑定
  • PoolThreadCache除了可以直接在arena管理的这块内存进行内存分配,还可在它底层维护的一个ByteBuf缓存列表里进行内存分配。在PooledByteBufAllocator中持有tinyCacheSize,smallCacheSize,normalCacheSize,分配内存时调用threadCache.get();的时候实例化PoolThreadCache作为它的构造方法参数传入,创建了对应的缓存列表。
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,189评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,577评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,857评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,703评论 1 276
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,705评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,620评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,995评论 3 396
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,656评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,898评论 1 298
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,639评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,720评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,395评论 4 319
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,982评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,953评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,195评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,907评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,472评论 2 342

推荐阅读更多精彩内容

  • Swift1> Swift和OC的区别1.1> Swift没有地址/指针的概念1.2> 泛型1.3> 类型严谨 对...
    cosWriter阅读 11,084评论 1 32
  • 在一个方法内部定义的变量都存储在栈中,当这个函数运行结束后,其对应的栈就会被回收,此时,在其方法体中定义的变量将不...
    Y了个J阅读 4,412评论 1 14
  • 在学习jemalloc之前可以了解一下glibc malloc,jemalloc没有'unlinking' 和 '...
    dcharles阅读 6,633评论 0 7
  • Java SE 基础: 封装、继承、多态 封装: 概念:就是把对象的属性和操作(或服务)结合为一个独立的整体,并尽...
    Jayden_Cao阅读 2,095评论 0 8
  • 所有知识点已整理成app app下载地址 J2EE 部分: 1.Switch能否用string做参数? 在 Jav...
    侯蛋蛋_阅读 2,407评论 1 4