val client = OkHttpClient.Builder().build()
val request = Request.Builder().get().url("https://www.baidu.com/").build()
// 同步请求
val response:Response = client.newCall(request).execute()
Log.e("===",response.body?.string()?:"Null")
//异步请求
client.newCall(request).enqueue(object : Callback {
override fun onFailure(call: Call, e: IOException) {}
override fun onResponse(call: Call, response: Response) {}
})
1、OkHttpClient 的创建
val client = OkHttpClient.Builder().build()
通过 Builder 模式设置OkHttpClient 的参数,比如超时、dns等,可以设置 dns 解决 IPV6 访问慢问题解决 解决方案
2、创建 Request
- 设置 url url(url: String)
- 设置 header addHeader(name: String, value: String)
- 设置 method
- get() = method("GET", null)
- post(body: RequestBody) = method("POST", body)
- method(method: String, body: RequestBody?)
- 设置 body method(method: String, body: RequestBody?)
3、创建 RealCall
用来包裹一个请求,这个请求可以被取消,一个请求只能被执行一次
/**
* 用来包裹一个请求,这个请求可以被取消,一个请求只能被执行一次
*/
interface Call : Cloneable {
/** 真正请求包含 url、header、method、body */
fun request(): Request
/**
* 同步请求 为了防止内存泄露 需要关闭 Response
*/
@Throws(IOException::class)
fun execute(): Response
/**
* 异步请求
* @param responseCallback 请求回调
*/
fun enqueue(responseCallback: Callback)
/**
* 取消请求,但是已经执行完成的请求无法取消
*/
fun cancel()
/**
* 如果执行了 executed 或 enqueued 返回 true
* 如果一个 call 被执行的次数多余1次就会抛出异常
*/
fun isExecuted(): Boolean
/**
* 判定 这个call 是否被被取消了
*/
fun isCanceled(): Boolean
/**
* 通过 OkHttpClient.Builder.callTimeout 设置
* 包含 DNS解析、链接、写入请求、服务器处理、读取响应
* 如果包含重定向和重试 这些用时也包含其中
*/
fun timeout(): Timeout
/**
* 重新复制一个新的 call ,可以被再次执行
*/
public override fun clone(): Call
/**
* 创建一个新的Call
*/
fun interface Factory {
fun newCall(request: Request): Call
}
}
4、同步流程
override fun execute(): Response {
// CAS 判定这个 Call 是否被执行,如果被执行就 返回异常
check(executed.compareAndSet(false, true)) { "Already Executed" }
timeout.enter() //超时计时开始
callStart() //回调请求监听器的请求开始
try {
client.dispatcher.executed(this) //放入队列
return getResponseWithInterceptorChain() // 执行请求获取结果
} finally {
client.dispatcher.finished(this) //请求结束
}
}
- 这是一个同步的方法,无论多少线程调用 execute 方法都被阻塞掉
- runningSyncCalls 是一个双向列表
@Synchronized
internal fun executed(call: RealCall) {
runningSyncCalls.add(call)
}
internal fun finished(call: RealCall) {
finished(runningSyncCalls, call)
}
//异步、同步的结束都会走到这里:从running中移除,并调用promoteAndExecute
private fun <T> finished(calls: Deque<T>, call: T) {
val idleCallback: Runnable?
synchronized(this) {
//从队列中移除
if (!calls.remove(call)) throw AssertionError("Call wasn't in-flight!")
idleCallback = this.idleCallback
}
val isRunning = promoteAndExecute() //当一个请求执行完成 会调用这个方法 执行新的任务
//如果异步调用都执行完成 网络空隙的回调
// 通过 client.dispatcher.idleCallback 可以设置回调
if (!isRunning && idleCallback != null) {
idleCallback.run()
}
}
5、异步流程
override fun enqueue(responseCallback: Callback) {
check(executed.compareAndSet(false, true)) { "Already Executed" }
callStart() //回调请求监听器的请求开始
client.dispatcher.enqueue(AsyncCall(responseCallback)) //请求调度
}
// 继承自 Runable 在线程池中执行
internal inner class AsyncCall(
private val responseCallback: Callback // 执行完成的回调函数
) : Runnable {
//相同的 host 的请求通用一个 AtomicInteger 一个 socket 最多有5个链接
@Volatile
var callsPerHost = AtomicInteger(0)
private set
fun reuseCallsPerHostFrom(other: AsyncCall) {
this.callsPerHost = other.callsPerHost
}
val host: String
get() = originalRequest.url.host
val request: Request
get() = originalRequest
val call: RealCall
get() = this@RealCall
/**
* 通过线程池执行
*/
fun executeOn(executorService: ExecutorService) {
client.dispatcher.assertThreadDoesntHoldLock()
var success = false
try {
executorService.execute(this)
success = true
} catch (e: RejectedExecutionException) {
val ioException = InterruptedIOException("executor rejected")
ioException.initCause(e)
noMoreExchanges(ioException)
responseCallback.onFailure(this@RealCall, ioException)
} finally {
if (!success) {
client.dispatcher.finished(this) // This call is no longer running!
}
}
}
/**
* 执行的方法
*/
override fun run() {
threadName("OkHttp ${redactedUrl()}") {
var signalledCallback = false
timeout.enter()
try {
val response = getResponseWithInterceptorChain() // 和同步RealCall的方法一直
signalledCallback = true
responseCallback.onResponse(this@RealCall, response)
} catch (e: IOException) {
if (signalledCallback) {
// Do not signal the callback twice!
Platform.get().log("Callback failure for ${toLoggableString()}", Platform.INFO, e)
} else {
responseCallback.onFailure(this@RealCall, e)
}
} catch (t: Throwable) {
cancel()
if (!signalledCallback) {
val canceledException = IOException("canceled due to $t")
canceledException.addSuppressed(t)
responseCallback.onFailure(this@RealCall, canceledException)
}
throw t
} finally {
client.dispatcher.finished(this)
}
}
}
}
private fun promoteAndExecute(): Boolean {
this.assertThreadDoesntHoldLock()
val executableCalls = mutableListOf<AsyncCall>()
val isRunning: Boolean
synchronized(this) {
val i = readyAsyncCalls.iterator()
while (i.hasNext()) {
val asyncCall = i.next() //取出一个 Call
// runningAsyncCalls 的个数大于64 就停止加入
if (runningAsyncCalls.size >= this.maxRequests) break
// 一个socket 最多有 5个请求 如果大于5个请求就会继续添加
if (asyncCall.callsPerHost.get() >= this.maxRequestsPerHost) continue // Host max capacity.
i.remove() //从 readyAsyncCalls 中移除
asyncCall.callsPerHost.incrementAndGet() //callsPerHost 增加
executableCalls.add(asyncCall)
runningAsyncCalls.add(asyncCall) //加入到 执行中的列表
}
isRunning = runningCallsCount() > 0
}
for (i in 0 until executableCalls.size) {
val asyncCall = executableCalls[i]
asyncCall.executeOn(executorService) // 放入到线程池中执行
}
return isRunning //返回执行中的 call 不为空 不是空标识 有任务需要执行
}
6、getResponseWithInterceptorChain
无论是异步请求还是同步请求都会走到 RealCall中的这个方法,是责任链模式
@Throws(IOException::class)
internal fun getResponseWithInterceptorChain(): Response {
// Build a full stack of interceptors.
val interceptors = mutableListOf<Interceptor>()
//使用者配置的 应用拦截器,最先拦截 使用 OkHttpClient.Builder().addInterceptor(interceptor: Interceptor)
interceptors += client.interceptors
interceptors += RetryAndFollowUpInterceptor(client) //重试跟进拦截器
interceptors += BridgeInterceptor(client.cookieJar) //桥拦截器
interceptors += CacheInterceptor(client.cache) //缓存拦截器
interceptors += ConnectInterceptor //连接拦截器
if (!forWebSocket) {
//使用者配置的网络拦截器 OkHttpClient.Builder().addNetworkInterceptor(interceptor: Interceptor)
interceptors += client.networkInterceptors
}
interceptors += CallServerInterceptor(forWebSocket) //请求服务拦截器
//拦截器链
val chain = RealInterceptorChain(
call = this,
interceptors = interceptors,
index = 0,
exchange = null,
request = originalRequest,
connectTimeoutMillis = client.connectTimeoutMillis,
readTimeoutMillis = client.readTimeoutMillis,
writeTimeoutMillis = client.writeTimeoutMillis
)
var calledNoMoreExchanges = false
try {
val response = chain.proceed(originalRequest)
if (isCanceled()) {
response.closeQuietly()
throw IOException("Canceled")
}
return response
} catch (e: IOException) {
calledNoMoreExchanges = true
throw noMoreExchanges(e) as Throwable
} finally {
if (!calledNoMoreExchanges) {
noMoreExchanges(null)
}
}
}