引言
Redis 离用户最近的就是网络层了,网络层不负责命令的具体执行,只负责网络数据的收发,虽然它不负责具体的功能实现,却是Redis单线程,高性能的核心。
本文只点出一条关键的代码路径,如果想要彻底理解这一部分代码,需要一些Reactor模型,epoll相关的知识,请自行查阅其他资料。
后台运行Redis
这是main中配置解析结束后的第一行代码。
如果你设置了--daemonize
,那么Redis就会在后台启动redis.c:4027:
// 将服务器设置为守护进程
if (server.daemonize) daemonize();
服务器启动
C语言网络编程中常见的服务器启动流程就是socket->bind->listen->accept
,对于epoll这样的io多路复用,则是socket->bind->listen->set_non_block->register_fd_event->epoll_wait
,稍微解释一下这些流程节点的含义:
-
socket
:创建监听套接字 -
bind
:绑定端口 -
listen
:监听端口 -
set_non_block
:设置套接字为非阻塞 -
register_fd_event
:将监听套接字的"新连接到达"事件注册到epoll上 -
epoll_wait
:阻塞在IO多路复用上,等待事件到来
我们就按照这条路径来追踪服务器启动的代码。
服务器启动的socket->bind->listen->set_non_block->register_fd_event
在initServer方法的调用中完成,redis.c:2050:
void initServer() {
//...
// socket -> bind -> listen -> set_non_block
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == REDIS_ERR)
exit(1);
//...
// ->register_fd_event
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
redisPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
//...
}
因为redis.conf
中是允许监听多个ip地址,所以所有要监听的地址会在配置解析时被放到server结构体的bindaddr字段(字符串数组),如果bindaddr
为NULL
则表示绑定当前机器的全部地址:
struct redisServer {
//...
char *bindaddr[REDIS_BINDADDR_MAX]; /* Addresses we should bind to */
//...
}
在listenToPort
中会将所有完成了socket->bind->listen->set_non_block
的监听套接字放到bindaddr
数组中,之后在一个循环(redis.c:2160)中将事件全部注册到epoll
上。注意一下aeCreateFileEvent
的第四个参数acceptTcpHandler是一个回调函数,这个回调函数会在事件发生时被调用(即有新的客户端连接请求到达时被调用)。
Redis称这种事件为File Event(从方法名CreateFileEvent可以看出),因为在linux上一切都是文件,所以套接字本身也是文件,所以Redis中的File Event就是指套接字事件。
listenToPort会遍历bindaddr
数组,绑定数组中的所有ip地址+server.port
:
int listenToPort(int port, int *fds, int *count) {
int j;
/* Force binding of 0.0.0.0 if no bind address is specified, always
* entering the loop if j == 0. */
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
//...
// set_non_block
anetNonBlock(NULL,fds[*count]);
//...
}
//..
}
for
循环里面的代码虽然很长,但是逻辑很简单,其实就是判断是NULL,IPv6地址还是IPv4地址,如果是NULL的话,就要绑定本机的全部地址,如果是IPv6地址,则调用anetTcp6Server获取一个IPv6套接字,如果是IPv4的话,则调用anetTcpServer获取一个IPv4套接字。获得套接字后立即就调用anetNonBlock将其设置为非阻塞的(即set_non_block
)。anetTcpServer
将会返回一个已经完成了bind
和listen
操作的套接字。anetTcpServer
函数其实就直接调用了一下_anetTcpServer,socket->bind->listen
都是在这个函数里完成的:
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
//...
for (p = servinfo; p != NULL; p = p->ai_next) {
// socket
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
// bind->listen
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) goto error;
goto end;
}
//...
}
到此准备工作就完成了,接下来的epoll_wait
阶段,服务器就已经正式启动了,翻到main
的最后几行,aeMain
就启动服务器主循环的函数,redis.c:4079:
aeMain(server.el);
在aeMain方法中可以看到明显的循环:
/*
* 事件处理器的主循环
*/
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件处理前执行的函数,那么运行它
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
// 开始处理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
进入aeProcessEvents
,里面代码很长,其中的aeApiPoll
函数调用会阻塞在IO多路复用上(即epoll_wait
),ae.c:560:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//...
numevents = aeApiPoll(eventLoop, tvp);
//...
}
aeApiPoll
调用完后,在eventLoop
的fired
字段上就本轮触发的所有事件。
文件事件
之前解释过Redis将所有的套接字事件都称为文件事件。
刚刚提到创建文件事件的函数是aeCreateFileEvent
函数,它会创建一个aeFileEvent结构体,并以fd
为下标,将其放置在eventLoop
的events
字段(是一个aeFileEvent
数组),ae.c:181:
// 取出文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
其实这里本质上就是构建一个fd
到aeFileEvent
的映射,之后在某个fd
的事件触发时,方面通过fd
将aeFileEvent
结构体取出来,然后调用里面的回调函数,从后面的代码中也可以看出aeFileEvent
中存储着回调函数,ae.c:189:
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
高级语言写得比较多的程序员可能会对fd为什么能够用来做下标感到疑惑,其实对于一个进程来说,fd是从3开始递增的,0,1,2分别代表stdin, stdout和stderr。所以虽然会损失三个元素的空间,但是对于性能却是能得到不少提升的。
当然,aeCreateFileEvent
最重要的还是将事件在epoll上注册,ae.c:184:
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
然后回调函数会在aeApiPoll
返回之后,检查eventLoop.fired
中的fd
和事件,通过fd
取出相应的aeFileEvent
结构体来调用相应的回调函数,[ae.c:561]:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
//...
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 从已就绪数组中获取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
//...
// 读事件
if (fe->mask & mask & AE_READABLE) {
// rfired 确保读/写事件只能执行其中一个
rfired = 1;
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
}
// 写事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
}
//...
}
}
定时事件
除了文件事件以外,Redis还有一类事件是定时事件,其实定时事件总共就只有一个,就是在initServer
时注册的serverCron
,redis.c:2151:
// 为 serverCron() 创建时间事件 新建了一个aeTimeEvent并插入了evetloop中的时间事件列表
// 1 表示在 1ms 后执行,这个 1ms 定义的只是 serverCron 的初始执行时间,而不是执行间隔
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
redisPanic("Can't create the serverCron time event.");
exit(1);
}
跳转到aeCreateTimeEvent,发现它做的事情就就是创建一个aeFileEvent
结构体并将其插入到eventLoop
的定时事件列表(即aeEventLoop
的timeEventHead字段)中。并没有任何epoll相关的操作,而且epoll其实也并不支持定时器的功能,那Redis是怎么实现定时任务的呢?其实秘密就在巧妙地设置epoll
的超时时间上,aeApiPoll
的第二个参数就是超时时间,在aeProcessEvents
里调用aeApiPoll
之前干的事情,就是计算距离现在最近的一次定时事件的时间,并以这个时间作为超时时间,ae.c:512:
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp; // 和最近一次定时事件的间隔
//...
}
numevents = aeApiPoll(eventLoop, tvp);
//...
}
serverCron
的执行频率是由 redisServer.hz
来决定的,默认值为 10,也就是每秒执行 10 次,即每隔 100 ms 执行一次。在前面代码分析中我们看到了创建 serverCron
定时事件的代码aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL)
给的定时时间似乎是 1ms,其实这是初始时间,真正的间隔时间是由时间执行函数的返回值决定的,看一看 serverCron
的返回值,你就能找到它真正的定时时间了,redis.c:1562:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
//...
return 1000/server.hz;
}
创建新连接
其处理逻辑就是当时在监听套接字上注册的回调函数acceptTcpHandler。
每当有新的客户端连接到来时,都会分发相关事件而触发该函数。
该函数会先accept这条连接,得到新连接的fd
,创建一个redisClient
结构体代表和这条连接相关的状态,最后注册该套接字的File Event:
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;//1000
while(max--) { // accept 多次防止同时过来多条连接
// accept 客户端连接
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
redisLog(REDIS_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
redisLog(REDIS_VERBOSE,"Accepted %s:%d", cip, cport);
// 为客户端创建客户端状态(redisClient)
acceptCommonHandler(cfd,0);
}
}
这里需要accept这么多次,是因为即使同时有多条连接过来,监听套接字注册的事件也只会激活一次,所以要多次accept,直到出现EWOULDBLOCK,防止有连接漏掉。
acceptCommonHandler中最重要的调用createClient
方法,networking.c:752:
static void acceptCommonHandler(int fd, int flags) {
// 创建客户端
redisClient *c;
if ((c = createClient(fd)) == NULL) {
//...
}
//...
createClient中先将fd设置为非阻塞,之后就在eventLoop注册了该套接字的File Event,注意这里注册的回调函数readQueryFromClient
,它就是之后从客户端读取数据的函数,最后将其加入了redisServer的clients列表中:
redisClient *createClient(int fd) {
if (fd != -1) {
// 设置为非阻塞
anetNonBlock(NULL,fd);
// 禁用 Nagle 算法, 降低延迟
anetEnableTcpNoDelay(NULL,fd);
// 设置 keep alive
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
// 注册该套接字的的File event
if (aeCreateFileEvent(server.el,fd,AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
//...
}
}
//...
//加入RedisServer的客户端列表中
if (fd != -1) listAddNodeTail(server.clients,c);
//...
}
读取客户端数据
上一节提到,这里其实就是readQueryFromClient函数。
读过上面的内容,大概也能猜出这个函数在做什么事情了,首先将传输来的数据读到代表该条连接的redisClient
的缓存中querybuf
字段中(每次最多读16kB),然后从中解析出命令名称,通过命令名称从之前的server.commands
字典中取出redisCommand
,然后执行里面的函数。
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
redisClient *c = (redisClient*) privdata;
//...
// 读入内容到缓存
nread = read(fd, c->querybuf+qblen, readlen);
//...
// 处理命令
processInputBuffer(c);
//..
}
processInputBuffer中会尽可能地读取querybuf
字段,并将它解析成字符串数组放到redisClient
的argv
数组中:
void processInputBuffer(redisClient *c) {
while(sdslen(c->querybuf)) { // 尽可能读取querybuf
// 将缓冲区中的内容转换成命令,以及命令参数 放到c->argv属性中
// 因为通过telnet和通过客户端连接,命令格式不同,所以这里需要两种解析
if (c->reqtype == REDIS_REQ_INLINE) {
if (processInlineBuffer(c) != REDIS_OK) break;
} else if (c->reqtype == REDIS_REQ_MULTIBULK) {
if (processMultibulkBuffer(c) != REDIS_OK) break;
}
if (c->argc == 0) {
//...
} else {
// 执行命令,并重置客户端
if (processCommand(c) == REDIS_OK)
resetClient(c);
}
}
}
processCommand会通过命令名称(c->argv[0])取到redisCommand
结构体并执行:
int processCommand(redisClient *c) {
//...
// lookupCommand其实就是去server.commands字典中去找command L2550
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
//...
if {
//...
} else {
// 执行命令 L2766
call(c,REDIS_CALL_FULL);
}
}
向客户端返回数据
在后面实现各个Redis命令的时候,如果需要向客户端写数据,一般都是调用的addReply方法,addReply
先在创建了一个写文件事件,然后将要发送的数据写入到该条连接(其实就是redisClient
结构体)的写缓存中:
void addReply(redisClient *c, robj *obj) {
// 创建写文件事件
if (prepareClientToWrite(c) != REDIS_OK) return;
// 将要发送的数据写入写缓存中
if (sdsEncodedObject(obj)) {
//...
} else if (obj->encoding == REDIS_ENCODING_INT) {
//...
}
}
进入prepareClientToWrite方法,你将看到熟悉的aeCreateFileEvent
,创建了一个写文件事件,并且回调方法是sendReplyToClient
,由这个回调方法负责最终将数据发送给客户端:
int prepareClientToWrite(redisClient *c) {
//...
// 创建写文件事件
if (c->bufpos == 0 && listLength(c->reply) == 0 &&
(c->replstate == REDIS_REPL_NONE ||
c->replstate == REDIS_REPL_ONLINE) &&
aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
sendReplyToClient, c) == AE_ERR) return REDIS_ERR;
return REDIS_OK;
}
对于redisClient
,它有两个字段是用来存放写缓冲的,一个buf(就是一个16KB的缓冲),还有一个reply链表,当buf
满的时候,会先将回复内容链接在reply
后面。
End
作者:元青
微信公众号 「技乐书香」