基于Netty实现自定义消息通信协议(协议设计及解析应用实战)

所谓的协议,是由语法、语义、时序这三个要素组成的一种规范,通信双方按照该协议规范来实现网络数据传输,这样通信双方才能实现数据正常通信和解析。

由于不同的中间件在功能方面有一定差异,所以其实应该是没有一种标准化协议来满足不同差异化需求,因此很多中间件都会定义自己的通信协议,另外通信协议可以解决粘包和拆包问题。

在本篇文章中,我们来实现一个自定义消息协议。

自定义协议的要素

自定义协议,那这个协议必须要有组成的元素,

  • 魔数: 用来判断数据包的有效性
  • 版本号: 可以支持协议升级
  • 序列化算法: 消息正文采用什么样的序列化和反序列化方式,比如json、protobuf、hessian等
  • 指令类型:也就是当前发送的是一个什么类型的消息,像zookeeper中,它传递了一个Type
  • 请求序号: 基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理
  • 消息长度
  • 消息正文

协议定义

sessionId | reqType | Content-Length | Content |

其中Version,Content-Length,SessionId就是Header信息,Content就是交互的主体。

定义项目结构以及引入包

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
</dependency>
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>

项目结构如图4-1所示:

  • netty-message-mic : 表示协议模块。
  • netty-message-server :表示nettyserver。

<center>图4-1</center>

  • 引入log4j.properties

在nettyMessage-mic中,包的结构如下。

image-20210831103346370

定义Header

表示消息头

@Data
public class Header{
    private long sessionId; //会话id  : 占8个字节
    private byte type; //消息类型: 占1个字节

    private int length;     //消息长度 : 占4个字节
}

定义MessageRecord

表示消息体

@Data
public class MessageRecord{

    private Header header;
    private Object body;
}

OpCode

定义操作类型

public enum OpCode {

    BUSI_REQ((byte)0),
    BUSI_RESP((byte)1),
    PING((byte)3),
    PONG((byte)4);

    private byte code;

    private OpCode(byte code) {
        this.code=code;
    }

    public byte code(){
        return this.code;
    }
}

定义编解码器

分别定义对该消息协议的编解码器

MessageRecordEncoder

@Slf4j
public class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> {

    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception {
        log.info("===========开始编码Header部分===========");
        Header header=record.getHeader();
        byteBuf.writeLong(header.getSessionId()); //保存8个字节的sessionId
        byteBuf.writeByte(header.getType());  //写入1个字节的请求类型

        log.info("===========开始编码Body部分===========");
        Object body=record.getBody();
        if(body!=null){
            ByteArrayOutputStream bos=new ByteArrayOutputStream();
            ObjectOutputStream oos=new ObjectOutputStream(bos);
            oos.writeObject(body);
            byte[] bytes=bos.toByteArray();
            byteBuf.writeInt(bytes.length); //写入消息体长度:占4个字节
            byteBuf.writeBytes(bytes); //写入消息体内容
        }else{
            byteBuf.writeInt(0); //写入消息长度占4个字节,长度为0
        }
    }
}

MessageRecordDecode

@Slf4j
public class MessageRecordDecode extends ByteToMessageDecoder {

    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        MessageRecord record=new MessageRecord();
        Header header=new Header();
        header.setSessionId(byteBuf.readLong());  //读取8个字节的sessionid
        header.setType(byteBuf.readByte()); //读取一个字节的操作类型
        record.setHeader(header);
        //如果byteBuf剩下的长度还有大于4个字节,说明body不为空
        if(byteBuf.readableBytes()>4){
            int length=byteBuf.readInt(); //读取四个字节的长度
            header.setLength(length);
            byte[] contents=new byte[length];
            byteBuf.readBytes(contents,0,length);
            ByteArrayInputStream bis=new ByteArrayInputStream(contents);
            ObjectInputStream ois=new ObjectInputStream(bis);
            record.setBody(ois.readObject());
            list.add(record);
            log.info("序列化出来的结果:"+record);
        }else{
            log.error("消息内容为空");
        }
    }
}

测试协议的解析和编码

EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的

public class CodesMainTest {
    public static void main( String[] args ) throws Exception {
        EmbeddedChannel channel=new EmbeddedChannel(
            new LoggingHandler(),
            new MessageRecordEncoder(),
            new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);
        channel.writeInbound(buf);
    }
}

编码包分析

运行上述代码后,会得到下面的一个信息

         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |.......@........|
|00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64    |.t..Hello World |
+--------+-------------------------------------------------+----------------+

按照协议规范:

  • 前面8个字节表示sessionId
  • 一个字节表示请求类型
  • 4个字节表示长度
  • 后面部分内容表示消息体

测试粘包和半包问题

通过slice方法进行拆分,得到两个包。

ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。

public class CodesMainTest {
    public static void main( String[] args ) throws Exception {
        //EmbeddedChannel是netty专门针对ChannelHandler的单元测试而提供的类。可以通过这个类来测试channel输入入站和出站的实现
        EmbeddedChannel channel=new EmbeddedChannel(
                //解决粘包和半包问题
//                new LengthFieldBasedFrameDecoder(2048,10,4,0,0),
                new LoggingHandler(),
                new MessageRecordEncoder(),
                new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);

       //*********模拟半包和粘包问题************//
        //把一个包通过slice拆分成两个部分
        ByteBuf bb1=buf.slice(0,7); //获取前面7个字节
        ByteBuf bb2=buf.slice(7,buf.readableBytes()-7); //获取后面的字节
        bb1.retain();

        channel.writeInbound(bb1);
        channel.writeInbound(bb2);
    }
}

运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。

2021-08-31 15:53:01,385 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2                            |.......         |
+--------+-------------------------------------------------+----------------+
2021-08-31 15:53:01,397 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
Exception in thread "main" io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))

解决拆包问题

LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。

首先来说明一下该解码器的核心参数

  • lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置
  • lengthFieldLength,长度字段锁占用的字节数
  • lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值
  • initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
  • lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
public class CodesMainTest {
    public static void main( String[] args ) throws Exception {
        EmbeddedChannel channel=new EmbeddedChannel(
                //解决粘包和半包问题
                new LengthFieldBasedFrameDecoder(1024,
                        9,4,0,0),
                new LoggingHandler(),
                new MessageRecordEncoder(),
                new MessageRecordDecode());
        Header header=new Header();
        header.setSessionId(123456);
        header.setType(OpCode.PING.code());
        MessageRecord record=new MessageRecord();
        record.setBody("Hello World");
        record.setHeader(header);
        channel.writeOutbound(record);

        ByteBuf buf= ByteBufAllocator.DEFAULT.buffer();
        new MessageRecordEncoder().encode(null,record,buf);

       //*********模拟半包和粘包问题************//
        //把一个包通过slice拆分成两个部分
        ByteBuf bb1=buf.slice(0,7);
        ByteBuf bb2=buf.slice(7,buf.readableBytes()-7);
        bb1.retain();

        channel.writeInbound(bb1);
        channel.writeInbound(bb2);
    }
}

添加一个长度解码器,就解决了拆包带来的问题。运行结果如下

2021-08-31 16:09:35,115 [com.netty.example.codec.MessageRecordDecode]-[INFO] 序列化出来的结果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World)
2021-08-31 16:09:35,116 [io.netty.handler.logging.LoggingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE

基于自定义消息协议通信

下面我们把整个通信过程编写完整,代码结构如图4-2所示.

image-20210831175056500

<center>图4-2</center>

服务端开发

@Slf4j
public class ProtocolServer {

    public static void main(String[] args){
        EventLoopGroup boss = new NioEventLoopGroup();
        //2 用于对接受客户端连接读写操作的线程工作组
        EventLoopGroup work = new NioEventLoopGroup();
        ServerBootstrap b = new ServerBootstrap();
        b.group(boss, work) //绑定两个工作线程组
            .channel(NioServerSocketChannel.class)  //设置NIO的模式
            // 初始化绑定服务通道
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline()
                        .addLast(
                        new LengthFieldBasedFrameDecoder(1024,
                                                         9,4,0,0))
                        .addLast(new MessageRecordEncoder())
                        .addLast(new MessageRecordDecode())
                        .addLast(new ServerHandler());
                }
            });
        ChannelFuture cf= null;
        try {
            cf = b.bind(8080).sync();
            log.info("ProtocolServer start success");
            cf.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}

ServerHandler

@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRecord messageRecord=(MessageRecord)msg;
        log.info("server receive message:"+messageRecord);
        MessageRecord res=new MessageRecord();
        Header header=new Header();
        header.setSessionId(messageRecord.getHeader().getSessionId());
        header.setType(OpCode.BUSI_RESP.code());
        String message="Server Response Message!";
        res.setBody(message);
        header.setLength(message.length());
        ctx.writeAndFlush(res);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        log.error("服务器读取数据异常");
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}

客户端开发

public class ProtocolClient {

    public static void main(String[] args) {
        //创建工作线程组
        EventLoopGroup group = new NioEventLoopGroup();
        Bootstrap b = new Bootstrap();
        b.group(group).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024,
                                                                           9,4,0,0))
                        .addLast(new MessageRecordEncoder())
                        .addLast(new MessageRecordDecode())
                        .addLast(new ClientHandler());

                }
            });
        // 发起异步连接操作
        try {
            ChannelFuture future = b.connect(new InetSocketAddress("localhost", 8080)).sync();
            Channel c = future.channel();
            for (int i = 0; i < 500; i++) {
                MessageRecord message = new MessageRecord();
                Header header = new Header();
                header.setSessionId(10001);
                header.setType((byte) OpCode.BUSI_REQ.code());
                message.setHeader(header);
                String context="我是请求数据"+i;
                header.setLength(context.length());
                message.setBody(context);
                c.writeAndFlush(message);
            }
            //closeFuture().sync()就是让当前线程(即主线程)同步等待Netty server的close事件,Netty server的channel close后,主线程才会继续往下执行。closeFuture()在channel close的时候会通知当前线程。
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            group.shutdownGracefully();
        }
    }
}

ClientHandler

@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        MessageRecord record=(MessageRecord)msg;
        log.info("Client Receive message:"+record);
        super.channelRead(ctx, msg);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
        ctx.close();
    }
}
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容