最近由于一个项目需要和单片机通信,和硬件工程师沟通好之后,大致确定协议为 :
消息头部 + 消息长度 + 设备号 + 命令 + data + crc16
由于netty自带的decoder有些不满足这个格式,所以自定义了一个decoder。
代码如下
/**
* 消息格式为 消息头部(1字节) + 消息长度(2字节) + 设备号(12字节) + 命令(2字节) + data(n字节) + crc16(2字节)
*
* @author watermelon
* @time 2020/5/21
*/
public class SmartHomeDecoder extends ByteToMessageDecoder implements SmartHomeCodeC {
private final Logger LOG = LoggerFactory.getLogger(SmartHomeDecoder.class);
/**
* ByteBuf 超过这个值之后,会清除已读区域
* 默认不清除
*/
private int clearReadMaxLength;
/**
* 默认构造器,ByteBuf 可能会无限扩容
* ByteBuf 超过1024之后,会清除已读区域
*/
public SmartHomeDecoder() {
this(0);
}
/**
*
* ByteBuf 超过 clearReadMaxLength 之后,会清除已读区域
*
* @param clearReadMaxLength
*/
public SmartHomeDecoder(int clearReadMaxLength) {
this(clearReadMaxLength);
}
private SmartHomeDecoder(int clearReadMaxLength) {
this.clearReadMaxLength = clearReadMaxLength;
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = this.decode(ctx, in);
if (decoded != null) {
out.add(decoded);
}
}
/**
* 解码消息
*
* @param ctx
* @param in
* @throws Exception
*/
protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
clearRead(in);
//消息小于接收的最小长度
if (in.readableBytes() < MSG_MIN_LENGTH) {
return null;
}
//记录消息头部位置
int beginIndex;
while (true) {
//获取消息头部位置
beginIndex = in.readerIndex();
//读到消息头部的时候,跳出循环
byte b = in.readByte();
if (b == HEADER) {
break;
}
//如果读完了所有的数据 都没有获取到 消息头部 则判定所有消息为无效消息,直接放弃掉
if (in.readableBytes() == 0) {
return null;
}
}
//消息长度
if (in.readableBytes() < MSG_LENGTH_LENGTH) {
//消息长度不够,还原readerIndex到消息头部的位置
in.readerIndex(beginIndex);
return null;
}
//获取 消息长度
//长度 共两个字节 所以将第一个左移8位
int length1 = in.readByte();
length1 = length1 << 8;
int length2 = in.readByte();
//最终的长度
length1 = length1 + length2;
//判断数据包是否完整
if (in.readableBytes() < length1) {
//还原readerIndex到消息头部的位置
in.readerIndex(beginIndex);
return null;
}
//读取数据
byte[] data = new byte[length1];
in.readBytes(data);
//所有的数据
byte[] data1 = ConvertUtil.byteSplit(data, 0, data.length - CSC2_LENGTH);
//获取数据对应的 crc2 校验码
byte[] crc= ConvertUtil.crc(data1);
//获取传过来来的校验码
byte[] crc2 = ConvertUtil.byteSplit(data, data.length - CSC2_LENGTH, CSC2_LENGTH);
//比较,如果校验不通过,就忽略这次消息
if (!Arrays.equals(crc, crc2)) {
LOG.debug("crc2校验不通过");
return null;
}
//将得到的data 根据约定 转换为实体
return new MessagePush().setReceiveEntity(new MessageDistributor(data1).distribute());
}
/**
* 清除 0 - readIndex 的数据,以免 ByteBuf 过大
* 如果每一次消息最后,都带有一段解析不了的脏消息,或者有一段小于{@link #MSG_MIN_LENGTH} 的消息,这样每次都会有未读完的消息, 就可能导致 ByteBuf 无限扩容
*
* @param in
*/
private void clearRead(ByteBuf in) {
if (clearReadMaxLength > 0 && in.writerIndex() > clearReadMaxLength) {
LOG.debug("byteBuf中留存的数据太大,自动清除已读数据");
in.discardReadBytes();
}
}
}
整个解码主要是检索头部,然后根据头部之后的长度去读取消息。
这里主要是处理了一下当出现了粘包问题,消息不完整的时候,指针要回到头部,等待下一次读取完整消息。
以及合理的设置一个 clearReadMaxLength ,当缓冲区过大时,清除掉已经读取的数据。