Redis客户端Lettuce源码【四】Lettuce是如何断线重连的

上一篇主要介绍了RedisCommand在Lettuce中是如何流转的,以及Lettuce是如何通过单一连接处理多线程请求的。这一篇主要介绍一下Lettuce中是如何实现Redis断线重连的,在介绍之前先看一张图回忆一下RedisComman流转过程中牵扯到的几个类,以及类之间的关系。


file

如上图所示RedisCommand在Lettuce中经历了如下流程:

  1. RedisAsyncCommands调用StatefulRedisConnectionImpl的dispatch
  2. StatefulRedisConnectionImpl调用DefaultEndpoint的writeCommand
  3. 与Redis连接正常,autoFlush为true是,DefaultEndpoint会直接把Command通过Netty的channel发送出去
  4. Netty收到RedisCommand之后,会在EventLoop中回调我们的初始化Bootstrap时配置的CommandHandler的write方法,在这个write方法中CommandHandler会把RedisCommand放入stack(双向队列)的尾部
  5. 把RedisCommand序列化之后发送给Redis
  6. Netty在收到Redis的response之后会在EventLoop中回调CommandHandler的channelRead方法,CommandHandler会在这个方法调用中从stack的头部取一个RedisCommand,基于这个RedisCommand对Redis的response反序列化然后调用RedisCommand的complete方法,该RedisCommand的调用方就会收到通知收到Redis消息了。至此RedisCommand就算结束了旅程。

这个时候可能会有疑问?CommandHandler怎么确保Redis返回的消息就一定能与stack双向队列的第一个RedisCommand对应上的呢,也就是说Redis返回的消息为什么就刚好是第一个RedisCommand请求的结果呢。

其实上一篇已经介绍了,在正常场景下CommandHandler接收RedisCommand的是串行有序的,把RedisCommand通过tcp协议写入Redis也是有序的,Redis本身是单线程处理请求,所以Redis内部处理以及返回结果也是有序的1这样就能保证先进入CommandHandler的RedisCommand一定先收到Redis的响应。(这里可以思考一下,如果Redis不是单线程的,比如Dubbo也是单一长连接,但是服务端是多线程并发处理请求的,所以对于请求的返回是无序的,用这种stack数据结构是否可行呢?)

上面说了正常场景下CommandHandler的stack结构可以保证请求与Redis的返回结果对应上,那如果连接断开又连接上了,这种顺序还能保证吗?答案是不能保证,下面就具体看一下Lettuce的断线重连是如何实现,以及断线重连期间都做了什么工作保证RedisCommand能与Redis影响请求对应上的。

Lettuce实现断线重连的核心类是ConnectionWatchdog,那么ConnectionWatchdog具体是如何被实例化、被应用的,需要回过头来看下Redis连接的初始化过程。

  1. 初始化Netty的Bootstrap时设置PlainChannelInitializer
  2. Netty的channel连接初始化时会回调PlainChannelInitializer的initChannel方法
  3. 在initChannel方法中会调用ConnectionBuidler.buildHandlers方法获取所有的handler放入channel的pipeline中。(Netty对于收到和发送的所有消息都会挨个调用pipeline,具体可以参考Netty权威指南这本书)
  4. ConnectionBuidler.build方法中会负责创建ConnectionWatchdog
//RedisClient.initializeChannelAsync0
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
            SocketAddress redisAddress) {

    logger.debug("Connecting to Redis at {}", redisAddress);

    Bootstrap redisBootstrap = connectionBuilder.bootstrap();
    //创建一个RedisChannelInitializer
    RedisChannelInitializer initializer = connectionBuilder.build();
    //把initializer赋值给RedisBootstrap,Netty会在Channel初始化的时候回调该initializer
    redisBootstrap.handler(initializer);

    ...
}

// ConnectionBuidler.build
public RedisChannelInitializer build() {
    //创建PlainChannelInitializer对象,这个地方要注意this::buildHandlers方法,PlainChannelInitializer会在Channel初始化的时候调用该this::buildHandler方法获取所有的handler放入Channel的handler pipeline中。
    return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
}

// RedisChannelInitializer.initChannel 该方法会在建立连接,Channel初始化的时候被调用
@Override
protected void initChannel(Channel channel) throws Exception {

    ...
    //调用ConnectionBuidler.buildHandlers方法获取所有的handler放入channel的pipeline中。(对于Netty的pipeline机制可以参考Netty权威指南这本书)
    for (ChannelHandler handler : handlers.get()) {
            channel.pipeline().addLast(handler);
    }

    clientResources.nettyCustomizer().afterChannelInitialized(channel);
}

//ConnectionBuidler.buildHandlers负责创建Channel所使用的ChannelHandler对象
protected List<ChannelHandler> buildHandlers() {

    ...

    handlers.add(new ChannelGroupListener(channelGroup));
    handlers.add(new CommandEncoder());
    handlers.add(commandHandlerSupplier.get());
    // 判断如果配置了自动重连就添加ConnectionWatchdog
    if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
    }

    handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
    //这个地方又检查并添加了一遍,不太明白再次创建的目的是什么,有知道的朋友,欢迎指出。
    if (clientOptions.isAutoReconnect()) {
            handlers.add(createConnectionWatchdog());
    }

    return handlers;
}

上面可以看到ConnectionWatchdog是如何被应用到Netty的ChannelHandler中的,下面看下ConnectionWatchdog是如何构建的,以及如何自动重连的。

  1. 基于配置创建ConnectionWatchdog
  2. ConnectionWatchdog的ChannelActive和ChannelInActive会在Channel建立成功和断开连接的时候被回调
  3. 在ConnectionWatchdog的ChannelInActive方法中会尝试重连,断开连接之后并不是立即重连,而是根据一个延时重连的策略来延迟执行重连任务。
protected ConnectionWatchdog createConnectionWatchdog() {
    // 可以看到即使上面被调用了两次,其实对象只有一个。另外因为对于一个StatefulConnectionImpl来说,ConnectionBuilder是同一个的,所以即使Channel断线重连了,ConnectionWatchdog也还是这个对象。
    if (connectionWatchdog != null) {
            return connectionWatchdog;
    }

    ...
    //基于一些配置项构建ConnectionWatchdog对象
    ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer,
                    clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection);
    //把watchdog传给endpoint,前面已经说过endpoint是更高级别的抽象,用来抽象底层channel,注册给endpoint是为了让endpoint在某些场景下直接调用配置watchdog。
    endpoint.registerConnectionWatchdog(watchdog);
    
    //把创建的watchdog赋值给当前的ConnectionBuilder对象
    connectionWatchdog = watchdog;
    return watchdog;
}

//Channel建立成功之后回调channelActive,channelActive方法中其实没做什么实质性的工作,主要是把reconnectSchedulerSync设置为false,相当于释放锁
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //reconnectSchedulerSync可以理解是重连定时任务的锁,设置为false表示锁是释放的。
    reconnectSchedulerSync.set(false);
    channel = ctx.channel();
    reconnectScheduleTimeout = null;
    logPrefix = null;
    remoteAddress = channel.remoteAddress();
    logPrefix = null;
    logger.debug("{} channelActive()", logPrefix());

    super.channelActive(ctx);
}

//断开连接的时候channelInactive会被调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {

        ...
        
    //把旧的channl设置为null,旧的channel就是断开连接的channel
    channel = null;

    if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
        // 真正的重连逻辑在这里!!!!
        scheduleReconnect();
    } else {
        logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
    }

    super.channelInactive(ctx);
}

//scheduleReconnect,顾名思义计划派发重连,并不是真正的重连
public void scheduleReconnect() {
    ...
    // 通过对reconnectSchedulerSync做cas的方式获取锁
    if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {

        attempts++;
        final int attempt = attempts;
        //根据重试次数获取延迟执行重连的时间,这个应该也好理解,当连接断开的时候并不是立即重连的(因为此时重连大概率也是失败),默认的重连策略是等待X时间再尝试连接,这个X是递增的,也就是说失败的次数越多,下次重试之前间隔的时间越长,当然也有一个上限。
        int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
        logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);

        this.reconnectScheduleTimeout = timer.newTimeout(it -> {
            ...
            //通过reconnectWorkers来真正的执行重连逻辑,而不是在当前线程中
            reconnectWorkers.submit(() -> {
                //真正的重连逻辑!!!!
                ConnectionWatchdog.this.run(attempt);
                return null;
            });
        }, timeout, TimeUnit.MILLISECONDS);

        // Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
        if (!reconnectSchedulerSync.get()) {
            reconnectScheduleTimeout = null;
        }
    } else {
        logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
    }
}

下面看一下ConnectionWatchdog真正执行重连的逻辑

//ConnectionWatchdog.run是真正执行重连的逻辑,并且是在其他线程中执行的
public void run(int attempt) throws Exception {
    //设置为false,表示释放reconnectSchedulerSync的锁
    reconnectSchedulerSync.set(false);
    ...

    try {
        reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
        logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);
        
        //真正的重连逻辑在这里!!!!
        CompletableFuture<Channel> future = reconnectionHandler.reconnect();

        future.whenComplete((c, t) -> {
            //如果连接建立成功了就直接返回
            if (c != null && t == null) {
                    return;
            }

            ...
            //如果连接建立失败,就重新尝试重连
            if (!isReconnectSuspended()) {
                    scheduleReconnect();
            }
        });
    } catch (Exception e) {
            logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
    }
}

//ReconnectionHandler.reconnect
protected CompletableFuture<Channel> reconnect() {

    CompletableFuture<Channel> future = new CompletableFuture<>();
    //通过socketAddressSupplier获取实际Redis地址,获取到Redis地址之后执行真正的重连逻辑
    socketAddressSupplier.subscribe(remoteAddress -> {

            if (future.isCancelled()) {
                    return;
            }
            //真正的重连逻辑
            reconnect0(future, remoteAddress);

    }, future::completeExceptionally);

    return this.currentFuture = future;
}

//ReconnectionHandler.reconnect0
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {
    //其实重连就是调用bootstrap的connect方法
    ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
    ChannelPromise initFuture = connectFuture.channel().newPromise();

    logger.debug("Reconnecting to Redis at {}", remoteAddress);

    result.whenComplete((c, t) -> {

            if (t instanceof CancellationException) {
                    connectFuture.cancel(true);
                    initFuture.cancel(true);
            }
    });

    initFuture.addListener((ChannelFuture it) -> {

            if (it.cause() != null) {

                    connectFuture.cancel(true);
                    close(it.channel());
                    result.completeExceptionally(it.cause());
            } else {
                    result.complete(connectFuture.channel());
            }
    });
    //异常和超时逻辑处理
    ...
}

所以其实真正重连的实现方法就是调用bootstrap.connect方法,这里可能会有一个疑问:connection方法会的ChannelFuture对象并没有被使用,前面的文章中提到过DefaultEndpoint抽象了channel的调用,所以DefaultEndpoint对象中是有对Channel对象的引用的,那重新连接成功创建的Channel是如何告知DefaultEndpoint的呢。

其实根源还是PlainChannelInitializer中,PlainChannelInitializer对象是配置到Netty的bootstrap中的,所以当每次该bootstrap对象创建一个channel的时候都会调用PlainChannelInitializer的initchannel方法,从而把ConnectionBuilder中得handlers注册到channel中。这个handlers中有一个CommandHandler(虽然每次创建新的channel都会创建新的CommandHandler,但是所有的CommandHandler对象引用的DefaultEndpoint是同一个)。实现如下:

//CommandHandler.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {

    ...
    //调用Endpoint的notifyChannelActive方法告知Channel变了
    endpoint.notifyChannelActive(ctx.channel());

    super.channelActive(ctx);

    ...
}

//DefaultEndpoint.notifyChannelActive
@Override
public void notifyChannelActive(Channel channel) {

    this.logPrefix = null;
    //把自己的channel指向为新的channel对象
    this.channel = channel;
    this.connectionError = null;

    ...

    //让connectionWatchdog重新监听断开连接事件
    if (connectionWatchdog != null) {
            connectionWatchdog.arm();
    }
    //获取悲观锁
    sharedLock.doExclusive(() -> {
        try {
                // Move queued commands to buffer before issuing any commands because of connection activation.
                // That's necessary to prepend queued commands first as some commands might get into the queue
                // after the connection was disconnected. They need to be prepended to the command buffer

                ...
                //调用StatefulConnectionImpl的activated方法,这个里面也做很多总要的事情!!!!
                connectionFacade.activated();

                //把断开连接时缓存的Command重新通过Channel发送出去
                flushCommands(disconnectedBuffer);
        } catch (Exception e) {

                ...
        }
    });
}

可能还会有一个疑问,就是我们在第一次创建连接的时候,在连接成功之后有判断是否有密码,有密码就发送AUTH命令,有选择DB就发送Select命令等,在重连的时候却并没有看到这个操作,其实就是在上面的代码connectionFacade.activated()的实现中。

//StatefulRedisConnectionImpl.activated。
public void activated() {

    super.activated();
    // do not block in here, since the channel flow will be interrupted.
    //如果密码不为空就设置密码
    if (password != null) {
        async.authAsync(password);
    }
    //如果db!=0就设置db
    if (db != 0) {
        async.selectAsync(db);
    }

    if (clientName != null) {
        setClientName(clientName);
    }

    if (readOnly) {
        async.readOnly();
    }
}

从上面可以看到当Channel重新连接成功时StatefulRedisConnectionImpl的activated方法会被调用,在该方法中会检测密码不为空就调用auth命令,那么StatefulRedisConnectionImpl是如何知道密码的呢。原因是在preProcessCommand方法中:

//StatefulRedisConnectionImpl.preProcessCommand,该方法会在每次dispatchCommand的时候被调用,Lettuce在第一次建立连接的时候会调用AUTH和SELECT方法,在调用这些方法的时候StatefulRedisConnectionImpl就会记住password和db。从而在断线重连的时候会自动执行AUTH和SELECT方法。
protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {

    RedisCommand<K, V, T> local = command;
    //如果该Command是AUTH,就等该Command返回成功之后记录下password
    if (local.getType().name().equals(AUTH.name())) {
            local = attachOnComplete(local, status -> {
                    if ("OK".equals(status)) {

                            char[] password = CommandArgsAccessor.getFirstCharArray(command.getArgs());

                            if (password != null) {
                                    this.password = password;
                            } else {

                                    String stringPassword = CommandArgsAccessor.getFirstString(command.getArgs());
                                    if (stringPassword != null) {
                                            this.password = stringPassword.toCharArray();
                                    }
                            }
                    }
            });
    }
    //如果该Command是SELECT,就等该Command返回成功之后记录下db
    if (local.getType().name().equals(SELECT.name())) {
            local = attachOnComplete(local, status -> {
                    if ("OK".equals(status)) {
                            Long db = CommandArgsAccessor.getFirstInteger(command.getArgs());
                            if (db != null) {
                                    this.db = db.intValue();
                            }
                    }
            });
    }
    //如果该Command是READONLY,就等该Command返回成功之后记录下readonly为true
    if (local.getType().name().equals(READONLY.name())) {
            local = attachOnComplete(local, status -> {
                    if ("OK".equals(status)) {
                            this.readOnly = true;
                    }
            });
    }

    ...
    
    return local;
}

至此Lettuce的重连逻辑完成了,因为第一次创建连接的时候Bootstrap对象已经被配置好了,所以在断线重连的时候逻辑简单了很多,而且很多AUTH、SELECT等命令被放在了Channel的pipeline相对应类中去实现了。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,324评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,303评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,192评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,555评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,569评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,566评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,927评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,583评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,827评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,590评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,669评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,365评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,941评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,928评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,159评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,880评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,399评论 2 342

推荐阅读更多精彩内容

  • 昨日种种 已成今日纷纷 命运的轨迹依旧不可捉摸 生活的旋律却如以往 满布艰辛 孤独的使者常在夜晚哭泣 他的眼中 没...
    腾_飞阅读 142评论 0 0
  • 2018.03.28
    一只_Iris阅读 380评论 0 0
  • 001.只有当一个人观察那些书本里的词汇,如何在现实政治中展开时,才能认识到民主不仅仅是一个抽象的概念,而且是一种...
    阳菌阅读 182评论 0 0
  • 流年,无影无踪、无声无息的。日复一日,总觉得没有什么区别。 回过头来,才发现,无忧无虑的童年已是很遥远...
    叔琳阅读 452评论 0 1
  • 家具展建立的重要细节 1、品牌形象和产品的杰出展现,品牌形象是一个企业的标志,更是产品的“质量保证书”。消费者对品...
    wuwu35阅读 391评论 0 0