OkHttp源码解析之(二)任务调度——Dispatcher和拦截器

0.前言

欢迎阅读笔者文章,本文是OkHttp源码解析系列文章的第二篇,如果没看过上一篇文章的,请转OkHttp源码分析之(一)请求流程,有什么不足的地方,记得提出来哦,您的支持,是对我写作的最大支持。
皮一下:笔者在android技术上遇到瓶颈了,但是通过这几个星期对框架源码的学习,终于得到突破了,强烈推荐在技术上遇到瓶颈的童鞋尝试阅读下流行框架的源码。这玩意,还很容易上瘾>_<

1.Dispatcher

在上一篇文章中,已经对Dispatcher里面的部分方法进行了解析,在这里,我将站在总体的角度上对其进行一个总结。
(1)Dispatcher做了什么?
先来看看Dispatcher里面定义的参数:

public final class Dispatcher {
  private int maxRequests = 64;
  private int maxRequestsPerHost = 5;
  private @Nullable Runnable idleCallback;

  /** Executes calls. Created lazily. */
  private @Nullable ExecutorService executorService;

  /** Ready async calls in the order they'll be run. */
  private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();

  /** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();

  /** Running synchronous calls. Includes canceled calls that haven't finished yet. */
  private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();

}

可以看到,Dispatcher里面维护了三个请求队列:同步请求队列、异步请求就绪队列、异步请求正在执行队列,没错,Dispatcher就干维护这三个请求队列、处理所有请求这两件事。
(2)Dispatcher为什么要维护这三个队列?
这个问题好解释,还是回到上一节讲到的网略请求部分,先看看同步请求:

@Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);//代码1,入队
      Response result = getResponseWithInterceptorChain();
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      e = timeoutExit(e);
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);//代码2,出队
    }
  }
/**Dispatcher中**/

//代码1
 /** Used by {@code Call#execute} to signal it is in-flight. */
  synchronized void executed(RealCall call) {
    runningSyncCalls.add(call);
  }

//代码2
 /** Used by {@code Call#execute} to signal completion. */
  void finished(RealCall call) {
    finished(runningSyncCalls, call);
  }

 private <T> void finished(Deque<T> calls, T call) {
    Runnable idleCallback;
    synchronized (this) {
      if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
      idleCallback = this.idleCallback;
    }

    boolean isRunning = promoteAndExecute();

    if (!isRunning && idleCallback != null) {
      idleCallback.run();
    }
  }

//重点代码
 private boolean promoteAndExecute() {
    assert (!Thread.holdsLock(this));

    List<AsyncCall> executableCalls = new ArrayList<>();
    boolean isRunning;
    synchronized (this) {
      //遍历就绪异步请求队列(有异步请求时才会执行)
      for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
        AsyncCall asyncCall = i.next();//从就绪队列中取出请求

        if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.//如果正在运行的异步请求没有超限
        if (runningCallsForHost(asyncCall) >= maxRequestsPerHost) continue; // Host max capacity.//并且Host请求没有大于最大数

        i.remove();//从就绪队列中移除掉异步请求
        executableCalls.add(asyncCall);//将其添加至可执行的队列中
        runningAsyncCalls.add(asyncCall);//添加至正在执行的异步请求队列中
      }
      isRunning = runningCallsCount() > 0;//重新计算正在执行的队列数
    }
    
    //具体调用执行异步操作的方法(有异步请求时才会执行)
    for (int i = 0, size = executableCalls.size(); i < size; i++) {
      AsyncCall asyncCall = executableCalls.get(i);
      asyncCall.executeOn(executorService());
    }

    return isRunning;
  }

代码在上一篇文章中已经解析过,这里就不解析了,主要看代码1和代码2,同步请求主要就干了两件事:入队和出队,这里重点还是promoteAndExecute()方法,当执行同步请求是,在上一步重正在执行的同步请求队列中移除掉请求之后,这里仅仅是重新计算正在执行的队列数(等于runningAsyncCalls.size() + runningSyncCalls.size());如果是异步请求,则会执行注释中的代码,其中重要的一点就是当runningAsyncCalls.size()超额时仅会把请求放到就绪的异步请求队列(readyAsyncCalls)中(breake了),而这里,则是当runningAsyncCalls小于maxRequests时(有空位了),将异步请求从就绪队列readyAsyncCalls中取出来放到runningAsyncCalls之中。
因此,Dispatcher才需要维护这三个队列。
关于Dispatcher就介绍到这里,接下来介绍下OkHttp另一核心内容:拦截器

2.拦截器

啥?什么拦截器?我怎么没听说过呀?
别紧张,先来看看拦截器的是什么。
拦截器是什么?官网的解释是:

Interceptors are a powerful mechanism that can monitor, rewrite, and retry calls.
拦截器是OkHttp中提供的一种强大机制,可以实现网略监听、重写和请求、请求失败重试等功能。
官网

顺便附上官网提供的图:


interceptors@2x.png

由官方解释可见拦截器是不管同步还是异步请求都会执行的。
关于拦截器的分析,由于代码实在是太多了,由于篇幅关系,无法进行详细解释,但是流程还是要有的,做了什么也是要交代清楚的,于是,在这里,我就简单讲讲这厮做了什么。
(1)流程
首先还是从RealCall的代码中入手(execute中和Async里面的execute,同步异步都会执行),这里举个栗子就从同步请求入手吧:

 @Override public Response execute() throws IOException {
    synchronized (this) {
      if (executed) throw new IllegalStateException("Already Executed");
      executed = true;
    }
    captureCallStackTrace();
    timeout.enter();
    eventListener.callStart(this);
    try {
      client.dispatcher().executed(this);
      Response result = getResponseWithInterceptorChain();//重点
      if (result == null) throw new IOException("Canceled");
      return result;
    } catch (IOException e) {
      e = timeoutExit(e);
      eventListener.callFailed(this, e);
      throw e;
    } finally {
      client.dispatcher().finished(this);
    }
  }

重点就是getResponseWithInterceptorChain()方法,来看看它做了什么:

 Response getResponseWithInterceptorChain() throws IOException {
    // Build a full stack of interceptors.
    //包含所有拦截器的List
    List<Interceptor> interceptors = new ArrayList<>();
    interceptors.addAll(client.interceptors());//将用户定义的拦截器添加进来,即上图中的Application拦截器
    //添加OkHttp内部的五大拦截器
    interceptors.add(retryAndFollowUpInterceptor);
    interceptors.add(new BridgeInterceptor(client.cookieJar()));
    interceptors.add(new CacheInterceptor(client.internalCache()));
    interceptors.add(new ConnectInterceptor(client));
    if (!forWebSocket) {
      interceptors.addAll(client.networkInterceptors());
    }
    interceptors.add(new CallServerInterceptor(forWebSocket));
    //重点,拦截器链
    Interceptor.Chain chain = new RealInterceptorChain(interceptors, null, null, null, 0,
        originalRequest, this, eventListener, client.connectTimeoutMillis(),
        client.readTimeoutMillis(), client.writeTimeoutMillis());

    return chain.proceed(originalRequest);
  }

可以看到,先是将用户自定义的拦截器添加进拦截器列表中,后又将OkHttp内部的五个拦截器分别添加进去,这五个拦截器分别:

  • retryAndFollowUpInterceptor:重试和失败重定向拦截器
  • BridgeInterceptor:桥接适配器拦截器
  • CacheInterceptor:缓存拦截器
    这两个拦截器主要负责补充请求头等信息和处理缓存。
  • ConnectInterceptor:连接拦截器。负责连接。
  • CallServerInterceptor:服务响应拦截器。负责将Http请求写入到网略中,并从网略IO流中读取数据返回给客户端。
    接下来是重点,这里为什么说是拦截器链呢?请往下看RealInterceptorChain:
 private final List<Interceptor> interceptors;
  private final StreamAllocation streamAllocation;
  private final HttpCodec httpCodec;
  private final RealConnection connection;
  private final int index;
  private final Request request;
  private final Call call;
  private final EventListener eventListener;
  private final int connectTimeout;
  private final int readTimeout;
  private final int writeTimeout;
  private int calls;

  public RealInterceptorChain(List<Interceptor> interceptors, StreamAllocation streamAllocation,
      HttpCodec httpCodec, RealConnection connection, int index, Request request, Call call,
      EventListener eventListener, int connectTimeout, int readTimeout, int writeTimeout) {
    this.interceptors = interceptors;
    this.connection = connection;
    this.streamAllocation = streamAllocation;
    this.httpCodec = httpCodec;
    this.index = index;
    this.request = request;
    this.call = call;
    this.eventListener = eventListener;
    this.connectTimeout = connectTimeout;
    this.readTimeout = readTimeout;
    this.writeTimeout = writeTimeout;
  }

构造方法主要初始化了一些数据,这里不多讲,关键在于它的proceed方法:

 @Override public Response proceed(Request request) throws IOException {
    return proceed(request, streamAllocation, httpCodec, connection);
  }

public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
      RealConnection connection) throws IOException {
    if (index >= interceptors.size()) throw new AssertionError();

    calls++;

    // If we already have a stream, confirm that the incoming request will use it.
    if (this.httpCodec != null && !this.connection.supportsUrl(request.url())) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must retain the same host and port");
    }

    // If we already have a stream, confirm that this is the only call to chain.proceed().
    if (this.httpCodec != null && calls > 1) {
      throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
          + " must call proceed() exactly once");
    }

    //重点1:index + 1,即下一个拦截器
    // Call the next interceptor in the chain.
    RealInterceptorChain next = new RealInterceptorChain(interceptors, streamAllocation, httpCodec,
        connection, index + 1, request, call, eventListener, connectTimeout, readTimeout,
        writeTimeout);
    Interceptor interceptor = interceptors.get(index);
    Response response = interceptor.intercept(next);//将下一个拦截器传递进去

    // Confirm that the next interceptor made its required call to chain.proceed().
    if (httpCodec != null && index + 1 < interceptors.size() && next.calls != 1) {
      throw new IllegalStateException("network interceptor " + interceptor
          + " must call proceed() exactly once");
    }

    // Confirm that the intercepted response isn't null.
    if (response == null) {
      throw new NullPointerException("interceptor " + interceptor + " returned null");
    }

    if (response.body() == null) {
      throw new IllegalStateException(
          "interceptor " + interceptor + " returned a response with no body");
    }

    return response;
  }

这里重要的一点就是indext+1,表示只能访问下一个拦截器,这样就把所有的拦截器连接起来成为一条链了。
总结起来,getResponseWithInterceptorChain做了两件事:

  • 将五大拦截器添加进List中
  • 创建拦截器链RealInterceptorChain,并执行proceed方法
    而具体的拦截器里面执行的其实也是下一个拦截器的proceed方法,这里以RetryAndFollowUpInterceptor未来,看看他里面的intercept方法:
@Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    RealInterceptorChain realChain = (RealInterceptorChain) chain;
    Call call = realChain.call();
    EventListener eventListener = realChain.eventListener();

    StreamAllocation streamAllocation = new StreamAllocation(client.connectionPool(),
        createAddress(request.url()), call, eventListener, callStackTrace);
    this.streamAllocation = streamAllocation;

    int followUpCount = 0;
    Response priorResponse = null;
    while (true) {
      if (canceled) {
        streamAllocation.release();
        throw new IOException("Canceled");
      }

      Response response;
      boolean releaseConnection = true;
      try {
        response = realChain.proceed(request, streamAllocation, null, null);//重点,代码1
        releaseConnection = false;
      } catch (RouteException e) {
        // The attempt to connect via a route failed. The request will not have been sent.
        if (!recover(e.getLastConnectException(), streamAllocation, false, request)) {
          throw e.getFirstConnectException();
        }
        releaseConnection = false;
        continue;
      } catch (IOException e) {
        // An attempt to communicate with a server failed. The request may have been sent.
        boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
        if (!recover(e, streamAllocation, requestSendStarted, request)) throw e;
        releaseConnection = false;
        continue;
      } finally {
        // We're throwing an unchecked exception. Release any resources.
        if (releaseConnection) {
          streamAllocation.streamFailed(null);
          streamAllocation.release();
        }
      }

      // Attach the prior response if it exists. Such responses never have a body.
      if (priorResponse != null) {
        response = response.newBuilder()
            .priorResponse(priorResponse.newBuilder()
                    .body(null)
                    .build())
            .build();
      }

      Request followUp;
      try {
        followUp = followUpRequest(response, streamAllocation.route());
      } catch (IOException e) {
        streamAllocation.release();
        throw e;
      }

      if (followUp == null) {
        streamAllocation.release();
        return response;
      }

      closeQuietly(response.body());

      if (++followUpCount > MAX_FOLLOW_UPS) {
        streamAllocation.release();
        throw new ProtocolException("Too many follow-up requests: " + followUpCount);
      }

      if (followUp.body() instanceof UnrepeatableRequestBody) {
        streamAllocation.release();
        throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
      }

      if (!sameConnection(response, followUp.url())) {
        streamAllocation.release();
        streamAllocation = new StreamAllocation(client.connectionPool(),
            createAddress(followUp.url()), call, eventListener, callStackTrace);
        this.streamAllocation = streamAllocation;
      } else if (streamAllocation.codec() != null) {
        throw new IllegalStateException("Closing the body of " + response
            + " didn't close its backing stream. Bad interceptor?");
      }

      request = followUp;
      priorResponse = response;
    }
  }

重点看代码1,由前面分析可知,这里执行的是下一个拦截器的intercept,而下一个拦截器的intercept又会执行realChain.proceed,由此形成连接,将所有拦截器连接起来。
嘛,流程图还是要有的:

getResponseWithInterceptorChain (1).png

原创文章,欢迎转载,转载请附上原文地址https://www.jianshu.com/p/0f955f38ada1

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 196,200评论 5 462
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 82,526评论 2 373
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 143,321评论 0 325
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,601评论 1 267
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,446评论 5 358
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,345评论 1 273
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,753评论 3 387
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,405评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,712评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,743评论 2 314
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,529评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,369评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,770评论 3 300
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,026评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,301评论 1 251
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,732评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,927评论 2 336

推荐阅读更多精彩内容