业务场景:后台服务需要与多种设备通信,而不同厂家使用不同的协议,服务端必须同时兼容。例如,设备A发送的是String字符串,设备B发送的是byte[]。处理办法是使用自定义解码器,先识别收到的数据是String还是byte[],再转发给相应的业务handler处理。
/**
 * Netty server entry point: listens on TCP port 8765 and configures every accepted
 * channel with {@link initHandler}.
 */
public class server {
    /**
     * Starts the server and blocks until the listening channel is closed.
     *
     * @param args unused
     * @throws Exception if binding fails or the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Event loop group that accepts incoming client connections.
        EventLoopGroup boss = new NioEventLoopGroup();
        // Event loop group that performs read/write I/O on the accepted connections.
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // Helper that assembles and starts the server channel.
            ServerBootstrap b = new ServerBootstrap();
            b.group(boss, work)
                    .channel(NioServerSocketChannel.class)
                    // Maximum number of pending connections queued for accept (not a data buffer).
                    .option(ChannelOption.SO_BACKLOG, 1024 * 2)
                    .option(ChannelOption.SO_RCVBUF, 32 * 1024 * 2 * 2)
                    // Keep accepted connections alive at the TCP level.
                    .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
                    .childOption(ChannelOption.SO_SNDBUF, 32 * 1024)
                    // Builds the pipeline of each accepted channel.
                    .childHandler(new initHandler());
            ChannelFuture cf = b.bind(8765).sync();
            // Block until the server channel is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Always stop the event loops, even when bind() or sync() throws; otherwise their
            // threads are never shut down.
            work.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }
}
ChannelInitializer
/**
 * Channel initializer that installs the encoders, the custom decoders and the two
 * business handlers on every new {@link SocketChannel}.
 */
public class initHandler extends ChannelInitializer<SocketChannel> {
    // Created once per initializer; this class does not add it to the pipeline.
    final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();

    /**
     * Populates the channel pipeline. Handlers are appended in this order: outbound
     * encoders, then the custom String decoder, the custom byte[] decoder, and finally
     * the two business handlers.
     *
     * @param socketChannel the channel being initialized
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        socketChannel.pipeline()
                .addLast(new StringEncoder())
                .addLast(new ByteArrayEncoder())
                // StringDecoder1 and byteArrayDecoder1 are project classes; the encoders come from Netty.
                .addLast(new StringDecoder1())
                .addLast(new byteArrayDecoder1())
                .addLast(new ServerHandler())
                .addLast(new ServerHandler2());
    }
}
自定义StringDecoder
public class StringDecoder1 extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(StringDecoder1.class);
final int length = 2048;
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
try {
in.retain();
byte[] headArray = new byte[3];
in.readBytes(headArray);
String head = new String(headArray);
// 把读取的起始位置重置
in.resetReaderIndex();
if (TcnConstant.CMD_HEADER.equals(head)) {
int strBeginIndex = in.readerIndex();
int readableBytes = in.readableBytes();
byte[] tailArray = new byte[3];
//数据末尾
in.getBytes(readableBytes-3,tailArray);
String tail = new String(tailArray);
in.resetReaderIndex();
//没接收完
if (!TcnConstant.CMD_TAIL.equals(tail)) {
logger.info("可读字节数readableBytes is {}",readableBytes);
in.readerIndex(strBeginIndex);
return;
}
ByteBufToBytes reader = new ByteBufToBytes();
String msg = new String(reader.read(in));
//in.retain(1);
list.add(msg);d(in));
list.add(msg);
} else {
channelHandlerContext.fireChannelRead(in);
}
}catch (Exception e){
logger.info("=================");
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.err.println("--------数据读异常----------: ");
cause.printStackTrace();
ctx.close();
}
}
自定义byte解码器1
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.bangmart.android.util.DataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Created by zhangkai on 2018/7/20.
*/
public class byteArrayDecoder1 extends ByteToMessageDecoder {
private static final Logger logger = LoggerFactory.getLogger(byteArrayDecoder1.class);
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
try {
in.retain();
byte[] headArray = new byte[2];
in.readBytes(headArray);
in.resetReaderIndex();
FactoryEnum fe = FactoryUtil.indentify(headArray,channelHandlerContext.channel());
if(null!=fe && FactoryEnum.XY.getName().equals(fe.getName())) {
int readableBytes = in.readableBytes();
int beginIndex = in.readerIndex();
boolean heart = false;
ByteBufToBytes reader = new ByteBufToBytes();
byte[] data = reader.read(in);
String hexStr = DataUtil.ByteArrToHexString(data);
String[] hexArray = hexStr.split(" ");
if (hexArray.length < XyConstant.headLength) {
if (hexArray.length == XyConstant.HEARTBEAT_MSG_FIELD.HEARTBEAT_MSG_LENGTH && hexArray[0].equalsIgnoreCase(XyConstant.heartHead)) {
//是心跳
heart = true;
} else {
return;
}
}
if (!heart) {
int dataLength = DataUtil.HexToInt(hexArray[XyConstant.msgLengthIndex]);
if (readableBytes < dataLength) {
logger.info("可读字节数小于数据长度");
in.readerIndex(beginIndex);
return;
}
//粘包
if (readableBytes > dataLength) {
data = Arrays.copyOf(data, dataLength);
}
}
/**
* 1.每一个bytebuf都有一个计数器,每次调用计数器减1,当计数器为0时则不可用。
* 2.如果当前bytebuf中数据包含多条消息,本条信息会通过list返回被继续封装成一个新的byte[]返回下一个hander处理
* 3.retain方法是将当前的bytebuf计数器加1
* 4.如果不这样做,会报异常 io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
* */
list.add(data);}
else {
//这里是为了让各自的消息一定能到各自的decoder中处理,不然会发生多次读半包异常
//比如说,如果一个消息因为网络的原因,netty需要多次(大于2)读取才能读完,那么就一定需要确保各自的消息
//在各自的decoder中才能正确读取到。
//第一次和第二次读取时,都是拼接好的全部数据,当第三次读取时,就是单个数据
if(null == fe){
if(null == fe){
byte[] msg = new byte[in.readableBytes()];
in.readBytes(msg);
logger.error("解析消息失败,未识别消息所属厂家{}",DataUtil.bytesToHexString(msg));
return;
}
channelHandlerContext.fireChannelRead(in);
}
}catch (Exception e){
logger.info("decoder异常================={}",e);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
System.err.println("--------数据读异常----------: ");
cause.printStackTrace();
ctx.close();
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
System.err.println("--------数据读取完毕----------");
}
}
自定义byte解码器2
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import org.bangmart.android.util.DataUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
 * Decoder for a binary protocol whose frame length is carried in the bytes at offsets 2
 * (high part) and 3 (low part) of the frame. Each complete frame is emitted as a
 * {@code byte[]}.
 *
 * <p>Created by zhangkai on 2018/7/20.
 */
public class byteArrayDecoder2 extends ByteToMessageDecoder {
    private static final Logger logger = LoggerFactory.getLogger(byteArrayDecoder2.class);
    /** Bytes that must be buffered before the length bytes at offsets 2 and 3 can be read. */
    private static final int MIN_HEADER_BYTES = 4;

    /**
     * Tries to decode one frame from the cumulation buffer.
     *
     * @param channelHandlerContext unused
     * @param in cumulation buffer owned by {@link ByteToMessageDecoder}; it is not retained here,
     *     so the base class keeps sole control of its reference count
     * @param list receives the decoded frame as {@code byte[]}
     * @throws Exception declared by the overridden method; failures inside are logged instead
     */
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf in, List<Object> list) throws Exception {
        try {
            int readableBytes = in.readableBytes();
            if (readableBytes < MIN_HEADER_BYTES) {
                // Length field not fully received yet; wait without consuming anything.
                return;
            }
            int beginIndex = in.readerIndex();
            // NOTE(review): ByteBufToBytes is not visible here; presumably read() drains all
            // readable bytes into a byte[] -- confirm. The reader index is set explicitly below.
            ByteBufToBytes reader = new ByteBufToBytes();
            byte[] data = reader.read(in);
            int frameLenHigh = data[2] & FactoryConstant.BYTE_MASK;
            int frameLenLow = data[3] & FactoryConstant.BYTE_MASK;
            int frameLen = frameLenHigh * (FactoryConstant.HEXADECIMAL * FactoryConstant.HEXADECIMAL) + frameLenLow;
            int dataLength = frameLen;
            if (dataLength <= 0) {
                // A frame cannot be empty; drop the buffered bytes rather than emit a message
                // without consuming input.
                logger.error("invalid frame length {}, discarding {} buffered bytes", dataLength, readableBytes);
                in.readerIndex(in.writerIndex());
                return;
            }
            if (readableBytes < dataLength) {
                // Half packet: rewind and wait for the rest.
                logger.info("可读字节数{}===小于===数据长度{}", readableBytes, dataLength);
                in.readerIndex(beginIndex);
                return;
            }
            if (readableBytes > dataLength) {
                // Sticky packets: keep only the first frame. Fully qualified because the
                // file's import list has no java.util.Arrays.
                logger.info("可读字节数{}===大于===数据长度{}", readableBytes, dataLength);
                data = java.util.Arrays.copyOf(data, dataLength);
            }
            // Consume exactly one frame so any following frame is decoded on the next call.
            in.readerIndex(beginIndex + dataLength);
            list.add(data);
        } catch (Exception e) {
            // Keep the best-effort behaviour (no rethrow) but log the stack trace properly.
            logger.error("decoder异常=================", e);
        }
    }

    /**
     * Logs the failure and closes the channel.
     *
     * @param ctx context of the failing channel
     * @param cause the exception that reached this handler
     * @throws Exception declared by the overridden method
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        logger.error("--------数据读异常----------: ", cause);
        ctx.close();
    }

    /**
     * Flushes pending writes, then delegates to the base class so that its own
     * read-complete handling runs and the event is propagated to later handlers.
     *
     * @param ctx context of the channel
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.err.println("--------数据读取完毕----------");
        super.channelReadComplete(ctx);
    }
}
业务处理Handler
/**
 * Business handler for the text protocol: answers every decoded {@link String} and passes
 * any other message type to the next handler.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class ServerHandler extends ChannelInboundHandlerAdapter {

    /**
     * Called when the channel becomes active; only prints a marker line.
     * NOTE(review): the event is not forwarded with {@code ctx.fireChannelActive()}, so later
     * handlers do not receive it -- confirm this is intended.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("--------通道激活------------");
    }

    /**
     * Called for every inbound message.
     *
     * @param ctx handler context of the channel
     * @param msg the inbound message; handled here only when it is a {@link String}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (!(msg instanceof String)) {
            // Not a text message: leave it to the next handler.
            ctx.fireChannelRead(msg);
            return;
        }
        String text = (String) msg;
        System.out.println("----------XXX-----" + text);
        ctx.writeAndFlush("我是XXX服务端");
    }

    /**
     * Called when the current read batch is finished; only prints a marker line.
     * NOTE(review): the event is not forwarded to later handlers -- confirm this is intended.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("--------数据读取完毕----------");
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println("--------数据读异常----------: ");
        cause.printStackTrace();
        ctx.close();
    }
}
/**
 * Business handler for the binary protocol: answers every decoded {@code byte[]} with the
 * fixed reply {@code 1D011E} and forwards anything else.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class ServerHandler2 extends ChannelInboundHandlerAdapter {

    /**
     * Called when the channel becomes active; only prints a marker line.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.err.println("--------通道激活------------");
    }

    /**
     * Called for every inbound message.
     *
     * @param ctx handler context of the channel
     * @param msg the inbound message; handled here only when it is a {@code byte[]}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof byte[]) {
            String hexStr = DataUtil.ByteArrToHexString((byte[]) msg);
            System.out.println("----XXX------" + hexStr);
            ctx.writeAndFlush(DataUtil.hexStringToBytes("1D011E"));
        } else {
            // Not handled here. Forward it instead of dropping it silently, so a reference-counted
            // message (e.g. a ByteBuf forwarded by a decoder) reaches the end of the pipeline,
            // where Netty discards and releases unhandled messages.
            ctx.fireChannelRead(msg);
        }
    }

    /**
     * Called when the current read batch is finished; only prints a marker line.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        System.err.println("--------数据读取完毕----------");
    }

    /**
     * Prints the stack trace of the failure and closes the channel.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        System.err.println("--------数据读异常----------: ");
        cause.printStackTrace();
        ctx.close();
    }
}
Client测试,模拟两个发送不同协议数据的客户端
/**
 * Test client for the text protocol: connects to port 8765, sends one sample frame as a
 * {@link String} and then waits until the connection is closed.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class client {
    /**
     * Runs the test client.
     *
     * @param args unused
     * @throws Exception if the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Event loop group that performs the client's I/O.
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // Helper that assembles and connects the client channel.
            Bootstrap b = new Bootstrap();
            b.group(work)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // String codec on both directions, then the client's business handler.
                            sc.pipeline().addLast(new StringEncoder());
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new ClientHandler());
                        }
                    });
            // NOTE(review): "0.0.0.0" as a destination address is not portable across operating
            // systems; consider "127.0.0.1" for a local server.
            ChannelFuture cf = b.connect("0.0.0.0", 8765).syncUninterruptibly();
            String str = "###5000${\"Mid\":\"7105000011\",\"MaxSlot\":\"60\",\"SlotInfo\":[{\"SlotNum\":\"1\",\"Status\":\"1\"},{\"SlotNum\":\"2\",\"Status\":\"1\"},\n" +
                    "{\"SlotNum\":\"3\",\"Status\":\"1\"},{\"SlotNum\":\"4\",\"Status\":\"1\"},{\"SlotNum\":\"5\",\"Status\":\"1\"},{\"SlotNum\":\"11\",\"Status\":\"1\"},{\"SlotNum\":\"12\",\"Status\":\"1\"},{\"SlotNum\":\"13\",\"Status\":\"1\"},{\"SlotNum\":\"14\",\"Status\":\"1\"},{\"SlotNum\":\"15\",\"Status\":\"1\"},{\"SlotNum\":\"21\",\"Status\":\"1\"},{\"SlotNum\":\"22\",\"Status\":\"1\"},{\"SlotNum\":\"23\",\"Status\":\"1\"},{\"SlotNum\":\"24\",\"Status\":\"1\"},{\"SlotNum\":\"25\",\"Status\":\"1\"},{\"SlotNum\":\"26\",\"Status\":\"1\"},{\"SlotNum\":\"27\",\"Status\":\"1\"},{\"SlotNum\":\"28\",\"Status\":\"1\"},{\"SlotNum\":\"29\",\"Status\":\"1\"},{\"SlotNum\":\"30\",\"Status\":\"1\"},{\"SlotNum\":\"31\",\"Status\":\"1\"},{\"SlotNum\":\"32\",\"Status\":\"1\"},{\"SlotNum\":\"33\",\"Status\":\"1\"},{\"SlotNum\":\"34\",\"Status\":\"1\"},{\"SlotNum\":\"35\",\"Status\":\"1\"},{\"SlotNum\":\"36\",\"Status\":\"1\"},{\"SlotNum\":\"37\",\"Status\":\"1\"},{\"SlotNum\":\"38\",\"Status\":\"1\"},{\"SlotNum\":\"39\",\"Status\":\"1\"},{\"SlotNum\":\"40\",\"Status\":\"1\"},{\"SlotNum\":\"41\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"43\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},\n" +
                    "{\"SlotNum\":\"3\",\"Status\":\"1\"},{\"SlotNum\":\"4\",\"Status\":\"1\"},{\"SlotNum\":\"5\",\"Status\":\"1\"},{\"SlotNum\":\"11\",\"Status\":\"1\"},{\"SlotNum\":\"12\",\"Status\":\"1\"},{\"SlotNum\":\"13\",\"Status\":\"1\"},{\"SlotNum\":\"14\",\"Status\":\"1\"},{\"SlotNum\":\"15\",\"Status\":\"1\"},{\"SlotNum\":\"21\",\"Status\":\"1\"},{\"SlotNum\":\"22\",\"Status\":\"1\"},{\"SlotNum\":\"23\",\"Status\":\"1\"},{\"SlotNum\":\"24\",\"Status\":\"1\"},{\"SlotNum\":\"25\",\"Status\":\"1\"},{\"SlotNum\":\"26\",\"Status\":\"1\"},{\"SlotNum\":\"27\",\"Status\":\"1\"},{\"SlotNum\":\"28\",\"Status\":\"1\"},{\"SlotNum\":\"29\",\"Status\":\"1\"},{\"SlotNum\":\"30\",\"Status\":\"1\"},{\"SlotNum\":\"31\",\"Status\":\"1\"},{\"SlotNum\":\"32\",\"Status\":\"1\"},{\"SlotNum\":\"33\",\"Status\":\"1\"},{\"SlotNum\":\"34\",\"Status\":\"1\"},{\"SlotNum\":\"35\",\"Status\":\"1\"},{\"SlotNum\":\"36\",\"Status\":\"1\"},{\"SlotNum\":\"37\",\"Status\":\"1\"},{\"SlotNum\":\"38\",\"Status\":\"1\"},{\"SlotNum\":\"39\",\"Status\":\"1\"},{\"SlotNum\":\"40\",\"Status\":\"1\"},{\"SlotNum\":\"41\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"43\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},\n" +
                    "{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"},{\"SlotNum\":\"42\",\"Status\":\"1\"}]}$AABBCCDD&&&";
            System.out.println(str.length());
            cf.channel().writeAndFlush(str);
            // Block until the connection is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Always stop the event loop, even when connecting or waiting fails.
            work.shutdownGracefully();
        }
    }
}
/**
 * Test client for the binary protocol: connects to port 8765, sends the three bytes
 * {@code FF 01 00} and then waits until the connection is closed.
 *
 * <p>Created by zhangkai on 2018/6/11.
 */
public class client2 {
    /**
     * Runs the test client.
     *
     * @param args unused
     * @throws Exception if the waiting thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        // Event loop group that performs the client's I/O.
        EventLoopGroup work = new NioEventLoopGroup();
        try {
            // Helper that assembles and connects the client channel.
            Bootstrap b = new Bootstrap();
            b.group(work)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // byte[] codec on both directions, then the client's business handler.
                            sc.pipeline().addLast(new ByteArrayEncoder());
                            sc.pipeline().addLast(new ByteArrayDecoder());
                            sc.pipeline().addLast(new ClientHandler2());
                        }
                    });
            // NOTE(review): "0.0.0.0" as a destination address is not portable across operating
            // systems; consider "127.0.0.1" for a local server.
            ChannelFuture cf = b.connect("0.0.0.0", 8765).syncUninterruptibly();
            // Payload bytes: 0xFF 0x01 0x00
            byte[] bytes = DataUtil.hexStringToBytes("FF0100");
            cf.channel().writeAndFlush(bytes);
            // Block until the connection is closed.
            cf.channel().closeFuture().sync();
        } finally {
            // Always stop the event loop, even when connecting or waiting fails.
            work.shutdownGracefully();
        }
    }
}
用到的工具类
/**
 * Conversion helpers between bytes, integers and hexadecimal strings, plus an XOR (BCC)
 * checksum.
 */
public class DataUtil {

    /**
     * Tests the lowest bit of a number.
     *
     * @param num the number to test
     * @return {@code 1} when {@code num} is odd, {@code 0} when it is even
     */
    public static int isOdd(int num) {
        return num & 0x1;
    }

    /**
     * Parses a hexadecimal string as an {@code int}.
     *
     * @param inHex hexadecimal digits
     * @return the parsed value
     * @throws NumberFormatException if {@code inHex} is not a valid hexadecimal number
     */
    public static int HexToInt(String inHex) {
        return Integer.parseInt(inHex, 16);
    }

    /**
     * Parses a hexadecimal string and narrows the result to a {@code byte}.
     *
     * @param inHex hexadecimal digits
     * @return the low 8 bits of the parsed value
     * @throws NumberFormatException if {@code inHex} is not a valid hexadecimal number
     */
    public static byte HexToByte(String inHex) {
        return (byte) Integer.parseInt(inHex, 16);
    }

    /**
     * Formats one byte as two upper-case hexadecimal characters.
     *
     * @param inByte the byte to format
     * @return e.g. {@code "0A"} or {@code "FF"}
     */
    public static String Byte2Hex(Byte inByte) {
        return String.format("%02x", inByte).toUpperCase();
    }

    /**
     * Formats a whole byte array as upper-case hex pairs, each followed by one space.
     *
     * @param inBytArr the bytes to format
     * @return e.g. {@code "01 FF "}; empty for an empty array
     */
    public static String ByteArrToHexString(byte[] inBytArr) {
        return ByteArrToHexString(inBytArr, 0, inBytArr.length);
    }

    /**
     * Formats {@code byteCount} bytes starting at {@code offset} as upper-case hex pairs, each
     * followed by one space.
     *
     * @param inBytArr the source bytes
     * @param offset index of the first byte to format
     * @param byteCount number of bytes to format
     * @return the formatted range; empty when {@code byteCount} is not positive
     * @throws ArrayIndexOutOfBoundsException if the range exceeds the array
     */
    public static String ByteArrToHexString(byte[] inBytArr, int offset, int byteCount) {
        StringBuilder strBuilder = new StringBuilder();
        // The range ends at offset + byteCount; stopping at byteCount alone would cut the output
        // short for every non-zero offset.
        int end = offset + byteCount;
        for (int i = offset; i < end; i++) {
            strBuilder.append(Byte2Hex(inBytArr[i]));
            strBuilder.append(" ");
        }
        return strBuilder.toString();
    }

    /**
     * Converts a hexadecimal string to bytes. A string with an odd number of characters is
     * left-padded with one {@code '0'}.
     *
     * @param inHex hexadecimal digits without separators
     * @return the decoded bytes
     * @throws NumberFormatException if a character pair is not valid hexadecimal
     */
    public static byte[] HexToByteArr(String inHex) {
        int hexlen = inHex.length();
        if (isOdd(hexlen) == 1) {
            // Odd length: pad on the left so every byte has two digits.
            hexlen++;
            inHex = "0" + inHex;
        }
        byte[] result = new byte[hexlen / 2];
        int j = 0;
        for (int i = 0; i < hexlen; i += 2) {
            result[j] = HexToByte(inHex.substring(i, i + 2));
            j++;
        }
        return result;
    }

    /**
     * Removes every space character from a hex string.
     *
     * @param str hex string, possibly with spaces between the pairs
     * @return {@code str} without spaces
     */
    public static String getFomattedHexString(String str) {
        return str.replace(" ", "");
    }

    /**
     * Converts bytes to a lower-case hex string without separators.
     *
     * @param src the bytes to convert
     * @return the hex string, or {@code null} when {@code src} is {@code null} or empty
     */
    public static String bytesToHexString(byte[] src) {
        if (src == null || src.length <= 0) {
            return null;
        }
        StringBuilder stringBuilder = new StringBuilder("");
        for (int i = 0; i < src.length; i++) {
            // Mask to get the unsigned value before formatting.
            int v = src[i] & 0xFF;
            String hv = Integer.toHexString(v);
            if (hv.length() < 2) {
                stringBuilder.append(0);
            }
            stringBuilder.append(hv);
        }
        return stringBuilder.toString();
    }

    /**
     * Converts a hex string (either case, no separators) to bytes. A trailing unpaired
     * character is ignored. Characters outside {@code 0-9A-F} are not rejected and produce
     * meaningless byte values.
     *
     * @param hexString the hex string
     * @return the decoded bytes, or {@code null} when {@code hexString} is {@code null} or empty
     */
    public static byte[] hexStringToBytes(String hexString) {
        if (hexString == null || hexString.equals("")) {
            return null;
        }
        hexString = hexString.toUpperCase();
        int length = hexString.length() / 2;
        char[] hexChars = hexString.toCharArray();
        byte[] d = new byte[length];
        for (int i = 0; i < length; i++) {
            int pos = i * 2;
            // High nibble from the first character, low nibble from the second.
            d[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
        }
        return d;
    }

    /**
     * Maps one upper-case hex character to its value.
     *
     * @param c the character
     * @return 0-15, or -1 when {@code c} is not one of {@code 0-9A-F}
     */
    private static byte charToByte(char c) {
        return (byte) "0123456789ABCDEF".indexOf(c);
    }

    /**
     * Computes the XOR checksum of the bytes encoded by a hex string.
     *
     * @param data hexadecimal digits without separators
     * @return the checksum as two upper-case hex characters
     */
    public static String getCheckXOR(String data) {
        byte[] bytes = DataUtil.HexToByteArr(data);
        byte bcc = caluBCC(bytes, 0, bytes.length);
        return Byte2Hex(bcc);
    }

    /**
     * Computes the XOR (BCC) checksum over a range of bytes.
     *
     * @param pByte the source bytes
     * @param start index of the first byte to include
     * @param length number of bytes to include
     * @return the XOR of the range, or -1 when the array is {@code null} or empty,
     *     {@code length} is not positive, or {@code start} is negative
     */
    public static byte caluBCC(byte[] pByte, int start, int length) {
        if (pByte == null || pByte.length <= 0 || length <= 0 || start < 0) {
            return -1;
        }
        byte checkSum = 0;
        for (int i = start; i < start + length; i++) {
            checkSum ^= pByte[i];
        }
        return checkSum;
    }

    /**
     * Converts an {@code int} to four bytes, most significant byte first.
     *
     * @param i the value to convert
     * @return a 4-byte array
     */
    public static byte[] intToByteArray1(int i) {
        byte[] result = new byte[4];
        result[0] = (byte) ((i >> 24) & 0xFF);
        result[1] = (byte) ((i >> 16) & 0xFF);
        result[2] = (byte) ((i >> 8) & 0xFF);
        result[3] = (byte) (i & 0xFF);
        return result;
    }
}
下面是整个过程中的数据流向
注:服务端检测到客户端断线并主动关闭连接时,会报如下错误:
io.netty.util.IllegalReferenceCountException: refCnt: 0, decrement: 1
at io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:101) ~[netty-all-4.0.28.Final.jar:4.0.28.Final];
解决办法:
1、添加ByteBuf.retain();生产上handler继承的是SimpleChannelInboundHandler,解决办法是下面这样处理