Netty学习之内置处理器以及编解码器
前言
SSL/TLS
SSL/TLS是目前广泛使用的加密,位于TCP之上,其他的应用层协议之下,当应用层将数据交给SSL/TLS之后,数据会被进行加密,关于SSL/TLS更多的内容,可以参考:SSL/TLS协议运行机制的概述、OpenSSL 与 SSL 数字证书概念贴
在javax.net.ssl
中提供了原生的SSL/TLS支持,通过SSLContext
、SSLEngine
可以方便地进行数据的加密及解密。
在Netty中,为了方便开发者使用SSL/TLS,Netty提供了SSlHandler
(本质是一个ChannelHandler),只要为其配置一个SSLEngine即可进行加密数据传输。
class SslEngineInitializer extends ChannelInitializer<Channel> {
private final SslContext context;
private final boolean startTls;
public SslEngineInitializer(SslContext context, boolean startTls) {
this.context = context;
this.startTls = startTls;
}
@Override
protected void initChannel(Channel ch) throws Exception {
SSLEngine engine = context.newEngine(ch.alloc());
ch.pipeline().addFirst("ssl", new SslHandler(engine, startTls));
}
}
HTTP/HTTPS
一个HTTP请求或者相应可能由多个部分组成,一个完整的Http请求由以下内容组成
- 一个
HttpRequest
,表示请求头部 - 一个或者多个
HttpContent
表示Http的内容 - 一个
LastHttpContent
标志Http内容的结束
由于一个Http请求包含请求部分以及相应部分,而对于客户端及服务端来说,这两者是不相同的,客户端发送请求,服务端接收请求,服务端发送响应,客户端接收响应,所以,需要有不同的处理器来处理不同的内容
常用编解码器
HttpRequestEncoder
HttpResponseEncoder
HttpRequestDecoder
HttpResponseDecoder
class HttpPipelineInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpPipelineInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
// 解码响应
pipeline.addLast("decoder", new HttpResponseEncoder());
// 编码请求
pipeline.addLast("encoder", new HttpRequestEncoder());
}else {
// 解码请求
pipeline.addLast("decoder", new HttpRequestDecoder());
// 编码响应
pipeline.addLast("encoder", new HttpResponseEncoder());
}
}
}
当一个字节流被解码成Http内容之后,就可以操作具体的HttpObject消息了,但是由于一个完整的请求/响应可能会被拆分成几个部分,所以,直接使用其实不是很合适,更好地方式是使用聚合器。
class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpAggregatorInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
// 客户端编解码器
// 等同于上面的两者
pipeline.addLast("codec", new HttpClientCodec());
}else {
// 服务端编解码器
pipeline.addLast("codec", new HttpServerCodec());
}
// 聚合器,允许最大大小为 512 * 1024
pipeline.addLast("aggregator", new HttpObjectAggregator(512 * 1024));
}
}
聚合之后我们可以直接使用FullHttpRequest
、FullHttpResponse
消息来处理,这两个对象表示的是完整的请求/响应了。
当使用HTTP的时候,如果内容大部分是文本数据,我们一般会使用压缩技术,虽然会增加CPU开销,但是可以有效地节省网络带宽,Netty同样提供了对应的handler并且提供gzip和deflate技术。
class HttpCompressionInitializer extends ChannelInitializer<Channel> {
private final boolean client;
public HttpCompressionInitializer(boolean client) {
this.client = client;
}
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
if (client) {
pipeline.addLast("codec", new HttpClientCodec());
// 客户端解压
pipeline.addLast("decompressor", new HttpContentDecompressor());
}else {
pipeline.addLast("codec", new HttpServerCodec());
// 服务端加压
pipeline.addLast("compressor", new HttpContentCompressor());
}
}
}
同时需要注意,如果是JDK6及以前的版本,需要引入JZlib
依赖。
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jzlib</artifactId>
<version>1.1.3</version>
</dependency>
如果我们需要使用HTTPS,只需要将SslHanlder配置在所有handler的最前面即可。
空闲检测及超时
空闲检测及超时,也可以称为心跳检测,目的就是确保连接的另一端依旧在线,如果不在线,则断开连接,节省资源。
Netty中提供了几个常用的handler
- IdleStateHandler,如果连接空闲时间过长,则触发一个
IdleStateEvent
,可以通过在ChannelInboundHandler覆盖userEventTriggered()
来处理该事件 - ReadTimeoutHandler,当channel中一段时间没有inbound数据的时候,抛出
readTimeoutException
并且关闭channel,可以通过exceptionCaught()
捕获该异常。 - WriteTimeoutHandler,当channel中有一段时间没有outbound数据时,抛出
writeTimeoutException
并且关闭channel,可以通过exceptionCaught()
捕获异常。
需要注意的是,IdleStateHandler
的作用是用于检测channel在指定时间内是否有数据流通,如果没有的话,则触发一个IdleStateEvent
,该Event是用于通知本channel的,而不是用于通知对方,所以,我们可以根据收到的Event来决定处理逻辑,比如
- 对于服务端:收到超过3个对应的事件,表示超过3 * time时间内没有交互,因此决定断开连接。
- 对于客户端:收到事件后,发送一个心跳包(内容其实是随意的,主要是由数据流动),表明自己还活着(注意该事件同样是给自己的,不是给对方的,所以都需要增加对应的逻辑)
下面举一个具体的例子
服务端
public class Server {
public static void main(String[] args) {
ServerBootstrap bootstrap = new ServerBootstrap();
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
bootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.childHandler(new IdleStateHandlerInitializer());
try {
ChannelFuture future = bootstrap.bind(8080).sync();
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
worker.shutdownGracefully();
boss.shutdownGracefully();
}
}
}
/**
* 服务端空闲检测
*/
class IdleStateHandlerInitializer extends ChannelInitializer<Channel>{
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(new IdleStateHandler(0, 0, 60));
pipeline.addLast(new HeartbeatHandler());
}
/**
* 服务端的空闲处理逻辑
*/
private class HeartbeatHandler extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
// 如果超过,则断开连接
if (event.state() == IdleState.ALL_IDLE) {
ctx.writeAndFlush(Unpooled.copiedBuffer("bybe".getBytes()));
ctx.close();
}
}else {
super.userEventTriggered(ctx, evt);
}
}
}
}
客户端
class Client {
public static void main(String[] args) {
Bootstrap bootstrap = new Bootstrap();
EventLoopGroup group = new NioEventLoopGroup();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 如果这里改成70,则会断开
pipeline.addLast(new IdleStateHandler(0, 0, 50, TimeUnit.SECONDS));
pipeline.addLast(new Heartbeat());
}
});
try {
ChannelFuture fu = bootstrap.connect(new InetSocketAddress("localhost", 8080)).sync();
fu.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
group.shutdownGracefully();
}
}
/**
* 客户端空闲检测
*/
private static class Heartbeat extends ChannelInboundHandlerAdapter {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
// 发送心跳包
ctx.writeAndFlush(Unpooled.copiedBuffer("heartbeat".getBytes()));
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf data = (ByteBuf) msg;
System.out.println(data.toString(CharsetUtil.UTF_8));
}
}
}
基于分隔符及长度的协议处理
在某些协议中,是根据换行符或者指定长度来划分的,Netty中提供了基于这两者的处理器
基于分隔符
Netty中主要的基于分隔符的处理器有以下两个
- DelimeterBasedFrameDecoder,基于指定分隔符
- LineBasedFrameDecoder,基于换行符(DelimeterBasedFrameDecoder的特例)
基于长度
Netty中基于长度的处理器有以下两个
- FixedLenghtFrameDecoder,固定长度
- LengthFieldBasedFrameDecoder,通过构造器指定长度字段的偏移及所占字节数
发送大数据
为了高效地发送大量数据,Netty中提供了FileRegion
接口(默认实现DefaultFileRegion
),作为支持zero-copy
的传输器,用于在channel中发送文件
如果需要将数据从文件系统拷贝到用户空间,可以使用ChunkedWriteHandler
,它提供了低消耗内存异步将大数据流写出。
序列化
Netty提供的JDK序列化相关的处理器
- CompatibleObjectDecoder,适用于非Netty的并且使用JDK的序列化器
- CompatibleObjectEncoder,同上
- ObjectDecoder,适用于在JDK序列化器之上使用自定义序列化
- ObjectEncoder,同上
同时,Netty还提供了基于ProtoBuf的处理器,具体的可以参考文件即可,使用上基本差不多
总结
本小节我们主要学习了Netty所提供的几个常用的handler,包括了SSL/TLS相关的handler、HTTP相关的handler、空闲处理器(心跳)、协议分割处理器以及序列化处理器等,有了这些常用的处理器,可以不用处理具体协议的相关内容,从而可以更专注于逻辑方面的处理。