Redis客户端Lettuce源码【二】Lettuce是如何基于Netty建立连接的

lettuce-core版本: 5.1.7.RELEASE

先看一下Lettuce的基本使用方法,使用Lettuce大概分为如下几步:

  1. 基于Redis连接信息创建RedisClient
  2. 基于RedisClient创建StatefulRedisConnection
  3. 从Connection中获取Command,基于Command执行Redis命令操作。
/**
 * @author xiaobing
 * @date 2019/12/20
 */
public class LettuceSimpleUse {
    private void testLettuce() throws ExecutionException, InterruptedException {
        //构建RedisClient对象,RedisClient包含了Redis的基本配置信息,可以基于RedisClient创建RedisConnection
        RedisClient client = RedisClient.create("redis://localhost");

        //创建一个线程安全的StatefulRedisConnection,可以多线程并发对该connection操作,底层只有一个物理连接.
        StatefulRedisConnection<String, String> connection = client.connect();

        //获取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种command
        RedisStringCommands<String, String> sync = connection.sync();
        String value = sync.get("key");
        System.out.println("get redis value with lettuce sync command, value is :" + value);

        //获取SyncCommand。Lettuce支持SyncCommand、AsyncCommands、ActiveCommand三种command
        RedisAsyncCommands<String, String> async = connection.async();
        RedisFuture<String> getFuture = async.get("key");
        value = getFuture.get();
        System.out.println("get redis value with lettuce async command, value is :" + value);
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        new LettuceSimpleUse().testLettuce();
    }
}

先看一张建立连接的时序图,有一个直观的印象。


lettuce源码--建立redis连接

RedisClient

一个可扩展、线程安全的RedisClient,支持sync、async、reactor执行模式。
RedisClient.create只是传入了一些配置信息,此时并没有创建连接。

// 使用默认的ClientResource
public static RedisClient create(String uri) {
    LettuceAssert.notEmpty(uri, "URI must not be empty");
    return new RedisClient(null, RedisURI.create(uri));
}
// ClientResources中包含了一些配置和线程池信息,是一个比较重的资源,多个RedisClient可以共享同一个ClientResource
protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
    super(clientResources);
    assertNotNull(redisURI);
    this.redisURI = redisURI;
    setDefaultTimeout(redisURI.getTimeout());
 }

RedisClient.connnect

可以看到connect方法有一些重载方法,默认的是用UTF8 String对key和value序列化,通过传入RedisCodec支持自定义的对Key和Value的序列化方式。

    public StatefulRedisConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

    public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {

        checkForRedisURI();
        //connectStandaloneAsync是异步创建connection,返回的是Future对象,通过getConnection转为同步操作
        return getConnection(connectStandaloneAsync(codec, this.redisURI, timeout));
    }
    //异步转同步操作
    protected <T> T getConnection(ConnectionFuture<T> connectionFuture) {
        try {
            return connectionFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
        } catch (Exception e) {

            if (e instanceof ExecutionException) {
                throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e.getCause());
            }
            throw RedisConnectionException.create(connectionFuture.getRemoteAddress(), e);
        }
    }

RedisClient.connectStandaloneAsync

    private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec,
            RedisURI redisURI, Duration timeout) {

        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        logger.debug("Trying to get a Redis connection for: " + redisURI);
        //创建一个有状态的EndPoint用于抽象底层channel的实现,DefaultEndpoint内部封装断线重连、重连后成功后回放连接失败期间的command。同时封装了AT_MOST_ONCE、AT_LEAST_ONCE的可靠性实现(该逻辑是基于内存的,所以并不可靠)。
        DefaultEndpoint endpoint = new DefaultEndpoint(clientOptions, clientResources);
        RedisChannelWriter writer = endpoint;
        
        //进一步封装,添加支持过期时间的执行命令
        if (CommandExpiryWriter.isSupported(clientOptions)) {
            writer = new CommandExpiryWriter(writer, clientOptions, clientResources);
        }
        //创建StatefulRedisConnectionImpl对象,StatefulRedisConnectionImpl对外提供RedisCommand对象,内部基于writer发送命令。此时并没有真正的创建物理连接,该类本身是无状态、线程安全的。
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, codec, timeout);
        //异步创建Redis物理连接,返回future对象。后面可以看到future中返回的对象其实还是上面的connection
        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, codec, endpoint, redisURI,
                () -> new CommandHandler(clientOptions, clientResources, endpoint));

        future.whenComplete((channelHandler, throwable) -> {

            if (throwable != null) {
                connection.close();
            }
        });

        return future;
    }
    //StatefulRedisConnectionImpl的构造函数,此时已经创建了sync、async、reactive三种类型的RedisCommand。基于RedisCodec对key和value序列化,通过write把命令真正的发出去。
    public StatefulRedisConnectionImpl(RedisChannelWriter writer, RedisCodec<K, V>  codec, Duration timeout) {

        super(writer, timeout);

        this.codec = codec;
        this.async = newRedisAsyncCommandsImpl();
        this.sync = newRedisSyncCommandsImpl();
        this.reactive = newRedisReactiveCommandsImpl();
    }

RedisClient.connectStatefulAsync

    private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection,
            RedisCodec<K, V> codec, Endpoint endpoint,
            RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
        //构建ConnectionBuidler,通过ConnectionBuilder来创建connection
        ConnectionBuilder connectionBuilder;
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            connectionBuilder = sslConnectionBuilder;
        } else {
            connectionBuilder = ConnectionBuilder.connectionBuilder();
        }
        //填充StatefulRedisConnectionImpl
        connectionBuilder.connection(connection);
        //控制RedisClient行为的一些配置参数
        connectionBuilder.clientOptions(clientOptions);
        //ClientResource包含了一些EventLoopGroup信息
        connectionBuilder.clientResources(clientResources);
        //配置commandHandlerSupplier,这个commandHandler很重要,是实现StatefulRedisConnectionImpl线程安全的关键,后面会详细讲。
        connectionBuilder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
        //connectionBuilder填充Bootstrap等更多的信息
        //getSocketAddressSupplier是根据redisURI获取真正的Redis连接信息,如:sentinel模式下,需要从sentinel获取到真实的redis连接地址
        connectionBuilder(getSocketAddressSupplier(redisURI), connectionBuilder, redisURI);
        //配置netty的channeltype
        channelType(connectionBuilder, redisURI);

        if (clientOptions.isPingBeforeActivateConnection()) {
            if (hasPassword(redisURI)) {
                connectionBuilder.enableAuthPingBeforeConnect();
            } else {
                connectionBuilder.enablePingBeforeConnect();
            }
        }
        //初始化channel,在这一步才真正的异步的去创建物理连接
        ConnectionFuture<RedisChannelHandler<K, V>> future = initializeChannelAsync(connectionBuilder);
        ConnectionFuture<?> sync = future;

        if (!clientOptions.isPingBeforeActivateConnection() && hasPassword(redisURI)) {
            //连接成功之后发送auth命令,做密码的验证
            sync = sync.thenCompose(channelHandler -> {

                CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getPassword());
                return connection.async().dispatch(CommandType.AUTH, new StatusOutput<>(codec), args);
            });
        }
        //设置clientName,从Redis服务端执行client list可以看到clientname
        if (LettuceStrings.isNotEmpty(redisURI.getClientName())) {
            sync = sync.thenApply(channelHandler -> {
                connection.setClientName(redisURI.getClientName());
                return channelHandler;
            });
        }
        //选择db
        if (redisURI.getDatabase() != 0) {

            sync = sync.thenCompose(channelHandler -> {

                CommandArgs<K, V> args = new CommandArgs<>(codec).add(redisURI.getDatabase());
                return connection.async().dispatch(CommandType.SELECT, new StatusOutput<>(codec), args);
            });
        }
        //返回connection对象
        return sync.thenApply(channelHandler -> (S) connection);
    }

RedisClient.connectionBuilder

//为ConnectionBuidler填充更多的信息,如Bootstrap、channelGroup
protected void connectionBuilder(Mono<SocketAddress> socketAddressSupplier, ConnectionBuilder connectionBuilder,
            RedisURI redisURI) {
        //创建Netty客户端的Bootstrap对象
        Bootstrap redisBootstrap = new Bootstrap();
        //Bootstrap的一些配置参数,具体可以参考Netty的相关书籍(Netty权威指南)
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
        redisBootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 8 * 1024);
        redisBootstrap.option(ChannelOption.ALLOCATOR, BUF_ALLOCATOR);

        SocketOptions socketOptions = getOptions().getSocketOptions();

        redisBootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS,
                Math.toIntExact(socketOptions.getConnectTimeout().toMillis()));

        if (LettuceStrings.isEmpty(redisURI.getSocket())) {
            //keepAlive参数,默认为true
            redisBootstrap.option(ChannelOption.SO_KEEPALIVE, socketOptions.isKeepAlive());
            //tcp_nodelay参数,默认为true
            redisBootstrap.option(ChannelOption.TCP_NODELAY, socketOptions.isTcpNoDelay());
        }

        connectionBuilder.timeout(redisURI.getTimeout());
        connectionBuilder.password(redisURI.getPassword());
        //把构建出来的bootStrap对象赋值给connectionBuidler,由connectionBuilder创建连接
        connectionBuilder.bootstrap(redisBootstrap);
        
        //Netty的相关参数配置,待研究        
        connectionBuilder.channelGroup(channels).connectionEvents(connectionEvents).timer(timer);
        //配置socket地址提供者
        connectionBuilder.socketAddressSupplier(socketAddressSupplier);
    }

RedisClient.initializeChannelAsync

//初始化redis连接,返回ChannelFuture对象
protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initializeChannelAsync(
            ConnectionBuilder connectionBuilder) {

        Mono<SocketAddress> socketAddressSupplier = connectionBuilder.socketAddress();

        if (clientResources.eventExecutorGroup().isShuttingDown()) {
            throw new IllegalStateException("Cannot connect, Event executor group is terminated.");
        }
        //创建socketAddressFuture 对象,当socketAddressSupplier异步获取SocketAddress成功之后会把SocketAddress数据放入该对象中
        CompletableFuture<SocketAddress> socketAddressFuture = new CompletableFuture<>();
        //创建channelReadyFuture,当连接建立成功之后会把Channel对象放入该对象中
        CompletableFuture<Channel> channelReadyFuture = new CompletableFuture<>();

        //配置获取SocketAddress异步操作之后的操作:
        //1. 把SocketAddress对象放入socketAddressFuture中
        //2. 基于SocketAddress调用initializeChannelAsync0方法真正去建立连接
        socketAddressSupplier.doOnError(socketAddressFuture::completeExceptionally).doOnNext(socketAddressFuture::complete)
                .subscribe(redisAddress -> {

                    if (channelReadyFuture.isCancelled()) {
                        return;
                    }
                    //异步建立真正的连接,如果建立成功会把生产的Channel对象放入channelReadyFuture中
                    initializeChannelAsync0(connectionBuilder, channelReadyFuture, redisAddress);
                }, channelReadyFuture::completeExceptionally);
        //建立连接成功之后返回的还是connectionBuilder的connection对象,即StatefulRedisConnectionImpl
        return new DefaultConnectionFuture<>(socketAddressFuture, channelReadyFuture.thenApply(channel -> (T) connectionBuilder
                .connection()));
    }

RedisClient.initializeChannelAsync0

//真正的去建立Redis物理连接,这里面有很多基于Future的异步操作,如果看不太懂,建议先看看Future的相关知识,多看几遍。
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
            SocketAddress redisAddress) {

        logger.debug("Connecting to Redis at {}", redisAddress);

        Bootstrap redisBootstrap = connectionBuilder.bootstrap();
        //创建PlainChannelInitializer对象,PlainChannelIntializer对象会在Channel初始化的时候添加很多Handlers(Netty的Handler概念可以参考Netty权威指南),如:CommandEncoder、CommandHandler(非常重要的Handler)、ConnectionWatchdog(实现断线重连)
        RedisChannelInitializer initializer = connectionBuilder.build();
        //RedisChannelInitializer配置到Bootstrap中
        redisBootstrap.handler(initializer);

        //调用一些通过ClientResources自定义的回调函数
        clientResources.nettyCustomizer().afterBootstrapInitialized(redisBootstrap);
        //获取initFuture 对象,如果Channel初始化完成,可以通过该对象获取到初始化的结果
        CompletableFuture<Boolean> initFuture = initializer.channelInitialized();
        //真正的通过Netty异步的方式去建立物理连接,返回ChannelFuture对象
        ChannelFuture connectFuture = redisBootstrap.connect(redisAddress);
        //配置异常处理
        channelReadyFuture.whenComplete((c, t) -> {

            if (t instanceof CancellationException) {
                connectFuture.cancel(true);
                initFuture.cancel(true);
            }
        });

        connectFuture.addListener(future -> {
            //异常处理
            if (!future.isSuccess()) {

                logger.debug("Connecting to Redis at {}: {}", redisAddress, future.cause());
                connectionBuilder.endpoint().initialState();
                //赋值channelReadyFuture告知出现异常了
                channelReadyFuture.completeExceptionally(future.cause());
                return;
            }
            //当Channel初始化完成之后,根据初始化的结果做判断
            initFuture.whenComplete((success, throwable) -> {
                //如果异常为空,则初始化成功。
                if (throwable == null) {

                    logger.debug("Connecting to Redis at {}: Success", redisAddress);
                    RedisChannelHandler<?, ?> connection = connectionBuilder.connection();
                    connection.registerCloseables(closeableResources, connection);
                    //把成功之后的结果赋值给channelReadyFuture对象
                    channelReadyFuture.complete(connectFuture.channel());
                    return;
                }
                
                //如果初始化Channel的过程中出现异常的处理逻辑
                logger.debug("Connecting to Redis at {}, initialization: {}", redisAddress, throwable);
                connectionBuilder.endpoint().initialState();
                Throwable failure;

                if (throwable instanceof RedisConnectionException) {
                    failure = throwable;
                } else if (throwable instanceof TimeoutException) {
                    failure = new RedisConnectionException("Could not initialize channel within "
                            + connectionBuilder.getTimeout(), throwable);
                } else {
                    failure = throwable;
                }
                //赋值channelReadyFuture告知出现异常了
                channelReadyFuture.completeExceptionally(failure);
            });
        });
    }

至此,Redis的Connection的建立连接的主流程就结束了,具体的一些逻辑如:断线重连是如何实现的,Redis模式下是怎么基于Sentinel获取Redis实际连接的等等会在后续的文章中介绍。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • 1.1 资料 ,最好的入门小册子,可以先于一切文档之前看,免费。 作者Antirez的博客,Antirez维护的R...
    JefferyLcm阅读 17,030评论 1 51
  • Redis 简介 Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。 Redi...
    奋斗的小鸟GO阅读 437评论 0 2
  • NOSQL类型简介键值对:会使用到一个哈希表,表中有一个特定的键和一个指针指向特定的数据,如redis,volde...
    MicoCube阅读 3,956评论 2 27
  • Ubuntu下安装redis 安装redis 在 Ubuntu 系统安装 Redi 可以使用以下命令: 启动 Re...
    riverstation阅读 906评论 0 0
  • 一、Python简介和环境搭建以及pip的安装 4课时实验课主要内容 【Python简介】: Python 是一个...
    _小老虎_阅读 5,718评论 0 10