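This article uses the Redis source code to summarize some of the principles behind Redis (corrections are very welcome). I revisit Redis Sentinel's high-availability solution to better compare it with, and understand, the architectures blockchains use for distributed storage. Understanding the strengths and weaknesses of both models makes it easier to apply each one to the scenarios it suits. Blockchain is hugely popular right now. Many people pursue blockchain for its own sake, and some do so purely for capital and wealth. In my view, the distributed storage solution, security, the incentive mechanism and the transaction system are all indispensable parts of a blockchain. Even so, many people still take them apart one by one. From a technical standpoint, however, examining blockchain's architectural solutions on their own is both necessary and instructive. For the distributed storage architecture of blockchains, see "From Blockchain to Distributed Storage" (《区块链至分布式存储》). For an architectural comparison of the two, see "Blockchain Distributed Storage vs. Redis Distributed Storage" (《区块链分布式存储与redis分布式存储对比》).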
What is the Redis Sentinel high-availability solution
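Redis Sentinel is a high-availability solution built on the master-slave replication architecture. It addresses fault tolerance and disaster recovery for distributed storage, so that Redis can cope with a variety of unexpected situations such as network connectivity problems, crashes and hardware failures. The foundation of fault tolerance is data backup, and backup in turn relies on two mechanisms: persistence and replication. The fault-tolerance problem is this: once we have multiple backups, how can the system make a reasonably correct decision on its own when these failures occur, while remaining transparent to clients? (Here "backup" means replication. In a sense, Redis's master-slave replication is a hot-standby process. It provides fault tolerance, and depending on the workload it can also be used for read/write splitting, which relieves some of the pressure of heavy traffic. When it is used for read/write splitting, however, you must consider how strict the business's data-consistency requirements are.) The Sentinel mechanism is Redis's answer to this problem. The code that switches client connections to the new master in cooperation with Sentinel lives mainly in the client libraries for each language, not in the server.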
The Sentinel service architecture
First, a few points about Sentinel:
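- A Sentinel acts as a supervisor and stores no data. A single Sentinel, or a group of Sentinels, supervises the master-slave architecture.
- Within the architecture, a Sentinel can take part in three kinds of interaction: Sentinel with the master, Sentinel with the slaves, and Sentinel with other Sentinels.
- The basic requirements for interaction are the same in all three cases. Building such an interaction network inevitably requires node registration and discovery, communication links between nodes, keepalive, and a protocol for communication between nodes.
- Because the roles differ, each plays a different part in the architecture, so what is exchanged in each interaction also differs.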
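With these points in mind, we will go step by step from building the Sentinel network to how the whole architecture guarantees high availability.
Building the Sentinel network
Initializing Sentinel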
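- Sentinel mode is started with either of the following commands:
```shell
redis-sentinel /path/to/sentinel.conf
redis-server /path/to/sentinel.conf --sentinel
```
Both forms are described in the official Redis Sentinel Documentation.
- When started in Sentinel mode, Sentinel mainly does the following: it initializes the server, loads the command table, loads and applies the configuration file, and starts the periodic function that monitors the master.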
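To make the two startup forms concrete, here is a small C sketch of how the mode can be detected from the command line. It is modeled on checkForSentinelMode() in server.c (called from main() in the excerpt that follows), but the name isSentinelInvocation is hypothetical and this is an illustration, not the verbatim Redis code.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper modeled on Redis's checkForSentinelMode():
 * returns 1 if the executable name contains "redis-sentinel"
 * or if any argument is exactly "--sentinel", otherwise 0. */
static int isSentinelInvocation(int argc, char **argv) {
    assert(argc >= 1 && argv[0] != NULL);
    if (strstr(argv[0], "redis-sentinel") != NULL) return 1;
    for (int j = 1; j < argc; j++)
        if (strcmp(argv[j], "--sentinel") == 0) return 1;
    return 0;
}
```

Either form ends up setting server.sentinel_mode, which is the flag main() branches on.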
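Below is the Sentinel-related part of main() in server.c. If you want to step through it from start to finish, see my other post, "Debugging the Redis source with gdb on Linux" (linux上用gdb调试redis源码), and the Redis debugging guide.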
```c
...
setlocale(LC_COLLATE,"");
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
char hashseed[16];
getRandomHexChars(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed((uint8_t*)hashseed);
server.sentinel_mode = checkForSentinelMode(argc,argv);
initServerConfig();
moduleInitModulesSystem();

/* Store the executable path and arguments in a safe place in order
 * to be able to restart the server later. */
server.executable = getAbsolutePath(argv[0]);
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

/* We need to init sentinel right now as parsing the configuration file
 * in sentinel mode will have the effect of populating the sentinel
 * data structures with master nodes to monitor. */
if (server.sentinel_mode) {
    initSentinelConfig();
    initSentinel();
}
...
```
In this code you can find the two Sentinel-mode initialization functions, initSentinelConfig() and initSentinel():
sentinel.c/initSentinelConfig
```c
/* This function overwrites a few normal Redis config default with Sentinel
 * specific defaults. */
void initSentinelConfig(void) {
    server.port = REDIS_SENTINEL_PORT;
}
```
sentinel.c/initSentinel
```c
/* Perform the Sentinel mode initialization. */
void initSentinel(void) {
    unsigned int j;

    /* Remove usual Redis commands from the command table, then just add
     * the SENTINEL command. */
    dictEmpty(server.commands,NULL);
    for (j = 0; j < sizeof(sentinelcmds)/sizeof(sentinelcmds[0]); j++) {
        int retval;
        struct redisCommand *cmd = sentinelcmds+j;

        retval = dictAdd(server.commands, sdsnew(cmd->name), cmd);
        serverAssert(retval == DICT_OK);
    }

    /* Initialize various data structures. */
    sentinel.current_epoch = 0;
    sentinel.masters = dictCreate(&instancesDictType,NULL);
    sentinel.tilt = 0;
    sentinel.tilt_start_time = 0;
    sentinel.previous_time = mstime();
    sentinel.running_scripts = 0;
    sentinel.scripts_queue = listCreate();
    sentinel.announce_ip = NULL;
    sentinel.announce_port = 0;
    sentinel.simfailure_flags = SENTINEL_SIMFAILURE_NONE;
    memset(sentinel.myid,0,sizeof(sentinel.myid));
}
```
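These two functions show how Sentinel mode is initialized: Sentinel first sets the default port to 26379 (REDIS_SENTINEL_PORT), then loads its command table and initializes the sentinelState.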
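The command table Sentinel loads differs from that of a normal Redis server. Sentinel mode supports only the following commands, which also means the Sentinel architecture itself uses only these commands.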
```c
struct redisCommand sentinelcmds[] = {
    {"ping",pingCommand,1,"",0,NULL,0,0,0,0,0},
    {"sentinel",sentinelCommand,-2,"",0,NULL,0,0,0,0,0},
    {"subscribe",subscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"unsubscribe",unsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"psubscribe",psubscribeCommand,-2,"",0,NULL,0,0,0,0,0},
    {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0},
    {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0},
    {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0},
    {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0},
    {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0},
    {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}
};
```
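The table above is everything a Sentinel's command dispatcher can find. The simplified stand-in below (not Redis's own redisCommand/lookupCommand code) keeps only the name and arity columns to illustrate the effect: a command outside the table, such as GET, is simply not found. Arity follows the Redis convention: a positive value means exactly that many arguments, counting the command name, and a negative value means at least that many. CmdEntry, lookupCmd and arityOk are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <strings.h>

/* Hypothetical trimmed-down command entry: name and arity only. */
typedef struct {
    const char *name;
    int arity;   /* > 0: exactly arity args; < 0: at least -arity args */
} CmdEntry;

/* Same names and arities as the sentinelcmds[] table above. */
static const CmdEntry sentinelTable[] = {
    {"ping", 1}, {"sentinel", -2}, {"subscribe", -2}, {"unsubscribe", -1},
    {"psubscribe", -2}, {"punsubscribe", -1}, {"publish", 3},
    {"info", -1}, {"role", 1}, {"client", -2}, {"shutdown", -1},
};

/* Case-insensitive lookup; returns NULL for commands Sentinel lacks. */
static const CmdEntry *lookupCmd(const char *name) {
    for (size_t j = 0; j < sizeof(sentinelTable)/sizeof(sentinelTable[0]); j++)
        if (strcasecmp(sentinelTable[j].name, name) == 0)
            return &sentinelTable[j];
    return NULL;
}

/* Returns 1 if argc (command name included) satisfies cmd's arity. */
static int arityOk(const CmdEntry *cmd, int argc) {
    assert(cmd != NULL);
    if (cmd->arity > 0) return argc == cmd->arity;
    return argc >= -cmd->arity;
}
```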
Basic data structures
sentinelState
```c
/* Main state. */
struct sentinelState {
    char myid[CONFIG_RUN_ID_SIZE+1]; /* This sentinel ID. */
    uint64_t current_epoch;         /* Current epoch. */
    dict *masters;      /* Dictionary of master sentinelRedisInstances.
                           Key is the instance name, value is the
                           sentinelRedisInstance structure pointer. */
    int tilt;           /* Are we in TILT mode? */
    int running_scripts;    /* Number of scripts in execution right now. */
    mstime_t tilt_start_time;       /* When TITL started. */
    mstime_t previous_time;         /* Last time we ran the time handler. */
    list *scripts_queue;            /* Queue of user scripts to execute. */
    char *announce_ip;  /* IP addr that is gossiped to other sentinels if
                           not NULL. */
    int announce_port;  /* Port that is gossiped to other sentinels if
                           non zero. */
    unsigned long simfailure_flags; /* Failures simulation. */
} sentinel;
```
sentinelRedisInstance
```c
typedef struct sentinelRedisInstance {
    int flags;      /* See SRI_... defines */
    char *name;     /* Master name from the point of view of this sentinel. */
    char *runid;    /* Run ID of this instance, or unique ID if is a Sentinel.*/
    uint64_t config_epoch;  /* Configuration epoch. */
    sentinelAddr *addr; /* Master host. */
    instanceLink *link; /* Link to the instance, may be shared for Sentinels. */
    mstime_t last_pub_time;   /* Last time we sent hello via Pub/Sub. */
    mstime_t last_hello_time; /* Only used if SRI_SENTINEL is set. Last time
                                 we received a hello from this Sentinel
                                 via Pub/Sub. */
    mstime_t last_master_down_reply_time; /* Time of last reply to
                                             SENTINEL is-master-down command. */
    mstime_t s_down_since_time; /* Subjectively down since time. */
    mstime_t o_down_since_time; /* Objectively down since time. */
    mstime_t down_after_period; /* Consider it down after that period. */
    mstime_t info_refresh;  /* Time at which we received INFO output from it. */

    /* Role and the first time we observed it.
     * This is useful in order to delay replacing what the instance reports
     * with our own configuration. We need to always wait some time in order
     * to give a chance to the leader to report the new configuration before
     * we do silly things. */
    int role_reported;
    mstime_t role_reported_time;
    mstime_t slave_conf_change_time; /* Last time slave master addr changed. */

    /* Master specific. */
    dict *sentinels;    /* Other sentinels monitoring the same master. */
    dict *slaves;       /* Slaves for this master instance. */
    unsigned int quorum;/* Number of sentinels that need to agree on failure. */
    int parallel_syncs; /* How many slaves to reconfigure at same time. */
    char *auth_pass;    /* Password to use for AUTH against master & slaves. */

    /* Slave specific. */
    mstime_t master_link_down_time; /* Slave replication link down time. */
    int slave_priority; /* Slave priority according to its INFO output. */
    mstime_t slave_reconf_sent_time; /* Time at which we sent SLAVE OF <new> */
    struct sentinelRedisInstance *master; /* Master instance if it's slave. */
    char *slave_master_host;    /* Master host as reported by INFO */
    int slave_master_port;      /* Master port as reported by INFO */
    int slave_master_link_status; /* Master link status as reported by INFO */
    unsigned long long slave_repl_offset; /* Slave replication offset. */

    /* Failover */
    char *leader;       /* If this is a master instance, this is the runid of
                           the Sentinel that should perform the failover. If
                           this is a Sentinel, this is the runid of the Sentinel
                           that this Sentinel voted as leader. */
    uint64_t leader_epoch; /* Epoch of the 'leader' field. */
    uint64_t failover_epoch; /* Epoch of the currently started failover. */
    int failover_state; /* See SENTINEL_FAILOVER_STATE_* defines. */
    mstime_t failover_state_change_time;
    mstime_t failover_start_time;   /* Last failover attempt start time. */
    mstime_t failover_timeout;      /* Max time to refresh failover state. */
    mstime_t failover_delay_logged; /* For what failover_start_time value we
                                       logged the failover delay. */
    struct sentinelRedisInstance *promoted_slave; /* Promoted slave instance. */

    /* Scripts executed to notify admin or reconfigure clients: when they
     * are set to NULL no script is executed. */
    char *notification_script;
    char *client_reconfig_script;
    sds info; /* cached INFO output */
} sentinelRedisInstance;
```
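These two structs are Sentinel's core data structures. The first stores Sentinel's own state; the second stores information about a master, a slave, or another Sentinel that monitors the same master. At startup, Sentinel builds these structures to record its relationships with the other three roles:
[Figure: relationship between a Sentinel and the master, slave and other Sentinel instances]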
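A heavily simplified C sketch of that relationship follows. The Instance and SentinelState types and the findMaster/attachSlave helpers are hypothetical, and fixed-size arrays stand in for the dicts Redis actually uses. The sketch only shows ownership: sentinel.masters holds the masters, each master holds its own slaves and sentinels tables, and each slave points back to its master through its master field.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_CHILDREN 8

/* Hypothetical stand-in for sentinelRedisInstance. */
typedef struct Instance {
    const char *name;                   /* master name, or "ip:port" for a slave */
    struct Instance *master;            /* NULL for a master */
    struct Instance *slaves[MAX_CHILDREN];
    int nslaves;
    struct Instance *sentinels[MAX_CHILDREN];
    int nsentinels;
} Instance;

/* Hypothetical stand-in for sentinelState: only the masters table. */
typedef struct {
    Instance *masters[MAX_CHILDREN];
    int nmasters;
} SentinelState;

/* Look a master up by the name used in "sentinel monitor <name> ...". */
static Instance *findMaster(SentinelState *st, const char *name) {
    for (int j = 0; j < st->nmasters; j++)
        if (strcmp(st->masters[j]->name, name) == 0) return st->masters[j];
    return NULL;
}

/* Link a slave and its master in both directions. */
static void attachSlave(Instance *master, Instance *slave) {
    assert(master->nslaves < MAX_CHILDREN);
    slave->master = master;
    master->slaves[master->nslaves++] = slave;
}
```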
sentinel&master
- Service discovery
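The sentinelState struct has a masters field, a dictionary whose values are pointers to sentinelRedisInstance structures. The ip and port of the monitored master are parsed from the line sentinel monitor mymaster 127.0.0.1 6379 2 in the configuration file sentinel.conf; see the comments in that file for a detailed description of each option. The call chain is server.c/main -> config.c/loadServerConfig -> config.c/loadServerConfigFromString -> sentinel.c/sentinelHandleConfiguration.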
The code in loadServerConfigFromString that handles Sentinel directives is:
```c
} else if (!strcasecmp(argv[0],"sentinel")) {
    /* argc == 1 is handled by main() as we need to enter the sentinel
     * mode ASAP. */
    if (argc != 1) {
        if (!server.sentinel_mode) {
            err = "sentinel directive while not in sentinel mode";
            goto loaderr;
        }
        err = sentinelHandleConfiguration(argv+1,argc-1);
        if (err) goto loaderr;
    }
}
```
The sentinelHandleConfiguration function in sentinel.c parses the directive for the monitored master as follows:
```c
char *sentinelHandleConfiguration(char **argv, int argc) {
    sentinelRedisInstance *ri;

    if (!strcasecmp(argv[0],"monitor") && argc == 5) {
        /* monitor <name> <host> <port> <quorum> */
        int quorum = atoi(argv[4]);

        if (quorum <= 0) return "Quorum must be 1 or greater.";
        if (createSentinelRedisInstance(argv[1],SRI_MASTER,argv[2],
                                        atoi(argv[3]),quorum,NULL) == NULL)
        {
            switch(errno) {
            case EBUSY: return "Duplicated master name.";
            case ENOENT: return "Can't resolve master instance hostname.";
            case EINVAL: return "Invalid port number";
            }
        }
    } ...
    return NULL;
}
```
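The parsing path can be sketched as follows. This is an illustrative reimplementation, not Redis code. splitArgs is a much simpler stand-in for sdssplitargs() (it does not handle quoting), MonitorConf and parseMonitorLine are hypothetical, and the 1 to 65535 port check is an assumption that approximates the EINVAL case. The quorum and port error strings are the ones sentinelHandleConfiguration() returns.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h>

#define MAX_ARGS 16

/* Hypothetical holder for one parsed "sentinel monitor" directive. */
typedef struct {
    char name[64];
    char host[64];
    int port;
    int quorum;
} MonitorConf;

/* Split line in place on blanks and return argc (no quoting support). */
static int splitArgs(char *line, char **argv, int max) {
    int argc = 0;
    for (char *tok = strtok(line, " \t\r\n"); tok != NULL && argc < max;
         tok = strtok(NULL, " \t\r\n"))
        argv[argc++] = tok;
    return argc;
}

/* Parse "sentinel monitor <name> <host> <port> <quorum>" (modifies line).
 * Returns NULL on success, otherwise an error string. */
static const char *parseMonitorLine(char *line, MonitorConf *out) {
    char *argv[MAX_ARGS];
    int argc = splitArgs(line, argv, MAX_ARGS);

    assert(out != NULL);
    if (argc < 2 || strcasecmp(argv[0], "sentinel") != 0)
        return "Not a sentinel directive";

    /* Like loadServerConfigFromString(), continue with argv+1 / argc-1. */
    char **a = argv + 1;
    int n = argc - 1;
    if (strcasecmp(a[0], "monitor") != 0 || n != 5)
        return "Unsupported directive";

    int quorum = atoi(a[4]);
    if (quorum <= 0) return "Quorum must be 1 or greater.";
    int port = atoi(a[3]);
    if (port <= 0 || port > 65535) return "Invalid port number";

    snprintf(out->name, sizeof out->name, "%s", a[1]);
    snprintf(out->host, sizeof out->host, "%s", a[2]);
    out->port = port;
    out->quorum = quorum;
    return NULL;
}
```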
From the parsed monitor directive, Sentinel creates a sentinelRedisInstance structure that holds the master's information. Next, look at createSentinelRedisInstance:
```c
sentinelRedisInstance *createSentinelRedisInstance(char *name, int flags, char *hostname, int port, int quorum, sentinelRedisInstance *master) {
    sentinelRedisInstance *ri;
    sentinelAddr *addr;
    dict *table = NULL;
    char slavename[NET_PEER_ID_LEN], *sdsname;

    serverAssert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    serverAssert((flags & SRI_MASTER) || master != NULL);

    /* Check address validity. */
    addr = createSentinelAddr(hostname,port);
    if (addr == NULL) return NULL;

    /* For slaves use ip:port as name. */
    if (flags & SRI_SLAVE) {
        anetFormatAddr(slavename, sizeof(slavename), hostname, port);
        name = slavename;
    }

    /* Make sure the entry is not duplicated. This may happen when the same
     * name for a master is used multiple times inside the configuration or
     * if we try to add multiple times a slave or sentinel with same ip/port
     * to a master. */
    if (flags & SRI_MASTER) table = sentinel.masters;
    else if (flags & SRI_SLAVE) table = master->slaves;
    else if (flags & SRI_SENTINEL) table = master->sentinels;
    sdsname = sdsnew(name);
    if (dictFind(table,sdsname)) {
        releaseSentinelAddr(addr);
        sdsfree(sdsname);
        errno = EBUSY;
        return NULL;
    }

    /* Create the instance object. */
    ri = zmalloc(sizeof(*ri));
    /* Note that all the instances are started in the disconnected state,
     * the event loop will take care of connecting them. */
    ri->flags = flags;
    ri->name = sdsname;
    ri->runid = NULL;
    ri->config_epoch = 0;
    ri->addr = addr;
    ri->link = createInstanceLink();
    ri->last_pub_time = mstime();
    ri->last_hello_time = mstime();
    ri->last_master_down_reply_time = mstime();
    ri->s_down_since_time = 0;
    ri->o_down_since_time = 0;
    ri->down_after_period = master ? master->down_after_period :
                            SENTINEL_DEFAULT_DOWN_AFTER;
    ri->master_link_down_time = 0;
    ri->auth_pass = NULL;
    ri->slave_priority = SENTINEL_DEFAULT_SLAVE_PRIORITY;
    ri->slave_reconf_sent_time = 0;
    ri->slave_master_host = NULL;
    ri->slave_master_port = 0;
    ri->slave_master_link_status = SENTINEL_MASTER_LINK_STATUS_DOWN;
    ri->slave_repl_offset = 0;
    ri->sentinels = dictCreate(&instancesDictType,NULL);
    ri->quorum = quorum;
    ri->parallel_syncs = SENTINEL_DEFAULT_PARALLEL_SYNCS;
    ri->master = master;
    ri->slaves = dictCreate(&instancesDictType,NULL);
    ri->info_refresh = 0;

    /* Failover state. */
    ri->leader = NULL;
    ri->leader_epoch = 0;
    ri->failover_epoch = 0;
    ri->failover_state = SENTINEL_FAILOVER_STATE_NONE;
    ri->failover_state_change_time = 0;
    ri->failover_start_time = 0;
    ri->failover_timeout = SENTINEL_DEFAULT_FAILOVER_TIMEOUT;
    ri->failover_delay_logged = 0;
    ri->promoted_slave = NULL;
    ri->notification_script = NULL;
    ri->client_reconfig_script = NULL;
    ri->info = NULL;

    /* Role */
    ri->role_reported = ri->flags & (SRI_MASTER|SRI_SLAVE);
    ri->role_reported_time = mstime();
    ri->slave_conf_change_time = mstime();

    /* Add into the right table. */
    dictAdd(table, ri->name, ri);
    return ri;
}
```
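This is generic code that creates master, slave and Sentinel instances, distinguished by flags. For a master it does the following:
- Validates the given ip and port and creates a sentinelAddr.
- Looks up the dict for a master with the same name; if one exists, it sets errno to EBUSY and returns NULL.
- Creates the instance object and initializes default values such as the instanceLink, the sentinels dict, the slaves dict, and so on.
- Adds the new instance to the dictionary.
This completes the construction of the dictionary of monitored masters.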
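Those steps can be sketched in self-contained form. Everything here (NameTable, Inst, createInstance, the fixed table capacity and its ENOSPC error) is hypothetical. Only the control flow follows createSentinelRedisInstance(): validate the port, name slaves "ip:port", pick the target table by flags, and refuse duplicates with errno = EBUSY. Redis formats slave names with anetFormatAddr(), which also brackets IPv6 addresses; the sketch uses a plain snprintf.

```c
#include <assert.h>
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define SRI_MASTER   (1<<0)
#define SRI_SLAVE    (1<<1)
#define SRI_SENTINEL (1<<2)
#define TABLE_CAP    16

/* Hypothetical stand-in for a Redis dict keyed by instance name. */
typedef struct {
    const char *names[TABLE_CAP];
    int used;
} NameTable;

typedef struct Inst {
    int flags;
    char name[64];
    struct Inst *master;   /* NULL for masters */
    NameTable slaves;      /* used only by masters */
    NameTable sentinels;   /* used only by masters */
} Inst;

static NameTable masters;  /* plays the role of sentinel.masters */

static int tableContains(const NameTable *t, const char *name) {
    for (int j = 0; j < t->used; j++)
        if (strcmp(t->names[j], name) == 0) return 1;
    return 0;
}

static Inst *createInstance(const char *name, int flags, const char *host,
                            int port, Inst *master) {
    char slavename[64];
    NameTable *table;
    Inst *ri;

    assert(flags & (SRI_MASTER|SRI_SLAVE|SRI_SENTINEL));
    assert((flags & SRI_MASTER) || master != NULL);
    if (port < 0 || port > 65535) { errno = EINVAL; return NULL; }

    if (flags & SRI_SLAVE) {           /* slaves are named ip:port */
        snprintf(slavename, sizeof slavename, "%s:%d", host, port);
        name = slavename;
    }

    if (flags & SRI_MASTER) table = &masters;
    else if (flags & SRI_SLAVE) table = &master->slaves;
    else table = &master->sentinels;

    if (tableContains(table, name)) { errno = EBUSY; return NULL; }
    if (table->used == TABLE_CAP) { errno = ENOSPC; return NULL; }

    if ((ri = calloc(1, sizeof *ri)) == NULL) return NULL;
    ri->flags = flags;
    snprintf(ri->name, sizeof ri->name, "%s", name);
    ri->master = master;
    table->names[table->used++] = ri->name;  /* like dictAdd(table, ri->name, ri) */
    return ri;
}
```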
- Establishing the connection to the master
Redis is single-threaded and built on event callbacks. So back in main() in server.c: after the Sentinel configuration file has been loaded, the event loop is started:
```c
...
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
aeMain(server.el);
aeDeleteEventLoop(server.el);
...
```
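Redis's event loop handles two kinds of events: time events and file events. For Sentinel, connection heartbeats, instance state checks, discovery of other Sentinels and so on are all periodic, so creating connections, sending commands to fetch server state, and broadcasting messages must all be driven by time events. Stepping through aeMain in a debugger shows that when a time event fires, the serverCron callback is entered. Looking more closely, serverCron is registered as the time-event callback in initServer. Following that thread leads to sentinelTimer, the Sentinel entry point defined in sentinel.c and called from serverCron.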
The time event registered in initServer:
```c
/* Create the timer callback, this is our way to process many background
 * operations incrementally, like clients timeout, eviction of unaccessed
 * expired keys and so forth. */
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
    serverPanic("Can't create event loop timers.");
    exit(1);
}
```
The Sentinel-related code in serverCron:
```c
...
/* Run the Sentinel timer if we are in sentinel mode. */
run_with_period(100) {
    if (server.sentinel_mode) sentinelTimer();
}
...
```
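run_with_period(100) limits how often a block inside serverCron runs. serverCron itself fires server.hz times per second (the INFO output later in this article shows hz:10, i.e. every 100 ms), so with the default hz, sentinelTimer runs on every cron call. The helper below rewrites the condition from the run_with_period macro as a plain function. periodDue is a hypothetical name, and its parameters replace the server.hz and server.cronloops globals.

```c
#include <assert.h>

/* Returns 1 when a task with a period of ms milliseconds should run on
 * this serverCron call. hz is how many times per second serverCron runs,
 * cronloops counts how many times it has run so far. */
static int periodDue(int ms, int hz, long long cronloops) {
    assert(hz > 0);
    int tick = 1000 / hz;            /* milliseconds between cron calls */
    if (ms <= tick) return 1;        /* period not longer than a tick: always */
    return cronloops % (ms / tick) == 0;
}
```

With hz = 10, a 1000 ms task runs on every tenth cron call, while the 100 ms Sentinel timer runs on every call.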
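Digging into sentinelTimer quickly reveals the code that creates connections. In sentinel.c the call chain is sentinelTimer -> sentinelHandleDictOfRedisInstances -> sentinelHandleRedisInstance -> sentinelReconnectInstance. This path is generic: it creates the connections for all three roles in the Sentinel architecture.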
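The recursive part of that chain can be sketched as follows. Node, HandleFn, handleInstances and countVisit are hypothetical simplifications that use arrays instead of dicts. The visiting order mirrors sentinelHandleDictOfRedisInstances(): handle each master, then recurse into its slaves and its sentinels, so one timer tick reaches every instance the Sentinel knows about. The real function also does failover bookkeeping, which is omitted here.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_CHILDREN 4

/* Hypothetical, simplified instance node. */
typedef struct Node {
    const char *name;
    int isMaster;
    struct Node *slaves[MAX_CHILDREN];
    int nslaves;
    struct Node *sentinels[MAX_CHILDREN];
    int nsentinels;
} Node;

/* Per-instance work; in Redis this is sentinelHandleRedisInstance(). */
typedef void (*HandleFn)(Node *ri, void *ctx);

static void handleInstances(Node **list, int n, HandleFn fn, void *ctx) {
    for (int j = 0; j < n; j++) {
        Node *ri = list[j];
        fn(ri, ctx);
        if (ri->isMaster) {
            handleInstances(ri->slaves, ri->nslaves, fn, ctx);
            handleInstances(ri->sentinels, ri->nsentinels, fn, ctx);
        }
    }
}

/* Example callback: count how many instances were visited. */
static void countVisit(Node *ri, void *ctx) {
    (void)ri;
    (*(int *)ctx)++;
}
```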
The sentinelReconnectInstance code:
```c
/* Create the async connections for the instance link if the link
 * is disconnected. Note that link->disconnected is true even if just
 * one of the two links (commands and pub/sub) is missing. */
void sentinelReconnectInstance(sentinelRedisInstance *ri) {
    if (ri->link->disconnected == 0) return;
    if (ri->addr->port == 0) return; /* port == 0 means invalid address. */
    instanceLink *link = ri->link;
    mstime_t now = mstime();

    if (now - ri->link->last_reconn_time < SENTINEL_PING_PERIOD) return;
    ri->link->last_reconn_time = now;

    /* Commands connection. */
    if (link->cc == NULL) {
        link->cc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->cc->err) {
            sentinelEvent(LL_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
                link->cc->errstr);
            instanceLinkCloseConnection(link,link->cc);
        } else {
            link->pending_commands = 0;
            link->cc_conn_time = mstime();
            link->cc->data = link;
            redisAeAttach(server.el,link->cc);
            redisAsyncSetConnectCallback(link->cc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->cc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->cc);
            sentinelSetClientName(ri,link->cc,"cmd");

            /* Send a PING ASAP when reconnecting. */
            sentinelSendPing(ri);
        }
    }
    /* Pub / Sub */
    if ((ri->flags & (SRI_MASTER|SRI_SLAVE)) && link->pc == NULL) {
        link->pc = redisAsyncConnectBind(ri->addr->ip,ri->addr->port,NET_FIRST_BIND_ADDR);
        if (link->pc->err) {
            sentinelEvent(LL_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
                link->pc->errstr);
            instanceLinkCloseConnection(link,link->pc);
        } else {
            int retval;

            link->pc_conn_time = mstime();
            link->pc->data = link;
            redisAeAttach(server.el,link->pc);
            redisAsyncSetConnectCallback(link->pc,
                    sentinelLinkEstablishedCallback);
            redisAsyncSetDisconnectCallback(link->pc,
                    sentinelDisconnectCallback);
            sentinelSendAuthIfNeeded(ri,link->pc);
            sentinelSetClientName(ri,link->pc,"pubsub");
            /* Now we subscribe to the Sentinels "Hello" channel. */
            retval = redisAsyncCommand(link->pc,
                sentinelReceiveHelloMessages, ri, "SUBSCRIBE %s",
                    SENTINEL_HELLO_CHANNEL);
            if (retval != C_OK) {
                /* If we can't subscribe, the Pub/Sub connection is useless
                 * and we can simply disconnect it and try again. */
                instanceLinkCloseConnection(link,link->pc);
                return;
            }
        }
    }
    /* Clear the disconnected status only if we have both the connections
     * (or just the commands connection if this is a sentinel instance). */
    if (link->cc && (ri->flags & SRI_SENTINEL || link->pc))
        link->disconnected = 0;
}
```
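For a master, Sentinel creates two connections: one for sending commands and one for pub/sub.
- After the command connection is created, Sentinel registers the connection callback sentinelLinkEstablishedCallback and the disconnection callback sentinelDisconnectCallback. It then calls sentinelSendAuthIfNeeded, which sends AUTH with the configured password if the master requires authentication, sets the client name for the cmd connection via sentinelSetClientName, and finally sends a PING to the master to test the command connection.
- The pub/sub connection is set up the same way, but at the end Sentinel sends SUBSCRIBE __sentinel__:hello to subscribe to that channel and registers sentinelReceiveHelloMessages to handle the messages broadcast on it. This channel is mainly used to discover the other Sentinels monitoring the same master.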
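Q: Why create two connections instead of sharing one?
A: One explanation is that it prevents broadcast messages from being lost when the command connection drops, but personally I find that reasoning a bit weak. A more concrete constraint is that, under the RESP2 protocol, a connection that has issued SUBSCRIBE enters pub/sub mode and can then essentially only issue subscription-related commands, PING and QUIT, so INFO and the other periodic commands cannot share it.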
sentinel&slave
- Discovering the slaves
Look at the code of sentinelHandleRedisInstance:
```c
/* Perform scheduled operations for the specified Redis instance. */
void sentinelHandleRedisInstance(sentinelRedisInstance *ri) {
    /* ========== MONITORING HALF ============ */
    /* Every kind of instance */
    sentinelReconnectInstance(ri);
    sentinelSendPeriodicCommands(ri);

    /* ============== ACTING HALF ============= */
    /* We don't proceed with the acting half if we are in TILT mode.
     * TILT happens when we find something odd with the time, like a
     * sudden change in the clock. */
    if (sentinel.tilt) {
        if (mstime()-sentinel.tilt_start_time < SENTINEL_TILT_PERIOD) return;
        sentinel.tilt = 0;
        sentinelEvent(LL_WARNING,"-tilt",NULL,"#tilt mode exited");
    }

    /* Every kind of instance */
    sentinelCheckSubjectivelyDown(ri);

    /* Masters and slaves */
    if (ri->flags & (SRI_MASTER|SRI_SLAVE)) {
        /* Nothing so far. */
    }

    /* Only masters */
    if (ri->flags & SRI_MASTER) {
        sentinelCheckObjectivelyDown(ri);
        if (sentinelStartFailoverIfNeeded(ri))
            sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_ASK_FORCED);
        sentinelFailoverStateMachine(ri);
        sentinelAskMasterStateToOtherSentinels(ri,SENTINEL_NO_FLAGS);
    }
}
```
Right after the connection to the master is established, the periodic function sentinelSendPeriodicCommands runs:
```c
/* Send periodic PING, INFO, and PUBLISH to the Hello channel to
 * the specified master or slave instance. */
void sentinelSendPeriodicCommands(sentinelRedisInstance *ri) {
    mstime_t now = mstime();
    mstime_t info_period, ping_period;
    int retval;

    /* Return ASAP if we have already a PING or INFO already pending, or
     * in the case the instance is not properly connected. */
    if (ri->link->disconnected) return;

    /* For INFO, PING, PUBLISH that are not critical commands to send we
     * also have a limit of SENTINEL_MAX_PENDING_COMMANDS. We don't
     * want to use a lot of memory just because a link is not working
     * properly (note that anyway there is a redundant protection about this,
     * that is, the link will be disconnected and reconnected if a long
     * timeout condition is detected. */
    if (ri->link->pending_commands >=
        SENTINEL_MAX_PENDING_COMMANDS * ri->link->refcount) return;

    /* If this is a slave of a master in O_DOWN condition we start sending
     * it INFO every second, instead of the usual SENTINEL_INFO_PERIOD
     * period. In this state we want to closely monitor slaves in case they
     * are turned into masters by another Sentinel, or by the sysadmin.
     *
     * Similarly we monitor the INFO output more often if the slave reports
     * to be disconnected from the master, so that we can have a fresh
     * disconnection time figure. */
    if ((ri->flags & SRI_SLAVE) &&
        ((ri->master->flags & (SRI_O_DOWN|SRI_FAILOVER_IN_PROGRESS)) ||
         (ri->master_link_down_time != 0)))
    {
        info_period = 1000;
    } else {
        info_period = SENTINEL_INFO_PERIOD;
    }

    /* We ping instances every time the last received pong is older than
     * the configured 'down-after-milliseconds' time, but every second
     * anyway if 'down-after-milliseconds' is greater than 1 second. */
    ping_period = ri->down_after_period;
    if (ping_period > SENTINEL_PING_PERIOD) ping_period = SENTINEL_PING_PERIOD;

    /* Send INFO to masters and slaves, not sentinels. */
    if ((ri->flags & SRI_SENTINEL) == 0 &&
        (ri->info_refresh == 0 ||
        (now - ri->info_refresh) > info_period))
    {
        retval = redisAsyncCommand(ri->link->cc,
            sentinelInfoReplyCallback, ri, "INFO");
        if (retval == C_OK) ri->link->pending_commands++;
    }

    /* Send PING to all the three kinds of instances. */
    if ((now - ri->link->last_pong_time) > ping_period &&
               (now - ri->link->last_ping_time) > ping_period/2) {
        sentinelSendPing(ri);
    }

    /* PUBLISH hello messages to all the three kinds of instances. */
    if ((now - ri->last_pub_time) > SENTINEL_PUBLISH_PERIOD) {
        sentinelSendHello(ri);
    }
}
```
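The scheduling decisions in sentinelSendPeriodicCommands() reduce to a few comparisons. The helper functions below are hypothetical. The constants use the values Redis defines for SENTINEL_INFO_PERIOD (10000 ms), SENTINEL_PING_PERIOD (1000 ms) and SENTINEL_PUBLISH_PERIOD (2000 ms), which correspond to the 10 s, 1 s and 2 s intervals discussed next.

```c
#include <assert.h>

#define SENTINEL_INFO_PERIOD    10000
#define SENTINEL_PING_PERIOD    1000
#define SENTINEL_PUBLISH_PERIOD 2000

/* INFO interval: 1 s for a slave whose master is O_DOWN / failing over,
 * or whose own replication link is down; otherwise 10 s. */
static long long infoPeriod(int isSlave, int masterDownOrFailover,
                            long long masterLinkDownTime) {
    if (isSlave && (masterDownOrFailover || masterLinkDownTime != 0))
        return 1000;
    return SENTINEL_INFO_PERIOD;
}

/* PING interval: down-after-milliseconds, capped at 1 s. */
static long long pingPeriod(long long downAfterPeriod) {
    return downAfterPeriod > SENTINEL_PING_PERIOD ? SENTINEL_PING_PERIOD
                                                  : downAfterPeriod;
}

/* A PING is due when the last PONG is older than the period and no PING
 * was sent during the last half period. */
static int pingDue(long long now, long long lastPong, long long lastPing,
                   long long period) {
    return (now - lastPong) > period && (now - lastPing) > period / 2;
}

/* A hello message is due every SENTINEL_PUBLISH_PERIOD ms. */
static int helloDue(long long now, long long lastPub) {
    return (now - lastPub) > SENTINEL_PUBLISH_PERIOD;
}
```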
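In this periodic code, under normal conditions Sentinel sends INFO every 10 s (every 1 s to a slave whose master is in O_DOWN or failover, or whose replication link is down). It sends PING every 1 s by default, or every down-after-milliseconds when that configurable value is below 1 s, and it broadcasts a hello message every 2 s. First, look at the output of INFO: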
127.0.0.1:6379> info
# Server
redis_version:4.0.10
redis_git_sha1:00000000
redis_git_dirty:0
redis_build_id:564e829c2a2c36f6
redis_mode:standalone
os:Linux 4.4.0-17134-Microsoft x86_64
arch_bits:64
multiplexing_api:epoll
atomicvar_api:atomic-builtin
gcc_version:5.4.0
process_id:27
run_id:f137124d98e21709eaa1def3b192c152a2500750
tcp_port:6379
uptime_in_seconds:339
uptime_in_days:0
hz:10
lru_clock:4977260
executable:/home/jane-zhang/redis-server
config_file:/svr/redis_config/redis_6379.conf
# Clients
connected_clients:5
client_longest_output_list:0
client_biggest_input_buf:0
blocked_clients:0
# Memory
used_memory:2022832
used_memory_human:1.93M
used_memory_rss:2912256
used_memory_rss_human:2.78M
used_memory_peak:2082832
used_memory_peak_human:1.99M
used_memory_peak_perc:97.12%
used_memory_overhead:1985938
used_memory_startup:786584
used_memory_dataset:36894
used_memory_dataset_perc:2.98%
total_system_memory:17048510464
total_system_memory_human:15.88G
used_memory_lua:37888
used_memory_lua_human:37.00K
maxmemory:0
maxmemory_human:0B
maxmemory_policy:noeviction
mem_fragmentation_ratio:1.44
mem_allocator:jemalloc-4.0.3
active_defrag_running:0
lazyfree_pending_objects:0
# Persistence
loading:0
rdb_changes_since_last_save:0
rdb_bgsave_in_progress:0
rdb_last_save_time:1531703643
rdb_last_bgsave_status:ok
rdb_last_bgsave_time_sec:0
rdb_current_bgsave_time_sec:-1
rdb_last_cow_size:0
aof_enabled:0
aof_rewrite_in_progress:0
aof_rewrite_scheduled:0
aof_last_rewrite_time_sec:-1
aof_current_rewrite_time_sec:-1
aof_last_bgrewrite_status:ok
aof_last_write_status:ok
aof_last_cow_size:0
# Stats
total_connections_received:9
total_commands_processed:1445
instantaneous_ops_per_sec:5
total_net_input_bytes:66681
total_net_output_bytes:328934
instantaneous_input_kbps:0.28
instantaneous_output_kbps:0.66
rejected_connections:0
sync_full:2
sync_partial_ok:0
sync_partial_err:2
expired_keys:0
expired_stale_perc:0.00
expired_time_cap_reached_count:0
evicted_keys:0
keyspace_hits:0
keyspace_misses:0
pubsub_channels:1
pubsub_patterns:0
latest_fork_usec:1527
migrate_cached_sockets:0
slave_expires_tracked_keys:0
active_defrag_hits:0
active_defrag_misses:0
active_defrag_key_hits:0
active_defrag_key_misses:0
# Replication
role:master
connected_slaves:2
slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1
slave1:ip=127.0.0.1,port=6380,state=online,offset=36334,lag=0
master_replid:5d4684d94bde70a56746ea1c4c30cccd00df7f56
master_replid2:0000000000000000000000000000000000000000
master_repl_offset:36334
second_repl_offset:-1
repl_backlog_active:1
repl_backlog_size:1048576
repl_backlog_first_byte_offset:1
repl_backlog_histlen:36334
# CPU
used_cpu_sys:0.33
used_cpu_user:0.16
used_cpu_sys_children:0.03
used_cpu_user_children:0.00
# Cluster
cluster_enabled:0
# Keyspace
The # Replication section of the INFO reply from the master contains information about its slaves. The INFO reply is handled through the callback chain sentinel.c/sentinelInfoReplyCallback -> sentinel.c/sentinelRefreshInstanceInfo:
/* Process the INFO output from masters. */
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
sds *lines;
int numlines, j;
int role = 0;
/* cache full INFO output for instance */
sdsfree(ri->info);
ri->info = sdsnew(info);
/* The following fields must be reset to a given value in the case they
* are not found at all in the INFO output. */
ri->master_link_down_time = 0;
/* Process line by line. */
lines = sdssplitlen(info,strlen(info),"\r\n",2,&numlines);
for (j = 0; j < numlines; j++) {
sentinelRedisInstance *slave;
sds l = lines[j];
/* run_id:<40 hex chars>*/
if (sdslen(l) >= 47 && !memcmp(l,"run_id:",7)) {
if (ri->runid == NULL) {
ri->runid = sdsnewlen(l+7,40);
} else {
if (strncmp(ri->runid,l+7,40) != 0) {
sentinelEvent(LL_NOTICE,"+reboot",ri,"%@");
sdsfree(ri->runid);
ri->runid = sdsnewlen(l+7,40);
}
}
}
/* old versions: slave0:<ip>,<port>,<state>
* new versions: slave0:ip=127.0.0.1,port=9999,... */
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 &&
!memcmp(l,"slave",5) && isdigit(l[5]))
{
char *ip, *port, *end;
if (strstr(l,"ip=") == NULL) {
/* Old format. */
ip = strchr(l,':'); if (!ip) continue;
ip++; /* Now ip points to start of ip address. */
port = strchr(ip,','); if (!port) continue;
*port = '\0'; /* nul term for easy access. */
port++; /* Now port points to start of port number. */
end = strchr(port,','); if (!end) continue;
*end = '\0'; /* nul term for easy access. */
} else {
/* New format. */
ip = strstr(l,"ip="); if (!ip) continue;
ip += 3; /* Now ip points to start of ip address. */
port = strstr(l,"port="); if (!port) continue;
port += 5; /* Now port points to start of port number. */
/* Nul term both fields for easy access. */
end = strchr(ip,','); if (end) *end = '\0';
end = strchr(port,','); if (end) *end = '\0';
}
/* Check if we already have this slave into our table,
* otherwise add it. */
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}
/* master_link_down_since_seconds:<seconds> */
if (sdslen(l) >= 32 &&
!memcmp(l,"master_link_down_since_seconds",30))
{
ri->master_link_down_time = strtoll(l+31,NULL,10)*1000;
}
/* role:<role> */
if (!memcmp(l,"role:master",11)) role = SRI_MASTER;
else if (!memcmp(l,"role:slave",10)) role = SRI_SLAVE;
if (role == SRI_SLAVE) {
/* master_host:<host> */
if (sdslen(l) >= 12 && !memcmp(l,"master_host:",12)) {
if (ri->slave_master_host == NULL ||
strcasecmp(l+12,ri->slave_master_host))
{
sdsfree(ri->slave_master_host);
ri->slave_master_host = sdsnew(l+12);
ri->slave_conf_change_time = mstime();
}
}
/* master_port:<port> */
if (sdslen(l) >= 12 && !memcmp(l,"master_port:",12)) {
int slave_master_port = atoi(l+12);
if (ri->slave_master_port != slave_master_port) {
ri->slave_master_port = slave_master_port;
ri->slave_conf_change_time = mstime();
}
}
/* master_link_status:<status> */
if (sdslen(l) >= 19 && !memcmp(l,"master_link_status:",19)) {
ri->slave_master_link_status =
(strcasecmp(l+19,"up") == 0) ?
SENTINEL_MASTER_LINK_STATUS_UP :
SENTINEL_MASTER_LINK_STATUS_DOWN;
}
/* slave_priority:<priority> */
if (sdslen(l) >= 15 && !memcmp(l,"slave_priority:",15))
ri->slave_priority = atoi(l+15);
/* slave_repl_offset:<offset> */
if (sdslen(l) >= 18 && !memcmp(l,"slave_repl_offset:",18))
ri->slave_repl_offset = strtoull(l+18,NULL,10);
}
}
ri->info_refresh = mstime();
sdsfreesplitres(lines,numlines);
/* ---------------------------- Acting half -----------------------------
* Some things will not happen if sentinel.tilt is true, but some will
* still be processed. */
/* Remember when the role changed. */
if (role != ri->role_reported) {
ri->role_reported_time = mstime();
ri->role_reported = role;
if (role == SRI_SLAVE) ri->slave_conf_change_time = mstime();
/* Log the event with +role-change if the new role is coherent or
* with -role-change if there is a mismatch with the current config. */
sentinelEvent(LL_VERBOSE,
((ri->flags & (SRI_MASTER|SRI_SLAVE)) == role) ?
"+role-change" : "-role-change",
ri, "%@ new reported role is %s",
role == SRI_MASTER ? "master" : "slave",
ri->flags & SRI_MASTER ? "master" : "slave");
}
/* None of the following conditions are processed when in tilt mode, so
* return asap. */
if (sentinel.tilt) return;
/* Handle master -> slave role switch. */
if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
/* Nothing to do, but masters claiming to be slaves are
* considered to be unreachable by Sentinel, so eventually
* a failover will be triggered. */
}
/* Handle slave -> master role switch. */
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
/* If this is a promoted slave we can change state to the
* failover state machine. */
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
/* Now that we are sure the slave was reconfigured as a master
* set the master configuration epoch to the epoch we won the
* election to perform this failover. This will force the other
* Sentinels to update their config (assuming there is not
* a newer one already available). */
ri->master->config_epoch = ri->master->failover_epoch;
ri->master->failover_state = SENTINEL_FAILOVER_STATE_RECONF_SLAVES;
ri->master->failover_state_change_time = mstime();
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+promoted-slave",ri,"%@");
if (sentinel.simfailure_flags &
SENTINEL_SIMFAILURE_CRASH_AFTER_PROMOTION)
sentinelSimFailureCrash();
sentinelEvent(LL_WARNING,"+failover-state-reconf-slaves",
ri->master,"%@");
sentinelCallClientReconfScript(ri->master,SENTINEL_LEADER,
"start",ri->master->addr,ri->addr);
sentinelForceHelloUpdateForMaster(ri->master);
} else {
/* A slave turned into a master. We want to force our view and
* reconfigure as slave. Wait some time after the change before
* going forward, to receive new configs if any. */
mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
if (!(ri->flags & SRI_PROMOTED) &&
sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->role_reported_time > wait_time)
{
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
}
}
}
/* Handle slaves replicating to a different master address. */
if ((ri->flags & SRI_SLAVE) &&
role == SRI_SLAVE &&
(ri->slave_master_port != ri->master->addr->port ||
strcasecmp(ri->slave_master_host,ri->master->addr->ip)))
{
mstime_t wait_time = ri->master->failover_timeout;
/* Make sure the master is sane before reconfiguring this instance
* into a slave. */
if (sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->slave_conf_change_time > wait_time)
{
int retval = sentinelSendSlaveOf(ri,
ri->master->addr->ip,
ri->master->addr->port);
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+fix-slave-config",ri,"%@");
}
}
/* Detect if the slave that is in the process of being reconfigured
* changed state. */
if ((ri->flags & SRI_SLAVE) && role == SRI_SLAVE &&
(ri->flags & (SRI_RECONF_SENT|SRI_RECONF_INPROG)))
{
/* SRI_RECONF_SENT -> SRI_RECONF_INPROG. */
if ((ri->flags & SRI_RECONF_SENT) &&
ri->slave_master_host &&
strcmp(ri->slave_master_host,
ri->master->promoted_slave->addr->ip) == 0 &&
ri->slave_master_port == ri->master->promoted_slave->addr->port)
{
ri->flags &= ~SRI_RECONF_SENT;
ri->flags |= SRI_RECONF_INPROG;
sentinelEvent(LL_NOTICE,"+slave-reconf-inprog",ri,"%@");
}
/* SRI_RECONF_INPROG -> SRI_RECONF_DONE */
if ((ri->flags & SRI_RECONF_INPROG) &&
c == SENTINEL_MASTER_LINK_STATUS_UP)
{
ri->flags &= ~SRI_RECONF_INPROG;
ri->flags |= SRI_RECONF_DONE;
sentinelEvent(LL_NOTICE,"+slave-reconf-done",ri,"%@");
}
}
}
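The last block of the code above moves a slave that is being reconfigured during a failover through two states. SRI_RECONF_SENT becomes SRI_RECONF_INPROG once INFO shows the slave pointing at the promoted slave. SRI_RECONF_INPROG becomes SRI_RECONF_DONE once the slave's replication link is up.

The following is a minimal standalone sketch of just those two transitions. The flag constants, the slave_info struct and advance_reconf() are hypothetical names made up for illustration; they are not Redis APIs.

```c
#include <stdbool.h>
#include <string.h>

/* Hypothetical flag bits for illustration; Redis defines its own
 * SRI_RECONF_* bits in sentinel.c. */
#define RECONF_SENT    (1 << 0)
#define RECONF_INPROG  (1 << 1)
#define RECONF_DONE    (1 << 2)

/* Simplified view of what INFO reports about one slave (hypothetical). */
typedef struct {
    const char *master_host;  /* master_host: field of INFO */
    int master_port;          /* master_port: field of INFO */
    bool link_up;             /* master_link_status:up */
} slave_info;

/* Advance the reconfiguration flags of one slave:
 *   SENT   -> INPROG once the slave reports the promoted slave as master;
 *   INPROG -> DONE   once its replication link to that master is up. */
int advance_reconf(int flags, const slave_info *info,
                   const char *promoted_ip, int promoted_port) {
    if ((flags & RECONF_SENT) && info->master_host &&
        strcmp(info->master_host, promoted_ip) == 0 &&
        info->master_port == promoted_port) {
        flags &= ~RECONF_SENT;
        flags |= RECONF_INPROG;
    }
    if ((flags & RECONF_INPROG) && info->link_up) {
        flags &= ~RECONF_INPROG;
        flags |= RECONF_DONE;
    }
    return flags;
}
```

As in the Redis code, the two checks are independent if statements. Both transitions can therefore fire in one call when a single INFO reply already shows the link up.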
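This method is very long for three reasons: the INFO reply has a lot of content, the parser must stay compatible with both old and new output formats, and the handling of TILT mode and of master/slave role swaps during failover is complicated. Here we look only at the main part, which deals with obtaining slave information.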
...
/* old versions: slave0:<ip>,<port>,<state>
* new versions: slave0:ip=127.0.0.1,port=9999,... */
if ((ri->flags & SRI_MASTER) &&
sdslen(l) >= 7 &&
!memcmp(l,"slave",5) && isdigit(l[5]))
{
char *ip, *port, *end;
if (strstr(l,"ip=") == NULL) {
/* Old format. */
ip = strchr(l,':'); if (!ip) continue;
ip++; /* Now ip points to start of ip address. */
port = strchr(ip,','); if (!port) continue;
*port = '\0'; /* nul term for easy access. */
port++; /* Now port points to start of port number. */
end = strchr(port,','); if (!end) continue;
*end = '\0'; /* nul term for easy access. */
} else {
/* New format. */
ip = strstr(l,"ip="); if (!ip) continue;
ip += 3; /* Now ip points to start of ip address. */
port = strstr(l,"port="); if (!port) continue;
port += 5; /* Now port points to start of port number. */
/* Nul term both fields for easy access. */
end = strchr(ip,','); if (end) *end = '\0';
end = strchr(port,','); if (end) *end = '\0';
}
/* Check if we already have this slave into our table,
* otherwise add it. */
if (sentinelRedisInstanceLookupSlave(ri,ip,atoi(port)) == NULL) {
if ((slave = createSentinelRedisInstance(NULL,SRI_SLAVE,ip,
atoi(port), ri->quorum, ri)) != NULL)
{
sentinelEvent(LL_NOTICE,"+slave",slave,"%@");
sentinelFlushConfig();
}
}
}
...
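Take a line from the # Replication section such as slave0:ip=127.0.0.1,port=6381,state=online,offset=36201,lag=1. After parsing it, Sentinel checks whether this slave is already in its table. If it is not, Sentinel creates a sentinelRedisInstance structure to hold the slave's information and adds it to the master's slaves dict. Finally, it saves the slave information to the configuration file (sentinelFlushConfig).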
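The parsing step can be sketched as a standalone C function. This is a minimal sketch that assumes a NUL-terminated line. parse_slave_line() is a hypothetical helper. Unlike the Redis code, it copies the ip into a caller-supplied buffer instead of writing '\0' terminators into the line itself.

```c
#include <ctype.h>
#include <stdlib.h>
#include <string.h>

/* Parse one "slaveN:..." line of the INFO "# Replication" section.
 * Handles both formats named in the Redis comment:
 *   old: slave0:<ip>,<port>,<state>
 *   new: slave0:ip=127.0.0.1,port=9999,...
 * Returns 0 and fills ip/port on success, -1 otherwise. */
int parse_slave_line(const char *line, char *ip, size_t iplen, int *port) {
    const char *ip_start, *ip_end, *port_start;
    size_t n;

    /* Same shape check as the Redis code: "slave" followed by a digit. */
    if (strlen(line) < 7 || strncmp(line, "slave", 5) != 0 ||
        !isdigit((unsigned char)line[5]))
        return -1;

    if (strstr(line, "ip=") == NULL) {
        /* Old format: the ip runs from ':' up to the first ','. */
        ip_start = strchr(line, ':');
        if (!ip_start) return -1;
        ip_start++;
        ip_end = strchr(ip_start, ',');
        if (!ip_end) return -1;
        port_start = ip_end + 1;
        /* Like Redis, require a trailing ",<state>" after the port. */
        if (!strchr(port_start, ',')) return -1;
    } else {
        /* New format: comma-separated key=value pairs. */
        ip_start = strstr(line, "ip=") + 3;
        port_start = strstr(line, "port=");
        if (!port_start) return -1;
        port_start += 5;
        ip_end = strchr(ip_start, ',');
        if (!ip_end) ip_end = ip_start + strlen(ip_start);
    }

    n = (size_t)(ip_end - ip_start);
    if (n == 0 || n >= iplen) return -1;
    memcpy(ip, ip_start, n);
    ip[n] = '\0';
    *port = atoi(port_start);  /* atoi stops at the next ',' */
    return 0;
}
```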
2. Establishing connections
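The first INFO round against the master yields the slave information. In the next cycle, sentinelHandleDictOfRedisInstances recurses into the master's slaves dict. The same code that connects to the master then establishes the two connections, command and pub/sub, to each slave. From then on, PING monitors the slave's heartbeat and INFO keeps the slaves' information up to date.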
/* Perform scheduled operations for all the instances in the dictionary.
* Recursively call the function against dictionaries of slaves. */
void sentinelHandleDictOfRedisInstances(dict *instances) {
dictIterator *di;
dictEntry *de;
sentinelRedisInstance *switch_to_promoted = NULL;
/* There are a number of things we need to perform against every master. */
di = dictGetIterator(instances);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *ri = dictGetVal(de);
sentinelHandleRedisInstance(ri);
if (ri->flags & SRI_MASTER) {
sentinelHandleDictOfRedisInstances(ri->slaves);
sentinelHandleDictOfRedisInstances(ri->sentinels);
if (ri->failover_state == SENTINEL_FAILOVER_STATE_UPDATE_CONFIG) {
switch_to_promoted = ri;
}
}
}
if (switch_to_promoted)
sentinelFailoverSwitchToPromotedSlave(switch_to_promoted);
dictReleaseIterator(di);
}
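The shape of this recursion can be shown with a toy model. The sketch below assumes plain arrays instead of Redis dicts. The instance type, handle_instance() and handle_instances() are hypothetical names, and the failover step (switch_to_promoted) is left out.

```c
#include <stddef.h>

/* Toy model: each master owns arrays of slaves and peer sentinels. */
typedef struct instance {
    const char *name;
    int is_master;
    struct instance **slaves;
    size_t nslaves;
    struct instance **sentinels;
    size_t nsentinels;
    int handled;  /* how many times the per-instance step ran */
} instance;

/* Stand-in for sentinelHandleRedisInstance(): reconnect, PING, INFO, ... */
static void handle_instance(instance *ri) { ri->handled++; }

/* Same shape as sentinelHandleDictOfRedisInstances(): handle every
 * instance, and for masters recurse into their slaves and sentinels.
 * Recursion stops after one level because slaves and sentinels are
 * never flagged as masters. */
void handle_instances(instance **list, size_t n) {
    for (size_t i = 0; i < n; i++) {
        instance *ri = list[i];
        handle_instance(ri);
        if (ri->is_master) {
            handle_instances(ri->slaves, ri->nslaves);
            handle_instances(ri->sentinels, ri->nsentinels);
        }
    }
}
```

A single call on the masters collection reaches every slave and every peer sentinel exactly once per cycle. This is why newly discovered slaves and sentinels get connected without any extra scheduling.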
sentinel&sentinel:
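- Discovering other sentinels
Each connection to the master and to its slaves includes a pub/sub link subscribed to that instance's __sentinel__:hello channel. The periodic function above also shows that every 2 seconds a sentinel publishes a hello message through these connections. So when two sentinels monitor the same master, each receives the other's broadcasts. The message advertises the sender, which lets each sentinel learn that the other exists. The message is built and published by the following method: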
/* Send an "Hello" message via Pub/Sub to the specified 'ri' Redis
* instance in order to broadcast the current configuraiton for this
* master, and to advertise the existence of this Sentinel at the same time.
*
* The message has the following format:
*
* sentinel_ip,sentinel_port,sentinel_runid,current_epoch,
* master_name,master_ip,master_port,master_config_epoch.
*
* Returns C_OK if the PUBLISH was queued correctly, otherwise
* C_ERR is returned. */
int sentinelSendHello(sentinelRedisInstance *ri) {
char ip[NET_IP_STR_LEN];
char payload[NET_IP_STR_LEN+1024];
int retval;
char *announce_ip;
int announce_port;
sentinelRedisInstance *master = (ri->flags & SRI_MASTER) ? ri : ri->master;
sentinelAddr *master_addr = sentinelGetCurrentMasterAddress(master);
if (ri->link->disconnected) return C_ERR;
/* Use the specified announce address if specified, otherwise try to
* obtain our own IP address. */
if (sentinel.announce_ip) {
announce_ip = sentinel.announce_ip;
} else {
if (anetSockName(ri->link->cc->c.fd,ip,sizeof(ip),NULL) == -1)
return C_ERR;
announce_ip = ip;
}
announce_port = sentinel.announce_port ?
sentinel.announce_port : server.port;
/* Format and send the Hello message. */
snprintf(payload,sizeof(payload),
"%s,%d,%s,%llu," /* Info about this sentinel. */
"%s,%s,%d,%llu", /* Info about current master. */
announce_ip, announce_port, sentinel.myid,
(unsigned long long) sentinel.current_epoch,
/* --- */
master->name,master_addr->ip,master_addr->port,
(unsigned long long) master->config_epoch);
retval = redisAsyncCommand(ri->link->cc,
sentinelPublishReplyCallback, ri, "PUBLISH %s %s",
SENTINEL_HELLO_CHANNEL,payload);
if (retval != C_OK) return C_ERR;
ri->link->pending_commands++;
return C_OK;
}
The broadcast message has the format sentinel_ip,sentinel_port,sentinel_runid,current_epoch,master_name,master_ip,master_port,master_config_epoch.
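Here is a minimal sketch of that format. It builds a payload with the same format string as sentinelSendHello. It then splits the payload back into exactly 8 fields, which sentinelProcessHelloMessage requires. hello_msg, hello_format() and hello_parse() are hypothetical names, and the fixed buffer sizes are assumptions.

```c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Fields of a hello message, in the documented order. */
typedef struct {
    char sentinel_ip[64];
    int sentinel_port;
    char runid[64];
    unsigned long long current_epoch;
    char master_name[64];
    char master_ip[64];
    int master_port;
    unsigned long long master_config_epoch;
} hello_msg;

/* Format the payload with the same format string Redis uses. */
int hello_format(const hello_msg *h, char *buf, size_t len) {
    int n = snprintf(buf, len, "%s,%d,%s,%llu,%s,%s,%d,%llu",
                     h->sentinel_ip, h->sentinel_port, h->runid,
                     h->current_epoch, h->master_name, h->master_ip,
                     h->master_port, h->master_config_epoch);
    return (n < 0 || (size_t)n >= len) ? -1 : 0;
}

/* Split on ',' and accept exactly 8 tokens; anything else is rejected. */
int hello_parse(const char *payload, hello_msg *h) {
    char copy[512];
    char *tok[8];
    int n = 0;
    char *p;

    if (strlen(payload) >= sizeof(copy)) return -1;
    strcpy(copy, payload);
    p = copy;
    for (;;) {
        if (n == 8) return -1;  /* more than 8 tokens */
        tok[n++] = p;
        char *comma = strchr(p, ',');
        if (!comma) break;
        *comma = '\0';
        p = comma + 1;
    }
    if (n != 8) return -1;
    snprintf(h->sentinel_ip, sizeof(h->sentinel_ip), "%s", tok[0]);
    h->sentinel_port = atoi(tok[1]);
    snprintf(h->runid, sizeof(h->runid), "%s", tok[2]);
    h->current_epoch = strtoull(tok[3], NULL, 10);
    snprintf(h->master_name, sizeof(h->master_name), "%s", tok[4]);
    snprintf(h->master_ip, sizeof(h->master_ip), "%s", tok[5]);
    h->master_port = atoi(tok[6]);
    h->master_config_epoch = strtoull(tok[7], NULL, 10);
    return 0;
}
```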
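The message is delivered to every node subscribed to the channel, including the sender itself (sentinelReceiveHelloMessages ignores messages that carry its own run ID). How does a sentinel handle such a message when it receives it? As described above, Sentinel registers the callback sentinelReceiveHelloMessages when it sets up the pub/sub connection to the master. That callback hands the message to sentinelProcessHelloMessage: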
/* Process an hello message received via Pub/Sub in master or slave instance,
* or sent directly to this sentinel via the (fake) PUBLISH command of Sentinel.
*
* If the master name specified in the message is not known, the message is
* discarded. */
void sentinelProcessHelloMessage(char *hello, int hello_len) {
/* Format is composed of 8 tokens:
* 0=ip,1=port,2=runid,3=current_epoch,4=master_name,
* 5=master_ip,6=master_port,7=master_config_epoch. */
int numtokens, port, removed, master_port;
uint64_t current_epoch, master_config_epoch;
char **token = sdssplitlen(hello, hello_len, ",", 1, &numtokens);
sentinelRedisInstance *si, *master;
if (numtokens == 8) {
/* Obtain a reference to the master this hello message is about */
master = sentinelGetMasterByName(token[4]);
if (!master) goto cleanup; /* Unknown master, skip the message. */
/* First, try to see if we already have this sentinel. */
port = atoi(token[1]);
master_port = atoi(token[6]);
si = getSentinelRedisInstanceByAddrAndRunID(
master->sentinels,token[0],port,token[2]);
current_epoch = strtoull(token[3],NULL,10);
master_config_epoch = strtoull(token[7],NULL,10);
if (!si) {
/* If not, remove all the sentinels that have the same runid
* because there was an address change, and add the same Sentinel
* with the new address back. */
removed = removeMatchingSentinelFromMaster(master,token[2]);
if (removed) {
sentinelEvent(LL_NOTICE,"+sentinel-address-switch",master,
"%@ ip %s port %d for %s", token[0],port,token[2]);
} else {
/* Check if there is another Sentinel with the same address this
* new one is reporting. What we do if this happens is to set its
* port to 0, to signal the address is invalid. We'll update it
* later if we get an HELLO message. */
sentinelRedisInstance *other =
getSentinelRedisInstanceByAddrAndRunID(
master->sentinels, token[0],port,NULL);
if (other) {
sentinelEvent(LL_NOTICE,"+sentinel-invalid-addr",other,"%@");
other->addr->port = 0; /* It means: invalid address. */
sentinelUpdateSentinelAddressInAllMasters(other);
}
}
/* Add the new sentinel. */
si = createSentinelRedisInstance(token[2],SRI_SENTINEL,
token[0],port,master->quorum,master);
if (si) {
if (!removed) sentinelEvent(LL_NOTICE,"+sentinel",si,"%@");
/* The runid is NULL after a new instance creation and
* for Sentinels we don't have a later chance to fill it,
* so do it now. */
si->runid = sdsnew(token[2]);
sentinelTryConnectionSharing(si);
if (removed) sentinelUpdateSentinelAddressInAllMasters(si);
sentinelFlushConfig();
}
}
/* Update local current_epoch if received current_epoch is greater.*/
if (current_epoch > sentinel.current_epoch) {
sentinel.current_epoch = current_epoch;
sentinelFlushConfig();
sentinelEvent(LL_WARNING,"+new-epoch",master,"%llu",
(unsigned long long) sentinel.current_epoch);
}
/* Update master info if received configuration is newer. */
if (si && master->config_epoch < master_config_epoch) {
master->config_epoch = master_config_epoch;
if (master_port != master->addr->port ||
strcmp(master->addr->ip, token[5]))
{
sentinelAddr *old_addr;
sentinelEvent(LL_WARNING,"+config-update-from",si,"%@");
sentinelEvent(LL_WARNING,"+switch-master",
master,"%s %s %d %s %d",
master->name,
master->addr->ip, master->addr->port,
token[5], master_port);
old_addr = dupSentinelAddr(master->addr);
sentinelResetMasterAndChangeAddress(master, token[5], master_port);
sentinelCallClientReconfScript(master,
SENTINEL_OBSERVER,"start",
old_addr,master->addr);
releaseSentinelAddr(old_addr);
}
}
/* Update the state of the Sentinel. */
if (si) si->last_hello_time = mstime();
}
cleanup:
sdsfreesplitres(token,numtokens);
}
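After receiving the published message, sentinelProcessHelloMessage does the following:
- Splits the message on ",".
- Looks up the master by the name carried in the message. If that master is not monitored, the message is skipped; otherwise processing continues. Because masters are matched by name, every sentinel monitoring the same master must configure it under the same name.
- Checks whether it already has a record of this sentinel, looking it up by ip, port and runid. If it does not, it first removes any existing sentinelRedisInstance with the same runid. The node's address may have changed, so it must be re-added under the new address.
- If nothing was removed, it checks whether another known sentinel already uses the same ip and port. If so, it sets that entry's port to 0 to mark the address invalid, and waits for a later hello message to correct it.
- Creates a new sentinelRedisInstance from the received sentinel information and fills in its runid. An interesting optimization at this step is the sentinelTryConnectionSharing method.
- Saves the sentinel's information to the configuration file.
- If the received current_epoch is greater than its own, adopts it and saves it to the configuration file.
- If the sender is a known sentinel and the local config epoch of the master is older than the one in the message, updates the master's config epoch. It also updates the master's ip and port if they differ.
- Finally, updates the time of the last hello received from that sentinel.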
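Processing hello messages thus solves two problems. It lets a sentinel discover the other sentinel nodes, and it keeps the master configuration and epochs consistent between nodes, since every node converges on the newest epoch.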
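The two epoch rules can be isolated in a few lines. This sketch assumes the hello message comes from a sentinel that is already known (si != NULL in the Redis code) and models a single master. local_view and apply_hello_epochs() are hypothetical names. Config flushing, events and the client reconfiguration script are omitted.

```c
#include <stdio.h>
#include <string.h>

/* One sentinel's local view, heavily simplified (hypothetical). */
typedef struct {
    unsigned long long current_epoch;  /* like sentinel.current_epoch */
    unsigned long long config_epoch;   /* like master->config_epoch */
    char master_ip[64];
    int master_port;
} local_view;

/* Apply the epoch fields of a hello message from a known sentinel.
 * Returns 1 if the master address was switched, 0 otherwise. */
int apply_hello_epochs(local_view *v, unsigned long long current_epoch,
                       unsigned long long master_config_epoch,
                       const char *master_ip, int master_port) {
    /* Always adopt a larger current epoch. */
    if (current_epoch > v->current_epoch)
        v->current_epoch = current_epoch;

    /* Only a strictly newer configuration may change the master. */
    if (v->config_epoch < master_config_epoch) {
        v->config_epoch = master_config_epoch;
        if (master_port != v->master_port ||
            strcmp(master_ip, v->master_ip) != 0) {
            snprintf(v->master_ip, sizeof(v->master_ip), "%s", master_ip);
            v->master_port = master_port;
            return 1;  /* Redis emits +switch-master at this point */
        }
    }
    return 0;
}
```

The strict comparison on config_epoch matters. A hello carrying a different address but the same config epoch is ignored, so a stale view cannot overwrite a newer one.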
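- Establishing connections
  - As with slaves, connections between sentinels are created by the recursive calls in the periodic function. Note that sentinels keep only a command connection to each other, with no pub/sub connection; see the sentinelReconnectInstance method above.
  - Creating a sentinel instance involves one more optimization: the sentinelTryConnectionSharing method, already mentioned in the code above. Here we look at it separately to explain what connection sharing means. The method's code is as follows: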
/* This function will attempt to share the instance link we already have
* for the same Sentinel in the context of a different master, with the
* instance we are passing as argument.
*
* This way multiple Sentinel objects that refer all to the same physical
* Sentinel instance but in the context of different masters will use
* a single connection, will send a single PING per second for failure
* detection and so forth.
*
* Return C_OK if a matching Sentinel was found in the context of a
* different master and sharing was performed. Otherwise C_ERR
* is returned. */
int sentinelTryConnectionSharing(sentinelRedisInstance *ri) {
serverAssert(ri->flags & SRI_SENTINEL);
dictIterator *di;
dictEntry *de;
if (ri->runid == NULL) return C_ERR; /* No way to identify it. */
if (ri->link->refcount > 1) return C_ERR; /* Already shared. */
di = dictGetIterator(sentinel.masters);
while((de = dictNext(di)) != NULL) {
sentinelRedisInstance *master = dictGetVal(de), *match;
/* We want to share with the same physical Sentinel referenced
* in other masters, so skip our master. */
if (master == ri->master) continue;
match = getSentinelRedisInstanceByAddrAndRunID(master->sentinels,
NULL,0,ri->runid);
if (match == NULL) continue; /* No match. */
if (match == ri) continue; /* Should never happen but... safer. */
/* We identified a matching Sentinel, great! Let's free our link
* and use the one of the matching Sentinel. */
releaseInstanceLink(ri->link,NULL);
ri->link = match->link;
match->link->refcount++;
dictReleaseIterator(di);
return C_OK;
}
dictReleaseIterator(di);
return C_ERR;
}
Before explaining this method, we first need to introduce an important data structure, instanceLink:
/* The link to a sentinelRedisInstance. When we have the same set of Sentinels
* monitoring many masters, we have different instances representing the
* same Sentinels, one per master, and we need to share the hiredis connections
* among them. Oherwise if 5 Sentinels are monitoring 100 masters we create
* 500 outgoing connections instead of 5.
* So this structure represents a reference counted link in terms of the two
* hiredis connections for commands and Pub/Sub, and the fields needed for
* failure detection, since the ping/pong time are now local to the link: if
* the link is available, the instance is avaialbe. This way we don't just
* have 5 connections instead of 500, we also send 5 pings instead of 500.
*
* Links are shared only for Sentinels: master and slave instances have
* a link with refcount = 1, always. */
typedef struct instanceLink {
int refcount; /* Number of sentinelRedisInstance owners. */
int disconnected; /* Non-zero if we need to reconnect cc or pc. */
int pending_commands; /* Number of commands sent waiting for a reply. */
redisAsyncContext *cc; /* Hiredis context for commands. */
redisAsyncContext *pc; /* Hiredis context for Pub / Sub. */
mstime_t cc_conn_time; /* cc connection time. */
mstime_t pc_conn_time; /* pc connection time. */
mstime_t pc_last_activity; /* Last time we received any message. */
mstime_t last_avail_time; /* Last time the instance replied to ping with
a reply we consider valid. */
mstime_t act_ping_time; /* Time at which the last pending ping (no pong
received after it) was sent. This field is
set to 0 when a pong is received, and set again
to the current time if the value is 0 and a new
ping is sent. */
mstime_t last_ping_time; /* Time at which we sent the last ping. This is
only used to avoid sending too many pings
during failure. Idle time is computed using
the act_ping_time field. */
mstime_t last_pong_time; /* Last time the instance replied to ping,
whatever the reply was. That's used to check
if the link is idle and must be reconnected. */
mstime_t last_reconn_time; /* Last reconnection attempt performed when
the link was down. */
} instanceLink;
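The comments on this method and data structure describe the problem. Suppose a group of sentinels monitors the same set of masters: besides this sentinel, 5 other sentinels jointly monitor 100 masters. If connections were created by looping over each master's list of sentinels, this sentinel could open 500 connections to those 5 sentinels, although 5 would suffice. There are still 500 sentinelRedisInstance structures either way.

So when Sentinel detects that a link to the same sentinel already exists (identified by runid), it shares that link and keeps only one. This guarantees 5 connections to the other 5 sentinels instead of 500, and only 5 PINGs need to be sent instead of 500. The optimization targets sentinels only. Link sharing in instanceLink therefore applies only to instances with the SRI_SENTINEL flag; for master and slave instances, refcount is always 1.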
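Reference-counted sharing can be sketched as follows. This is a simplified model, not the Redis implementation. shared_link stands in for instanceLink and keeps only the refcount, and peer_ref stands in for the per-master sentinelRedisInstance of a peer. attach_link() and detach_link() are hypothetical helpers. Redis first creates a link and then releases it when a match is found; this sketch instead looks for a match before allocating.

```c
#include <stdlib.h>
#include <string.h>

/* Stand-in for instanceLink: only the owner count is modeled. */
typedef struct {
    int refcount;
} shared_link;

/* One per (master, peer sentinel) pair. */
typedef struct {
    const char *runid;  /* identifies the physical sentinel */
    shared_link *link;
} peer_ref;

/* Give 'ref' a link. Reuse the link of an earlier reference with the
 * same runid, or allocate a new one. 'refs' holds the 'n' references
 * created so far. Returns 1 if shared, 0 if newly allocated, -1 on OOM. */
int attach_link(peer_ref *ref, peer_ref *refs, size_t n) {
    for (size_t i = 0; i < n; i++) {
        if (refs[i].link && strcmp(refs[i].runid, ref->runid) == 0) {
            ref->link = refs[i].link;
            ref->link->refcount++;
            return 1;
        }
    }
    ref->link = malloc(sizeof(*ref->link));
    if (!ref->link) return -1;
    ref->link->refcount = 1;
    return 0;
}

/* Drop one owner; free the link when the last owner goes away. */
void detach_link(peer_ref *ref) {
    if (ref->link && --ref->link->refcount == 0) free(ref->link);
    ref->link = NULL;
}
```

Running this for 5 peer sentinels across 100 masters allocates 5 links, each with a refcount of 100. That matches the 5-versus-500 arithmetic above.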
At this point, the network of the whole sentinel architecture has been built.
Summary
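- There are three roles in the sentinel system: sentinel, master and slave.
- A sentinel gets the ip+port of each monitored master from its configuration file. Through a registered periodic time event, it creates two connections to the master: command and pub/sub.
- A sentinel learns how to reach the slaves by sending the INFO command to the master. It then connects to them through the recursion in the periodic function, again creating two connections per slave. Note that the hello message broadcast through a slave carries the ip and port of the master that slave belongs to.
- Sentinels discover each other through the __sentinel__:hello channel of the master. Each sentinel publishes a hello msg from its periodic function. The other sentinels subscribed to that channel receive it, learn about the sender, and connect to it through the recursion in the periodic function. Only one command connection is created between two sentinels.
- The periodic function sends INFO every 10s (to masters and slaves) and broadcasts a hello msg every 2s. It sends PING every 1s, or every down-after-milliseconds if that option is set below 1s (its default is 30s), so the PING interval never exceeds 1s.
- Sentinel state persistence: a sentinel saves some of its state in its configuration file.
- Connection sharing between sentinels: a shared link keeps only one connection to each peer sentinel.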
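The timing rules in the summary can be written down as a small scheduling check. The constant names mirror those in Redis's sentinel.c (SENTINEL_INFO_PERIOD, SENTINEL_PING_PERIOD, SENTINEL_PUBLISH_PERIOD). due_commands() and due_cmds are hypothetical. The sketch ignores several details of the real code:
- the faster INFO rate used for slaves during a failover;
- the limit on pending commands;
- the extra check on when the last PING was sent.

```c
#include <stdbool.h>

/* Periods in milliseconds, named after the constants in sentinel.c. */
#define SENTINEL_INFO_PERIOD 10000
#define SENTINEL_PING_PERIOD 1000
#define SENTINEL_PUBLISH_PERIOD 2000

typedef long long mstime_t;

/* PING interval: down-after-milliseconds, capped at one second. */
mstime_t ping_period(mstime_t down_after_ms) {
    return down_after_ms > SENTINEL_PING_PERIOD ? SENTINEL_PING_PERIOD
                                                : down_after_ms;
}

/* Which periodic commands are due for one instance (hypothetical). */
typedef struct {
    bool info, ping, hello;
} due_cmds;

due_cmds due_commands(bool is_sentinel, mstime_t now, mstime_t last_info,
                      mstime_t last_pong, mstime_t last_hello,
                      mstime_t down_after_ms) {
    due_cmds d;
    /* INFO goes to masters and slaves only, never to other sentinels. */
    d.info = !is_sentinel && now - last_info > SENTINEL_INFO_PERIOD;
    /* PING when the last PONG is older than the ping period. */
    d.ping = now - last_pong > ping_period(down_after_ms);
    /* Hello is published to all three kinds of instances. */
    d.hello = now - last_hello > SENTINEL_PUBLISH_PERIOD;
    return d;
}
```

With the default down-after-milliseconds of 30000, ping_period() returns 1000. Setting the option to 500 makes sentinels PING every 500 ms.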