netty之write和flush

在netty开发中,当调用pipeline的write方法时,并不会将数据直接写入到底层channel通道发送出去,而是先添加到缓冲区中;只有当调用flush方法,才会真正将数据从缓冲区写入到channel并发送出去。netty还提供了一个简便的方法,结合两者的功能writeAndFlush。

在之前的文章说过,应用程序开发中,主动调用IO操作,比如write、bind等等,触发的IO操作在pipeline上会从尾部的出站handler传播到头部出站handler,中间可能会经过各种编码器等等,但最终都会经过netty内置在pipeline上的HeadContext,而HeadContext的IO操作方法又是委托给内部的unsafe。

因此本文就来讲述下unsafe的write和flush方法的处理逻辑。

write源码
public final void write(Object msg, ChannelPromise promise) {
    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    int size;
    try {
        msg = filterOutboundMessage(msg);
        size = pipeline.estimatorHandle().size(msg);
        if (size < 0) {
            size = 0;
        }
    } catch (Throwable t) {
        try {
            ReferenceCountUtil.release(msg);
        } finally {
            safeSetFailure(promise, t);
        }
        return;
    }
      
    outboundBuffer.addMessage(msg, size, promise);
}

write方法主要有两个步骤

  1. 会先对msg进行校验,校验msg是不是ByteBuf或者FileRegion类型的,非这两种类型直接抛出异常
  2. 将消息添加到缓冲区outboundBuffer中
public void addMessage(Object msg, int size, ChannelPromise promise) {
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
        flushedEntry = null;
    } else {
        Entry tail = tailEntry;
        tail.next = entry;
    }
    tailEntry = entry;
    if (unflushedEntry == null) {
        unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    incrementPendingOutboundBytes(entry.pendingSize, false);
}

其实,netty底层维护的是一个链表,链表的每个元素是一个Entry对象,而待发送的消息msg和promise就是维护在entry对象中的。

将消息添加到链表后,还会将此条消息占用的字节数(实际发送的+entry本身的开销)维护到channel的一个全局变量totalPendingSize中,表示待发送的总字节数。并且与高水位值比较,若比较大,那么会触发handler的unwritable方法。这个后续再用专门的章节来说明。

至此,write方法主要是将消息封装成entry对象,然后添加到channel对应的outboundBuffer维护的链表当中,并且会判断待发送的总字节数是否超过高水位值,若超出了,则触发handler的unwritable方法

接下来,来看下flush方法处理逻辑,调用这方法会真正的将数据从buffer写入到channel通道去

flush源码
public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
        return;
    }

    outboundBuffer.addFlush();
    flush0();
}

addFlushed()方法主要是将outboundBuffer底层的链表的首指针flushedEntry指向链表第一个元素,然后unflushedEntry置为null,再算下待发送的entry数量。

flush0()会先判断channel注册到的selector有没正在监控write事件,没有的话才会去处理数据,有的话,在NioEventLoop的事件循环中会处理,这里就无需处理了

if ((readyOps & SelectionKey.OP_WRITE) != 0) {
    // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
    ch.unsafe().forceFlush();
}

下面为真正的处理数据发送逻辑

@Override
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SocketChannel ch = javaChannel();
//此次写操作,调用channel的write方法次数上限,默认值为16
    int writeSpinCount = config().getWriteSpinCount();
    do {
//若输出缓冲区是空的,也就是底层entry链表没有元素了,一般是已经写完数据了,那么取消selector的OP_WRITE事件,然后直接return
        if (in.isEmpty()) {
            clearOpWrite();
            return;
        }
        // 每次写请求,gathering的最大字节数
        int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
//将outboundBuffer的entry链表的消息转成nio的ByteBuffer数组
        ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
//ByteBuffer的数量
        int nioBufferCnt = in.nioBufferCount();

        //将ByteBuffer写入到channel,分为三种情况处理
        switch (nioBufferCnt) {

            case 0:
                //为0的情况正常不会出现
                writeSpinCount -= doWrite0(in);
                break;
            case 1: {
                //只有1个ByteBuffer,通常是write后便直接flush的,只有一个buffer的情况,直接用普通的
                ByteBuffer buffer = nioBuffers[0];
                int attemptedBytes = buffer.remaining();
                final int localWrittenBytes = ch.write(buffer);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
            default: {
                // 有多个ByteBuffer,一般是调用多次write方法后,再调flush方法的
                long attemptedBytes = in.nioBufferSize();
                final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
                if (localWrittenBytes <= 0) {
                    incompleteWrite(true);
                    return;
                }
                // Casting to int is safe because we limit the total amount of data in the nioBuffers to int above.
                adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                        maxBytesPerGatheringWrite);
                in.removeBytes(localWrittenBytes);
                --writeSpinCount;
                break;
            }
        }
    } while (writeSpinCount > 0);

//若这此次的循环中未将所有数据写入到channel中,那么注册selector的OP_WRITE事件,再下次的事件循环中处理
    incompleteWrite(writeSpinCount < 0);
}

上述switch块内有几个重要点

  1. 不同nioBufferCnt数量的不同处理逻辑,主要有当为1时,直接用底层channel的write方法,写入单个ByteBuffer;当于1,一次性写入多个ByteBuffer,使用的是gathering技术
  2. 当调用write方法返回后,如实际写入的字节数<=0,可能是此时channel的socket缓冲区已经满了,不允许写入数据了,所以需要向selector注册OP_WRITE事件,待下次事件循环中再处理。
  3. 会根据试图写入的数据和实际写入的数据动态调整maxBytesPerGatheringWrite
  4. 已写入的数据需要从缓冲区移除,不然下次循环会重复写入数据
public void removeBytes(long writtenBytes) {
    for (;;) {
//获取当前flushedEntry对象
        Object msg = current();
        if (!(msg instanceof ByteBuf)) {
            assert writtenBytes == 0;
            break;
        }

        final ByteBuf buf = (ByteBuf) msg;
        final int readerIndex = buf.readerIndex();
        final int readableBytes = buf.writerIndex() - readerIndex;
//比较当前entry对应的消息字节数和已写入字节数,
//当写入的较大或者相等时,表示这个entry对象消息已经全部写入到channel了  
//因此,会调用remove方法(这方法主要作用是将entry对象从链表移除,且通知ChannelPromise的监听器)
//当写入的字节数<这个entry对象的消息字节数时,说明只写入了一部分,那么只需要移动readerIndex的索引值
..
        if (readableBytes <= writtenBytes) {
            if (writtenBytes != 0) {
                progress(readableBytes);
                writtenBytes -= readableBytes;
            }
            remove();
        } else { // readableBytes > writtenBytes
            if (writtenBytes != 0) {
                buf.readerIndex(readerIndex + (int) writtenBytes);
                progress(writtenBytes);
            }
            break;
        }
    }
//将ByteBuffer数组的所有ByteBuffer值置为null
    clearNioBuffers();
}

在上面这个方法内部的remove方法,主要逻辑是将entry对象从链表移除,并且通知promise的监听器

public boolean remove() {
    Entry e = flushedEntry;
//当前链表为空了,那么将线程的ByteBuffer数组的所有元素都置为null
    if (e == null) {
        clearNioBuffers();
        return false;
    }
    Object msg = e.msg;

    ChannelPromise promise = e.promise;
    int size = e.pendingSize;

//当前entry对象从链表移除
    removeEntry(e);

    if (!e.cancelled) {
        // 每个entry对象维护的msg是ByteBuf类型的,因为已经写入到channel了,所以将引用计数-1
        ReferenceCountUtil.safeRelease(msg);
//promise结果值置为成功,并通知promise的监听器
        safeSuccess(promise);
//扣掉缓冲区待发送的字节数,并且判断是否小于低水位值了,若小于,则触发channelWritabilityChanged方法
        decrementPendingOutboundBytes(size, false, true);
    }

    //回收entry对象
    e.recycle();

    return true;
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 195,783评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,360评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 142,942评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,507评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,324评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,299评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,685评论 3 386
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,358评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,652评论 1 293
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,704评论 2 312
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,465评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,318评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,711评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,991评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,265评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,661评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,864评论 2 335

推荐阅读更多精彩内容