1 /* Copyright 2002, Red Hat Inc. */
2
3 #include <mqueue.h>
4 #include <fcntl.h>
5 #include <unistd.h>
6 #include <errno.h>
7 #include <sys/stat.h>
8 #include <sys/mman.h>
9 #include <sys/ipc.h>
10 #include <sys/sem.h>
11 #include <string.h>
12 #include <stdlib.h>
13 #include <time.h>
14 #include <stdarg.h>
15 #include <machine/weakalias.h>
16 #define _LIBC 1
17 #include <sys/lock.h>
18 #undef _LIBC
19
20 #include "mqlocal.h"
21
22 #define NHASH 32 /* Num of hash lists, must be a power of 2 */
23 #define LOCHASH(i) ((i)&(NHASH-1))
24
25 static long mq_index; /* Index of next entry */
26 static struct libc_mq *mq_hash[NHASH]; /* Hash list heads for mqopen_infos */
27
28 __LOCK_INIT(static, mq_hash_lock);
29
30 mqd_t
mq_open(const char * name,int oflag,...)31 mq_open (const char *name, int oflag, ...)
32 {
33 MSG *wrbuf = NULL;
34 MSG *rdbuf = NULL;
35 int msgqid = -1;
36 int rc = -1;
37 int fd = -1;
38 int semid = -1;
39 int created = 0;
40 key_t key = (key_t)-1;
41 struct mq_attr *attr = (struct mq_attr *)MAP_FAILED;
42 struct sembuf sb = {0, 0, 0};
43 mode_t mode = 0;
44 int size;
45 int i, index, saved_errno;
46 char *real_name;
47 char *ptr;
48 struct mq_attr *user_attr = NULL;
49 struct libc_mq *info;
50 union semun arg;
51
52 /* ignore opening slash if present */
53 if (*name == '/')
54 ++name;
55 size = strlen(name);
56
57 if ((real_name = (char *)malloc (size + sizeof(MSGQ_PREFIX))) == NULL ||
58 (info = (struct libc_mq *)malloc (sizeof(struct libc_mq))) == NULL)
59 {
60 errno = ENOSPC;
61 if (real_name)
62 free (real_name);
63 return (mqd_t)-1;
64 }
65
66 /* use given name to create shared memory file name - we convert any
67 slashes to underscores so we don't have to create directories */
68 memcpy (real_name, MSGQ_PREFIX, sizeof(MSGQ_PREFIX) - 1);
69 memcpy (real_name + sizeof(MSGQ_PREFIX) - 1, name, size + 1);
70 ptr = real_name + sizeof(MSGQ_PREFIX) - 1;
71 for (i = 0; i < size; ++i)
72 {
73 if (*ptr == '/')
74 *ptr = '_';
75 ++ptr;
76 }
77
78 /* open shared memory file based on msg queue open flags and then use memory
79 file to create a unique key to use for semaphores, etc.. */
80 if (oflag & O_CREAT)
81 {
82 va_list list;
83 va_start (list, oflag);
84
85 saved_errno = errno;
86 mode = (mode_t)va_arg (list, int);
87 user_attr = va_arg(list,struct mq_attr *);
88 va_end (list);
89
90 /* attempt to open the shared memory file for exclusive create so we know
91 whether we are the owners or not */
92 fd = open (real_name, O_RDWR | O_CREAT | O_EXCL, mode);
93 if (fd < 0 && (oflag & O_EXCL))
94 {
95 /* we failed and the user wanted exclusive create */
96 free (real_name);
97 free (info);
98 return (mqd_t)-1;
99 }
100 errno = saved_errno;
101 /* check if we created the file or not */
102 if (fd >= 0)
103 created = 1;
104 }
105
106 if (fd < 0)
107 fd = open (real_name, O_RDWR, 0);
108
109 if (fd >= 0)
110 key = ftok(real_name, 255);
111
112 if (key != (key_t)-1)
113 /* memory map the shared memory file so we have a global shared data area to use */
114 attr = (struct mq_attr *)mmap (0, sizeof(struct mq_attr), PROT_READ | PROT_WRITE,
115 MAP_SHARED, fd, 0);
116
117 if (attr != (struct mq_attr *)MAP_FAILED)
118 {
119 /* we need semaphores to prevent multi-process race conditions on the
120 shared storage which contains a shared structure. The following
121 are the ones we need.
122
123 0 = open semaphore
124 1 = number of opens
125 2 = number of writes left until queue is full
126 3 = number of reads available in queue
127 4 = notify semaphore
128 5 = number of readers */
129 arg.val = 0;
130 /* make sure the creator of the shared memory file also is the creator of the
131 semaphores...this will ensure that it also creates the message queue */
132 if (created)
133 {
134 saved_errno = errno;
135 semid = semget (key, 6, IPC_CREAT | IPC_EXCL | mode);
136 errno = saved_errno;
137 /* now that we have created the semaphore, we should initialize it */
138 if (semid != -1)
139 semctl (semid, 0, SETVAL, arg);
140 }
141 else
142 {
143 /* if we didn't create the shared memory file but have gotten to here, we want
144 to ensure we haven't gotten ahead of the creator temporarily so we will
145 loop until the semaphore exists. This ensures that the creator will be the
146 one to create the message queue with the correct mode and we will be blocked
147 by the open semaphore 0. We impose a time limit to ensure something terrible
148 hasn't gone wrong. */
149 struct timespec tms;
150 int i;
151
152 tms.tv_sec = 0;
153 tms.tv_nsec = 10000; /* 10 microseconds */
154 for (i = 0; i < 100; ++i)
155 {
156 if ((semid = semget (key, 6, 0)) != -1)
157 break;
158 /* sleep in case we our a higher priority process */
159 nanosleep (&tms, NULL);
160 }
161 }
162 }
163
164 if (semid != -1)
165 {
166 /* acquire main open semaphore if we didn't create it */
167 if (!created)
168 {
169 sb.sem_op = -1;
170 rc = semop (semid, &sb, 1);
171 }
172 else
173 rc = 0; /* need this to continue below */
174 }
175
176 if (rc == 0)
177 {
178 if (created)
179 {
180 /* the creator must get here first so the message queue will be created */
181 msgqid = msgget (key, IPC_CREAT | mode);
182 if (msgqid >= 0)
183 {
184 /* we have created the message queue so check and set the attributes */
185 if ((wrbuf = (MSG *)malloc (user_attr->mq_msgsize + sizeof(int))) == NULL ||
186 (rdbuf = (MSG *)malloc (user_attr->mq_msgsize + sizeof(int))) == NULL ||
187 user_attr == NULL || user_attr->mq_msgsize <= 0 || user_attr->mq_maxmsg <= 0)
188 {
189 /* we're out of space and we created the message queue so we should
190 try to remove it */
191 msgctl (msgqid, IPC_RMID, NULL);
192 msgqid = -1; /* allow clean up to occur below */
193 if (wrbuf && rdbuf)
194 errno = EINVAL;
195 else
196 errno = ENOSPC;
197 }
198 else /* valid attributes */
199 {
200 write (fd, user_attr, sizeof(struct mq_attr));
201 attr->mq_curmsgs = 0;
202 attr->mq_flags = oflag & O_NONBLOCK;
203 arg.val = 0;
204 semctl (semid, 1, SETVAL, arg); /* number of opens starts at 0 */
205 semctl (semid, 3, SETVAL, arg); /* number of reads available starts at 0 */
206 semctl (semid, 5, SETVAL, arg); /* number of readers starts at 0 */
207 arg.val = 1;
208 semctl (semid, 4, SETVAL, arg); /* notify semaphore */
209 arg.val = user_attr->mq_maxmsg;
210 semctl (semid, 2, SETVAL, arg); /* number of writes left starts at mq_maxmsg */
211 }
212 }
213 }
214 else /* just open it */
215 {
216 msgqid = msgget (key, 0);
217 wrbuf = (MSG *)malloc (attr->mq_msgsize + sizeof(int));
218 rdbuf = (MSG *)malloc (attr->mq_msgsize + sizeof(int));
219 }
220
221 /* release semaphore acquired earlier */
222 sb.sem_op = 1;
223 semop (semid, &sb, 1);
224 }
225
226 /* if we get here and we haven't got a message queue id, then we need to clean up
227 our mess and return failure */
228 if (msgqid < 0)
229 {
230 if (fd >= 0)
231 close (fd);
232 if (attr != (struct mq_attr *)MAP_FAILED)
233 munmap (attr, sizeof(struct mq_attr));
234 if (created)
235 {
236 unlink (real_name);
237 if (semid != -1)
238 semctl (semid, 0, IPC_RMID);
239 }
240 free (real_name);
241 free (info);
242 if (wrbuf)
243 free (wrbuf);
244 if (rdbuf)
245 free (rdbuf);
246 return (mqd_t)-1;
247 }
248
249 /* we are successful so register the message queue */
250
251 /* up the count of msg queue opens */
252 sb.sem_op = 1;
253 sb.sem_num = 1;
254 semop (semid, &sb, 1);
255
256 /* success, translate into index into mq_info array */
257 __lock_acquire(mq_hash_lock);
258 index = mq_index++;
259 info->index = index;
260 info->msgqid = msgqid;
261 info->name = real_name;
262 info->semid = semid;
263 info->fd = fd;
264 info->oflag = oflag;
265 info->wrbuf = wrbuf;
266 info->rdbuf = rdbuf;
267 info->cleanup_notify = NULL;
268 info->next = mq_hash[LOCHASH(index)];
269 info->attr = attr;
270 mq_hash[LOCHASH(index)] = info;
271 __lock_release(mq_hash_lock);
272
273 return (mqd_t)index;
274 }
275
276 struct libc_mq *
__find_mq(mqd_t mq)277 __find_mq (mqd_t mq)
278 {
279 struct libc_mq *ptr;
280
281 __lock_acquire(mq_hash_lock);
282
283 ptr = mq_hash[LOCHASH((int)mq)];
284
285 while (ptr)
286 {
287 if (ptr->index == (int)mq)
288 break;
289 ptr = ptr->next;
290 }
291
292 __lock_release(mq_hash_lock);
293
294 return ptr;
295 }
296
297 void
__cleanup_mq(mqd_t mq)298 __cleanup_mq (mqd_t mq)
299 {
300 struct libc_mq *ptr;
301 struct libc_mq *prev;
302 int semid;
303 struct sembuf sb = {0, 0, 0};
304
305 __lock_acquire(mq_hash_lock);
306
307 ptr = mq_hash[LOCHASH((int)mq)];
308 prev = NULL;
309
310 while (ptr)
311 {
312 if (ptr->index == (int)mq)
313 break;
314 prev = ptr;
315 ptr = ptr->next;
316 }
317
318 if (ptr != NULL)
319 {
320 if (ptr->cleanup_notify != NULL)
321 ptr->cleanup_notify (ptr);
322 if (prev != NULL)
323 prev->next = ptr->next;
324 else
325 mq_hash[LOCHASH((int)mq)] = NULL;
326 munmap (ptr->attr, sizeof(struct mq_attr));
327 close (ptr->fd);
328 free (ptr->name);
329 free (ptr->wrbuf);
330 free (ptr->rdbuf);
331 semid = ptr->semid;
332 free (ptr);
333 /* lower the count of msg queue opens */
334 sb.sem_op = -1;
335 sb.sem_num = 1;
336 sb.sem_flg = IPC_NOWAIT;
337 semop (semid, &sb, 1);
338 }
339
340 __lock_release(mq_hash_lock);
341 }
342
343
344
345
346
347