IPC学习之POSIX消息队列

之前进程间通信的知识都是应付面试的时候临时补的,最近还是要通过码代码来学习下,主要参考《Linux/Unix系统编程手册》,本来照着这本书的示例一路敲下来就足够了,但是书上给的都是比较完整的例子(毕竟便于即敲即用嘛),这里我尽量简化。

1. 准备

这里给出下文可能用到的变量的定义

mqd_t mqd;  // 消息队列句柄,可用于IO多路复用
struct mq_attr attr;  // 消息队列属性,下面为字段说明,[]内为可以设置/获取该字段的API
                      // .mq_flags 0或者O_NONBLOCK [mq_getattr(), mq_setattr()]
                      // .mq_maxmsg 最大消息数量 [mq_open(), mq_getattr()]
                      // .mq_msgsize 最大消息大小 [mq_open(), mq_getattr()]
                      // .mq_curmsgs 队列中消息数量 [mq_getattr()]
const char* mq_name = "/mq";  // 消息队列名称,必须以'/'开头

消息队列的maxmsgmsgsize都是在创建时指定的,之后不能改变。是否非阻塞读写则可以手动设定。消息数量也是只读的,是随着消息队列的读取/写入而改变的,每次写入则加1,每次读取则减1。
另外,相关API都是返回-1作为错误码,之后API说明略去检查返回值的步骤。
gcc编译时需要加上-lrt选项,动态链接到共享库librt.so

2. 基本API

创建消息队列

// 参数2和3同系统调用open(2),参数4设置自定义消息队列属性,若为NULL则不设置,也可以不要参数4
mqd = mq_open(mq_name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attr);

生产者

下列代码发送3个消息到队列中

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
const char* messages[] = {"msg-1", "msg-2", "msg-3"};
int priorities[] = {4, 0, 6};
for (int i = 0; i < 3; ++i)
    mq_send(mqd, messages[i], strlen(messages[i]), priorities[i]);

注意对现有的消息队列调用mq_open时,第2个参数不带O_CREAT时不需要指定权限。
发送时需要指定优先级,消息按照优先级降序存在消息队列中,因此队列中的消息依次是"msg-3", "msg-1", "msg-2"

消费者

读取消息队列中所有消息并显示优先级

mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
mq_getattr(mqd, &attr);  // 取得队列属性,从而定义合适大小的缓冲区
char* buffer = new buffer[attr.mq_msgsize + 1];
unsigned int priority;
ssize_t num_read;
while ((num_read = mq_receive(mqd, buffer, attr.mq_msgsize, &priority)) != -1) {
    buffer[num_read] = '\0';
    printf("[%2u] %s\n", priority, buffer); 
}
// 若errno != EAGAIN则需要处理错误,非阻塞模式下若队列为空errno会被设置为EAGAIN
delete[] buffer;

另外关于EINTR错误,若设置信号处理器时指定了SA_RESTART标志,则mq_receivemq_send会自动重启,所以无需代码处理。但是这针对的是阻塞式I/O,参考man 7 signal如下内容

If a blocked call to one of the following interfaces is interrupted by a signal handler, then the call will be automatically restarted after the signal handler returns if the SA_RESTART flag was used; otherwise the call will fail with the error EINTR

至于非阻塞式I/O,个人觉得根本就不会被打断,因为没有意义,但是实际情况还是得看内核怎么实现,有篇讨论可以参考。EINTR and non-blocking calls

超时读写

mq_timedsend()mq_timedreceive相比mq_send()mq_receive()仅仅多出一个参数const struct timespec *abs_timeout,用于指定超时时间,仅仅在O_NONBLOCK标记不起作用时才有效。

 struct timespec
   {     
     __time_t tv_sec;        /* Seconds.  */
     __syscall_slong_t tv_nsec;  /* Nanoseconds.  */                                                                                                             
   };    

注意该参数设置的是绝对时间,因此可以用clock_gettime()来获取CLOCK_REALTIME时钟的当前值,并在该值上加上所需的时间量来生成一个恰当初始化的timespec结构。

2. 消息通知

示例程序的流程是顺序的,先创建消息队列,再写入若干消息,再依次读取所有消息。实际上用于进程间通信时,写者和读者是并发执行的,即生产者-消费者问题。
POSIX消息队列提供了消息通知API,在队列为空时若有新的消息到来,能够接收通知。通过下列API可以注册通知,指定具体通知方式(比如信号)后,若新消息到来使得队列从空变成非空,就会调用自定义的通知处理函数。

int mq_notify(mqd_t mqdes, const struct sigevent *sevp);

以下几点需要特别注意

  • 任一时刻只能有1个进程能够向特定消息队列注册通知,如果已经存在,再次注册会失败,errno被置为EBUSY
  • 若对非空队列注册通知,只有等到队列被清空后,新消息到来时才能发出通知。
  • 若向注册进程发送了一个通知之后就会删除注册信息,这样其他进程就可以向队列注册通知。
  • 若有其他进程调用mq_receive()发生阻塞,则有新消息到来时,其他进程接收消息,注册进程继续等待通知。
  • 可以传入NULL来撤销通知。

通过man 7 sigevent可以查看该结构的详细信息

       union sigval {          /* 通知传递的数据 */
           int     sival_int;         /* Integer value */
           void   *sival_ptr;         /* Pointer value */
       };

       struct sigevent {
           int          sigev_notify; /* 通知方法 */
           int          sigev_signo;  /* 通知信号 */
           union sigval sigev_value;  /* 通知传递的数据 */
           void       (*sigev_notify_function) (union sigval);
                            /* 线程通知的函数 (SIGEV_THREAD) */
           void        *sigev_notify_attributes;
                            /* 线程通知的线程属性 (SIGEV_THREAD) */
           pid_t        sigev_notify_thread_id;
                            /* 用于接收信号的线程ID (SIGEV_THREAD_ID) */
       };

其中sigev_notify指定通知的方法,有SIGEV_NONE(不作任何处理)、SIGEV_SIGNAL(使用信号通知)和SIGEV_THREAD(使用线程通知)。
用示例代码解释典型用法(忽略了错误处理),预定义变量如下

struct sigevent sev;
constexpr int NOTIFY_SIG = SIGUSR1;  // 自定义通知信号的种类

使用信号通知的框架如下

int main() {
    // ...
    signal(SIGUSR1, [](int){});  // 仅仅用于跳出sigsuspend

    sev.sigev_notify = SIGEV_SIGNAL;
    sev.sigev_signo = NOTIFY_SIG;
    mq_notify(mqd, &sev);

    sigset_t empty_mask;
    sigemptyset(&empty_mask);
    while (true) {
        sigsuspend(&empty_mask);
        mq_notify(mqd, &sev);
        // TODO: 处理由空变为非空的消息队列
    }
}

使用线程通知的框架如下

static void notifySetup(mqd_t& mqd);  // 注册通知线程,函数的前置声明

static void notifyFunc(union sigval sv) {
    auto& mqd = *reinterpret_cast<mqd_t*>(sv.sival_ptr);
    notifySetup(mqd);
    // TODO: 处理由空变为非空的消息队列
}

static void notifySetup(mqd_t& mqd) {
    sev.sigev_notify = SIGEV_THREAD;
    sev.sigev_notify_function = notifyFunc;
    sev.sigev_notify_attributes = NULL;
    sev.sigev_value.sival_ptr = &mqd;
    mq_notify(mqd, &sev);
}

int main() {
    // ...
    notifySetup(mqd);
    pause();  // 主线程永远中止,因为定时器通知是在一个单独的线程中调用notifyFunc()来分发的
}

3. 相关系统命令

常见的命令,ipcs显示IPC状态,ipcrm释放IPC。但是它们都只能用于System V IPC,对POSIX IPC没有作用。POSIX IPC被实现成了虚拟文件系统的文件,可以直接使用lsrm来列出和删除这些文件,挂载在/dev/目录下,因此可以用常见的Linux命令来管理。
/dev/mqueue/xxx:记录了消息队列/xxx的状态,这也解释了为何IPC名字要以/开头。

# cat /dev/mqueue/mq
QSIZE:13         NOTIFY:0     SIGNO:10    NOTIFY_PID:30527 

其中QSIZE为所有未消费的消息大小之和,并且进程30527在等待信号10(SIGUSR1)的通知。这是我在启动之前的mq_notify示例程序监听所致。
如果rm掉该文件,那么之后消息队列也无效了。
/proc/sys/fs/mqueue/下记录了一些文件

  • msg_default:消息队列默认属性的mq_maxmsg字段的值;
  • msg_max:消息队列默认属性的mq_maxmsg字段的上限;
  • msgsize_default:消息队列默认属性的mq_msgsize字段的值;
  • msgsize_max:消息队列默认属性的mq_msgsize字段的上限;
  • queues_max:系统可以创建的消息队列个数上限。
# tail -n +1 /proc/sys/fs/mqueue/* | grep -v "^$"
==> /proc/sys/fs/mqueue/msg_default <==
10
==> /proc/sys/fs/mqueue/msg_max <==
10
==> /proc/sys/fs/mqueue/msgsize_default <==
8192
==> /proc/sys/fs/mqueue/msgsize_max <==
8192
==> /proc/sys/fs/mqueue/queues_max <==
256

其中tail+NUM代表打印第NUM行开始的内容。

4. I/O多路复用

POSIX消息队列相比System V消息队列的最大优点就是,mqd_t类型的句柄可以被select/poll/epoll监听。
示例代码epoll_consumer.cc

5. 总结

消息队列本质是消息组成的链表,允许进程以消息的形式交换数据,和数据报socket一样。不同于TCP的流式传输,消息具有边界,N次写入对应N次读取,若读取缓冲区太小,则剩余的部分会被舍弃,而不会留给下次继续读。
队列具有容量上限和单条消息的大小上限,队列填满时,写入操作会被阻塞,非阻塞模式下会失败并设置errnoEAGAIN,因此采用非阻塞模式能用循环读取整个队列而进行后续操作。单次发送的数据超过消息大小上限时,会发送失败。
相比System V消息队列,POSIX消息队列的优点是:

  1. 支持I/O多路复用;
  2. 支持队列从空变为非空时的消息异步通知。
  3. 如同其他POSIX IPC,POSIX消息队列维护了引用计数,支持安全地删除。

缺点在于可移植性较差,因为诞生较晚。大多情况下这不是问题,不维护老代码的话完全可以用POSIX消息队列。另一方面POSIX消息队列严格按照优先级排序,System V消息队列支持按照消息的类型字段来读取消息。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,362评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,330评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,247评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,560评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,580评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,569评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,929评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,587评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,840评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,596评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,678评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,366评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,945评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,929评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,165评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,271评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,403评论 2 342