netty 面试 轻量级对象池Recycler原理

本文基于 netty 4.1.46

以下为了方便描述,用户缓存的对象简称 T

一、Recycler 解决的问题

Recycler 即轻量级对象池,避免同一个线程重复创建对象。和异线程回收后的重用问题,例如:线程1创建对象T,线程2回收了对象T,线程1仍然可以通过get方法拿到对象T。
避免重复创建对象的好处是,减少了因创建对象的系统消耗,减轻了jvm yong gc 回收对象的压力。

注意: 何时用轻量级对象池 Recycler , "通过对象池来避免创建对象并不是一种好的做法,除非池中的对象是非常重量级的,现代的JVM实现具有高度优化的垃圾回收期,其性能很容易就会超过轻量级对象池的性能" 《Effective java》。举个例子:netty 的 PooledDirectByteBuf 就适合用对象池,因为堆外内存的分配,要系统调用有用户态和内核态的切换比较耗时,所以非常适用对象池。

二、使用方法

public class RecycleTest {
    private static Recycler<User> RECYCLER = new Recycler<User>() {
        @Override
        protected User newObject(Handle handle) {
            return new User(handle);
        }
    };
    private static class User {
        private final Recycler.Handle<User> handle;
        public User(Recycler.Handle<User> handle) {
            this.handle = handle;
        }
        public void recycle() {
            handle.recycle(this);
        }
    }

    public static void main(String[] args) {
        final User user = RECYCLER.get();
        new Thread(new Runnable() {
            @Override
            public void run() {
                //异线程回收
                user.recycle();
            }
        }).start();
        final User user1 = RECYCLER.get();
        //同线程回收
        user1.recycle();
    }
}

三、Recycler 的实现

1、Recycler 的 uml 图
clipboard.png
2、Stack 类的结构
clipboard.png

注意:

每一个Recycler对象包含一个FastThreadLocal<Stack<T>> threadLocal实例;每一个线程包含一个Stack对象,该Stack对象包含一个DefaultHandle[],而DefaultHandle中有一个属性T,用于存储真实对象。也就是说,每一个T都会被包装成一个DefaultHandle对象

3、多线程视角描述类的关系
image.png

Recyler类包含一个类对象
static FastThreadLocal<Map<Stack<T>, WeakOrderQueue>>
每一个线程对象包含一个 Map<Stack<T>, WeakOrderQueue>,存储着当前线程中所有回收其他线程的 T 对象的集合,包括其他线程不同的 Recyler 实例产生的变量产生的 T 或 U 对象,并且分别为其创建的 WeakOrderQueue 对象

WeakOrderQueue 对象中存储一个以Head为首的Link数组,每个Link对象中存储一个DefaultHandle[]数组,用于存放回收对象。

说明:(假设线程A创建对象T)

线程 A 回收 T 时,直接将 T 的 DefaultHandle 对象,压入 Stack的DefaultHandle[] elements 中;

线程 B 回收 T 时,会首先从其 Map<Stack<T>, WeakOrderQueue> 对象中 get('线程A的Stack对象')拿到 WeakOrderQueue,然后直接将T的DefaultHandle对象压入 WeakOrderQueue 中的Link 链表中的尾部 Link 的 DefaultHandle[] 中,同时,这个 WeakOrderQueue 会与线程A的Stack中的 head 属性进行关联,用于后续对象的 pop 操作;

当线程 A 从对象池获取对象时,如果线程A的Stack中有对象,则直接弹出;如果没有对象,则先从其 head 属性所指向的 WeakorderQueue 开始遍历 queue 链表,将T对象从其他线程的WeakOrderQueue中转移到线程A的Stack中(一次pop操作只转移一个包含了元素的Link),再从 Stack 中弹出。

四、源码分析

1 、变量对应关系和含义
静态变量 默认值 Recycler 实例变量 Stack 实例变量 含义
DEFAULT_MAX_CAPACITY_PER_THREAD 4 * 1024 maxCapacityPerThread availableSharedCapacity 当前线程创建的 Handle 对象,被其他线程回收,并且能缓冲到 WeakOrderQueue 中 Handle 的数量
INITIAL_CAPACITY 256 Stack 中 DefaultHandle<?>[] elements 容量
LINK_CAPACITY 16 Link 中的数组 DefaultHandle<?>[] elements 容量
MAX_DELAYED_QUEUES_PER_THREAD 2*cpu maxDelayedQueuesPerThread maxDelayedQueues 当前线程创建的对象能够让多少个线程进行回收
8 interval 控制对象回收频率

2、获取对象get()

/**
 * 1、获取当前线程的 Stack
 * 2、从 Stack 里弹出对象
 * 3、创建对象并且和 Stack 绑定
 */
@SuppressWarnings("unchecked")
public final T get() {
    //如果 Stack 的容量为 0
    if (maxCapacityPerThread == 0) {
        return newObject((Handle<T>) NOOP_HANDLE);
    }
    //获取当前线程的 Stack
    Stack<T> stack = threadLocal.get();
    //从 Stack 里弹出对象
    DefaultHandle<T> handle = stack.pop();
    if (handle == null) {
        //创建对象并且和 Stack 绑定,回收对象时,可以通过 Handle 调用 Stack 进行回收
        handle = stack.newHandle();
        handle.value = newObject(handle);
    }
    return (T) handle.value;
}
2.1、从Stack中取出对象 pop 方法
@SuppressWarnings({ "unchecked", "rawtypes" })
DefaultHandle<T> pop() {
    int size = this.size;
    //当前线程如果没有对象
    if (size == 0) {
        //试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
        if (!scavenge()) {
            return null;
        }
        size = this.size;
        if (size <= 0) {
            // double check, avoid races
            return null;
        }
    }
    size --;
    DefaultHandle ret = elements[size];
    elements[size] = null;
    // As we already set the element[size] to null we also need to store the updated size before we do
    // any validation. Otherwise we may see a null value when later try to pop again without a new element
    // added before.
    this.size = size;

    if (ret.lastRecycledId != ret.recycleId) {
        throw new IllegalStateException("recycled multiple times");
    }
    ret.recycleId = 0;
    ret.lastRecycledId = 0;
    return ret;
}
2.2 scavengeSome () 方法试着从其他线程回收的 WeakOrderQueue 对象中取 Handle
private boolean scavengeSome() {
    WeakOrderQueue prev;
    WeakOrderQueue cursor = this.cursor;
    if (cursor == null) {
        prev = null;
        cursor = head;
        if (cursor == null) {
            return false;
        }
    } else {
        prev = this.prev;
    }

    boolean success = false;
    do {
        //将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
        if (cursor.transfer(this)) {
            success = true;
            break;
        }//如果没有拿到

        WeakOrderQueue next = cursor.getNext();
        //cursor 锁持有的线程 被释放掉了,那么要做一些清理工作
        if (cursor.get() == null) {
            // If the thread associated with the queue is gone, unlink it, after
            // performing a volatile read to confirm there is no data left to collect.
            // We never unlink the first queue, as we don't want to synchronize on updating the head.
            if (cursor.hasFinalData()) {//如果有数据
                for (;;) {
                    if (cursor.transfer(this)) {
                        success = true;
                    } else {
                        break;
                    }
                }
            }

            if (prev != null) {
                // Ensure we reclaim all space before dropping the WeakOrderQueue to be GC'ed.
                cursor.reclaimAllSpaceAndUnlink();
                //删除 cursor 节点
                prev.setNext(next);
            }
        } else {
            //否则移动 prev 指针到下一个节点
            prev = cursor;
        }
        //移动 cursor 指针到下一个节点
        cursor = next;

        //如果已经到结尾了,或者 已经找到了,那么循环退出
    } while (cursor != null && !success);

    this.prev = prev;
    this.cursor = cursor;
    return success;
}
2.3 transfer(this)) 方法;将 WeakOrderQueue 中的数据拿到 Stack 中的 DefaultHandle<?>[] elements
 boolean transfer(Stack<?> dst) {
        Link head = this.head.link;
        //如果当前 Link 中没有数据
        if (head == null) {
            return false;
        }
        //当前Link中的数据 都没取走了
        if (head.readIndex == LINK_CAPACITY) {
            //如果 Link 队列中没有节点了
            if (head.next == null) {
                return false;
            }
            //将当前 Head 节点移除
            head = head.next;
            this.head.relink(head);
        }
        //head 节点已经被取走的数量
        final int srcStart = head.readIndex;
        //head 节点的 Handle 数量
        int srcEnd = head.get();
        //head 节点可读的 handle 数量
        final int srcSize = srcEnd - srcStart;
        //head 节点已经没有可读 handle 对象了
        if (srcSize == 0) {
            return false;
        }
        //当前 Stack 中 DefaultHandle 的数量
        final int dstSize = dst.size;
        final int expectedCapacity = dstSize + srcSize;
        //head 节点中的 Link 的 Handle 全部读出来, Stack 中 DefaultHandle[] elements 能否放的下
        if (expectedCapacity > dst.elements.length) {
            final int actualCapacity = dst.increaseCapacity(expectedCapacity);
            //算出 需要数组的最大 下标
            srcEnd = min(srcStart + actualCapacity - dstSize, srcEnd);
        }

        if (srcStart != srcEnd) {
            final DefaultHandle[] srcElems = head.elements;
            final DefaultHandle[] dstElems = dst.elements;
            int newDstSize = dstSize;
            for (int i = srcStart; i < srcEnd; i++) {
                DefaultHandle<?> element = srcElems[i];
                if (element.recycleId == 0) {
                    element.recycleId = element.lastRecycledId;
                } else if (element.recycleId != element.lastRecycledId) {
                    throw new IllegalStateException("recycled already");
                }
                srcElems[i] = null;
                //控制回收频率
                if (dst.dropHandle(element)) {
                    // Drop the object.
                    continue;
                }
                element.stack = dst;
                dstElems[newDstSize ++] = element;
            }
            // 遍历下一个节点
            if (srcEnd == LINK_CAPACITY && head.next != null) {
                // Add capacity back as the Link is GCed.
                this.head.relink(head.next);
            }

            head.readIndex = srcEnd;
            if (dst.size == newDstSize) {
                return false;
            }
            dst.size = newDstSize;
            return true;
        } else {
            // The destination stack is full already.
            return false;
        }
    }
}

3、回收对象 recycle()

/**
 * 1、同线程回收对象
 * 2、异线程回收对象
 */
@Override
public void recycle(Object object) {
    if (object != value) {
        throw new IllegalArgumentException("object does not belong to handle");
    }

    //此处的 stack 是创建 Handle 对象的线程持有的 stack
    Stack<?> stack = this.stack;
    if (lastRecycledId != recycleId || stack == null) {
        throw new IllegalStateException("recycled already");
    }

    stack.push(this);
}

void push(DefaultHandle<?> item) {
    Thread currentThread = Thread.currentThread();
    //当前线程是否是创建 stack 的线程
    if (threadRef.get() == currentThread) {
        // The current Thread is the thread that belongs to the Stack, we can try to push the object now.
        pushNow(item);
    } else {
        // The current Thread is not the one that belongs to the Stack
        // (or the Thread that belonged to the Stack was collected already), we need to signal that the push
        // happens later.
        pushLater(item, currentThread);
    }
}
3.1 同线程回收对象
private void pushNow(DefaultHandle<?> item) {
    if ((item.recycleId | item.lastRecycledId) != 0) {
        throw new IllegalStateException("recycled already");
    }
    item.recycleId = item.lastRecycledId = OWN_THREAD_ID;

    int size = this.size;
    //判断存放 handle 数量是否达到上限, 或者
    if (size >= maxCapacity || dropHandle(item)) {
        // Hit the maximum capacity or should drop - drop the possibly youngest object.
        return;
    }
    //如果数组被填充满了,那么重新创建一个 两倍大小的数组,上限是maxCapacity
    if (size == elements.length) {
        elements = Arrays.copyOf(elements, min(size << 1, maxCapacity));
    }

    elements[size] = item;
    this.size = size + 1;
}
3.2 异线程回收对象
/**
 * 异线程回收对象
 *
 * 1、获取 WeakOrderQueue
 * 2、创建 WeakOrderQueue
 * 3、将对象追加到 WeakOrderQueue
 */
private void pushLater(DefaultHandle<?> item, Thread thread) {
    if (maxDelayedQueues == 0) {
        // We don't support recycling across threads and should just drop the item on the floor.
        return;
    }

    // we don't want to have a ref to the queue as the value in our weak map
    // so we null it out; to ensure there are no races with restoring it later
    // we impose a memory ordering here (no-op on x86)
    Map<Stack<?>, WeakOrderQueue> delayedRecycled = DELAYED_RECYCLED.get();
    // this 是指创建 handle 的线程持有的 Stack
    WeakOrderQueue queue = delayedRecycled.get(this);
    //当前线程没有回收过,创建 handle 的线程所持有 Stack 的 handle
    if (queue == null) {
        //当前线程回收的 Stack 数量是否 达到上限
        if (delayedRecycled.size() >= maxDelayedQueues) {
            // Add a dummy queue so we know we should drop the object
            delayedRecycled.put(this, WeakOrderQueue.DUMMY);
            return;
        }
        // Check if we already reached the maximum number of delayed queues and if we can allocate at all.
        if ((queue = newWeakOrderQueue(thread)) == null) {
            // drop object
            return;
        }
        delayedRecycled.put(this, queue);
    } else if (queue == WeakOrderQueue.DUMMY) {
        // drop object
        return;
    }

    queue.add(item);
}

创建 WeakOrderQueue,插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入。

static WeakOrderQueue newQueue(Stack<?> stack, Thread thread) {
    // We allocated a Link so reserve the space
    // 被其他线程 缓冲的对象 有没有超过上限
    if (!Head.reserveSpaceForLink(stack.availableSharedCapacity)) {
        return null;
    }
    final WeakOrderQueue queue = new WeakOrderQueue(stack, thread);
    // Done outside of the constructor to ensure WeakOrderQueue.this does not escape the constructor and so
    // may be accessed while its still constructed.
    //插入到创建 handle 的线程持有的 Stack 中的 head 在链表头部插入 queue 例如:  queue(当前线程回收的) --> queue1(线程1回收的) --> queue2(线程2回收的)
    stack.setHead(queue);

    return queue;
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容