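Introduction
- HttpClient uses a connection pool to manage persistent connections, so a connection to the same route (the same TCP endpoint) can be reused across requests.
- This article walks through how HttpClient manages that connection pool.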
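Overall Flow
The core classes of connection pool management are PoolingHttpClientConnectionManager and AbstractConnPool.
PoolingHttpClientConnectionManager exposes the connection management interface to callers. Its requestConnection method returns an anonymous ConnectionRequest instance.
ConnectionRequest holds a Future. When ConnectionRequest.get is called to obtain an HttpClientConnection, it actually takes a CPoolEntry out of that Future and wraps it with CPoolProxy.newProxy(CPoolEntry), which returns a proxied HttpClientConnection.
PoolingHttpClientConnectionManager does not lease or release connections itself; it delegates both to AbstractConnPool.
AbstractConnPool manages connections along two dimensions: globally (across all routes) and per route.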
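The two dimensions meet when no idle connection can be reused and the pool must decide whether it may open a new one. The following is a minimal sketch of that decision, written for this article rather than taken from HttpClient: `PoolLimits`, `Decision` and `decide` are hypothetical names, and the sketch assumes the route has no idle connections of its own (the route-level eviction step in getPoolEntryBlocking is left out). `maxPerRoute` caps one route and `maxTotal` caps the whole pool; in HttpClient 4.x they are configured with `setDefaultMaxPerRoute`/`setMaxPerRoute` and `setMaxTotal` on PoolingHttpClientConnectionManager.

```java
public class PoolLimits {

    // Hypothetical outcome of one admission check (not an HttpClient type)
    enum Decision { CREATE, EVICT_IDLE_THEN_CREATE, WAIT }

    /**
     * Decides what to do when no idle connection of the route can be reused.
     * Assumption: routeAllocated contains no idle connections of this route.
     *
     * @param routeAllocated connections currently allocated to this route
     * @param maxPerRoute    per-route limit
     * @param totalLeased    connections leased across all routes
     * @param totalAvailable idle connections across all routes
     * @param maxTotal       global limit
     */
    static Decision decide(int routeAllocated, int maxPerRoute,
                           int totalLeased, int totalAvailable, int maxTotal) {
        // Per-route dimension: the route's sub-pool is full
        if (routeAllocated >= maxPerRoute) {
            return Decision.WAIT;
        }
        // Global dimension: capacity not taken by leased connections
        int freeCapacity = Math.max(maxTotal - totalLeased, 0);
        if (freeCapacity == 0) {
            return Decision.WAIT;
        }
        // Idle connections also occupy global capacity; if they leave no room
        // for one more connection, an idle one must be closed first
        if (totalAvailable > freeCapacity - 1) {
            return Decision.EVICT_IDLE_THEN_CREATE;
        }
        return Decision.CREATE;
    }

    public static void main(String[] args) {
        // maxTotal = 4: 2 leased + 2 idle elsewhere leave no room
        System.out.println(decide(0, 2, 2, 2, 4)); // EVICT_IDLE_THEN_CREATE
        // The route already holds its 2 connections
        System.out.println(decide(2, 2, 2, 0, 4)); // WAIT
        // Plenty of room on both dimensions
        System.out.println(decide(1, 2, 1, 0, 4)); // CREATE
    }
}
```

The third check is the subtle one: `freeCapacity` only subtracts leased connections, so idle connections of other routes can still fill the pool, and one of them has to be closed before a new connection fits.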
Source Code Analysis
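public class MainClientExec implements ClientExecChain {

    private final HttpRequestExecutor requestExecutor;
    // The connection manager; by default a PoolingHttpClientConnectionManager
    private final HttpClientConnectionManager connManager;
    private final ConnectionReuseStrategy reuseStrategy;
    private final ConnectionKeepAliveStrategy keepAliveStrategy;

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) throws IOException, HttpException {
        // ... (setup of userToken, config, etc. elided)
        // Ask connManager for a ConnectionRequest for this route
        final ConnectionRequest connRequest =
                connManager.requestConnection(route, userToken);
        final HttpClientConnection managedConn;
        try {
            // Obtain managedConn from connRequest, waiting at most
            // connectionRequestTimeout ms (0 means no limit)
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(
                    timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException interrupted) {
            // ... elided
        } catch (final ExecutionException ex) {
            // ... elided
        }
        // ... (connHolder, which wraps managedConn for later release, is created here; elided)
        try {
            HttpResponse response;
            for (int execCount = 1;; execCount++) {
                // Send the request over managedConn via HttpRequestExecutor
                response = requestExecutor.execute(request,
                        managedConn, context);
                // ... (keep-alive, authentication and retry handling elided;
                // the loop breaks once no further attempt is needed)
            }
            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();
            if (entity == null || !entity.isStreaming()) {
                // Nothing left to stream: return the connection to the pool now
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            }
            // Streaming entity: connHolder releases the connection once the
            // entity is consumed or the response is closed
            return new HttpResponseProxy(response, connHolder);
        } catch (final Error error) {
            // ... elided
        }
    }
}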
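- Obtain a ConnectionRequest from the connection manager connManager.
- Obtain an HttpClientConnection (managedConn) from the ConnectionRequest.
- Send the request over managedConn via HttpRequestExecutor.
- By default, connManager is a PoolingHttpClientConnectionManager.
- The key question is how the HttpClientConnection is obtained, so the next step is to analyze connManager.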
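The entity check at the end of `execute` decides when the connection goes back to the pool: with no streaming entity it is released at once, otherwise the release is deferred until the response is closed. The self-contained sketch below imitates that hand-off with hypothetical classes (`ConnHolder`, `ResponseProxy`, `finish`) standing in for HttpClient's own ConnectionHolder and HttpResponseProxy. It illustrates why a caller has to close (or fully consume) a streamed response: until then the connection stays leased.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReleaseSketch {

    // Counts connections handed back to a pretend pool
    static final AtomicInteger RELEASED = new AtomicInteger();

    // Hypothetical stand-in for ConnectionHolder: releases at most once
    static final class ConnHolder {
        private final AtomicBoolean released = new AtomicBoolean(false);

        void releaseConnection() {
            if (released.compareAndSet(false, true)) {
                RELEASED.incrementAndGet();
            }
        }
    }

    // Hypothetical stand-in for HttpResponseProxy: closing it releases the holder, if any
    static final class ResponseProxy implements AutoCloseable {
        private final ConnHolder holder;

        ResponseProxy(ConnHolder holder) {
            this.holder = holder;
        }

        @Override
        public void close() {
            if (holder != null) {
                holder.releaseConnection();
            }
        }
    }

    // Mirrors the entity check at the end of MainClientExec.execute
    static ResponseProxy finish(boolean streamingEntity, ConnHolder holder) {
        if (!streamingEntity) {
            holder.releaseConnection();      // nothing left to read: release now
            return new ResponseProxy(null);
        }
        return new ResponseProxy(holder);    // release deferred until close()
    }

    public static void main(String[] args) {
        finish(false, new ConnHolder());
        System.out.println(RELEASED.get()); // 1
        try (ResponseProxy streamed = finish(true, new ConnHolder())) {
            System.out.println(RELEASED.get()); // 1, still leased while the body is read
        }
        System.out.println(RELEASED.get()); // 2
    }
}
```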
public class PoolingHttpClientConnectionManager
        implements HttpClientConnectionManager,
        ConnPoolControl<HttpRoute>, Closeable {

    private final ConfigData configData;
    // The core object that actually manages the pool
    private final CPool pool;
    private final HttpClientConnectionOperator connectionOperator;
    private final AtomicBoolean isShutDown;

    @Override
    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        // Lease a CPoolEntry (a wrapper around a connection) from the pool.
        // lease() does not block: it returns a Future whose get() does the waiting
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);
        // Return a ConnectionRequest whose get() resolves the Future via leaseConnection
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            // ConnectionRequest.get: calls leaseConnection with the Future
            // (the Future wraps the CPoolEntry, which wraps the connection)
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
                final HttpClientConnection conn =
                        leaseConnection(future, timeout, timeUnit);
                // ... (other code elided)
                return conn;
            }
        };
    }

    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit timeUnit) throws InterruptedException, ExecutionException, ConnectionPoolTimeoutException {
        final CPoolEntry entry;
        try {
            // Wait for the pool to hand over an entry, then wrap it in a proxy
            entry = future.get(timeout, timeUnit);
            // ... (sanity checks elided)
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
            throw new ConnectionPoolTimeoutException("Timeout waiting for connection from pool");
        }
    }
}
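- The core idea of PoolingHttpClientConnectionManager: CPool.lease returns a Future of CPoolEntry, and that Future is wrapped inside the ConnectionRequest.
- leaseConnection takes the CPoolEntry out of that Future, wraps it with CPoolProxy.newProxy and returns the proxy.
- To go further, we need to look at how CPool.lease works.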
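The pattern above, a request object that wraps a pool Future and turns the leased entry into a proxy only when `get` is called, can be reproduced with `java.util.concurrent` alone. In this sketch `Entry`, `ConnProxy` and `Request` are hypothetical stand-ins for CPoolEntry, CPoolProxy and ConnectionRequest, a `CompletableFuture` plays the role of the Future returned by `CPool.lease`, and a timeout surfaces as a plain `TimeoutException` rather than HttpClient's ConnectionPoolTimeoutException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class LeaseRequestSketch {

    // Hypothetical pool entry (plays the role of CPoolEntry)
    static final class Entry {
        final String route;

        Entry(String route) {
            this.route = route;
        }
    }

    // Hypothetical proxy handed to callers (plays the role of CPoolProxy)
    static final class ConnProxy {
        final Entry entry;

        ConnProxy(Entry entry) {
            this.entry = entry;
        }
    }

    // Hypothetical request object (plays the role of ConnectionRequest)
    static final class Request {
        private final Future<Entry> future;

        Request(Future<Entry> future) {
            this.future = future;
        }

        boolean cancel() {
            return future.cancel(true);
        }

        // Like leaseConnection: wait for the entry, then wrap it in a proxy
        ConnProxy get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            Entry entry = future.get(timeout, unit);
            return new ConnProxy(entry);
        }
    }

    public static void main(String[] args) throws Exception {
        // The pool already produced an entry: get() wraps it immediately
        Request ready = new Request(CompletableFuture.completedFuture(new Entry("http://example.com:80")));
        System.out.println(ready.get(100, TimeUnit.MILLISECONDS).entry.route);

        // No entry arrives in time: get() times out
        Request starved = new Request(new CompletableFuture<>());
        try {
            starved.get(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out waiting for a connection");
        }
        System.out.println(starved.cancel());
    }
}
```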
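abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
    // Per-route bookkeeping
    private final T route;
    // Connections of this route currently in use
    private final Set<E> leased;
    // Idle connections of this route
    private final LinkedList<E> available;
    // Futures of lease requests waiting for a connection of this route
    private final LinkedList<Future<E>> pending;
    // ...
}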
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
        implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    // Per-route bookkeeping: one RouteSpecificPool per route
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // Global bookkeeping across all routes
    private final Set<E> leased;
    private final LinkedList<E> available;
    // Global queue of Futures waiting for a connection
    private final LinkedList<Future<E>> pending;

    @Override
    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {
        return new Future<E>() {

            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            // ... (cancel, isCancelled, isDone and get() without timeout elided)

            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            // A Future that already holds an entry keeps returning it
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }
                            // Block until a CPoolEntry is available or the timeout expires
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);
                            // ... (stale-connection validation and callback handling elided)
                            entryRef.set(leasedEntry);
                            done.set(true);
                            onLease(leasedEntry);
                            return leasedEntry;
                        } catch (final IOException ex) {
                            throw new ExecutionException(ex);
                        }
                    }
                }
            }
        };
    }
}
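- CPool extends AbstractConnPool.
- AbstractConnPool maintains bookkeeping structures for both the global dimension and the per-route dimension.
- available holds idle connections and leased holds connections currently in use, while pending holds the Futures of lease requests that are blocked waiting for a connection.
- getPoolEntryBlocking obtains leasedEntry, blocking if necessary.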
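The per-route collections can be modeled in a few lines. The sketch below is a hypothetical, single-threaded `RoutePool` (not HttpClient's RouteSpecificPool) that follows the same ordering convention: in HttpClient a released connection is added to the front of `available` (addFirst), so the back of the list holds the least recently used idle connection, which is what `getLastUsed` returns and what `available.removeLast()` evicts. Connections are plain strings for brevity, and `pending` is left out.

```java
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;

public class RoutePoolSketch {

    // Hypothetical per-route pool; connections are plain strings
    static final class RoutePool {
        final Set<String> leased = new HashSet<>();
        // Front = most recently released, back = least recently used
        final LinkedList<String> available = new LinkedList<>();

        // Reuse the most recently released idle connection, if any
        String getFree() {
            String conn = available.pollFirst();
            if (conn != null) {
                leased.add(conn);
            }
            return conn;
        }

        // A newly created connection starts out leased
        String add(String conn) {
            leased.add(conn);
            return conn;
        }

        // Return a connection; non-reusable ones are simply dropped
        void free(String conn, boolean reusable) {
            if (!leased.remove(conn)) {
                throw new IllegalStateException(conn + " was not leased from this pool");
            }
            if (reusable) {
                available.addFirst(conn);
            }
        }

        // Eviction candidate: the idle connection released longest ago
        String getLastUsed() {
            return available.peekLast();
        }

        int getAllocatedCount() {
            return leased.size() + available.size();
        }
    }

    public static void main(String[] args) {
        RoutePool pool = new RoutePool();
        pool.add("c1");
        pool.add("c2");
        pool.free("c1", true);   // c1 released first
        pool.free("c2", true);   // c2 released later
        System.out.println(pool.getLastUsed());       // c1
        System.out.println(pool.getFree());           // c2
        System.out.println(pool.getAllocatedCount()); // 2
    }
}
```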
public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
        implements ConnPool<T, E>, ConnPoolControl<T> {

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        // Serialize all pool operations under the pool lock
        this.lock.lock();
        try {
            E entry;
            for (;;) {
                // Each route has its own sub-pool; fetch the one for this route
                final RouteSpecificPool<T, C, E> pool = getPool(route);
                // Take idle entries until a usable one is found or none is left
                for (;;) {
                    entry = pool.getFree(state);
                    // No idle entry left: leave the loop
                    if (entry == null) {
                        break;
                    }
                    // Close entries whose keep-alive period has expired
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    // Drop closed entries from both pools and try the next one
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }
                // Found a reusable entry: move it from available to leased and return it
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // No reusable entry: a new connection is needed
                final int maxPerRoute = getMax(route);
                // maxPerRoute is configurable; if the route has no room for one more
                // connection, close its least recently used idle connections to make room
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                // The route's sub-pool is below its maximum
                if (pool.getAllocatedCount() < maxPerRoute) {
                    // Global capacity left after subtracting leased connections
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // Idle connections also count toward maxTotal: if they leave no room
                        // for a new one, close the least recently used idle connection
                        // (it may belong to another route)
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        // Open a new connection for the route
                        final C conn = this.connFactory.create(route);
                        // Register it in the route's "small pool" (as leased)
                        entry = pool.add(conn);
                        // and in the global "big pool"
                        this.leased.add(entry);
                        return entry;
                    }
                }

                // Reaching here means no idle connection could be reused and no new one
                // could be created, because the per-route or the global limit is reached
                boolean success = false;
                try {
                    // Queue the future in the route pool
                    pool.queue(future);
                    // and in the global pending queue
                    this.pending.add(future);
                    // Wait until a release signals the condition or the deadline passes
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                // (if the deadline has not passed yet, loop and try again)
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
}
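- The pool as a whole has a maximum number of connections (maxTotal), and each route's small pool has its own maximum (maxPerRoute).
- When a limit would be exceeded, least recently used idle connections are closed to make room: in the route pool when it has no room for one more connection, and in the global pool when the idle connections leave no free capacity for a new one.
- If a reusable idle connection is found, it is returned to the caller.
- Otherwise, HttpClient checks whether the route pool is below maxPerRoute and the global pool still has free capacity; if so, it creates a new connection and adds it to the pool.
- If a limit has been reached, the request queues and waits. When the Condition is signaled (after another thread releases a connection) it tries again; if the deadline passes first, a TimeoutException is thrown.
- All of this runs while holding the pool's lock (a ReentrantLock), which keeps leasing connections thread-safe.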
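The waiting part of getPoolEntryBlocking is the classic lock-plus-condition pattern. Below is a minimal, self-contained sketch of it under simplifying assumptions: a single route, so only one limit (`max`); connections are integers; no expiry, LRU eviction or per-future queues. `BlockingPool`, `lease` and `release` are hypothetical names, not HttpClient APIs. As in the code above, a timeout of 0 means waiting without a deadline, `awaitUntil` returning `false` is treated as a timeout only once the deadline has really passed, and a release wakes the waiters.

```java
import java.util.ArrayDeque;
import java.util.Date;
import java.util.Deque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class BlockingPool {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private final Deque<Integer> available = new ArrayDeque<>();
    private final int max;
    private int leased;
    private int created;

    BlockingPool(int max) {
        this.max = max;
    }

    // Leases a connection id, waiting up to timeout (0 = no deadline)
    int lease(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
        Date deadline = timeout > 0
                ? new Date(System.currentTimeMillis() + unit.toMillis(timeout))
                : null;
        lock.lock();
        try {
            for (;;) {
                // 1. Reuse the most recently released idle connection
                Integer idle = available.pollFirst();
                if (idle != null) {
                    leased++;
                    return idle;
                }
                // 2. Below the limit: create a new one
                if (leased + available.size() < max) {
                    leased++;
                    return ++created;
                }
                // 3. Limit reached: wait for a release signal or the deadline
                boolean success;
                if (deadline != null) {
                    success = condition.awaitUntil(deadline);
                } else {
                    condition.await();
                    success = true;
                }
                // Spurious wakeup vs. timeout: give up only once the deadline has passed
                if (!success && deadline.getTime() <= System.currentTimeMillis()) {
                    throw new TimeoutException("Timeout waiting for connection");
                }
            }
        } finally {
            lock.unlock();
        }
    }

    // Returns a connection to the pool and wakes up waiting threads
    void release(int conn) {
        lock.lock();
        try {
            leased--;
            available.addFirst(conn);
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception {
        BlockingPool pool = new BlockingPool(1);
        int first = pool.lease(0, TimeUnit.MILLISECONDS);
        try {
            pool.lease(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("second lease timed out");
        }
        // Release from another thread while main waits without a deadline
        new Thread(() -> pool.release(first)).start();
        System.out.println(pool.lease(0, TimeUnit.MILLISECONDS) == first);
    }
}
```

Because every check and every wait happens while holding the same lock, a release cannot slip in between "no connection available" and `await`, so no wakeup is lost.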