- 1.OkHttp源码解析(一):OKHttp初阶
- 2 OkHttp源码解析(二):OkHttp连接的"前戏"——HTTP的那些事
- 3 OkHttp源码解析(三):OKHttp中阶之线程池和消息队列
- 4 OkHttp源码解析(四):OKHttp中阶之拦截器及调用链
- 5 OkHttp源码解析(五):OKHttp中阶之OKio简介
- 6 OkHttp源码解析(六):OKHttp中阶之缓存基础
- 7 OkHttp源码解析(七):OKHttp中阶之缓存机制
- 8 OkHttp源码解析(八):OKHttp中阶之连接与请求值前奏
- 9 OkHttp源码解析(九):OKHTTP连接中三个"核心"RealConnection、ConnectionPool、StreamAllocation
- 10 OkHttp源码解析(十) OKHTTP中连接与请求
- 11 OkHttp的感谢
上篇文章已经说明了OKHttp有两种调用方式,一种是阻塞的同步请求,一种是异步的非阻塞的请求。今天主要是讲解的是异步的请求。其中涉及了Dispatcher这个类,其他的类基本已经在上篇文章介绍过了。所以本片文章的大体思路如下:
- 1.线程池的理解
- 2.Dispatcher类详解
- 3.OKHttp的任务调度
- 4.OKHttp调度的理解
在讲解线程池和消息队列的时候有必要讲下线程池的基本概念
一、线程池的理解
(一)android中的异步任务
android的异步任务一般都是用Thread+Handler或者AsyncTask来实现,其中笔者当初经历过各种各样坑,特别是内存泄漏,当初笔者可是相当的欲死欲仙啊!所以现在很少有开发者还在用这一套来做异步任务,现在一般都是Rxjava为主,当然还有自己自定义的异步任务框架(比如笔者),像RxJava都帮我们写好了对应场景的线程池,这是为什么?
1、线程池的理解
我对线程池的理解是有两个层次,一种是狭隘的,一种是广义的,那么咱们各自都说下
(1)狭义上的线程池:
线程池是一种多线程处理形式,处理过程中将任务添加到队列中,后面再创建线程去处理这些任务,线程池里面的线程都是后台线程,每个线程都是默认的优先级下运。如果某个线程处于空闲中,将添加一个任务进来,让空闲线程去处理任务。如果所有线程都很繁忙,消息队列会挂起,等待某个线程池空闲后再处理任务。这样可以保证线程数量不能超多最大数量。
(2)广义上的线程池:
多线程技术主要是解决处理器单元内多个线程执行的问题,它可以显著减少处理的单元闲置时间,增加处理器单元的吞吐能力。如果对多线程应用不当,会增加对单个任务的的处理时间。
举例说明:
假如一个服务器完成一项任务的时间为T:
T1 创建线程的时间
T2 在线程中执行任务的时间,包括线程同步所需要的时间
T3 线程销毁的时间
显然 T= T1+T2+T3. 注意:这是一个理想化的情况
可以看出,T1,T3是多线程自身带来的开销(在Java中,通过映射pThread,并进一步通过SystemCall实现native线程),我们渴望减少T1和T3的时间,从而减少T的时间。但是一些线程的使用者并没有注意到这一点,所以在线程中频繁的创建或者销毁线程,这导致T1和T3在T中占有相当比例。这显然突出的线程池的弱点(T1,T3),而不是有点(并发性)。
取自IBM知识库
所以线程池的技术正是如何关注缩短或调整T1,T3时间的技术,从而提高服务器程序的性能。
- 1、通过对线程进行缓存,减少创建和销毁时间的损失
- 2、通过控制线程数量的阀值,减少线程过少带来的CPU闲置(比如长时间卡在I/O上了)与线程过多给JVM内存与线程切换时系统调用的压力。
在平时我们可以通过线程工厂来创建线程池来尽量避免上述的问题。
二 Dispatcher 类详解
Dispatcher负责请求的分发
1、线程池executeService
/** Executes calls. Created lazily. */
private ExecutorService executorService;
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
由上面代码可以得出Dispatcher内部实现了懒加载的无边界限制的线程池。参数解析
- 1、0:核心线程数量,保持在线程池中的线程数量(即使已经空闲),为0代表线程空闲后不会保留,等待一段时间后停止。
- 2、Integer.MAX_VALUE:表示线程池可以容纳最大线程数量
- 3、TimeUnit.SECOND:当线程池中的线程数量大于核心线程时,空闲的线程就会等待60s才会被终止,如果小于,则会立刻停止。
- 4、new SynchronousQueue<Runnable>():线程等待队列。同步队列,按序排队,先来先服务
- 5、Util.threadFactory("OkHttp Dispatcher", false):线程工厂,直接创建一个名为OkHttp Dispatcher的非守护线程。
(1)SynchronousQueue每个插入操作必须等待另一个线程的移除操作,同样任何一个移除操作都等待另一个线程的插入操作。因此队列内部其实没有任何一个元素,或者说容量为0,严格说并不是一种容器,由于队列没有容量,因此不能调用peek等操作,因此只有移除元素才有元素,显然这是一种快速传递元素的方式,也就是说在这种情况下元素总是以最快的方式从插入者(生产者)传递给移除者(消费者),这在多任务队列中最快的处理任务方式。对于高频请求场景,无疑是最合适的。
(2)在OKHttp中,创建了一个阀值是Integer.MAX_VALUE的线程池,它不保留任何最小线程,随时创建更多的线程数,而且如果线程空闲后,只能多活60秒。所以也就说如果收到20个并发请求,线程池会创建20个线程,当完成后的60秒后会自动关闭所有20个线程。他这样设计成不设上限的线程,以保证I/O任务中高阻塞低占用的过程,不会长时间卡在阻塞上。
2、发起请求
整个框架主要通过Call来封装每一次的请求。同时Call持有OkHttpClient和一份Request。而每一次的同步或者异步请求都会有Dispatcher的参与。
(1)、同步
Dispatcher在执行同步的Call:直接加入到runningSyncCall队列中,实际上并没有执行该Call,而是交给外部执行
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
(2)、异步
将Call加入队列:如果当前正在执行的call的数量大于maxRequest(64),或者该call的Host上的call超过maxRequestsPerHos(5),则加入readyAsyncCall排队等待,否则加入runningAsyncCalls并执行
synchronized void enqueue(AsyncCall call) {
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
runningAsyncCalls.add(call);
executorService().execute(call);
} else {
readyAsyncCalls.add(call);
}
}
3、结束请求
从ready到running,在每个call结束的时候都会调用finished
private <T> void finished(Deque<T> calls, T call, boolean promoteCalls) {
int runningCallsCount;
Runnable idleCallback;
synchronized (this) {
if (!calls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
//每次remove完后,执行promoteCalls来轮转。
if (promoteCalls) promoteCalls();
runningCallsCount = runningCallsCount();
idleCallback = this.idleCallback;
}
//线程池为空时,执行回调
if (runningCallsCount == 0 && idleCallback != null) {
idleCallback.run();
}
}
private void promoteCalls() {
//如果当前执行的线程大于maxRequests(64),则不操作
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
通过上面代码,大家可以知道finished先执行calls.remove(call)删除call,然后执行promoteCalls(),在promoteCalls()方法里面:如果当前线程大于maxRequest则不操作,如果小于maxRequest则遍历readyAsyncCalls,取出一个call,并把这个call放入runningAsyncCalls,然后执行execute。在遍历过程中如果runningAsyncCalls超过maxRequest则不再添加,否则一直添加。所以可以这样说:
promoteCalls()负责ready的Call到running的Call的转化
具体的执行请求则在RealCall里面实现的,同步的在RealCall的execute里面实现的,而异步的则在AsyncCall的execute里面实现的。里面都是调用RealCall的getResponseWithInterceptorChain的方法来实现责任链的调用。
三、OKHttp的任务调度
在说调度任务之前先说下
1、Dispatcher任务调度
在OKHttp中,它使用Dispatcher作为任务的调度器。
如下图所示
在整个调度流程中涉及的成员如下:
其中
- Dispatcher 对象是分发者,也是生产者(默认在主线程中)
- AsyncCall 对象其实是一个任务即Runnable(内部做了包装异步接口)
// Dispatcher.java
maxRequests = 64 // 最大并发请求数为64
maxRequestsPerHost = 5 //每个主机最大请求数为5
ExecutorService executorService //消费者池(也就是线程池)
Deque<AsyncCall> readyAsyncCalls: // 异步的缓存,正在准备被消费的(用数组实现,可自动扩容,无大小限制)
Deque<AsyncCall> runningAsyncCalls //正在运行的 异步的任务集合,仅仅是用来引用正在运行的任务以判断并发量,注意它并不是消费者缓存
Deque<RealCall> runningSyncCalls //正在运行的,同步的任务集合。仅仅是用来引用正在运行的同步任务以判断并发量
通过将请求任务分发给多个线程,可以显著的减少I/O等待时间
2、OKHttp调度的具体流程分析
(1)同步调度分析
第一步是:是调用了RealCall的execute()方法里面调用executed(this);
@Override public Response execute() throws IOException {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
try {
client.dispatcher().executed(this);
Response result = getResponseWithInterceptorChain();
if (result == null) throw new IOException("Canceled");
return result;
} finally {
client.dispatcher().finished(this);
}
}
第二步:在Dispatcher里面的executed执行入队操作
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
第三步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response,并返回Response result 。
第四步:执行client.dispatcher().finished(this)操作
void finished(RealCall call) {
finished(runningSyncCalls, call, false);
}
这里其实做的是出队操作。至此同步的调度就已经结束了
(2)异步调度分析
AsyncCall类简介
在讲解异步调度之前不得不提到AsyncCall这个类,AsyncCall,他其实是RealCall的内部类
//RealCall.java
final class AsyncCall extends NamedRunnable {
private final Callback responseCallback;
AsyncCall(Callback responseCallback) {
super("OkHttp %s", redactedUrl());
this.responseCallback = responseCallback;
}
String host() {
return originalRequest.url().host();
}
Request request() {
return originalRequest;
}
RealCall get() {
return RealCall.this;
}
@Override protected void execute() {
boolean signalledCallback = false;
try {
//执行耗时任务
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
//retryAndFollowUpInterceptor取消了 执行失败
signalledCallback = true;
//回调,注意这里回调是在线程池中,不是主线程
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
//一切正常走入正常流程
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
//最后执行出队
client.dispatcher().finished(this);
}
}
}
第一步 是调用了RealCall的enqueue()方法
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
captureCallStackTrace();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
在enqueue里面调用了client.dispatcher().enqueue(new AsyncCall(responseCallback));方法
第二步:在Dispatcher里面的enqueue执行入队操作
synchronized void enqueue(AsyncCall call) {
//判断是否满足入队的条件(立即执行)
if (runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
//正在运行的异步集合添加call
runningAsyncCalls.add(call);
//执行这个call
executorService().execute(call);
} else {
//不满足入队(立即执行)条件,则添加到等待集合中
readyAsyncCalls.add(call);
}
}
上述代码发现想要入队需要满足下面的条件
(runningAsyncCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost)
如果满足条件,那么就直接把AsyncCall直接加到runningCalls的队列中,并在线程池中执行(线程池会根据当前负载自动创建,销毁,缓存相应的线程)。反之就放入readyAsyncCalls进行缓存等待。
runningAsyncCalls.size() < maxRequests 表示当前正在运行的AsyncCall是否小于maxRequests = 64
runningCallsForHost(call) < maxRequestsPerHos 表示同一个地址访问的AsyncCall是否小于maxRequestsPerHost = 5;
即 当前正在并发的请求不能超过64且同一个地址的访问不能超过5个
第三步:这里分两种情况
情况1 第三步 可以直接入队
runningAsyncCalls.add(call);
第四步:线程池executorService执行execute()方法
executorService().execute(call);
由于AsyncCall继承于NamedRunnable类,而NamedRunnable类又是Runnable类的实现类,所以走到了AsyncCall的execute()方法里面
第五步:执行AsyncCall的execute()方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
第六步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第七步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第八步:执行client.dispatcher().finished(this)操作 进行出队操作
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
PS:注意这里面第三个参数 同步是false,异步是true,如果是异步则需要进行是否添加继续入队的情景
情况2 第三步 不能直接入队,需要等待
readyAsyncCalls.add(call);
第四步 触发条件
能进入等待则说明当前要么有64条正在进行的并发,要么同一个地址有5个请求,所以要等待。
当有如下条件被满足或者触发的时候则执行promoteCalls操作
- 1 Dispatcher的setMaxRequestsPerHost()方法被调用时
public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
//设置的maxRequestsPerHost不能小于1
if (maxRequestsPerHost < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
}
this.maxRequestsPerHost = maxRequestsPerHost;
promoteCalls();
}
- 2 Dispatcher的setMaxRequests()被调用时
public synchronized void setMaxRequests(int maxRequests) {
//设置的maxRequests不能小于1
if (maxRequests < 1) {
throw new IllegalArgumentException("max < 1: " + maxRequests);
}
this.maxRequests = maxRequests;
promoteCalls();
}
- 3当有一条请求结束了,执行了finish()的出队操作,这时候会触发promoteCalls()进行调整
if (promoteCalls)
promoteCalls();
第五步 执行Dispatcher的promoteCalls()方法
private void promoteCalls() {
if (runningAsyncCalls.size() >= maxRequests) return; // Already running max capacity.
if (readyAsyncCalls.isEmpty()) return; // No ready calls to promote.
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
}
第六步 先判断是否满足 初步入队条件
if (runningAsyncCalls.size() >= maxRequests)
return;
if (readyAsyncCalls.isEmpty())
return; // No ready calls to promote.
如果此时 并发的数量还是大于maxRequests=64则return并继续等待
如果此时,没有等待的任务,则直接return并继续等待
第七步 满足初步的入队条件,进行遍历,然后进行第二轮入队判断
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall call = i.next();
if (runningCallsForHost(call) < maxRequestsPerHost) {
i.remove();
runningAsyncCalls.add(call);
executorService().execute(call);
}
if (runningAsyncCalls.size() >= maxRequests) return; // Reached max capacity.
}
进行同一个host是否已经有5请求在了,如果在了,则return返回并继续等待
第八步 此时已经全部满足条件,则从等待队列面移除这个call,然后添加到正在运行的队列中
i.remove();
runningAsyncCalls.add(call);
第九步 线程池executorService执行execute()方法
executorService().execute(call);
第十步:执行AsyncCall的execute()方法
@Override protected void execute() {
boolean signalledCallback = false;
try {
Response response = getResponseWithInterceptorChain();
if (retryAndFollowUpInterceptor.isCanceled()) {
signalledCallback = true;
responseCallback.onFailure(RealCall.this, new IOException("Canceled"));
} else {
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
}
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} finally {
client.dispatcher().finished(this);
}
}
第十一步:执行getResponseWithInterceptorChain();进入拦截器链流程,然后进行请求,获取Response。
第十二步:如果是正常的获取到Response,则执行responseCallback.onResponse()
第十三步:执行client.dispatcher().finished(this)操作 进行出队操作
void finished(AsyncCall call) {
finished(runningAsyncCalls, call, true);
}
至此 异步任务调度已经结束了
总结:
- 1、异步流程总结
所以简单的描述下异步调度为:如果当前还能可以执行异步任务,则入队,并立即执行,否则加入readyAsyncCalls队列,当一个请求执行完毕后,会调用promoteCalls(),来把readyAsyncCalls队列中的Async移出来并加入到runningAsyncCalls,并开始执行。然后在当前线程中去执行Call的getResponseWithInterceptorChain()方法,直接获取当前的返回数据Response- 2、对比同步和异步任务,我们会发现:同步请求和异步请求原理都是一样的,都是在getResponseWithInterceptorChain()函数通过Interceptor链条来实现网络请求逻辑,而异步任务则通过ExecutorService来实现的。PS:在Dispatcher中添加一个封装了Callback的Call的匿名内部类AsyncCall来执行当前 的Call。这个AsyncCall是Call的匿名内部类。AsyncCall的execute方法仍然会回调到Call的 getResponseWithInterceptorChain方法来完成请求,同时将返回数据或者状态通过Callback来完成。
四、OKHttp调度的"优雅'之处:
1、采用Dispacher作为调度,与线程池配合实现了高并发,低阻塞的的运行
2、采用Deque作为集合,按照入队的顺序先进先出
3、最精彩的就是在try/catch/finally中调用finished函数,可以主动控制队列的移动。避免了使用锁而wait/notify操作。