前言
文章会分为两部分介绍,这一主要篇介绍从发起请求到调用getResponseWithInterceptorChain方法为止。
从getResponseWithInterceptorChain方法里开始,就开启了okhttp中的责任链模式,这个之后的文章再去做分析
流程图
Okhttp的使用
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url("http://wwww.baidu.com").build();
//同步请求
Response syncResponse = client.newCall(request).execute();
//异步请求
client.newCall(request).enqueue(new Callback() {
@Override
public void onFailure(@NotNull Call call, @NotNull IOException e) { }
@Override
public void onResponse(@NotNull Call call, @NotNull Response response) throws IOException { }
});
请求从OkhttpClient发起,调用OkhttpClient#newCall方法,传入Request请求参数,返回一个Call,实际返回的是RealCall,再调用RealCall的execute或enqueue发起请求
OkHttpClient
open class OkHttpClient internal constructor(
builder: Builder
) : Cloneable, Call.Factory, WebSocket.Factory {
@get:JvmName("dispatcher") val dispatcher: Dispatcher = builder.dispatcher//调度器,每一个请求都会记录在里面,里面包含了一个三个队列和一个线程池。
...省略部分代码...
@get:JvmName("minWebSocketMessageToCompress")//这里@get:JvmName的目的是为了兼容OKHTTP3,其实就是取了个别名
val minWebSocketMessageToCompress: Long = builder.minWebSocketMessageToCompress
val routeDatabase: RouteDatabase = builder.routeDatabase ?: RouteDatabase()
constructor() : this(Builder())
OkHttpClient可以直接创建或者通过Builder方式创建。里面没什么代码,只是根据建造者模式传入的Builder进行赋值,初始化操作都是在Builder里进行
再来看Builder
class Builder constructor() {
internal var dispatcher: Dispatcher = Dispatcher()//Dispatcher就是在这里初始化
internal var connectionPool: ConnectionPool = ConnectionPool()
internal val interceptors: MutableList<Interceptor> = mutableListOf()
internal val networkInterceptors: MutableList<Interceptor> = mutableListOf()
internal var eventListenerFactory: EventListener.Factory = EventListener.NONE.asFactory()
internal var retryOnConnectionFailure = true
internal var authenticator: Authenticator = Authenticator.NONE
internal var followRedirects = true
internal var followSslRedirects = true
internal var cookieJar: CookieJar = CookieJar.NO_COOKIES
internal var cache: Cache? = null
internal var dns: Dns = Dns.SYSTEM
internal var proxy: Proxy? = null
internal var proxySelector: ProxySelector? = null
internal var proxyAuthenticator: Authenticator = Authenticator.NONE
internal var socketFactory: SocketFactory = SocketFactory.getDefault()
internal var sslSocketFactoryOrNull: SSLSocketFactory? = null
internal var x509TrustManagerOrNull: X509TrustManager? = null
internal var connectionSpecs: List<ConnectionSpec> = DEFAULT_CONNECTION_SPECS
internal var protocols: List<Protocol> = DEFAULT_PROTOCOLS
internal var hostnameVerifier: HostnameVerifier = OkHostnameVerifier
internal var certificatePinner: CertificatePinner = CertificatePinner.DEFAULT
internal var certificateChainCleaner: CertificateChainCleaner? = null
internal var callTimeout = 0
internal var connectTimeout = 10_000
internal var readTimeout = 10_000
internal var writeTimeout = 10_000
internal var pingInterval = 0
internal var minWebSocketMessageToCompress = RealWebSocket.DEFAULT_MINIMUM_DEFLATE_SIZE
internal var routeDatabase: RouteDatabase? = null
}
Builder里面进行参数的初始化,像拦截器就是通过Builder设置,这里主要知道Dispatcher是在这里初始化的就行。
Request
open class Builder {
internal var url: HttpUrl? = null
internal var method: String
internal var headers: Headers.Builder
internal var body: RequestBody? = null
/** A mutable map of tags, or an immutable empty map if we don't have any. */
internal var tags: MutableMap<Class<*>, Any> = mutableMapOf()
constructor() {
this.method = "GET"
this.headers = Headers.Builder()
}
}
Request里默认请求方式为get,通过建造者模式配置请求地址、请求方式、请求头等参数。
RealCall
请求第一步是client.newCall(request),看看newCall是什么
override fun newCall(request: Request): Call = RealCall(this, request, forWebSocket = false)
newCall返回了一个RealCall,RealCall是继承自Call接口
来看看Call接口定义了些什么方法
interface Call : Cloneable {
fun request(): Request
@Throws(IOException::class)
fun execute(): Response//同步请求
fun enqueue(responseCallback: Callback)//异步请求
fun cancel()
fun isExecuted(): Boolean
fun isCanceled(): Boolean
fun timeout(): Timeout
public override fun clone(): Call
fun interface Factory {//OkHttpClient实现了这个工厂接口用来创建RealCall
fun newCall(request: Request): Call
}
}
OK,发现里面定义了execute和enqueue两个请求方法,那么网络请求应该就是由client.newCall(request)构建一个RealCall,再由RealCall调用execute和enqueue发起请求了。
看看RealCall怎么实现的execute同步请求
override fun execute(): Response {
check(executed.compareAndSet(false, true)) { "Already Executed" }//判断请求是否已经执行
timeout.enter()//超时逻辑
callStart()
try {
client.dispatcher.executed(this)//往dispatcher的同步队列里添加当前任务
return getResponseWithInterceptorChain()//通过责任链进行网络请求等逻辑
} finally {
client.dispatcher.finished(this)
}
}
同步方法非常简单,往dispatcher里面的同步队列添加了一个任务,然后直接在当前线程调用getResponseWithInterceptorChain开始进行网络请求。
再来看看异步请求
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart()
client.dispatcher.enqueue(AsyncCall(responseCallback))////往dispatcher的同步队列里添加AsyncCall任务
}
这次没有直接调用getResponseWithInterceptorChain方法,而是只调用了dispatcher.enqueue,并且传入的不是RealCall而是一个AsyncCall
//AsyncCall是RealCall的一个内部类
internal inner class AsyncCall(
private val responseCallback: Callback
) : Runnable {
//供外部调用,传入一个线程池,在线程池里执行这个任务
fun executeOn(executorService: ExecutorService) {
try {
executorService.execute(this)
} catch (e: RejectedExecutionException) {
responseCallback.onFailure(this@RealCall, ioException)
} finally {
}
}
override fun run() {
try {
val response = getResponseWithInterceptorChain()//请求最终也是调用的这个方法
//相关回调
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
responseCallback.onFailure(this@RealCall, e)
} catch (t: Throwable) {
responseCallback.onFailure(this@RealCall, canceledException)
} finally {
client.dispatcher.finished(this)
}
}
}
}
上面是精简过的代码,AsyncCall是RealCall的一个内部类,实现了Runnable 接口。通过executeOn方法传入一个线程池执行this,由于AsyncCall是一个Runnable,所以会在子线程中执行run方法,run方法里调用getResponseWithInterceptorChain这个方法进行网络请求,拿到请求结果后回调responseCallback。
可以这样认为,RealCall代表一个同步请求,AsyncCall代表一个异步请求
由于AsyncCall是传入到了Dispatcher里面,所以AsyncCall#executeOn方法肯定是在Dispatcher里面调用的。
下面看看Dispatcher是怎么去执行executed和enqueue这两个方法的
Dispatcher
class Dispatcher constructor() {
//#1
@get:Synchronized var maxRequests = 64//最大请求数
set(maxRequests) {
require(maxRequests >= 1) { "max < 1: $maxRequests" }
synchronized(this) {
field = maxRequests
}
promoteAndExecute()
}
@get:Synchronized var maxRequestsPerHost = 5//最大并发数
set(maxRequestsPerHost) {
require(maxRequestsPerHost >= 1) { "max < 1: $maxRequestsPerHost" }
synchronized(this) {
field = maxRequestsPerHost
}
promoteAndExecute()
}
@set:Synchronized
@get:Synchronized
var idleCallback: Runnable? = null
//这里定义了两个线程池executorServiceOrNull和executorService,实际上可以认为只定义了一个executorService
//当调用executorService线程池时,会触发get()方法,里面会初始化executorServiceOrNull线程池并将其返回
//对kotlin get() set()不了解的可以查看相关资料https://blog.csdn.net/c1392851600/article/details/80933130
private var executorServiceOrNull: ExecutorService? = null
@get:Synchronized
@get:JvmName("executorService") val executorService: ExecutorService
get() {
if (executorServiceOrNull == null) {
executorServiceOrNull = ThreadPoolExecutor(0, Int.MAX_VALUE, 60, TimeUnit.SECONDS,
SynchronousQueue(), threadFactory("$okHttpName Dispatcher", false))
}
return executorServiceOrNull!!
}
private val readyAsyncCalls = ArrayDeque<AsyncCall>()//准备执行的异步任务
private val runningAsyncCalls = ArrayDeque<AsyncCall>()//正在执行的异步任务
private val runningSyncCalls = ArrayDeque<RealCall>()//正在执行的同步任务
//#2
//添加任务到同步队列
@Synchronized internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
//添加任务到异步等待队列
internal fun enqueue(call: AsyncCall) {
synchronized(this) {
readyAsyncCalls.add(call)
if (!call.call.forWebSocket) {
val existingCall = findExistingCallWithHost(call.host)
if (existingCall != null) call.reuseCallsPerHostFrom(existingCall)
}
}
promoteAndExecute()
}
//执行异步任务
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()//新建一个列表,用来存放需要加入线程池的任务
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next()
if (runningAsyncCalls.size >= this.maxRequests) break //是否达到最大请求数
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // 是否达到最大并发数
i.remove()//从等待队列中移除
asyncCall.callsPerHost.incrementAndGet()
executableCalls.add(asyncCall)//加入缓存列表
runningAsyncCalls.add(asyncCall)//加入正在执行的队列
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {//遍历缓存列表
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService)//执行异步请求
}
return isRunning
}
}
代码看着很长,但是并不复杂。
#1-#2处的代码定义了线程池参数然后初始化了线程池,线程池只有异步任务会用到,还初始化了三个队列。
先来说三个队列,这三个队列的目的是为了对请求进行管理。
首先是两个异步的队列,由于线程池有上限,肯定不是所有任务都是正在执行,readyAsyncCalls就是用来放准备执行的异步请求,runningAsyncCalls 用来放正在执行的异步请求(已经添加到线程池当中的请求)。
同步任务都是同步执行的,也就不存在等待队列,故所有同步任务都是正在执行的任务,都放在runningSyncCalls 里面。这里注意下,同步任务并不是一定都在主线程开启,可能同时创建多个线程在这些线程里同时执行同步任务,所以同步任务也需要有一个队列来对请求进行管理。
executed
方法往runningSyncCalls队列添加一个RealCall同步请求。
enqueue
方法往readyAsyncCalls队列添加一个AsyncCall异步请求,然后调用promoteAndExecute
方法判断是否执行异步任务
promoteAndExecute
方法将缓存队列的请求加入到正在执行的队列中,然后调用asyncCall.executeOn(executorService)执行这些异步请求。
不论同步还是异步任务,最终都会调用getResponseWithInterceptorChain方法开始真正的网络请求