HttpClient连接池管理

系列

开篇

  • HttpClien中使用了连接池来管理持有连接,同一条TCP链路上,连接是可以复用的。
  • 这篇文章的主要目的是分析下HttpClient连接池管理的过程。


整体流程

连接池处理流程
  • 线程池管理的核心类是PoolingHttpClientConnectionManager、AbstractConnPool。

  • PoolingHttpClientConnectionManager类暴露相关的连接管理接口给外部调用,requestConnection方法返回一个匿名ConnectionRequest类。

  • ConnectionRequest持有一个Future,通过ConnectionRequest的get方法获取HttpClientConnection,实际上是通过Future获取一个CPoolEntry,再通过CPoolProxy.newProxy(CPoolEntry)就能返回一个代理的HttpClientConnection。

  • PoolingHttpClientConnectionManager请求和释放连接,实际上是通过AbstractConnPool类来实现的。

  • AbstractConnPool类通过两个维度来管理连接,分别是全局连接和单route连接。

源码分析

public class MainClientExec implements ClientExecChain {

    private final HttpRequestExecutor requestExecutor;
    // PoolingHttpClientConnectionManager的连接池管理
    private final HttpClientConnectionManager connManager;
    private final ConnectionReuseStrategy reuseStrategy;
    private final ConnectionKeepAliveStrategy keepAliveStrategy;

    @Override
    public CloseableHttpResponse execute(
            final HttpRoute route,
            final HttpRequestWrapper request,
            final HttpClientContext context,
            final HttpExecutionAware execAware) {

        // 根据route从connManager获取ConnectionRequest对象
        final ConnectionRequest connRequest = 
                         connManager.requestConnection(route, userToken);

        final HttpClientConnection managedConn;
        try {
            // 从connRequest中获取managedConn
            final int timeout = config.getConnectionRequestTimeout();
            managedConn = connRequest.get(
                  timeout > 0 ? timeout : 0, TimeUnit.MILLISECONDS);
        } catch(final ExecutionException ex) {
        }

        try {
            HttpResponse response;
            for (int execCount = 1;; execCount++) {
                // 通过HttpRequestExecutor并通过managedConn发送请求
                response = requestExecutor.execute(request, 
                        managedConn, context);
            }

            // check for entity, release connection if possible
            final HttpEntity entity = response.getEntity();

            if (entity == null || !entity.isStreaming()) {
                connHolder.releaseConnection();
                return new HttpResponseProxy(response, null);
            }

            return new HttpResponseProxy(response, connHolder);
        } catch (final Error error) {
        }
    }
}
  • 从连接池管理对象connManager当中获取ConnectionRequest对象。
  • 从ConnectionRequest对象中获取HttpClientConnection对象。
  • 通过HttpRequestExecutor并通过managedConn发送请求。
  • connManager的类型是PoolingHttpClientConnectionManager。
  • 核心的概念在于HttpClientConnection的对象的获取逻辑,继续分享connManager。


public class PoolingHttpClientConnectionManager
    implements HttpClientConnectionManager, 
               ConnPoolControl<HttpRoute>, Closeable {

    private final ConfigData configData;
    // 连接池管理的核心对象
    private final CPool pool;
    private final HttpClientConnectionOperator connectionOperator;
    private final AtomicBoolean isShutDown;

    public ConnectionRequest requestConnection(
            final HttpRoute route,
            final Object state) {
        // 从连接池中获取一个CPoolEntry(Connection的包装类)
        final Future<CPoolEntry> future = this.pool.lease(route, state, null);

        // 返回ConnectionRequest对象,内部通过leaseConnection获取HttpClientConnection
        return new ConnectionRequest() {

            @Override
            public boolean cancel() {
                return future.cancel(true);
            }

            // ConnectionRequest的get方法。调用leaseConnection方法,
            // 并且传入future(CPoolEntry的封装(connection的封装))
            @Override
            public HttpClientConnection get(
                    final long timeout,
                    final TimeUnit timeUnit) {

                final HttpClientConnection conn = 
                      leaseConnection(future, timeout, timeUnit);

                // 省略其他代码

                return conn;
            }
        };
    }

    protected HttpClientConnection leaseConnection(
            final Future<CPoolEntry> future,
            final long timeout,
            final TimeUnit timeUnit) {
        final CPoolEntry entry;
        try {
            entry = future.get(timeout, timeUnit);
            return CPoolProxy.newProxy(entry);
        } catch (final TimeoutException ex) {
        }
    }

}
  • PoolingHttpClientConnectionManager的核心思想在于CPool的lease方法返回CPoolEntry的Future对象。Future对象封装进行ConnectionRequest对象当中。
  • leaseConnection负责从CPoolEntry的future对象中获取CPoolEntry对象,创建为CPoolProxy对象后返回。
  • 核心的思路需要继续分析CPool的lease的过程。


abstract class RouteSpecificPool<T, C, E extends PoolEntry<T, C>> {
    // route维度连接管理数据结构
    private final T route;
    private final Set<E> leased;
    private final LinkedList<E> available;
    // 等待队列中的Future对象
    private final LinkedList<Future<E>> pending;
}

public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                      implements ConnPool<T, E>, ConnPoolControl<T> {

    private final Lock lock;
    private final Condition condition;
    private final ConnFactory<T, C> connFactory;
    // route维度连接管理数据结构
    private final Map<T, RouteSpecificPool<T, C, E>> routeToPool;
    // 全局维度连接管理数据结构
    private final Set<E> leased;
    private final LinkedList<E> available;
    // 全局等待队列中Future
    private final LinkedList<Future<E>> pending;

    public Future<E> lease(final T route, final Object state, final FutureCallback<E> callback) {

        return new Future<E>() {

            private final AtomicBoolean cancelled = new AtomicBoolean(false);
            private final AtomicBoolean done = new AtomicBoolean(false);
            private final AtomicReference<E> entryRef = new AtomicReference<E>(null);

            @Override
            public E get(final long timeout, final TimeUnit timeUnit) throws InterruptedException, ExecutionException, TimeoutException {
                for (;;) {
                    synchronized (this) {
                        try {
                            final E entry = entryRef.get();
                            if (entry != null) {
                                return entry;
                            }

                            // 阻塞获取CPoolEntry        
                            final E leasedEntry = getPoolEntryBlocking(route, state, timeout, timeUnit, this);

                        } catch (final IOException ex) {
                        }
                    }
                }
            }

        };
    }
}
  • CPool继承自AbstractConnPool类。
  • AbstractConnPool类维持全局维度和route维度的连接管理数据结构
  • available、leased和pending,分别对应连接空闲,占用和堵塞三种状态。
  • getPoolEntryBlocking阻塞的获取leasedEntry对象。


public abstract class AbstractConnPool<T, C, E extends PoolEntry<T, C>>
                      implements ConnPool<T, E>, ConnPoolControl<T> {

    private E getPoolEntryBlocking(
            final T route, final Object state,
            final long timeout, final TimeUnit timeUnit,
            final Future<E> future) throws IOException, InterruptedException, ExecutionException, TimeoutException {

        Date deadline = null;
        if (timeout > 0) {
            deadline = new Date (System.currentTimeMillis() + timeUnit.toMillis(timeout));
        }
        // 加锁串行操作
        this.lock.lock();
        try {
            E entry;
            for (;;) {
                // 每一个route都有一个连接池,这里获取指定route的连接池
                final RouteSpecificPool<T, C, E> pool = getPool(route);
                // 循环取,直到超时
                for (;;) {
                    entry = pool.getFree(state);
                    // 为空则跳出循环
                    if (entry == null) {
                        break;
                    }
                    if (entry.isExpired(System.currentTimeMillis())) {
                        entry.close();
                    }
                    if (entry.isClosed()) {
                        this.available.remove(entry);
                        pool.free(entry, false);
                    } else {
                        break;
                    }
                }

                // entry不为空,则修改连接池的参数,并返回。
                if (entry != null) {
                    this.available.remove(entry);
                    this.leased.add(entry);
                    onReuse(entry);
                    return entry;
                }

                // 按需创建新连接,如果需要的话
                final int maxPerRoute = getMax(route);

                // 每个route对应的连接最大数量是可配置的,如果超过了,就需要通过LRU清理掉一些连接
                final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
                if (excess > 0) {
                    for (int i = 0; i < excess; i++) {
                        final E lastUsed = pool.getLastUsed();
                        if (lastUsed == null) {
                            break;
                        }
                        lastUsed.close();
                        this.available.remove(lastUsed);
                        pool.remove(lastUsed);
                    }
                }

                // 当前route池中的连接数,没有达到上线
                if (pool.getAllocatedCount() < maxPerRoute) {
                    final int totalUsed = this.leased.size();
                    final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
                    // 判断连接池是否超过上线,如果超过了,需要通过LRU清理掉一些连接
                    if (freeCapacity > 0) {
                        final int totalAvailable = this.available.size();
                        // 如果空闲连接数已经大于剩余可用空间,则需要清理下空闲连接 
                        if (totalAvailable > freeCapacity - 1) {
                            if (!this.available.isEmpty()) {
                                final E lastUsed = this.available.removeLast();
                                lastUsed.close();
                                final RouteSpecificPool<T, C, E> otherpool = getPool(lastUsed.getRoute());
                                otherpool.remove(lastUsed);
                            }
                        }
                        // 根据route建立一个连接
                        final C conn = this.connFactory.create(route);
                        // 将这个连接放入route对应的“小池”中
                        entry = pool.add(conn);
                        // 将这个连接放入“大池”中
                        this.leased.add(entry);
                        return entry;
                    }
                }
                // 到这里证明没有从获得route池中获得有效连接,
                // 并且想要自己建立连接时当前route连接池已经到达最大值
                boolean success = false;
                try {
                    // 将future放入route池中等待
                    pool.queue(future);
                    // 将future放入大连接池中等待
                    this.pending.add(future);
                    if (deadline != null) {
                        success = this.condition.awaitUntil(deadline);
                    } else {
                        this.condition.await();
                        success = true;
                    }
                    if (future.isCancelled()) {
                        throw new ExecutionException(operationAborted());
                    }
                } finally {
                    pool.unqueue(future);
                    this.pending.remove(future);
                }
                // check for spurious wakeup vs. timeout
                if (!success && (deadline != null && deadline.getTime() <= System.currentTimeMillis())) {
                    break;
                }
            }
            throw new TimeoutException("Timeout waiting for connection");
        } finally {
            this.lock.unlock();
        }
    }
}
  • 连接池有个最大连接数,每个route对应一个小连接池也有个最大连接数。
  • 不论是大连接池还是小连接池,当超过数量的时候,都要通过LRU释放一些连接。
  • 如果拿到了可用连接,则返回给上层使用。
  • 如果没有拿到可用连接,HttpClient会判断当前route连接池是否已经超过了最大数量,没有到上限就会新建一个连接,并放入池中。
  • 如果到达了上限就排队等待,等到了信号量就重新获得一次,等待不到就抛超时异常。
  • 通过线程池获取连接要通过ReetrantLock加锁,保证线程安全。


参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容