聊聊spring-data-redis的连接池的校验

本文主要研究一下spring-data-redis的连接池的校验

lettuce

LettucePoolingConnectionProvider

spring-data-redis/2.0.10.RELEASE/spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettucePoolingConnectionProvider.java

class LettucePoolingConnectionProvider implements LettuceConnectionProvider, RedisClientProvider, DisposableBean {
    private static final Log log = LogFactory.getLog(LettucePoolingConnectionProvider.class);
    private final LettuceConnectionProvider connectionProvider;
    private final GenericObjectPoolConfig poolConfig;
    private final Map<StatefulConnection<?, ?>, GenericObjectPool<StatefulConnection<?, ?>>> poolRef = new ConcurrentHashMap(32);
    private final Map<Class<?>, GenericObjectPool<StatefulConnection<?, ?>>> pools = new ConcurrentHashMap(32);

    LettucePoolingConnectionProvider(LettuceConnectionProvider connectionProvider, LettucePoolingClientConfiguration clientConfiguration) {
        Assert.notNull(connectionProvider, "ConnectionProvider must not be null!");
        Assert.notNull(clientConfiguration, "ClientConfiguration must not be null!");
        this.connectionProvider = connectionProvider;
        this.poolConfig = clientConfiguration.getPoolConfig();
    }

    public <T extends StatefulConnection<?, ?>> T getConnection(Class<T> connectionType) {
        GenericObjectPool pool = (GenericObjectPool)this.pools.computeIfAbsent(connectionType, (poolType) -> {
            return ConnectionPoolSupport.createGenericObjectPool(() -> {
                return this.connectionProvider.getConnection(connectionType);
            }, this.poolConfig, false);
        });

        try {
            StatefulConnection<?, ?> connection = (StatefulConnection)pool.borrowObject();
            this.poolRef.put(connection, pool);
            return (StatefulConnection)connectionType.cast(connection);
        } catch (Exception var4) {
            throw new PoolException("Could not get a resource from the pool", var4);
        }
    }

    public AbstractRedisClient getRedisClient() {
        if (this.connectionProvider instanceof RedisClientProvider) {
            return ((RedisClientProvider)this.connectionProvider).getRedisClient();
        } else {
            throw new IllegalStateException(String.format("Underlying connection provider %s does not implement RedisClientProvider!", this.connectionProvider.getClass().getName()));
        }
    }

    public void release(StatefulConnection<?, ?> connection) {
        GenericObjectPool<StatefulConnection<?, ?>> pool = (GenericObjectPool)this.poolRef.remove(connection);
        if (pool == null) {
            throw new PoolException("Returned connection " + connection + " was either previously returned or does not belong to this connection provider");
        } else {
            pool.returnObject(connection);
        }
    }

    public void destroy() throws Exception {
        if (!this.poolRef.isEmpty()) {
            log.warn("LettucePoolingConnectionProvider contains unreleased connections");
            this.poolRef.forEach((connection, pool) -> {
                pool.returnObject(connection);
            });
            this.poolRef.clear();
        }

        this.pools.forEach((type, pool) -> {
            pool.close();
        });
        this.pools.clear();
    }
}
  • 这里调用ConnectionPoolSupport.createGenericObjectPool来创建连接池

ConnectionPoolSupport.createGenericObjectPool

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    public static <T extends StatefulConnection<?, ?>> GenericObjectPool<T> createGenericObjectPool(
            Supplier<T> connectionSupplier, GenericObjectPoolConfig config, boolean wrapConnections) {

        LettuceAssert.notNull(connectionSupplier, "Connection supplier must not be null");
        LettuceAssert.notNull(config, "GenericObjectPoolConfig must not be null");

        AtomicReference<ObjectPool<T>> poolRef = new AtomicReference<>();

        GenericObjectPool<T> pool = new GenericObjectPool<T>(new RedisPooledObjectFactory<T>(connectionSupplier), config) {

            @Override
            public T borrowObject() throws Exception {
                return wrapConnections ? wrapConnection(super.borrowObject(), this) : super.borrowObject();
            }

            @Override
            public void returnObject(T obj) {

                if (wrapConnections && obj instanceof HasTargetConnection) {
                    super.returnObject((T) ((HasTargetConnection) obj).getTargetConnection());
                    return;
                }
                super.returnObject(obj);
            }
        };

        poolRef.set(pool);

        return pool;
    }
  • 这里使用了RedisPooledObjectFactory

ConnectionPoolSupport.RedisPooledObjectFactory

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/support/ConnectionPoolSupport.java

    private static class RedisPooledObjectFactory<T extends StatefulConnection<?, ?>> extends BasePooledObjectFactory<T> {

        private final Supplier<T> connectionSupplier;

        RedisPooledObjectFactory(Supplier<T> connectionSupplier) {
            this.connectionSupplier = connectionSupplier;
        }

        @Override
        public T create() throws Exception {
            return connectionSupplier.get();
        }

        @Override
        public void destroyObject(PooledObject<T> p) throws Exception {
            p.getObject().close();
        }

        @Override
        public PooledObject<T> wrap(T obj) {
            return new DefaultPooledObject<>(obj);
        }

        @Override
        public boolean validateObject(PooledObject<T> p) {
            return p.getObject().isOpen();
        }
    }
  • 这里继承了BasePooledObjectFactory,重写了validate等方法,这里validate是通过isOpen来判断

RedisChannelHandler.isOpen

lettuce-core-5.0.5.RELEASE-sources.jar!/io/lettuce/core/RedisChannelHandler.java

public abstract class RedisChannelHandler<K, V> implements Closeable, ConnectionFacade {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisChannelHandler.class);

    private Duration timeout;
    private CloseEvents closeEvents = new CloseEvents();

    private final RedisChannelWriter channelWriter;
    private final boolean debugEnabled = logger.isDebugEnabled();

    private volatile boolean closed;
    private volatile boolean active = true;
    private volatile ClientOptions clientOptions;

    //......

    /**
     * Notification when the connection becomes active (connected).
     */
    public void activated() {
        active = true;
        closed = false;
    }

    /**
     * Notification when the connection becomes inactive (disconnected).
     */
    public void deactivated() {
        active = false;
    }

    /**
     *
     * @return true if the connection is active and not closed.
     */
    public boolean isOpen() {
        return active;
    }

    @Override
    public synchronized void close() {

        if (debugEnabled) {
            logger.debug("close()");
        }

        if (closed) {
            logger.warn("Connection is already closed");
            return;
        }

        if (!closed) {
            active = false;
            closed = true;
            channelWriter.close();
            closeEvents.fireEventClosed(this);
            closeEvents = new CloseEvents();
        }
    }
}
  • isOpen是通过active字段来判断的,而active在deactivated或者close的时候变为false,初始化以及在activated的时候变为true
  • 可以看到对于docker pause这种造成的timeout,active这种方式检测不出来

LettuceConnectionFactory.SharedConnection.validateConnection

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/LettuceConnectionFactory.java

        /**
         * Validate the connection. Invalid connections will be closed and the connection state will be reset.
         */
        void validateConnection() {

            synchronized (this.connectionMonitor) {

                boolean valid = false;

                if (connection != null && connection.isOpen()) {
                    try {

                        if (connection instanceof StatefulRedisConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }

                        if (connection instanceof StatefulRedisClusterConnection) {
                            ((StatefulRedisConnection) connection).sync().ping();
                        }
                        valid = true;
                    } catch (Exception e) {
                        log.debug("Validation failed", e);
                    }
                }

                if (!valid) {

                    if (connection != null) {
                        connectionProvider.release(connection);
                    }

                    log.warn("Validation of shared connection failed. Creating a new connection.");

                    resetConnection();
                    this.connection = getNativeConnection();
                }
            }
        }
  • 这个是默认开启LettuceConnectionFactory的shareNativeConnection走的获取连接的方法
  • 如果LettuceConnectionFactory的validateConnection为true的话(默认为false),则会自己在每次get的时候执行一下validateConnection

DefaultLettucePool.LettuceFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/lettuce/DefaultLettucePool.java

private static class LettuceFactory extends BasePooledObjectFactory<StatefulConnection<byte[], byte[]>> {
        private final RedisClient client;
        private int dbIndex;

        public LettuceFactory(RedisClient client, int dbIndex) {
            this.client = client;
            this.dbIndex = dbIndex;
        }

        public void activateObject(PooledObject<StatefulConnection<byte[], byte[]>> pooledObject) throws Exception {
            if (pooledObject.getObject() instanceof StatefulRedisConnection) {
                ((StatefulRedisConnection)pooledObject.getObject()).sync().select(this.dbIndex);
            }

        }

        public void destroyObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) throws Exception {
            try {
                ((StatefulConnection)obj.getObject()).close();
            } catch (Exception var3) {
                ;
            }

        }

        public boolean validateObject(PooledObject<StatefulConnection<byte[], byte[]>> obj) {
            try {
                if (obj.getObject() instanceof StatefulRedisConnection) {
                    ((StatefulRedisConnection)obj.getObject()).sync().ping();
                }

                return true;
            } catch (Exception var3) {
                return false;
            }
        }

        public StatefulConnection<byte[], byte[]> create() throws Exception {
            return this.client.connect(LettuceConnection.CODEC);
        }

        public PooledObject<StatefulConnection<byte[], byte[]>> wrap(StatefulConnection<byte[], byte[]> obj) {
            return new DefaultPooledObject(obj);
        }
    }
  • 被废弃的DefaultLettucePool里头有个LettuceFactory,其validate是通过ping来判断的,因而更为准确

jedis

JedisConnectionFactory

spring-data-redis-2.0.10.RELEASE-sources.jar!/org/springframework/data/redis/connection/jedis/JedisConnectionFactory.java

public class JedisConnectionFactory implements InitializingBean, DisposableBean, RedisConnectionFactory {
    //......
    private Pool<Jedis> createPool() {

        if (isRedisSentinelAware()) {
            return createRedisSentinelPool(this.sentinelConfig);
        }
        return createRedisPool();
    }

    /**
     * Creates {@link JedisSentinelPool}.
     *
     * @param config the actual {@link RedisSentinelConfiguration}. Never {@literal null}.
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisSentinelPool(RedisSentinelConfiguration config) {

        GenericObjectPoolConfig poolConfig = getPoolConfig() != null ? getPoolConfig() : new JedisPoolConfig();
        return new JedisSentinelPool(config.getMaster().getName(), convertToJedisSentinelSet(config.getSentinels()),
                poolConfig, getConnectTimeout(), getReadTimeout(), getPassword(), getDatabase(), getClientName());
    }

    /**
     * Creates {@link JedisPool}.
     *
     * @return the {@link Pool} to use. Never {@literal null}.
     * @since 1.4
     */
    protected Pool<Jedis> createRedisPool() {

        return new JedisPool(getPoolConfig(), getHostName(), getPort(), getConnectTimeout(), getReadTimeout(),
                getPassword(), getDatabase(), getClientName(), isUseSsl(),
                clientConfiguration.getSslSocketFactory().orElse(null), //
                clientConfiguration.getSslParameters().orElse(null), //
                clientConfiguration.getHostnameVerifier().orElse(null));
    }
    //......
}
  • 不管是JedisPool还是JedisSentinelPool,里头使用的是JedisFactory

JedisFactory.validateObject

jedis-2.9.0-sources.jar!/redis/clients/jedis/JedisFactory.java

class JedisFactory implements PooledObjectFactory<Jedis> {
  private final AtomicReference<HostAndPort> hostAndPort = new AtomicReference<HostAndPort>();
  private final int connectionTimeout;
  private final int soTimeout;
  private final String password;
  private final int database;
  private final String clientName;
  private final boolean ssl;
  private final SSLSocketFactory sslSocketFactory;
  private SSLParameters sslParameters;
  private HostnameVerifier hostnameVerifier;

  //......

  @Override
  public boolean validateObject(PooledObject<Jedis> pooledJedis) {
    final BinaryJedis jedis = pooledJedis.getObject();
    try {
      HostAndPort hostAndPort = this.hostAndPort.get();

      String connectionHost = jedis.getClient().getHost();
      int connectionPort = jedis.getClient().getPort();

      return hostAndPort.getHost().equals(connectionHost)
          && hostAndPort.getPort() == connectionPort && jedis.isConnected()
          && jedis.ping().equals("PONG");
    } catch (final Exception e) {
      return false;
    }
  }
}
  • JedisFactory实现了PooledObjectFactory接口,其validateObject方法不仅校验isConnected,而且也校验了ping方法
  • ping方法只要超时就会抛出异常,从而校验失败,因而可以感知到docker pause带来的timeout,从而将连接从连接池剔除

小结

  • spring-date-redis的2.0及以上版本废弃了原来的LettucePool,改为使用LettucePoolingClientConfiguration
  • 这里有一个问题,就是旧版连接池校验是采用ping的方式,而新版连接池校验则是使用active字段来标识,对于docker pause识别不出来
  • 对于lettuce其shareNativeConnection参数默认为true,且validateConnection为false,第一次从连接池borrow到连接之后,就一直复用底层的连接,也没有归还。如果要每次获取连接都走连接池获取然后归还,需要设置shareNativeConnection为false
  • jedis的连接池实现,其validateObject方法不仅校验isConnected,而且也校验了ping方法,因而能够感知到docker pause带来的timeout,从而将连接从连接池剔除
  • 对于lettuce来说,如果要识别docker pause的异常,有两个方案,一个是修复ConnectionPoolSupport中RedisPooledObjectFactory的validateObject方法,不仅判断isOpen,还需要ping一下;另外一个是不开启连接池,并且将LettuceConnectionFactory的validateConnection参数设置为true

doc

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 205,132评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,802评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,566评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,858评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,867评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,695评论 1 282
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,064评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,705评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,915评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,677评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,796评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,432评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,041评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,992评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,223评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,185评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,535评论 2 343

推荐阅读更多精彩内容

  • 熬夜看了枪版《我不是药神》 想起剧情中提到的意思:禁药虽是“真有效药”,但代购违禁药,服用违禁药都是不合法的。 看...
    叶叶叶界心令阅读 221评论 0 0
  • Docker Docker 是一个为开发者和系统管理员在容器中开发、部署和运行的平台。 灵活、轻量级、可互换、部署...
    Jason_c8d4阅读 372评论 1 0