redis6.0的multi threading设计

先上图,给个整体设计:


image.png

画外音:以下内容凌乱,仅是让自己看懂而已。

引入多线程IO后的两个问题:

  1. 原子性;
  2. 顺序性;

主线程

  • 调用initServer完成初始化工作

initServer时,会调用aeCreateFileEvent为监听fd注册可读事件(accept事件)。当该server fd可读时(即有client连接时),会调用acceptTcpHandler函数创建client对象;而每一个client连接又会通过aeCreateFileEvent注册一个aeFileEvent,并把该连接(或者说事件)的rfileProc设置为readQueryFromClient,由readQueryFromClient处理该连接发送过来的所有数据。

/* Excerpt of Redis initServer(): only the part that registers the listening
 * sockets with the main event loop is shown; everything else is elided.
 *
 * For every TCP listening fd (server.ipfd[0 .. server.ipfd_count-1]) a
 * readable file event is registered on server.el with acceptTcpHandler as the
 * handler, so an incoming connection is handled when the listening socket
 * becomes readable. If a Unix domain socket is configured (server.sofd > 0),
 * acceptUnixHandler is registered for it in the same way. Failing to register
 * either event is fatal (serverPanic). The loop index j is presumably declared
 * in the elided part. */
void initServer(void) {
    ...  // code omitted
    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    for (j = 0; j < server.ipfd_count; j++) {
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    /* Same registration for the optional Unix domain socket. */
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
    ... // code omitted
}

/* Readable-event handler installed on a TCP listening socket.
 *
 * Accepts at most MAX_ACCEPTS_PER_CALL pending connections per invocation and
 * passes each accepted fd, with its peer IP, to acceptCommonHandler(), which
 * creates the client. Returns as soon as anetTcpAccept() fails. The failure
 * is logged at LL_WARNING unless errno is EWOULDBLOCK (on a non-blocking
 * listener this just means nothing is left to accept). */
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = MAX_ACCEPTS_PER_CALL; budget != 0; budget--) {
        char peer_ip[NET_IP_STR_LEN];
        int peer_port;
        int conn_fd = anetTcpAccept(server.neterr, fd, peer_ip,
                                    sizeof(peer_ip), &peer_port);

        if (conn_fd == ANET_ERR) {
            int nothing_pending = (errno == EWOULDBLOCK);
            if (!nothing_pending)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }

        serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
        acceptCommonHandler(conn_fd,0,peer_ip);
    }
}

/* Shared tail of connection acceptance (excerpt; the remainder is elided).
 *
 * Wraps the accepted socket in a new client via createClient(fd). On failure
 * it logs a warning including strerror(errno) and closes the fd.
 *
 * fd:    the accepted connection's socket descriptor.
 * flags: acceptTcpHandler passes 0; how it is used is in the elided code.
 * ip:    textual peer address produced by the accept call. */
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        /* createClient() already closes fd when it fails to register the
         * read event, so this close() may hit a closed descriptor. */
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    ... // code omitted
}

/* Allocate a new client object for socket fd (excerpt; the rest is elided).
 *
 * For a real connection (fd != -1) it:
 *   - calls anetNonBlock() and anetEnableTcpNoDelay() on the fd;
 *   - calls anetKeepAlive() when server.tcpkeepalive is non-zero;
 *   - registers a readable file event on server.el whose handler is
 *     readQueryFromClient, with the new client c as clientData.
 * If that registration fails, the fd is closed, c is freed and NULL is
 * returned. With fd == -1 no socket setup is done (see the comment below). */
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    /* passing -1 as fd it is possible to create a non connected client.
     * This is useful since all the commands needs to be executed
     * in the context of a client. When commands are executed in other
     * contexts (for instance a Lua script) we need a non connected client. */
    if (fd != -1) {
        anetNonBlock(NULL,fd);
        anetEnableTcpNoDelay(NULL,fd);
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /* From now on every readable notification for fd goes to
         * readQueryFromClient(..., c, ...). */
        if (aeCreateFileEvent(server.el,fd,AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
        {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
    ... // code omitted
}

这里有两个非常重要的概念:

  • event,即事件,这些事件分成了三类:

    • aeFileEvent :File event

    • aeTimeEvent :Time event

    • aeFiredEvent :fired event

它们的定义如下。比如file event即为网络事件,封装了read和write的回调函数(rfileProc和wfileProc),mask表示关注的事件类型(AE_READABLE、AE_WRITABLE,以及用于调整回调顺序的AE_BARRIER)。当我们调用aeCreateFileEvent时,会把该event添加到全局的eventloop里。那么eventloop是什么,又是如何管理file event的呢?

/* File event structure: the read/write handlers registered for one fd.
 * aeProcessEvents() looks it up as eventLoop->events[fd]. */
typedef struct aeFileEvent {
    int mask; /* bitmask of AE_READABLE | AE_WRITABLE | AE_BARRIER; AE_BARRIER
               * makes the writable handler run before the readable one */
    aeFileProc *rfileProc; /* called as rfileProc(eventLoop, fd, clientData, mask) when readable */
    aeFileProc *wfileProc; /* called as wfileProc(eventLoop, fd, clientData, mask) when writable */
    void *clientData;      /* opaque pointer handed to both handlers */
} aeFileEvent;

/* Time event structure: one node of the event loop's doubly linked list of
 * time events (the list head is aeEventLoop.timeEventHead). */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc; /* timer callback; the code that invokes it is not shown here */
    aeEventFinalizerProc *finalizerProc; /* presumably run when the event is removed — TODO confirm */
    void *clientData; /* opaque pointer, presumably passed to the callbacks */
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
} aeTimeEvent;

/* A fired event: one ready fd reported by the polling backend.
 * After aeApiPoll() returns numevents, aeProcessEvents() reads
 * eventLoop->fired[0 .. numevents-1]. */
typedef struct aeFiredEvent {
    int fd;   /* ready descriptor; also the index into aeEventLoop.events */
    int mask; /* readiness reported for fd (AE_READABLE / AE_WRITABLE bits) */
} aeFiredEvent;

  • aeEventLoop:即事件循环,定义如下:

/* State of an event based program: registered and fired file events, the
 * time-event list, the polling backend's private data and the hooks that run
 * around each poll. */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId; /* presumably the id given to the next aeTimeEvent — TODO confirm */
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events, indexed by fd */
    aeFiredEvent *fired; /* Fired events: aeProcessEvents() reads fired[0..numevents-1] after aeApiPoll() */
    aeTimeEvent *timeEventHead; /* head of the doubly linked aeTimeEvent list */
    int stop;    /* aeMain() keeps looping while this is 0 */
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep; /* called by aeMain() before every aeProcessEvents() call */
    aeBeforeSleepProc *aftersleep;  /* called right after aeApiPoll() returns, if AE_CALL_AFTER_SLEEP is passed */
} aeEventLoop;

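eventloop就是事件循环,上文中的事件都保存在这个结构里:已注册的file event放在events数组中(以fd为下标),aeApiPoll返回的就绪事件放在fired里,time event则组成以timeEventHead为头的双向链表;多路复用API(如select、epoll)的私有数据放在apidata里。还有两个重要的回调函数beforesleep和aftersleep:aeMain每次调用aeProcessEvents之前会先调用beforesleep,而aftersleep则在aeApiPoll返回之后被调用(见下面的代码)。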

  • 调用aeMain函数进入事件循环,开始持续处理事件:
/* Excerpt of the server entry point (most of it elided): initialise the
 * server, install the beforeSleep/afterSleep hooks on the main event loop,
 * run the loop with aeMain() until it stops, then free the loop. */
int main(int argc, char **argv) {
    ...
    initServer();
    ...
    /* Presumably these store the hooks into server.el->beforesleep and
     * server.el->aftersleep, which aeMain()/aeProcessEvents() invoke. */
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);
    /* Runs the event loop; returns only once server.el->stop is set. */
    aeMain(server.el);
    aeDeleteEventLoop(server.el);
    return 0;
}

/* Drive the event loop until eventLoop->stop becomes non-zero.
 *
 * Every iteration runs the optional beforesleep hook, then handles all event
 * types once through aeProcessEvents(), asking it to also run the aftersleep
 * hook once polling returns. */
void aeMain(aeEventLoop *eventLoop) {
    for (eventLoop->stop = 0; !eventLoop->stop; ) {
        if (eventLoop->beforesleep)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

/* Process pending events (excerpt; elided parts are marked with "...").
 *
 * Polls with aeApiPoll() (tvp is the poll timeout), runs the aftersleep hook
 * when AE_CALL_AFTER_SLEEP is set in flags, then dispatches every fired fd to
 * its registered handlers:
 *   - normally the readable handler runs before the writable one;
 *   - if AE_BARRIER is set in the fd's mask the order is inverted;
 *   - when read and write share the same handler it is called only once.
 *
 * Returns the number of processed file/time events. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    ... // code omitted

    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        struct timeval *tvp;

        ... // code omitted: computation of the poll timeout tvp

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* AE_BARRIER asks for the writable handler to run before the
             * readable one instead of after it. */
            int invert = fe->mask & AE_BARRIER;

            /* Normal order: fire the readable event first. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                /* The handler may cause eventLoop->events to be reallocated
                 * (e.g. when the tracked set size changes), which would leave
                 * fe dangling: re-derive it from fd before reading it again. */
                fe = &eventLoop->events[fd];
            }

            /* Fire the writable event, unless the very same handler already
             * ran as the readable handler for this fd. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh after the write handler. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }

    ... // code omitted (time-event processing, counted in the return value)

    return processed; /* return the number of processed file/time events */
}

对于client的请求来讲,rfileProc就是readQueryFromClient,那readQueryFromClient在multi thread IO的配置下,是如何工作的呢?

/* Readable-event handler for a client connection (excerpt).
 *
 * privdata is the client that createClient() registered as the event's
 * clientData. When threaded reads are enabled, postponeClientRead() queues
 * the client for the I/O threads and this call returns without reading. For
 * a client already flagged CLIENT_PENDING_READ (as when an I/O thread calls
 * this function), postponeClientRead() returns 0 and the read proceeds. */
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    client *c = (client*) privdata;

    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;

    ... // code omitted: read into c->querybuf and parse the commands
}

/* Defer this client's read to the I/O threads when possible.
 *
 * Called by the readable handler of the event loop. Returns 1 when the read
 * has been postponed: as a side effect the client is flagged
 * CLIENT_PENDING_READ and pushed onto the head of server.clients_pending_read.
 * Returns 0, leaving the client untouched, when threaded I/O is inactive,
 * threaded reads are disabled, or the client is a master link, a slave link,
 * or already pending a read. */
int postponeClientRead(client *c) {
    if (!io_threads_active || !server.io_threads_do_reads)
        return 0;
    if (c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))
        return 0;

    c->flags |= CLIENT_PENDING_READ;
    listAddNodeHead(server.clients_pending_read,c);
    return 1;
}

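也就是说在多线程IO的情况下,主线程在可读事件的回调里并不读取数据,仅仅是给每一个可读连接的客户端打上CLIENT_PENDING_READ标记,并把它放到一个队列里,即server.clients_pending_read里。等到本轮事件处理完之后,主线程再把队列里的客户端按Round Robin的方式分发给各个IO线程,由IO线程去读取和解析数据(见下文)。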

IO线程

每个IO线程的入口函数是IOThreadMain。当客户端向redis发送数据时,主线程把所有待读取的客户端按照Round Robin的方式分配给各个IO线程(放入每个线程自己的io_threads_list[id]链表),IO线程主要做三件事情:

  1. 从自己的io_threads_list链表中逐个取出客户端,read每一个fd上的数据,放入该连接的querybuf;

  2. 把querybuf中的数据解析为一个个可执行的command,详见readQueryFromClient的实现;

  3. 设置io_threads_pending[id]为0,即“通知”主线程:本线程已经完成所有的读操作了。

写回复时流程相同,只是此时io_threads_op为IO_THREADS_OP_WRITE,IO线程对每个客户端调用的是writeToClient。

/* Entry point of an I/O thread; myid is the thread's index cast to a pointer.
 *
 * Loops forever (never returns):
 *   1. Spins (up to 1000000 checks) waiting for io_threads_pending[id] to
 *      become non-zero, i.e. for the main thread to hand this thread work.
 *   2. If there is still no work, locks and immediately unlocks
 *      io_threads_mutex[id]. While the main thread holds that mutex this
 *      blocks, which is how the main thread can park idle I/O threads.
 *      Then it retries.
 *   3. Otherwise it processes every client in io_threads_list[id], either
 *      writing (writeToClient) or reading (readQueryFromClient) depending on
 *      the global io_threads_op. It then empties the list and sets
 *      io_threads_pending[id] back to 0 to tell the main thread the batch is
 *      done.
 *
 * NOTE(review): this handshake is only correct if io_threads_pending[] is
 * accessed with proper synchronization (e.g. declared _Atomic). Its
 * declaration is not visible here — confirm. */
void *IOThreadMain(void *myid) {
    /* The ID is the thread number (from 0 to server.iothreads_num-1), and is
     * used by the thread to just manipulate a single sub-array of clients. */
    long id = (unsigned long)myid;

    while(1) {
        /* Wait for start: bounded spin until the main thread signals work
         * by making io_threads_pending[id] non-zero. */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }

        /* Give the main thread a chance to stop this thread. */
        /* Still idle after spinning: if the main thread holds our mutex, the
         * lock call blocks here until it is released; then loop again. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        serverAssert(io_threads_pending[id] != 0);

        if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));

        /* Process: note that the main thread will never touch our list
         * before we drop the pending count to 0. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            /* io_threads_op is one global operation (not per thread): the
             * whole batch is either replies to write or queries to read. */
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c->fd,c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(NULL,c->fd,c,0);
            } else {
                serverPanic("io_threads_op value is unknown");
            }
        }
        listEmpty(io_threads_list[id]);
        /* Signal completion to the main thread; from here on the main
         * thread may touch io_threads_list[id] again. */
        io_threads_pending[id] = 0;

        if (tio_debug) printf("[%ld] Done\n", id);
    }
}
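  • 什么是Pending Read和Pending Write

    • Pending Read:开启多线程读(server.io_threads_do_reads)后,客户端可读时主线程不会立即读取,而是通过postponeClientRead给它打上CLIENT_PENDING_READ标记,并放入server.clients_pending_read队列,等待分发给IO线程读取和解析。

    • Pending Write:有回复等待发送的客户端同样会先被放入一个待写队列(server.clients_pending_write),再统一写回;开启多线程IO时,由IO线程在io_threads_op为IO_THREADS_OP_WRITE时调用writeToClient完成写操作。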

最后给个图:


redis6.0_multi_threading.jpg
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,214评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,307评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,543评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,221评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,224评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,007评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,313评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,956评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,441评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,925评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,018评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,685评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,234评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,240评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,464评论 1 261
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,467评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,762评论 2 345

推荐阅读更多精彩内容