1. Netty编码概述
一个问题
- 如何把对象编程字节流,最终写到socket底层?
相关调试代码:
/**
* @author stone
* @date 2019/8/11 19:11
*/
public class Ch09Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new Encoder());
ch.pipeline().addLast(new BizHandler());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
Encoder.java
public class Encoder extends MessageToByteEncoder<User> {
// 疑问1 out从哪里传来的?
// 疑问2 netty如何将out写入到socket中的?
@Override
protected void encode(ChannelHandlerContext ctx, User msg, ByteBuf out) throws Exception {
byte[] bytes = msg.getName().getBytes();
out.writeInt(4 + bytes.length);
out.writeInt(msg.getAge());
out.writeBytes(bytes);
}
}
BizHandler:
/**
* @author stone
* @date 2019/8/11 19:09
*/
public class BizHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
User user = new User();
user.setAge(19);
user.setName("zhangsan");
ctx.channel().writeAndFlush(user);
}
}
User
public class User {
private int age;
private String name;
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
2. writeAndFlush()抽象步骤
writeAndFlush大致步骤如下:
- 从tail节点开始往前传播
- 逐个调用channelHandler的write方法
- 逐个调用channelHandler的flush方法
跟踪 ctx.channel().writeAndFlush(user);
源码:
首先调用AbstractChannel中的方法
@Override
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}
接下来调用DefaultChannelPipeline的方法:
@Override
public final ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}
最终将消息写入tail,接着从tail节点writeAndFlush。AbstractChannelHandlerContext的方法:
@Override
public ChannelFuture writeAndFlush(Object msg) {
return writeAndFlush(msg, newPromise());
}
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
write(msg, true, promise); // 继续调用write方法,true表明消息是要写入socket的,false表示先写入缓存
return promise;
}
AbstractChannelHandlerContext的write方法:
private void write(Object msg, boolean flush, ChannelPromise promise) {
ObjectUtil.checkNotNull(msg, "msg");
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) { // 首先判断当前调用线程是不是我们的reactor线程
if (flush) { // 如果是reactor线程在调用,直接调用invokeWriteAndFlush方法
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else { // 如果当前调用线程不是reactor中的线程,则将写的动作封装成一个task任务,
final AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
if (!safeExecute(executor, task, promise, m)) {
// We failed to submit the AbstractWriteTask. We need to cancel it so we decrement the pending bytes
// and put it back in the Recycler for re-use later.
//
// See https://github.com/netty/netty/issues/8343.
task.cancel();
}
}
}
接着看invokeWriteAndFlush:
private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise); // 写的动作
invokeFlush0(); // flush的动作
} else {
writeAndFlush(msg, promise);
}
}
看invokeWrite0的方法,这里会传播到outBoundhandler的wirte方法:
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
会调用MessageToByteEncoder的write方法,其中会调用子类的encode方法:
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) {
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf); // 调用子类的编码方法
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) {
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}
buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
我们来看invokeFlush0方法:
private void invokeFlush0() {
try {
((ChannelOutboundHandler) handler()).flush(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
}
默认情况下flush动作会继续往前传播到head节点。
3. 抽象解码器MessageToByteEncoder
我们来分析MessageToByteEncoder的write方法。它的总体流程是这样的:
匹配对象(看看能不能处理,不能处理就扔到前面一个处理器处理) --> 分配内存 --> 编码实现 (encode方法实现)--> 释放对象(释放之前的ByteBuf)--> 传播数据 --> 释放内存(如果出现异常)。
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;
try {
if (acceptOutboundMessage(msg)) { // 判断当前encoder能否处理这个对象
@SuppressWarnings("unchecked")
I cast = (I) msg;
buf = allocateBuffer(ctx, cast, preferDirect);
try {
encode(ctx, cast, buf);
} finally {
ReferenceCountUtil.release(cast);
}
if (buf.isReadable()) { // 说明buf中已经写入了数据
ctx.write(buf, promise); // 把buf传播到head节点
} else { // 如果buf中没有字节需要对其进行释放
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise); // 然后传播了一个空的buf和回调
}
buf = null; // 传播完成将buf置空
} else { // 如果当前encoder不能处理,往下传播
ctx.write(msg, promise);
}
} catch (EncoderException e) {
throw e;
} catch (Throwable e) {
throw new EncoderException(e);
} finally {
if (buf != null) {
buf.release();
}
}
}
第一步匹配对象,我们来看acceptOutboundMessage:
public boolean acceptOutboundMessage(Object msg) throws Exception {
return matcher.match(msg);
}
private static final class ReflectiveMatcher extends TypeParameterMatcher {
private final Class<?> type;
ReflectiveMatcher(Class<?> type) {
this.type = type;
}
@Override
public boolean match(Object msg) {
return type.isInstance(msg); // 这里就直接判断是不是对应的类类型
}
}
第二步,分配内存allocateBuffer
,主要工作是调用内存分配器去分配内存:
protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, @SuppressWarnings("unused") I msg,
boolean preferDirect) throws Exception {
if (preferDirect) {
return ctx.alloc().ioBuffer();
} else {
return ctx.alloc().heapBuffer();
}
}
第三步,编码实现encode具体实现(在子类中的实现),encode(ctx, cast, buf);
将cast中的数据填充到buf中。
第四步,释放之前的对象(cast,如果cast是ByteBuf对象,netty就会对齐自动释放)。
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
第五步,传播数据。
if (buf.isReadable()) { // 说明buf中已经写入了数据
ctx.write(buf, promise); // 把buf传播到head节点
} else { // 如果buf中没有字节需要对其进行释放
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise); // 然后传播了一个空的buf和回调
}
buf = null; // 传播完成将buf置空
第六步,释放内存(如果出现异常且buf不为空,就释放内存)。
4. 写buffer队列
上一节中的write(bytebuf)最终传播到了head节点。我们来看head节点中的write方法。
来看DefaultChannelPipeline中内部类HeadContext的write方法:
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
unsafe.write(msg, promise); // 可以看出通过unsafe将msg写出去
}
我们看AbstractUnsafe的write方法,这一步我们叫做write写buffer队列。具体的步骤有:
- direct化ByteBuf
- 插入写队列
- 设置写状态(如果内存不足,不能一直往里边写ByteBuf)
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;// 缓冲写进来的ByteBuf
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, newWriteException(initialCloseCause));
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg); // direct化ByteBuf
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise); // 将堆外内存插入到outboundBuffer
}
第一步,direct化ByteBuf
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // 缓冲写进来的ByteBuf
我们看filterOutboundMessage(msg);
:
AbstractNioByteChannel中的filterOutboundMessage方法:
protected final Object filterOutboundMessage(Object msg) {
if (msg instanceof ByteBuf) {
ByteBuf buf = (ByteBuf) msg;
if (buf.isDirect()) { // 如果是direct buffer 直接返回
return msg;
}
return newDirectBuffer(buf); // 如果是heapBuffer,new一个directBuffer
}
if (msg instanceof FileRegion) {
return msg;
}
throw new UnsupportedOperationException(
"unsupported message type: " + StringUtil.simpleClassName(msg) + EXPECTED_TYPES);
}
看 newDirectBuffer:
protected final ByteBuf newDirectBuffer(ByteBuf buf) {
final int readableBytes = buf.readableBytes();
if (readableBytes == 0) {
ReferenceCountUtil.safeRelease(buf);
return Unpooled.EMPTY_BUFFER;
}
final ByteBufAllocator alloc = alloc();
if (alloc.isDirectBufferPooled()) {
ByteBuf directBuf = alloc.directBuffer(readableBytes);
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
final ByteBuf directBuf = ByteBufUtil.threadLocalDirectBuffer();
if (directBuf != null) {
directBuf.writeBytes(buf, buf.readerIndex(), readableBytes);
ReferenceCountUtil.safeRelease(buf);
return directBuf;
}
// Allocating and deallocating an unpooled direct buffer is very expensive; give up.
return buf;
}
第二步,插入写队列outboundBuffer.addMessage(msg, size, promise);
,ChannelOutboundBuffer是存储outbound写请求的:
/**
* Add given message to this {@link ChannelOutboundBuffer}. The given {@link ChannelPromise} will be notified once
* the message was written.
*/
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise); // 将message封装成Entry,接下来做的就是调整三个指针,调整outbuffer的状态
if (tailEntry == null) {
flushedEntry = null;
} else {
Entry tail = tailEntry;
tail.next = entry;
}
tailEntry = entry;
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false); //
}
三个指针的排列:
// Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
// 统计,更新缓冲区中的待写字节
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
// getWriteBufferHighWaterMark -- 写buffer的高水位置,默认64k
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
默认情况下超过64k调用setUnwritable:
private void setUnwritable(boolean invokeLater) {
for (;;) { // 自旋锁+cas的操作
final int oldValue = unwritable;
final int newValue = oldValue | 1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue == 0 && newValue != 0) {
fireChannelWritabilityChanged(invokeLater); // 传播一个不能写的事件
}
break;
}
}
}
5. flush-刷新buffer队列
我们来分析flush方法。
DefaultChannelPipeline中的HeadContext内部类,它的flush方法:
@Override
public void flush(ChannelHandlerContext ctx) {
unsafe.flush();
}
AbsttractUnsafe的flush方法:
public final void flush() {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; // 首先也是拿到缓冲区
if (outboundBuffer == null) {
return;
}
outboundBuffer.addFlush();
flush0();
}
三个步骤:
- 添加刷新标志并设置写状态
- 遍历buffer队列,过滤ByteBuf
- 调用jdk底层的api进行自旋的写
下面我们来看代码。
第一步,添加刷新标志并设置写状态,outboundBuffer.addFlush();
:
/**
* Add a flush to this {@link ChannelOutboundBuffer}. This means all previous added messages are marked as flushed
* and so you will be able to handle them.
*/
public void addFlush() {
// There is no need to process all entries if there was already a flush before and no new messages
// where added in the meantime.
//
// See https://github.com/netty/netty/issues/2577
Entry entry = unflushedEntry;
if (entry != null) {
if (flushedEntry == null) { // 把flushEntry指向第一个unflushEntry
// there is no flushedEntry yet, so start with the entry
flushedEntry = entry;
}
do {
flushed ++; // 计数可以flush多少个对象
if (!entry.promise.setUncancellable()) {
// Was cancelled so make sure we free up memory and notify about the freed bytes
int pending = entry.cancel();
decrementPendingOutboundBytes(pending, false, true); // 统计,更新缓冲区中的待写字节
}
entry = entry.next;
} while (entry != null);
// All flushed so reset unflushedEntry
unflushedEntry = null;
}
}
来看decrementPendingOutboundBytes,主要做两件事,一个更新缓冲区的待写字节,如果小于低水位,设置当前通道为可写状态:
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) { // 默认是32k
setWritable(invokeLater);
}
}
第二步,遍历buffer队列,过滤ByteBuf。flush0()
:
protected void flush0() {
if (inFlush0) {
// Avoid re-entrance
return;
}
final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null || outboundBuffer.isEmpty()) {
return;
}
inFlush0 = true;
// Mark all pending write requests as failure if the channel is inactive.
if (!isActive()) {
try {
if (isOpen()) {
outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
} else {
// Do not trigger channelWritabilityChanged because the channel is closed already.
outboundBuffer.failFlushed(newFlush0Exception(initialCloseCause), false);
}
} finally {
inFlush0 = false;
}
return;
}
try {
doWrite(outboundBuffer);
} catch (Throwable t) {
if (t instanceof IOException && config().isAutoClose()) {
/**
* Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
* failing all flushed messages and also ensure the actual close of the underlying transport
* will happen before the promises are notified.
*
* This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
* may still return {@code true} even if the channel should be closed as result of the exception.
*/
initialCloseCause = t;
close(voidPromise(), t, newFlush0Exception(t), false);
} else {
try {
shutdownOutput(voidPromise(), t);
} catch (Throwable t2) {
initialCloseCause = t;
close(voidPromise(), t2, newFlush0Exception(t), false);
}
}
} finally {
inFlush0 = false;
}
}
重点方法AbstractNioByteChannel中的dowrite方法:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = config().getWriteSpinCount();
do {
Object msg = in.current(); // 拿到第一个flushEntry的ByteBuf
if (msg == null) {
// Wrote all messages.
clearOpWrite();
// Directly return here so incompleteWrite(...) is not called.
return;
}
writeSpinCount -= doWriteInternal(in, msg);
} while (writeSpinCount > 0);
incompleteWrite(writeSpinCount < 0);
}
第三步,调用jdk底层api进行自旋写。来看doWriteInternal
。使用自旋锁可以提高并发情况下系统的吞吐量,int writeSpinCount = config().getWriteSpinCount();
,默认最多尝试16次:
private int doWriteInternal(ChannelOutboundBuffer in, Object msg) throws Exception {
if (msg instanceof ByteBuf) { // 如果是ByteBuf节点
ByteBuf buf = (ByteBuf) msg;
if (!buf.isReadable()) { // 如果bytebuf没有可写的,直接remove掉
in.remove();
return 0; // 返回0,外层的自旋锁减0,说明会继续尝试
}
final int localFlushedAmount = doWriteBytes(buf); // 将buf写入到对应的socket
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (!buf.isReadable()) {
in.remove();
}
return 1;
}
} else if (msg instanceof FileRegion) {// FileRegion可以理解为从磁盘的缓冲区写入到socket缓冲区
FileRegion region = (FileRegion) msg;
if (region.transferred() >= region.count()) {
in.remove();
return 0;
}
long localFlushedAmount = doWriteFileRegion(region); // localFlushedAmount记录向jdk底层写了多少字节
if (localFlushedAmount > 0) {
in.progress(localFlushedAmount);
if (region.transferred() >= region.count()) {
in.remove();
}
return 1;
}
} else {
// Should not reach here.
throw new Error();
}
return WRITE_STATUS_SNDBUF_FULL;
}
我们来看NioSocketChannel的doWriteBytes:
protected int doWriteBytes(ByteBuf buf) throws Exception {
final int expectedWrittenBytes = buf.readableBytes();
return buf.readBytes(javaChannel(), expectedWrittenBytes); // javaChannel() - jdk底层的channel,buf写入到java原生channel中
}
来看PooledDirectByteBuf的readBytes方法:
public int readBytes(GatheringByteChannel out, int length) throws IOException {
checkReadableBytes(length);
int readBytes = getBytes(readerIndex, out, length, true); // out - jdk底层的对象
readerIndex += readBytes;
return readBytes;
}
getBytes:
private int getBytes(int index, GatheringByteChannel out, int length, boolean internal) throws IOException {
checkIndex(index, length);
if (length == 0) {
return 0;
}
ByteBuffer tmpBuf; // 把数据写入tmpBuf,jdk底层的
if (internal) {
tmpBuf = internalNioBuffer();
} else {
tmpBuf = memory.duplicate();
}
index = idx(index);
tmpBuf.clear().position(index).limit(index + length);
return out.write(tmpBuf); // 把tmpBuf写入channel通道
}
6. netty编码总结
1. 如何把对象变成字节流,最终写到socket底层?
答:writeAndFlush举例,当调用writeAndFlush将自定义对象往前进行传播的时候,可以拆分成两个过程。第一个过程就是write事件通过pipeline逐层往前进行传播,传播到其中一个encoder节点,这个encoder节点默认继承自MessageToByteEncoder,就会调用这个encoder的write方法,将message对象转化为ByteBuf;MessageToByteEncoder要做的事情是首先分配一个ByteBuf,然后调用子类的encode方法,将message填充到ByteBuf,然后把ByteBuf往前进行传播,最终会将ByteBuf传播到pipeline的head节点;head节点调用write方法,会通过底层的unsafe进行相应的处理,处理流程是将当前的ByteBuf添加到unsafe维护的一个缓冲区中,同时会判断当前缓冲区的字节数是否超过最高水位(默认64k),如果超过设置当前通道为不可写,这一步完成之后,unsafe缓冲区中就维护了一个ByteBuf列表。
write方法完成之后,调用flush方法。flush事件从处理业务的handler一直传播到pipeline的head节点,然后head节点接收到flush事件后会调用底层的unsafe对象中的flush方法进行一系列的处理。首先对unsafe缓冲区的指针进行调整,不断地从缓冲区中拿出ByteBuf;然后将ByteBuf转为为jdk的ByteBuffer对象,通过jdk底层的channel,将ByteBuffer写出去,每写出一个就会对缓冲区中的当前节点进行删除;同时会判断当前缓冲区中的字节数是否小于最低水位(默认32k),如果小于就设置当前通道为可写状态;写到socket的过程是一个子旋写的过程,为了提高并发情况下的吞吐量,netty默认自旋时最多尝试16次。