连接池commons-pool源码学习

上一篇简单的hello world

ReaderUtil readerUtil = new ReaderUtil(new GenericObjectPool<StringBuffer>(new StringBufferFactory()));

理解commons pool,需要了解主要类,接口

  • PooledObject : 池化对象,在对象的基础上,添加对象的属性,方法,方便判断对象的可用状
  • ObjectPool : 对象池,利用它管理,获取对象
  • PoolableObjectFactory : 可池化对象的维护工厂
  • GenericObjectPoolConfig : 连接池配置
  • LinkedBlockingDeque : 线程安全的阻塞队列数据结构
  • Evictor : 驱逐者线程,当该线程启动的是否,负责判断队列中的对象是否需要驱逐,驱逐完如果空闲对象数量小于最小可以使用的数量,维持最小的idel个对象,就创建等于最小数量的对象数

一步步来吧

  • PooledObject : 池化对象,在对象的基础上,添加对象的属性,方法,方便判断对象的可用状态,比如池化StringBuffer字符操作对象
PooledObject pooledObject = new DefaultPooledObject(new StringBuffer());

public DefaultPooledObject(final T object) {
    //真正操作的还是StringBuffer对象,只是为了方便维护,让DefaultPooledObject装饰一番
    this.object = object;
}

ObjectPool : 对象池,利用它管理,获取对象,看看主要方法

ObjectPool接口实现GenericObjectPool

public GenericObjectPool(final PooledObjectFactory<T> factory,
            final GenericObjectPoolConfig config) {

        super(config, ONAME_BASE, config.getJmxNamePrefix());

        if (factory == null) {
            jmxUnregister(); // tidy up
            throw new IllegalArgumentException("factory may not be null");
        }
        this.factory = factory;
        //支持阻塞的线程安全队列
        idleObjects = new LinkedBlockingDeque<PooledObject<T>>(config.getFairness());
        //设置连接池配置
        setConfig(config);
        //是否启动驱逐线程
        startEvictor(getTimeBetweenEvictionRunsMillis());
    }

主要方法

//从空闲队列LinkedBlockingDeque获取连接对象
T borrowObject() throws Exception, NoSuchElementException,
            IllegalStateException;

//把使用完的对象归还到空闲队列LinkedBlockingDeque
void returnObject(T obj) throws Exception;

//是否启动驱逐线程
final void startEvictor(final long delay);
获取连接对象
public T borrowObject(final long borrowMaxWaitMillis) throws Exception {
        assertOpen();

        final AbandonedConfig ac = this.abandonedConfig;
        //如果配置了遗弃,当前空闲队列对象数量小于2,并且正在使用中的对象数量大于(池中最多对象数量-3)
        if (ac != null && ac.getRemoveAbandonedOnBorrow() &&
                (getNumIdle() < 2) &&
                (getNumActive() > getMaxTotal() - 3) ) {
            //正在使用状态的,并且使用时间超过配置时间还没有归还的对象,则销毁
            removeAbandoned(ac);
        }

        PooledObject<T> p = null;

        // Get local copy of current config so it is consistent for entire
        // method execution
        final boolean blockWhenExhausted = getBlockWhenExhausted();

        boolean create;
        final long waitTime = System.currentTimeMillis();

        while (p == null) {
            create = false;
            p = idleObjects.pollFirst();
            if (p == null) {
                p = create();
                if (p != null) {
                    create = true;
                }
            }
            if (blockWhenExhausted) {
                if (p == null) {
                    //如果没有设置获取等待超时就一直等待,直到有对象可以获取
                    if (borrowMaxWaitMillis < 0) {
                        p = idleObjects.takeFirst();
                    } else {
                        //设置了获取超时时间,如果超过设置时间还没有获取到,直接返回null
                        p = idleObjects.pollFirst(borrowMaxWaitMillis,
                                TimeUnit.MILLISECONDS);
                    }
                }
                if (p == null) {
                    throw new NoSuchElementException(
                            "Timeout waiting for idle object");
                }
            } else {
                if (p == null) {
                    throw new NoSuchElementException("Pool exhausted");
                }
            }
            if (!p.allocate()) {
                p = null;
            }

            if (p != null) {
                try {
                    //激活对象
                    factory.activateObject(p);
                } catch (final Exception e) {
                    try {
                        //激活失败,销毁对象
                        destroy(p);
                    } catch (final Exception e1) {
                        // Ignore - activation failure is more important
                    }
                    p = null;
                    if (create) {
                        final NoSuchElementException nsee = new NoSuchElementException(
                                "Unable to activate object");
                        nsee.initCause(e);
                        throw nsee;
                    }
                }
                if (p != null && (getTestOnBorrow() || create && getTestOnCreate())) {
                    boolean validate = false;
                    Throwable validationThrowable = null;
                    try {
                        //检查对象是否有效可用
                        validate = factory.validateObject(p);
                    } catch (final Throwable t) {
                        PoolUtils.checkRethrow(t);
                        validationThrowable = t;
                    }
                    if (!validate) {
                        try {
                            //无效则销毁
                            destroy(p);
                            destroyedByBorrowValidationCount.incrementAndGet();
                        } catch (final Exception e) {
                            // Ignore - validation failure is more important
                        }
                        p = null;
                        if (create) {
                            final NoSuchElementException nsee = new NoSuchElementException(
                                    "Unable to validate object");
                            nsee.initCause(validationThrowable);
                            throw nsee;
                        }
                    }
                }
            }
        }

        updateStatsBorrow(p, System.currentTimeMillis() - waitTime);

        return p.getObject();
    }
removeAbandoned销毁状态在使用中,超过一段时间没有归还的对象
    private void removeAbandoned(final AbandonedConfig ac) {
        // Generate a list of abandoned objects to remove
        final long now = System.currentTimeMillis();
        final long timeout =
                now - (ac.getRemoveAbandonedTimeout() * 1000L);
        final ArrayList<PooledObject<T>> remove = new ArrayList<PooledObject<T>>();
        final Iterator<PooledObject<T>> it = allObjects.values().iterator();
        while (it.hasNext()) {
            final PooledObject<T> pooledObject = it.next();
            synchronized (pooledObject) {
                if (pooledObject.getState() == PooledObjectState.ALLOCATED &&
                        pooledObject.getLastUsedTime() <= timeout) {
                    pooledObject.markAbandoned();
                    remove.add(pooledObject);
                }
            }
        }

        // Now remove the abandoned objects
        final Iterator<PooledObject<T>> itr = remove.iterator();
        while (itr.hasNext()) {
            final PooledObject<T> pooledObject = itr.next();
            if (ac.getLogAbandoned()) {
                pooledObject.printStackTrace(ac.getLogWriter());
            }
            try {
                invalidateObject(pooledObject.getObject());
            } catch (final Exception e) {
                e.printStackTrace();
            }
        }
    }
归还连接
    public void returnObject(final T obj) {
        //从所有对象池中获取返回的对象
        final PooledObject<T> p = allObjects.get(new IdentityWrapper<T>(obj));

        if (p == null) {
            //如果没有遗弃配置AbandonedConfig,抛出异常,有则直接返回
            if (!isAbandonedConfig()) {
                throw new IllegalStateException(
                        "Returned object not currently part of this pool");
            }
            return; // Object was abandoned and removed
        }

        synchronized(p) {
            final PooledObjectState state = p.getState();
            //判断对象状态是否是正在使用,如果不是抛出异常,是则修改对象状态为正在归还,防止被遗弃
            if (state != PooledObjectState.ALLOCATED) {
                throw new IllegalStateException(
                        "Object has already been returned to this pool or is invalid");
            }
            p.markReturning(); // Keep from being marked abandoned
        }

        final long activeTime = p.getActiveTimeMillis();

        if (getTestOnReturn()) {
            //如果对象无效
            if (!factory.validateObject(p)) {
                try {
                    //销毁对象,在空闲队列,所有集合中剔除对象,并且更新销毁对象数量,创建对象数量
                    destroy(p);
                } catch (final Exception e) {
                    swallowException(e);
                }
                try {
                    //试图确保空闲池中存在有可用的实例
                    ensureIdle(1, false);
                } catch (final Exception e) {
                    swallowException(e);
                }
                updateStatsReturn(activeTime);
                return;
            }
        }

        try {
            //钝化对象,下次之前可以再复用该对象,比如对象是StringBuffer,可以用setLength(0)清空
            factory.passivateObject(p);
        } catch (final Exception e1) {
            swallowException(e1);
            try {
                //同上
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
            try {
                //同上
                ensureIdle(1, false);
            } catch (final Exception e) {
                swallowException(e);
            }
            updateStatsReturn(activeTime);
            return;
        }

        //释放资源
        if (!p.deallocate()) {
            throw new IllegalStateException(
                    "Object has already been returned to this pool or is invalid");
        }

        final int maxIdleSave = getMaxIdle();
        //空闲队列是否已经等于配置的最多空闲数量,如果是则销毁对象,不是则归还到空闲队列中
        if (isClosed() || maxIdleSave > -1 && maxIdleSave <= idleObjects.size()) {
            try {
                destroy(p);
            } catch (final Exception e) {
                swallowException(e);
            }
        } else {
            //如果配置的是先进先出,先进后出归还到空闲队列中
            if (getLifo()) {
                idleObjects.addFirst(p);
            } else {
                idleObjects.addLast(p);
            }
            if (isClosed()) {
                // Pool closed while object was being added to idle objects.
                // Make sure the returned object is destroyed rather than left
                // in the idle object pool (which would effectively be a leak)
                clear();
            }
        }
        updateStatsReturn(activeTime);
    }

PoolableObjectFactory : 可池化对象的维护工厂

public interface PooledObjectFactory<T> {
  /**
   * 创建对象
   * Create an instance that can be served by the pool and wrap it in a
   * {@link PooledObject} to be managed by the pool.
   *
   * @return a {@code PooledObject} wrapping an instance that can be served by the pool
   *
   * @throws Exception if there is a problem creating a new instance,
   *    this will be propagated to the code requesting an object.
   */
  PooledObject<T> makeObject() throws Exception;

  /**
   * 销毁对象
   * Destroys an instance no longer needed by the pool.
   * <p>
   * It is important for implementations of this method to be aware that there
   * is no guarantee about what state <code>obj</code> will be in and the
   * implementation should be prepared to handle unexpected errors.
   * <p>
   * Also, an implementation must take in to consideration that instances lost
   * to the garbage collector may never be destroyed.
   * </p>
   *
   * @param p a {@code PooledObject} wrapping the instance to be destroyed
   *
   * @throws Exception should be avoided as it may be swallowed by
   *    the pool implementation.
   *
   * @see #validateObject
   * @see ObjectPool#invalidateObject
   */
  void destroyObject(PooledObject<T> p) throws Exception;

  /**
   * 检验对象的有效性
   * Ensures that the instance is safe to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be validated
   *
   * @return <code>false</code> if <code>obj</code> is not valid and should
   *         be dropped from the pool, <code>true</code> otherwise.
   */
  boolean validateObject(PooledObject<T> p);

  /**
   * 激活对象
   * Reinitialize an instance to be returned by the pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be activated
   *
   * @throws Exception if there is a problem activating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void activateObject(PooledObject<T> p) throws Exception;

  /**
   * 钝化对象,简单来说就是在归还对象的时候,清空对象,下次借用的可以直接使用
   * Uninitialize an instance to be returned to the idle object pool.
   *
   * @param p a {@code PooledObject} wrapping the instance to be passivated
   *
   * @throws Exception if there is a problem passivating <code>obj</code>,
   *    this exception may be swallowed by the pool.
   *
   * @see #destroyObject
   */
  void passivateObject(PooledObject<T> p) throws Exception;
}

看看jedis的实现JedisFactory,方便理解

class JedisFactory implements PooledObjectFactory<Jedis> {

  @Override
  public void activateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.getDB() != database) {
      jedis.select(database);
    }

  }

  @Override
  public void destroyObject(PooledObject<Jedis> pooledJedis) throws Exception {
    final BinaryJedis jedis = pooledJedis.getObject();
    if (jedis.isConnected()) {
      try {
        try {
          jedis.quit();
        } catch (Exception e) {
        }
        jedis.disconnect();
      } catch (Exception e) {

      }
    }

  }

  @Override
  public PooledObject<Jedis> makeObject() throws Exception {
    final HostAndPort hostAndPort = this.hostAndPort.get();
    final Jedis jedis = new Jedis(hostAndPort.getHost(), hostAndPort.getPort(), connectionTimeout,
        soTimeout, ssl, sslSocketFactory, sslParameters, hostnameVerifier);

    try {
      jedis.connect();
      if (password != null) {
        jedis.auth(password);
      }
      if (database != 0) {
        jedis.select(database);
      }
      if (clientName != null) {
        jedis.clientSetname(clientName);
      }
    } catch (JedisException je) {
      jedis.close();
      throw je;
    }

    return new DefaultPooledObject<Jedis>(jedis);

  }

  @Override
  public void passivateObject(PooledObject<Jedis> pooledJedis) throws Exception {
    // TODO maybe should select db 0? Not sure right now.
  }

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}

GenericObjectPoolConfig : 连接池配置

  • lifo: true为先进先出;false为先进后出,默认为true,表示对象的出借方式
  • maxWaitMillis: 当连接池资源耗尽时,调用者最大等待阻塞的时间(ms),默认为-1表示永不超时,建议设置值,如果资源一直等待超时,会卡死服务
  • maxTotal: 连接池中最大连接数,默认为8.
  • maxIdle: 连接池中最大空闲的连接数,默认为8.该参数一般尽量与maxTotal相同,以提高并发数
  • minIdle: 连接池中最小空闲的连接数,默认为0,该参数一般尽量比maxIdle小一些
  • blockWhenExhausted: 当连接池资源耗尽时,是否会阻塞等待,默认为true:阻塞
  • testOnBorrow: 调用者获取连接池资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试继续获取。默认为false。建议保持默认值
  • testOnReturn: 向连接池归还连接时,是否检测“连接”对象的有效性。默认为false。建议保持默认值
  • testOnCreate:向连接池添加创建对象时,是否检测“连接”对象的有效性。默认为false。建议保持默认值
  • testWhileIdle: 当驱逐空闲队列的连接对象时,是否允许空闲时进行有效性测试,默认为false
  • timeBetweenEvictionRunsMillis: “空闲连接”驱逐线程,检测的周期,毫秒数。如果为负值,表示不运行“驱逐线程”。默认为-1
  • numTestsPerEvictionRun:驱逐线程一次运行检查多少条“连接”,不要设置太大,太大需要更多的时间来执行
  • minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除
  • softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.

PooledObject对象的状态

/**
 * Provides the possible states that a {@link PooledObject} may be in.
 *
 * @version $Revision: $
 *
 * @since 2.0
 */
public enum PooledObjectState {
    /**
     * In the queue, not in use.
     * 位于队列中,未使用
     */
    IDLE,

    /**
     * In use.
     * 在使用
     */
    ALLOCATED,

    /**
     * In the queue, currently being tested for possible eviction.
     * 位于队列中,当前正在测试,可能会被回收
     */
    EVICTION,

    /**
     * Not in the queue, currently being tested for possible eviction. An
     * attempt to borrow the object was made while being tested which removed it
     * from the queue. It should be returned to the head of the queue once
     * eviction testing completes.
     * TODO: Consider allocating object and ignoring the result of the eviction
     *       test.
     * 不在队列中,当前正在测试,可能会被回收。从池中借出对象时需要从队列出移除并进行测试
     */
    EVICTION_RETURN_TO_HEAD,

    /**
     * In the queue, currently being validated.
     * 2.0没有用到
     */
    VALIDATION,

    /**
     * Not in queue, currently being validated. The object was borrowed while
     * being validated and since testOnBorrow was configured, it was removed
     * from the queue and pre-allocated. It should be allocated once validation
     * completes.
     * 2.0没有用到
     */
    VALIDATION_PREALLOCATED,

    /**
     * Not in queue, currently being validated. An attempt to borrow the object
     * was made while previously being tested for eviction which removed it from
     * the queue. It should be returned to the head of the queue once validation
     * completes.
     * 2.0没有用到
     */
    VALIDATION_RETURN_TO_HEAD,

    /**
     * Failed maintenance (e.g. eviction test or validation) and will be / has
     * been destroyed
     * 回收或验证失败,将销毁
     */
    INVALID,

    /**
     * Deemed abandoned, to be invalidated.
     * 即将无效
     */
    ABANDONED,

    /**
     * Returning to the pool.
     * 正在返还到池中
     */
    RETURNING
}

LinkedBlockingDeque是保存空闲队列的地方,借出,归还都在这里

双向链表实现的双向并发阻塞队列。该阻塞队列同时支持FIFO和FILO两种操作方式,即可以从队列的头和尾同时操作(插入/删除);并且,该阻塞队列是支持线程安全

private static final class Node<E> {
        /**
         * The item, or null if this node has been removed.
         */
        E item;

        /**
         * One of:
         * - the real predecessor Node
         * - this Node, meaning the predecessor is tail
         * - null, meaning there is no predecessor
         */
        Node<E> prev;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head
         * - null, meaning there is no successor
         */
        Node<E> next;

        /**
         * Create a new list node.
         *
         * @param x The list item
         * @param p Previous item
         * @param n Next item
         */
        Node(final E x, final Node<E> p, final Node<E> n) {
            item = x;
            prev = p;
            next = n;
        }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,491评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,856评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,745评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,196评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,073评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,112评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,531评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,215评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,485评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,578评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,356评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,215评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,583评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,898评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,174评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,497评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,697评论 2 335

推荐阅读更多精彩内容