本文由竹子爱熊猫分享,原题“(十一)Netty实战篇:基于Netty框架打造一款高性能的IM即时通讯程序”,本文有修订和改动。
1、引言
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。
原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
2、配套源码
本文配套源码的开源托管地址是:
1)主地址:https://github.com/liuhaijieAdmin/springboot-netty
2)备地址:https://github.com/52im/springboot-netty
3、知识准备
关于 Netty 是什么,这里简单介绍下:
Netty 是一个 Java 开源框架。Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。
也就是说,Netty 是一个基于 NIO 的客户、服务器端编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户,服务端应用。
Netty 相当简化和流线化了网络应用的编程开发过程,例如,TCP 和 UDP 的 Socket 服务开发。
有关Netty的入门文章:
1)新手入门:目前为止最透彻的的Netty高性能原理和框架架构解析
2)写给初学者:Java高性能NIO框架Netty的学习方法和进阶策略
3)史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战
如果你连Java NIO都不知道,下面的文章建议优先读:
2)史上最强Java NIO入门:担心从入门到放弃的,请读这篇!
3)Java的BIO和NIO很难懂?用代码实践给你看,再不懂我转行!
Netty源码和API 在线查阅地址:
4、基于Netty设计通信协议
协议,这玩意儿相信大家肯定不陌生了,简单回顾一下协议的概念:网络协议是指一种通信双方都必须遵守的约定,两个不同的端,按照一定的格式对数据进行“编码”,同时按照相同的规则进行“解码”,从而实现两者之间的数据传输与通信。
当自己想要打造一款IM通信程序时,对于消息的封装、拆分也同样需要设计一个协议,通信的两端都必须遵守该协议工作,这也是实现通信程序的前提。
但为什么需要通信协议呢?
因为TCP/IP中是基于流的方式传输消息,消息与消息之间没有边界,而协议的目的则在于约定消息的样式、边界等。
5、Redis通信的RESP协议参考学习
不知大家是否还记得之前我聊到的RESP客户端协议,这是Redis提供的一种客户端通信协议。如果想要操作Redis,就必须遵守该协议的格式发送数据。
这个协议特别简单,如下:
1)首先要求所有命令,都以*开头,后面跟着具体的子命令数量,接着用换行符分割;
2)接着需要先用$符号声明每个子命令的长度,然后再用换行符分割;
3)最后再拼接上具体的子命令,同样用换行符分割。
这样描述有些令人难懂,那就直接看个案例,例如一条简单set命令。
如下:
客户端命令:
setname ZhuZi
转变为RESP指令:
*3
$3
set
$4
name
$5
ZhuZi
按照Redis的规定,但凡满足RESP协议的客户端,都可以直接连接并操作Redis服务端,这也就意味着咱们可以直接通过Netty来手写一个Redis客户端。
代码如下:
// 基于Netty、RESP协议实现的Redis客户端
publicclassRedisClient {
// 换行符的ASCII码
staticfinalbyte[] LINE = {13, 10};
publicstaticvoidmain(String[] args) {
EventLoopGroup worker = newNioEventLoopGroup();
Bootstrap client = newBootstrap();
try{
client.group(worker);
client.channel(NioSocketChannel.class);
client.handler(newChannelInitializer<SocketChannel>() {
@Override
protectedvoidinitChannel(SocketChannel socketChannel)
throwsException {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(newChannelInboundHandlerAdapter(){
// 通道建立成功后调用:向Redis发送一条set命令
@Override
publicvoidchannelActive(ChannelHandlerContext ctx)
throwsException {
String command = "set name ZhuZi";
ByteBuf buffer = respCommand(command);
ctx.channel().writeAndFlush(buffer);
}
// Redis响应数据时触发:打印Redis的响应结果
@Override
publicvoidchannelRead(ChannelHandlerContext ctx,
Object msg) throwsException {
// 接受Redis服务端执行指令后的结果
ByteBuf buffer = (ByteBuf) msg;
System.out.println(buffer.toString(CharsetUtil.UTF_8));
}
});
}
});
// 根据IP、端口连接Redis服务端
client.connect("192.168.12.129", 6379).sync();
} catch(Exception e){
e.printStackTrace();
}
}
privatestaticByteBuf respCommand(String command){
// 先对传入的命令以空格进行分割
String[] commands = command.split(" ");
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
// 遵循RESP协议:先写入指令的个数
buffer.writeBytes(("*"+ commands.length).getBytes());
buffer.writeBytes(LINE);
// 接着分别写入每个指令的长度以及具体值
for(String s : commands) {
buffer.writeBytes(("$"+ s.length()).getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes(s.getBytes());
buffer.writeBytes(LINE);
}
// 把转换成RESP格式的命令返回
returnbuffer;
}
}
在上述这个案例中,也仅仅只是通过respCommand()这个方法,对用户输入的指令进行了转换。同时在上面通过Netty,与Redis的地址、端口建立了连接。在连接建立成功后,就会向Redis发送一条转换成RESP指令的set命令。接着等待Redis的响应结果并输出,如下:
+OK
因为这是一条写指令,所以当Redis收到执行完成后,最终就会返回一个OK,大家也可直接去Redis中查询,也依旧能够查询到刚刚写入的name这个键值。
6、HTTP超文本传输协议参考学习
前面咱们自己针对于Redis的RESP协议,对用户指令进行了封装,然后发往Redis执行。
但对于这些常用的协议,Netty早已提供好了现成的处理器,想要使用时无需从头开发,可以直接使用现成的处理器来实现。
比如现在咱们可以基于Netty提供的处理器,实现一个简单的HTTP服务器。
代码如下:
// 基于Netty提供的处理器实现HTTP服务器
publicclassHttpServer {
publicstaticvoidmain(String[] args) throwsInterruptedException {
EventLoopGroup boss = newNioEventLoopGroup();
EventLoopGroup worker = newNioEventLoopGroup();
ServerBootstrap server = newServerBootstrap();
server
.group(boss,worker)
.channel(NioServerSocketChannel.class)
.childHandler(newChannelInitializer<NioSocketChannel>() {
@Override
protectedvoidinitChannel(NioSocketChannel ch) {
ChannelPipeline pipeline = ch.pipeline();
// 添加一个Netty提供的HTTP处理器
pipeline.addLast(newHttpServerCodec());
pipeline.addLast(newChannelInboundHandlerAdapter() {
@Override
publicvoidchannelRead(ChannelHandlerContext ctx,
Object msg) throwsException {
// 在这里输出一下消息的类型
System.out.println("消息类型:"+ msg.getClass());
super.channelRead(ctx, msg);
}
});
pipeline.addLast(newSimpleChannelInboundHandler<HttpRequest>() {
@Override
protectedvoidchannelRead0(ChannelHandlerContext ctx,
HttpRequest msg) throwsException {
System.out.println("客户端的请求路径:"+ msg.uri());
// 创建一个响应对象,版本号与客户端保持一致,状态码为OK/200
DefaultFullHttpResponse response =
newDefaultFullHttpResponse(
msg.protocolVersion(),
HttpResponseStatus.OK);
// 构造响应内容
byte[] content = "<h1>Hi, ZhuZi!</h1>".getBytes();
// 设置响应头:告诉客户端本次响应的数据长度
response.headers().setInt(
HttpHeaderNames.CONTENT_LENGTH,content.length);
// 设置响应主体
response.content().writeBytes(content);
// 向客户端写入响应数据
ctx.writeAndFlush(response);
}
});
}
})
.bind("127.0.0.1",8888)
.sync();
}
}
在该案例中,咱们就未曾手动对HTTP的数据包进行拆包处理了,而是在服务端的pipeline上添加了一个HttpServerCodec处理器,这个处理器是Netty官方提供的。
其类继承关系如下:
publicfinalclassHttpServerCodec
extendsCombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implementsSourceCodec {
// ......
}
观察会发现,该类继承自CombinedChannelDuplexHandler这个组合类,它组合了编码器、解码器。
这也就意味着HttpServerCodec即可以对客户端的数据做解码,也可以对服务端响应的数据做编码。
同时除开添加了这个处理器外,在第二个处理器中打印了一下客户端的消息类型,最后一个处理器中,对客户端的请求做出了响应,其实也就是返回了一句话而已。
此时在浏览器输入http://127.0.0.1:8888/index.html,结果如下:
消息类型:classio.netty.handler.codec.http.DefaultHttpRequest
消息类型:classio.netty.handler.codec.http.LastHttpContent$1
客户端的请求路径:/index.html
此时来看结果,客户端的请求会被解析成两个部分:
1)第一个是请求信息;
2)第二个是主体信息。
但按理来说浏览器发出的请求,属于GET类型的请求,GET请求是没有请求体信息的,但Netty依旧会解析成两部分~,只不过GET请求的第二部分是空的。
在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。
在第三个处理器中,咱们直接向客户端返回了一个h1标签,同时也要记得在响应头里面,加上响应内容的长度信息,否则浏览器的加载圈,会一直不同的转动,毕竟浏览器也不知道内容有多长,就会一直反复加载,尝试等待更多的数据。
7、自定义消息传输协议
7.1概述
Netty除开提供了HTTP协议的处理器外,还提供了DNS、HaProxy、MemCache、MQTT、Protobuf、Redis、SCTP、RTSP.....一系列协议的实现,具体定义位于io.netty.handler.codec这个包下,当然,咱们也可以自己实现自定义协议,按照自己的逻辑对数据进行编解码处理。
很多基于Netty开发的中间件/组件,其内部基本上都开发了专属的通信协议,以此来作为不同节点间通信的基础,所以解下来咱们基于Netty也来自己设计一款通信协议,这也会作为后续实现聊天程序时的基础。
所谓的协议设计,其实仅仅只需要按照一定约束,实现编码器与解码器即可,发送方在发出数据之前,会经过编码器对数据进行处理,而接收方在收到数据之前,则会由解码器对数据进行处理。
7.2自定义协议的要素
在自定义传输协议时,咱们必然需要考虑几个因素,如下:
1)魔数:用来第一时间判断是否为自己需要的数据包;
2)版本号:提高协议的拓展性,方便后续对协议进行升级;
3)序列化算法:消息正文具体该使用哪种方式进行序列化传输,例如Json、ProtoBuf、JDK...;
4)消息类型:第一时间判断出当前消息的类型;
5)消息序号:为了实现双工通信,客户端和服务端之间收/发消息不会相互阻塞;
6)正文长度:提供给LTC解码器使用,防止解码时出现粘包、半包的现象;
7)消息正文:本次消息要传输的具体数据。
在设计协议时,一个完整的协议应该涵盖上述所说的几方面,这样才能提供双方通信时的基础。
基于上述几个字段,能够在第一时间内判断出:
1)消息是否可用;
2)当前协议版本;
3)消息的具体类型;
4)消息的长度等各类信息。
从而给后续处理器使用(自定义的协议规则本身就是一个编解码处理器而已)。
7.3自定义协议实战
前面简单聊到过,所谓的自定义协议就是自己规定消息格式,以及自己实现编/解码器对消息实现封装/拆解,所以这里想要自定义一个消息协议,就只需要满足前面两个条件即可。
因此实现如下:
@ChannelHandler.Sharable
publicclassChatMessageCodec extendsMessageToMessageCodec<ByteBuf, Message> {
// 消息出站时会经过的编码方法(将原生消息对象封装成自定义协议的消息格式)
@Override
protectedvoidencode(ChannelHandlerContext ctx, Message msg,
List<Object> list) throwsException {
ByteBuf outMsg = ctx.alloc().buffer();
// 前五个字节作为魔数
byte[] magicNumber = newbyte[]{'Z','h','u','Z','i'};
outMsg.writeBytes(magicNumber);
// 一个字节作为版本号
outMsg.writeByte(1);
// 一个字节表示序列化方式 0:JDK、1:Json、2:ProtoBuf.....
outMsg.writeByte(0);
// 一个字节用于表示消息类型
outMsg.writeByte(msg.getMessageType());
// 四个字节表示消息序号
outMsg.writeInt(msg.getSequenceId());
// 使用Java-Serializable的方式对消息对象进行序列化
ByteArrayOutputStream bos = newByteArrayOutputStream();
ObjectOutputStream oos = newObjectOutputStream(bos);
oos.writeObject(msg);
byte[] msgBytes = bos.toByteArray();
// 使用四个字节描述消息正文的长度
outMsg.writeInt(msgBytes.length);
// 将序列化后的消息对象作为消息正文
outMsg.writeBytes(msgBytes);
// 将封装好的数据传递给下一个处理器
list.add(outMsg);
}
// 消息入站时会经过的解码方法(将自定义格式的消息转变为具体的消息对象)
@Override
protectedvoiddecode(ChannelHandlerContext ctx,
ByteBuf inMsg, List<Object> list) throwsException {
// 读取前五个字节得到魔数
byte[] magicNumber = newbyte[5];
inMsg.readBytes(magicNumber,0,5);
// 再读取一个字节得到版本号
byteversion = inMsg.readByte();
// 再读取一个字节得到序列化方式
byteserializableType = inMsg.readByte();
// 再读取一个字节得到消息类型
bytemessageType = inMsg.readByte();
// 再读取四个字节得到消息序号
intsequenceId = inMsg.readInt();
// 再读取四个字节得到消息正文长度
intmessageLength = inMsg.readInt();
// 再根据正文长度读取序列化后的字节正文数据
byte[] msgBytes = newbyte[messageLength];
inMsg.readBytes(msgBytes,0,messageLength);
// 对于读取到的消息正文进行反序列化,最终得到具体的消息对象
ByteArrayInputStream bis = newByteArrayInputStream(msgBytes);
ObjectInputStream ois = newObjectInputStream(bis);
Message message = (Message) ois.readObject();
// 最终把反序列化得到的消息对象传递给后续的处理器
list.add(message);
}
}
上面自定义的处理器中,继承了MessageToMessageCodec类,主要负责将数据在原生ByteBuf与Message之间进行相互转换,而Message对象是自定义的消息对象,这里暂且无需过多关心。
其中主要实现了两个方法:
1)encode():出站时会经过的编码方法,会将原生消息对象按自定义的协议封装成对应的字节数据;
2)decode():入站时会经过的解码方法,会将协议格式的字节数据,转变为具体的消息对象。
上述自定义的协议,也就是一定规则的字节数据,每条消息数据的组成如下:
1)魔数:使用第1~5个字节来描述,这个魔数值可以按自己的想法自定义;
2)版本号:使用第6个字节来描述,不同数字表示不同版本;
3)序列化算法:使用第7个字节来描述,不同数字表示不同序列化方式;
4)消息类型:使用第8个字节来描述,不同的消息类型使用不同数字表示;
5)消息序号:使用第9~12个字节来描述,其实就是一个四字节的整数;
6)正文长度:使用第13~16个字节来描述,也是一个四字节的整数;
7)消息正文:长度不固定,根据每次具体发送的数据来决定。
在其中,为了实现简单,这里的序列化方式,则采用的是JDK默认的Serializable接口方式,但这种方式生成的对象字节较大,实际情况中最好还是选择谷歌的ProtoBuf方式,这种算法属于序列化算法中,性能最佳的一种落地实现。
当然,这个自定义的协议是提供给后续的聊天业务使用的,但这种实战型的内容分享,基本上代码量较高,所以大家看起来会有些枯燥,而本文所使用的聊天室案例,是基于《B站-黑马Netty视频教程》二次改良的,因此如若感觉文字描述较为枯燥,可直接点击前面给出的链接,观看P101~P121视频进行学习。
最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。
PS:如果你想系统学习Protobuf,可以从以下文章入手:
《强列建议将Protobuf作为你的即时通讯应用数据传输格式》
《IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!》
《IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点》
《IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理》
《IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理》
《IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)》
最后来观察一下,大家会发现,在咱们定义的这个协议编解码处理器上,存在着一个@ChannelHandler.Sharable注解,这个注解的作用是干吗的呢?其实很简单,用来标识当前处理器是否可在多线程环境下使用,如果带有该注解的处理器,则表示可以在多个通道间共用,因此只需要创建一个即可,反之同理,如果不带有该注解的处理器,则每个通道需要单独创建使用。
PS:如果你想系统学习Protobuf,可以从以下文章入手:
《强列建议将Protobuf作为你的即时通讯应用数据传输格式》
《IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!》
《IM通讯协议专题学习(二):快速理解Protobuf的背景、原理、使用、优缺点》
《IM通讯协议专题学习(三):由浅入深,从根上理解Protobuf的编解码原理》
《IM通讯协议专题学习(四):从Base64到Protobuf,详解Protobuf的数据编码原理》
《IM通讯协议专题学习(八):金蝶随手记团队的Protobuf应用实践(原理篇)》
12、系列文章
《跟着源码学IM(一):手把手教你用Netty实现心跳机制、断线重连机制》
《跟着源码学IM(二):自已开发IM很难?手把手教你撸一个Andriod版IM》
《跟着源码学IM(三):基于Netty,从零开发一个IM服务端》
《跟着源码学IM(四):拿起键盘就是干,教你徒手开发一套分布式IM系统》
《跟着源码学IM(五):正确理解IM长连接、心跳及重连机制,并动手实现》
《跟着源码学IM(六):手把手教你用Go快速搭建高性能、可扩展的IM系统》
《跟着源码学IM(七):手把手教你用WebSocket打造Web端IM聊天》
《跟着源码学IM(八):万字长文,手把手教你用Netty打造IM聊天》
《跟着源码学IM(九):基于Netty实现一套分布式IM系统》
《跟着源码学IM(十):基于Netty,搭建高性能IM集群(含技术思路+源码)》
《跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)》
《跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序》(* 本文)
《SpringBoot集成开源IM框架MobileIMSDK,实现即时通讯IM聊天功能》
13、参考资料
[1]浅谈IM系统的架构设计
[2]简述移动端IM开发的那些坑:架构设计、通信协议和客户端
[3]一套海量在线用户的移动端IM架构设计实践分享(含详细图文)
[5]一套亿级用户的IM架构技术干货(上篇):整体架构、服务拆分等
[6]一套亿级用户的IM架构技术干货(下篇):可靠性、有序性、弱网优化等
[7]史上最通俗Netty框架入门长文:基本介绍、环境搭建、动手实战
[8]强列建议将Protobuf作为你的即时通讯应用数据传输格式
[9]IM通讯协议专题学习(一):Protobuf从入门到精通,一篇就够!
[12]零基础IM开发入门(四):什么是IM系统的消息时序一致性?
[17]为何基于TCP协议的移动端IM仍然需要心跳保活机制?
[18]一文读懂即时通讯应用中的网络心跳包机制:作用、原理、实现思路等
[19]微信团队原创分享:Android版微信后台保活实战分享(网络保活篇)
[20]融云技术分享:融云安卓端IM产品的网络链路保活技术实践
(本文已同步发布于:http://www.52im.net/thread-4530-1-1.html)