前言
相信大家想仔细认真的看下源码,然后去网上找资料都会有这样的两个感觉
- 版本不一致,导致和博主分析的有偏差
- 突然就跳转到某个关键类,根本不知道是从哪里进去的
反正阿简在看源码找资料的时候就是这样,说着说着快流了泪...
那阿简今天来和大家一起慢慢分析一下okhttp拦截器源码
先上总的框架图
源码分析准备
- app的build.gradle引入okhttp相关包
implementation 'com.squareup.okhttp3:okhttp:3.14.4'
- 创建一个简单的activity,写一个简单的GET请求的okhttp异步方法
private void okhttpTest() {
String url = "http://wwww.baidu.com";
OkHttpClient okHttpClient = new OkHttpClient.Builder().addInterceptor(TestInterceptor);
final Request request = new Request.Builder()
.url(url)
.get()//默认就是GET请求,可以不写
.build();
Call call = okHttpClient.newCall(request);
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: ");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: " + response.body().string());
}
});
}
然后开始我们的源码分析~
首先,我们创建了一个OkHttpClient,点进去看它的构造方法,初始化了一个叫做Builder的静态内部类,我们看看初始化了哪些参数(部分参数介绍请看注释)
public Builder() {
//任务调度器
dispatcher = new Dispatcher();
//支持的协议
protocols = DEFAULT_PROTOCOLS;
connectionSpecs = DEFAULT_CONNECTION_SPECS;
eventListenerFactory = EventListener.factory(EventListener.NONE);
proxySelector = ProxySelector.getDefault();
if (proxySelector == null) {
proxySelector = new NullProxySelector();
}
cookieJar = CookieJar.NO_COOKIES;
socketFactory = SocketFactory.getDefault();
hostnameVerifier = OkHostnameVerifier.INSTANCE;
certificatePinner = CertificatePinner.DEFAULT;
proxyAuthenticator = Authenticator.NONE;
authenticator = Authenticator.NONE;
//ConnectionPool是一个连接池对象,它可以用来管理连接对象,从它的构造方法中可以看到连接池的默认空闲连接数为5个,keepAlive时间为5分钟。
connectionPool = new ConnectionPool();
dns = Dns.SYSTEM;
followSslRedirects = true;
followRedirects = true;
retryOnConnectionFailure = true;
callTimeout = 0;
connectTimeout = 10_000;
readTimeout = 10_000;
writeTimeout = 10_000;
pingInterval = 0;
}
阿简认为,初始化的这些这些成员变量中最重要的是Dispatcher,其他都是一些一看命名就大致知道的成员变量,那我们来看下这个任务调度器包含什么
public final class Dispatcher {
private int maxRequests = 64;
private int maxRequestsPerHost = 5;
private @Nullable Runnable idleCallback;
/** Executes calls. Created lazily. */
private @Nullable ExecutorService executorService;
/** Ready async calls in the order they'll be run. */
private final Deque<AsyncCall> readyAsyncCalls = new ArrayDeque<>();
/** Running asynchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<AsyncCall> runningAsyncCalls = new ArrayDeque<>();
/** Running synchronous calls. Includes canceled calls that haven't finished yet. */
private final Deque<RealCall> runningSyncCalls = new ArrayDeque<>();
public Dispatcher(ExecutorService executorService) {
this.executorService = executorService;
}
public Dispatcher() {
}
可以看到Dispatcher这个类中包含了两个异步队列,一个同步队列用来保存请求,该文章主要从异步请求分析,所以大家可以记一下这两个异步请求的成员变量名称,后面会用到readyAsyncCalls ,runningAsyncCalls
接着我们通过建造者模式创建了一个Request请求:
final Request request = new Request.Builder()
.url(url)
.get()//默认就是GET请求,可以不写
.build();
可以看到Request里面就是包含一个完整的请求报文的几个成员变量,对这部门计算机网络有疑惑的小伙伴可以参考阿简的这篇文章哟:网络基本知识-http
public final class Request {
final HttpUrl url;
final String method;
final Headers headers;
final @Nullable RequestBody body;
final Map<Class<?>, Object> tags;
紧接着,我们通过okhttpClient去new了一个请求(call),newCall点进去
Call call = okHttpClient.newCall(request);
可以看到最后是调用到了一个叫RealCall类的一个静态方法newRealCall,该方法创建了一个realCall的成员变量,传入了OkHttpClient 对象,同时创建了一个叫Transmitter的对象,这个类是干嘛的后面会提到(官方注释是:OkHttp的应用层和网络层之间的桥梁)
static RealCall newRealCall(OkHttpClient client, Request originalRequest, boolean forWebSocket) {
// Safely publish the Call instance to the EventListener.
RealCall call = new RealCall(client, originalRequest, forWebSocket);
call.transmitter = new Transmitter(client, call);
return call;
}
然后开始我们的异步请求
call.enqueue(new Callback() {
@Override
public void onFailure(Call call, IOException e) {
Log.d(TAG, "onFailure: ");
}
@Override
public void onResponse(Call call, Response response) throws IOException {
Log.d(TAG, "onResponse: " + response.body().string());
}
});
点进去,可以看到Call只有一个实现类,就是我们刚才的那个RealCall,
@Override public void enqueue(Callback responseCallback) {
synchronized (this) {
if (executed) throw new IllegalStateException("Already Executed");
executed = true;
}
transmitter.callStart();
client.dispatcher().enqueue(new AsyncCall(responseCallback));
}
可以看到这里transmitter干了一件事callStart()
然后通过Okhttp的那个成员变量dispatcher(任务调度器)调用了异步方法enqueue(AsyncCall call),传入了我们的CallBack回调监听(观察者模式喔)
然后最最最重要的来啦
我们看RealCall里面的这个内部类AsyncCall,继承了NamedRunnable,NamedRunnable又实现了Runnable 接口,那必然要实现run方法,run方法里面又调用了execute
public abstract class NamedRunnable implements Runnable {
protected final String name;
public NamedRunnable(String format, Object... args) {
this.name = Util.format(format, args);
}
@Override public final void run() {
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName(name);
try {
execute();
} finally {
Thread.currentThread().setName(oldName);
}
}
protected abstract void execute();
大家记住这个AsyncCall,后面会调用到这里的关键代码
回到刚才 client.dispatcher().enqueue这里,可以看到Dispatcher中的enqueue方法,我们的请求放入了readyAsyncCalls这个异步队列
void enqueue(AsyncCall call) {
synchronized (this) {
//请求放入了该异步队列
readyAsyncCalls.add(call);
// Mutate the AsyncCall so that it shares the AtomicInteger of an existing running call to
// the same host.
if (!call.get().forWebSocket) {
AsyncCall existingCall = findExistingCallWithHost(call.host());
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall);
}
}
promoteAndExecute();
}
进入promoteAndExecute()方法,这个方法很重要哦,可以看到两个判断条件
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
如果正在执行的异步请求数量大于最大的请求数量(maxRequests 默认等于64)的话
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity
每个主机最大请求数,默认为5
private boolean promoteAndExecute() {
assert (!Thread.holdsLock(this));
//创建一个临时变量List用作储存异步请求
List<AsyncCall> executableCalls = new ArrayList<>();
boolean isRunning;
synchronized (this) {
for (Iterator<AsyncCall> i = readyAsyncCalls.iterator(); i.hasNext(); ) {
AsyncCall asyncCall = i.next();
if (runningAsyncCalls.size() >= maxRequests) break; // Max capacity.
if (asyncCall.callsPerHost().get() >= maxRequestsPerHost) continue; // Host max capacity.
i.remove();
//循环中把请求分别放入临时变量和readyAsyncCalls中
asyncCall.callsPerHost().incrementAndGet();
executableCalls.add(asyncCall);
runningAsyncCalls.add(asyncCall);
}
isRunning = runningCallsCount() > 0;
}
//遍历该临时变量,执行executeOn
for (int i = 0, size = executableCalls.size(); i < size; i++) {
AsyncCall asyncCall = executableCalls.get(i);
asyncCall.executeOn(executorService());
}
return isRunning;
}
我们看到遍历的 asyncCall.executeOn(executorService())这行代码,然后回到了RealCall的内部类AsyncCall的executeOn方法,然后传入了ThreadPoolExecutor线程池对象
void executeOn(ExecutorService executorService) {
assert (!Thread.holdsLock(client.dispatcher()));
boolean success = false;
try {
executorService.execute(this);
success = true;
} catch (RejectedExecutionException e) {
InterruptedIOException ioException = new InterruptedIOException("executor rejected");
ioException.initCause(e);
transmitter.noMoreExchanges(ioException);
responseCallback.onFailure(RealCall.this, ioException);
} finally {
if (!success) {
client.dispatcher().finished(this); // This call is no longer running!
}
}
}
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
执行了 executorService.execute(this),传入了this,this是什么,是不是就是我们刚才的那个实现Runable的实现类AsyncCall本身,其实这里已经很清楚了,传入Runable到线程池,线程池肯定会多态重写执行run方法,run方法又调用了execute
@Override protected void execute() {
boolean signalledCallback = false;
transmitter.timeoutEnter();
try {
Response response = getResponseWithInterceptorChain();
signalledCallback = true;
responseCallback.onResponse(RealCall.this, response);
} catch (IOException e) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log(INFO, "Callback failure for " + toLoggableString(), e);
} else {
responseCallback.onFailure(RealCall.this, e);
}
} catch (Throwable t) {
cancel();
if (!signalledCallback) {
IOException canceledException = new IOException("canceled due to " + t);
canceledException.addSuppressed(t);
responseCallback.onFailure(RealCall.this, canceledException);
}
throw t;
} finally {
client.dispatcher().finished(this);
}
}
}
然后拦截器来了,大家进入RealCall的
Response getResponseWithInterceptorChain() throws IOException {
// Build a full stack of interceptors.
List<Interceptor> interceptors = new ArrayList<>();
interceptors.addAll(client.interceptors());
interceptors.add(new RetryAndFollowUpInterceptor(client));
interceptors.add(new BridgeInterceptor(client.cookieJar()));
interceptors.add(new CacheInterceptor(client.internalCache()));
interceptors.add(new ConnectInterceptor(client));
if (!forWebSocket) {
interceptors.addAll(client.networkInterceptors());
}
interceptors.add(new CallServerInterceptor(forWebSocket));
Interceptor.Chain chain = new RealInterceptorChain(interceptors, transmitter, null, 0,
originalRequest, this, client.connectTimeoutMillis(),
client.readTimeoutMillis(), client.writeTimeoutMillis());
boolean calledNoMoreExchanges = false;
try {
Response response = chain.proceed(originalRequest);
if (transmitter.isCanceled()) {
closeQuietly(response);
throw new IOException("Canceled");
}
return response;
} catch (IOException e) {
calledNoMoreExchanges = true;
throw transmitter.noMoreExchanges(e);
} finally {
if (!calledNoMoreExchanges) {
transmitter.noMoreExchanges(null);
}
}
}
可以看到interceptors 这个list中放入了很多拦截器,最新放入的是我们的应用拦截器OkHttpClient.Builder().addInterceptor(TestInterceptor)
这些拦截器的作用我后面会说明
然后就开始了我们的拦截器责任链的代码了(原理也是非常的简单)
这里创建了一个RealInterceptorChain对象,然后开始执行proceed传入原始的请求originalRequest
public Response proceed(Request request, Transmitter transmitter, @Nullable Exchange exchange)
throws IOException {
if (index >= interceptors.size()) throw new AssertionError();
calls++;
// If we already have a stream, confirm that the incoming request will use it.
if (this.exchange != null && !this.exchange.connection().supportsUrl(request.url())) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must retain the same host and port");
}
// If we already have a stream, confirm that this is the only call to chain.proceed().
if (this.exchange != null && calls > 1) {
throw new IllegalStateException("network interceptor " + interceptors.get(index - 1)
+ " must call proceed() exactly once");
}
// Call the next interceptor in the chain.
RealInterceptorChain next = new RealInterceptorChain(interceptors, transmitter, exchange,
index + 1, request, call, connectTimeout, readTimeout, writeTimeout);
Interceptor interceptor = interceptors.get(index);
Response response = interceptor.intercept(next);
// Confirm that the next interceptor made its required call to chain.proceed().
if (exchange != null && index + 1 < interceptors.size() && next.calls != 1) {
throw new IllegalStateException("network interceptor " + interceptor
+ " must call proceed() exactly once");
}
// Confirm that the intercepted response isn't null.
if (response == null) {
throw new NullPointerException("interceptor " + interceptor + " returned null");
}
if (response.body() == null) {
throw new IllegalStateException(
"interceptor " + interceptor + " returned a response with no body");
}
return response;
}
每次传入的index+1取集合中的下一个拦截器执行intercept,然后intercept中又执行proceed进行下一次的拦截执行
是不是非常非常的简单!!
然后开始收尾,execute中最后finall执行了 client.dispatcher().finished(this);
void finished(AsyncCall call) {
call.callsPerHost().decrementAndGet();
finished(runningAsyncCalls, call);
}
可以看到finished了promoteAndExecute()中放入runningAsyncCalls的对象,然后结束了
大致就分析到这里
开始分析下刚才说的几个拦截器
RetryAndFollowUpInterceptor,用来实现连接失败的重试和重定向。
BridgeInterceptor,用来修改请求和响应的 header 信息。
CacheInterceptor,用来实现响应缓存。比如获取到的 Response 带有 Date,Expires,Last-Modified,Etag 等 header,表示该 Response 可以缓存一定的时间,下次请求就可以不需要发往服务端,直接拿缓存的。
ConnectInterceptor,用来打开到服务端的连接。其实是调用了 StreamAllocation 的newStream 方法来打开连接的。建联的 TCP 握手,TLS 握手都发生该阶段。过了这个阶段,和服务端的 socket 连接打通。
CallServerInterceptor,用来发起请求并且得到响应。上一个阶段已经握手成功,HttpStream 流已经打开,所以这个阶段把 Request 的请求信息传入流中,并且从流中读取数据封装成 Response 返回。
拦截器分类
- 应用拦截器,就是我们addInterceptor添加的拦截器
- 网络拦截器 addNetworkInterceptor添加的拦截器
两个的特点分别是:
应用拦截器
1.不需要关心是否重定向或者失败重连
2.应用拦截器只会调用一次,即使数据来源于缓存
3.自定义的应用拦截器是第一个开始执行的拦截器,所以这句话的意思就是,应用拦截器可以决定是否执行其他的拦截器(如果不想继续往下传递请求,那么就不调用Chain.proceed()),通过Chain.proceed()
看getResponseWithInterceptorChain()方法结合责任链,可以很好的理解这几个特点
网络拦截器
1.允许像重定向和重试一样操作中间响应。
2.网络发生短路时不调用缓存响应。
3.在数据被传递到网络时观察数据。
4.有权获得装载请求的连接。
以下关于官网介绍的这张图区分应用拦截器和网络拦截器,通过getResponseWithInterceptorChain方法可以很好的理解,最新执行的是应用拦截器