主要还是 BaseHanlderAdapter:通过解码判断是否是 HTTP 协议;如果是,再在 WebSocketServerHandler 中判断是否是 WebSocket,如下:
/**
 * Dispatches an inbound message by type: a {@code FullHttpRequest} goes to
 * {@code handlerHttpRequest} (the HTTP request that opens the connection) and is followed by a
 * greeting frame; a {@code WebSocketFrame} goes to {@code handlerWebSocketFrame}. Any other
 * message type is ignored.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 * @throws Exception if one of the delegate methods fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    // NOTE(review): msg is neither released nor forwarded in this method; confirm that the
    // delegate methods (or the base class, not visible here) take care of its reference count.
    if (msg instanceof FullHttpRequest) {
        // Classic HTTP access: the first exchange must be an HTTP request that sets up the handshake.
        handlerHttpRequest(ctx, (FullHttpRequest) msg);
        // Greet the client with ITS address (remoteAddress); localAddress is the server's own
        // endpoint. writeAndFlush is used because a bare write() only queues the frame and
        // nothing in this method flushes it.
        ctx.channel().writeAndFlush(
                new TextWebSocketFrame("服务端:客户端" + ctx.channel().remoteAddress() + "连接成功"));
    } else if (msg instanceof WebSocketFrame) {
        // WebSocket access.
        handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
    }
}
/**
* @author anyly
*/
public class MultiProtocolChannelInitializer extends io.netty.channel.ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler("DEBUG"));
// 将请求与应答消息编码或者解码为HTTP消息
ch.pipeline().addLast(new BaseHanlderAdapter());
}
}
/**
* 通过解码,动态选择某个协议
*
* @author anyly
*/
public class BaseHanlderAdapter extends ChannelInboundHandlerAdapter {
/** Logger of this handler. */
private static final Logger log = LogManager.getLogger(BaseHanlderAdapter.class);
/** Largest line length (in bytes, delimiter excluded) that decode accepts before reporting a TooLongFrameException. */
private final int maxLength = 4096;
/**
 * When true, decode reports an over-long line as soon as it is detected; when false (the value
 * used here), it reports it only once the terminating line feed has been found.
 */
private final boolean failFast = false;
/** True while decode is skipping the remainder of an over-long line. */
private boolean discarding;
/** Number of bytes skipped so far for the over-long line that is being discarded. */
private int discardedBytes;
/**
 * Last scan position: how many readable bytes findEndOfLine has already searched without
 * finding a line feed, so that the next search can start after them.
 */
private int offset;
/**
 * Inspects the first inbound buffer to decide which protocol the connection speaks, appends the
 * matching handlers to the pipeline, removes this handler and forwards the untouched message.
 *
 * <p>The first line is extracted with {@link #decode}. The connection is treated as HTTP when
 * the last whitespace-separated token of that line contains {@code "HTTP"} (as in
 * {@code GET / HTTP/1.1}). Everything else, including a buffer that holds no complete line, is
 * handled as line-delimited text. Messages that are not a {@link ByteBuf} are forwarded as is
 * and leave the pipeline unchanged.
 *
 * @param ctx context of this handler
 * @param msg the inbound message
 * @throws Exception if forwarding the message fails
 */
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    log.info("channelRead:begin");
    if (msg instanceof ByteBuf) {
        // Peek through a slice so that msg's reader index is untouched for the handlers added
        // below. slice() replaces Unpooled.wrappedBuffer(), which releases the buffer it is given
        // when that buffer is empty; msg would then be forwarded already released.
        ByteBuf byteBuf = ((ByteBuf) msg).slice();
        log.info("请求报文:\r {}", byteBuf.toString(Charset.defaultCharset()));
        boolean isHttp = false;
        Object obj = null;
        try {
            obj = decode(ctx, byteBuf);
        } catch (Exception e) {
            // Detection is best effort: fall back to the non-HTTP pipeline, but keep the stack
            // trace in the log instead of printing it to stderr.
            log.error("lineParse decode failed", e);
        }
        if (obj instanceof ByteBuf) {
            ByteBuf line = (ByteBuf) obj;
            try {
                String str = line.toString(Charset.defaultCharset());
                isHttp = lastTokenContainsHttp(str);
                log.info("lineParse decode:{}", str);
            } finally {
                // decode() returns a retained slice; drop that extra reference, otherwise the
                // buffer behind msg can never be freed.
                line.release();
            }
        }
        log.info("{}Http协议", isHttp ? "是" : "否");
        if (isHttp) {
            ctx.pipeline().addLast("http-codec", new HttpServerCodec());
            // Aggregates the parts of an HTTP message into one full HTTP message.
            ctx.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
            // Chunked writes, e.g. for sending an HTML5 page to a browser that then talks WebSocket.
            ctx.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
            ctx.pipeline().addLast(new WebSocketServerHandler());
        } else {
            ctx.pipeline().addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            ctx.pipeline().addLast(new StringDecoder());
            ctx.pipeline().addLast(new SocketHanlder<String>());
        }
        // The protocol is decided; this handler has nothing more to do on this channel.
        ctx.pipeline().remove(this);
    } else {
        log.info("msg not instanceof ByteBuf");
    }
    super.channelRead(ctx, msg);
}

/**
 * Tells whether the last whitespace-separated token of {@code line} contains {@code "HTTP"}.
 *
 * @param line the decoded first line, line delimiter included
 * @return true when the line has at least one token and its last token contains {@code "HTTP"}
 */
private static boolean lastTokenContainsHttp(String line) {
    String[] tokens = line.split("\\s+");
    // split() returns an empty array for a line made only of whitespace (e.g. a bare "\r\n"),
    // so the length must be checked before indexing the last element.
    return tokens.length > 0 && tokens[tokens.length - 1].contains("HTTP");
}
/**
 * Reports an over-long line whose length is known, by delegating to the {@code String} overload.
 *
 * @param ctx    context used to raise the exception
 * @param length length of the offending line in bytes
 */
private void fail(final ChannelHandlerContext ctx, int length) {
    fail(ctx, Integer.toString(length));
}
/**
 * Raises a {@code TooLongFrameException} through {@code ctx.fireExceptionCaught}.
 *
 * @param ctx    context used to raise the exception
 * @param length textual description of the offending length, inserted verbatim into the message
 */
private void fail(final ChannelHandlerContext ctx, String length) {
    final String message =
            "frame length (" + length + ") exceeds the allowed maximum (" + maxLength + ')';
    ctx.fireExceptionCaught(new TooLongFrameException(message));
}
/**
 * Searches the readable bytes of {@code buffer} for a line feed, skipping the first
 * {@code offset} bytes.
 *
 * <p>When a line feed is found, {@code offset} is reset to 0; when none is found, {@code offset}
 * is set to the number of readable bytes.
 *
 * @param buffer the buffer to scan; its reader index is not moved
 * @return the index of the line delimiter (the {@code '\r'} when the line feed is directly
 *         preceded by one, otherwise the {@code '\n'}), or the negative value returned by
 *         {@code forEachByte} when no line feed was found
 */
private int findEndOfLine(final ByteBuf buffer) {
    final int readable = buffer.readableBytes();
    final int lineFeed =
            buffer.forEachByte(buffer.readerIndex() + offset, readable - offset, ByteProcessor.FIND_LF);
    if (lineFeed < 0) {
        // Nothing found: remember how far the scan went.
        offset = readable;
        return lineFeed;
    }
    offset = 0;
    // Point at the '\r' of a "\r\n" pair so the caller sees where the delimiter starts.
    final boolean afterCarriageReturn = lineFeed > 0 && buffer.getByte(lineFeed - 1) == '\r';
    return afterCarriageReturn ? lineFeed - 1 : lineFeed;
}
/**
 * Tries to cut one line, delimiter included, out of {@code buffer}.
 *
 * <p>A line longer than {@code maxLength} is skipped and reported through {@code fail}. If its
 * end has not been seen yet, the handler enters discarding mode and keeps skipping until a line
 * delimiter is found.
 *
 * @param ctx    context used to report over-long lines
 * @param buffer the buffer to read from; its reader index is advanced past consumed bytes
 * @return a retained slice holding the line and its delimiter, or {@code null} when no line is
 *         produced (no delimiter found, or the line was too long)
 */
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) {
    final int eol = findEndOfLine(buffer);

    if (discarding) {
        if (eol < 0) {
            // Still inside the over-long line: skip everything that is readable.
            discardedBytes += buffer.readableBytes();
            buffer.readerIndex(buffer.writerIndex());
            // The whole buffer was skipped, so the scan offset must start from 0 again.
            offset = 0;
            return null;
        }
        // End of the over-long line reached: skip through its delimiter and leave discarding mode.
        final int tooLong = discardedBytes + eol - buffer.readerIndex();
        final int delimiterSize = buffer.getByte(eol) == '\r' ? 2 : 1;
        buffer.readerIndex(eol + delimiterSize);
        discardedBytes = 0;
        discarding = false;
        if (!failFast) {
            fail(ctx, tooLong);
        }
        return null;
    }

    if (eol < 0) {
        // No delimiter yet; start discarding only if the pending bytes already exceed the limit.
        final int pending = buffer.readableBytes();
        if (pending > maxLength) {
            discardedBytes = pending;
            buffer.readerIndex(buffer.writerIndex());
            discarding = true;
            offset = 0;
            if (failFast) {
                fail(ctx, "over " + discardedBytes);
            }
        }
        return null;
    }

    final int lineLength = eol - buffer.readerIndex();
    final int delimiterSize = buffer.getByte(eol) == '\r' ? 2 : 1;
    if (lineLength > maxLength) {
        buffer.readerIndex(eol + delimiterSize);
        fail(ctx, lineLength);
        return null;
    }
    return buffer.readRetainedSlice(lineLength + delimiterSize);
}
/**
 * Logs an exception raised on this channel and closes the connection.
 *
 * @param channelHandlerContext context of this handler
 * @param cause                 the exception that was raised
 */
@Override
public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable cause) {
    // The trailing Throwable has no placeholder of its own, so the logger records it as the
    // event's exception and the stack trace is kept (logging only getMessage() dropped it).
    log.warn("异常:{},{}", channelHandlerContext.channel().remoteAddress(), cause.getMessage(), cause);
    channelHandlerContext.close();
}
/**
 * Called when the connection becomes active: logs the peer address and attempts to send a
 * greeting.
 *
 * @param channelHandlerContext context of this handler
 */
@Override
public void channelActive(ChannelHandlerContext channelHandlerContext) {
    final Object peer = channelHandlerContext.channel().remoteAddress();
    log.info("客户端已连接:{}", peer);
    // NOTE(review): a String is written here, but the pipeline built by
    // MultiProtocolChannelInitializer contains no String encoder at this point, so this write is
    // presumably rejected as an unsupported message type. Confirm the intent before changing it:
    // actually sending these bytes would precede the HTTP/WebSocket handshake response.
    channelHandlerContext.channel().writeAndFlush("connection reply");
}
/**
 * Called when the connection to the client is gone; only logs the peer address.
 *
 * @param channelHandlerContext context of this handler
 */
@Override
public void channelInactive(ChannelHandlerContext channelHandlerContext) {
    final Object peer = channelHandlerContext.channel().remoteAddress();
    log.info("连接断开:{}", peer);
}
/**
 * Called once the current read operation has completed; flushes through this handler's context.
 *
 * @param channelHandlerContext context of this handler
 * @throws Exception declared by the overridden method
 */
@Override
public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
// NOTE(review): the event is not passed on with fireChannelReadComplete(); confirm that no later
// handler needs it while this handler is still in the pipeline.
channelHandlerContext.flush();
}
可以自行使用火狐的 WebSocket 插件以及自定义的 TCP 协议进行测试。