最近在项目的开发中,碰到了这样一个需求:需要在长连接的心跳发送时执行一些业务上的逻辑。那么,问题就在于如何在现有的长连接的基础上,以尽可能小的改动,实现这个需求。故事也就由此开始了。
确定okhttp是否有提供相应的API
首先肯定是要确定okhttp中是否有类似的API可以使用,或者是否可以通过更新版本来解决这个问题。刚好,我找到了GitHub中有人提出了类似的问题,可以来看看官方的说法:
WebSocket ping logic is not customizable · Issue #3197 · square/okhttp
可以看到,开发者明确表示了并不希望让应用层自定义ping方法的逻辑,那么看来只能另想办法了。
okhttp中的心跳的使用方法与实现原理
首先,我来简单梳理一下okhttp中心跳的实现原理,如果只是想要解决方法的朋友可以直接跳过这一部分。
在okhttp中,实现心跳的方式非常简单,只需要在OkHttpClient创建时添加相应的配置即可:
OkHttpClient.Builder()
.pingInterval(HEART_BEAT_RATE, TimeUnit.SECONDS)
.build()
那么具体的心跳逻辑是如何实现的呢,一起来看看具体的代码细节。
//OkHttpClient.java
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
webSocket.connect(this);
return webSocket;
}
//RealWebSocket.java
public RealWebSocket(Request request, WebSocketListener listener, Random random,long pingIntervalMillis) {
//...
this.pingIntervalMillis = pingIntervalMillis;
//...
}
public void initReaderAndWriter(String name, Streams streams) throws IOException {
synchronized (this) {
//...
this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
if (pingIntervalMillis != 0) {
executor.scheduleAtFixedRate(
new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
}
//...
}
}
private final class PingRunnable implements Runnable {
@Override public void run() {
writePingFrame();
}
}
void writePingFrame() {
//...
try {
writer.writePing(ByteString.EMPTY);
} catch (IOException e) {
failWebSocket(e, null);
}
//...
}
//WebSocketWriter.java
void writePing(ByteString payload) throws IOException {
writeControlFrame(OPCODE_CONTROL_PING, payload);
}
上面的代码就是ping的主要发送逻辑了,简单总结一下就是如果pingInterval不为0,那就开启一个的循环任务,定时的去发送代表ping的ControlFrame。
其中值得一提的就是ControlFrame这个概念,在WebSocket中的frame分为两类,一类叫做MessageFrame,也就是平时客户端与服务端互相通信的部分。另一类叫做ControlFrame,其中包括CONTROL_PING,CONTROL_PONG,CONTROL_CLOSE,可以看出这一类更偏重与功能性的方面。具体为哪一类的Frame可以在Header中进行区分。
上面已经介绍了心跳的发送逻辑,那么下面就轮到接收的逻辑了,还是先来看看代码:
//RealWebSocket.java
public void loopReader() throws IOException {
while (receivedCloseCode == -1) {
// This method call results in one or more onRead* methods being called on this thread.
reader.processNextFrame();
}
}
//WebSocketReader.java
void processNextFrame() throws IOException {
readHeader();
if (isControlFrame) {
readControlFrame();
} else {
readMessageFrame();
}
}
private void readControlFrame() throws IOException {
//...
switch (opcode) {
case OPCODE_CONTROL_PING:
frameCallback.onReadPing(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_PONG:
frameCallback.onReadPong(controlFrameBuffer.readByteString());
break;
case OPCODE_CONTROL_CLOSE:
//...
default:
throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
}
}
可以看到,接收的部分逻辑也很简单,就是通过一个循环去读取,如果接收到了消息,那就先通过header确定frame的类型,然后再分类进行处理。
而且值得注意的是,上面代码中出现了一个frameCallback的对象,而这个对象是WebSocketReader.FrameCallback这个接口的实现,而里面的onReadPing和onReadPong就是我们之后能够做文章的地方了。
WebSocketReader.FrameCallback
public interface FrameCallback {
void onReadMessage(String text) throws IOException;
void onReadMessage(ByteString bytes) throws IOException;
void onReadPing(ByteString buffer);
void onReadPong(ByteString buffer);
void onReadClose(int code, String reason);
}
添加回调的具体实现
在上面的源码分析中,我们注意到了WebSocketReader.FrameCallback这个接口,如果我们能够自己实现这个接口,并且注入到websocket的reader中,那么这个需求不就实现了吗。
那么我们再来看看reader中的frameCallback按照原来的逻辑应该是个什么东西:
//RealWebSocket.java
reader = new WebSocketReader(streams.client, streams.source, this);
//WebSocketReader.java
WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) {
//...
this.frameCallback = frameCallback;
//...
}
原来frameCallback就是RealWebSocket,而我们所持有的webSocket正是RealWebSocket的对象,那么只需要做一个静态代理,然后通过反射将reader替换为我们自己的实现就可以了:
private fun replaceReaderCallBack() {
val wsClass = webSocket!!.javaClass
val callbackClass = wsClass.interfaces.find { it.name.contains("FrameCallback") } ?: return
val readerField = wsClass.getDeclaredField("reader")
readerField.isAccessible = true
val reader = readerField.get(webSocket)
val callbackInstance = Proxy.newProxyInstance(reader.javaClass.classLoader, arrayOf(callbackClass)) { proxy, method, args ->
when (method?.name) {
"onReadMessage" -> {
if (args!![0] is String) {
webSocket?.onReadMessage(args[0] as String)
} else {
webSocket?.onReadMessage(args[0] as ByteString)
}
}
"onReadPing" -> { webSocket?.onReadPing(args!![0] as ByteString) }
"onReadPong" -> { webSocket?.onReadPong(args!![0] as ByteString) }
"onReadClose" -> { webSocket?.onReadClose(args!![0] as Int, args[1] as String) }
}
0
}
reader.javaClass.getDeclaredField("frameCallback").apply {
isAccessible = true
set(reader, callbackInstance)
}
}
至此,回调已经添加完成,只需要在对应的回调中补上自己的业务逻辑,然后在websocket创建完成之后调用一下这个方法就完成了。