1. Trigger scenario
The client sends the string "Netty rocks!", and the server decodes it as three int values (12 bytes of UTF-8, 4 bytes per int).
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    // Send a message as soon as the Channel becomes active
    ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
Server-side decoder:
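import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;

public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        // An int is 4 bytes; readInt() throws IndexOutOfBoundsException if fewer are readable
        if (in.readableBytes() >= 4) {
            out.add(String.valueOf(in.readInt()));
        }
    }
}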
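As a result, the server's channelRead is invoked three times: the single message the client sent is split into three separate messages.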
Server received: 1315271796
Server received: 2032169583
Server received: 1667986209
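The three numbers are simply the 12 UTF-8 bytes of "Netty rocks!" read four at a time as big-endian ints, which is the byte order readInt() uses by default. A minimal sketch that reproduces them without Netty, assuming java.nio.ByteBuffer as a stand-in for ByteBuf; the class and method names (NettyRocksAsInts, toInts) are made up for illustration:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class NettyRocksAsInts {
    // Reads the UTF-8 bytes of a string as consecutive big-endian 32-bit ints,
    // ignoring trailing bytes that do not fill a whole int.
    static List<Integer> toInts(String text) {
        ByteBuffer buf = ByteBuffer.wrap(text.getBytes(StandardCharsets.UTF_8));
        List<Integer> ints = new ArrayList<>();
        while (buf.remaining() >= Integer.BYTES) {
            ints.add(buf.getInt()); // ByteBuffer is big-endian by default
        }
        return ints;
    }

    public static void main(String[] args) {
        // "Nett" -> 0x4E657474, "y ro" -> 0x7920726F, "cks!" -> 0x636B7321
        for (int value : toInts("Netty rocks!")) {
            System.out.println("Server received: " + value);
        }
    }
}
```

Running main prints the same three lines as the server log above.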
2. The callDecode method of ByteToMessageDecoder
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        while (in.isReadable()) {
            int outSize = out.size();
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();
                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }
            int oldInputLength = in.readableBytes();
            decodeRemovalReentryProtection(ctx, in, out);
            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }
            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }
            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                        ".decode() did not read anything but decoded a message.");
            }
            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Throwable cause) {
        throw new DecoderException(cause);
    }
}
final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            handlerRemoved(ctx);
        }
    }
}
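- callDecode calls decode in a loop for as long as the input buffer still has readable bytes.
- The subclass adds decoded messages to the List<Object>; whenever the list is non-empty at the start of the next iteration, fireChannelRead fires one channelRead event for each element and the list is cleared.
Given this loop, the way to avoid multiple channelRead events is to consume all the data in a single decode call and add only one message to the list, as in the following example: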
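public class ToIntegerDecoder extends ByteToMessageDecoder {
    @Override
    public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        StringBuilder sb = new StringBuilder();
        // Keep reading while at least 4 bytes (the size of one int) are readable
        while (in.readableBytes() >= 4) {
            sb.append(in.readInt());
        }
        // Emit a message only if something was read; adding one without consuming
        // any bytes makes callDecode throw a DecoderException
        if (sb.length() > 0) {
            out.add(sb.toString());
        }
    }
}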
Output:
Server received: 131527179620321695831667986209
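The difference between the two decoders can be reproduced without Netty by modelling the callDecode loop directly. The following is only a rough sketch under stated assumptions: java.nio.ByteBuffer stands in for ByteBuf, handler removal and single-decode mode are left out, and every name in it (CallDecodeLoopSketch, Decoder, run, ONE_INT_PER_CALL, ALL_INTS_AT_ONCE) is hypothetical rather than part of Netty.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

class CallDecodeLoopSketch {
    // Hypothetical stand-in for ByteToMessageDecoder.decode(ctx, in, out).
    interface Decoder {
        void decode(ByteBuffer in, List<Object> out);
    }

    // Mirrors the first decoder: at most one int per decode call.
    static final Decoder ONE_INT_PER_CALL = (in, out) -> {
        if (in.remaining() >= Integer.BYTES) {
            out.add(String.valueOf(in.getInt()));
        }
    };

    // Mirrors the second decoder: drains every complete int in one call.
    static final Decoder ALL_INTS_AT_ONCE = (in, out) -> {
        StringBuilder sb = new StringBuilder();
        while (in.remaining() >= Integer.BYTES) {
            sb.append(in.getInt());
        }
        if (sb.length() > 0) {
            out.add(sb.toString());
        }
    };

    // Simplified model of the callDecode loop. Returns the messages in the order
    // they would be fired; each element stands for one channelRead event.
    static List<Object> run(ByteBuffer in, Decoder decoder) {
        List<Object> fired = new ArrayList<>();
        List<Object> out = new ArrayList<>();
        while (in.hasRemaining()) {
            if (!out.isEmpty()) {
                fired.addAll(out); // one event per element, then the list is cleared
                out.clear();
            }
            int oldInputLength = in.remaining();
            decoder.decode(in, out);
            if (out.isEmpty()) {
                if (oldInputLength == in.remaining()) {
                    break; // nothing decoded and nothing consumed: wait for more data
                }
                continue; // bytes were consumed but no message was produced yet
            }
            if (oldInputLength == in.remaining()) {
                throw new IllegalStateException(
                        "decode() did not read anything but decoded a message");
            }
        }
        // In Netty, messages still in the list are fired by the caller
        // after callDecode returns.
        fired.addAll(out);
        return fired;
    }

    public static void main(String[] args) {
        byte[] bytes = "Netty rocks!".getBytes(StandardCharsets.UTF_8);
        System.out.println(run(ByteBuffer.wrap(bytes), ONE_INT_PER_CALL).size());
        System.out.println(run(ByteBuffer.wrap(bytes), ALL_INTS_AT_ONCE));
    }
}
```

With the 12-byte input, the first decoder yields three fired messages and the second yields one, matching the two server logs shown earlier.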