死磕Redis5.0订阅和发布

       最近和一些朋友讨论Redis的订阅和发布功能,发现有些公司喜欢用Redis的订阅和发布功能来当作消息中间件来使用,当时我就纳闷,消息中间件比较牛逼的不就是那几个RocketMQ、Kafka、Rabbit MQ等专门的消息中间件么,Redis 的订阅发布功能也能当消息中间件用?带着这个疑问我们一起来探究一下Redis的订阅和发布的实现吧。
       文章分为以下几个部分讲解:
       1. 涉及的命令
       2. 数据结构
       3. 订阅和发布主流程源码分析
       4. Redis 订阅发布功能整的适合做消息中间件吗?

一、涉及的命令

       Redis 订阅和发布非常简单,一共就六个命令:psubscribe、publish、pubsub、punsubscribe、subscribe、unsubscribe。具体命令的使用大家可以参考 黄健宏老师总结的 Redis命令参考,黄健宏老师是我非常崇拜的一个人。黄健宏老师把 redis 所用到的命令都总结好了,我就不在这里再总结一遍了。

二、数据结构

       Redis 订阅和发布有两种类型,一种是频道,还有一种就是模式。我们先看频道的数据结构。
       Redis将所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典里面,这个字典的键是被某个订阅的频道,而键的值是一个链表,链表里面纪录了所有订阅这个频道的客户端:

// redisServer 中是使用字典保存的,这里保存着全部的频道
struct redisServer {
    // ...
    // 保存所有频道的订阅关系
    dict *pubsub_channels;
    // ...
}
// client 中也会保存自己感兴趣的频道
typedef struct client {
    // client 中的感兴趣的频道
    dict *pubsub_channels;  
} client;

/*
 * 下面通过 pubsub.c 文件中的 pubsubSubscribeChannel 方法
 * 看看 channel 和 client 具体是如何映射的。
 */

/*
 * 将客户订阅到频道。 如果操作成功,则返回1,如果客户端已订阅该频道,则为0。
 */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* 查看 client 是否已经订阅了该频道 */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* 将客户端添加到 channel - >client list 哈希表中 */

        /*
         * 查找指定频道是否在 pubsub_channels 字典中存在,
         * 如果存在直接将客户端添加到 clients 尾部即可。
         * 否则创建一个 clients 链表,然后将 client 添加到 clients 中
         */
        de = dictFind(server.pubsub_channels,channel);
        // 如果根据该 channel 查出的值为 null,说明字典中还没有该频道信息
        if (de == NULL) {
            // 从这里我们可以看出多个客户端是通过链表连接在一起的
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            clients = dictGetVal(de);
        }
        // 频道已经存在,直接添加到尾部
        listAddNodeTail(clients,c);
    }
    ...
}

通过源码我们脑海中应该有个大概的印象了,接着我们举个栗子加深印象。比如:
① client-1、client-2、client-3 三个客户端正在订阅 "order.it" 频道
② client-4 正在订阅 "order.sport" 频道
③ client-5 和 client-6 两个客户端正在订阅 "order.business" 频道
则结构如下图:

image.png

上面就是频道的订阅关系图,模式和频道类似,都是存储到服务器状态中,但是具体的数据结构却大不相同。

struct redisServer {
    // ...
    // 保存所有模式的订阅关系
    list *pubsub_patterns;
    // ...
}
// client 中也会保存自己感兴趣的模式
typedef struct client {
    // client 中的感兴趣的模式
    list *pubsub_patterns;
} client;

/*
 * 我们可以看到 redisServer 中直接就是使用链表来存储模式的
 * 下面我们看看具体的模式和 客户端的映射关系吧
 */
/**
 * 订阅模式的结构体
 * 也就是 pubsub_patterns 链表中保存的结构
 */
typedef struct pubsubPattern {
    /**
     * 客户端
     */
    client *client;
    /**
     * 模式
     */
    robj *pattern;
} pubsubPattern;


/*
 * 下面我们看看 Redis 是如何构造 pubsubPattern 并添加到 pubsub_patterns 中
 * 通过 pubsub.c 中的 pubsubSubscribePattern 方法我们可以看到全过程
 */

int pubsubSubscribePattern(client *c, robj *pattern) {
    int retval = 0;
    // 查看 client 自己是否已经订阅该模式
    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        // 没有订阅则将 pubsubPattern 结构体加到 client 的 pubsub_patterns 中
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        pat = zmalloc(sizeof(*pat));
        pat->pattern = getDecodedObject(pattern);
        pat->client = c;
        // 将该模式和订阅该模式的client 添加到服务端的 pubsub_patterns 链表中
        listAddNodeTail(server.pubsub_patterns,pat);
    }
    ...
}

举个 demo,比如:
① client-7 正在订阅 "music."。
② client-8 正在订阅 "book.
"。
③ client-9 正在订阅 "order.*".
则结构图如下

image.png

       到这里Redis 的频道和模式的数据结构就解剖完了,同学们都理解了么?看完频道和模式的数据结构,不知道同学们有没有这样的疑问,频道和模式到底有啥区别呢?下面我们就来看看他们之间到底有什么区别。我们还是通过 demo来了解吧。
       现在我们有 client-1、client-2、client-3、client-4 个客户端,我们让 client-1 订阅"order.create"频道,让 client-2 订阅 "order.waitpay",让 client-3 订阅 "order.pay" 频道,让 client-4 订阅 "order.*" 模式。然后我们分别往 "order.create"、"order.waitpay"、"order.pay" 发送消息,我们看看每个客户端有何变化。
client-1 订阅 order.create 频道:subscribe order.create


image.png

client-2 订阅 order.waitpay 频道:subscribe order.waitpay

image.png

client-3 订阅 order.pay 频道:subscribe order.pay

image.png

client-4 订阅 order.* 模式:psubscribe order.*

image.png

然后我们使用一个客户端分别往这几个客户端发送消息:

image.png

然后我们看看每个客户端之间的变化
client-1:

image.png

client-2:

image.png

client-3:

image.png

client-4:

image.png

我们看到client-1、client-2、client-3都只接受了和自己频道相关的消息,但是 client-4 把发向 client-1、client-2、client-3 的消息都接收了,现在大家应该明白了吧,模式其实就是模式匹配的概念,order.* 就表示匹配所有和 order 相关的消息。

三、订阅和发布的源码分析

我们就拿 publish order.create "order create" 这条消息来分析吧!直接上源码分析:

/**
 * 发布一条消息
 *
 * 时间复杂度 O(N+M),其中 N 是频道 channel 的订阅者数量,而 M 则是使用
 * 模式订阅(subscribed patterns)的客户端的数量。
 * 
 * @param channel 频道
 * @param message 消息体
 * @return 接收到信息 message 的订阅者数量
 */
int pubsubPublishMessage(robj *channel, robj *message) {
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* 发送给监听该频道的客户端 */
    // 根据键值 channel 从字典中获取 dictEntry 对象
    de = dictFind(server.pubsub_channels,channel);
    if (de) {
        // 从 dictEntry 中获取监听 channel 的 client list
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        // 循环整个订阅消息的列表,然后发送消息
        while ((ln = listNext(&li)) != NULL) {
            client *c = ln->value;
            // 往指定的客户端输出缓冲区中发送消息
            // todo: 如果 client 消费消息不及时,那么 client 输出缓冲区
                    // 就会造成消息堆积,会使 redis 内存突然增大
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        }
    }
    /* 往监听了 channel 模式的 client 发送消息*/
    if (listLength(server.pubsub_patterns)) {
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        // 循环整个模式链表
        while ((ln = listNext(&li)) != NULL) {
            pubsubPattern *pat = ln->value;
            // 匹配指定的模式,找出指定模式对应的客户端,然后往
                    // 订阅该模式的客户端发送消息
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) {
                // 往指定的客户端输出缓冲区中发送消息
                // todo: 如果 client 消费消息不及时,那么 client 输出缓冲区
                // 就会造成消息堆积,会使 redis 内存突然增大
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            }
        }
        decrRefCount(channel);
    }
    return receivers;
}

流程图如下:

image.png

四、Redis 订阅发布功能整的适合做消息中间件吗?

       通过上面的分析,我想大家心里应该都已经有答案了。我们根据上面的源码分析,可以举一个小 demo,Redis 发送消息,是循环订阅者列表实现的,比如我有 100 个频道,每个频道有100个订阅者,由于是单线程,岂不是要循环处理,那么最后一个频道的最后一个订阅者岂不是会等死去。使用 redis 做消息中间件的,redis 并没有提供消息重试机制,也没有提供消息确认机制,更没有提供消息的持久化,所以一旦消息丢失,我们是没有任何办法的。而且现在突然订阅方断线,那么他将会丢失所有在短线期间发布者发布的消息,这个决定会让很多人都感到失望吧。所以还是建议大家不要使用 Redis 做消息中间件了,存在很大的风险。如果要用,还是使用强大的 RocketMQ 或 Kafka 吧。
       文章到这里就结束了,本人水平有限,写的不好还请大家多多见谅,如有不对的地方,希望大家多提意见,我也会尽快改正。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容