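This is OkHttp's fourth built-in interceptor, ConnectInterceptor. If the cache interceptor cannot answer the request from the cache, execution reaches this interceptor, which sets up the actual network connection.
Connection pool
The connection pool is an optimization for network connections. When a connection to a server is needed, OkHttp first looks in the pool for an idle connection. If one exists it is used directly; otherwise a new connection is created and put into the pool.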
constructor() : this(5, 5, TimeUnit.MINUTES)

constructor(
  maxIdleConnections: Int,
  keepAliveDuration: Long,
  timeUnit: TimeUnit
) : this(RealConnectionPool(
    taskRunner = TaskRunner.INSTANCE,
    maxIdleConnections = maxIdleConnections,
    keepAliveDuration = keepAliveDuration,
    timeUnit = timeUnit
))
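The connection pool is somewhat similar to a thread pool. Its parameters are:
maxIdleConnections: the maximum number of idle connections kept in the pool. As the default constructor shows, this is 5.
keepAliveDuration: the maximum time an idle connection is kept alive. Once it is exceeded, the connection is closed. The default is 5 minutes.
timeUnit: the unit of keepAliveDuration. The default is minutes.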
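To make the two limits concrete, here is a minimal sketch of an idle-connection pool. It is not OkHttp's RealConnectionPool: ToyConnection, ToyPool and their methods are hypothetical names invented for illustration, time is passed in explicitly instead of being read from a clock, and only idle connections are tracked.

```kotlin
import java.util.concurrent.TimeUnit

// Toy model only: ToyConnection and ToyPool are hypothetical, not OkHttp classes.
class ToyConnection(val host: String) {
    var idleSinceNanos: Long = 0L
    var closed: Boolean = false
}

class ToyPool(
    private val maxIdleConnections: Int = 5,
    keepAliveDuration: Long = 5,
    timeUnit: TimeUnit = TimeUnit.MINUTES
) {
    private val keepAliveNanos = timeUnit.toNanos(keepAliveDuration)
    private val idle = ArrayDeque<ToyConnection>() // oldest idle connection first

    val idleCount: Int get() = idle.size

    /** Returns an idle connection to [host] if one exists, otherwise creates a new one. */
    fun acquire(host: String): ToyConnection {
        val reused = idle.firstOrNull { it.host == host }
        if (reused != null) {
            idle.remove(reused)
            return reused
        }
        return ToyConnection(host)
    }

    /** Puts a connection back into the pool once the caller is done with it. */
    fun release(connection: ToyConnection, nowNanos: Long) {
        connection.idleSinceNanos = nowNanos
        idle.addLast(connection)
    }

    /** Closes connections idle for longer than keep-alive, then the oldest ones above the limit. */
    fun evict(nowNanos: Long) {
        idle.filter { nowNanos - it.idleSinceNanos >= keepAliveNanos }.forEach { close(it) }
        while (idle.size > maxIdleConnections) close(idle.first())
    }

    private fun close(connection: ToyConnection) {
        connection.closed = true
        idle.remove(connection)
    }
}
```

With the defaults, a connection released at time 0 is still reusable after four minutes but is closed by an evict() call made at the five-minute mark.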
Establishing a connection
object ConnectInterceptor : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    val realChain = chain as RealInterceptorChain
    val exchange = realChain.call.initExchange(chain)
    val connectedChain = realChain.copy(exchange = exchange)
    return connectedChain.proceed(realChain.request)
  }
}
This class looks like only a few lines of code because most of its logic lives in other classes. First, recall this piece of the retry-and-follow-up interceptor:
class RetryAndFollowUpInterceptor(private val client: OkHttpClient) : Interceptor {
  @Throws(IOException::class)
  override fun intercept(chain: Interceptor.Chain): Response {
    // ................
    while (true) {
      call.enterNetworkInterceptorExchange(request, newExchangeFinder)
    }
    // ................
  }
  // ................
}
Every network request, including each retry or redirect iteration of the loop above, goes through RealCall.enterNetworkInterceptorExchange():
fun enterNetworkInterceptorExchange(request: Request, newExchangeFinder: Boolean) {
  check(interceptorScopedExchange == null)
  check(exchange == null) {
    "cannot make a new request because the previous response is still open: " +
        "please call response.close()"
  }
  if (newExchangeFinder) {
    this.exchangeFinder = ExchangeFinder(
        connectionPool,
        createAddress(request.url),
        this,
        eventListener
    )
  }
}
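As you can see, this method only initializes an exchangeFinder object. That object is the key to establishing a connection, but it is not actually used until the connect interceptor runs:
val exchange = realChain.call.initExchange(chain)
The operations involved in a network connection are wrapped in the Exchange object. When a request is sent, a connection has to be established, and once it exists a stream is needed to read and write data. Exchange coordinates these three things (the request, the connection, and the data stream): it finds a connection for the request and then obtains a stream on it to carry out the network communication.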
// RealCall.initExchange()
internal fun initExchange(chain: RealInterceptorChain): Exchange {
  synchronized(connectionPool) {
    check(!noMoreExchanges) { "released" }
    check(exchange == null)
  }
  // HttpCodec
  val codec = exchangeFinder!!.find(client, chain)
  val result = Exchange(this, eventListener, exchangeFinder!!, codec)
  this.interceptorScopedExchange = result
  synchronized(connectionPool) {
    this.exchange = result
    this.exchangeRequestDone = false
    this.exchangeResponseDone = false
    return result
  }
}
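The hand-off between ConnectInterceptor and the rest of the chain can be sketched with a toy chain of responsibility. Everything here (ToyExchange, ToyChain, toyConnect, toyCallServer, runToyChain) is a hypothetical stand-in rather than an OkHttp type; the sketch only shows the pattern of copying the chain with an exchange attached and then calling proceed().

```kotlin
// Hypothetical stand-ins; none of these are OkHttp types.
data class ToyExchange(val connectionId: Int)

data class ToyChain(
    val request: String,
    val interceptors: List<(ToyChain) -> String>,
    val index: Int = 0,
    val exchange: ToyExchange? = null
) {
    /** Hands the request to the next interceptor in the list. */
    fun proceed(request: String): String {
        check(index < interceptors.size) { "no interceptor left to handle the request" }
        val next = copy(request = request, index = index + 1)
        return interceptors[index](next)
    }
}

/** Plays the role of ConnectInterceptor: attach an exchange, then continue. */
val toyConnect: (ToyChain) -> String = { chain ->
    val connected = chain.copy(exchange = ToyExchange(connectionId = 1))
    connected.proceed(chain.request)
}

/** Plays the role of the last interceptor, which needs the exchange to reach the server. */
val toyCallServer: (ToyChain) -> String = { chain ->
    val exchange = checkNotNull(chain.exchange) { "no exchange: the connect step was skipped" }
    "sent '${chain.request}' over connection ${exchange.connectionId}"
}

fun runToyChain(request: String): String =
    ToyChain(request, listOf(toyConnect, toyCallServer)).proceed(request)
```

Because the chain is copied rather than mutated, interceptors that ran earlier never see an exchange; only those after the connect step do.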
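The exchangeFinder!!.find() call used here looks up or establishes a usable connection to the request's host. It returns an ExchangeCodec, which holds the input and output streams and encapsulates the encoding and decoding of HTTP messages, so it can be used directly to talk HTTP with the host. HTTP/1.x and HTTP/2 are implemented differently, which is why the code below ends up creating one of two implementation classes, Http2ExchangeCodec or Http1ExchangeCodec: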
// ExchangeFinder.find()
fun find(
  client: OkHttpClient,
  chain: RealInterceptorChain
): ExchangeCodec {
  try {
    // Establish (or reuse) a connection
    val resultConnection = findHealthyConnection(
        connectTimeout = chain.connectTimeoutMillis,
        readTimeout = chain.readTimeoutMillis,
        writeTimeout = chain.writeTimeoutMillis,
        pingIntervalMillis = client.pingIntervalMillis,
        connectionRetryEnabled = client.retryOnConnectionFailure,
        doExtensiveHealthChecks = chain.request.method != "GET"
    )
    // Create the codec for that connection
    return resultConnection.newCodec(client, chain)
  } catch (e: RouteException) {
    trackFailure(e.lastConnectException)
    throw e
  } catch (e: IOException) {
    trackFailure(e)
    throw RouteException(e)
  }
}

// RealConnection.newCodec()
@Throws(SocketException::class)
internal fun newCodec(client: OkHttpClient, chain: RealInterceptorChain): ExchangeCodec {
  val socket = this.socket!!
  val source = this.source!!
  val sink = this.sink!!
  val http2Connection = this.http2Connection
  return if (http2Connection != null) {
    Http2ExchangeCodec(client, this, chain, http2Connection)
  } else {
    socket.soTimeout = chain.readTimeoutMillis()
    source.timeout().timeout(chain.readTimeoutMillis.toLong(), MILLISECONDS)
    sink.timeout().timeout(chain.writeTimeoutMillis.toLong(), MILLISECONDS)
    Http1ExchangeCodec(client, this, source, sink)
  }
}
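The shape of newCodec() (one interface, with the implementation chosen by the connection's protocol) can be sketched as below. ToyCodec, ToyHttp1Codec, ToyHttp2Codec and ToyTransport are hypothetical names, and the returned strings only hint at how each protocol frames a request; real HTTP/2 uses binary frames with compressed headers.

```kotlin
// Hypothetical toy types; the strings only hint at each protocol's request framing.
interface ToyCodec {
    fun describeRequest(method: String, path: String): String
}

class ToyHttp1Codec : ToyCodec {
    // HTTP/1.x writes a textual request line.
    override fun describeRequest(method: String, path: String) = "$method $path HTTP/1.1"
}

class ToyHttp2Codec(private val streamId: Int) : ToyCodec {
    // HTTP/2 carries the same information as pseudo-header fields on a numbered stream.
    override fun describeRequest(method: String, path: String) =
        "stream $streamId :method=$method :path=$path"
}

class ToyTransport(private val isHttp2: Boolean) {
    private var nextStreamId = 1 // client-initiated HTTP/2 streams use odd ids

    /** Mirrors the shape of newCodec(): choose the codec from the connection's protocol. */
    fun newCodec(): ToyCodec =
        if (isHttp2) ToyHttp2Codec(nextStreamId).also { nextStreamId += 2 } else ToyHttp1Codec()
}
```

Callers depend only on ToyCodec, so code further down the chain does not need to know which protocol the connection negotiated.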
Next, let's see how resultConnection is created. Following the calls:
findHealthyConnection() -> findConnection()
@Throws(IOException::class)
private fun findConnection(
  connectTimeout: Int,
  readTimeout: Int,
  writeTimeout: Int,
  pingIntervalMillis: Int,
  connectionRetryEnabled: Boolean
): RealConnection {
  var foundPooledConnection = false
  var result: RealConnection? = null
  var selectedRoute: Route? = null
  var releasedConnection: RealConnection?
  val toClose: Socket?
  synchronized(connectionPool) {
    if (call.isCanceled()) throw IOException("Canceled")

    val callConnection = call.connection // changes within this overall method
    releasedConnection = callConnection
    toClose = if (callConnection != null && (callConnection.noNewExchanges ||
            !sameHostAndPort(callConnection.route().address.url))) {
      call.releaseConnectionNoEvents()
    } else {
      null
    }

    if (call.connection != null) {
      // We had an already-allocated connection and it's good.
      result = call.connection
      releasedConnection = null
    }

    if (result == null) {
      // The connection hasn't had any problems for this call.
      refusedStreamCount = 0
      connectionShutdownCount = 0
      otherFailureCount = 0

      // Attempt to get a connection from the pool.
      if (connectionPool.callAcquirePooledConnection(address, call, null, false)) {
        foundPooledConnection = true
        result = call.connection
      } else if (nextRouteToTry != null) {
        selectedRoute = nextRouteToTry
        nextRouteToTry = null
      }
    }
  }
  toClose?.closeQuietly()

  if (releasedConnection != null) {
    eventListener.connectionReleased(call, releasedConnection!!)
  }
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result!!)
  }
  if (result != null) {
    // If we found an already-allocated or pooled connection, we're done.
    return result!!
  }

  // If we need a route selection, make one. This is a blocking operation.
  var newRouteSelection = false
  if (selectedRoute == null && (routeSelection == null || !routeSelection!!.hasNext())) {
    var localRouteSelector = routeSelector
    if (localRouteSelector == null) {
      localRouteSelector = RouteSelector(address, call.client.routeDatabase, call, eventListener)
      this.routeSelector = localRouteSelector
    }
    newRouteSelection = true
    routeSelection = localRouteSelector.next()
  }

  var routes: List<Route>? = null
  synchronized(connectionPool) {
    if (call.isCanceled()) throw IOException("Canceled")

    if (newRouteSelection) {
      // Now that we have a set of IP addresses, make another attempt at getting a connection from
      // the pool. This could match due to connection coalescing.
      routes = routeSelection!!.routes
      if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
        foundPooledConnection = true
        result = call.connection
      }
    }

    if (!foundPooledConnection) {
      if (selectedRoute == null) {
        selectedRoute = routeSelection!!.next()
      }

      // Create a connection and assign it to this allocation immediately. This makes it possible
      // for an asynchronous cancel() to interrupt the handshake we're about to do.
      result = RealConnection(connectionPool, selectedRoute!!)
      connectingConnection = result
    }
  }

  // If we found a pooled connection on the 2nd time around, we're done.
  if (foundPooledConnection) {
    eventListener.connectionAcquired(call, result!!)
    return result!!
  }

  // Do TCP + TLS handshakes. This is a blocking operation.
  result!!.connect(
      connectTimeout,
      readTimeout,
      writeTimeout,
      pingIntervalMillis,
      connectionRetryEnabled,
      call,
      eventListener
  )
  call.client.routeDatabase.connected(result!!.route())

  var socket: Socket? = null
  synchronized(connectionPool) {
    connectingConnection = null
    // Last attempt at connection coalescing, which only occurs if we attempted multiple
    // concurrent connections to the same host.
    if (connectionPool.callAcquirePooledConnection(address, call, routes, true)) {
      // We lost the race! Close the connection we created and return the pooled connection.
      result!!.noNewExchanges = true
      socket = result!!.socket()
      result = call.connection

      // It's possible for us to obtain a coalesced connection that is immediately unhealthy. In
      // that case we will retry the route we just successfully connected with.
      nextRouteToTry = selectedRoute
    } else {
      connectionPool.put(result!!)
      call.acquireConnectionNoEvents(result!!)
    }
  }
  socket?.closeQuietly()

  eventListener.connectionAcquired(call, result!!)
  return result!!
}
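There is no need to study every line of the code above. Overall, it checks whether a connection can be reused. If so, call.connection is used directly; otherwise a new connection is created with result = RealConnection(connectionPool, selectedRoute!!). The new connection then performs the TCP three-way handshake (plus the TLS handshake for HTTPS):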
result!!.connect(
    connectTimeout,
    readTimeout,
    writeTimeout,
    pingIntervalMillis,
    connectionRetryEnabled,
    call,
    eventListener
)
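Stripped of locking, events and route selection, the order in which findConnection() tries its options can be sketched as a small function. This is a simplified model, not the real method: FoundConnection, findToyConnection and its lambda parameters are hypothetical, and connections are represented by plain strings.

```kotlin
// Hypothetical model of the lookup order; connections are just labels here.
data class FoundConnection(val connection: String, val step: String)

fun findToyConnection(
    callConnection: String?,
    poolLookup: (withRoutes: Boolean, requireMultiplexed: Boolean) -> String?,
    connectNew: () -> String
): FoundConnection {
    // 1. The call may already hold a healthy connection.
    callConnection?.let { return FoundConnection(it, "already allocated") }
    // 2. First pool lookup, by address only.
    poolLookup(false, false)?.let { return FoundConnection(it, "pooled") }
    // 3. Second pool lookup once the routes (IP addresses) are known.
    poolLookup(true, false)?.let { return FoundConnection(it, "pooled after route selection") }
    // 4. Nothing reusable: connect (the blocking handshake step).
    val fresh = connectNew()
    // 5. Another call may have pooled a multiplexed connection meanwhile; prefer that one.
    poolLookup(true, true)?.let { return FoundConnection(it, "pooled after losing the race") }
    return FoundConnection(fresh, "new connection")
}
```

In the real method, step 5 also closes the socket that was just opened, and the final branch puts the new connection into the pool.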
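Connection reuse
// Check whether the pool has a reusable connection
if (connectionPool.callAcquirePooledConnection(address, call, routes, false)) {
  foundPooledConnection = true
  result = call.connection
}
If a pooled connection meets the conditions, that RealConnection is assigned to the call and the method returns true: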
fun callAcquirePooledConnection(
  address: Address,
  call: RealCall,
  routes: List<Route>?,
  requireMultiplexed: Boolean
): Boolean {
  this.assertThreadHoldsLock()
  for (connection in connections) {
    if (requireMultiplexed && !connection.isMultiplexed) continue
    if (!connection.isEligible(address, routes)) continue
    call.acquireConnectionNoEvents(connection)
    return true
  }
  return false
}
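Look at the condition if (!connection.isEligible(address, routes)). The loop walks through all pooled connections: as soon as one is eligible the method returns true, and if none is it returns false.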
internal fun isEligible(address: Address, routes: List<Route>?): Boolean {
  // If this connection is not accepting new exchanges, we're done.
  if (calls.size >= allocationLimit || noNewExchanges) return false

  // If the non-host fields of the address don't overlap, we're done.
  if (!this.route.address.equalsNonHost(address)) return false

  // If the host exactly matches, we're done: this connection can carry the address.
  if (address.url.host == this.route().address.url.host) {
    return true // This connection is a perfect match.
  }

  // At this point we don't have a hostname match. But we still be able to carry the request if
  // our connection coalescing requirements are met. See also:
  // https://hpbn.co/optimizing-application-delivery/#eliminate-domain-sharding
  // https://daniel.haxx.se/blog/2016/08/18/http2-connection-coalescing/

  // 1. This connection must be HTTP/2.
  if (http2Connection == null) return false

  // 2. The routes must share an IP address.
  if (routes == null || !routeMatchesAny(routes)) return false

  // 3. This connection's server certificate's must cover the new host.
  if (address.hostnameVerifier !== OkHostnameVerifier) return false
  if (!supportsUrl(address.url)) return false

  // 4. Certificate pinning must match the host.
  try {
    address.certificatePinner!!.check(address.url.host, handshake()!!.peerCertificates)
  } catch (_: SSLPeerUnverifiedException) {
    return false
  }

  return true // The caller's address can be carried by this connection.
}
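- if (calls.size >= allocationLimit || noNewExchanges) return false
  The connection has reached its maximum number of concurrent streams, or it no longer accepts new streams. For example, an HTTP/1.x connection that is in use cannot be shared (its maximum is 1 concurrent stream), and neither can a connection that has been closed. In these cases the connection cannot be reused.
- if (!this.route.address.equalsNonHost(address)) return false
  if (address.url.host == this.route().address.url.host) {
    return true // This connection is a perfect match.
  }
  If the DNS, proxy, SSL settings, server host name, and port are all identical, the connection can be reused.
If the host name does not match, reuse may still be possible in some HTTP/2 scenarios (HTTP/2 is left aside for now).
In summary, a connection can be reused if the pool contains one whose parameters match and which is neither closed nor occupied.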
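The HTTP/1.x part of these rules can be sketched in a few lines. This is a reduced model under stated assumptions: ToyAddress, ToyPooledConnection and acquireToyConnection are hypothetical names, the address carries only a handful of fields, and HTTP/2 coalescing is left out entirely.

```kotlin
// Hypothetical, reduced model of the reuse check; HTTP/2 coalescing is omitted.
data class ToyAddress(
    val host: String,
    val port: Int,
    val proxy: String? = null,
    val dns: String = "system"
)

class ToyPooledConnection(
    val address: ToyAddress,
    val allocationLimit: Int = 1 // an HTTP/1.x connection carries one call at a time
) {
    var activeCalls = 0
    var noNewExchanges = false

    fun isEligible(target: ToyAddress): Boolean {
        // Busy or shut-down connections cannot take another call.
        if (activeCalls >= allocationLimit || noNewExchanges) return false
        // Every field except the host must match (port, proxy, DNS in this toy).
        if (address.copy(host = "") != target.copy(host = "")) return false
        // With identical non-host fields, an exact host match is a perfect match.
        return address.host == target.host
    }
}

/** Returns the first eligible connection and marks it as in use, or null if none fits. */
fun acquireToyConnection(
    connections: List<ToyPooledConnection>,
    target: ToyAddress
): ToyPooledConnection? {
    val match = connections.firstOrNull { it.isEligible(target) } ?: return null
    match.activeCalls++
    return match
}
```

A second acquire for the same address returns null while the first call still holds the connection, which is the "maximum of 1 concurrent stream" case described above.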
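Summary
Everything in this interceptor serves to obtain a connection to the target server, over which HTTP data is sent and received.
Once the connection is established, the chain moves on to the next interceptor, CallServerInterceptor.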