前言
雨露均沾的OkHttp—WebSocket长连接(使用篇)
雨露均沾的OkHttp—WebSocket长连接(源码篇)
上期我们熟悉了OkHttp中实现WebSocket
长连接的接入,并且可以通过OkHttp
官方的MockWebSocket
服务来模拟服务端,实现整个流程。
今天我们就来说下具体OkHttp
中是怎么实现这些功能的呢?相信看过这篇文章你也能深刻了解WebSocket
这个协议。
使用回顾
简单贴下WebSocket
使用方法,方便下面解析:
//初始化
mClient = new OkHttpClient.Builder()
.pingInterval(10, TimeUnit.SECONDS)
.build();
Request request = new Request.Builder()
.url(mWbSocketUrl)
.build();
mWebSocket = mClient.newWebSocket(request, new WsListener());
//收到消息回调
@Override
public void onMessage(@NotNull WebSocket webSocket, @NotNull String text) {
super.onMessage(webSocket, text);
Log.e(TAG,"收到消息!");
onWSDataChanged(DATE_NORMAL, text);
}
//发送消息
mWebSocket.send(message);
//主动关闭连接
mWebSocket.close(code, reason);
源码解析
WebSocket
整个流程无非三个功能:连接,接收消息,发送消息。下面我们就从这三个方面
分析下具体是怎么实现的。
连接
通过上面的代码我们得知,WebSocket
连接是通过newWebSocket
方法。直接点进去看这个方法:
override fun newWebSocket(request: Request, listener: WebSocketListener): WebSocket {
val webSocket = RealWebSocket(
taskRunner = TaskRunner.INSTANCE,
originalRequest = request,
listener = listener,
random = Random(),
pingIntervalMillis = pingIntervalMillis.toLong(),
extensions = null, // Always null for clients.
minimumDeflateSize = minWebSocketMessageToCompress
)
webSocket.connect(this)
return webSocket
}
这里做了两件事:
- 初始化
RealWebSocket
,主要是设置了一些参数(比如pingIntervalMillis
心跳包时间间隔,还有监听事件之类的) -
connect
方法进行WebSocket
连接
继续查看connect方法:
connect(WebSocket连接握手)
fun connect(client: OkHttpClient) {
//***
val webSocketClient = client.newBuilder()
.eventListener(EventListener.NONE)
.protocols(ONLY_HTTP1)
.build()
val request = originalRequest.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Extensions", "permessage-deflate")
.build()
call = RealCall(webSocketClient, request, forWebSocket = true)
call!!.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
//得到数据流
val streams: Streams
try {
checkUpgradeSuccess(response, exchange)
streams = exchange!!.newWebSocketStreams()
}
//***
// Process all web socket messages.
try {
val name = "$okHttpName WebSocket ${request.url.redact()}"
initReaderAndWriter(name, streams)
listener.onOpen(this@RealWebSocket, response)
loopReader()
} catch (e: Exception) {
failWebSocket(e, null)
}
}
})
}
上一篇使用篇文章中说过,Websocket
连接需要一次Http
协议的握手,然后才能把协议升级成WebSocket
。所以这段代码就体现出这个功能了。
首先就new
了一个用来进行Http
连接的request
,其中Header
的参数就表示我要进行WebSocket
连接了,参数解析如下:
-
Connection:Upgrade
,表示客户端要连接升级 -
Upgrade:websocket
, 表示客户端要升级建立Websocket连接 -
Sec-Websocket-Key:key
, 这个key是随机生成的,服务器会通过这个参数验证该请求是否有效 -
Sec-WebSocket-Version:13
, websocket使用的版本,一般就是13 -
Sec-webSocket-Extension:permessage-deflate
,客户端指定的一些扩展协议,比如这里permessage-deflate
就是WebSocket
的一种压缩协议。
Header
设置好之后,就调用了call
的enqueue
方法,这个方法大家应该都很熟悉吧,OkHttp
里面对于Http
请求的异步请求就是这个方法。
至此,握手结束,服务器返回响应码101
,表示协议升级。
然后我们继续看看获取服务器响应之后又做了什么?
在发送Http
请求成功之后,onResponse
响应方法里面主要表现为四个处理逻辑:
- 将
Http
流转换成WebSocket
流,得到Streams
对象,这个流后面会转化成输入流和输出流,也就是进行发送和读取的操作流 -
listener.onOpen(this@RealWebSocket, response)
,回调了接口WebSocketListener
的onOpen
方法,告诉用户WebSocket
已经连接 initReaderAndWriter(name, streams)
loopReader()
前两个逻辑还是比较好理解,主要是后两个方法,我们分别解析下。
首先看initReaderAndWriter
方法。
initReaderAndWriter(初始化输入流输出流)
//RealWebSocket.kt
@Throws(IOException::class)
fun initReaderAndWriter(name: String, streams: Streams) {
val extensions = this.extensions!!
synchronized(this) {
//***
//写数据,发送数据的工具类
this.writer = WebSocketWriter()
//设置心跳包事件
if (pingIntervalMillis != 0L) {
val pingIntervalNanos = MILLISECONDS.toNanos(pingIntervalMillis)
taskQueue.schedule("$name ping", pingIntervalNanos) {
writePingFrame()
return@schedule pingIntervalNanos
}
}
//***
}
//***
//读取数据的工具类
reader = WebSocketReader(
***
frameCallback = this,
***
)
}
internal fun writePingFrame() {
//***
try {
writer.writePing(ByteString.EMPTY)
} catch (e: IOException) {
failWebSocket(e, null)
}
}
这个方法主要干了两件事:
- 实例化输出流输入流工具类,也就是
WebSocketWriter
和WebSocketReader
,用来处理数据的收发。 - 设置心跳包事件。如果
pingIntervalMillis
参数不为0,就通过计时器,每隔pingIntervalNanos
发送一个ping
消息。其中writePingFrame
方法就是发送了ping
帧数据。
接收消息处理消息
loopReader
接着看看这个loopReader
方法是干什么的,看这个名字我们大胆猜测下,难道这个方法就是用来循环读取数据的?去代码里找找答案:
fun loopReader() {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader!!.processNextFrame()
}
}
代码很简单,一个while
循环,循环条件是receivedCloseCode == -1
的时候,做的事情是reader!!.processNextFrame()
方法。继续:
//WebSocketWriter.kt
fun processNextFrame() {
//读取头部信息
readHeader()
if (isControlFrame) {
//如果是控制帧,读取控制帧内容
readControlFrame()
} else {
//读取普通消息内容
readMessageFrame()
}
}
//读取头部信息
@Throws(IOException::class, ProtocolException::class)
private fun readHeader() {
if (closed) throw IOException("closed")
try {
//读取数据,获取数据帧的前8位
b0 = source.readByte() and 0xff
} finally {
source.timeout().timeout(timeoutBefore, TimeUnit.NANOSECONDS)
}
//***
//获取数据帧的opcode(数据格式)
opcode = b0 and B0_MASK_OPCODE
//是否为最终帧
isFinalFrame = b0 and B0_FLAG_FIN != 0
//是否为控制帧(指令)
isControlFrame = b0 and OPCODE_FLAG_CONTROL != 0
//判断最终帧,获取帧长度等等
}
//读取控制帧(指令)
@Throws(IOException::class)
private fun readControlFrame() {
if (frameLength > 0L) {
source.readFully(controlFrameBuffer, frameLength)
}
when (opcode) {
OPCODE_CONTROL_PING -> {
//ping 帧
frameCallback.onReadPing(controlFrameBuffer.readByteString())
}
OPCODE_CONTROL_PONG -> {
//pong 帧
frameCallback.onReadPong(controlFrameBuffer.readByteString())
}
OPCODE_CONTROL_CLOSE -> {
//关闭 帧
var code = CLOSE_NO_STATUS_CODE
var reason = ""
val bufferSize = controlFrameBuffer.size
if (bufferSize == 1L) {
throw ProtocolException("Malformed close payload length of 1.")
} else if (bufferSize != 0L) {
code = controlFrameBuffer.readShort().toInt()
reason = controlFrameBuffer.readUtf8()
val codeExceptionMessage = WebSocketProtocol.closeCodeExceptionMessage(code)
if (codeExceptionMessage != null) throw ProtocolException(codeExceptionMessage)
}
//回调onReadClose方法
frameCallback.onReadClose(code, reason)
closed = true
}
}
}
//读取普通消息
@Throws(IOException::class)
private fun readMessageFrame() {
readMessage()
if (readingCompressedMessage) {
val messageInflater = this.messageInflater
?: MessageInflater(noContextTakeover).also { this.messageInflater = it }
messageInflater.inflate(messageFrameBuffer)
}
if (opcode == OPCODE_TEXT) {
frameCallback.onReadMessage(messageFrameBuffer.readUtf8())
} else {
frameCallback.onReadMessage(messageFrameBuffer.readByteString())
}
}
代码还是比较直观,这个processNextFrame
其实就是读取数据用的,首先读取头部信息,获取数据帧的类型,判断是否为控制帧,再分别去读取控制帧数据或者普通消息帧数据。
数据帧格式
问题来了,什么是数据头部信息,什么是控制帧?
这里就要说下WebSocket
的数据帧了,先附上一个数据帧格式:
0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+-+-+-+-+-------+ +-+-------------+ +-----------------------------+
|F|R|R|R| OP | |M| LENGTH | Extended payload length
|I|S|S|S| CODE | |A| | (if LENGTH=126)
|N|V|V|V| | |S| |
| |1|2|3| | |K| |
+-+-+-+-+-------+ +-+-------------+
| Extended payload length(if LENGTH=127)
+ +-------------------------------
| Extended payload length | Masking-key,if Mask set to 1
+----------------------------------+-------------------------------
| Masking-key | Data
+----------------------------------+-------------------------------
| Data
+----------------------------------+-------------------------------
我承认,我懵逼了。
冷静冷静,一步一步分析下吧。
首先每一行代表4个字节,一共也就是32位数,哦,那也就是几个字节而已嘛,每个字节有他自己的代表意义呗,这样想是不是就很简单了,下面来具体看看每个字节。
第1个字节:
- 第一位是
FIN码
,其实就是一个标示位,因为数据可能多帧操作嘛,所以多帧情况下,只有最后一帧的FIN
设置成1,标示结束帧,前面所有帧设置为0。 - 第二位到第四位是
RSV码
,一般通信两端没有设置自定义协议,就默认为0。 - 后四位是
opcode
,我们叫它操作码。这个就是判断这个数据帧的类型了,一般有以下几个被定义好的类型:
1) 0x0
表示附加数据帧
2) 0x1
表示文本数据帧
3) 0x2
表示二进制数据帧
4) 0x3-7
保留用于未来的非控制帧
5) 0x8
表示连接关闭
6) 0x9
表示ping
7) 0xA
表示pong
8) 0xB-F
保留用于未来的非控制帧
是不是发现了些什么,这不就对应了我们应用中的几种格式吗?2和3
对应的是普通消息帧,包括了文本和二进制数据。567
对应的就是控制帧格式,包括了close,ping,pong
。
第2个字节:
- 第一位是
Mask
掩码,其实就是标识数据是否加密混淆,1代表数据经过掩码的,0是没有经过掩码的,如果是1的话,后续就会有4个字节代表掩码key
,也就是数据帧中Masking-key
所处的位置。 - 后7位是
LENGTH
,用来标示数据长度。因为只有7位,所以最大只能储存1111111对应的十进制数127长度
的数据,如果需要更大的数据,这个储存长度肯定就不够了。
所以规定来了,1)小于126长度
则数据用这七位表示实际长度。2) 如果长度设置为126
,也就是二进制1111110,就代表取额外2个字节
表示数据长度,共是16位表示数据长度。3) 如果长度设置为127
,也就是二进制1111111,就代表取额外8个字节
,共是64位表示数据长度。
需要注意的是LENGHT的三种情况在一个数据帧里面只会出现一种情况,不共存,所以在图中是用if表示。同样的,Masking-key也是当Mask为1的时候才存在。
所以也就有了数据帧里面的Extended payload length(LENGTH=126)
所处的2个字节,以及Extended payload length(LENGTH=127)
所处的8个字节。
最后的字节部分自然就是掩码key
(Mask为1的时候才存在)和具体的传输数据
了。
还是有点晕吧😷,来张图总结下:
好了,了解了数据帧格式后,我们再来读源码就清晰多了。
先看看怎么读的头部信息
并解析的:
//取数据帧前8位数据
b0 = source.readByte() and 0xff
//获取数据帧的opcode(数据格式)
opcode = b0 and B0_MASK_OPCODE(15)
//是否为最终帧
isFinalFrame = b0 and B0_FLAG_FIN(128) != 0
//是否为控制帧(指令)
isControlFrame = b0 and OPCODE_FLAG_CONTROL(8) != 0
- 第一句获取头信息,
and
是按位与计算,and 0xff
意思就是按位与11111111,所以头部信息其实就是取了数据帧的前8位数据
,一个字节。 - 第二句获取
opcode
,and 15
也就是按位与00001111,其实也就是取了后四位数据,刚好对应上opcode
的位置,第一个字节的后四位。 - 第三句获取是否为
最终帧
,刚才数据帧格式中说过,第一位FIN
标识了是否为最后一帧数据,1代表结束帧,所以这里and 128
也就是按位与10000000,也就是取的第一位数。 - 第四句获取是否为控制帧,
and 8
也就是按位与00001000,取得是第五位,也就是opcode
的第一位,这是什么意思呢?我们看看刚才的数据帧格式,发现从0x8
开始就是所谓的控制帧了。0x8
对应的二进制是1000,0x7
对应的二进制是0111。发现了吧,如果为控制帧的时候,opcode
第一位肯定是为1的,所以这里就判断的第五位。
后面还有读取第二个字节的代码,大家可以自己沿着这个思路自己看看,包括了读取MASK
,读取数据长度的三种长度等。
所以这个processNextFrame
方法主要做了三件事:
-
readHeader
方法中,判断了是否为控制帧,是否为结束帧
,然后获取了Mask
标识,帧长度等参数 -
readControlFrame
方法中,主要处理了该帧数据为ping,pong,close
三种情况,并且在收到close关闭帧
的情况下,回调了onReadClose
方法,这个待会要细看下。 -
readMessageFrame
方法中,主要是读取了消息后,回调了onReadMessage方法。
至此可以发现,其实WebSocket
传输数据并不是一个简单的事,只是OkHttp
都帮我们封装好了,我们只需要直接传输数据即可,感谢这些三方库为我们开发作出的贡献,不知道什么时候我也能做出点贡献呢🤔。
对了,刚才说回调也很重要,接着看看。onReadClose
和onReadMessage
回调到哪了呢?还记得上文初始化WebSocketWriter
的时候设置了回调接口吗。所以就是回调给RealWebSocket
了:
//RealWebSocket.kt
override fun onReadClose(code: Int, reason: String) {
require(code != -1)
var toClose: Streams? = null
var readerToClose: WebSocketReader? = null
var writerToClose: WebSocketWriter? = null
synchronized(this) {
check(receivedCloseCode == -1) { "already closed" }
receivedCloseCode = code
receivedCloseReason = reason
//...
}
try {
listener.onClosing(this, code, reason)
if (toClose != null) {
listener.onClosed(this, code, reason)
}
} finally {
toClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
@Throws(IOException::class)
override fun onReadMessage(text: String) {
listener.onMessage(this, text)
}
@Throws(IOException::class)
override fun onReadMessage(bytes: ByteString) {
listener.onMessage(this, bytes)
}
onReadClose
回调方法里面有个关键的参数,receivedCloseCode
。还记得这个参数吗?上文中解析消息的循环条件就是receivedCloseCode == -1
,所以当收到关闭帧的时候,receivedCloseCode
就不再等于-1(规定大于1000),也就不再去读取解析消息了。这样整个流程就结束了。
其中还有一些WebSocketListener
的回调,比如onClosing,onClosed,onMessage
等,就直接回调给用户使用了。至此,接收消息处理消息说完了。
发消息
好了。接着说发送,看看send
方法:
@Synchronized private fun send(data: ByteString, formatOpcode: Int): Boolean {
// ***
// Enqueue the message frame.
queueSize += data.size.toLong()
messageAndCloseQueue.add(Message(formatOpcode, data))
runWriter()
return true
}
首先,把要发送的data
封装成Message
对象,然后入队列messageAndCloseQueue
。最后执行runWriter
方法。这都不用猜了,runWriter
肯定就要开始发送消息了,继续看:
//RealWebSocket.kt
private fun runWriter() {
this.assertThreadHoldsLock()
val writerTask = writerTask
if (writerTask != null) {
taskQueue.schedule(writerTask)
}
}
private inner class WriterTask : Task("$name writer") {
override fun runOnce(): Long {
try {
if (writeOneFrame()) return 0L
} catch (e: IOException) {
failWebSocket(e, null)
}
return -1L
}
}
//以下是schedule方法转到WriterTask的runOnce方法过程
//TaskQueue.kt
fun schedule(task: Task, delayNanos: Long = 0L) {
synchronized(taskRunner) {
if (scheduleAndDecide(task, delayNanos, recurrence = false)) {
taskRunner.kickCoordinator(this)
}
}
}
internal fun scheduleAndDecide(task: Task, delayNanos: Long, recurrence: Boolean): Boolean {
//***
if (insertAt == -1) insertAt = futureTasks.size
futureTasks.add(insertAt, task)
// Impact the coordinator if we inserted at the front.
return insertAt == 0
}
//TaskRunner.kt
internal fun kickCoordinator(taskQueue: TaskQueue) {
this.assertThreadHoldsLock()
if (taskQueue.activeTask == null) {
if (taskQueue.futureTasks.isNotEmpty()) {
readyQueues.addIfAbsent(taskQueue)
} else {
readyQueues.remove(taskQueue)
}
}
if (coordinatorWaiting) {
backend.coordinatorNotify(this@TaskRunner)
} else {
backend.execute(runnable)
}
}
private val runnable: Runnable = object : Runnable {
override fun run() {
while (true) {
val task = synchronized(this@TaskRunner) {
awaitTaskToRun()
} ?: return
logElapsed(task, task.queue!!) {
var completedNormally = false
try {
runTask(task)
completedNormally = true
} finally {
// If the task is crashing start another thread to service the queues.
if (!completedNormally) {
backend.execute(this)
}
}
}
}
}
}
private fun runTask(task: Task) {
try {
delayNanos = task.runOnce()
}
}
代码有点长,这里是从runWriter
开始跟的几个方法,拿到writerTask
实例后,存到TaskQueue
的futureTasks列表
里,然后到runnable
这里可以看到是一个while
死循环,不断的从futureTasks
中取出Task
并执行runTask
方法,直到Task
为空,循环停止。
其中涉及到两个新的类:
-
TaskQueue类
主要就是管理消息任务列表,保证按顺序执行 -
TaskRunner类
主要就是做一些任务的具体操作,比如线程池里执行任务,记录消息任务的状态(准备发送的任务队列readyQueues
,正在执行的任务队列busyQueues
等等)
而每一个Task最后都是执行到了WriterTask
的runOnce
方法,也就是writeOneFrame
方法:
internal fun writeOneFrame(): Boolean {
synchronized(this@RealWebSocket) {
if (failed) {
return false // Failed web socket.
}
writer = this.writer
pong = pongQueue.poll()
if (pong == null) {
messageOrClose = messageAndCloseQueue.poll()
if (messageOrClose is Close) {
} else if (messageOrClose == null) {
return false // The queue is exhausted.
}
}
}
//发送消息逻辑,包括`pong`消息,普通消息,关闭消息
try {
if (pong != null) {
writer!!.writePong(pong)
} else if (messageOrClose is Message) {
val message = messageOrClose as Message
writer!!.writeMessageFrame(message.formatOpcode, message.data)
synchronized(this) {
queueSize -= message.data.size.toLong()
}
} else if (messageOrClose is Close) {
val close = messageOrClose as Close
writer!!.writeClose(close.code, close.reason)
// We closed the writer: now both reader and writer are closed.
if (streamsToClose != null) {
listener.onClosed(this, receivedCloseCode, receivedCloseReason!!)
}
}
return true
} finally {
streamsToClose?.closeQuietly()
readerToClose?.closeQuietly()
writerToClose?.closeQuietly()
}
}
这里就会执行发送消息的逻辑了,主要有三种消息情况处理:
-
pong消息
,这个主要是为服务器端准备的,发送给客户端回应心跳包。 -
普通消息
,就会把数据类型Opcode
和具体数据发送过去 -
关闭消息
,其实当用户执行close
方法关闭WebSocket
的时候,也是发送了一条Close控制帧
消息给服务器告知这个关闭需求,并带上code状态码
和reason关闭原因
,然后服务器端就会关闭当前连接。
好了。最后一步了,就是把这些数据组装成WebSocket
数据帧并写入流,分成控制帧
数据和普通消息数据帧
:
//写入(发送)控制帧
private fun writeControlFrame(opcode: Int, payload: ByteString) {
if (writerClosed) throw IOException("closed")
val length = payload.size
require(length <= PAYLOAD_BYTE_MAX) {
"Payload size must be less than or equal to $PAYLOAD_BYTE_MAX"
}
val b0 = B0_FLAG_FIN or opcode
sinkBuffer.writeByte(b0)
var b1 = length
if (isClient) {
b1 = b1 or B1_FLAG_MASK
sinkBuffer.writeByte(b1)
random.nextBytes(maskKey!!)
sinkBuffer.write(maskKey)
if (length > 0) {
val payloadStart = sinkBuffer.size
sinkBuffer.write(payload)
sinkBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(payloadStart)
toggleMask(maskCursor, maskKey)
maskCursor.close()
}
} else {
sinkBuffer.writeByte(b1)
sinkBuffer.write(payload)
}
sink.flush()
}
//写入(发送)普通消息数据帧
@Throws(IOException::class)
fun writeMessageFrame(formatOpcode: Int, data: ByteString) {
if (writerClosed) throw IOException("closed")
messageBuffer.write(data)
var b0 = formatOpcode or B0_FLAG_FIN
val dataSize = messageBuffer.size
sinkBuffer.writeByte(b0)
var b1 = 0
if (isClient) {
b1 = b1 or B1_FLAG_MASK
}
when {
dataSize <= PAYLOAD_BYTE_MAX -> {
b1 = b1 or dataSize.toInt()
sinkBuffer.writeByte(b1)
}
dataSize <= PAYLOAD_SHORT_MAX -> {
b1 = b1 or PAYLOAD_SHORT
sinkBuffer.writeByte(b1)
sinkBuffer.writeShort(dataSize.toInt())
}
else -> {
b1 = b1 or PAYLOAD_LONG
sinkBuffer.writeByte(b1)
sinkBuffer.writeLong(dataSize)
}
}
if (isClient) {
random.nextBytes(maskKey!!)
sinkBuffer.write(maskKey)
if (dataSize > 0L) {
messageBuffer.readAndWriteUnsafe(maskCursor!!)
maskCursor.seek(0L)
toggleMask(maskCursor, maskKey)
maskCursor.close()
}
}
sinkBuffer.write(messageBuffer, dataSize)
sink.emit()
}
大家应该都能看懂了吧,其实就是组装数据帧,包括Opcode,mask,数据长度
等等。两个方法的不同就在于普通数据需要判断数据长度的三种情况,再组装数据帧。最后都会通过sinkBuffer
写入到输出数据流。
终于,基本的流程说的差不多了。其中还有很多细节,同学们可以自己花时间看看琢磨琢磨,比如Okio
部分。还是那句话,希望大家有空自己也读一读相关源码,这样理解才能深刻,而且你肯定会发现很多我没说到的细节,欢迎大家讨论。我也会继续努力,最后大家给我加个油点个赞吧,感谢感谢。
总结
再来个图总结下吧!🎉
参考
你的一个👍,就是我分享的动力❤️。