长连接貌似是一个很高深莫测的知识,但是只要你做直播、IM、游戏、弹幕里面的任何一种,或者是你的app想要实时的接收某些消息,你就会要接触到长连接技术。本文主要教你如何在客户端如何使用Socket实现长连接。
Socket背景知识
要做长连接的话,是不能用http协议来做的,因为http协议已经是应用层协议了,并且http协议是无状态的,而我们要做长连接,肯定是需要在应用层封装自己的业务,所以就需要基于TCP协议来做,而基于TCP协议的话,就要用到Socket了。
Socket是java针对tcp层通信封装的一套网络方案
TCP协议我们知道,是基于ip(或者域名)和端口对指定机器进行的点对点访问,他的连接成功有两个条件,就是对方ip可以到达和端口是开放的
Socket能帮完成TCP三次握手,而应用层的头部信息需要自己去解析,也就是说,自己要制定好协议,并且要去解析byte
Socket使用方式
Socket看上去不是很好用,因为他是基于java.io来实现的,你要直接跟InputStream和OutputStream打交道,也就是直接跟byte[]打交道,所以用起来并不是这么友好。
下面通过一个简单的例子,往一台服务器发\01 \00 \00 \00 \00这一串字节,服务器也返回相同的字节流,上代码
public void testSocketChannelBlock() throws Exception {
final SocketChannel channel = SocketChannel.open(address);
ByteBuffer output = ByteBuffer.allocate(5);
output.put((byte) 1);
output.putInt(0);
output.flip();
channel.write(output);
logger.debug("write complete, start read");
ByteBuffer input = ByteBuffer.allocate(5);
int readByte = channel.read(input);
logger.debug("readByte " + readByte);
input.flip();
if (readByte == -1) {
logger.debug("readByte == -1, return!");
return;
}
for (int i = 0; i < readByte; i++) {
logger.debug("read [" + i + "]:" + input.get());
}
}
Selector
我们知道,传统io是阻塞的,也就是说,一个线程只能处理一个io流,也就是一个Socket。有了Selector之后,一个线程就能处理多个SocketChannel。
Selector的原理是,他能接受多个SocketChannel,然后不断的遍历每一个Channel的状态,如果有Channel已经ready了,他就能通过他自身提供的方法,通知到线程,让线程去处理对应的业务。流程图如下:
Netty对nio这一套有比较好的封装,里面就涉及到了Selector,
Netty 优点
1.并发高
2.传输快
3.封装好
(1)Netty为什么并发高
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:
从这两图可以看出,NIO的单线程能处理连接的数量比BIO要高出很多,而为什么单线程能处理更多的连接呢?原因就是图二中出现的Selector。
当一个连接建立之后,他有两个步骤要做,第一步是接收完客户端发过来的全部数据,第二步是服务端处理完请求业务之后返回response给客户端。NIO和BIO的区别主要是在第一步。
在BIO中,等待客户端发数据这个过程是阻塞的,这样就造成了一个线程只能处理一个请求的情况,而机器能支持的最大线程数是有限的,这就是为什么BIO不能支持高并发的原因。
而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端——这个过程是不阻塞的,这样就能让一个Thread处理更多的请求了。
下面两张图是基于BIO的处理流程和netty的处理流程,辅助你理解两种方式的差别:
除了BIO和NIO之外,还有一些其他的IO模型,下面这张图就表示了五种IO模型的处理流程:
BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。
NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
信号驱动IO,这种IO模型主要用在嵌入式开发,不参与讨论。
异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。
(2)Netty为什么传输快
Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。
(3)为什么说Netty封装好?
阻塞I/O
public class PlainOioServer {
public void serve(int port) throws IOException {
final ServerSocket socket = new ServerSocket(port); //1
try {
for (;;) {
final Socket clientSocket = socket.accept(); //2
System.out.println("Accepted connection from " + clientSocket);
new Thread(new Runnable() { //3
@Override
public void run() {
OutputStream out;
try {
out = clientSocket.getOutputStream();
out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8"))); //4
out.flush();
clientSocket.close(); //5
} catch (IOException e) {
e.printStackTrace();
try {
clientSocket.close();
} catch (IOException ex) {
// ignore on close
}
}
}
}).start(); //6
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
非阻塞IO
public class PlainNioServer {
public void serve(int port) throws IOException {
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
ServerSocket ss = serverChannel.socket();
InetSocketAddress address = new InetSocketAddress(port);
ss.bind(address); //1
Selector selector = Selector.open(); //2
serverChannel.register(selector, SelectionKey.OP_ACCEPT); //3
final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
for (;;) {
try {
selector.select(); //4
} catch (IOException ex) {
ex.printStackTrace();
// handle exception
break;
}
Set<SelectionKey> readyKeys = selector.selectedKeys(); //5
Iterator<SelectionKey> iterator = readyKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
try {
if (key.isAcceptable()) { //6
ServerSocketChannel server =
(ServerSocketChannel)key.channel();
SocketChannel client = server.accept();
client.configureBlocking(false);
client.register(selector, SelectionKey.OP_WRITE |
SelectionKey.OP_READ, msg.duplicate()); //7
System.out.println(
"Accepted connection from " + client);
}
if (key.isWritable()) { //8
SocketChannel client =
(SocketChannel)key.channel();
ByteBuffer buffer =
(ByteBuffer)key.attachment();
while (buffer.hasRemaining()) {
if (client.write(buffer) == 0) { //9
break;
}
}
client.close(); //10
}
} catch (IOException ex) {
key.cancel();
try {
key.channel().close();
} catch (IOException cex) {
// 在关闭时忽略
}
}
}
}
}
}
Netty
public class NettyOioServer {
public void server(int port) throws Exception {
final ByteBuf buf = Unpooled.unreleasableBuffer(
Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
EventLoopGroup group = new OioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); //1
b.group(group) //2
.channel(OioServerSocketChannel.class)
.localAddress(new InetSocketAddress(port))
.childHandler(new ChannelInitializer<SocketChannel>() {//3
@Override
public void initChannel(SocketChannel ch)
throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { //4
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
}
});
}
});
ChannelFuture f = b.bind().sync(); //6
f.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync(); //7
}
}
}
Channel
数据传输流,与channel相关的概念有以下四个,上一张图让你了解netty里面的Channel。
Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
ChannelHandler,核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext,用于传输业务数据。
ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
ByteBuf
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:
Heap Buffer 堆缓冲区
堆缓冲区是ByteBuf最常用的模式,他将数据存储在堆空间。
Direct Buffer 直接缓冲区
直接缓冲区是ByteBuf的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4引入的nio的ByteBuffer类允许jvm通过本地方法调用分配内存,这样做有两个好处
通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.
Composite Buffer 复合缓冲区
复合缓冲区相当于多个不同ByteBuf的视图,这是netty提供的,jdk不提供这样的功能。
Codec
Netty中的编码/解码器,通过他你能完成字节与pojo、pojo与pojo的相互转换,从而达到自定义协议的目的。
在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。
认识Http请求
在动手写Netty框架之前,我们先要了解http请求的组成,如下图:
HTTP Request 第一部分是包含的头信息
HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分
LastHttpContent 标记是 HTTP request 的结束,同时可能包含头的尾部信息
完整的 HTTP request,由1,2,3组成
HTTP response 第一部分是包含的头信息
HttpContent 里面包含的是数据,可以后续有多个 HttpContent 部分
LastHttpContent 标记是 HTTP response 的结束,同时可能包含头的尾部信息
完整的 HTTP response,由1,2,3组成
从request的介绍我们可以看出来,一次http请求并不是通过一次对话完成的,他中间可能有很次的连接。通过上一章我们队netty的了解,每一次对话都会建立一个channel,并且一个ChannelInboundHandler一般是不会同时去处理多个Channel的。
如何在一个Channel里面处理一次完整的Http请求?这就要用到我们上图提到的FullHttpRequest,我们只需要在使用netty处理channel的时候,只处理消息是FullHttpRequest的Channel,这样我们就能在一个ChannelHandler中处理一个完整的Http请求了。
什么是Decoder和Encoder
在学习Decoder和Encoder之前,首先要了解他们在具体是个什么东西。在Netty里面,有四个核心概念,这个在第一篇文章提到的,他们的分别是:
Channel,一个客户端与服务器通信的通道
ChannelHandler,业务逻辑处理器,分为ChannelInboundHandler和ChannelOutboundHandler
ChannelInboundHandler,输入数据处理器
ChannelOutboundHandler,输出业务处理器
通常情况下,业务逻辑都是存在于ChannelHandler之中
ChannelPipeline,用于存放ChannelHandler的容器
ChannelContext,通信管道的上下文
他们之间的交流流程如下图:
他们的交互流程是:
事件传递给 ChannelPipeline 的第一个 ChannelHandler
ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
ChannelHandler 通过关联的 ChannelHandlerContext 传递事件给 ChannelPipeline 中的 下一个
而我们本文所需要详细讲的Decoder和Encoder,他们分别就是ChannelInboundHandler和ChannelOutboundHandler,分别用于在数据流进来的时候将字节码转换为消息对象和数据流出去的时候将消息对象转换为字节码。
Encoder
Encoder最重要的实现类是MessageToByteEncoder<T>,这个类的作用就是将消息实体T从对象转换成byte,写入到ByteBuf,然后再丢给剩下的ChannelOutboundHandler传给客户端,流程图如下:
encode方法是继承MessageToByteEncoder唯一需要重写的方法,可见其简单程度。也是因为Encoder相比于Decoder更为简单,在这里也不多做赘述,直接上代码:
public class ShortToByteEncoder extends MessageToByteEncoder<Short> { //1
@Override
public void encode(ChannelHandlerContext ctx, Short msg, ByteBuf out)
throws Exception {
out.writeShort(msg); //2
}
}
Decoder
和Encoder一样,decoder就是在服务端收到数据的时候,将字节流转换为实体对象Message。但是和Encoder的处理逻辑不一样,数据传到服务端有可能不是一次请求就能完成的,中间可能需要经过几次数据传输,并且每一次传输传多少数据也是不确定的,所以它有两个重要方法:
decode和decodeLast的不同之处,在于他们的调用时机不同,正如描述所说,decodeLast只有在Channel的生命周期结束之前会调用一次,默认是调用decode方法。
同样是ToInteger的解码器,他的代码如下:
public class ToIntegerDecoder extends ByteToMessageDecoder { //1
@Override
public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
throws Exception {
if (in.readableBytes() >= 4) { //2
out.add(in.readInt()); //3
}
}
}
从这段代码可以看出,因为不知道这次请求发过来多少数据,所以每次都要判断byte长度够不够4,如果你的数据长度更长,且不固定的话,这里的逻辑会变得非常复杂。所以在这里介绍另一个我们常用的解码器 :ReplayingDecoder。
ReplayingDecoder
ReplayingDecoder 是 byte-to-message 解码的一种特殊的抽象基类,读取缓冲区的数据之前需要检查缓冲区是否有足够的字节,使用ReplayingDecoder就无需自己检查;若ByteBuf中有足够的字节,则会正常读取;若没有足够的字节则会停止解码。
RelayingDecoder在使用的时候需要搞清楚的两个方法是checkpoint(S s)和state(),其中checkpoint的参数S,代表的是ReplayingDecoder所处的状态,一般是枚举类型。RelayingDecoder是一个有状态的Handler,状态表示的是它目前读取到了哪一步,checkpoint(S s)是设置当前的状态,state()是获取当前的状态。
在这里我们模拟一个简单的Decoder,假设每个包包含length:int和content:String两个数据,其中length可以为0,代表一个空包,大于0的时候代表content的长度。代码如下:
public class LiveDecoder extends ReplayingDecoder<LiveDecoder.LiveState> { //1
public enum LiveState { //2
LENGTH,
CONTENT
}
private LiveMessage message = new LiveMessage();
public LiveDecoder() {
super(LiveState.LENGTH); // 3
}
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
switch (state()) { // 4
case LENGTH:
int length = byteBuf.readInt();
if (length > 0) {
checkpoint(LiveState.CONTENT); // 5
} else {
list.add(message); // 6
}
break;
case CONTENT:
byte[] bytes = new byte[message.getLength()];
byteBuf.readBytes(bytes);
String content = new String(bytes);
message.setContent(content);
list.add(message);
break;
default:
throw new IllegalStateException("invalid state:" + state());
}
}
}
继承ReplayingDecoder,泛型LiveState,用来表示当前读取的状态
描述LiveState,有读取长度和读取内容两个状态
初始化的时候设置为读取长度的状态
读取的时候通过state()方法来确定当前处于什么状态
如果读取出来的长度大于0,则设置为读取内容状态,下一次读取的时候则从这个位置开始
读取完成,往结果里面放解析好的数据
以上就是ReplayingDecoder的使用方法,他比ByteToMessageDecoder更加灵活,能够通过巧妙的方式来处理复杂的业务逻辑,但是也是因为这个原因,使得ReplayingDecoder带有一定的局限性:
不是所有的标准 ByteBuf 操作都被支持,如果调用一个不支持的操作会抛出 UnreplayableOperationException
ReplayingDecoder 略慢于 ByteToMessageDecoder
所以,如果不引入过多的复杂性 使用 ByteToMessageDecoder 。否则,使用ReplayingDecoder。
MessageToMessage
Encoder和Decoder除了能完成Byte和Message的相互转换之外,为了处理复杂的业务逻辑,还能帮助使用者完成Message和Message的相互转换,我们熟悉的Http协议的处理,其中就用到了很多MessageToMessage的派生类。
Netty如何实现长连接
一个简单的长连接demo分为以下几个步骤:
1.创建连接(Channel)
2.发心跳包
3.发消息,并通知其他用户
4.一段时间没收到心跳包或者用户主动关闭之后关闭连接
看似简单的步骤,里面有两个技术难点:
1.如何保存已创建的Channel
这里我们是将Channel放在一个Map中,以Channel.hashCode()作为key
其实这样做有一个劣势,就是不适合水平扩展,每个机器都有一个连接数的上线,如果需要实现多用户实时在线,对机器的数量要求会很高,在这里我们不多做讨论,不同的业务场景,设计方案也是不同的,可以在长连接方案和客户端轮询方案中进行选择。
2.如何自动关闭没有心跳的连接
Netty有一个比较好的Feature,就是ScheduledFuture,他可以通过ChannelHandlerContext.executor().schedule()创建,支持延时提交,也支持取消任务,这就给我们心跳包的自动关闭提供了一个很好的实现方案。
开始动手
首先,我们需要用一个JavaBean来封装通信的协议内容,在这里我们只需要三个数据就行了:
1.type : byte,表示消息的类型,有心跳类型和内容类型
2.length : int,表示消息的长度
3.content : String,表示消息的内容(心跳包在这里没有内容)
然后,因为我们需要将Channel和ScheduledFuture缓存在Map里面,所以需要将两个对象组合成一个JavaBean。
接着,需要完成输入输出流的解析和转换,我们需要重写Decoder和Encoder,
服务端:
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* Created by RoyDeng on 17/7/20.
*/
public class LiveHandler extends SimpleChannelInboundHandler<LiveMessage> { // 1
private static Map<Integer, LiveChannelCache> channelCache = new HashMap<>();
private Logger logger = LoggerFactory.getLogger(LiveHandler.class);
@Override
protected void channelRead0(ChannelHandlerContext ctx, LiveMessage msg) throws Exception {
Channel channel = ctx.channel();
final int hashCode = channel.hashCode();
System.out.println("channel hashCode:" + hashCode + " msg:" + msg + " cache:" + channelCache.size());
if (!channelCache.containsKey(hashCode)) {
System.out.println("channelCache.containsKey(hashCode), put key:" + hashCode);
channel.closeFuture().addListener(future -> {
System.out.println("channel close, remove key:" + hashCode);
channelCache.remove(hashCode);
});
ScheduledFuture scheduledFuture = ctx.executor().schedule(
() -> {
System.out.println("schedule runs, close channel:" + hashCode);
channel.close();
}, 10, TimeUnit.SECONDS);
channelCache.put(hashCode, new LiveChannelCache(channel, scheduledFuture));
}
switch (msg.getType()) {
case LiveMessage.TYPE_HEART: {
LiveChannelCache cache = channelCache.get(hashCode);
ScheduledFuture scheduledFuture = ctx.executor().schedule(
() -> channel.close(), 5, TimeUnit.SECONDS);
cache.getScheduledFuture().cancel(true);
cache.setScheduledFuture(scheduledFuture);
ctx.channel().writeAndFlush(msg);
break;
}
case LiveMessage.TYPE_MESSAGE: {
channelCache.entrySet().stream().forEach(entry -> {
Channel otherChannel = entry.getValue().getChannel();
otherChannel.writeAndFlush(msg);
});
break;
}
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelReadComplete");
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.debug("exceptionCaught");
if(null != cause) cause.printStackTrace();
if(null != ctx) ctx.close();
}
}
2.客户端
package com.dz.test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Scanner;
public class LongConnection {
private Logger logger = LoggerFactory.getLogger(LongConnection.class);
String host = "localhost";
int port = 8080;
public void testLongConnection() throws Exception {
logger.debug("start");
final Socket socket = new Socket();
socket.connect(new InetSocketAddress(host, port));
Scanner scanner = new Scanner(System.in);
new Thread(() -> {
while (true) {
try {
byte[] input = new byte[64];
int readByte = socket.getInputStream().read(input);
logger.debug("readByte " + readByte);
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
int code;
while (true) {
code = scanner.nextInt();
logger.debug("input code:" + code);
if (code == 0) {
break;
} else if (code == 1) {
ByteBuffer byteBuffer = ByteBuffer.allocate(5);
byteBuffer.put((byte) 1);
byteBuffer.putInt(0);
socket.getOutputStream().write(byteBuffer.array());
logger.debug("write heart finish!");
} else if (code == 2) {
byte[] content = ("hello, I'm" + hashCode()).getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(content.length + 5);
byteBuffer.put((byte) 2);
byteBuffer.putInt(content.length);
byteBuffer.put(content);
socket.getOutputStream().write(byteBuffer.array());
logger.debug("write content finish!");
}
}
socket.close();
}
// 因为Junit不支持用户输入,所以用main的方式来执行用例
public static void main(String[] args) throws Exception {
new LongConnection().testLongConn();
}
}