上一节分析了UnpooledByteBufAllocator
,包括了堆内堆外内存是如何分配的,底层是如何获取数据内容的。
本节分析PooledByteBufAllocator
,看看它是怎么做Pooled
类型的内存管理的。
- 入口
PooledByteBufAllocator#newHeapBuffer()
和PooledByteBufAllocator#newDirectBuffer()
,
堆内内存和堆外内存分配的模式都比较固定
- 拿到线程局部缓存
PoolThreadCache
- 拿到不同类型的
arena
- 使用不同类型的
arena
进行内存分配
// Pooled heap allocation entry point: fetch the calling thread's PoolThreadCache,
// then allocate from the heap arena bound to that thread; fall back to an
// unpooled heap buffer when no heap arena is configured.
@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
// Fetch the thread-local cache (backed by a FastThreadLocal, see PoolThreadLocalCache)
PoolThreadCache cache = threadCache.get();
// The heap arena this thread was bound to in PoolThreadLocalCache#initialValue()
PoolArena<byte[]> heapArena = cache.heapArena;
final ByteBuf buf;
if (heapArena != null) {
// Allocate pooled memory via the heap arena
buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// No arena configured: fall back to unpooled allocation
buf = PlatformDependent.hasUnsafe() ?
new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
}
// Wrap with a leak-detection-aware buffer
return toLeakAwareBuffer(buf);
}
// Pooled direct (off-heap) allocation entry point: same pattern as newHeapBuffer,
// but using the thread's direct arena and ByteBuffer-based pooling.
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
// Fetch the thread-local cache
PoolThreadCache cache = threadCache.get();
// The direct arena this thread was bound to
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
// Allocate pooled off-heap memory via the direct arena
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
// No arena configured: fall back to unpooled direct allocation
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
// Wrap with a leak-detection-aware buffer
return toLeakAwareBuffer(buf);
}
-
跟踪threadCache.get()
调用的是FastThreadLocal#get()
方法。那么其实threadCache
也是一个FastThreadLocal
,可以看成是jdk的ThreadLocal
,只不过换了一种更加快的实现方法。get
方法主要是调用了初始化方法initialize
// FastThreadLocal#get(): fast-path index lookup in the thread's
// InternalThreadLocalMap; on first access (slot still UNSET) it creates the
// value via initialize() and registers a cleaner for it.
public final V get() {
InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
Object v = threadLocalMap.indexedVariable(index);
if (v != InternalThreadLocalMap.UNSET) {
return (V) v;
}
// First access on this thread: call the initialize() hook
V value = initialize(threadLocalMap);
registerCleaner(threadLocalMap);
return value;
}
// Per-thread cache holder: each thread gets its own PoolThreadCache via FastThreadLocal
private final PoolThreadLocalCache threadCache;
initialValue()
方法的逻辑如下
- 从预先准备好的
heapArenas
和directArenas
中获取最少使用的arena
- 使用获取到的
arena
为参数,实例化一个PoolThreadCache
并返回
// FastThreadLocal that lazily builds one PoolThreadCache per thread:
// initialValue() binds the least-used heap and direct arenas to the calling thread.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
/**
 * An "arena" is the unit inside which all pooled memory allocation logic runs.
 */
// Pick the least-used arena from the heap (on-heap) arena array
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
// Pick the least-used arena from the direct (off-heap) arena array
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
// Create the PoolThreadCache this thread will use exclusively:
// heapArena/directArena manage the two big regions (heap and off-heap memory);
// tinyCacheSize/smallCacheSize/normalCacheSize size the ByteBuf cache lists
// that recycle frequently reused memory blocks
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
// code omitted ......
}
查看PoolThreadCache
其维护了两种类型的内存分配策略,一种是上述通过持有heapArena
和directArena
,另一种是通过维护tiny
,small
,normal
对应的缓存列表来维护反复使用的内存。
// Per-thread allocation state. It supports two strategies:
// 1) arena-based allocation via the heapArena/directArena it holds, and
// 2) recycling recently freed regions through tiny/small/normal cache lists.
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
// Strategy 1: arena-based memory management
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Strategy 2: cache lists for the tiny, small and normal size classes
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// Used for bitshifting when calculate the index of normal caches later
private final int numShiftsNormalDirect;
private final int numShiftsNormalHeap;
private final int freeSweepAllocationThreshold;
private final AtomicBoolean freed = new AtomicBoolean();
private int allocations;
// TODO: Test if adding padding helps under contention
//private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
// Hold the heap and direct arenas: arena-based allocation for this thread
this.heapArena = heapArena;
this.directArena = directArena;
// Build the per-size-class cache lists from tinyCacheSize/smallCacheSize/normalCacheSize
if (directArena != null) {
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
// Track how many threads share this arena (used by leastUsedArena)
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
// Size-normalized cache queues for the tiny class
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
// Size-normalized cache queues for the small class
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
// Size-normalized cache queues for the normal class
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
// MemoryRegionCache: the object that holds cached regions of one size
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
// Each MemoryRegionCache (tiny/small/normal) is a queue for one memory size (one spec)
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
private static <T> MemoryRegionCache<T>[] createNormalCaches(
int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
// MemoryRegionCache: the object that holds cached regions of one size
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
for (int i = 0; i < cache.length; i++) {
// Each MemoryRegionCache (tiny/small/normal) is a queue for one memory size (one spec)
cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
}
return cache;
} else {
return null;
}
}
......
}
通过查看分配缓存的方法PoolThreadCache#createSubPageCaches()
可以发现具体维护的缓存列表对象MemoryRegionCache
实际上是维护了一个Queue&lt;Entry&lt;T&gt;&gt; queue
也就是队列。
// Caches freed memory regions of one size class as entries in a bounded MPSC queue.
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
// Normalize the requested capacity up to the next power of two
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
// Fixed-capacity queue holding the cached regions of this size class
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
......
}
- 关于准备好的内存竞技场
heapArena
和directArena
被PooledByteBufAllocator
持有。在实例化分配器的时候被初始化值
// Arena arrays prepared up-front by the allocator; each thread is bound to one of each
private final PoolArena<byte[]>[] heapArenas;
private final PoolArena<ByteBuffer>[] directArenas;
// Sizes of the three per-thread cache lists (tiny/small/normal)
private final int tinyCacheSize;
private final int smallCacheSize;
private final int normalCacheSize;
跟踪初始化的过程可以发现,其实heapArena
和directArena
都是一个PoolArena[]
,其内部分别定义了两个内部类PoolArena.HeapArena
和PoolArena.DirectArena
分别表示堆内内存竞技场和堆外内存竞技场。
// Full constructor: validates sizing parameters, then builds the heap and direct
// PoolArena arrays plus their metric views.
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
// chunkSize = pageSize << maxOrder (validated)
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
checkPositiveOrZero(nHeapArena, "nHeapArena");
checkPositiveOrZero(nDirectArena, "nDirectArena");
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
// Alignment must be a power of two (x & -x == x)
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize);
// Create the two arena arrays used for allocation: heapArenas and directArenas
if (nHeapArena > 0) {
// Create the heap arena array (really just a PoolArena[])
// nHeapArena: array length
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
// On-heap: PoolArena[] is filled with HeapArena instances
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
if (nDirectArena > 0) {
// Create the direct arena array (really just a PoolArena[])
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
// Off-heap: PoolArena[] is filled with DirectArena instances
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
private static <T> PoolArena<T>[] newArenaArray(int size) {
// Create the PoolArena array (raw type because generic array creation is illegal)
return new PoolArena[size];
}
初始化内存竞技场数组大小的默认值为defaultMinNumArena
,即2倍的cpu核心数,运行时每个线程可独享一个arena,内存分配的时候就不用加锁了
// Convenience constructor: delegates with the default arena counts, page size and max order
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
// Default arena-array size: 2 * CPU cores
// (matches the default NioEventLoop count, so each event-loop thread can own an
// arena exclusively and allocate from it without lock contention)
// NOTE(review): these lines are excerpted from a static initializer — surrounding
// context (e.g. the `runtime` variable) is not shown here.
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
// Cap the arena count so pooled chunks stay within a fraction of available memory
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numDirectArenas",
(int) Math.min(
defaultMinNumArena,
PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));
- 整体分配架构,如图
假设初始化了4个NioEventLoop
也就是4个线程的数组,默认cpu核心数为2。那么内存分配器PooledByteBufAllocator
持有的arena
数量也是4个。创建一个ByteBuf的过程如下:
- 首先,通过
PoolThreadCache
去拿到一个对应的arena
对象。那么PoolThreadCache
的作用就是通过ThreadLocal
的方式把内存分配器PooledByteBufAllocator
持有的arena
数组中其中的一个arena(最少使用的)
塞到PoolThreadCache
的一个成员变量里面。- 然后,当每个线程通过它(
threadCache
)去调用get
方法的时候,会拿到它底层的一个arena
,也就是第一个线程拿到第一个,第二个线程拿到第二个以此类推。这样可以把线程和arena
进行一个绑定PoolThreadCache
除了可以直接在arena
管理的这块内存进行内存分配,还可在它底层维护的一个ByteBuf缓存列表里进行内存分配。在PooledByteBufAllocator
中持有tinyCacheSize
,smallCacheSize
,normalCacheSize
,分配内存时调用threadCache.get();
的时候实例化PoolThreadCache
,并将它们作为构造方法参数传入,创建了对应的缓存列表。