OkHttp Source Code Analysis: The Synchronous Path
I've been meaning to dissect OkHttp for a long time, so this post walks through a single network request made with OkHttp.
A note up front: I won't lay out the class structure in advance, for two reasons: (1) knowing everything ahead of time makes the read dull, and (2) opening with a wall of explanation is hard to follow. So I'll keep the preamble short and try to present a clear picture of the flow and the call relationships as we go.
OK, back to the point.
A synchronous request starts here:
```
Response execute = okHttpClient.newCall(build).execute();
```
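For context, here is what a complete synchronous request might look like end to end (the URL, the `build` request variable, and the background-thread note are my own framing, not from OkHttp's docs):
```java
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;

public class SyncGetExample {
  public static void main(String[] args) throws IOException {
    OkHttpClient okHttpClient = new OkHttpClient();          // uses Builder defaults internally

    Request build = new Request.Builder()
        .url("https://example.com/")                          // placeholder URL
        .build();                                             // GET by default

    // execute() blocks the current thread, so on Android this must not run on the main thread.
    Response execute = okHttpClient.newCall(build).execute();
    try {
      System.out.println(execute.code());
      System.out.println(execute.body().string());            // string() reads and closes the stream
    } finally {
      execute.body().close();
    }
  }
}
```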
Step into OkHttpClient's newCall method. It takes a Request, and its Javadoc tells us what it does: it creates the Call and prepares the request so it can be executed at some point in the future.
```
/**
 * Prepares the {@code request} to be executed at some point in the future.
 */
@Override public Call newCall(Request request) {
  return new RealCall(this, request, false /* for web socket */);
}
```
So before the request even runs, three objects have been prepared: OkHttpClient, Request, and Call. (See the source for their fields.) OkHttpClient is created through the Builder pattern:
```
public OkHttpClient() {
  this(new Builder());
}
```
Two of the builder's fields deserve special attention: the interceptor lists. Whatever interceptors you configured are copied into unmodifiable lists, so once build() creates the OkHttpClient, everything set up through the chained calls is locked in and can no longer be changed. (Where these two kinds of interceptors are actually used comes later.)
```
final List<Interceptor> interceptors = new ArrayList<>();
final List<Interceptor> networkInterceptors = new ArrayList<>();
···
this.interceptors = Util.immutableList(builder.interceptors);
this.networkInterceptors = Util.immutableList(builder.networkInterceptors);
···

// This utility returns an unmodifiable copy of the list.
/** Returns an immutable copy of {@code list}. */
public static <T> List<T> immutableList(List<T> list) {
  return Collections.unmodifiableList(new ArrayList<>(list));
}
```
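A quick sketch of what that immutability means in practice (the no-op interceptor here is my own throwaway example):
```java
OkHttpClient client = new OkHttpClient.Builder()
    .addInterceptor(chain -> chain.proceed(chain.request()))   // a no-op application interceptor
    .build();

// client.interceptors() is the unmodifiable copy made in the OkHttpClient constructor.
try {
  client.interceptors().add(chain -> chain.proceed(chain.request()));
} catch (UnsupportedOperationException expected) {
  // Collections.unmodifiableList rejects any mutation after build().
}
```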
Now for Call. Call is an interface, and its implementation is RealCall.
To understand what a Call is, start with the interface's Javadoc:
```
/**
 * A call is a request that has been prepared for execution. A call can be canceled. As this object
 * represents a single request/response pair (stream), it cannot be executed twice.
 */
public interface Call extends Cloneable
```
Setting aside the first half (nothing surprising there): because this object represents a single request/response pair (stream), it cannot be executed twice. In short, a Call is a cancelable object that stands for one request/response stream and must not be executed more than once.
```
/** Returns the original request that initiated this call. */
Request request();
```
So Call exposes a request() method that returns the Request that initiated the call.
Here is RealCall's implementation:
```
@Override public Request request() {
  return originalRequest;
}
```
Right — it simply hands back the Request it was given. The synchronous execution method lives on Call as well:
```
/**
 * Invokes the request immediately, and blocks until the response can be processed or is in error.
 *
 * This is the synchronous request.
 * ···
 */
Response execute() throws IOException;
```
The originalRequest field carries its own comment: "The application's original request unadulterated by redirects or auth headers."
With that in mind, let's see what happens when a Request is initialized. Request is also created through a Builder:
```
public final class Request {
  final HttpUrl url;
  final String method;
  final Headers headers;
  final RequestBody body;
  final Object tag;
  private volatile CacheControl cacheControl; // Lazily initialized.
}

public Builder() {
  this.method = "GET";
  this.headers = new Headers.Builder();
}

public Request build() {
  if (url == null) throw new IllegalStateException("url == null");
  return new Request(this);
}
```
So when a Request is created, a Headers builder is set up, the method defaults to "GET", and a URL is mandatory. CacheControl is lazily initialized: if you don't specify one, the freshly built Request's cacheControl field is null until it is first needed.
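A small sketch of those defaults (the URL is a placeholder of mine):
```java
Request request = new Request.Builder()
    .url("https://example.com/")      // mandatory: build() throws if url == null
    .build();                         // method defaults to "GET", headers start empty

// cacheControl() builds the CacheControl lazily from the headers on first access.
CacheControl cacheControl = request.cacheControl();
```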
At this point I was a little thrown: the Call doesn't really look like a request object; it looks more like a scheduler for itself (it manages its own lifecycle): it can cancel itself and it can execute the request. That made me eager to look at the implementation, RealCall.class.
Start with the constructor, to see what happens when a request kicks off:
```
RealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
  // the client is passed in
  this.client = client;
  // the original request is passed in
  this.originalRequest = originalRequest;
  // whether this call is for a web socket
  this.forWebSocket = forWebSocket;
  // key point: a RetryAndFollowUpInterceptor is created here
  this.retryAndFollowUpInterceptor = new RetryAndFollowUpInterceptor(client, forWebSocket);
}
```
Curiosity says we should glance at RetryAndFollowUpInterceptor.class first:
```
/**
 * This interceptor recovers from failures and follows redirects as necessary.
 * It may throw an {@link IOException} if the call was canceled.
 */
public final class RetryAndFollowUpInterceptor implements Interceptor
```
For now, think of it as the interceptor that helps a failed request complete its mission anyway. (So this is where OkHttp's retry/redirect mechanism lives — noted for later.)
## Taking stock before the request
Let's gather up what we have so far. Three objects are now in front of us:
1. OkHttpClient: meant to be a single, app-wide instance; it defines a great deal of configuration, only part of which we'll touch on here.
2. Request: one Request object per request, holding the URL, headers, body, and cache control.
3. RealCall: implements Call; it holds references to the Request and the OkHttpClient plus a RetryAndFollowUpInterceptor tied one-to-one to this call, and it provides the synchronous and asynchronous execution methods that produce the Response.
I think of RealCall as the facade of the request (facade pattern), OkHttpClient as the container for networking, and Request as a context object. So what does the container provide to RealCall?
Following the flow, look at execute() first:
```
@Override public Response execute() throws IOException {
  synchronized (this) {
    if (executed) throw new IllegalStateException("Already Executed");
    executed = true;
  }
  try {
    client.dispatcher().executed(this);
    Response result = getResponseWithInterceptorChain();
    if (result == null) throw new IOException("Canceled");
    return result;
  } finally {
    client.dispatcher().finished(this);
  }
}
```
Two conclusions so far:
1. This is how a Call is kept from being executed twice: before running, it checks whether this Call has already been executed and flips executed to true, inside a synchronized block so concurrent callers are handled too. If it has already been executed (i.e. it is already in flight), an exception is thrown. (A quick sketch of this follows the list.)
2. client.dispatcher().executed(this): the Call does not manage requests itself; it hands itself to the dispatcher, which confirms that one of OkHttpClient's roles is request scheduling. RealCall, for its part, still owns the interceptor-stack processing of the request — a reasonable object-oriented split.
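A minimal illustration of conclusion 1 (my own example; `client` and `request` are assumed to exist):
```java
Call call = client.newCall(request);
Response first = call.execute();        // fine

try {
  call.execute();                       // same Call object, second attempt
} catch (IllegalStateException expected) {
  // "Already Executed" — to re-run the same request, clone the call instead:
  Response second = call.clone().execute();
}
```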
So who exactly is this dispatcher, and when was it created? Let's keep reading:
```
/**
 * Policy on when async requests are executed.
 *
 * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you supply your
 * own executor, it should be able to run {@linkplain #getMaxRequests the configured maximum} number
 * of calls concurrently.
 */
public final class Dispatcher
```
1. It is the policy for when async requests are executed. It has no parent class and isn't abstract, so it clearly does real work on its own — a core class.
When is it assigned?
```
public Builder() {
  dispatcher = new Dispatcher();
  ···
}
public OkHttpClient() {
  this(new Builder());
}
```
2. So the Dispatcher comes into existence when the OkHttpClient (more precisely, its Builder) is created.
Which raises two new questions:
1. What does the Dispatcher actually do once created?
2. OkHttp is an excellent HTTP client and the OkHttpClient is meant to be a single instance for the whole app, so how will this one Dispatcher cope with all of its calls? Volley, for comparison, spins up several dispatchers up front (about five, each one a thread that processes requests). Can a single dispatcher really be enough? I was worried.
First, what does the Dispatcher do when it is constructed?
```
public Dispatcher() {
}
```
Wait, an empty constructor? Really? What actually backs it are these fields:
```
private int maxRequests = 64;          // at most 64 concurrent requests
private int maxRequestsPerHost = 5;    // at most 5 concurrent requests per host
private Runnable idleCallback;

/** Executes calls. Created lazily. */
private ExecutorService executorService;
```
A few notes on the dispatcher's collections:
- Deque<RealCall> runningSyncCalls: the queue of running synchronous calls (the producer side for sync calls).
- ExecutorService executorService: the consumer pool that actually runs the calls.
- Deque<AsyncCall> readyAsyncCalls: the waiting buffer (array-backed, grows automatically, no size limit).
- Deque<AsyncCall> runningAsyncCalls: the async calls currently running; it only references them so the concurrency level can be checked — it is not the consumer's work queue.

*Following the producer/consumer model: when a call is enqueued, if runningAsyncCalls.size() < 64 and the per-host count is < 5, the AsyncCall goes straight into runningAsyncCalls and is executed on the thread pool; otherwise it is parked in readyAsyncCalls to wait.*

Note that the ExecutorService is not created at all for the synchronous path; it is created lazily by executorService(), and the pool's execute() method is what the async call is handed to.
```
synchronized void enqueue(AsyncCall call) {
  if (this.runningAsyncCalls.size() < this.maxRequests
      && this.runningCallsForHost(call) < this.maxRequestsPerHost) {
    this.runningAsyncCalls.add(call);
    this.executorService().execute(call);
  } else {
    this.readyAsyncCalls.add(call);
  }
}

public synchronized ExecutorService executorService() {
  if (this.executorService == null) {
    this.executorService = new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS,
        new SynchronousQueue(), Util.threadFactory("OkHttp Dispatcher", false));
  }
  return this.executorService;
}
```
Honestly I was a bit disappointed — the dispatcher isn't doing anything heroic, it just schedules and hands out tasks.
A quick look is enough for now; we'll come back to the details when we actually hit them, so we don't drift off course:
```
/** Used by {@code Call#execute} to signal it is in-flight. */
synchronized void executed(RealCall call) {
runningSyncCalls.add(call);
}
```
And sure enough, runningSyncCalls is used right away. Its comment reads: "Running synchronous calls. Includes canceled calls that haven't finished yet."
So this really is the synchronous side — let's explore the synchronous flow first, then. (Would I admit I had never actually used OkHttp before?)
Seeing this also cleared up my earlier confusion: the split between synchronous and asynchronous execution happens right at the Call level.
The backing storage is an ArrayDeque. Never used one? In short: a resizable, unbounded double-ended queue that is not thread-safe — which is why these Dispatcher methods are synchronized.
The work done so far is simple enough to summarize: prepare the objects, drop the call into runningSyncCalls — and that's it. How the request actually executes is back in RealCall.
## Sync or async, everything goes through here
Back in RealCall.class — perhaps `Response result = getResponseWithInterceptorChain();` has our answer:
```
// Collect the user-added interceptors and OkHttp's own required interceptors into List<Interceptor> interceptors.
// Interceptor.Chain then re-creates itself with an incremented index each time proceed() is called,
// so the interceptors run from first to last and their results unwind in reverse, like a stack.
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(retryAndFollowUpInterceptor);
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(
interceptors, null, null, null, 0, originalRequest);
return chain.proceed(originalRequest);
}
```
Nice design: put every interceptor into a list, append more whenever needed, and walk the list from start to finish at execution time. Since they all implement the same interface, anything that follows the contract slots right in. (Respect.)
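For reference, this is exactly the contract a user-supplied interceptor follows. A minimal timing/logging interceptor of my own, purely as an illustration:
```java
import java.io.IOException;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

/** Example only: logs the URL and how long the rest of the chain took. */
class TimingInterceptor implements Interceptor {
  @Override public Response intercept(Chain chain) throws IOException {
    Request request = chain.request();
    long start = System.nanoTime();
    Response response = chain.proceed(request);   // hand off to the next interceptor in the list
    long tookMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(request.url() + " took " + tookMs + " ms");
    return response;
  }
}

// Registered via the builder, it lands in client.interceptors(), i.e. step 1 of the list built above:
// OkHttpClient client = new OkHttpClient.Builder().addInterceptor(new TimingInterceptor()).build();
```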
Now look at RealInterceptorChain's proceed method:
```
public Response proceed(Request request, StreamAllocation streamAllocation, HttpCodec httpCodec,
Connection connection) throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
···
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
···
return response;
}
```
Let's lay the flow out carefully and first count how many interceptors a RealCall ends up with.
The client holds these two lists:
```
final List<Interceptor> interceptors;
final List<Interceptor> networkInterceptors;
```
And in getResponseWithInterceptorChain(), in the order they are added:
```
1 interceptors.addAll(client.interceptors());
2 interceptors.add(retryAndFollowUpInterceptor);
3 interceptors.add(new BridgeInterceptor(client.cookieJar()));
4 interceptors.add(new CacheInterceptor(client.internalCache()));
5 interceptors.add(new ConnectInterceptor(client));
6 interceptors.addAll(client.networkInterceptors());   // only if !forWebSocket
7 interceptors.add(new CallServerInterceptor(forWebSocket));
```
Then the chain object is created and each interceptor's intercept() method gets invoked:
```
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(
interceptors, streamAllocation, httpCodec, connection, index + 1, request);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
```
It looks like "just a call to intercept()", so why build a new RealInterceptorChain next at all? This almost tied my brain in a knot. The point is that each interceptor receives the *next* chain (index + 1) and decides if and when to call proceed() on it; that call is what advances execution one step down the list, and the responses then unwind back up in reverse order. If I don't define any interceptors of my own — i.e. client.interceptors() is empty — the first interceptor to run is retryAndFollowUpInterceptor. (A simplified sketch of this recursion follows the diagram below.)
Here's a diagram to illustrate:
![OKHttp_Call.png](http://upload-images.jianshu.io/upload_images/2916442-57a58c206621f78a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
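To make the index + 1 trick concrete, here is a stripped-down model of the same recursion — my own simplification, not OkHttp's actual classes:
```java
import java.util.Arrays;
import java.util.List;

// Each interceptor is handed a chain whose index points just past itself; calling
// proceed() builds the next chain (index + 1) and invokes the interceptor at the
// current index, so execution walks down the list and results unwind back up.
final class MiniChain {
  interface MiniInterceptor {
    String intercept(MiniChain chain);
  }

  private final List<MiniInterceptor> interceptors;
  private final int index;
  private final String request;

  MiniChain(List<MiniInterceptor> interceptors, int index, String request) {
    this.interceptors = interceptors;
    this.index = index;
    this.request = request;
  }

  String request() { return request; }

  String proceed(String request) {
    // Same move as RealInterceptorChain: re-create the chain with index + 1,
    // then hand it to the interceptor at the current index.
    MiniChain next = new MiniChain(interceptors, index + 1, request);
    return interceptors.get(index).intercept(next);
  }

  public static void main(String[] args) {
    List<MiniInterceptor> list = Arrays.asList(
        chain -> "retry(" + chain.proceed(chain.request()) + ")",   // wraps whatever comes back
        chain -> "response for " + chain.request());                // last one: never calls proceed()
    System.out.println(new MiniChain(list, 0, "GET /").proceed("GET /"));
    // prints: retry(response for GET /)
  }
}
```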
Let's start with retryAndFollowUpInterceptor itself and see what OkHttp's own interceptors do with the chain they are handed; that makes it easier to understand, and to write code that fits OkHttp's logic (shoulder to shoulder with the masters).
```
@Override public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
streamAllocation = new StreamAllocation(
client.connectionPool(), createAddress(request.url()), callStackTrace);
···
}
```
The Chain here is again a RealInterceptorChain, just with its index already advanced by one. The interceptor first grabs the request via chain.request(), and then creates a StreamAllocation. StreamAllocation coordinates the three things passed into it: the connection pool from client.connectionPool(), the Address built by createAddress(request.url()), and callStackTrace.
client.connectionPool() is a ConnectionPool:
```
/**
 * Manages reuse of HTTP and HTTP/2 connections for reduced network latency.
 * HTTP requests that share the same {@link Address} may share a {@link Connection}.
 * This class implements the policy of which connections to keep open for future use.
 */
public final class ConnectionPool
```
Sounds good — let's look at the heart of the connection pool:
#### ThreadPoolExecutor
```
/**
* Background threads are used to cleanup expired connections. There will be at most a single
* thread running per connection pool. The thread pool executor permits the pool itself to be
* garbage collected.
*/
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
```
corePoolSize: the minimum number of threads the pool keeps around — here 0, which means that after enough idle time every thread can be torn down.
maximumPoolSize: the largest the pool may grow when tasks arrive; beyond that the rejection policy kicks in. Using Integer.MAX_VALUE here effectively means the pool can absorb any number of cleanup tasks.
keepAliveTime: how long an idle thread above corePoolSize survives before being reclaimed.
ThreadFactory threadFactory: the factory for new threads; OkHttp's factory names the thread and marks it as a daemon (daemon = true).
```
@Override public Thread newThread(Runnable runnable) {
  Thread result = new Thread(runnable, name);
  // Daemon (background/service) thread: it does not keep the JVM alive. Once only daemon
  // threads remain, the JVM exits and these threads are terminated along with it.
  result.setDaemon(daemon);
  return result;
}
```
There is one more parameter, the RejectedExecutionHandler: when the number of submitted tasks exceeds what maximumPoolSize plus the workQueue can absorb, tasks are handed to the RejectedExecutionHandler.
> Key points — the interplay of corePoolSize, maximumPoolSize and workQueue is the part people usually get wrong:
1. While the pool has fewer than corePoolSize threads, each new task gets a fresh thread, even if idle threads exist.
2. Once the pool reaches corePoolSize, new tasks are placed in the workQueue to wait their turn.
3. When the workQueue is full and maximumPoolSize > corePoolSize, new tasks get new threads.
4. When submissions exceed what maximumPoolSize plus the queue can take, new tasks go to the RejectedExecutionHandler.
5. Threads beyond corePoolSize that stay idle for keepAliveTime are shut down.
6. With allowCoreThreadTimeOut(true), even core threads idle for keepAliveTime are shut down.
#### SynchronousQueue
> SynchronousQueue, from java.util.concurrent, is a BlockingQueue with no internal capacity: a producer's put() must wait for a consumer's take(), and vice versa. Unlike ArrayBlockingQueue or LinkedBlockingQueue it has no buffer, so peek() never shows you an element — an element only "exists" at the instant someone takes it — and you cannot iterate it either. The head of the queue is the first thread waiting to hand over an element, not the element itself; data passes directly from a paired producer to a consumer and is never stored in the queue.
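A tiny demonstration of that hand-off behaviour (my own example, unrelated to OkHttp's code):
```java
import java.util.concurrent.SynchronousQueue;

public class SynchronousQueueDemo {
  public static void main(String[] args) throws InterruptedException {
    SynchronousQueue<String> queue = new SynchronousQueue<>();

    Thread producer = new Thread(() -> {
      try {
        queue.put("hello");               // blocks here until another thread calls take()
        System.out.println("handed off");
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    Thread.sleep(500);                     // the producer is still parked inside put()
    System.out.println(queue.peek());      // always null: nothing is ever buffered
    System.out.println(queue.take());      // unblocks the producer and receives "hello"
  }
}
```
For the executors above, pairing a SynchronousQueue with an effectively unbounded maximumPoolSize means a submitted task never waits in a queue: it is handed straight to an idle thread, or a new thread is created for it.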
We still haven't covered the second constructor argument: createAddress(request.url()).
Before the Address is created, the objects it needs are prepared:
```
private Address createAddress(HttpUrl url) {
SSLSocketFactory sslSocketFactory = null;
HostnameVerifier hostnameVerifier = null;
CertificatePinner certificatePinner = null;
if (url.isHttps()) {
sslSocketFactory = client.sslSocketFactory();
hostnameVerifier = client.hostnameVerifier();
certificatePinner = client.certificatePinner();
}
return new Address(url.host(), url.port(), client.dns(), client.socketFactory(),
sslSocketFactory, hostnameVerifier, certificatePinner, client.proxyAuthenticator(),
client.proxy(), client.protocols(), client.connectionSpecs(), client.proxySelector());
}
```
SSLSocketFactory: the factory that creates SSL sockets. When the OkHttpClient is created, a default sslSocketFactory is set up, with X509TrustManager as the default TrustManager:
```
if (builder.sslSocketFactory != null || !isTLS) {
this.sslSocketFactory = builder.sslSocketFactory;
this.certificateChainCleaner = builder.certificateChainCleaner;
} else {
X509TrustManager trustManager = systemDefaultTrustManager();
this.sslSocketFactory = systemDefaultSslSocketFactory(trustManager);
this.certificateChainCleaner = CertificateChainCleaner.get(trustManager);
}
```
Which also shows that if you supply your own sslSocketFactory you need to supply a certificateChainCleaner as well, whereas with the default sslSocketFactory the certificateChainCleaner is already configured for you.
HostnameVerifier: hostnameVerifier = OkHostnameVerifier.INSTANCE;
> This is the base interface for hostname verification. During the handshake, if the URL's hostname and the server's identified hostname do not match, the verification mechanism can call back to an implementation of this interface to decide whether the connection should be allowed. The policy can be certificate-based or rely on other verification schemes. These callbacks are used when the default rules for verifying a URL's hostname fail.
CertificatePinner:
```
/**
 * Constrains which certificates are trusted. Pinning certificates defends against attacks on
 * certificate authorities. It also prevents connections through man-in-the-middle certificate
 * authorities either known or unknown to the application's user.
 */
certificatePinner = CertificatePinner.DEFAULT;
public static final CertificatePinner DEFAULT = new Builder().build();
```
Grinding on through the source, back to the body of RetryAndFollowUpInterceptor.intercept(Chain chain):
```
int followUpCount = 0;
Response priorResponse = null;
// the (potentially) endless retry loop starts here
while (true) {
···
Response response = null;
boolean releaseConnection = true;
try {
response = ((RealInterceptorChain) chain).proceed(request, streamAllocation, null, null);
releaseConnection = false;
}
···
}
```
On to the next interceptor in the list: BridgeInterceptor.
```
/**
 * Bridges from application code to network code.
 * First it builds a network request from a user request.
 * Then it proceeds to call the network.
 * Finally it builds a user response from the network response.
 */
public final class BridgeInterceptor implements Interceptor
private final CookieJar cookieJar;
public BridgeInterceptor(CookieJar cookieJar) {
this.cookieJar = cookieJar;
}
```
It is constructed with a CookieJar, so let's see where cookies come from:
```
// client.cookieJar() returns the builder default: cookieJar = CookieJar.NO_COOKIES;
CookieJar NO_COOKIES = new CookieJar() {
@Override public void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
}
@Override public List<Cookie> loadForRequest(HttpUrl url) {
return Collections.emptyList();
}
};
```
So by default it's an empty, do-nothing jar — at this point loading cookies for a request effectively amounts to:
```
// (my own paraphrase, not actual OkHttp code)
List<Cookie> cookies = Collections.emptyList()
```
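If you actually want cookies, you supply your own CookieJar through the builder. A bare-bones in-memory sketch of mine (no expiry handling or proper domain matching — illustration only):
```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import okhttp3.Cookie;
import okhttp3.CookieJar;
import okhttp3.HttpUrl;

/** Example only: keeps cookies per host in memory for the lifetime of the client. */
class InMemoryCookieJar implements CookieJar {
  private final Map<String, List<Cookie>> store = new HashMap<>();

  @Override public synchronized void saveFromResponse(HttpUrl url, List<Cookie> cookies) {
    store.put(url.host(), new ArrayList<>(cookies));
  }

  @Override public synchronized List<Cookie> loadForRequest(HttpUrl url) {
    List<Cookie> cookies = store.get(url.host());
    return cookies != null ? cookies : new ArrayList<Cookie>();
  }
}

// OkHttpClient client = new OkHttpClient.Builder().cookieJar(new InMemoryCookieJar()).build();
```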
Now, how is it invoked? The Chain here is still a RealInterceptorChain, with the index advanced once more:
```
@Override public Response intercept(Chain chain) throws IOException {
  // get the original, user-built request
  Request userRequest = chain.request();
  // newBuilder() gives us a copy of the original request that we can add headers to
  Request.Builder requestBuilder = userRequest.newBuilder();

  RequestBody body = userRequest.body();
  if (body != null) {
    MediaType contentType = body.contentType();
    // add the Content-Type header from the body's MediaType
    // (e.g. one built with MediaType.parse("...; charset=utf-8") when the body was created)
    if (contentType != null) {
      requestBuilder.header("Content-Type", contentType.toString());
    }
    ···

  // If we add an "Accept-Encoding: gzip" header field we're responsible for also decompressing
  // the transfer stream.
  // Transparent gzip: only if the caller set neither Accept-Encoding nor Range does OkHttp ask
  // for gzip itself (it then decompresses the response on the way back — see below).
  boolean transparentGzip = false;
  if (userRequest.header("Accept-Encoding") == null && userRequest.header("Range") == null) {
    transparentGzip = true;
    requestBuilder.header("Accept-Encoding", "gzip");
  }

  // with the default CookieJar this returns Collections.emptyList()
  List<Cookie> cookies = cookieJar.loadForRequest(userRequest.url());
  if (!cookies.isEmpty()) {
    // if not empty, the cookies are joined as key=value pairs into the Cookie header
    requestBuilder.header("Cookie", cookieHeader(cookies));
  }

  // if no User-Agent was set, fill in OkHttp's default, e.g. "okhttp/3.6.0"
  if (userRequest.header("User-Agent") == null) {
    requestBuilder.header("User-Agent", Version.userAgent());
  }

  // hand off to the next interceptor
  Response networkResponse = chain.proceed(requestBuilder.build());
  ···
```
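One practical consequence of transparentGzip (my own aside, with a placeholder URL): if you set Accept-Encoding yourself, OkHttp returns the response still compressed, and you have to unwrap it much as BridgeInterceptor does further down:
```java
import java.io.IOException;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okio.BufferedSource;
import okio.GzipSource;
import okio.Okio;

public class ManualGzipExample {
  public static void main(String[] args) throws IOException {
    OkHttpClient client = new OkHttpClient();
    Request request = new Request.Builder()
        .url("https://example.com/")                 // placeholder URL
        .header("Accept-Encoding", "gzip")           // transparentGzip stays false now
        .build();

    Response response = client.newCall(request).execute();
    BufferedSource source;
    if ("gzip".equalsIgnoreCase(response.header("Content-Encoding"))) {
      // undo the compression ourselves, the way BridgeInterceptor would have
      source = Okio.buffer(new GzipSource(response.body().source()));
    } else {
      source = response.body().source();
    }
    System.out.println(source.readUtf8());
    response.body().close();
  }
}
```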
CacheInterceptor — now we're at cache handling (and the chain's index has advanced again).
First it checks whether a cache is present: if there is one, look up a candidate response for this request; otherwise the candidate is null.
```
Response cacheCandidate = cache != null ? cache.get(chain.request()) : null;
```
1. What is this cache?
Look closely at OkHttpClient and you'll find the cache split across two types.
One is the Cache class: it caches HTTP and HTTPS responses to the file system so they can be reused, saving time and bandwidth.
```
public final class Cache implements Closeable, Flushable
```
The other is InternalCache, OkHttp's internal cache interface. Applications shouldn't implement it; they should use Cache instead.
```
/**
* OkHttp's internal cache interface. Applications shouldn't implement this: instead use {@link
* okhttp3.Cache}.
*/
public interface InternalCache
```
Comparing the two answers the question: Cache is the external-facing API, InternalCache is for internal use.
2. Where does this cache come from?
OkHttpClient.Builder defines a package-private setter as well as a public one. If you hand in a Cache up front, this.internalCache is set to null (and null is also the default):
```
MainActivity:
OkHttpClient okHttpClient = new OkHttpClient().newBuilder()
    .cache(new Cache(new File(getCacheDir(), "http_cache"), 10 * 1024 * 1024)) // a directory and a max size are required
    .build();

OkHttpClient:
/** Sets the response cache to be used to read and write cached responses. */
public Builder cache(Cache cache) {
  this.cache = cache;
  this.internalCache = null;
  return this;
}
```
So cache and internalCache are mutually exclusive: setting one nulls the other out, which again confirms the answer to question 1 — Cache is for external use, InternalCache for internal use.
```
/** Sets the response cache to be used to read and write cached responses. */
void setInternalCache(InternalCache internalCache) {
this.internalCache = internalCache;
this.cache = null;
}
```
When the OkHttpClient class is first loaded, a static initializer runs:
```
static {
  Internal.instance = new Internal() {
    ···
    @Override public void setCache(OkHttpClient.Builder builder, InternalCache internalCache) {
      builder.setInternalCache(internalCache);
    }
    ···
  };
}
```
This block instantiates the abstract class Internal; its setCache implementation simply forwards to setInternalCache.
Which raises another question: what is Internal for?
```
/**
* Escalate internal APIs in {@code okhttp3} so they can be used from OkHttp's implementation packages.
* The only implementation of this interface is in {@link OkHttpClient}.
*/
public abstract class Internal
```
It escalates okhttp3's internal APIs so that OkHttp's implementation packages can use them, and its only implementation lives in OkHttpClient.
Since an application is meant to have exactly one OkHttpClient, there is effectively one Internal as well, implemented inside OkHttpClient. We'll cover its methods as we run into them.
3. And where is the cached data actually kept? Dig into the Cache class and you'll find quite a bit of machinery:
```
Cache(File directory, long maxSize, FileSystem fileSystem) {
this.cache = DiskLruCache.create(fileSystem, directory, VERSION, ENTRY_COUNT, maxSize);
}
```
1使用时需要设置路径和文件大小,这里默认fileSystem=FileSystem.SYSTEM(okio包)
2DiskLruCache根据字面意思,这里存储使用的是本地文件存储+LRUCache,算是二级缓存先存内存并用lruCache进行管理,同时在本地也会有副本,如果内存中没有就会去本地找,都没有,那就是真的没有了。
```
public static String key(HttpUrl url) {
return ByteString.encodeUtf8(url.toString()).md5().hex();
}
```
3. The cache key is the MD5 of the UTF-8 encoded URL, rendered as a hex string. (Neat.)
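For instance (the URL is a placeholder of mine; the exact value is simply whatever MD5 yields for it):
```java
import okio.ByteString;

String key = ByteString.encodeUtf8("https://example.com/").md5().hex();
System.out.println(key);   // 32 hex characters identifying this URL's cache entry
```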
Next, how a cached response is read back:
```
Response get(Request request) {
String key = key(request.url());
DiskLruCache.Snapshot snapshot;
Entry entry;
···
// 1. look up the snapshot in the DiskLruCache by key
snapshot = cache.get(key);
···
// 2. convert the returned Snapshot into an Entry
entry = new Entry(snapshot.getSource(ENTRY_METADATA));
···
// 3. build and return the Response
Response response = entry.response(snapshot);
···
return response;
}
```
Since we're grinding through the source anyway, let's digest this one properly.
#### DiskLruCache
A DiskLruCache is created through its static create() method:
```
/**
* Create a cache which will reside in {@code directory}.
* This cache is lazily initialized on first access and will be created if it does not exist.
 * (Which answers question 3 above: the cache lives in {@code directory}.)
* @param directory a writable directory
* @param valueCount the number of values per cache entry. Must be positive.
* @param maxSize the maximum number of bytes this cache should use to store
*/
public static DiskLruCache create(FileSystem fileSystem, File directory, int appVersion,
int valueCount, long maxSize) {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
if (valueCount <= 0) {
throw new IllegalArgumentException("valueCount <= 0");
}
// Use a single background thread to evict entries.
// The cache uses its own executor; like the connection pool's, its threads are daemon threads.
// Note maximumPoolSize is 1, so at most a single cleanup thread ever runs.
Executor executor = new ThreadPoolExecutor(0, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), Util.threadFactory("OkHttp DiskLruCache", true));
return new DiskLruCache(fileSystem, directory, appVersion, valueCount, maxSize, executor);
}
```
Still, it's worth comparing this cache executor with the connection pool's. Here is ConnectionPool's again:
```
private static final Executor executor = new ThreadPoolExecutor(0 /* corePoolSize */,
Integer.MAX_VALUE /* maximumPoolSize */, 60L /* keepAliveTime */, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp ConnectionPool", true));
```
Both create daemon (background) threads, and both reclaim a thread once it has been idle longer than keepAliveTime (corePoolSize = 0). The differences: the cache's maximumPoolSize is 1 versus the pool's Integer.MAX_VALUE, and the cache queues its cleanup tasks in an unbounded LinkedBlockingQueue for that single thread, while the connection pool's SynchronousQueue hands each task straight to a thread.
Going deeper into DiskLruCache, the key method is cache.get(key):
it returns a Snapshot (an inner class of DiskLruCache) of the entry for that key, or null if the entry doesn't exist or isn't readable. When a value is returned, the entry is moved to the head of the LRU queue.
```
public synchronized Snapshot get(String key) throws IOException {
initialize();
checkNotClosed();
validateKey(key);
Entry entry = lruEntries.get(key);
if (entry == null || !entry.readable) return null;
Snapshot snapshot = entry.snapshot();
if (snapshot == null) return null;
redundantOpCount++;
journalWriter.writeUtf8(READ).writeByte(' ').writeUtf8(key).writeByte('\n');
if (journalRebuildRequired()) {
executor.execute(cleanupRunnable);
}
return snapshot;
}
```
With that background, back to CacheInterceptor.intercept():
```
@Override public Response intercept(Chain chain) throws IOException {
  ···
  // current time
  long now = System.currentTimeMillis();
  // work out the cache strategy for this request and the cached candidate
  CacheStrategy strategy = new CacheStrategy.Factory(now, chain.request(), cacheCandidate).get();
  // the strategy yields the request to send (if any) and the cached response to use (if any)
  Request networkRequest = strategy.networkRequest;
  Response cacheResponse = strategy.cacheResponse;

  if (cache != null) {
    cache.trackResponse(strategy);
  }

  if (cacheCandidate != null && cacheResponse == null) {
    closeQuietly(cacheCandidate.body()); // The cache candidate wasn't applicable. Close it.
  }

  // If we're forbidden from using the network and the cache is insufficient, fail:
  // a synthetic error response is built and returned here.
  if (networkRequest == null && cacheResponse == null) {
    ···
  }
  ···

  // otherwise hit the network — see the next interceptor
  Response networkResponse = null;
  try {
    networkResponse = chain.proceed(networkRequest);
  } finally {
    // If we're crashing on I/O or otherwise, don't leak the cache body.
    if (networkResponse == null && cacheCandidate != null) {
      closeQuietly(cacheCandidate.body());
    }
  }
  ···
  return response;
}
```
Now we reach ConnectInterceptor. Inside it the Chain is cast to a RealInterceptorChain; first, though, here is what the Chain interface itself offers:
```
interface Chain {
  // the request being processed
  Request request();
  // run the rest of the chain and produce a response
  Response proceed(Request request) throws IOException;
  // the connection the request will be executed on
  Connection connection();
}
```
ConnectInterceptor, however, also uses realChain.streamAllocation() to get hold of the socket-management object (StreamAllocation).
Here is how ConnectInterceptor handles things:
```
@Override public Response intercept(Chain chain) throws IOException {
  RealInterceptorChain realChain = (RealInterceptorChain) chain;
  Request request = realChain.request();
  // fetch the StreamAllocation (socket management) from the RealInterceptorChain
  StreamAllocation streamAllocation = realChain.streamAllocation();

  // We need the network to satisfy this request. Possibly for validating a conditional GET.
  boolean doExtensiveHealthChecks = !request.method().equals("GET");
  // the StreamAllocation created back in RetryAndFollowUpInterceptor finally gets used:
  // newStream() yields the HttpCodec
  HttpCodec httpCodec = streamAllocation.newStream(client, doExtensiveHealthChecks);
  // and the underlying connection is established (or reused)
  RealConnection connection = streamAllocation.connection();

  return realChain.proceed(request, streamAllocation, httpCodec, connection);
}
```
What is an HttpCodec? It encodes HTTP requests and decodes HTTP responses. It's an interface; the concrete implementation (HTTP/1.1 or HTTP/2) is created for us by streamAllocation.newStream():
```
/** Encodes HTTP requests and decodes HTTP responses. */
public interface HttpCodec
```
What methods does that involve?
```
/** Returns an output stream where the request body can be streamed. */
Sink createRequestBody(Request request, long contentLength);

/** This should update the HTTP engine's sentRequestMillis field. */
void writeRequestHeaders(Request request) throws IOException;

/** Flush the request to the underlying socket. */
void flushRequest() throws IOException;

/** Flush the request to the underlying socket and signal no more bytes will be transmitted. */
void finishRequest() throws IOException;

/** Parses bytes of a response header from an HTTP transport. */
Response.Builder readResponseHeaders(boolean expectContinue) throws IOException;

/** Returns a stream that reads the response body. */
ResponseBody openResponseBody(Response response) throws IOException;

/**
 * Cancel this stream. Resources held by this stream will be cleaned up, though not synchronously.
 * That may happen later by the connection pool thread.
 */
void cancel();
```
Finally, the last interceptor: CallServerInterceptor.
```
@Override public Response intercept(Chain chain) throws IOException {
HttpCodec httpCodec = ((RealInterceptorChain) chain).httpStream();
···
// only here does the request actually go out over the socket
httpCodec.finishRequest();
···
// build the Response
Response response = responseBuilder
.request(request)
.handshake(streamAllocation.connection().handshake())
.sentRequestAtMillis(sentRequestMillis)
.receivedResponseAtMillis(System.currentTimeMillis())
.build();
···
return response;
}
```
That looks like the end, but plenty still happens to the response after it is returned — we now have to unwind back up the chain, from the bottom interceptor to the top.
In ConnectInterceptor.class the response simply passes straight through; nothing is done to it:
```
return realChain.proceed(request, streamAllocation, httpCodec, connection);
```
Next up the stack is CacheInterceptor.class: receiving the network response isn't the end of it, because the response is post-processed here.
If the request had a cached response and the server reports it unchanged, the cached entry is updated and written back:
```
// If we have a cache response too, then we're doing a conditional get.
if (cacheResponse != null) {
if (networkResponse.code() == HTTP_NOT_MODIFIED) {
// the server answered 304 Not Modified,
// so rebuild the response from the cached one, merging in the network response's headers
Response response = cacheResponse.newBuilder()
.headers(combine(cacheResponse.headers(), networkResponse.headers()))
.sentRequestAtMillis(networkResponse.sentRequestAtMillis())
.receivedResponseAtMillis(networkResponse.receivedResponseAtMillis())
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
networkResponse.body().close();
// Update the cache after combining headers but before stripping the
// Content-Encoding header (as performed by initContentStream()).
cache.trackConditionalCacheHit();
cache.update(cacheResponse, response);
return response;
} else {
closeQuietly(cacheResponse.body());
}
}
···
```
If there was no cached response, the network response is cached (when it has a body) and handed back to the interceptor above:
```
Response response = networkResponse.newBuilder()
.cacheResponse(stripBody(cacheResponse))
.networkResponse(stripBody(networkResponse))
.build();
if (HttpHeaders.hasBody(response)) {
CacheRequest cacheRequest = maybeCache(response, networkResponse.request(), cache);
response = cacheWritingResponse(cacheRequest, response);
}
return response;
```
Continuing upward, in BridgeInterceptor:
```
Response networkResponse = chain.proceed(requestBuilder.build());
// store cookies from networkResponse.headers() into the cookieJar
HttpHeaders.receiveHeaders(cookieJar, userRequest.url(), networkResponse.headers());
// start a response builder from the network response, bound to the user's original request
Response.Builder responseBuilder = networkResponse.newBuilder()
    .request(userRequest);
// if we transparently asked for gzip, unwrap the body here and strip the now-stale headers
if (transparentGzip
&& "gzip".equalsIgnoreCase(networkResponse.header("Content-Encoding"))
&& HttpHeaders.hasBody(networkResponse)) {
GzipSource responseBody = new GzipSource(networkResponse.body().source());
Headers strippedHeaders = networkResponse.headers().newBuilder()
.removeAll("Content-Encoding")
.removeAll("Content-Length")
.build();
responseBuilder.headers(strippedHeaders);
responseBuilder.body(new RealResponseBody(strippedHeaders, Okio.buffer(responseBody)));
}
return responseBuilder.build();
```
At the top of the chain, RetryAndFollowUpInterceptor receives the response and, when no follow-up is required, breaks out of its loop here:
```
if (priorResponse != null) {
response = response.newBuilder()
.priorResponse(priorResponse.newBuilder()
.body(null)
.build())
.build();
}
Request followUp = followUpRequest(response);
if (followUp == null) {
if (!forWebSocket) {
streamAllocation.release();
}
return response;
}
```
"Attach the prior response if it exists. Such responses never have a body."
priorResponse starts out as null, so how does it ever get a value? Here:
```
request = followUp;
priorResponse = response;
```
And if the request runs into trouble, for example:
```
catch (RouteException e) {
// The attempt to connect via a route failed. The request will not have been sent.
if (!recover(e.getLastConnectException(), false, request)) {
throw e.getLastConnectException();
}
releaseConnection = false;
continue;
} catch (IOException e) {
// An attempt to communicate with a server failed. The request may have been sent.
boolean requestSendStarted = !(e instanceof ConnectionShutdownException);
if (!recover(e, requestSendStarted, request)) throw e;
releaseConnection = false;
continue;
}
```
...the loop keeps going and the retry mechanism takes over: request and priorResponse are reassigned and the request heads into its second lap. Which raises the question: when does the retrying stop? Here:
```
// 1. too many follow-ups
if (++followUpCount > MAX_FOLLOW_UPS) {
streamAllocation.release();
throw new ProtocolException("Too many follow-up requests: " + followUpCount);
}
// 2. the request body cannot be replayed: release the allocation and throw
if (followUp.body() instanceof UnrepeatableRequestBody) {
streamAllocation.release();
throw new HttpRetryException("Cannot retry streamed HTTP body", response.code());
}
···
// 3. the previous response's stream was not properly closed
if (streamAllocation.codec() != null) {
throw new IllegalStateException("Closing the body of " + response
+ " didn't close its backing stream. Bad interceptor?");
}
```
The maximum number of follow-ups is 20:
```
private static final int MAX_FOLLOW_UPS = 20;
```
Finally, back in RealCall.execute(), the dispatcher is told the call has finished and the result is returned:
```
  return result;
} finally {
client.dispatcher().finished(this);
}
```
Phew — job done. That's the flow in broad strokes. Topics to dig into in follow-up posts:
1. DiskLruCache in detail
2. StreamAllocation in detail
3. How gzip is implemented
4. How the connection pool works
5. HttpCodec, the HTTP encoder/decoder