【Android WebSocket】状态管理框架 - WebSocketGo

前言

为了方便大家更加了解故事的背景,顺便科普一下智能家居,想直奔WebSocket主题的童鞋可以直接第二章节

智能家居,算是物联网的一个典型应用场景。什么是物联网呢,字面意思就是把众多物体连接起来组成一个网络,英文是Internet of things(IoT)。小到一个你的手机跟蓝牙耳机,大到一个城市的各个角落,究极形态就是“万物互联”。(为什么突然想起万佛朝宗 -_-!)

IoT其实不在意网络的协议,也不在意连接的到底是什么东东,就这种形态本身而言,就是IoT。其实这个概念其实很早就有了,在笔者上大学那会,不怕暴露年龄的说是在09到10年左右就听说过物联网这个词。还记得实验课的GPRS智能抄表系统么,你可能没有觉得多么高大上,不就是水表上插个SIM卡,定期把值发到客户端上么,再也不怕被人上门查水表啦~ 没错,这也是IoT的一种体现。

但这么多年,物联网但一直不温不火。至于为什么最近几年又被提出来呢,智能家居在其中扮演了很重要的作用。另外AI这把火也起到了推波助澜的作用,任何产品都要加上智能二字。所以有了AI + IoT,也就是AIOT。具体就不详述了,以后可以再单开一文详细介绍下物联网。

说回智能家居,智能家居是家庭为单位的物联网,简单说就是你可以通过家里中控或者App控制制家里的灯、插座、监控、电器等等。

简单解释一下,网关的作用是负责连接屋内的所有设备,网关和设备之间一般不会采用HTTP(个别单品除外),而会采用比如zigbee、lora等近场通信协议(或者直接用有线方式更稳定),因为功耗低,比较省电,你想家里如果装了几十个开关面板,总归每个月也能节约十几二十块钱的电费吧。当然最重要的原因还是因为方案也比较成熟。所以使用这类设备之前需要有个“组网”的操作,把设备组到网关上。如果断开了,网关就认为设备离线了。

网关会将设备的状态上报给云平台,云平台就将状态下发给客户端了。同样,客户端控制设备就是个反向的过程。当然如果以后5G普及了,设备直接通过5G网络连接云平台,因为5G功耗低、延迟低、速度快,就可以不需要网关了。

整体的架构还算是比较简单的,当然中间也有很多复杂的逻辑,涉及到比如设备状态、人员、权限、房屋的管理等等这里就不关注了。

在服务器下发设备状态的时候就涉及到了推送。因为智能家居有情景模式的概念,比如回家模式,执行一个请求,咔咔打开十几个灯,所以靠请求的返回值判断状态是不太现实的,只能依靠推送。

由于业务的特殊性,智能家居的推送需要有较高的实时性,比如用户打开了灯(无论在app上打开或者直接打开),app上的灯的状态都需要立即变成开。如果等个3、5秒状态再发生改变,这样的用户体验很不好。

我们最早使用的是某光推送,发现延迟严重,有时需要等十几秒才收到推送,毕竟适用场景毕竟不一样。所以决定自建长连接当推送。经过选型,决定采用本文的主角,WebSocket。

WebSocket状态管理现存的问题

市面上有很多现成的WebSocket连接库,比较著名的有Java-WebSocket,OkHttp也自带WebSocket支持。

最初因为项目内已经接入了OkHttp,所以直接使用了OkHttp。使用的方式很简单,熟悉OkHttp的童鞋应该很懂,

OkHttpClient client = OkHttpClient.Builder().build();
Request request = new Request.Builder().build();
client.newWebSocket(request, new WebSocketListener() {
    @Override
    public void onOpen(okhttp3.WebSocket webSocket, Response response) {}

    @Override
    public void onMessage(okhttp3.WebSocket webSocket, String text) {}

    @Override
    public void onClosed(okhttp3.WebSocket webSocket, int code, String reason) {}

    @Override
    public void onFailure(okhttp3.WebSocket webSocket, Throwable t, Response response) {}
});
复制代码 

调用方便,回调状态也很清晰。Java-Websocket也差不多类似,但总体来说有以下几个问题:

1. 心跳机制

无论OkHttp还是Java-WebSocket,都是有心跳机制的。但OkHttp的心跳间隔,也就是pingInterval,在创建client时候就已经固定了,是不支持中途调整的。(Jake大神还在github上回复过相关问题,意思是调用者不需要关注这些。但往往确实有这种需求的啊😢😢😢)

比如应用在前台的时候心跳可能稍微频繁点,但应用出于后台时出于省电的优化,心跳间隔可以设置的稍微长一点。如果需要调整心跳,需要创建新的client,断掉旧的,重新连接。有两种策略:

  • 先断旧的,再连新的。如果服务端的消息没有做缓存策略,在断掉和新连接的间隔可能会漏掉消息。
  • 先连新的,再断旧的。这要在一瞬时就会有两个连接连上服务端,服务端对于消息的状态可能会发生异常。

以上两种策略都需要客户端和服务端先约定好,服务端做额外处理,增加研发成本不说,也有增加出错概率。

2. 断掉重连

通常连接断掉有以下这么几种情况:

  • 客户端主动断(无需重连)
  • 服务端主动断(无需重连)
  • 客户端因为网络原因,比如手机断网,非主动段(需要重连)
  • 运营商因连接空闲导致的连接断开(需要重连)

关于最后一种情况,可以参见 移动端IM实践:实现Android版微信的智能心跳机制

对于重连,还需要考虑的问题是重连的时间间隔,如果手机网断了,不停地重连同样是失败,所以需要自己制定一个重连次数和时间间隔关系。

总之,重连的逻辑和策略是需要我们自己维护的。

3. 多线程

OkHttp中关于状态的回调发生在OkHttp创建的子线程中,根据状态我们需要发起重连。如果这时候我们的应用发生了前后台切换又要创建新连接,各种连接(重连)交织在一起,那真是一团乱麻。需要考虑的问题有:

  • 同时发起多个连接的情况,需要等上一个连接请求执行完,再根据请求发起下一个连接请求。上一个连接如果连上了,那下一个等待的连接请求就可以直接跳过了。
  • 连接请求的发生在不同线程中,状态的同步

解决方案

因为笔者项目内用的是OkHttp,所以最初的解决方案都是针对于OkHttp来做的。大致思路如下:

1. 心跳机制

1.1 OkHttp WebSocket心跳分析

前面说到过,OkHttp是不支持动态调整心跳的,那OkHttp是怎么维护心跳的呢,我们来分析一下它的源码:

分析源码,我们从调用入口client.newWebSocket开始:

// newWebSocket@OkHttpClient.java

/**
 * 内部使用RealWebSocket类来发起连接
 */
@Override public WebSocket newWebSocket(Request request, WebSocketListener listener) {
    // 注意这里的pingInterval,所以在创建RealWebSocket时就已经固定,没法再修改了。
    RealWebSocket webSocket = new RealWebSocket(request, listener, new Random(), pingInterval);
    webSocket.connect(this);
    return webSocket;
}
复制代码

// RealWebSocket@RealWebSocket.java

public RealWebSocket(Request request, WebSocketListener listener, Random random, long pingIntervalMillis) {
    this.originalRequest = request;
    this.listener = listener;
    this.random = random;
    this.pingIntervalMillis = pingIntervalMillis;
    // ...
    // 这里的writerRunnable是用来向服务端发送消息的,在后面详述
    this.writerRunnable = new Runnable() {
      @Override public void run() {
        try {
          while (writeOneFrame()) {
          }
        } catch (IOException e) {
          failWebSocket(e, null);
        }
      }
    };
  }
  
  public void connect(OkHttpClient client) {
    // WebSocket协议相关的header
    client = client.newBuilder()
        .eventListener(EventListener.NONE)
        .protocols(ONLY_HTTP1)
        .build();
    final Request request = originalRequest.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build();
    call = Internal.instance.newWebSocketCall(client, request);
    call.enqueue(new Callback() {
      @Override public void onResponse(Call call, Response response) {
        try {
          checkResponse(response);
        } catch (ProtocolException e) {
          failWebSocket(e, response);
          closeQuietly(response);
          return;
        }

        // Promote the HTTP streams into web socket streams.
        StreamAllocation streamAllocation = Internal.instance.streamAllocation(call);
        streamAllocation.noNewStreams(); // Prevent connection pooling!
        Streams streams = streamAllocation.connection().newWebSocketStreams(streamAllocation);

        // Process all web socket messages.
        try {
          // 连接成功,调用listern的onOpen
          listener.onOpen(RealWebSocket.this, response);
          String name = "OkHttp WebSocket " + request.url().redact();
          // 重点1:创建reader和writer,见下文
          initReaderAndWriter(name, streams);
          streamAllocation.connection().socket().setSoTimeout(0);
          // 重点2:轮询读
          loopReader();
        } catch (Exception e) {
          failWebSocket(e, null);
        }
      }

      @Override public void onFailure(Call call, IOException e) {
        failWebSocket(e, null);
      }
    });
  }
  
   public void initReaderAndWriter(String name, Streams streams) throws IOException {
    synchronized (this) {
      this.streams = streams;
      this.writer = new WebSocketWriter(streams.client, streams.sink, random);
      // 创建Scheduled线程池
      this.executor = new ScheduledThreadPoolExecutor(1, Util.threadFactory(name, false));
      if (pingIntervalMillis != 0) {
        // 利用线程池定期运行PingRunnable,越来越接近真像了
        executor.scheduleAtFixedRate(
            new PingRunnable(), pingIntervalMillis, pingIntervalMillis, MILLISECONDS);
      }
      if (!messageAndCloseQueue.isEmpty()) {
        runWriter(); // Send messages that were enqueued before we were connected.
      }
    }
    // 创建reader,用来读取消息
    reader = new WebSocketReader(streams.client, streams.source, this);
  }
复制代码 

重点看一下PingRunnable都做了什么

private final class PingRunnable implements Runnable {
    PingRunnable() {
    }

    @Override public void run() {
      // 发送ping frame
      writePingFrame();
    }
  }

  void writePingFrame() {
    WebSocketWriter writer;
    int failedPing;
    synchronized (this) {
      if (failed) return;
      writer = this.writer;
      // 判断是否在等待pong,如果在等待failedPing置为sentPingCount,否则置为-1
      failedPing = awaitingPong ? sentPingCount : -1;
      sentPingCount++;
      awaitingPong = true;
    }

    if (failedPing != -1) {
      // 运行到此处的条件是上一个pong还没有返回,报错退出
      failWebSocket(new SocketTimeoutException("sent ping but didn't receive pong within "
          + pingIntervalMillis + "ms (after " + (failedPing - 1) + " successful ping/pongs)"),
          null);
      return;
    }

    try {
      // 发送一个ping消息
      writer.writePing(ByteString.EMPTY);
    } catch (IOException e) {
      failWebSocket(e, null);
    }
  }
复制代码 

那么怎么判断有没有收到pong了,就需要用到reader了。我们来看一下reader里做了什么。

还记得前文重点2标记的loopReader么?

// loopReader@RealWebSocket.java

/** Receive frames until there are no more. Invoked only by the reader thread. */
public void loopReader() throws IOException {
    while (receivedCloseCode == -1) {
        // This method call results in one or more onRead* methods being called on this thread.
        // 处理收到的frame
        reader.processNextFrame();
    }
}

/**
 * 根据header判断是消息帧还是控制帧,心跳pong属于控制帧
 */
void processNextFrame() throws IOException {
    readHeader();
    if (isControlFrame) {
      readControlFrame();
    } else {
      readMessageFrame();
    }
  }

/**
 * 解析opcode
 */
private void readControlFrame() throws IOException {
    // ...
    switch (opcode) {
      case OPCODE_CONTROL_PING:
        frameCallback.onReadPing(controlFrameBuffer.readByteString());
        break;
      case OPCODE_CONTROL_PONG:
        // 注意:收到pong,触发onReadPing
        frameCallback.onReadPong(controlFrameBuffer.readByteString());
        break;
      case OPCODE_CONTROL_CLOSE:
        // ...
        frameCallback.onReadClose(code, reason);
        closed = true;
        break;
      default:
        throw new ProtocolException("Unknown control opcode: " + toHexString(opcode));
    }
  }
  
  /**
    * 计数器自增,将awaitingPong状态置为false
    */
  @Override public synchronized void onReadPong(ByteString buffer) {
    // This API doesn't expose pings.
    receivedPongCount++;
    awaitingPong = false;
  }
复制代码 

一路追下来发现,其实收到pong之后,okhttp就是简单地改变awaitingPong标记位的状态,就代表收到了上一个pong消息。

总结一下,okhttp的心跳流程,

  1. ReadWebSocket连接的时候创建schedule线程池,定期地执行PingRunnable发送ping消息;(发送其他的请求消息在WriteRunnable中)
  2. 同时创建一个reader不断接受消息;
  3. 发送ping之前,通过awaitingPong标记,判断上一个pong有没有回来;
  4. 对于收到的消息,进行解析,如果收到的pong,改变awaitingPong状态。

1.2 动态调整okhttp心跳

我们发现,只要改变线程池执行PingRunable的delayTime,就能达到在不重新创建OkHttpClient的情况下,动态改变ping的速率。

所以可以用反射的方式,shutDown原来的线程池,创建新的线程池,用新的pingInterval执行pingRunnable。

Class clazz = Class.forName("okhttp3.internal.ws.RealWebSocket");
Field field = clazz.getDeclaredField("executor");
field.setAccessible(true);
ScheduledExecutorService oldService = (ScheduledExecutorService) field.get(mWebSocket);

Class[] innerClasses = Class.forName("okhttp3.internal.ws.RealWebSocket").getDeclaredClasses();
for (Class innerClass : innerClasses) {
    if ("PingRunnable".equals(innerClass.getSimpleName())) {
        // 创建新的PingRunnable实例
        Constructor constructor = innerClass.getDeclaredConstructor(RealWebSocket.class);
        constructor.setAccessible(true);
        Object pingRunnable = constructor.newInstance(mWebSocket);

        // 创建新的线程池
        ScheduledThreadPoolExecutor newService = new ScheduledThreadPoolExecutor(1, Util.threadFactory("ws-ping", false));
        newService.scheduleAtFixedRate((Runnable) pingRunnable, interval, interval, unit);
        field.set(mWebSocket, newService);

        // shutdown旧的线程池
        oldService.shutdown();
    }
}
复制代码 

这样,我们就达成了动态调整心跳的目的。

Java-WebSocket的ping是通过外部调用sendPing()来达到发送ping的目的的,内部没有ping/pong状态机制,所以需要我们自己去维护这个关系。其实可以仿照OkHttp那样,利用定时消息去发送ping,然后解析pong来维护心跳状态。这里就不过多阐述了。

2. 断掉重连

正如前面提过的,断掉重连只要处理好几种断开状态的逻辑即可。好在绝大多数websocket库都分close、error回调,可以区分异常断开和主动断开。

需要自己维护的也就是重连的间隔和重试次数的关系,类似指数退避算法。

3. 多线程

针对上文说到的多线程问题,主要的问题点就是发生在同时发生多个连接请求的时候,与其对多线程加各种同步,不如并行该串行。采用类似消息队列的方式,消息的处理放在单独的线程中取做。这样就可以省却了线程的同步,同时也能保证状态的有序性。

其实Android中,利用HandlerThread就可以完美地满足上述两点,最初笔者也确实是利用HandlerThread来做的。后来觉得可以独立于平台,也可以运行在纯Java的平台上,所以还是自己手动实现了。同时自己维护消息队列,也可以更好地自己处理优先级、延迟等。

WebSocketGo

之前也说过,笔者的项目是基于OkHttp的,但发现状态管理这部分可以单独抽象出来,所以才会有了本文的标题WebSocketGo(以下简称WsGo)。

数据流如下:

  1. Dispatcher就是前文提到的消息队列,就是生产者-消费者模式,维护两个队列,发送command,接收event。

    • command主要有:CONNECT、DISCONNECT、RECONNECT、CHANGE_PING、SEND

    • event主要有:OnConnect、onMessage、onSend、onRetry、onDisConnect、onClose

  2. Channel Manager主要负责状态管理。

  3. WebSocket interface可以理解为适配层,负责调用WebSocket库。

  4. Event Listener就是前台的状态回调。

接入WsGo

implementation 'com.gnepux:wsgo:1.0.2'
// use okhttp
implementation 'com.gnepux:wsgo-okwebsocket:1.0.1'
// use java websocket
implementation 'com.gnepux:wsgo-jwebsocket:1.0.1'
复制代码 

初始化

WsConfig config = new WsConfig.Builder()
        .debugMode(true)    // true to print log
        .setUrl(pushUrl)    // ws url
        .setHttpHeaders(headerMap)  // http headers
        .setConnectTimeout(10 * 1000L)  // connect timeout
        .setReadTimeout(10 * 1000L)     // read timeout
        .setWriteTimeout(10 * 1000L)    // write timeout
        .setPingInterval(10 * 1000L)    // initial ping interval
        .setWebSocket(OkWebSocket.create()) // websocket client
        .setRetryStrategy(retryStrategy)    // retry count and delay time strategy
        .setEventListener(eventListener)    // event listener
        .build();
WsGo.init(config);
复制代码 

Go

// 连接
WsGo.getInstance().connect();
// 发送消息
WsGo.getInstance().send("hello from WsGo");
// 断开
WsGo.getInstance().disconnect(1000, "close");
WsGo.getInstance().disconnectNormal("close");
// 改变心跳
WsGo.getInstance().changePingInterval(10, TimeUnit.SECONDS);
// 释放
WsGo.getInstance().destroyInstance();
复制代码 

WsConfig的更多配置

setWebSocket(WebSocket socket)

WsGo 已经支持OkHttp and Java WebSocket

// for OkHttp (wsgo-okwebsocket)
setWebSocket(OkWebSocket.create());
// for Java WebSocket (wsgo-jwebsocket)
setWebSocket(JWebSocket.create());
复制代码 

如果你需要使用其他的WebSocket库或自定义客户端,只需要实现一个WebSocket接口,将对应结果传递给ChannelCallback即可。剩下的连接管理,WsGo会帮你完成。

public interface WebSocket {
    void connect(WsConfig config, ChannelCallback callback);
    void reconnect(WsConfig config, ChannelCallback callback);
    boolean disconnect(int code, String reason);
    void changePingInterval(long interval, TimeUnit unit);
    boolean send(String msg);
}
复制代码 

setRetryStrategy(RetryStrategy retryStrategy)

对于非正常断开,WsGo会自动重连。RetryStrategy指的是重连次数和延时的关系。

WsGo默认有一个DefaultRetryStrategy,如果你需要自己调整,实现RetryStrategy接口里的onRetry方法即可。

public interface RetryStrategy {
    /**
     * 重试次数和延迟的关系
     *
     * @param retryCount 已经重试的次数
     * @return 延时的时间
     */
    long onRetry(long retryCount);
}
复制代码 

setEventListener(EventListener eventListener)

添加事件回调。需要注意,回调在WsGo自己创建的一个线程中运行,不在调用线程中。如有必要,需要在会调用手动切换线程。

public interface EventListener {
    void onConnect();
    void onDisConnect(Throwable throwable);
    void onClose(int code, String reason);
    void onMessage(String text);
    void onReconnect(long retryCount, long delayMillSec);
    void onSend(String text, boolean success);
}
复制代码 

PS

目前WsGo没有与手机网络的变化通知关联(即断网可以收到回调,网络恢复不会自动重连),因为笔者觉得这部分不属于WebSocket自身的状态管理范畴,而且已经不属于平台无关了。如有有这方便需求,需要自己从外部发起连接。在项目中简单地封装一下就ok啦~

后记

本文稍显长,从最初的智能家居的场景开始,介绍到websocket状态管理现存的问题,然后给出了笔者的解决方案,最后是通用的状态管理框架WebSocketGo。也算是笔者在工作中的一点思考和总结。其实细想,不光WebSocket,所有的长连接应该都会面临相同的问题。但思考的角度和解决的出发点应该都是一样的。

最后

在技术领域内,没有任何一门课程可以让你学完后一劳永逸,再好的课程也只能是“师傅领进门,修行靠个人”。“学无止境”这句话,在任何技术领域,都不只是良好的习惯,更是程序员和工程师们不被时代淘汰、获得更好机会和发展的必要前提。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容