同步请求过程
okhttp3.RealCall#execute
@Override public Response execute() throws IOException {
synchronized (this) {//call 对象不能执行两次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();//捕获调用栈信息
eventListener.callStart(this);//回调监听器中的方法
try {
client.dispatcher().executed(this);//把自己(RealCall)添加到 runningSyncCalls 队列中
Response result = getResponseWithInterceptorChain();//从拦截链中获取结果
if (result == null) throw new IOException("Canceled");
return result;//返回结果
} catch (IOException e) {
eventListener.callFailed(this, e);//回调监听器中的方法
throw e;
} finally {
client.dispatcher().finished(this);//call 执行完成了,调用 调度器的 finish 方法
}
}
finished(RealCall call)
void finished(RealCall call) {
finished(runningSyncCalls, call, false);//同步执行 call ,不会触发 finished 方法
}
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
//从 calls 队列中移除掉 call
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();//同步执行的情况下,promoteCalls 为 false
runningCallsCount = runningCallsCount();//获取当前正在执行的 call 的总数(包括同步 call 和 异步 call)
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();//如果当前没有执行的 call,执行「空闲任务」
}
}
异步请求过程
使用到 Dispatcher
okhttp3.RealCall#enqueue
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
//call 对象不能执行两次
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();//捕获调用栈
eventListener.callStart(this);//回调监听器方法
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
okhttp3.Dispatcher#enqueue
synchronized void enqueue(AsyncCall call) {
//运行中的异步请求数 < 64 && 连接同一个主机的 call 总数量 < 5
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);//
executorService().execute(call);//通过线程池执行 call 方法
} else {
//当前运行中的异步连接数>64,添加到就绪队列中,将有 promoteCall 方法触发调用
readyAsyncCalls.add(call);
}
}
调度器中的线程池,无上界,线程如果有 60s 没有被使用到,就会被回收。跟通过 Executors#newCachedThreadPool()
方法创建线程池的唯一区别在于这里指定了自定义的 ThreadFactory。
okhttp3.Dispatcher#executorService
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));//名称为 OkHttp Dispatcher
}
return executorService;
}
okhttp3.internal.Util#threadFactory
。自定义线程工厂。线程名字均为 "OkHttp Dispatcher"。
public static ThreadFactory threadFactory(final String name, final boolean daemon) {
return new ThreadFactory() {
@Override public Thread newThread(Runnable runnable) {
Thread result = new Thread(runnable, name);//创建新的线程
result.setDaemon(daemon);//是否为后台线程
return result;
}
};
}
注:Thread 的名字可以相同,但是 thread_id 不能相同。
调用 executorService().execute(call) 方法之后,会执行该方法会调用NamedRunnable#run
okhttp3.internal.NamedRunnable#run
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();//
} finally {
Thread.currentThread().setName(oldName);
}
}
okhttp3.RealCall.AsyncCall#execute
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();//从拦截链中获取结果
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));//请求被取消了
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);//回调 responseCallback 的 onResponse 方法,将结果传递出去
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
eventListener.callFailed(RealCall.this, e);//回调监听
responseCallback.onFailure(RealCall.this, e);//回调 responseCallback#onFailure
}
} finally {
client.dispatcher().finished(this);//call 执行完毕
}
}
finished(AsyncCall call)
AsyncCall 实际上是一个 NamedRunnable,它与 Call 没有继承/实现关系
/** Used by {@code AsyncCall#run} to signal completion. */
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);//异步 call 执行完成会触发 promoteCalls
}
okhttp3.Dispatcher#finished(java.util.Deque<T>, T, boolean)
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
异步任务执行完成之后,会调用 promoteCalls,运行异步就绪队列中 AsyncCall#run
okhttp3.Dispatcher#promoteCalls
private void promoteCalls() {
//当前运行中的 异步 call 数目大于 maxRequests,直接返回
if (runningAsyncCalls.size() >= maxRequests) return;
//就绪队列为空
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
//迭代遍历就绪队列
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
//
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);//执行 异步 call
}
//当前运行中的异步 call 已经达到了最大的请求数
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
总结
同步请求的执行过程比较简单。调用 execute 方法之后就直接进行请求了。
异步请求的执行是通过 dispatcher 来实现的。如果当前还能执行一个并发请求,那就立即执行,否则加入 readyAsyncCalls
队列,而正在执行的异步请求执行完毕之后,会调用 promoteCalls()
函数,来把 readyAsyncCalls
队列中的 AsyncCall
“提升”为 runningAsyncCalls
,并开始执行。
注:能执行一个并发请求的判断标准是:当前运行中的异步任务总数 <= 设置的 maxRequests并且连接到同一部主机上的 AsyncCall 总数 <= maxRequestsPerHost。
注:AsyncCall
是 RealCall
的一个内部类,它实现了 Runnable
接口,所以可以被提交到 ExecutorService
上执行,而它在执行时会调用 getResponseWithInterceptorChain()
函数,并把结果通过 responseCallback
传递给上层使用者。下一篇文章将会详细讲解 OkHttp 的拦截链。
由于本人水平有限,可能出于误解或者笔误难免出错,如果发现有问题或者对文中内容存在疑问欢迎在下面评论区告诉我。谢谢!