对象池

概要

看到现在,Netty之设计精妙,令人感叹。优化至极,令人发指。在高并发场景下,对象的分配的损耗是很大的,特别是生命周期短的,又需要反复创建,又被GC掉的。那么对象池是解决这类问题的好的方式。而Netty的对象池设计也是让我大开眼界。

1322310-20190627211957175-853244707[1].jpg
1564066435789.png
  • STACK : Stack:存储本线程回收的对象。对象的获取和回收对应Stack的pop和push,即获取对象时从Stack中pop出1个DefaultHandle,回收对象时将对象包装成DefaultHandle push到Stack中。Stack会与线程绑定,即每个用到Recycler的线程都会拥有1个Stack,在该线程中获取对象都是在该线程的Stack中pop出一个可用对象。
  • Handle: 对象的包装类,在Recycler中缓存的对象都会包装成DefaultHandle类。
  • WeakOrderQueue:存储其它线程回收到本线程stack的对象,当某个线程从Stack中获取不到对象时会从WeakOrderQueue中获取对象。每个线程的Stack拥有1个WeakOrderQueue链表,链表每个节点对应1个其它线程的WeakOrderQueue,其它线程回收到该Stack的对象就存储在这个WeakOrderQueue里。
  • Link: WeakOrderQueue中包含1个Link链表,回收对象存储在链表某个Link节点里,当Link节点存储的回收对象满了时会新建1个Link放在Link链表尾。

初始化

// 在PooledHeapByteBuf里面初始化一个RECYCLER用来针对PooledHeapByteBuf做对象回收
private static final Recycler<PooledHeapByteBuf> RECYCLER = new Recycler<PooledHeapByteBuf>() {
    @Override
    // 这个方法用来构造回收的实际对象
    protected PooledHeapByteBuf newObject(Handle<PooledHeapByteBuf> handle) {
        return new PooledHeapByteBuf(handle, 0);
    }
};

static PooledHeapByteBuf newInstance(int maxCapacity) {
    // 当需要一个PooledHeapByteBuf的时候,会去回收器里面看有没有可以复用的对象
    PooledHeapByteBuf buf = RECYCLER.get();
    // 如果有,那么重置这个buf,返回。
    buf.reuse(maxCapacity);
    return buf;
}

stack

关键属性

final Recycler<T> parent;
final Thread thread;
// 等于maxCapacity(32768) / maxSharedCapacityFactor(2) = 16k
// 这里代表
final AtomicInteger availableSharedCapacity;
// 总共支持多少个线程来帮忙回收本线程生成对象
final int maxDelayedQueues;
// 32768, 代表本线程生成的对象中,本地stack最大能存放多少个回收的对象
private final int maxCapacity;
// 回收对象的频率,每8个请求回收的对象中,最终只回收一个。
private final int ratioMask;
// 回收对象实际存放的数组
private DefaultHandle<?>[] elements;
// 实际现有对象的个数
private int size;
// 总共触发回收的次数
private int handleRecycleCount = -1; 
private WeakOrderQueue cursor, prev;
// 代表stack关联的其他线程帮忙回收的WeakOrderQueue列表,因为更新head涉及到多线程同步的问题,
// 用volatile保持可见
private volatile WeakOrderQueue head;

获取对象

public final T get() {
    if (maxCapacityPerThread == 0) {
        // 直接返回对象,会调用前面的newObject
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    // 拿到本线程绑定的stack
    Stack<T> stack = threadLocal.get();
    // 取stack中查看是否有对象可以复用
    DefaultHandle<T> handle = stack.pop();
    // 如果没有
    if (handle == null) {
        // 新建handle,并将绑定该stack,将handle的value设置为newObject
        // 也就是将对象包装成handle并绑定stack,返回。
        // 绑定的话,那么当回收对象的时候,如果是创建stack的线程,那么就可以直接回收掉    
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}

pop

DefaultHandle<T> pop() {
    int size = this.size;
    // 如果本地stack没有对象可复用,那么去其他线程回收的本线程对象中看看有没有可以用的。
    if (size == 0) {
        if (!scavenge()) {
            return null;
        }
        size = this.size;
    }
    // 否则拿到最后一位对象
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    // stack里handle的lastRecycledId和recycleId必须相等,否则被认为重复回收。
    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    // 将该对象的lastRecycledId和recycleId重置
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    this.size = size;
    return ret;
}

scavenge

boolean scavenge() {
    // continue an existing scavenge, if any
    if (scavengeSome()) {
        return true;
    }

    // reset our scavenge cursor
    prev = null;
    cursor = head;
    return false;
}

boolean scavengeSome() {
    // 暂且认为是指针,指示上次扫描到哪了,这次接着往下
    WeakOrderQueue cursor = this.cursor;
    // 如果指针为空,那么从头开始
    if (cursor == null) {
        cursor = head;
        if (cursor == null) {
            return false;
        }
    }

    boolean success = false;
    
    WeakOrderQueue prev = this.prev;
    do {
        // 开始迁移当前WeakOrderQueue,如果成功,直接break。
        if (cursor.transfer(this)) {
            success = true;
            break;
        }
        
        // 前面失败,继续下一个
        WeakOrderQueue next = cursor.next;
        
        // owner.get()等于null
        // 说明当初回收的线程已经不可达
        // owenr是WeakReference,如果get为null,只有一种可能,就是thread为null
        if (cursor.owner.get() == null) {
            // 看这个节点是否还有数据
            if (cursor.hasFinalData()) {
                for (;;) {
                    // 因为是无限循环,结果是
                    // 将这个节点上的数据全部迁移到目的地stack中来
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }
            // 既然这个回收的线程都挂了,那么这个节点也没用了,直接丢弃
            if (prev != null) {
                prev.next = next;
            }
        } else {
            // 继续下一个
            prev = cursor;
        }

        cursor = next;
    // 直到遍历完跟目的地相关的所有WeakOrderQueue
    } while (cursor != null && !success);

    this.prev = prev;
    this.cursor = cursor;
    return success;
}

transfer

boolean transfer(Stack<?> dst) {
    // 从WeakOrderQueue里的link开始遍历
    Link head = this.head;
    if (head == null) {
        return false;
    }

    // 如果已经扫完了当前link,那么接着next继续扫
    // 这里还有个目的是head前移,而之前的head将会被GC掉
    if (head.readIndex == LINK_CAPACITY) {
        if (head.next == null) {
            return false;
        }
        this.head = head = head.next;
    }

    // srcStart当然从link里的0开始,如果之前没有读取过的话
    final int srcStart = head.readIndex;
    // srcEnd是当前link里的最后一个对象位置
    int srcEnd = head.get();
    // 总共这个link里面存了多少个对象
    final int srcSize = srcEnd - srcStart;
    if (srcSize == 0) {
        return false;
    }

    // 获取转移元素的目的地Stack中当前的元素个数
    final int dstSize = dst.size;
    // 计算预计的容量
    final int expectedCapacity = dstSize + srcSize;
    
    // 如果预计的容量要超过目的地stack的最大长度,那么stack需要扩容
    // 一直扩到能容纳expectedCapacity为止
    // srcStart + actualCapacity - dstSize就表示扩容后我最多能将这里的对象转移过去的数目
    if (expectedCapacity > dst.elements.length) {
        final int actualCapacity = dst.increaseCapacity(expectedCapacity);
        srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
    }

    // 迁移开始
    if (srcStart != srcEnd) {
        // 拿到当前link的回收对象
        final DefaultHandle[] srcElems = head.elements;
        // 拿到目的地stack的回收对象
        final DefaultHandle[] dstElems = dst.elements;
        int newDstSize = dstSize;
        for (int i = srcStart; i < srcEnd; i++) {
            DefaultHandle element = srcElems[i];
            // 如果recycleId为0,设为lastRecycledId
            if (element.recycleId == 0) {
                element.recycleId = element.lastRecycledId;
                // 如果不相等,表示已经被回收了
            } else if (element.recycleId != element.lastRecycledId) {
                throw new IllegalStateException("recycled already");
            }
            // 将link中的对象清空
            srcElems[i] = null;

            // stask有自己的回收频率,看是否在对的节奏上,不是就放弃
            if (dst.dropHandle(element)) {
                // Drop the object.
                continue;
            }
            // 重新将该对象stack设为目的地stack
            element.stack = dst;
            // 加到目的地stack中
            dstElems[newDstSize ++] = element;
        }
        
        // 这里说明这个link已经被转移完毕了,那么这个link也就没有用了
        if (srcEnd == LINK_CAPACITY && head.next != null) {
            // Add capacity back as the Link is GCed.
            // 这里主要是更新availableSharedCapacity
            // 释放掉资源
            reclaimSpace(LINK_CAPACITY);
            // head前移,之前的head将被GC
            this.head = head.next;
        }
        
        // 当然要同步更新读取的进度
        // 如果这个时候目的地的size与前面转移前的size一样
        // 说明根本就转移没成功,返回false
        head.readIndex = srcEnd;
        if (dst.size == newDstSize) {
            return false;
        }
        // 设置目的地stack的size,成功返回
        dst.size = newDstSize;
        return true;
    } else {
        // The destination stack is full already.
        return false;
    }
}

回收对象

// 所有的recycler都要共享这个延迟回收的ThreadLocal,里面实际保存的各个stack对应WeakOrderQueue
private static final FastThreadLocal<Map<Stack<?>, WeakOrderQueue>> DELAYED_RECYCLED                = new FastThreadLocal<Map<Stack<?>, WeakOrderQueue>>() {
@Override
protected Map<Stack<?>, WeakOrderQueue> initialValue() {
    return new WeakHashMap<Stack<?>, WeakOrderQueue>();
}
};
void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    
    if (thread == currentThread) {
        // 如果stack所绑定的线程就是当前线程,那么立即回收
        pushNow(item);
    } else {
        // 如果stack所绑定的线程不是当前线程,待会再回收
        pushLater(item, currentThread);
    }
}

pushNow

private void pushNow(DefaultHandle<?> item) {
    // 如果添加到stack的handle的recycleId或lastrecycleId不为0
    // 说明之前已经被回收过了
    if ((item.recycleId | item.lastRecycledId) != 0) {
        throw new IllegalStateException("recycled already");
    }
    // 否则,重新自增一个id赋予recycleId或lastrecycleId
    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

    int size = this.size;
    // 如果stack已满
    // dropHandle是每8个对象请求回收,只接收一个,防止对象爆炸式增长
    if (size >= maxCapacity || dropHandle(item)) {
        // Hit the maximum capacity or should drop - drop the possibly youngest object.
        return;
    }
    // stack中的elements扩容两倍,复制元素,将新数组赋值给stack.elements
    if (size == elements.length) {
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }

    // 压入对象
    elements[size] = item;
    this.size = size + 1;
}

pushLater

// 首先需要明确的是每个待回收的对象,必须是通过recycler生成的。在生成的时候就已经
// 绑定了当时创建线程的stack了,表示是该线程创建的。回收的话,如果正好也是当前线程回收
// 那么直接回收到stack里面。那么如果现在回收的线程发现,这个对象不是我创建的,那么如果
// 直接add到这个对象绑定的stack,那么就违反了stack是线程独占的事实,会发生同步的危险。
// Netty在解决这个问题时,引入了WeakOrderQueue来处理A线程生成的对象,B线程来回收。
private void pushLater(DefaultHandle<?> item, Thread thread) {
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    // 首先这里this也就是stack的上下文是指item所绑定的stack,也就是线程A
    // 拿到线程A的stack绑定的WeakOrderQueue
    WeakOrderQueue queue = delayedRecycled.get(this);
    // 如果不存在,那么初始化一个queue
    if (queue == null) {
        // 这里maxDelayedQueues是cpucore*2,也就是eventloop的最大数目
        // 说明delayedRecycled是规划每一个线程都给其他所有线程都准备有WeakOrderQueue
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // 如果超过最大,那么put一个DUMMY,也就是放弃回收这个对象
            delayedRecycled.put(this, WeakOrderQueue.DUMMY,也就是放弃回收这个对象);
            return;
        }
        
        // 生成WeakOrderQueue
        if ((queue = WeakOrderQueue.allocate(this, thread)) == null) {
            // drop object
            return;
        }
        // 保存下来
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // drop object
        return;
    }
    // 将对象压入
    queue.add(item);
}

allocate

static WeakOrderQueue allocate(Stack<?> stack, Thread thread) {
    // We allocated a Link so reserve the space
    // 现在你需要在线程A的stack上新建一个WeakOrderQueue,得看下还有没有空间生成
    return reserveSpace(stack.availableSharedCapacity, LINK_CAPACITY)
            ? new WeakOrderQueue(stack, thread) : null;
}

add

void add(DefaultHandle<?> handle) {
    // 将lastRecycledId当前WeakOrderQueue的id
    handle.lastRecycledId = id;
    // 拿到尾节点Link
    Link tail = this.tail;
    int writeIndex;
    // 如果尾节点的Link里面已满,那么需要再新建Link
    if ((writeIndex = tail.get()) == LINK_CAPACITY) {
        // 如果没有空间,那么丢弃该对象,回收中断
        if (!reserveSpace(availableSharedCapacity, LINK_CAPACITY)) {
            // Drop it.
            return;
        }
        // 否则在链表尾部追加一个新的link
        this.tail = tail = tail.next = new Link();
        // 新的link的起始写入位置一定从0开始
        writeIndex = tail.get();
    }
    // 将handle放到tail里面
    tail.elements[writeIndex] = handle;
    // 将handle跟源stack解绑
    handle.stack = null;
    // tail节点写入位置+1
    tail.lazySet(writeIndex + 1);
}

为什么用lazySet?而不直接用set。不太理解,先记录下来,只知道lazySet有storestore屏障,所以在writeIndex + 1前,确保handle的stack=null被其他线程可见。

Right. It only guarantees write-ordering wrt previous writes, and the immediately preceding write is the nulling of the handle's stack property. So we are guaranteeing (cheaply) that the null stack value will be seen by other threads prior to anyone witnessing that tail == writeIndex + 1

It's a long time since I looked at the code, but it looks like this is simply to ensure that when the owning thread's pool reclaims the handle, and sets the stack property to itself, it is guaranteed that these two writes are not re-ordered (which could leave the value null when it should be its owning stack).

As a by-product, it also guarantees that any state that may have been modified in the recycled object is also visible prior to its reference on the receiving thread. Which is a necessary 'least surprise' property.

This class was written to (extraordinarily) minimise coordination between CPUs. More so than any other similar structure I have seen, and it pays for it by reducing the promptness of recycling between threads, since recycling was expected primarily to be same-thread or between a small cohort of threads (there have been some issues iirc, with people abusing it for random thread recycling amongst huge cohorts of threads)

Unlike many other concurrent data structures you may have seen, there may be zero volatile property accesses between two operations on different threads, so things like lazySet are necessary to guarantee against subtle correctness issues like this.

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,684评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,143评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,214评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,788评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,796评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,665评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,027评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,679评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 41,346评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,664评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,766评论 1 331
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,412评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,015评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,974评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,203评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,073评论 2 350
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,501评论 2 343

推荐阅读更多精彩内容