1. 协议格式
* * * 自定义协议 数据包格式
* * * -----------------------------------
* * Length 数据包长度 Data长度 + Head长度
* * Version 协议版本号 默认0x0
* * Command 操作指令码 十六进制
* * MsgType 数据类型 0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0
* * SecretKey 密钥key 默认0x0,明文
* * Data 业务数据包
2. 自定义编码器
/**
* @author wangxx
* @data 2020/6/19
* describe 自定义编码器
* * * -----------------------------------
*/
public class CustomizeEncoder extends MessageToByteEncoder<IMMessage> {
private static final String TAG = "CustomizeEncoder";
/**
* 头部固定长度7个字节
*/
public static final int HEAD_LENGTH = 7;
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, IMMessage message, ByteBuf byteBuf) throws Exception {
Log.i(TAG, channelHandlerContext.toString() + ",pk=" + message.toString());
if (null == message) {
throw new Exception();
}
String body = message.getData();
byte[] bodyBytes = null;
//包长度
int headLength = message.getLength();
if (!TextUtils.isEmpty(body)) {
bodyBytes = body.getBytes(Charset.forName("utf-8"));
headLength = bodyBytes.length+HEAD_LENGTH;
}
byteBuf.writeShort(headLength);
//版本号
byteBuf.writeByte(message.getVersion());
//操作指令
byteBuf.writeShort(message.getCommand());
//数据类型
byteBuf.writeByte(message.getMsgType());
//密钥key
byteBuf.writeByte(message.getSecretKey());
//数据包
if (bodyBytes != null && bodyBytes.length > 0) {
byteBuf.writeBytes(bodyBytes);
}
}
}
3. 自定义解码器
public class CustomizeDecoder extends ByteToMessageDecoder {
private static final String TAG = "CustomizeDecoder";
/**
* 头部固定长度7个字节
*/
public static final int HEAD_LENGTH = 7;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception {
Log.i(TAG, "decode ChannelHandlerContext=" + channelHandlerContext.toString() + " ,bytebuf=" + buffer.toString() + ",list=" + out.toString());
Log.i(TAG, "decode ChannelHandlerContext=" + convertByteBufToString(buffer));
if (buffer.readableBytes() < HEAD_LENGTH) {
Log.i(TAG, "数据不足一条");
return;
}
// 记录包头开始的index
int beginReader;
int contentLength;
IMMessage message = new IMMessage();
while (true) {
// 获取包头开始的index
beginReader = buffer.readerIndex();
// 标记包头开始的index
buffer.markReaderIndex();
// 读到了协议的开始标志,结束while循环
short headLength = buffer.readShort();
//版本号
byte version = buffer.readByte();
//操作指令
int command = buffer.readUnsignedShort();
//数据类型
byte msgType = buffer.readByte();
//密钥key
byte secretKey = buffer.readByte();
message.setLength(headLength);
message.setVersion(version);
message.setCommand(command);
message.setMsgType(msgType);
message.setSecretKey(secretKey);
//按照协议计算data的长度
contentLength = headLength - HEAD_LENGTH;
if (buffer.readableBytes() == contentLength) {
break;
}
// 未读到包头,略过一个字节
// 每次略过,一个字节,去读取,包头信息的开始标记
buffer.resetReaderIndex();
buffer.readByte();
// 当略过,一个字节之后,
// 数据包的长度,又变得不满足
// 此时,应该结束。等待后面的数据到达
if (buffer.readableBytes() < HEAD_LENGTH) {
return;
}
}
// 消息的长度
// 判断请求数据包数据是否到齐
if (buffer.readableBytes() < contentLength) {
// 还原读指针
buffer.readerIndex(beginReader);
return;
}
// 读取data数据
byte[] bytes = new byte[contentLength];
buffer.readBytes(bytes);
if (message.getCommand() == TcpAPI.MIMSocketCommandHeartBeatPong) {
message.setTimeStamp(ByteUtil.bytesToInt(bytes));
} else {
message.setData(new String(bytes, StandardCharsets.UTF_8));
}
out.add(message);
}
public String convertByteBufToString(ByteBuf buf) {
String str;
if (buf.hasArray()) { // 处理堆缓冲区
str = new String(buf.array(), buf.arrayOffset() + buf.readerIndex(), buf.readableBytes());
} else { // 处理直接缓冲区以及复合缓冲区
byte[] bytes = new byte[buf.readableBytes()];
buf.getBytes(buf.readerIndex(), bytes);
str = new String(bytes, 0, buf.readableBytes());
}
return str;
}
}
4. 协议
/**
* 包协议
*/
public class IMMessage {
/**
* 数据包长度
*/
private int Length=0x0;
/**
* 版本
*/
private byte Version = 0x0;
/**
* 命令
*/
private int Command;
/**
* 0x0:Json,0x1:ProtoBuf,0x2:Xml,默认:0x0
*/
private byte MsgType = 0x0;
/**
* 密钥key | 默认0x0,明文
*/
private byte SecretKey = 0x0;
/**
* 数据
*/
private String Data;
/**
* 服务器时间(2分钟收到服务端给的)
*/
private int timeStamp;
public int getLength() {
return Length;
}
public void setLength(int length) {
Length = length;
}
public byte getVersion() {
return Version;
}
public void setVersion(byte version) {
Version = version;
}
public int getCommand() {
return Command;
}
public void setCommand(int command) {
Command = command;
}
public byte getMsgType() {
return MsgType;
}
public void setMsgType(byte msgType) {
MsgType = msgType;
}
public byte getSecretKey() {
return SecretKey;
}
public void setSecretKey(byte secretKey) {
SecretKey = secretKey;
}
public String getData() {
return Data;
}
public void setData(String data) {
Data = data;
}
public void setData(BaseSocketBody body){
Gson gson = new Gson();
this.Data = gson.toJson(body);
}
public int getTimeStamp() {
return timeStamp;
}
public void setTimeStamp(int timeStamp) {
this.timeStamp = timeStamp;
}
@Override
public String toString() {
return "IMMessage{" +
"Length=" + Length +
", Version=" + Version +
", Command=" + Command +
", MsgType=" + MsgType +
", SecretKey=" + SecretKey +
", Data='" + Data + '\'' +
", timeStamp=" + timeStamp +
'}';
}
}