skynet源码分析(2)--消息队列mq

作者:shihuaping0918@163.com,转载请注明作者

消息队列是skynet的核心功能之一,它的功能说白了就是入队出队,先进先出,这个数据结构都有讲过。源码实现在skynet_mq.h和skynet_mq.c中。下面来看一下这两个文件。

#ifndef SKYNET_MESSAGE_QUEUE_H
#define SKYNET_MESSAGE_QUEUE_H

#include <stdlib.h>
#include <stdint.h>
//消息结构体
struct skynet_message {
    uint32_t source; //从哪里来
    int session; //参考云风的博客,这个session是个标识
    void * data; //消息体
    size_t sz;//长度
};

// type is encoding in skynet_message.sz high 8bit
#define MESSAGE_TYPE_MASK (SIZE_MAX >> 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)

struct message_queue;
//全局消息入队
void skynet_globalmq_push(struct message_queue * queue);
//全局消息出队
struct message_queue * skynet_globalmq_pop(void);
//创建一个消息队列
struct message_queue * skynet_mq_create(uint32_t handle);
void skynet_mq_mark_release(struct message_queue *q);
//消息移除
typedef void (*message_drop)(struct skynet_message *, void *);
//队列释放
void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud);
//消息处理者handler
uint32_t skynet_mq_handle(struct message_queue *);

// 0 for success
//消息出队
int skynet_mq_pop(struct message_queue *q, struct skynet_message *message);
//消息入队
void skynet_mq_push(struct message_queue *q, struct skynet_message *message);

// return the length of message queue, for debug
int skynet_mq_length(struct message_queue *q);
int skynet_mq_overload(struct message_queue *q);

void skynet_mq_init();

#endif

上面有价值的东西实际就是消息的结构体,其它都是声明而已。

//默认队列长度为64
#define DEFAULT_QUEUE_SIZE 64
//最大长度为max(16bit)+1=65536
#define MAX_GLOBAL_MQ 0x10000

// 0 means mq is not in global mq.
// 1 means mq is in global mq , or the message is dispatching.

#define MQ_IN_GLOBAL 1
#define MQ_OVERLOAD 1024

struct message_queue {
    struct spinlock lock;
    uint32_t handle;  //目标handler
    int cap; //容量
    int head; //头位置
    int tail; //末尾位置
    int release; //释放标记
    int in_global; //是否在全局队列中
    int overload; //最大负载
    int overload_threshold; //最大负载阀值
    struct skynet_message *queue; //循环数组
    struct message_queue *next; //下一个队列,链表
};
//全局消息队列,链表
struct global_queue {
    struct message_queue *head; //头
    struct message_queue *tail; //尾
    struct spinlock lock;
};

static struct global_queue *Q = NULL;

void 
skynet_globalmq_push(struct message_queue * queue) {
    struct global_queue *q= Q;

    SPIN_LOCK(q)
    assert(queue->next == NULL);
    if(q->tail) { //链表不为空
        q->tail->next = queue;
        q->tail = queue;
    } else { //链表为空
        q->head = q->tail = queue;
    }
    SPIN_UNLOCK(q)
}

//取链表中第一个消息队列
struct message_queue * 
skynet_globalmq_pop() {
    struct global_queue *q = Q;

    SPIN_LOCK(q)
    struct message_queue *mq = q->head;
    if(mq) {
         //注意这里,队列取出来后,就从链表中删除了
        q->head = mq->next;
        if(q->head == NULL) {
            assert(mq == q->tail);
            q->tail = NULL;
        }
        mq->next = NULL;
    }
    SPIN_UNLOCK(q)

    return mq;
}
//创建一个消息队列
struct message_queue * 
skynet_mq_create(uint32_t handle) {
    struct message_queue *q = skynet_malloc(sizeof(*q));
    q->handle = handle;
    q->cap = DEFAULT_QUEUE_SIZE;
    q->head = 0;//刚开始头为0
    q->tail = 0;//刚开始尾也为0
    SPIN_INIT(q)
    // When the queue is create (always between service create and service init) ,
    // set in_global flag to avoid push it to global queue .
    // If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.
    q->in_global = MQ_IN_GLOBAL;
    q->release = 0;
    q->overload = 0;
    q->overload_threshold = MQ_OVERLOAD;
    ///这里分配的是数组,是数组,是数组
    q->queue = skynet_malloc(sizeof(struct skynet_message) * q->cap);
    q->next = NULL;

    return q;
}
//释放队列,回收内存
static void 
_release(struct message_queue *q) {
    assert(q->next == NULL);
    SPIN_DESTROY(q)
    skynet_free(q->queue);
    skynet_free(q);
}
//返回队列的handler
uint32_t 
skynet_mq_handle(struct message_queue *q) {
    return q->handle;
}
//获取队列长度,注意数组被循环使用的情况
int
skynet_mq_length(struct message_queue *q) {
    int head, tail,cap;

    SPIN_LOCK(q)
    head = q->head;
    tail = q->tail;
    cap = q->cap;
    SPIN_UNLOCK(q)
    //当还没有循环使用数组的时候
    if (head <= tail) {
        return tail - head;
    }
    //当数组已经被循环使用的时候
    return tail + cap - head;
}
//获取负载情况
int
skynet_mq_overload(struct message_queue *q) {
    if (q->overload) {
        int overload = q->overload;
        q->overload = 0; //这里清零是为了避免持续产生报警,在skynet-server.c中
        return overload;
    } 
    return 0;
}

//消息队列出队,从数组中出队
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {
    int ret = 1;
    SPIN_LOCK(q)
    //说明队列不是空的
    if (q->head != q->tail) {
        *message = q->queue[q->head++];  //注意head++,数据不移动,移动的是游标
        ret = 0;
        int head = q->head;
        int tail = q->tail;
        int cap = q->cap;
    //因为是循环数组,超出边界后要重头开始,所以设为0
        if (head >= cap) {
            q->head = head = 0;
        }
        //如果数组被循环使用了,那么tail < head
        int length = tail - head;
        if (length < 0) {
            length += cap;
        }
        //长度要超过阀值了,扩容一倍,和c++的vector一样的策略
        while (length > q->overload_threshold) {
            q->overload = length;
            q->overload_threshold *= 2;
        }
    } else { //队列是空的
        // reset overload_threshold when queue is empty
        q->overload_threshold = MQ_OVERLOAD;
    }

    if (ret) {
        q->in_global = 0;
    }
    
    SPIN_UNLOCK(q)

    return ret;
}

//为了方便和上面的函数对比,我把skynet_mq_push提上来了
void 
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
    assert(message);
    SPIN_LOCK(q)
//入队
    q->queue[q->tail] = *message;
//因为是循环数组,越界了要重头开始
    if (++ q->tail >= q->cap) {
        q->tail = 0;
    }
//如果首尾重叠了,要扩展
    if (q->head == q->tail) {
        expand_queue(q);
    }
//重新放回全局队列中
    if (q->in_global == 0) {
        q->in_global = MQ_IN_GLOBAL;
        skynet_globalmq_push(q);
    }
    
    SPIN_UNLOCK(q)
}
//扩展循环数组
static void
expand_queue(struct message_queue *q) {
//新建一个数组
    struct skynet_message *new_queue = skynet_malloc(sizeof(struct skynet_message) * q->cap * 2);
    int i;
    for (i=0;i<q->cap;i++) { //老数据拷过来
        new_queue[i] = q->queue[(q->head + i) % q->cap];
    }
    q->head = 0; //重设head
    q->tail = q->cap; //重设tail
    q->cap *= 2;
    
    skynet_free(q->queue); //释放老数组
    q->queue = new_queue;
}

//初始化全局队列
void 
skynet_mq_init() {
    struct global_queue *q = skynet_malloc(sizeof(*q));
    memset(q,0,sizeof(*q));
    SPIN_INIT(q);
    Q=q;
}
//服务释放标记
void 
skynet_mq_mark_release(struct message_queue *q) {
    SPIN_LOCK(q)
    assert(q->release == 0);
    q->release = 1;
    if (q->in_global != MQ_IN_GLOBAL) {
        skynet_globalmq_push(q);
    }
    SPIN_UNLOCK(q)
}
//释放服务,清空循环数组
static void
_drop_queue(struct message_queue *q, message_drop drop_func, void *ud) {
    struct skynet_message msg;
    while(!skynet_mq_pop(q, &msg)) {
        drop_func(&msg, ud);
    }
    _release(q); //回收内存
}
//释放服务相关的队列
void 
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {
    SPIN_LOCK(q)
    
    if (q->release) {
        SPIN_UNLOCK(q)
        _drop_queue(q, drop_func, ud);
    } else {
        skynet_globalmq_push(q);
        SPIN_UNLOCK(q)
    }
}

代码到此分析结束,可以看出来,skynet的消息队列实际上是有两种,一种是全局消息队列,一种是服务消息队列。每个服务都有自己的消息队列,每个服务消息队列中都有服务的handle标识。这个涉及到消息的派发,这里就不展开了。每个服务消息队列被全局消息队列引用。

全局消息队列用的是经典的链表来实现的,而服务的消息队列用的是比较不直观,可能对有些人来说理解起来特别困难的循环数组来实现的。而且数组空间不够的时候,会动态扩展,容量扩展为当前容量的2倍。

消息队列的出队入队函数名都比较简单而且明了,push/pop。这个名字可能会带来一定的误解,如果改成enqueue/dequeue的话,就更符合它的实际功能。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,319评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,801评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,567评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,156评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,019评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,090评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,500评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,192评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,474评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,566评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,338评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,212评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,572评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,890评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,169评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,478评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,661评论 2 335

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,495评论 18 139
  • 引言: 一直都是从事客户端的开发工作,最近抽了点时间想了解一下服务器开发的相关知识,一番博客瞎逛之后,发现了一个不...
    linshuhe1阅读 5,120评论 0 10
  • 一、 消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用耦合、异步消息、流量削锋等问题。实现高性能...
    步积阅读 56,771评论 10 138
  • 消息队列设计精要 消息队列已经逐渐成为企业IT系统内部通信的核心手段。它具有低耦合、可靠投递、广播、流量控制、最终...
    meng_philip123阅读 1,504评论 1 25
  • 第一次见到阿金的男友,我就很不舒服。那眼镜男是个整形医生,一双小眼睛顺着我的头顶看到脚底,开口第一句话就是,詹小姐...
    红酥手贱阅读 703评论 2 1