OkHttp阅读笔记(二)

第一篇入口
本文基于上一篇的前提,着重介绍其中的默认拦截器的意义
这里稍微回顾一下拦截器的调用顺序

Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    //尝试建立一个拦截器列表,之后会进行一次链式调用
    //简单说就是从上到下执行获取response之前的过程,
    //然后获取到response之后,再从下到上执行
    List<Interceptor> interceptors = new ArrayList<>();
    //首先执行自定义的拦截器
    interceptors.addAll(client.interceptors());
    //处理重试逻辑
    interceptors.add(retryAndFollowUpInterceptor);
    //添加一些预定义头部之类的数据
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    //处理http协议的缓存逻辑
    interceptors.add(new CacheInterceptor(client.internalCache()));
    //处理socket建立连接或者复用连接过程,总之这里会建立连接
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    //在已经建立的链接上进行参数发送和获取响应封装等操作
    interceptors.add(new CallServerInterceptor(forWebSocket));

    Interceptor.Chain chain = new RealInterceptorChain(
        interceptors, null, null, null, 0, originalRequest);
    return chain.proceed(originalRequest);
  }

接下来会按照拦截器的执行顺序进行介绍,稍微留意一下所有拦截器都是在子线程中执行,使用中不应该出现线程错误的问题

自定义拦截器

在子线程中,请求先执行的是OkHttpClient中的拦截器列表,这个实际上是自定义拦截器

    public Builder addInterceptor(Interceptor interceptor) {
      interceptors.add(interceptor);
      return this;
    }

实际上OkHttpClient通过Builder模式构建,然后可以在Builder中通过上述方法添加自定义拦截器,然后自定义拦截器会按照添加的顺序来执行。
作为优先执行的拦截器,实际上主要有两个好处,第一就是可以监听请求的开始,其次就是可以监听请求的结束,那么平时这里至少可以有两种常用的拦截器
1.打印日志的拦截器:用于在请求开始前打印请求参数等数据,请求结束时打印请求结果数据等
2.追踪请求的拦截器:这个一般可以用于测量一个请求从发起到接收的时间
当然上面只是我想到的两种,实际使用中根据自身需求添加就是了。

RetryAndFollowUpInterceptor

顾名思义,重试和追随拦截器,其实就是用于在请求完成之后,如果当前请求失败但是允许重试,那么会重新发起请求。如果当前请求成功,但是要求重定向,那么请求重定向地址的请求。
当然上面的描述比较粗浅,接下来看一下细节

@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    //流的分配者,这里重点是通过当前请求连接构建Address和RouteSelector
    streamAllocation = new StreamAllocation(
        client.connectionPool(), createAddress(request.url()), callStackTrace);

    int followUpCount = 0;//重定向的次数,最大20
    Response priorResponse = null;//用于记录重定向之前上一次的响应成果,这个响应是会清空响应体的
    while (true) {//用于重连或者重定向
      if (canceled) {//每一次请求之前先判断当前Call(Request)是否被要求取消
        streamAllocation.release();//清理连接池的资源,并且关闭Socket
        throw new IOException("Canceled");//直接在此处抛出IOException即可,会在AsyncCall中回调onFailure
      }

      Response response = null;
      //用于标记每一次请求是否应该释放套接字连接
      boolean releaseConnection = true;
      try {
        //进行请求
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        //正常来说此时服务端已经返回结果,单次请求完成
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        // 尝试连接到指定的节点时候失败
        // 1.在通过host去dns查找ip地址的时候出现异常
        // 2.进行socket连接过程中出现异常,实际上socket在连接的时候会尝试一直进行连接
        // 除非retryOnConnectionFailure要求不能重连,或者socket连接过程中出现不可重新连接相关的异常的时候会抛出
        // 后续会进行异常回调
        if (!recover(e.getLastConnectException(), false, request)) {
          throw e.getLastConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        // 这个异常可能更多的出现在网络传输数据的时候
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {// 当前连接中出现异常,并且不可尝试重新连接
          streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
          streamAllocation.release();//标记当前流分配者后续不再可用
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      // 在重试之前记录上一次请求的结果
      if (priorResponse != null) {
        //这里记录了上一次请求返回的响应,不过上一次请求的正文体在这里被置空
        //因为之前的响应的不应该被关心的,应该关心的是当前响应的正文体
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }
      //判断是否存在代理、重定向和一些异常情况,可能需要尝试发出新的请求
      //这里可能要进行新的请求的构建
      Request followUp = followUpRequest(response);

      if (followUp == null) {//不需要重定向
        if (!forWebSocket) {//App连接非WebSocket
          //标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
          //通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
          //否则会待在复用池中,等待复用或者在指定的时间后被清理
          streamAllocation.release();
        }
        return response;//返回成功的响应结果
      }
      //如果到这里,说明要重定向
      //先关闭之前服务端响应的输入流
      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {//最大重定向次数20
        streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }
      //判断当前是否可以重用连接,主要是Route的问题,包括ip地址等数据
      //如果不能则需要通过新的请求地址来新建连接
      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();//这里只是单纯的标记流分配者,连接本身会根据连接池状态觉得是否回收,同理socket也根据情况关闭
        //通过新的Address构建新的流分配者
        streamAllocation = new StreamAllocation(
            client.connectionPool(), createAddress(followUp.url()), callStackTrace);
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }
      //准备进行下一次重定向的请求
      request = followUp;
      priorResponse = response;
    }
  }

代码相对抽象,通过流程相对好阐述这个过程:
1.正常的一次请求成功:

try {
        //进行请求
        response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
        //正常来说此时服务端已经返回结果,单次请求完成
        releaseConnection = false;
    }finally {
        if (releaseConnection) {//正常请求成功不释放当前连接,主要是为了连接的复用
            streamAllocation.streamFailed(null);
            streamAllocation.release();
        }
    }
    if (followUp == null) {//正常请求不会返回重定向
        if (!forWebSocket) {//App连接非WebSocket
            //标记当前流分配者后续不再可用,释放当前RealConnection所关联的流分配者
            //通过连接池的空闲时间来判断当前连接是否回收,如果回收则会关闭socket
            //否则会待在复用池中,等待复用或者在指定的时间后被清理
            //总之就是当前流在后续一段时间内可以复用
            streamAllocation.release();
        }
        return response;//返回成功的响应结果
    }

非常简单,主要就是请求成功之后释放连接,让连接可以在连接池中被后续复用。
2.第一次连接失败:

while(true){
        try {
            //进行请求
            response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
            //抛出异常
        }catch (IOException e) {
            boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
            //判断当前是否可以重试,如果不能,这里抛出异常会回调onFailed,此时releaseConnection = true,走finally
            if (!recover(e, requestSendStarted, request)) throw e;
            //可以重试,先不释放连接
            releaseConnection = false;
            continue;
        }finally {
            if (releaseConnection) {//当前不可尝试重新连接
                streamAllocation.streamFailed(null);//从复用池中移除当前连接,并且关闭当前套接字连接
                streamAllocation.release();//标记当前流分配者后续不再可用
            }
        }
        //连接成功之后,走之前的流程
    }

private boolean recover(IOException e, boolean requestSendStarted, Request userRequest) {
    streamAllocation.streamFailed(e);//这里会关闭之前的socket

    // 应用层是否允许在连接失败之后重新尝试连接
    if (!client.retryOnConnectionFailure()) return false;

    // We can't send the request body again.
    // 这个是Http2协议中的情况,这里先不考虑
    if (requestSendStarted && userRequest.body() instanceof UnrepeatableRequestBody) return false;

    // This exception is fatal.
    // 检查当前异常类型,因为有的异常是无法再次进行重试连接的
    if (!isRecoverable(e, requestSendStarted)) return false;

    // 当前没有连接节点可以去尝试
    // 一般来说就是当前节点
    if (!streamAllocation.hasMoreRoutes()) return false;

    // For failure recovery, use the same route selector with a new connection.
    return true;
  }

  private boolean isRecoverable(IOException e, boolean requestSendStarted) {
    // If there was a protocol problem, don't recover.
    // 协议异常,比方说协议规定的报文格式不符之类的情况
    // 总之就是一些不满足当前协议的条件
    if (e instanceof ProtocolException) {
      return false;
    }

    // If there was an interruption don't recover, but if there was a timeout connecting to a route
    // we should try the next route (if there is one).
    // 这个一般是Okio的异常,会在流操作超时之后抛出
    // SocketTimeoutException一般可以认为是socket连接超时或者读写超时
    // 这种时候可以尝试重连
    if (e instanceof InterruptedIOException) {
      return e instanceof SocketTimeoutException && !requestSendStarted;
    }

    // Look for known client-side or negotiation errors that are unlikely to be fixed by trying
    // again with a different route.
    // 当前是Https握手失败异常
    if (e instanceof SSLHandshakeException) {
      // 如果是证书异常,这样没有必要重试,因为重试了也会失败
      // If the problem was a CertificateException from the X509TrustManager,
      // do not retry.
      if (e.getCause() instanceof CertificateException) {
        return false;
      }
    }
    if (e instanceof SSLPeerUnverifiedException) {
      // e.g. a certificate pinning error.
      return false;
    }

    // An example of one we might want to retry with a different route is a problem connecting to a
    // proxy and would manifest as a standard IOException. Unless it is one we know we should not
    // retry, we return true and try a new route.
    return true;
  }

当连接失败之后,会进行重新连接的判断,首先先标记当前流失败
1.当前应用层是否允许重新连接
2.当前异常是否是重连无意义异常,比方说Http证书校验异常,这种就算重连也会失败
3.检查当前是否有节点可用
在判断条件之前,会先标记当前流失败,看一下细节实现,具体注释会对应当前场景

public void streamFailed(IOException e) {
        Socket socket;
        boolean noNewStreams = false;

        synchronized (connectionPool) {
            if (e instanceof StreamResetException) {//这个是Http2协议的,先忽略
                //...
            } else if (connection != null
                    && (!connection.isMultiplexed() || e instanceof ConnectionShutdownException)) {//当前连接不为空,且不是http2协议
                noNewStreams = true;//标记之后不会新建连接
                // 当前连接没有成功,标记当前连接节点失败
                // 当前连接被标记成功的条件是要求完成一次请求,并且从服务端成功读取响应体中的正文部分
                if (connection.successCount == 0) {
                    if (route != null && e != null) {
                        routeSelector.connectFailed(route, e);
                    }
                    route = null;//会导致当前节点失效,那么如果要重新连接,要求必须有下一个备用的节点
                }
            }
            //不新建流、当前流完成、不释放流分配者
            //当前socket连接会被关闭
            socket = deallocate(noNewStreams, false, true);
        }

        closeQuietly(socket);
    }

    private Socket deallocate(boolean noNewStreams, boolean released, boolean streamFinished) {
        assert (Thread.holdsLock(connectionPool));
        //连接完成之后,无论成功失败,都清理当前codec
        if (streamFinished) {
            this.codec = null;
        }
        //当前流分配者是否继续可用
        if (released) {
            this.released = true;
        }
        Socket socket = null;
        if (connection != null) {
            if (noNewStreams) {//当前连接是否可以重用
                //一旦被标记,会导致连接池移除当前连接
                //后续会关闭socket连接
                connection.noNewStreams = true;
            }
            //在一次正常的请求完成之后,要进行连接的释放
            //包括socket的关闭
            if (this.codec == null && (this.released || connection.noNewStreams)) {
                release(connection);
                if (connection.allocations.isEmpty()) {
                    connection.idleAtNanos = System.nanoTime();
                    if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
                        socket = connection.socket();
                    }
                }
                connection = null;
            }
        }
        return socket;
    }

    private void release(RealConnection connection) {
        //这里实际上就是将连接和流分配者的关联解除
        //如果允许连接复用,那么不应该关闭socket连接,会在一定时间内等待复用
        //否则后面应该主动关闭socket连接
        for (int i = 0, size = connection.allocations.size(); i < size; i++) {
            Reference<StreamAllocation> reference = connection.allocations.get(i);
            if (reference.get() == this) {
                connection.allocations.remove(i);
                return;
            }
        }
        throw new IllegalStateException();
    }

因为当前请求没有完全成功,那么是没有办法进行复用的,所以说这里会关闭当前的socket连接并且从复用池中移除。
除此之外,还进行了节点是否有效的判断,因为复用的时候必须要有指定的连接节点,当前连接出现异常的请求会有一个对应的节点,如果当前节点连接成功,那么当前节点是有效的,重连的时候可以连接这个节点,否则就应该标记当前节点无效,重连的时候必须找寻其他节点。

连接复用池

上面既然提到了连接复用池,那么这里就看一下ConnectionPool的实现
首先是为什么需要使用复用池?
在Http1.1的协议中,通过"Connection: Keep-Alive"这个头部报文,可以指定当前socket连接为长连接,这意味者如果服务端/客户端没有主动终止当前连接,那么这个连接会一直保持
(实际中可能会因为很多原因导致长连接中断,这个不是重点)
此外在一次socket连接的过程中,至少会有3次握手这个流程的开销,可能还会有SSL证书/域名校验这一块,那么如果每一次连接都重复上述操作,显然不是什么很好的选择。
那么复用之前已经建立的长连接就是最好的一个选择

public ConnectionPool() {
    this(5, 5, TimeUnit.MINUTES);
  }

OkHttp中默认允许最多5个可以复用的连接,连接的最大等待时间是5分钟。
上面在将重试拦截器的时候提到,如果一个连接完成之后会进行连接释放,其中

  if (connection.allocations.isEmpty()) {
          connection.idleAtNanos = System.nanoTime();
          if (Internal.instance.connectionBecameIdle(connectionPool, connection)) {
            socket = connection.socket();
          }
        }

boolean connectionBecameIdle(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //noNewStreams实际上是标记当前流不可复用
    if (connection.noNewStreams || maxIdleConnections == 0) {
      connections.remove(connection);//直接从连接池中移除
      return true;
    } else {
      //这里是唤醒cleaup线程
      notifyAll(); // Awake the cleanup thread: we may have exceeded the idle connection limit.
      return false;
    }
  }

private final Runnable cleanupRunnable = new Runnable() {
    @Override public void run() {
      while (true) {
        long waitNanos = cleanup(System.nanoTime());
        if (waitNanos == -1) return;
        if (waitNanos > 0) {//计算下一次唤醒进行清理任务的间隔
          long waitMillis = waitNanos / 1000000L;//毫秒
          waitNanos -= (waitMillis * 1000000L);//纳秒
          synchronized (ConnectionPool.this) {
            try {//线程挂起指定时间
              ConnectionPool.this.wait(waitMillis, (int) waitNanos);
            } catch (InterruptedException ignored) {
            }
          }
        }
      }
    }
  };

long cleanup(long now) {
    int inUseConnectionCount = 0;
    int idleConnectionCount = 0;
    RealConnection longestIdleConnection = null;
    long longestIdleDurationNs = Long.MIN_VALUE;

    // Find either a connection to evict, or the time that the next eviction is due.
    synchronized (this) {
      //遍历当前连接池中的所有连接
      for (Iterator<RealConnection> i = connections.iterator(); i.hasNext(); ) {
        RealConnection connection = i.next();

        // If the connection is in use, keep searching.
        // 如果当前连接正在使用中,不回收,继续查找下一个
        if (pruneAndGetAllocationCount(connection, now) > 0) {
          inUseConnectionCount++;
          continue;
        }

        idleConnectionCount++;//空闲连接数+1

        // If the connection is ready to be evicted, we're done.
        // 当前连接空闲时间 = 当前时间 - 当前连接被释放允许参与复用的时间
        long idleDurationNs = now - connection.idleAtNanos;
        // 这里是记录当前连接池中所有空闲连接中空闲的最久的连接
        if (idleDurationNs > longestIdleDurationNs) {
          longestIdleDurationNs = idleDurationNs;//当前最大的空闲的连接到现在所经过的纳秒数
          longestIdleConnection = connection;//当前空闲最久的可复用的连接
        }
      }

      if (longestIdleDurationNs >= this.keepAliveDurationNs
          || idleConnectionCount > this.maxIdleConnections) {//当前连接超过最大的空闲时间或者数量大于最大的空闲连接数量
        // We've found a connection to evict. Remove it from the list, then close it below (outside
        // of the synchronized block).
        // 从连接池中移除,不再复用
        connections.remove(longestIdleConnection);
      } else if (idleConnectionCount > 0) {//当前存在空闲连接,并且空闲连接没有超时
        // A connection will be ready to evict soon.
        // 当前清理线程挂起最大的空闲时间,下一次唤醒可能会清理当前最久的空闲连接
        return keepAliveDurationNs - longestIdleDurationNs;
      } else if (inUseConnectionCount > 0) {
        // All connections are in use. It'll be at least the keep alive duration 'til we run again.
        return keepAliveDurationNs;
      } else {
        // No connections, idle or in use.
        cleanupRunning = false;
        return -1;
      }
    }
    //每一次清理的对象为当前空闲连接列表中最久的空闲连接
    closeQuietly(longestIdleConnection.socket());

    // Cleanup again immediately.
    return 0;
  }

void put(RealConnection connection) {
    assert (Thread.holdsLock(this));
    //往连接池中添加连接
    if (!cleanupRunning) {//当前清理线程没有运行
      cleanupRunning = true;//执行清理线程
      executor.execute(cleanupRunnable);
    }
    connections.add(connection);
  }

private final Deque<RealConnection> connections = new ArrayDeque<>();

连接池中核心就是一个连接队列和一个清理线程,在每一次添加连接到连接队列的时候尝试开启清理线程,清理线程会尝试清理当前在链接队列中空闲最久的连接。
这里只是讲了逻辑,实际上在OkHttp中Http1.1要想复用连接,那么必须读取完ResponseBody中的数据,这样才会释放连接

总结

这一篇主要是了解重连和复用策略,下一篇会着重讲解Http缓存相关

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,176评论 5 469
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,190评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,232评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,953评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,879评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,177评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,626评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,295评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,436评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,365评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,414评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,096评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,685评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,771评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,987评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,438评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,032评论 2 341

推荐阅读更多精彩内容