Netty 之 ChannelOutboundBuffer 源码分析

每个 ChannelSocket 的 Unsafe 都有一个绑定的 ChannelOutboundBuffer , Netty 向站外输出数据的过程统一通过 ChannelOutboundBuffer 类进行封装,目的是为了提高网络的吞吐量,在外面调用 write 的时候,数据并没有写到 Socket,而是写到了 ChannelOutboundBuffer 这里,当调用 flush 的时候,才真正的向 Socket 写出。

ChannelOutboundBuffer 类属性

public final class ChannelOutboundBuffer {

    // Assuming a 64-bit JVM:
    //  - 16 bytes object header
    //  - 8 reference fields
    //  - 2 long fields
    //  - 2 int fields
    //  - 1 boolean field
    //  - padding
    static final int CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD =
            SystemPropertyUtil.getInt("io.netty.transport.outboundBufferEntrySizeOverhead", 96);


    private final Channel channel;

    // Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
    //
    // 缓存链表中被刷新的第一个元素
    private Entry flushedEntry;
    // 缓存链表中中第一个未刷新的元素
    private Entry unflushedEntry;
    // 缓存链表中的尾元素
    private Entry tailEntry;
    // 刷新但还没有写入到 socket 中的数量
    private int flushed;

从类的属性中可以看出 ChannelOutboundBuffer 定义了一个链表进行存储写入的数据。

1、ChannelOutboundBuffer 添加了N个 Entry


2、当 flush 后的效果


3、flush 后把所有的数据都写入 Socket 中


4、当再次写入 Entry 后


下面针对操作这些操作进行分析,添加、刷新、删除等操作。

addMessage() 添加操作

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

1、创建一个 新的Entry。
2、判断 tailEntry 是否为 null,如果为 null 说明链表为空。则把 flushedEntry 置为null。
3、如果 tailEntry 不为空,则把新添加的 Entry 添加到 tailEntry 后面 。
4、 将新添加的 Entry 设置为 链表的 tailEntry。
5、如果 unflushedEntry 为null,说明没有未被刷新的元素。新添加的Entry 肯定是未被刷新的,则把当前 Entry 设置为 unflushedEntry 。
6、统计未被刷新的元素的总大小。

addFlush() 刷新操作

当 addMessage 成功添加进 ChannelOutboundBuffer 后,就需要 flush 刷新到 Socket 中去。但是这个方法并不是做刷新到 Socket 的操作。而是将 unflushedEntry 的引用转移到 flushedEntry 引用中,表示即将刷新这个 flushedEntry,至于为什么这么做?
答:因为 Netty 提供了 promise,这个对象可以做取消操作,例如,不发送这个 ByteBuf 了,所以,在 write 之后,flush 之前需要告诉 promise 不能做取消操作了。

public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
        if (flushedEntry == null) {
            // there is no flushedEntry yet, so start with the entry
            flushedEntry = entry;
        }
        do {
            flushed ++;
            if (!entry.promise.setUncancellable()) {
                // Was cancelled so make sure we free up memory and notify about the freed bytes
                int pending = entry.cancel();
                decrementPendingOutboundBytes(pending, false, true);
            }
            entry = entry.next;
        } while (entry != null);

        // All flushed so reset unflushedEntry
        unflushedEntry = null;
    }
}

1、通过 unflushedEntry 获取未被刷新元素 entry。
2、如果 entry 为null 说明没有待刷新的元素,不执行任何操作
3、如果 entry 不为 null,说明有需要被刷新的元素
4、如果 flushedEntry == null 说明当前没有正在刷新的任务,则把 entry 设置为 flushedEntry 刷新的起点。
5、循环设置 entry, 设置这些 entry 状态设置为非取消状态,如果设置失败,则把这些entry 节点取消并使 totalPendingSize 减去这个节点的字节大小。

在调用完 outboundBuffer.addFlush() 方法后,Channel 会调用 flush0 方法做真正的刷新。代码参见 AbstractUnsafe.flush() 方法。

remove() 删除操作

public boolean remove() {
    Entry e = flushedEntry;
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

    removeEntry(e);

    if (!e.cancelled) {
        // only release message, notify and decrement if it was not canceled before.
        ReferenceCountUtil.safeRelease(msg);
        safeSuccess(promise);
        decrementPendingOutboundBytes(size, false, true);
    }

    // recycle the entry
    e.recycle();

    return true;
}

1、获取 flushedEntry 节点,链表的头结点。如果获取不到清空 ByteBuf 缓存。
2、在链表上移除该 Entry。如果之前没有取消,只释放消息、通知和递减。
3、回收 Entry 对象。

removeEntry() 链表移除节点

private void removeEntry(Entry e) {
    if (-- flushed == 0) {
        // processed everything
        flushedEntry = null;
        if (e == tailEntry) {
            tailEntry = null;
            unflushedEntry = null;
        }
    } else {
        flushedEntry = e.next;
    }
}

1、如果 flushed ==0 说明,链表中所有 flush 的数据都已经发送到 Socket 中。把 flushedEntry 置位 null。此时链表可能还有 unflushedEntry 数据。
如果此时 e == tailEntry 说明链表为空,则把 tailEntry 和 unflushedEntry 都置为空。

flush==0,可能是这种状态,如下图:


2、把 flushedEntry 置为下一个节点(flushedEntry 此时是头结点)。

Entry 对象池使用

由于 Entry 使用比较频繁,会频繁的创建和销毁,这里使用了 Entry 的对象池,创建的时候从缓存中获取,销毁时回收。

下面看下 Entry 的创建过程

Entry 创建

public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    ..... 

使用 Entry.newInstance 方法进行创建 Entry 对象。

static final class Entry {
    private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
        @Override
        protected Entry newObject(Handle<Entry> handle) {
            return new Entry(handle);
        }
    };

    static Entry newInstance(Object msg, int size, long total, ChannelPromise promise) {
        Entry entry = RECYCLER.get();
        entry.msg = msg;
        entry.pendingSize = size + CHANNEL_OUTBOUND_BUFFER_ENTRY_OVERHEAD;
        entry.total = total;
        entry.promise = promise;
        return entry;
    }

Entry.newInstance() 方法收下从 Recycler 中获取 Entry 对象,如果获取不到则使用 Recycler.newObject() 方法创建一个 Entry 对象,调用 newObject() 方法在 Recycler.get() 方法中,代码如下:

public abstract class Recycler<T> {
    public final T get() {
        if (maxCapacityPerThread == 0) {
            return newObject((Handle<T>) NOOP_HANDLE);
        }
        Stack<T> stack = threadLocal.get();
        DefaultHandle<T> handle = stack.pop();
        if (handle == null) {
            handle = stack.newHandle();
            handle.value = newObject(handle);
        }
        return (T) handle.value;
    }

从 Recycler 中可以看出,Entry 对象是存储在 Stack 中。如果 Stack 中没有可用的 Stack,则调用 newObject() 方法创建。

Entry 对象回收

public boolean remove() {
    Entry e = flushedEntry;
    ...
    e.recycle();

    return true;
}

当 Entry 从链表中移除的时候回调用 e.recycle() 方法。

static final class Entry {
    void recycle() {
        next = null;
        bufs = null;
        buf = null;
        msg = null;
        promise = null;
        progress = 0;
        total = 0;
        pendingSize = 0;
        count = -1;
        cancelled = false;
        handle.recycle(this);
    }

Entry.recycle() 方法会把 Entry 中成员变量全部初始化。然后在调用 handle.recycle() 方法。

public abstract class Recycler<T> {
    static final class DefaultHandle<T> implements Handle<T> {
        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }

            stack.push(this);
        }
    }

Recycler.recycle() 方法又把 Entry 对象压进 stack 中。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,088评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,715评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,361评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,099评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 60,987评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,063评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,486评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,175评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,440评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,518评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,305评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,190评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,550评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,880评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,152评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,451评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,637评论 2 335

推荐阅读更多精彩内容