之前进程间通信的知识都是应付面试的时候临时补的,最近还是要通过码代码来学习下,主要参考《Linux/Unix系统编程手册》,本来照着这本书的示例一路敲下来就足够了,但是书上给的都是比较完整的例子(毕竟便于即敲即用嘛),这里我尽量简化。
1. 准备
这里给出下文可能用到的变量的定义
mqd_t mqd; // 消息队列句柄,可用于IO多路复用
struct mq_attr attr; // 消息队列属性,下面为字段说明,[]内为可以设置/获取该字段的API
// .mq_flags 0或者O_NONBLOCK [mq_getattr(), mq_setattr()]
// .mq_maxmsg 最大消息数量 [mq_open(), mq_getattr()]
// .mq_msgsize 最大消息大小 [mq_open(), mq_getattr()]
// .mq_curmsgs 队列中消息数量 [mq_getattr()]
const char* mq_name = "/mq"; // 消息队列名称,必须以'/'开头
消息队列的maxmsg
和msgsize
都是在创建时指定的,之后不能改变。是否非阻塞读写则可以手动设定。消息数量也是只读的,是随着消息队列的读取/写入而改变的,每次写入则加1,每次读取则减1。
另外,相关API都是返回-1作为错误码,之后API说明略去检查返回值的步骤。
gcc
编译时需要加上-lrt
选项,动态链接到共享库librt.so
。
2. 基本API
创建消息队列
// 参数2和3同系统调用open(2),参数4设置自定义消息队列属性,若为NULL则不设置,也可以不要参数4
mqd = mq_open(mq_name, O_CREAT | O_EXCL, S_IRUSR | S_IWUSR, &attr);
生产者
下列代码发送3个消息到队列中
mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
const char* messages[] = {"msg-1", "msg-2", "msg-3"};
int priorities[] = {4, 0, 6};
for (int i = 0; i < 3; ++i)
mq_send(mqd, messages[i], strlen(messages[i]), priorities[i]);
注意对现有的消息队列调用mq_open
时,第2个参数不带O_CREAT
时不需要指定权限。
发送时需要指定优先级,消息按照优先级降序存在消息队列中,因此队列中的消息依次是"msg-3", "msg-1", "msg-2"
。
消费者
读取消息队列中所有消息并显示优先级
mqd = mq_open(mq_name, O_WRONLY | O_NONBLOCK);
mq_getattr(mqd, &attr); // 取得队列属性,从而定义合适大小的缓冲区
char* buffer = new buffer[attr.mq_msgsize + 1];
unsigned int priority;
ssize_t num_read;
while ((num_read = mq_receive(mqd, buffer, attr.mq_msgsize, &priority)) != -1) {
buffer[num_read] = '\0';
printf("[%2u] %s\n", priority, buffer);
}
// 若errno != EAGAIN则需要处理错误,非阻塞模式下若队列为空errno会被设置为EAGAIN
delete[] buffer;
另外关于EINTR
错误,若设置信号处理器时指定了SA_RESTART
标志,则mq_receive
和mq_send
会自动重启,所以无需代码处理。但是这针对的是阻塞式I/O,参考man 7 signal
如下内容
If a blocked call to one of the following interfaces is interrupted by a signal handler, then the call will be automatically restarted after the signal handler returns if the SA_RESTART flag was used; otherwise the call will fail with the error EINTR
至于非阻塞式I/O,个人觉得根本就不会被打断,因为没有意义,但是实际情况还是得看内核怎么实现,有篇讨论可以参考。EINTR and non-blocking calls
超时读写
mq_timedsend()
和mq_timedreceive
相比mq_send()
和mq_receive()
仅仅多出一个参数const struct timespec *abs_timeout
,用于指定超时时间,仅仅在O_NONBLOCK
标记不起作用时才有效。
struct timespec
{
__time_t tv_sec; /* Seconds. */
__syscall_slong_t tv_nsec; /* Nanoseconds. */
};
注意该参数设置的是绝对时间,因此可以用clock_gettime()
来获取CLOCK_REALTIME
时钟的当前值,并在该值上加上所需的时间量来生成一个恰当初始化的timespec
结构。
2. 消息通知
示例程序的流程是顺序的,先创建消息队列,再写入若干消息,再依次读取所有消息。实际上用于进程间通信时,写者和读者是并发执行的,即生产者-消费者问题。
POSIX消息队列提供了消息通知API,在队列为空时若有新的消息到来,能够接收通知。通过下列API可以注册通知,指定具体通知方式(比如信号)后,若新消息到来使得队列从空变成非空,就会调用自定义的通知处理函数。
int mq_notify(mqd_t mqdes, const struct sigevent *sevp);
以下几点需要特别注意:
- 任一时刻只能有1个进程能够向特定消息队列注册通知,如果已经存在,再次注册会失败,
errno
被置为EBUSY
。 - 若对非空队列注册通知,只有等到队列被清空后,新消息到来时才能发出通知。
- 若向注册进程发送了一个通知之后就会删除注册信息,这样其他进程就可以向队列注册通知。
- 若有其他进程调用
mq_receive()
发生阻塞,则有新消息到来时,其他进程接收消息,注册进程继续等待通知。 - 可以传入NULL来撤销通知。
通过man 7 sigevent
可以查看该结构的详细信息
union sigval { /* 通知传递的数据 */
int sival_int; /* Integer value */
void *sival_ptr; /* Pointer value */
};
struct sigevent {
int sigev_notify; /* 通知方法 */
int sigev_signo; /* 通知信号 */
union sigval sigev_value; /* 通知传递的数据 */
void (*sigev_notify_function) (union sigval);
/* 线程通知的函数 (SIGEV_THREAD) */
void *sigev_notify_attributes;
/* 线程通知的线程属性 (SIGEV_THREAD) */
pid_t sigev_notify_thread_id;
/* 用于接收信号的线程ID (SIGEV_THREAD_ID) */
};
其中sigev_notify
指定通知的方法,有SIGEV_NONE
(不作任何处理)、SIGEV_SIGNAL
(使用信号通知)和SIGEV_THREAD
(使用线程通知)。
用示例代码解释典型用法(忽略了错误处理),预定义变量如下
struct sigevent sev;
constexpr int NOTIFY_SIG = SIGUSR1; // 自定义通知信号的种类
使用信号通知的框架如下
int main() {
// ...
signal(SIGUSR1, [](int){}); // 仅仅用于跳出sigsuspend
sev.sigev_notify = SIGEV_SIGNAL;
sev.sigev_signo = NOTIFY_SIG;
mq_notify(mqd, &sev);
sigset_t empty_mask;
sigemptyset(&empty_mask);
while (true) {
sigsuspend(&empty_mask);
mq_notify(mqd, &sev);
// TODO: 处理由空变为非空的消息队列
}
}
使用线程通知的框架如下
static void notifySetup(mqd_t& mqd); // 注册通知线程,函数的前置声明
static void notifyFunc(union sigval sv) {
auto& mqd = *reinterpret_cast<mqd_t*>(sv.sival_ptr);
notifySetup(mqd);
// TODO: 处理由空变为非空的消息队列
}
static void notifySetup(mqd_t& mqd) {
sev.sigev_notify = SIGEV_THREAD;
sev.sigev_notify_function = notifyFunc;
sev.sigev_notify_attributes = NULL;
sev.sigev_value.sival_ptr = &mqd;
mq_notify(mqd, &sev);
}
int main() {
// ...
notifySetup(mqd);
pause(); // 主线程永远中止,因为定时器通知是在一个单独的线程中调用notifyFunc()来分发的
}
3. 相关系统命令
常见的命令,ipcs
显示IPC状态,ipcrm
释放IPC。但是它们都只能用于System V IPC,对POSIX IPC没有作用。POSIX IPC被实现成了虚拟文件系统的文件,可以直接使用ls
和rm
来列出和删除这些文件,挂载在/dev/
目录下,因此可以用常见的Linux命令来管理。
/dev/mqueue/xxx
:记录了消息队列/xxx
的状态,这也解释了为何IPC名字要以/
开头。
# cat /dev/mqueue/mq
QSIZE:13 NOTIFY:0 SIGNO:10 NOTIFY_PID:30527
其中QSIZE
为所有未消费的消息大小之和,并且进程30527在等待信号10(SIGUSR1)的通知。这是我在启动之前的mq_notify
示例程序监听所致。
如果rm
掉该文件,那么之后消息队列也无效了。
/proc/sys/fs/mqueue/
下记录了一些文件
-
msg_default
:消息队列默认属性的mq_maxmsg
字段的值; -
msg_max
:消息队列默认属性的mq_maxmsg
字段的上限; -
msgsize_default
:消息队列默认属性的mq_msgsize
字段的值; -
msgsize_max
:消息队列默认属性的mq_msgsize
字段的上限; -
queues_max
:系统可以创建的消息队列个数上限。
# tail -n +1 /proc/sys/fs/mqueue/* | grep -v "^$"
==> /proc/sys/fs/mqueue/msg_default <==
10
==> /proc/sys/fs/mqueue/msg_max <==
10
==> /proc/sys/fs/mqueue/msgsize_default <==
8192
==> /proc/sys/fs/mqueue/msgsize_max <==
8192
==> /proc/sys/fs/mqueue/queues_max <==
256
其中tail
的+NUM
代表打印第NUM行开始的内容。
4. I/O多路复用
POSIX消息队列相比System V消息队列的最大优点就是,mqd_t
类型的句柄可以被select/poll/epoll
监听。
示例代码epoll_consumer.cc
5. 总结
消息队列本质是消息组成的链表,允许进程以消息的形式交换数据,和数据报socket一样。不同于TCP的流式传输,消息具有边界,N次写入对应N次读取,若读取缓冲区太小,则剩余的部分会被舍弃,而不会留给下次继续读。
队列具有容量上限和单条消息的大小上限,队列填满时,写入操作会被阻塞,非阻塞模式下会失败并设置errno
为EAGAIN
,因此采用非阻塞模式能用循环读取整个队列而进行后续操作。单次发送的数据超过消息大小上限时,会发送失败。
相比System V消息队列,POSIX消息队列的优点是:
- 支持I/O多路复用;
- 支持队列从空变为非空时的消息异步通知。
- 如同其他POSIX IPC,POSIX消息队列维护了引用计数,支持安全地删除。
缺点在于可移植性较差,因为诞生较晚。大多情况下这不是问题,不维护老代码的话完全可以用POSIX消息队列。另一方面POSIX消息队列严格按照优先级排序,System V消息队列支持按照消息的类型字段来读取消息。