上一篇主要介绍了RedisCommand在Lettuce中是如何流转的,以及Lettuce是如何通过单一连接处理多线程请求的。这一篇主要介绍一下Lettuce中是如何实现Redis断线重连的,在介绍之前先看一张图回忆一下RedisComman流转过程中牵扯到的几个类,以及类之间的关系。
如上图所示RedisCommand在Lettuce中经历了如下流程:
- RedisAsyncCommands调用StatefulRedisConnectionImpl的dispatch
- StatefulRedisConnectionImpl调用DefaultEndpoint的writeCommand
- 与Redis连接正常,autoFlush为true是,DefaultEndpoint会直接把Command通过Netty的channel发送出去
- Netty收到RedisCommand之后,会在EventLoop中回调我们的初始化Bootstrap时配置的CommandHandler的write方法,在这个write方法中CommandHandler会把RedisCommand放入stack(双向队列)的尾部
- 把RedisCommand序列化之后发送给Redis
- Netty在收到Redis的response之后会在EventLoop中回调CommandHandler的channelRead方法,CommandHandler会在这个方法调用中从stack的头部取一个RedisCommand,基于这个RedisCommand对Redis的response反序列化然后调用RedisCommand的complete方法,该RedisCommand的调用方就会收到通知收到Redis消息了。至此RedisCommand就算结束了旅程。
这个时候可能会有疑问?CommandHandler怎么确保Redis返回的消息就一定能与stack双向队列的第一个RedisCommand对应上的呢,也就是说Redis返回的消息为什么就刚好是第一个RedisCommand请求的结果呢。
其实上一篇已经介绍了,在正常场景下CommandHandler接收RedisCommand的是串行有序的,把RedisCommand通过tcp协议写入Redis也是有序的,Redis本身是单线程处理请求,所以Redis内部处理以及返回结果也是有序的1这样就能保证先进入CommandHandler的RedisCommand一定先收到Redis的响应。(这里可以思考一下,如果Redis不是单线程的,比如Dubbo也是单一长连接,但是服务端是多线程并发处理请求的,所以对于请求的返回是无序的,用这种stack数据结构是否可行呢?)
上面说了正常场景下CommandHandler的stack结构可以保证请求与Redis的返回结果对应上,那如果连接断开又连接上了,这种顺序还能保证吗?答案是不能保证,下面就具体看一下Lettuce的断线重连是如何实现,以及断线重连期间都做了什么工作保证RedisCommand能与Redis影响请求对应上的。
Lettuce实现断线重连的核心类是ConnectionWatchdog,那么ConnectionWatchdog具体是如何被实例化、被应用的,需要回过头来看下Redis连接的初始化过程。
- 初始化Netty的Bootstrap时设置PlainChannelInitializer
- Netty的channel连接初始化时会回调PlainChannelInitializer的initChannel方法
- 在initChannel方法中会调用ConnectionBuidler.buildHandlers方法获取所有的handler放入channel的pipeline中。(Netty对于收到和发送的所有消息都会挨个调用pipeline,具体可以参考Netty权威指南这本书)
- ConnectionBuidler.build方法中会负责创建ConnectionWatchdog
//RedisClient.initializeChannelAsync0
private void initializeChannelAsync0(ConnectionBuilder connectionBuilder, CompletableFuture<Channel> channelReadyFuture,
SocketAddress redisAddress) {
logger.debug("Connecting to Redis at {}", redisAddress);
Bootstrap redisBootstrap = connectionBuilder.bootstrap();
//创建一个RedisChannelInitializer
RedisChannelInitializer initializer = connectionBuilder.build();
//把initializer赋值给RedisBootstrap,Netty会在Channel初始化的时候回调该initializer
redisBootstrap.handler(initializer);
...
}
// ConnectionBuidler.build
public RedisChannelInitializer build() {
//创建PlainChannelInitializer对象,这个地方要注意this::buildHandlers方法,PlainChannelInitializer会在Channel初始化的时候调用该this::buildHandler方法获取所有的handler放入Channel的handler pipeline中。
return new PlainChannelInitializer(pingCommandSupplier, this::buildHandlers, clientResources, timeout);
}
// RedisChannelInitializer.initChannel 该方法会在建立连接,Channel初始化的时候被调用
@Override
protected void initChannel(Channel channel) throws Exception {
...
//调用ConnectionBuidler.buildHandlers方法获取所有的handler放入channel的pipeline中。(对于Netty的pipeline机制可以参考Netty权威指南这本书)
for (ChannelHandler handler : handlers.get()) {
channel.pipeline().addLast(handler);
}
clientResources.nettyCustomizer().afterChannelInitialized(channel);
}
//ConnectionBuidler.buildHandlers负责创建Channel所使用的ChannelHandler对象
protected List<ChannelHandler> buildHandlers() {
...
handlers.add(new ChannelGroupListener(channelGroup));
handlers.add(new CommandEncoder());
handlers.add(commandHandlerSupplier.get());
// 判断如果配置了自动重连就添加ConnectionWatchdog
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
handlers.add(new ConnectionEventTrigger(connectionEvents, connection, clientResources.eventBus()));
//这个地方又检查并添加了一遍,不太明白再次创建的目的是什么,有知道的朋友,欢迎指出。
if (clientOptions.isAutoReconnect()) {
handlers.add(createConnectionWatchdog());
}
return handlers;
}
上面可以看到ConnectionWatchdog是如何被应用到Netty的ChannelHandler中的,下面看下ConnectionWatchdog是如何构建的,以及如何自动重连的。
- 基于配置创建ConnectionWatchdog
- ConnectionWatchdog的ChannelActive和ChannelInActive会在Channel建立成功和断开连接的时候被回调
- 在ConnectionWatchdog的ChannelInActive方法中会尝试重连,断开连接之后并不是立即重连,而是根据一个延时重连的策略来延迟执行重连任务。
protected ConnectionWatchdog createConnectionWatchdog() {
// 可以看到即使上面被调用了两次,其实对象只有一个。另外因为对于一个StatefulConnectionImpl来说,ConnectionBuilder是同一个的,所以即使Channel断线重连了,ConnectionWatchdog也还是这个对象。
if (connectionWatchdog != null) {
return connectionWatchdog;
}
...
//基于一些配置项构建ConnectionWatchdog对象
ConnectionWatchdog watchdog = new ConnectionWatchdog(clientResources.reconnectDelay(), clientOptions, bootstrap, timer,
clientResources.eventExecutorGroup(), socketAddressSupplier, reconnectionListener, connection);
//把watchdog传给endpoint,前面已经说过endpoint是更高级别的抽象,用来抽象底层channel,注册给endpoint是为了让endpoint在某些场景下直接调用配置watchdog。
endpoint.registerConnectionWatchdog(watchdog);
//把创建的watchdog赋值给当前的ConnectionBuilder对象
connectionWatchdog = watchdog;
return watchdog;
}
//Channel建立成功之后回调channelActive,channelActive方法中其实没做什么实质性的工作,主要是把reconnectSchedulerSync设置为false,相当于释放锁
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//reconnectSchedulerSync可以理解是重连定时任务的锁,设置为false表示锁是释放的。
reconnectSchedulerSync.set(false);
channel = ctx.channel();
reconnectScheduleTimeout = null;
logPrefix = null;
remoteAddress = channel.remoteAddress();
logPrefix = null;
logger.debug("{} channelActive()", logPrefix());
super.channelActive(ctx);
}
//断开连接的时候channelInactive会被调用
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
...
//把旧的channl设置为null,旧的channel就是断开连接的channel
channel = null;
if (listenOnChannelInactive && !reconnectionHandler.isReconnectSuspended()) {
// 真正的重连逻辑在这里!!!!
scheduleReconnect();
} else {
logger.debug("{} Reconnect scheduling disabled", logPrefix(), ctx);
}
super.channelInactive(ctx);
}
//scheduleReconnect,顾名思义计划派发重连,并不是真正的重连
public void scheduleReconnect() {
...
// 通过对reconnectSchedulerSync做cas的方式获取锁
if ((channel == null || !channel.isActive()) && reconnectSchedulerSync.compareAndSet(false, true)) {
attempts++;
final int attempt = attempts;
//根据重试次数获取延迟执行重连的时间,这个应该也好理解,当连接断开的时候并不是立即重连的(因为此时重连大概率也是失败),默认的重连策略是等待X时间再尝试连接,这个X是递增的,也就是说失败的次数越多,下次重试之前间隔的时间越长,当然也有一个上限。
int timeout = (int) reconnectDelay.createDelay(attempt).toMillis();
logger.debug("{} Reconnect attempt {}, delay {}ms", logPrefix(), attempt, timeout);
this.reconnectScheduleTimeout = timer.newTimeout(it -> {
...
//通过reconnectWorkers来真正的执行重连逻辑,而不是在当前线程中
reconnectWorkers.submit(() -> {
//真正的重连逻辑!!!!
ConnectionWatchdog.this.run(attempt);
return null;
});
}, timeout, TimeUnit.MILLISECONDS);
// Set back to null when ConnectionWatchdog#run runs earlier than reconnectScheduleTimeout's assignment.
if (!reconnectSchedulerSync.get()) {
reconnectScheduleTimeout = null;
}
} else {
logger.debug("{} Skipping scheduleReconnect() because I have an active channel", logPrefix());
}
}
下面看一下ConnectionWatchdog真正执行重连的逻辑
//ConnectionWatchdog.run是真正执行重连的逻辑,并且是在其他线程中执行的
public void run(int attempt) throws Exception {
//设置为false,表示释放reconnectSchedulerSync的锁
reconnectSchedulerSync.set(false);
...
try {
reconnectionListener.onReconnectAttempt(new ConnectionEvents.Reconnect(attempt));
logger.log(infoLevel, "Reconnecting, last destination was {}", remoteAddress);
//真正的重连逻辑在这里!!!!
CompletableFuture<Channel> future = reconnectionHandler.reconnect();
future.whenComplete((c, t) -> {
//如果连接建立成功了就直接返回
if (c != null && t == null) {
return;
}
...
//如果连接建立失败,就重新尝试重连
if (!isReconnectSuspended()) {
scheduleReconnect();
}
});
} catch (Exception e) {
logger.log(warnLevel, "Cannot reconnect: {}", e.toString());
}
}
//ReconnectionHandler.reconnect
protected CompletableFuture<Channel> reconnect() {
CompletableFuture<Channel> future = new CompletableFuture<>();
//通过socketAddressSupplier获取实际Redis地址,获取到Redis地址之后执行真正的重连逻辑
socketAddressSupplier.subscribe(remoteAddress -> {
if (future.isCancelled()) {
return;
}
//真正的重连逻辑
reconnect0(future, remoteAddress);
}, future::completeExceptionally);
return this.currentFuture = future;
}
//ReconnectionHandler.reconnect0
private void reconnect0(CompletableFuture<Channel> result, SocketAddress remoteAddress) {
//其实重连就是调用bootstrap的connect方法
ChannelFuture connectFuture = bootstrap.connect(remoteAddress);
ChannelPromise initFuture = connectFuture.channel().newPromise();
logger.debug("Reconnecting to Redis at {}", remoteAddress);
result.whenComplete((c, t) -> {
if (t instanceof CancellationException) {
connectFuture.cancel(true);
initFuture.cancel(true);
}
});
initFuture.addListener((ChannelFuture it) -> {
if (it.cause() != null) {
connectFuture.cancel(true);
close(it.channel());
result.completeExceptionally(it.cause());
} else {
result.complete(connectFuture.channel());
}
});
//异常和超时逻辑处理
...
}
所以其实真正重连的实现方法就是调用bootstrap.connect方法,这里可能会有一个疑问:connection方法会的ChannelFuture对象并没有被使用,前面的文章中提到过DefaultEndpoint抽象了channel的调用,所以DefaultEndpoint对象中是有对Channel对象的引用的,那重新连接成功创建的Channel是如何告知DefaultEndpoint的呢。
其实根源还是PlainChannelInitializer中,PlainChannelInitializer对象是配置到Netty的bootstrap中的,所以当每次该bootstrap对象创建一个channel的时候都会调用PlainChannelInitializer的initchannel方法,从而把ConnectionBuilder中得handlers注册到channel中。这个handlers中有一个CommandHandler(虽然每次创建新的channel都会创建新的CommandHandler,但是所有的CommandHandler对象引用的DefaultEndpoint是同一个)。实现如下:
//CommandHandler.
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
...
//调用Endpoint的notifyChannelActive方法告知Channel变了
endpoint.notifyChannelActive(ctx.channel());
super.channelActive(ctx);
...
}
//DefaultEndpoint.notifyChannelActive
@Override
public void notifyChannelActive(Channel channel) {
this.logPrefix = null;
//把自己的channel指向为新的channel对象
this.channel = channel;
this.connectionError = null;
...
//让connectionWatchdog重新监听断开连接事件
if (connectionWatchdog != null) {
connectionWatchdog.arm();
}
//获取悲观锁
sharedLock.doExclusive(() -> {
try {
// Move queued commands to buffer before issuing any commands because of connection activation.
// That's necessary to prepend queued commands first as some commands might get into the queue
// after the connection was disconnected. They need to be prepended to the command buffer
...
//调用StatefulConnectionImpl的activated方法,这个里面也做很多总要的事情!!!!
connectionFacade.activated();
//把断开连接时缓存的Command重新通过Channel发送出去
flushCommands(disconnectedBuffer);
} catch (Exception e) {
...
}
});
}
可能还会有一个疑问,就是我们在第一次创建连接的时候,在连接成功之后有判断是否有密码,有密码就发送AUTH命令,有选择DB就发送Select命令等,在重连的时候却并没有看到这个操作,其实就是在上面的代码connectionFacade.activated()的实现中。
//StatefulRedisConnectionImpl.activated。
public void activated() {
super.activated();
// do not block in here, since the channel flow will be interrupted.
//如果密码不为空就设置密码
if (password != null) {
async.authAsync(password);
}
//如果db!=0就设置db
if (db != 0) {
async.selectAsync(db);
}
if (clientName != null) {
setClientName(clientName);
}
if (readOnly) {
async.readOnly();
}
}
从上面可以看到当Channel重新连接成功时StatefulRedisConnectionImpl的activated方法会被调用,在该方法中会检测密码不为空就调用auth命令,那么StatefulRedisConnectionImpl是如何知道密码的呢。原因是在preProcessCommand方法中:
//StatefulRedisConnectionImpl.preProcessCommand,该方法会在每次dispatchCommand的时候被调用,Lettuce在第一次建立连接的时候会调用AUTH和SELECT方法,在调用这些方法的时候StatefulRedisConnectionImpl就会记住password和db。从而在断线重连的时候会自动执行AUTH和SELECT方法。
protected <T> RedisCommand<K, V, T> preProcessCommand(RedisCommand<K, V, T> command) {
RedisCommand<K, V, T> local = command;
//如果该Command是AUTH,就等该Command返回成功之后记录下password
if (local.getType().name().equals(AUTH.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
char[] password = CommandArgsAccessor.getFirstCharArray(command.getArgs());
if (password != null) {
this.password = password;
} else {
String stringPassword = CommandArgsAccessor.getFirstString(command.getArgs());
if (stringPassword != null) {
this.password = stringPassword.toCharArray();
}
}
}
});
}
//如果该Command是SELECT,就等该Command返回成功之后记录下db
if (local.getType().name().equals(SELECT.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
Long db = CommandArgsAccessor.getFirstInteger(command.getArgs());
if (db != null) {
this.db = db.intValue();
}
}
});
}
//如果该Command是READONLY,就等该Command返回成功之后记录下readonly为true
if (local.getType().name().equals(READONLY.name())) {
local = attachOnComplete(local, status -> {
if ("OK".equals(status)) {
this.readOnly = true;
}
});
}
...
return local;
}
至此Lettuce的重连逻辑完成了,因为第一次创建连接的时候Bootstrap对象已经被配置好了,所以在断线重连的时候逻辑简单了很多,而且很多AUTH、SELECT等命令被放在了Channel的pipeline相对应类中去实现了。