消息队列的概念

消息队列就是一个消息的链表,每个消息队列都有一个队列头,用结构struct msg_queue来描述。队列头中包含了该队列的大量信息,包括消息队列的键值、用户ID、组ID、消息数目、读写进程ID等。其定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
struct msg_queue
{
struct ipc_perm q_perm;
time_t q_stime; // last msgsnd time
time_t q_rtime; // last msgrcv time
time_t q_ctime; // last change time
unsigned long q_cbytes; // current number of bytes on queue
unsigned long q_qnum; // number of message in queue
unsigned long q_qbytes; // max number of bytes on queue
pid_t q_lspid; // pid of last msgsnd
pid_t q_lrpid; // last receive pid
struct list_head q_messages;
struct list_head q_receives;
struct list_head q_senders;
};

结构体msqid_ds用来设置或返回消息队列的信息,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 摘自所用ubuntu18.04电脑中的/usr/include/i386-linux-gnu/bits/msq.h
struct msqid_ds
{
struct ipc_perm msg_perm; /* structure describing operation permission */
__time_t msg_stime; /* time of last msgsnd command */
#ifndef __x86_64__
unsigned long int __glibc_reserved1;
#endif
__time_t msg_rtime; /* time of last msgrcv command */
#ifndef __x86_64__
unsigned long int __glibc_reserved2;
#endif
__time_t msg_ctime; /* time of last change */
#ifndef __x86_64__
unsigned long int __glibc_reserved3;
#endif
__syscall_ulong_t __msg_cbytes; /* current number of bytes on queue */
msgqnum_t msg_qnum; /* number of messages currently on queue */
msglen_t msg_qbytes; /* max number of bytes allowed on queue */
__pid_t msg_lspid; /* pid of last msgsnd() */
__pid_t msg_lrpid; /* pid of last msgrcv() */
__syscall_ulong_t __glibc_reserved4;
__syscall_ulong_t __glibc_reserved5;
};

消息队列的创建与打开

消息队列具有一个唯一的键值,或称引用标识符、消息队列的ID号,通过使用ftok()函数获取,函数原型:

1
2
3
#include <sys/types.h>
#include <sys/ipc.h>
key_t ftok(char *pathname, char proj);

获取成功返回消息队列的键值,失败返回-1。

参数pathname为一任意存在的路径名,参数proj为1~255之间的任一数字,ftok根据路径名,提取文件信息,再根据这些文件信息及proj的值合成key。(注:此段参考:ftok()函数深度解析)。

ftok()函数并不直接对消息队列操作,生成的键值用于msgget()函数使用,该函数用于创建或打开一个消息队列,其函数原型如下:

1
2
3
4
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgget(key_t key, int msgflg);

运行成功则返回消息队列的引用标识符(ID),失败则返回-1。

参数keyftok()产生的键值,参数msgflg是一些标志位,可以取IPC_CREATIPC_EXCLIPC_NOWAIT或三者的逻辑或结果。

在以下两种情况下,msgget()将创建一个新的消息队列:

  • 如果没有消息队列与键值key相对应,且msgflg中包含了IPC_CREAT标志位
  • key参数为IPC_PRIVATE

消息队列的读写

消息队列传递的消息由两部分组成,包括消息类型所传的数据,用结构体struct msgbuf表示:

1
2
3
4
5
struct msgbuf
{
long msgtype;
char msgtext[1024];
};

msgtype成员代表消息类型,msgtext成员为消息内容,长度不一定是1024。对于发送端,首先预置一个这样的msgbuf缓冲区并写入消息类型和内容,然后调用相应的发送函数;对于接收端,首先分配一个msgbuf缓冲区,然后把消息读入缓冲区即可。

发送数据(写)

向消息队列发送数据使用msgsnd()函数,发送的一个消息数据会被添加到队列的末尾,函数原型如下:

1
2
3
4
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgsnd(int msqid, const void *prt, size_t nbytes, int flags);

运行成功返回0,失败返回-1。参数msqid为消息队列的引用标识符(ID),参数prt为void型指针,指向要发送到的消息,参数nbytes为发送的消息的字节长度,参数flag用于指定消息队列满时的处理方法。

对发送消息来说,有意义的flags标志为IPC_NOWAIT,在消息队列没有足够的空间容纳要发送的数据时,设置了该标志,则msgsnd()函数立刻出错返回,否则发送消息的进程被阻塞,直至消息队列有空间或队列被删除时返回。

接收数据(读)

从消息队列接收数据使用msgrcv()函数,函数原型如下:

1
2
3
4
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgrcv(int msqid, const void *prt, size_t nbytes, long type, int flags);

运行成功返回0,失败返回-1。

参数含义与msgsnd()函数类似,参数flag用于指定消息队列满时的处理方法,取值有3种以及3种的或结果,参数type表示接收的数据类型。

flags取值 含义
IPC_NOWAIT 如果没有满足条件的消息,调用立即返回,此时errno=ENOMSG
IPC_EXCEPT 与type>0配合使用,返回队列中第一个类型不为type的消息
MSG_NOERROR 若满足条件的消息内容大于请求的nbytes,则截断该消息,截断部分丢失
type取值 含义
type=0 接收消息队列中的第一条消息
type>0 接收消息队列中类型值等于type的第一条消息
type<0 接收消息队列中类型值小于type的绝对值的所有消息中类型值最小的那一条消息

消息队列属性设置

消息队列的信息基本都保存在消息队列头中,可分配一个类似于消息队列头的结构struct msqid_ds来返回消息队列的属性,同样可以设置该数据结构。属性设置使用msgctl()函数,函数原型如下:

1
2
3
4
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
int msgctl(int msqid, int cmd, struct msqid_ds *buf);

运行成功返回0,失败返回-1。

函数msgctl()将对参数msqid标识的消息队列执行参数cmd所指的命令,包括3种命令:

  • IPC_STAT:用于获取消息队列信息,返回的信息存贮在参数buf
  • IPC_SET:用于设置消息队列的属性,要设置的属性存储在参数buf
  • PC_RMID:删除msqid标识的消息队列

编程示例

消息队列编程步骤:

  • 使用ftok()生成key
  • 使用msgget()创建/获取消息队列,返回值为队列标识符
  • 发送消息msgsnd()/接收消息msgrcv()
  • 消息队列属性与删除msgctl()

示例1

简单使用。

发送端,msg1_snd.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main()
{
// generate key
key_t key = ftok(".", 100);
if(key == -1)
{
perror("ftoke failed");
exit(1);
}
printf("key = %#x\n", key);

// create message queue
int msgid = msgget(key, 0666|IPC_CREAT|IPC_EXCL);
if(msgid == -1)
{
perror("msgget failed");
exit(2);
}

// send data
msgsnd(msgid, "hello world!\n", 14, 0);

printf("use Enter to destory the message queue!\n");
getchar();
// detele message queue
if(msgctl(msgid, IPC_RMID, NULL) == -1)
{
perror("msgctl failed");
exit(3);
}

return 0;
}

接收端,msg1_rcv.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>

int main()
{
// generate key
key_t key = ftok(".", 100);
if(key == -1)
{
perror("ftoke failed");
exit(1);
}
printf("key = %#x\n", key);

// get message queue
int msgid = msgget(key, 0);
if(msgid == -1)
{
perror("msgget failed");
exit(2);
}

// read from the message queue
char buf[100] = {};
msgrcv(msgid, buf, 100, 0, 0);
printf("read from message queue:%s\n", buf);

return 0;
}

在一个shell中运行消息队列发送程序:

1
2
3
4
$ ./msg1_snd
key = 0x641102ed
use Enter to destory the message queue!

在另一个shell中运行消息队列接收程序:

1
2
3
4
$ ./msg1_rcv
key = 0x641102ed
read from message queue:hello world!

示例2

发送带消息类型的数据。

发送端,msg2_send.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

struct _msg
{
long mtype;
char buf[256];
}msg1,msg2;

int main()
{
// generate key
key_t key = ftok(".", 100);
if(key == -1)
{
perror("ftoke failed");
exit(1);
}
printf("key = %#x\n", key);

// create message queue
int msgid = msgget(key, 0666|IPC_CREAT);
if(msgid == -1)
{
perror("msgget failed");
exit(2);
}

//send data
msg1.mtype = 2;
strcpy(msg1.buf, "hello2");
msgsnd(msgid, &msg1, sizeof(msg1.buf), 0);

msg2.mtype = 1;
strcpy(msg2.buf, "hello1");
msgsnd(msgid, &msg2, sizeof(msg2.buf), 0);

printf("use Enter to destory the message queue!\n");
getchar();
// destroy the messsage queue
if(msgctl(msgid, IPC_RMID, NULL) == -1)
{
perror("msgctl failed");
exit(3);
}

return 0;
}

接收端,msg2_rcv.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <string.h>

struct _msg
{
long mtype;
char buf[256];
}msg1,msg2;

int main()
{
// generate key
key_t key = ftok(".", 100);
if(key == -1)
{
perror("ftoke failed");
exit(1);
}
printf("key = %#x\n", key);

// get message queue
int msgid = msgget(key, 0);
if(msgid == -1)
{
perror("msgget failed");
exit(2);
}

// read from the message queue
int res = msgrcv(msgid, &msg1, sizeof(msg1)-4, 0, 0);
while(res != 1)
{
printf("Message:%s, Type:%ld\n", msg1.buf, msg1.mtype);
res = msgrcv(msgid, &msg1, sizeof(msg1)-4, 0, 0);
}

return 0;
}

在一个shell中运行消息队列发送程序:

1
2
3
4
$ ./msg2_snd
key = 0x651102ed
use Enter to destory the message queue!

在另一个shell中运行消息队列接收程序:

1
2
3
4
5
$ ./msg2_rcv
key = 0x651102ed
Message:hello2, Type:2
Message:hello1, Type:1

示例3

消息队列的综合编程使用举例,msg_app.c:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>
#include <time.h>
#include <stdio.h>

void msg_stat(int, struct msqid_ds);

int main(void)
{
int gflags, sflags, rflags;
key_t key;
int msgid;
int reval;

struct msgsbuf
{
int mtype;
char mtext[10];
}msg_sbuf;// send

struct msgmbuf
{
int mtype;
char mtext[10];
}msg_rbuf;// receivr

struct msqid_ds msg_ginfo, msg_sinfo;

// create key
key = ftok(".", 30);
if(key == -1)
{
printf("ftok failed!");
return -1;
}
printf("key = %#x\n", key);

// create message queue
gflags = IPC_CREAT|IPC_EXCL;
msgid = msgget(key, 0666|gflags);
if(msgid == -1)
{
printf("msg create error\n");
return -1;
}
else
printf("msg create ok\n");

// after create the message queue, show it's property, use msg_stat 1st
printf("\n msg_stat1:");
msg_stat(msgid, msg_ginfo);

// send message
sflags = IPC_NOWAIT;
msg_sbuf.mtype = 8;
msg_sbuf.mtext[0] = 'a';
msg_sbuf.mtext[1] = 'b';
msg_sbuf.mtext[2] = 'c';
reval = msgsnd(msgid, &msg_sbuf, sizeof(msg_sbuf.mtext), sflags);
if(reval == -1)
printf("message send error\n");
else
printf("message send ok\n");

// after send a message, shoe it's property, use msg_stat 2st
printf("\n msg_stat2:");
msg_stat(msgid, msg_ginfo);

// receive message
rflags = IPC_NOWAIT|MSG_NOERROR;
reval = msgrcv(msgid, &msg_rbuf, 3, 0, rflags);
if(reval == -1)
printf("msg read error\n");// ===
else
{
printf("read from msg queue %d bytes\n", reval);
printf("type:%d, message:%s\n", msg_rbuf.mtype, msg_rbuf.mtext);
}

// use msg_stat 3st
printf("\n msg_stat3:");
msg_stat(msgid, msg_ginfo);

// change message property
msg_sinfo = msg_ginfo;
msg_sinfo.msg_perm.uid = 8; // user ID
msg_sinfo.msg_perm.gid = 8; // group ID
msg_sinfo.msg_qbytes = 16388; // queue bytes 16384->16388
reval = msgctl(msgid, IPC_SET, &msg_sinfo);
if(reval == -1)
{
printf("msg set info error\n");// ===
}

// use msg_stat 4st
printf("\n msg_stat4:");
msg_stat(msgid, msg_ginfo);

// delete the message queue
reval = msgctl(msgid, IPC_RMID, NULL);
if(reval == -1)
{
printf("unlink msg queue error\n");
}

return 0;
}

void msg_stat(int msgid, struct msqid_ds msg_info)
{
int reval;
sleep(1);
// get message property
reval = msgctl(msgid, IPC_STAT, &msg_info);
if(reval == -1)
{
printf("get msg info error\n");
return;
}

printf("\n");
printf("current number of bytes on queue is %ld\n", msg_info.msg_cbytes);
printf("number of message in queue is %ld\n", msg_info.msg_qnum);
printf("max number of bytes on queue is %ld\n", msg_info.msg_qbytes);
//
printf("pid of last msgsnd is %d\n", msg_info.msg_lspid); // last send opera te process's ID
printf("pid of last msgrcv is %d\n", msg_info.msg_lrpid); // last receive op erate process's ID
printf("last msgsnd time is %s", ctime(&(msg_info.msg_stime))); // last send time
printf("last msgrcv time is %s", ctime(&(msg_info.msg_rtime))); // last rece ive time
printf("last change time is %s", ctime(&(msg_info.msg_ctime))); // last chan ge time
printf("msg uid is %d\n", msg_info.msg_perm.uid); // message queue user ID
printf("msg gid is %d\n", msg_info.msg_perm.gid); // message queue group ID
}

编译后执行,需要sudo权限,否则change message property 步骤会失败:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
$ sudo ./msg_app
[sudo] password for deeplearning:
key = 0x1e11060e
msg create ok

msg_stat1:
current number of bytes on queue is 0
number of message in queue is 0
max number of bytes on queue is 16384
pid of last msgsnd is 0
pid of last msgrcv is 0
last msgsnd time is Thu Jan 1 08:00:00 1970
last msgrcv time is Thu Jan 1 08:00:00 1970
last change time is Wed Nov 27 10:40:41 2019
msg uid is 0
msg gid is 0

message send ok

msg_stat2:
current number of bytes on queue is 10
number of message in queue is 1
max number of bytes on queue is 16384
pid of last msgsnd is 3912
pid of last msgrcv is 0
last msgsnd time is Wed Nov 27 10:40:42 2019
last msgrcv time is Thu Jan 1 08:00:00 1970
last change time is Wed Nov 27 10:40:41 2019
msg uid is 0
msg gid is 0

read from msg queue 3 bytes
type:8, message:abc

msg_stat3:
current number of bytes on queue is 0
number of message in queue is 0
max number of bytes on queue is 16384
pid of last msgsnd is 3912
pid of last msgrcv is 3912
last msgsnd time is Wed Nov 27 10:40:42 2019
last msgrcv time is Wed Nov 27 10:40:43 2019
last change time is Wed Nov 27 10:40:41 2019
msg uid is 0
msg gid is 0

msg set ok

msg_stat4:
current number of bytes on queue is 0
number of message in queue is 0
max number of bytes on queue is 16388
pid of last msgsnd is 3912
pid of last msgrcv is 3912
last msgsnd time is Wed Nov 27 10:40:42 2019
last msgrcv time is Wed Nov 27 10:40:43 2019
last change time is Wed Nov 27 10:40:44 2019
msg uid is 8
msg gid is 8

unlink msg queue ok

程序首先生成key并创建消息队列,第1次输出消息队列的属性信息,接着发送数据,第2次输出属性信息,然后接收数据,第3次输出属性信息,再然后修改属性,第4次输出属性信息,最后删除消息队列。

注意观察结果中的时间变化以及各种ID数值的变化。

参考:

  1. 《精通Linux C编程》- 程国钢
  2. 《Linux C编程完全解密》- 闫敬 吴淑坤