Netty消息发送

写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作一直是就绪的,选择处理线程全占用整个CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。


1.当我们某个channel写消息失败 很有可能就是缓冲区没有空间,所以这个时候我们注册写事件,当缓冲区有空闲空间时候会触发写,然后直接flush
在flush为写消息之前 我们后续的writeAndflsh操作都不会flush只会write

消息发送的代码方式:

1. ctx.writeAndFlush(new Object());

代表object从当前的handler流向head节点

2.ctx.channel().writeAndFlush(new Object());

代表object从tail节点流向head节点

源码分析--整体逻辑
    @Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
如果要发送的消息为null 抛出异常
        if (msg == null) {
            throw new NullPointerException("msg");
        }
校验promise是否可用
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
发送消息
        write(msg, true, promise);

        return promise;
    }
write方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
获取outbound类型的handlercontext
        AbstractChannelHandlerContext next = findContextOutbound();
touch只是返回对象本身
        final Object m = pipeline.touch(msg, next);
获取Nioeventloop
        EventExecutor executor = next.executor();
判断是否在io线程执行
        if (executor.inEventLoop()) {
根据是否flush执行不同的方法
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
依靠RECYCLER实现对象池,RECYCLER的机制以后再述
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
依靠NioEventLoop的execute去执行(若是外部线程存储,会唤醒正在阻塞的selector,如果是第一次被调用,则会启动一个本地线程做为nioeventloop的载体)
            safeExecute(executor, task, promise, m);
        }
    }
invokeWriteAndFlush
   private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeHandler()是判断当前channelHandlerContext是否已经调用了handlerAdded
        if (invokeHandler()) {
将消息写入写队列
            invokeWrite0(msg, promise);
刷写队列
            invokeFlush0();
        } else {
没调用过则循环到下一个channelHandlerContext
            writeAndFlush(msg, promise);
        }
    }
invokeWrite0(msg, promise);
 private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
这个方法会调用outbound类型的channelCOntext的write方法,调用结束继续通过ctx.write方法调用到最后head的write方法
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
失败了设置promise的异常
            notifyOutboundHandlerException(t, promise);
        }
    }
HeadContext的write方法(最终调用的是unsafe的write方法)
  public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();
ChannelOutboundBuffer是保存待发送的数据
            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
      如果不存在就设置为失败
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
               是否msg的引用计数
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
如果是ByteBuf尝试把其包装成directByteBuf,如果是FileRegion直接发送
其他的都不会发送
                msg = filterOutboundMessage(msg);
获得要发送数据的大小
                size = pipeline.estimatorHandle().size(msg);
大小小于0把他置位0
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }
把当前的msg加入outboundBuffer的内部存储链表
            outboundBuffer.addMessage(msg, size, promise);
        }
filterOutboundMessage
  @Override
    protected final Object filterOutboundMessage(Object msg) {
判断是否是ByteBuf
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
如果是堆外byteBuf直接返回
            if (buf.isDirect()) {
                return msg;
            }
否则包装下
            return newDirectBuffer(buf);
        }
FileRegion直接返回
        if (msg instanceof FileRegion) {
            return msg;
        }
其他的类型抛出异常
        throw new UnsupportedOperationException(
                "unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
    }

pipeline.estimatorHandle().size(msg);
  @Override
        public int size(Object msg) {
如果是ByteBuf,直接估算
            if (msg instanceof ByteBuf) {
                return ((ByteBuf) msg).readableBytes();
            }
如果ByteBufHolder
            if (msg instanceof ByteBufHolder) {
                return ((ByteBufHolder) msg).content().readableBytes();
            }
如果是FileRegion直接就是0
            if (msg instanceof FileRegion) {
                return 0;
            }
            return unknownSize;
        }
    }
addMessage
 public void addMessage(Object msg, int size, ChannelPromise promise) {
这边还是依靠RECYCLER从对象池获取Entry , total(msg)针对ByteBuf是获取字节的长度,针对fileRegion是获取count即代表文件的个数
FileRegion支持零拷贝文件,如果jdk不支持可能出现异常
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
如果tailEntry==null 说明
        if (tailEntry == null) {
置flushedEntry 为null
            flushedEntry = null;
        } else {
获取尾部节点,将我们新创建的节点放入最后一个
            Entry tail = tailEntry;
            tail.next = entry;
        }
把握们创建的节点放入最后一个
        tailEntry = entry;
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }
pendingSize=size+io.netty.transport.outboundBufferEntrySizeOverhead属性的长度,
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
如果不在addmessage后面调用incrementPendingOutboundBytes这可能会触发用户处理程序写入第二条消息,从而可能以错误的顺序将消息保存在“未刷新”缓冲区中。
  private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }
通过AtomicLongFieldUpdater修改ChannelOutboundBuffer的totalPendingSize属性
        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
如果这个channel的缓冲字节长度已经超过,则Channel#isWritable()返回false,这边是如果已经超过,则设置不可写标识。isWritable()方法底层就是调用unwritable == 0;只有unwritable ==0才可以写
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }
fireChannelWritabilityChanged的好处就是channel的写能力变化时,消息发送者可以捕捉到,并调整消息发生速率
  private void setUnwritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue | 1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue == 0 && newValue != 0) {
当oldValue 是0新value不是0则传递写能力编号的事情,即不可以写了
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }
HeadContext的flush方法(最终调用的是unsafe的flush方法)
   public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                return;
            }
调用outboundBuffer的addflush
            outboundBuffer.addFlush();
开始发送到缓冲区
            flush0();
        }
调用这个方法意味着在此之前加入的消息都会被标识flushed
 public void addFlush() {
   如果在此期间已经在刷新且没有新的消息则不需要处理
        Entry entry = unflushedEntry;
因为在addMessage中 我们已经把unflushedEntry设置为非null
如果这边为null 代表已经刷新完成了所以不需要刷新
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
初始化是0 代表已经刷新的entry
                flushed ++;
设置该entry不可取消的标识
                if (!entry.promise.setUncancellable()) {
         entry如果已经取消 则 我们们释放entry对应的内存,减小outboudbuffer的大小,如果此时没有超过可写标识,则设置fireChannelWritabilityChanged
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // All flushed so reset unflushedEntry
            unflushedEntry = null;
        }
    }
flush0

   @Override
        protected final void flush0() {
isFlushPending
我们正常writeAndFlsh操作会判断isFlushPending
这个如果是true表示上一次flush的时候失败了,应该是channel不可写
所以我们继续调用flush不会真正flush
我们的reactor线程会因为注册了写事件,所以轮训selector时候会有写事件发生
然后绕过isFlushPending,继续去写,写成功了之后会清除写事件,所以后续的
writeAndFlsh时候isFlushPending为false。
结论:我觉得之所以这么做 有一个很重要的原因就是抱着消息的有序
前面的消息失败了,应该继续先重试前面的消息,如果不设置该标识
后续的writeAndFlsh会把后面的业务消息现行发送成功
            if (!isFlushPending()) {
                super.flush0();
            }
        }

   protected void flush0() {
是否正在flush这个此针对当前channel的channeloutboundbuffer
            if (inFlush0) {
                // Avoid re-entrance
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
如果为空直接返回        
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }
设置正在flush的标识
            inFlush0 = true;


如果channel不是激活状态,则设置失败标识,并且会删除entry(已经flushed-tail),清除NIO_BUFFERS中的数据,回收Entry对象
            if (!isActive()) {
                try {
                    if (isOpen()) {
                        outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
                    } else {
                        // Do not trigger channelWritabilityChanged because the channel is closed already.
                        outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                } finally {
                    inFlush0 = false;
                }
                return;
            }

            try {
发送数据
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                if (t instanceof IOException && config().isAutoClose()) {
                    /**
                     * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
                     * failing all flushed messages and also ensure the actual close of the underlying transport
                     * will happen before the promises are notified.
                     *
                     * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
                     * may still return {@code true} even if the channel should be closed as result of the exception.
                     */
                    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                } else {
                    try {
                        shutdownOutput(voidPromise(), t);
                    } catch (Throwable t2) {
                        close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
                    }
                }
            } finally {
                inFlush0 = false;
            }
        }

doWrite--在写入过程中把flushed的链表所有的bytebuffer数据聚集到一起组成一个1024个ByteBuffer数组进行发送,发送的中间但凡出现一点的写入失败则设置写事件,然后结束写任务,fileRegion是等bytebuf都处理完了才会去处理
 protected void doWrite(ChannelOutboundBuffer in) throws Exception {
获取最大发送的次数
  protected void doWrite(ChannelOutboundBuffer in) throws Exception {
        SocketChannel ch = javaChannel();
默认是16,也就是说entry链表 要发送16次,如果16次不够就说明没发完要设置半包标识也就是读事件
        int writeSpinCount = config().getWriteSpinCount();
        do {
说明发完了清除OP_WRITE
            if (in.isEmpty()) {
                // All written so clear OP_WRITE
                clearOpWrite();
                // Directly return here so incompleteWrite(...) is not called.
                return;
            }

         maxBytesPerGatheringWrite是sendBuffer的2倍
            int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
把整个链表组装成1024个ByteBuffer 每个ByteBuffer最大是maxBytesPerGatheringWrite
这里面只包含了ByteBuf 对于FileRegion在下面处理
            ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
            int nioBufferCnt = in.nioBufferCount();

            // Always us nioBuffers() to workaround data-corruption.
            // See https://github.com/netty/netty/issues/2761
            switch (nioBufferCnt) {
                case 0:
                  如果没有ByteBuffer,直接执行(其中会处理fileRegion)
                    writeSpinCount -= doWrite0(in);
                    break;
                case 1: {
           如果只有一个就直接写
                    ByteBuffer buffer = nioBuffers[0];
我们尝试发生的数据量
                    int attemptedBytes = buffer.remaining();
本写入的数据
                    final int localWrittenBytes = ch.write(buffer);
localWrittenBytes 小于0 说明没有写成功
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
调整MaxBytesPerGatheringWrite
                    adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
删除已经写完的数据
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
                default: {
   尝试一次性的吧nioBuffers都写完
                    long attemptedBytes = in.nioBufferSize();
                    final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
写失败了设置 写事件
                    if (localWrittenBytes <= 0) {
                        incompleteWrite(true);
                        return;
                    }
                  调整MaxBytesPerGatheringWrite
                    adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
                            maxBytesPerGatheringWrite);
                    in.removeBytes(localWrittenBytes);
                    --writeSpinCount;
                    break;
                }
            }
        } while (writeSpinCount > 0);
  一般writeSpinCount < 0 ,但是如果在这过程中发生发送失败的问题 都是直接结束 等待下一次的写
        incompleteWrite(writeSpinCount < 0);
    }

    protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
        Object msg = in.current();
看看当前是否为null null 直接返回
        if (msg == null) {
            // Directly return here so incompleteWrite(...) is not called.
            return 0;
        }
        return doWriteInternal(in, in.current());
    }

 private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
如果是ByteBuf
        if (msg instanceof ByteBuf) {
            ByteBuf buf = (ByteBuf) msg;
            if (!buf.isReadable()) {
不可读直接删除
                in.remove();
                return 0;
            }

            final int localFlushedAmount = doWriteBytes(buf);
如果大于0代表已经写出 否则返回Inter最大值标识有未写入的
            if (localFlushedAmount > 0) {
给ChannelProgressivePromise触发tryProcess方法
                in.progress(localFlushedAmount);
                if (!buf.isReadable()) {
                    in.remove();
                }
                return 1;
            }
FileRegion同上
        } else if (msg instanceof FileRegion) {
            FileRegion region = (FileRegion) msg;
            if (region.transferred() >= region.count()) {
                in.remove();
                return 0;
            }

            long localFlushedAmount = doWriteFileRegion(region);
            if (localFlushedAmount > 0) {
                in.progress(localFlushedAmount);
                if (region.transferred() >= region.count()) {
                    in.remove();
                }
                return 1;
            }
        } else {
            // Should not reach here.
            throw new Error();
        }
        return WRITE_STATUS_SNDBUF_FULL;
    }

如果为true 代表在发生flushed链表的时候一定是有发生失败的 重新在写 如果为false
则代表已经发送结束了 清除写事件
 protected final void incompleteWrite(boolean setOpWrite) {
        // Did not write completely.
        if (setOpWrite) {
            setOpWrite();
        } else {
            // It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
            // use our write quantum. In this case we no longer want to set the write OP because the socket is still
            // writable (as far as we know). We will find out next time we attempt to write if the socket is writable
            // and set the write OP if necessary.
            clearOpWrite();

          设置一个未来直接flush的任务
            eventLoop().execute(flushTask);
        }
    }

flushedEntry--第一个被写到操作系统Socket缓冲区中的节点(已经被被写入)
unflushedEntry--指针表示第一个未被写入到操作系统Socket缓冲区中的节点
tailEntry---链表的最后一个节点
具体可以看闪电侠的简书:https://www.jianshu.com/p/feaeaab2ce56

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,293评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,604评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,958评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,729评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,719评论 5 366
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,630评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,000评论 3 397
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,665评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,909评论 1 299
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,646评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,726评论 1 330
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,400评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,986评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,959评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,197评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 44,996评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,481评论 2 342

推荐阅读更多精彩内容

  • Netty源码分析——flush流程 前言 承接上篇写流程,这篇看下flush流程。之前文章中我们已经提到过,wr...
    Java小铺阅读 1,702评论 0 0
  • 前言 netty源码分析之pipeline(一)中,我们已经了解了pipeline在netty中所处的角色,像是一...
    简书闪电侠阅读 16,139评论 15 35
  • 一、ChannelOutboundBuffer 1、定义 是AbstractUnsafe使用的数据结构,用来存储待...
    益文的圈阅读 4,809评论 0 2
  • 物竞天择,优胜劣汰的生存法则天天在上演。 在我看来,其实是优质时间吞并劣质时间的一个过程,是未来战胜了当下。 每一...
    士心文人阅读 381评论 0 0
  • 《夜雨吟》 夜雨诗词吟无数,不想老去已十年。 更有二老在前头,寒露过后是霜天。
    梅心梅飞阅读 126评论 6 23