发布与订阅
一、介绍
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
SUBSCRIBE channel [channel ...] 订阅1-N个频道
UNSUBSCRIBE [channel [channel ...]] 指示客户端退订给定的频道。
PSUBSCRIBE pattern [pattern ...] 订阅一个或多个符合给定模式的频道。
PUNSUBSCRIBE [pattern [pattern ...]] 指示客户端退订所有给定模式。
PUBLISH channel message 将信息 message 发送到指定的频道 channel 。
PUBSUB CHANNELS [pattern] 列出当前的活跃频道。
PUBSUB NUMSUB [channel-1 ... channel-N] 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
PUBSUB NUMPAT 返回订阅模式的数量
二、数据结构
redis服务器通过一个(频道名称->客户端链表)字典,和一个(匹配模式)链表来完成发布与订阅的功能。
struct redisServer {
.....
dict *pubsub_channels; /*所有频道的订阅关系 字典 (频道名称->client链表)*/
list *pubsub_patterns; /*保存所有订阅频道模式链表 */
.....
}
//pubsub_patterns链表的Node
typedef struct pubsubPattern {
client *client;//用户
robj *pattern;//订阅的频道模式
} pubsubPattern;
三、实现
-
subscribeCommand
//subscribe c c c ... c void subscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]);//订阅每个频道 c->flags |= CLIENT_PUBSUB; } //返回1 成功监听 返回0监听已经存在 int pubsubSubscribeChannel(client *c, robj *channel) { dictEntry *de; list *clients = NULL; int retval = 0; if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {//自己的订阅频道字典加入 retval = 1; incrRefCount(channel); /* Add the client to the channel -> list of clients hash table */ de = dictFind(server.pubsub_channels,channel);//在服务器中订阅频道字典找 if (de == NULL) {//没找到这个频道 创建 clients = listCreate(); dictAdd(server.pubsub_channels,channel,clients); incrRefCount(channel); } else {//找到拿出来 clients = dictGetVal(de); } listAddNodeTail(clients,c);//加入到list里面去 } addReplyPubsubSubscribed(c,channel);//通知客户端 return retval; }
-
psubscribeCommand
void psubscribeCommand(client *c) { int j; for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]);//订阅匹配频道 c->flags |= CLIENT_PUBSUB; } //加入订阅频道模式成功返回1,否则返回0 int pubsubSubscribePattern(client *c, robj *pattern) { int retval = 0; if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {//没找到的话 retval = 1; pubsubPattern *pat; listAddNodeTail(c->pubsub_patterns,pattern);//加入到c里面的模式频道 incrRefCount(pattern); pat = zmalloc(sizeof(*pat)); pat->pattern = getDecodedObject(pattern); pat->client = c; listAddNodeTail(server.pubsub_patterns,pat);//加入到server里面的模式频道 } addReplyPubsubPatSubscribed(c,pattern); return retval; }
-
publish
void publishCommand(client *c) { int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);//发送一个消息 if (server.cluster_enabled)//如果做了Redis集群 clusterPropagatePublish(c->argv[1],c->argv[2]); else forceCommandPropagation(c,PROPAGATE_REPL);//强制命令传播 addReplyLongLong(c,receivers);//告诉哦用户发送了几个订阅者 } /* 发布一个消息*/ int pubsubPublishMessage(robj *channel, robj *message) { int receivers = 0; dictEntry *de; listNode *ln; listIter li; de = dictFind(server.pubsub_channels,channel); if (de) {//找到此频道的话 list *list = dictGetVal(de); listNode *ln; listIter li; listRewind(list,&li); while ((ln = listNext(&li)) != NULL) { client *c = ln->value; addReplyPubsubMessage(c,channel,message);//发送消息给每一个订阅者 receivers++; } } /* 模式匹配频道*/ if (listLength(server.pubsub_patterns)) { listRewind(server.pubsub_patterns,&li); channel = getDecodedObject(channel); while ((ln = listNext(&li)) != NULL) { pubsubPattern *pat = ln->value; if (stringmatchlen((char*)pat->pattern->ptr, sdslen(pat->pattern->ptr), (char*)channel->ptr, sdslen(channel->ptr),0))//匹配 发送 { addReplyPubsubPatMessage(pat->client, pat->pattern,channel,message); receivers++; } } decrRefCount(channel); } return receivers; }
其中,反向操作就是以上的操作的逆顺序。
四、参考
- https://redis.io/
- 《Redis设计与实现》