前言
即时通信是指能够实时发送和接收互联网消息等的业务通信系统,允许两人或多人使用网络实时的传递文字消息、文件、语音与视频交流,其主要的功能核心是收发消息。即时通信发展至今,已是非常重要的互联网应用形态之一,尤其移动互联网时代,它正以无与论比的优势降低了沟通成本和交流门槛,对各种应用形态产生了深远影响。
做为即时通信开发者或即将成为即时通信开发者的技术人员,即时通信的价值和重要性不言自明。但从技术实现来说,即时通信系统的开发(尤其是移动端即时通信)还是存在许多技术难点和坑点的。也正因如此,优质的即时通信开发资料、实践成果,对于没有太多技术储备的新手来说,尤其难以获得。
本文将以新手的视角引导你阅读相关文章,便于你从零开发一个即时通信做好方方面面的知识准备:包括但不限于网络编程基础、通信协议的选型、即时通信的架构设计等等。文笔有限,如有不妥之处还请批评指正,希望对你有用。
即时通讯架构
从上边的架构图中分为4层,分别为网络层,连接管理层,数据层,业务层。其实4层可以整理成3层,将连接层和网络层进行合并。为什么要分这么多层呢?其实分层是为了系统更好的开发和维护更加清晰方便,例如要换socket网络框架,我们可以避免修改大量的修改业务层和数据层,可以避免大量代码和业务交叉,形成高内聚低耦合。
客户端与服务端是通过TCP/UDP进行网络通信,具体即时通讯流程逻辑如下,看图就明白了。
网络层
七层协议
ISO/OSI七层协议,自下而上分别是物理层、数据链路层、网络层、运输层、会话层、表示层和应用层。我们这里的网络层是中七层协议运输层TCP和UDP。
TCP与UDP的区别
TCP是长连接,可以保证数据的传输可靠性 ,保证数据顺序,发送数据前必须要进行连接,面向流模式。
UDP是无连接,可能产生丢包,不保证数据顺序,面向数据报,知道对端的IP和端口号就直接进行传输,不需要建立连接可以发送数据。
UDP和TCP各有各的应用场景,早期的即时通讯因为服务端资源(服务器硬件、网络带宽等)比较昂贵且没有更好的办法来分担性能负载,所以很多时候会考虑使用UDP,这其中主要是早期的QQ为代表。
时至今日,TCP的服务端负载已经有了很好的解决方案,加之服务器资源成本的下降,目前很多即时通讯、消息推送解决方案也都在使用TCP作为传输层协议。不过,UDP也并未排除在即时通讯、消息推送的解决方案之外,比如:弱网络通信(包括跨国的高延迟网络环境)、物联网通信、即时通讯中的实时音视频通信等等场景下,UDP依然是首选项。
socket缓冲区
socket数据是怎么样发送和接收的?其中的过程是怎么样的?接下来我们从缓冲区中来进行了解。
缓冲区简介
write()/send() 每个 socket 被创建后,都会分配两个缓冲区,输入缓冲区和输出缓冲区。无论是发送和接收,并不立即向网络中传输数据,而是先将数据写入缓冲区中,再由TCP协议将数据从缓冲区发送到目标机器。一旦将数据写入到缓冲区,函数就可以成功返回,不管它们有没有到达目标机器,也不管它们何时被发送到网络,这些都是TCP协议负责的事情。
read()/recv() 函数也是如此,也从输入缓冲区中读取数据,而不是直接从网络中读取。
- I/O缓冲区在每个TCP套接字中单独存在;
- I/O缓冲区在创建套接字时自动生成;
- 即使关闭套接字也会继续传送输出缓冲区中遗留的数据;
- 关闭套接字将丢失输入缓冲区中的数据。
阻塞模式
【对于TCP套接字(默认情况下),当使用 write()/send() 发送数据时】
首先会检查缓冲区,如果缓冲区的可用空间长度小于要发送的数据,那么 write()/send() 会被阻塞(暂停执行),直到缓冲区中的数据被发送到目标机器,腾出足够的空间,才唤醒 write()/send() 函数继续写入数据;
如果TCP协议正在向网络发送数据,那么输出缓冲区会被锁定,不允许写入,write()/send() 也会被阻塞,直到数据发送完毕缓冲区解锁,write()/send() 才会被唤醒。如果TCP协议正在向网络发送数据,那么输出缓冲区会被锁定,不允许写入,write()/send() 也会被阻塞,直到数据发送完毕缓冲区解锁,write()/send() 才会被唤醒;
如果要写入的数据大于缓冲区的最大长度,那么将分批写入。如果要写入的数据大于缓冲区的最大长度,那么将分批写入;
直到所有数据被写入缓冲区 write()/send() 才能返回。直到所有数据被写入缓冲区 write()/send() 才能返回。
【当使用 read()/recv() 读取数据时】
首先会检查缓冲区,如果缓冲区中有数据,那么就读取,否则函数会被阻塞,直到网络上有数据到来;
如果要读取的数据长度小于缓冲区中的数据长度,那么就不能一次性将缓冲区中的所有数据读出,剩余数据将不断积压,直到有 read()/recv() 函数再次读取。如果要读取的数据长度小于缓冲区中的数据长度,那么就不能一次性将缓冲区中的所有数据读出,剩余数据将不断积压,直到有 read()/recv() 函数再次读取;
直到读取到数据后 read()/recv() 函数才会返回,否则就一直被阻塞。
TCP发送数据的过程
CP发送数据的大体过程:首先,TCP是有链接的可靠传输协议,所谓可靠也就是说保证客户端发送的数据服务端都能够收到,并且是按序收到。那么对于上面的问题就不可能存在数据的丢弃。那么客户端一直发送数据越来越多怎么办?下面我们分析一下TCP的传输过程。
数据首先由应用程序缓冲区复制到发送端的输出缓冲区(位于内核),注意这个过程是用类似write功能的函数完成的。有的人通常看到write成功就以为数据发送到了对端主机,其实这是错误的,write成功仅仅表示数据成功的由应用进程缓冲区复制到了输出缓冲区。
然后内核协议栈将输出缓冲区中的数据发送到对端主机,注意这个过程不受应用程序控制,而是发送端内核协议栈完成,其中包括使用滑动窗口、拥塞控制等功能。
数据到达接收端主机的输入缓冲区,注意这个接收过程也不受应用程序控制,而是由接收端内核协议栈完成,其中包括发送ack确认等。
数据由套接字接收缓冲区复制到接收端应用程序缓冲区,注意这个过程是由类似read等函数来完成。
相关文章
- 《TCP/IP详解 - 第11章·UDP:用户数据报协议》
- 《TCP/IP详解 - 第17章·TCP:传输控制协议》
- 《TCP/IP详解 - 第18章·TCP连接的建立与终止》
- 《TCP/IP详解 - 第21章·TCP的超时与重传》
- 《通俗易懂-深入理解TCP协议(上):理论基础》
- 《网络编程懒人入门(三):快速理解TCP协议一篇就够》
TCP/IP发送数据简要流程
网络层技术框架选型
服务端
对于服务端选择选择是Netty通信网络框架,首先是广泛应用的java语言,而且异步高性能、高可靠性和高成熟度。Netty对网络底层封装很好了,不用考虑上边说的数据发送和接收的过程。
移动端
iOS可以选择SocketRocket,CocoaAsyncSocket。
安卓可以选择Netty通信网络框架,和java服务端一样。
不过 微信自用的移动端IM网络层跨平台组件库Mars已正式开源,这个网络框架好处如下。
- 提供长连、短连两种网络通道;
- 常规的网络能力,例如 DNS 防劫持、动态 IP 下发、就近接入、容灾恢复等;
- 贴合移动互联网的网络层解决方案;
- 贴合移动终端的平台特性:前后台、活跃态、休眠、省电、省流量等。
数据层
其实良好的架构,移动端和服务端是逻辑思维相同的,做到万法归一。服务端和移动端都要进行数据模型转化,数据存储,报文的定义,数据二进制处理,数据加密解密处理。只不过是语言不同。类似我们和老外一样都需要吃饭,我们用筷子,老外用刀叉。
报文数据定义
一个完整的报文包括报文头和报文体,报文头固定长度里面包含报文体的长度。每次先读报文头,在读报文体。读完报文体后,在接下来读下一个报文头。报文体里面包含加密的业务数据。
二进制处理
二进制和数据转换处理和字符串压缩解压处理
java版
import org.xerial.snappy.Snappy;
import java.io.IOException;
public class ByteUtil {
//将低字节在前转为int,高字节在后的byte数组(与IntToByteArray1想对应)
public static int byteArrayToInt(byte[] b){
byte[] a = new byte[4];
int i = a.length - 1,j = b.length - 1;
for (; i >= 0 ; i--,j--) {//从b的尾部(即int值的低位)开始copy数据
if(j >= 0)
a[i] = b[j];
else
a[i] = 0;//如果b.length不足4,则将高位补0
}
int v0 = (a[0] & 0xff) << 24;//&0xff将byte值无差异转成int,避免Java自动类型提升后,会保留高位的符号位
int v1 = (a[1] & 0xff) << 16;
int v2 = (a[2] & 0xff) << 8;
int v3 = (a[3] & 0xff) ;
return v0 + v1 + v2 + v3;
}
public static long byteArrayToLong(byte[] byteNum) {
long num = 0;
for (int ix = 0; ix < 8; ++ix) {
num <<= 8;
num |= (byteNum[ix] & 0xff);
}
return num;
}
public static byte[] integerToBytes(int res) {
byte[] targets = new byte[4];
targets[3] = (byte) (res & 0xff);// 最低位
targets[2] = (byte) ((res >> 8) & 0xff);// 次低位
targets[1] = (byte) ((res >> 16) & 0xff);// 次高位
targets[0] = (byte) (res >>> 24);// 最高位,无符号右移。
return targets;
}
/**
*
* @param data1
* @param data2
* @return data1 与 data2拼接的结果
*/
public static byte[] addBytes(byte[] data1, byte[] data2) {
if (data1 == null) {
return data2;
} else {
byte[] data3 = new byte[data1.length + data2.length];
System.arraycopy(data1, 0, data3, 0, data1.length);
System.arraycopy(data2, 0, data3, data1.length, data2.length);
return data3;
}
}
/**
* 压缩字符
* @param
* @param
*
* */
public static byte[] compressHtml(String value){
try {
return Snappy.compress(value.getBytes("UTF-8"));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
/**
* 解压字符
* @param
* @param
*
* */
public static String decompressString(byte[] bytes){
try {
return new String(Snappy.uncompress(bytes));
} catch (IOException e) {
e.printStackTrace();
return null;
}
}
}
oc版
#import "Msg.h"
#import "SecurityUtil.h"
#import "snappy-ios.h"
@implementation Msg
-(id)init;
{
self=[super init];
buffer=[[NSMutableData alloc] init];
return self;
}
-(void) dealloc;
{
[buffer release];
buffer = nil;
[super dealloc];
}
-(void)flush
{
SInt32 v=(SInt32)[buffer length] -4;
SInt8 bytes[4];
bytes[0]=(SInt8)((v >> 24) & 0xFF);
bytes[1]=(SInt8)((v >> 16) & 0xFF);
bytes[2]=(SInt8)((v >> 8)& 0xFF);
bytes[3]=(SInt8)(v & 0xFF);
[buffer replaceBytesInRange:NSMakeRange(0, 4) withBytes:bytes];
}
-(void)writeInt:(SInt32)value
{
SInt8 bytes[4];
bytes[0]=(SInt8)((value >> 24) & 0xFF);
bytes[1]=(SInt8)((value >> 16) & 0xFF);
bytes[2]=(SInt8)((value >> 8)& 0xFF);
bytes[3]=(SInt8)(value & 0xFF);
[buffer appendBytes:&bytes length:4];
}
-(void)writeLong:(SInt64)value
{
SInt8 bytes[8];
bytes[0]=(SInt8)((value >> 56) & 0xFF);
bytes[1]=(SInt8)((value >> 48) & 0xFF);
bytes[2]=(SInt8)((value >> 40)& 0xFF);
bytes[3]=(SInt8)((value >> 32)& 0xFF);
bytes[4]=(SInt8)((value >> 24)& 0xFF);
bytes[5]=(SInt8)((value >> 16)& 0xFF);
bytes[6]=(SInt8)((value >> 8)& 0xFF);
bytes[7]=(SInt8)(value & 0xFF);
[buffer appendBytes:&bytes length:8];
}
-(void)writeString:(NSString *)value
{
NSData *data = [value dataUsingEncoding: NSUTF8StringEncoding];
SInt32 count= (SInt32)[data length];
[self writeInt:count];
[buffer appendData:data];
}
-(void)writeContent:(NSString *)value
{
NSData *data = [value dataUsingEncoding: NSUTF8StringEncoding];
if ([data length] >=DataPacketLimit){
[self writeInt:1];
data= [data compressed];
}else{
[self writeInt:0];
}
SInt32 count= (SInt32)[data length];
[self writeInt:count];
[buffer appendData:data];
}
-(void)writeEncrypt:(NSString *)key Content:(NSString *)value
{
value=[SecurityUtil encrypt:key content:value];
[self writeString:value];
}
-(void)writeBytes:(NSData *) value
{
[buffer appendData:value];
}
-(NSData*) getBuffer
{
NSData *result=[NSData dataWithData:buffer];
return result;
}
@end
加密解密处理
数据进行的加密和解密采用的方式是Base64和AES(AES/ECB/PKCS5Padding 对称加密算法)
AES,RSA对称加密和非对称加密
关于RSA加密机制:是非对称加密方式,两个钥,公钥和私钥,公钥用于加密数据,可以分享给其他用户,私钥可以用于解密用公钥加密的数据,关于安全问题是公钥泄露不会影响安全问题,公钥与私钥是一一对应的关系。
AES加密也叫对称加密:A用密码对数据进行AES加密后,B用同样的密码对密文进行AES解密。
数据和模型之间的转换
数据和模型之间的转换也就是JSON和模型之间的转换,把JSON转换为模型是为了更好的业务处理。而模型转换为JSON是为了数据之间的传输。
数据存储
数据存储放到数据库中,分为服务端和客户端,具体如下
服务端
服务端数据存储把未读的消息放到redis或者是内存中。把全都消息放mysql中。客户端向服务端拉消息时,以时间戳为比较,进行数据拉取。
移动端
移动端可以选择的数据库比较多比如iOS中可以选择苹果自带的数据库,但是不支持其他平台进行数据迁移。如果sql语句学的好的,也可以选SQLite,支持iOS和安卓平台。推荐使用Realm,使用方便,面向对象,是跨平台数据库。
业务层
这里业务层不做过多的阐述,只是客户端和服务端的业务往来。根据自己项目的需求,务端和客户端定义数据格式和内容。一般的即时通讯的业务有单聊,群聊,好友管理,房间信息,好友信息等等。也可以进行外带其他业务。
服务端负载均衡集群
单机系统的时候,客户端和连接都有同一台服务器管理。
在本地维护一份userId到connetciont的映射
服务器可以根据userId找出对应的连接,然后把消息push出去
一个真正的互联网产品不可能只有一台服务器,做单机场景,把服务端ip和端口写死,如果一台服务器崩掉,没有办法立即恢复。所以我们要通过集群的来保证高可用,来进行服务端负载均衡,具体流程如下。
- client每次访问tcp-im-server前,先调用一个新增的get-tcp-ip接口,对于client而言,这个http接口只返回一个tcp-im-server的IP;
- 这个http接口,实现的是原client的IP均衡策略;
-
拿到tcp-im-server的IP后,和原来一样向tcp-im-server发起TCP长连接,若连接不上在进行get-tcp-ip请求。
扩展性问题解决
- 如果原有IP发生变化,只需要修改get-tcp-ip接口的配置;
- 如果新增IP,也是修改get-tcp-ip接口的配置;
-
如果负载均衡策略变化,需要升级客户端。
主动拉取tcp-im-server的状态
分发服务器通过“拉”的方式获取各个tcp-im-server的状态,每个tcp-im-server都独立与解耦,只需专注于资深的tcp业务功能即可。
高可用、负载均衡、扩展性等任务由get-tcp-ip的web-server专注来执行。
服务器消息的路由
若是两个用户在不同的tcp-im-server上,需要进行消息穿透转发。
例如:
小明向小张发的消息,需要小张的对应连接的tcp-im-server才能推送
要完成这个需求需要解决两个问题:
1、聊天服务器这么多,怎么才能知道小张连接了哪一台tcp-im-server?
2、知道是哪一台服务器,怎么才能把消息精准发送到对应的tcp-im-server?
第一个问题好解决,用redis去做map映射。当用户创建连接的时候在redis中把userId(key),tcp-im-server(value)存储起来。这样我们根据userId找到tcp-im-server。
第二个问题,怎样把消息精准的转发到对应的tcp-im-server。
可以让每个tcp-im-server启动的时候都有自己的mq和routingkey,使用rabbitMq的TopicExchange交换器,这样消息就能精准投递到对应的机器,routingKey可以用上面定义的tcp-im-server。
同时queue的熟悉选专属队列,这样服务器重启后,连接断开后,旧的队列会自动删除
这样就搭建好来服务端负载均衡集群
相关文章
dome代码实例
IM聊天服务器代码----IMServer
IM应用服务器代码----IMAppServer
IMiOS客户端代码----IMApp-iOS
结语
即时通讯技术发展的非常快,总结出来希望对大家有所帮助,不对的地方欢迎指出。
QQ:328555416