OkHttp 的同步、异步请求的调度

同步请求过程

okhttp3.RealCall#execute

@Override public Response execute() throws IOException {
  synchronized (this) {//call 对象不能执行两次
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();//捕获调用栈信息
  eventListener.callStart(this);//回调监听器中的方法
  try {
    client.dispatcher().executed(this);//把自己(RealCall)添加到 runningSyncCalls 队列中
    Response result = getResponseWithInterceptorChain();//从拦截链中获取结果
    if (result == null) throw new IOException("Canceled");
    return result;//返回结果
  } catch (IOException e) {
    eventListener.callFailed(this, e);//回调监听器中的方法
    throw e;
  } finally {
    client.dispatcher().finished(this);//call 执行完成了,调用 调度器的 finish 方法
  }
}

finished(RealCall call)

void finished(RealCall call) {
  finished(runningSyncCalls, call, false);//同步执行 call ,不会触发 finished 方法
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
      //从 calls 队列中移除掉 call
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();//同步执行的情况下,promoteCalls 为 false
    runningCallsCount = runningCallsCount();//获取当前正在执行的 call 的总数(包括同步 call 和 异步 call)
    idleCallback = this.idleCallback;
  }

  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();//如果当前没有执行的 call,执行「空闲任务」
  }
}

异步请求过程

使用到 Dispatcher

okhttp3.RealCall#enqueue

@Override public void enqueue(Callback responseCallback) {
  synchronized (this) {
    //call 对象不能执行两次
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  captureCallStackTrace();//捕获调用栈
  eventListener.callStart(this);//回调监听器方法
  client.dispatcher().enqueue(new AsyncCall(responseCallback));
}

okhttp3.Dispatcher#enqueue

synchronized void enqueue(AsyncCall call) {
   //运行中的异步请求数 < 64 &&  连接同一个主机的 call 总数量 < 5
  if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    runningAsyncCalls.add(call);//
    executorService().execute(call);//通过线程池执行 call 方法
  } else {
      //当前运行中的异步连接数>64,添加到就绪队列中,将有 promoteCall 方法触发调用
    readyAsyncCalls.add(call);
  }
}

调度器中的线程池,无上界,线程如果有 60s 没有被使用到,就会被回收。跟通过 Executors#newCachedThreadPool() 方法创建线程池的唯一区别在于这里指定了自定义的 ThreadFactory。

okhttp3.Dispatcher#executorService

public synchronized ExecutorService executorService() {
  if (executorService == null) {
    executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));//名称为 OkHttp Dispatcher
  }
  return executorService;
}

okhttp3.internal.Util#threadFactory 。自定义线程工厂。线程名字均为 "OkHttp Dispatcher"。

public static ThreadFactory threadFactory(final String name, final boolean daemon) {
  return new ThreadFactory() {
    @Override public Thread newThread(Runnable runnable) {
      Thread result = new Thread(runnable, name);//创建新的线程
      result.setDaemon(daemon);//是否为后台线程
      return result;
    }
  };
}

注:Thread 的名字可以相同,但是 thread_id 不能相同。

调用 executorService().execute(call) 方法之后,会执行该方法会调用NamedRunnable#run

okhttp3.internal.NamedRunnable#run

@Override public final void run() {
  String oldName = Thread.currentThread().getName();
  Thread.currentThread().setName(name);
  try {
    execute();//
  } finally {
    Thread.currentThread().setName(oldName);
  }
}

okhttp3.RealCall.AsyncCall#execute

@Override protected void execute() {
  boolean signalledCallback = false;
  try {
    Response response = getResponseWithInterceptorChain();//从拦截链中获取结果
    if (retryAndFollowUpInterceptor.isCanceled()) {
      signalledCallback = true;
      responseCallback.onFailure(RealCall.this, new IOException("Canceled"));//请求被取消了
    } else {
      signalledCallback = true;
      responseCallback.onResponse(RealCall.this, response);//回调 responseCallback 的 onResponse 方法,将结果传递出去
    }
  } catch (IOException e) {
    if (signalledCallback) {
      // Do not signal the callback twice!
      Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
    } else {
      eventListener.callFailed(RealCall.this, e);//回调监听
      responseCallback.onFailure(RealCall.this, e);//回调 responseCallback#onFailure
    }
  } finally {
    client.dispatcher().finished(this);//call 执行完毕
  }
}

finished(AsyncCall call)

AsyncCall 实际上是一个 NamedRunnable,它与 Call 没有继承/实现关系

/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
  finished(runningAsyncCalls, call, true);//异步 call 执行完成会触发 promoteCalls
}

okhttp3.Dispatcher#finished(java.util.Deque<T>, T, boolean)

private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
  int runningCallsCount;
  Runnable idleCallback;
  synchronized (this) {
    if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    if (promoteCalls) promoteCalls();
    runningCallsCount = runningCallsCount();
    idleCallback = this.idleCallback;
  }

  if (runningCallsCount == 0 && idleCallback != null) {
    idleCallback.run();
  }
}

异步任务执行完成之后,会调用 promoteCalls,运行异步就绪队列中 AsyncCall#run

okhttp3.Dispatcher#promoteCalls

private void promoteCalls() {
    //当前运行中的 异步 call 数目大于 maxRequests,直接返回
  if (runningAsyncCalls.size() >= maxRequests) return; 
    //就绪队列为空
  if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
    //迭代遍历就绪队列
  for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
    AsyncCall call = i.next();
    //
    if (runningCallsForHost(call) < maxRequestsPerHost) {
      i.remove();
      runningAsyncCalls.add(call);
      executorService().execute(call);//执行 异步 call
    }
    //当前运行中的异步 call 已经达到了最大的请求数
    if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
  }
}

总结

同步请求的执行过程比较简单。调用 execute 方法之后就直接进行请求了。

异步请求的执行是通过 dispatcher 来实现的。如果当前还能执行一个并发请求,那就立即执行,否则加入 readyAsyncCalls 队列,而正在执行的异步请求执行完毕之后,会调用 promoteCalls() 函数,来把 readyAsyncCalls 队列中的 AsyncCall “提升”为 runningAsyncCalls,并开始执行。

注:能执行一个并发请求的判断标准是:当前运行中的异步任务总数 <= 设置的 maxRequests并且连接到同一部主机上的 AsyncCall 总数 <= maxRequestsPerHost。

注:AsyncCallRealCall 的一个内部类,它实现了 Runnable接口,所以可以被提交到 ExecutorService 上执行,而它在执行时会调用 getResponseWithInterceptorChain() 函数,并把结果通过 responseCallback 传递给上层使用者。下一篇文章将会详细讲解 OkHttp 的拦截链。

由于本人水平有限,可能出于误解或者笔误难免出错,如果发现有问题或者对文中内容存在疑问欢迎在下面评论区告诉我。谢谢!

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342

推荐阅读更多精彩内容