写操作的就绪条件为底层缓冲区有空闲空间,而写缓冲区绝大部分时间都是有空闲空间的,所以当你注册写事件后,写操作一直是就绪的,选择处理线程全占用整个CPU资源。所以,只有当你确实有数据要写时再注册写操作,并在写完以后马上取消注册。
1.当我们某个channel写消息失败 很有可能就是缓冲区没有空间,所以这个时候我们注册写事件,当缓冲区有空闲空间时候会触发写,然后直接flush
在flush为写消息之前 我们后续的writeAndflsh操作都不会flush只会write
消息发送的代码方式:
1. ctx.writeAndFlush(new Object());
代表object从当前的handler流向head节点
2.ctx.channel().writeAndFlush(new Object());
代表object从tail节点流向head节点
源码分析--整体逻辑
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
如果要发送的消息为null 抛出异常
if (msg == null) {
throw new NullPointerException("msg");
}
校验promise是否可用
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
发送消息
write(msg, true, promise);
return promise;
}
write方法
private void write(Object msg, boolean flush, ChannelPromise promise) {
获取outbound类型的handlercontext
AbstractChannelHandlerContext next = findContextOutbound();
touch只是返回对象本身
final Object m = pipeline.touch(msg, next);
获取Nioeventloop
EventExecutor executor = next.executor();
判断是否在io线程执行
if (executor.inEventLoop()) {
根据是否flush执行不同的方法
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
依靠RECYCLER实现对象池,RECYCLER的机制以后再述
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
依靠NioEventLoop的execute去执行(若是外部线程存储,会唤醒正在阻塞的selector,如果是第一次被调用,则会启动一个本地线程做为nioeventloop的载体)
safeExecute(executor, task, promise, m);
}
}
invokeWriteAndFlush
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
invokeHandler()是判断当前channelHandlerContext是否已经调用了handlerAdded
if (invokeHandler()) {
将消息写入写队列
invokeWrite0(msg, promise);
刷写队列
invokeFlush0();
} else {
没调用过则循环到下一个channelHandlerContext
writeAndFlush(msg, promise);
}
}
invokeWrite0(msg, promise);
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
这个方法会调用outbound类型的channelCOntext的write方法,调用结束继续通过ctx.write方法调用到最后head的write方法
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
失败了设置promise的异常
notifyOutboundHandlerException(t, promise);
}
}
HeadContext的write方法(最终调用的是unsafe的write方法)
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer是保存待发送的数据
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
如果不存在就设置为失败
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
是否msg的引用计数
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
如果是ByteBuf尝试把其包装成directByteBuf,如果是FileRegion直接发送
其他的都不会发送
msg = filterOutboundMessage(msg);
获得要发送数据的大小
size = pipeline.estimatorHandle().size(msg);
大小小于0把他置位0
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
把当前的msg加入outboundBuffer的内部存储链表
outboundBuffer.addMessage(msg, size, promise);
}
filterOutboundMessage
@Override
protected final Object filterOutboundMessage(Object msg) {
判断是否是ByteBuf
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
如果是堆外byteBuf直接返回
if (buf.isDirect()) {
return msg;
}
否则包装下
return newDirectBuffer(buf);
}
FileRegion直接返回
if (msg instanceof FileRegion) {
return msg;
}
其他的类型抛出异常
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
pipeline.estimatorHandle().size(msg);
@Override
public int size(Object msg) {
如果是ByteBuf,直接估算
if (msg instanceof ByteBuf) {
return ((ByteBuf) msg).readableBytes();
}
如果ByteBufHolder
if (msg instanceof ByteBufHolder) {
return ((ByteBufHolder) msg).content().readableBytes();
}
如果是FileRegion直接就是0
if (msg instanceof FileRegion) {
return 0;
}
return unknownSize;
}
}
addMessage
public void addMessage(Object msg, int size, ChannelPromise promise) {
这边还是依靠RECYCLER从对象池获取Entry , total(msg)针对ByteBuf是获取字节的长度,针对fileRegion是获取count即代表文件的个数
FileRegion支持零拷贝文件,如果jdk不支持可能出现异常
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
如果tailEntry==null 说明
if (tailEntry == null) {
置flushedEntry 为null
flushedEntry = null;
} else {
获取尾部节点,将我们新创建的节点放入最后一个
Entry tail = tailEntry;
tail.next = entry;
}
把握们创建的节点放入最后一个
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
pendingSize=size+io.netty.transport.outboundBufferEntrySizeOverhead属性的长度,
incrementPendingOutboundBytes(entry.pendingSize, false);
}
如果不在addmessage后面调用incrementPendingOutboundBytes这可能会触发用户处理程序写入第二条消息,从而可能以错误的顺序将消息保存在“未刷新”缓冲区中。
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
通过AtomicLongFieldUpdater修改ChannelOutboundBuffer的totalPendingSize属性
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
如果这个channel的缓冲字节长度已经超过,则Channel#isWritable()返回false,这边是如果已经超过,则设置不可写标识。isWritable()方法底层就是调用unwritable == 0;只有unwritable ==0才可以写
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
fireChannelWritabilityChanged的好处就是channel的写能力变化时,消息发送者可以捕捉到,并调整消息发生速率
private void setUnwritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
当oldValue 是0新value不是0则传递写能力编号的事情,即不可以写了
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
HeadContext的flush方法(最终调用的是unsafe的flush方法)
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
return;
}
调用outboundBuffer的addflush
outboundBuffer.addFlush();
开始发送到缓冲区
flush0();
}
调用这个方法意味着在此之前加入的消息都会被标识flushed
public void addFlush() {
如果在此期间已经在刷新且没有新的消息则不需要处理
Entry entry = unflushedEntry;
因为在addMessage中 我们已经把unflushedEntry设置为非null
如果这边为null 代表已经刷新完成了所以不需要刷新
if (entry != null) {
if (flushedEntry == null) {
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
初始化是0 代表已经刷新的entry
flushed ++;
设置该entry不可取消的标识
if (!entry.promise.setUncancellable()) {
entry如果已经取消 则 我们们释放entry对应的内存,减小outboudbuffer的大小,如果此时没有超过可写标识,则设置fireChannelWritabilityChanged
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true);
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
flush0
@Override
protected final void flush0() {
isFlushPending
我们正常writeAndFlsh操作会判断isFlushPending
这个如果是true表示上一次flush的时候失败了,应该是channel不可写
所以我们继续调用flush不会真正flush
我们的reactor线程会因为注册了写事件,所以轮训selector时候会有写事件发生
然后绕过isFlushPending,继续去写,写成功了之后会清除写事件,所以后续的
writeAndFlsh时候isFlushPending为false。
结论:我觉得之所以这么做 有一个很重要的原因就是抱着消息的有序
前面的消息失败了,应该继续先重试前面的消息,如果不设置该标识
后续的writeAndFlsh会把后面的业务消息现行发送成功
if (!isFlushPending()) {
super.flush0();
}
}
protected void flush0() {
是否正在flush这个此针对当前channel的channeloutboundbuffer
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
如果为空直接返回
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
设置正在flush的标识
inFlush0 = true;
如果channel不是激活状态,则设置失败标识,并且会删除entry(已经flushed-tail),清除NIO_BUFFERS中的数据,回收Entry对象
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
发送数据
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
close(voidPromise(), t2, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
}
}
} finally {
inFlush0 = false;
}
}
doWrite--在写入过程中把flushed的链表所有的bytebuffer数据聚集到一起组成一个1024个ByteBuffer数组进行发送,发送的中间但凡出现一点的写入失败则设置写事件,然后结束写任务,fileRegion是等bytebuf都处理完了才会去处理
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
获取最大发送的次数
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SocketChannel ch = javaChannel();
默认是16,也就是说entry链表 要发送16次,如果16次不够就说明没发完要设置半包标识也就是读事件
int writeSpinCount = config().getWriteSpinCount();
do {
说明发完了清除OP_WRITE
if (in.isEmpty()) {
// All written so clear OP_WRITE
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
maxBytesPerGatheringWrite是sendBuffer的2倍
int maxBytesPerGatheringWrite = ((NioSocketChannelConfig) config).getMaxBytesPerGatheringWrite();
把整个链表组装成1024个ByteBuffer 每个ByteBuffer最大是maxBytesPerGatheringWrite
这里面只包含了ByteBuf 对于FileRegion在下面处理
ByteBuffer[] nioBuffers = in.nioBuffers(1024, maxBytesPerGatheringWrite);
int nioBufferCnt = in.nioBufferCount();
// Always us nioBuffers() to workaround data-corruption.
// See https://github.com/netty/netty/issues/2761
switch (nioBufferCnt) {
case 0:
如果没有ByteBuffer,直接执行(其中会处理fileRegion)
writeSpinCount -= doWrite0(in);
break;
case 1: {
如果只有一个就直接写
ByteBuffer buffer = nioBuffers[0];
我们尝试发生的数据量
int attemptedBytes = buffer.remaining();
本写入的数据
final int localWrittenBytes = ch.write(buffer);
localWrittenBytes 小于0 说明没有写成功
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
调整MaxBytesPerGatheringWrite
adjustMaxBytesPerGatheringWrite(attemptedBytes, localWrittenBytes, maxBytesPerGatheringWrite);
删除已经写完的数据
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
default: {
尝试一次性的吧nioBuffers都写完
long attemptedBytes = in.nioBufferSize();
final long localWrittenBytes = ch.write(nioBuffers, 0, nioBufferCnt);
写失败了设置 写事件
if (localWrittenBytes <= 0) {
incompleteWrite(true);
return;
}
调整MaxBytesPerGatheringWrite
adjustMaxBytesPerGatheringWrite((int) attemptedBytes, (int) localWrittenBytes,
maxBytesPerGatheringWrite);
in.removeBytes(localWrittenBytes);
--writeSpinCount;
break;
}
}
} while (writeSpinCount > 0);
一般writeSpinCount < 0 ,但是如果在这过程中发生发送失败的问题 都是直接结束 等待下一次的写
incompleteWrite(writeSpinCount < 0);
}
protected final int doWrite0(ChannelOutboundBuffer in) throws Exception {
Object msg = in.current();
看看当前是否为null null 直接返回
if (msg == null) {
// Directly return here so incompleteWrite(...) is not called.
return 0;
}
return doWriteInternal(in, in.current());
}
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
如果是ByteBuf
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) {
不可读直接删除
in.remove();
return 0;
}
final int localFlushedAmount = doWriteBytes(buf);
如果大于0代表已经写出 否则返回Inter最大值标识有未写入的
if (localFlushedAmount > 0) {
给ChannelProgressivePromise触发tryProcess方法
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
FileRegion同上
} else if (msg instanceof FileRegion) {
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region);
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
如果为true 代表在发生flushed链表的时候一定是有发生失败的 重新在写 如果为false
则代表已经发送结束了 清除写事件
protected final void incompleteWrite(boolean setOpWrite) {
// Did not write completely.
if (setOpWrite) {
setOpWrite();
} else {
// It is possible that we have set the write OP, woken up by NIO because the socket is writable, and then
// use our write quantum. In this case we no longer want to set the write OP because the socket is still
// writable (as far as we know). We will find out next time we attempt to write if the socket is writable
// and set the write OP if necessary.
clearOpWrite();
设置一个未来直接flush的任务
eventLoop().execute(flushTask);
}
}
flushedEntry--第一个被写到操作系统Socket缓冲区中的节点(已经被被写入)
unflushedEntry--指针表示第一个未被写入到操作系统Socket缓冲区中的节点
tailEntry---链表的最后一个节点
具体可以看闪电侠的简书:https://www.jianshu.com/p/feaeaab2ce56