1 /* $NetBSD: sys_mqueue.c,v 1.16 2009/04/11 23:05:26 christos Exp $ */ 2 3 /* 4 * Copyright (c) 2007, 2008 Mindaugas Rasiukevicius <rmind at NetBSD org> 5 * All rights reserved. 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 1. Redistributions of source code must retain the above copyright 11 * notice, this list of conditions and the following disclaimer. 12 * 2. Redistributions in binary form must reproduce the above copyright 13 * notice, this list of conditions and the following disclaimer in the 14 * documentation and/or other materials provided with the distribution. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND 17 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 18 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 19 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE 20 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 21 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 22 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 23 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 24 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 25 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 26 * SUCH DAMAGE. 27 */ 28 29 /* 30 * Implementation of POSIX message queues. 31 * Defined in the Base Definitions volume of IEEE Std 1003.1-2001. 32 * 33 * Locking 34 * 35 * Global list of message queues (mqueue_head) and proc_t::p_mqueue_cnt 36 * counter are protected by mqlist_mtx lock. The very message queue and 37 * its members are protected by mqueue::mq_mtx. 
38 * 39 * Lock order: 40 * mqlist_mtx 41 * -> mqueue::mq_mtx 42 */ 43 44 #include <stdbool.h> 45 #include <sys/param.h> 46 #include <sys/types.h> 47 #include <sys/errno.h> 48 #include <sys/fcntl.h> 49 #include <sys/file.h> 50 #include <sys/filedesc.h> 51 #include <sys/ucred.h> 52 #include <sys/priv.h> 53 #include <sys/kernel.h> 54 #include <sys/malloc.h> 55 #include <sys/mplock2.h> 56 #include <sys/mqueue.h> 57 #include <sys/objcache.h> 58 #include <sys/proc.h> 59 #include <sys/queue.h> 60 #include <sys/event.h> 61 #include <sys/serialize.h> 62 #include <sys/signal.h> 63 #include <sys/signalvar.h> 64 #include <sys/spinlock.h> 65 #include <sys/spinlock2.h> 66 #include <sys/stat.h> 67 #include <sys/sysctl.h> 68 #include <sys/sysproto.h> 69 #include <sys/systm.h> 70 #include <sys/lock.h> 71 #include <sys/unistd.h> 72 #include <sys/vnode.h> 73 74 /* System-wide limits. */ 75 static u_int mq_open_max = MQ_OPEN_MAX; 76 static u_int mq_prio_max = MQ_PRIO_MAX; 77 static u_int mq_max_msgsize = 16 * MQ_DEF_MSGSIZE; 78 static u_int mq_def_maxmsg = 32; 79 static u_int mq_max_maxmsg = 16 * 32; 80 81 struct lock mqlist_mtx; 82 static struct objcache * mqmsg_cache; 83 static LIST_HEAD(, mqueue) mqueue_head = 84 LIST_HEAD_INITIALIZER(mqueue_head); 85 86 typedef struct file file_t; /* XXX: Should we put this in sys/types.h ? 
*/

/* Function prototypes */
static int	mq_stat_fop(file_t *, struct stat *, struct ucred *cred);
static int	mq_close_fop(file_t *);
static int	mq_kqfilter_fop(struct file *fp, struct knote *kn);
static void	mqfilter_read_detach(struct knote *kn);
static void	mqfilter_write_detach(struct knote *kn);
static int	mqfilter_read(struct knote *kn, long hint);
static int	mqfilter_write(struct knote *kn, long hint);

/* Some time-related utility functions */
static int	itimespecfix(struct timespec *ts);
static int	tstohz(const struct timespec *ts);

/* File operations vector */
static struct fileops mqops = {
	.fo_read = badfo_readwrite,
	.fo_write = badfo_readwrite,
	.fo_ioctl = badfo_ioctl,
	.fo_stat = mq_stat_fop,
	.fo_close = mq_close_fop,
	.fo_kqfilter = mq_kqfilter_fop,
	.fo_shutdown = badfo_shutdown
};

/* Define a new malloc type for message queues */
MALLOC_DECLARE(M_MQBUF);
MALLOC_DEFINE(M_MQBUF, "mqueues", "Buffers to message queues");

/* Malloc arguments for object cache */
struct objcache_malloc_args mqueue_malloc_args = {
	sizeof(struct mqueue), M_MQBUF };

/*
 * Initialize POSIX message queue subsystem.
 *
 * Creates the object cache used for fixed-size (<= MQ_DEF_MSGSIZE)
 * messages and initializes the global message-queue-list lock.
 */
void
mqueue_sysinit(void)
{
	mqmsg_cache = objcache_create("mqmsg_cache",
	    0,			/* infinite depot's capacity */
	    0,			/* default magazine's capacity */
	    NULL,		/* constructor */
	    NULL,		/* destructor */
	    NULL,
	    objcache_malloc_alloc,
	    objcache_malloc_free,
	    &mqueue_malloc_args);

	lockinit(&mqlist_mtx, "mqlist_mtx", 0, LK_CANRECURSE);
}

/*
 * Free the message.
 *
 * Large messages were kmalloc'ed directly; small ones come from the
 * object cache.  `size' must be the same total (header + payload) size
 * the message was allocated with, since it selects the free path.
 */
static void
mqueue_freemsg(struct mq_msg *msg, const size_t size)
{

	if (size > MQ_DEF_MSGSIZE) {
		kfree(msg, M_MQBUF);
	} else {
		objcache_put(mqmsg_cache, msg);
	}
}

/*
 * Destroy the message queue.
 */
static void
mqueue_destroy(struct mqueue *mq)
{
	struct mq_msg *msg;
	size_t msz;
	u_int i;

	/* Note MQ_PQSIZE + 1: the extra slot is the reserved queue. */
	for (i = 0; i < MQ_PQSIZE + 1; i++) {
		while ((msg = TAILQ_FIRST(&mq->mq_head[i])) != NULL) {
			TAILQ_REMOVE(&mq->mq_head[i], msg, msg_queue);
			msz = sizeof(struct mq_msg) + msg->msg_len;
			mqueue_freemsg(msg, msz);
		}
	}
	lockuninit(&mq->mq_mtx);
	kfree(mq, M_MQBUF);
}

/*
 * Lookup for file name in general list of message queues.
 * => locks the message queue on success.
 * Caller must hold mqlist_mtx (asserted below).
 */
static void *
mqueue_lookup(char *name)
{
	struct mqueue *mq;

	KKASSERT(lockstatus(&mqlist_mtx, curthread));

	LIST_FOREACH(mq, &mqueue_head, mq_list) {
		if (strncmp(mq->mq_name, name, MQ_NAMELEN) == 0) {
			lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);
			return mq;
		}
	}

	return NULL;
}

/*
 * mqueue_get: get the mqueue from the descriptor.
 * => locks the message queue, if found.
 * => holds a reference on the file descriptor.
 *
 * Note: the lwp argument `l' is unused here; the descriptor is looked
 * up in curproc's table.
 */
static int
mqueue_get(struct lwp *l, mqd_t mqd, file_t **fpr)
{
	struct mqueue *mq;
	file_t *fp;

	fp = holdfp(curproc->p_fd, (int)mqd, -1);	/* XXX: Why -1 ? */
	if (__predict_false(fp == NULL))
		return EBADF;

	if (__predict_false(fp->f_type != DTYPE_MQUEUE)) {
		fdrop(fp);
		return EBADF;
	}
	mq = fp->f_data;
	lockmgr(&mq->mq_mtx, LK_EXCLUSIVE);

	*fpr = fp;
	return 0;
}

/*
 * mqueue_linear_insert: perform linear insert according to the message
 * priority into the reserved queue (MQ_PQRESQ).  Reserved queue is a
 * sorted list used only when mq_prio_max is increased via sysctl.
226 */ 227 static inline void 228 mqueue_linear_insert(struct mqueue *mq, struct mq_msg *msg) 229 { 230 struct mq_msg *mit; 231 232 TAILQ_FOREACH(mit, &mq->mq_head[MQ_PQRESQ], msg_queue) { 233 if (msg->msg_prio > mit->msg_prio) 234 break; 235 } 236 if (mit == NULL) { 237 TAILQ_INSERT_TAIL(&mq->mq_head[MQ_PQRESQ], msg, msg_queue); 238 } else { 239 TAILQ_INSERT_BEFORE(mit, msg, msg_queue); 240 } 241 } 242 243 /* 244 * Validate input. 245 */ 246 int 247 itimespecfix(struct timespec *ts) 248 { 249 if (ts->tv_sec < 0 || ts->tv_nsec < 0 || ts->tv_nsec >= 1000000000) 250 return (EINVAL); 251 if (ts->tv_sec == 0 && ts->tv_nsec != 0 && ts->tv_nsec < nstick) 252 ts->tv_nsec = nstick; 253 return (0); 254 } 255 256 /* 257 * Compute number of ticks in the specified amount of time. 258 */ 259 int 260 tstohz(const struct timespec *ts) 261 { 262 struct timeval tv; 263 264 /* 265 * usec has great enough resolution for hz, so convert to a 266 * timeval and use tvtohz() above. 267 */ 268 TIMESPEC_TO_TIMEVAL(&tv, ts); 269 return tvtohz_high(&tv); /* XXX Why _high() and not _low() ? */ 270 } 271 272 /* 273 * Converter from struct timespec to the ticks. 274 * Used by mq_timedreceive(), mq_timedsend(). 
275 */ 276 int 277 abstimeout2timo(struct timespec *ts, int *timo) 278 { 279 struct timespec tsd; 280 int error; 281 282 error = itimespecfix(ts); 283 if (error) { 284 return error; 285 } 286 getnanotime(&tsd); 287 timespecsub(ts, &tsd); 288 if (ts->tv_sec < 0 || (ts->tv_sec == 0 && ts->tv_nsec <= 0)) { 289 return ETIMEDOUT; 290 } 291 *timo = tstohz(ts); 292 KKASSERT(*timo != 0); 293 294 return 0; 295 } 296 297 static int 298 mq_stat_fop(file_t *fp, struct stat *st, struct ucred *cred) 299 { 300 struct mqueue *mq = fp->f_data; 301 302 (void)memset(st, 0, sizeof(*st)); 303 304 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 305 st->st_mode = mq->mq_mode; 306 st->st_uid = mq->mq_euid; 307 st->st_gid = mq->mq_egid; 308 st->st_atimespec = mq->mq_atime; 309 st->st_mtimespec = mq->mq_mtime; 310 /*st->st_ctimespec = st->st_birthtimespec = mq->mq_btime;*/ 311 st->st_uid = fp->f_cred->cr_uid; 312 st->st_gid = fp->f_cred->cr_svgid; 313 lockmgr(&mq->mq_mtx, LK_RELEASE); 314 315 return 0; 316 } 317 318 static struct filterops mqfiltops_read = 319 { FILTEROP_ISFD, NULL, mqfilter_read_detach, mqfilter_read }; 320 static struct filterops mqfiltops_write = 321 { FILTEROP_ISFD, NULL, mqfilter_write_detach, mqfilter_write }; 322 323 static int 324 mq_kqfilter_fop(struct file *fp, struct knote *kn) 325 { 326 struct mqueue *mq = fp->f_data; 327 struct klist *klist; 328 329 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 330 331 switch (kn->kn_filter) { 332 case EVFILT_READ: 333 kn->kn_fop = &mqfiltops_read; 334 kn->kn_hook = (caddr_t)mq; 335 klist = &mq->mq_rkq.ki_note; 336 break; 337 case EVFILT_WRITE: 338 kn->kn_fop = &mqfiltops_write; 339 kn->kn_hook = (caddr_t)mq; 340 klist = &mq->mq_wkq.ki_note; 341 break; 342 default: 343 lockmgr(&mq->mq_mtx, LK_RELEASE); 344 return (EOPNOTSUPP); 345 } 346 347 knote_insert(klist, kn); 348 lockmgr(&mq->mq_mtx, LK_RELEASE); 349 350 return (0); 351 } 352 353 static void 354 mqfilter_read_detach(struct knote *kn) 355 { 356 struct mqueue *mq = (struct mqueue *)kn->kn_hook; 
357 358 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 359 struct klist *klist = &mq->mq_rkq.ki_note; 360 knote_remove(klist, kn); 361 lockmgr(&mq->mq_mtx, LK_RELEASE); 362 } 363 364 static void 365 mqfilter_write_detach(struct knote *kn) 366 { 367 struct mqueue *mq = (struct mqueue *)kn->kn_hook; 368 369 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 370 struct klist *klist = &mq->mq_wkq.ki_note; 371 knote_remove(klist, kn); 372 lockmgr(&mq->mq_mtx, LK_RELEASE); 373 } 374 375 static int 376 mqfilter_read(struct knote *kn, long hint) 377 { 378 struct mqueue *mq = (struct mqueue *)kn->kn_hook; 379 struct mq_attr *mqattr; 380 int ready = 0; 381 382 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 383 mqattr = &mq->mq_attrib; 384 /* Ready for receiving, if there are messages in the queue */ 385 if (mqattr->mq_curmsgs) 386 ready = 1; 387 lockmgr(&mq->mq_mtx, LK_RELEASE); 388 389 return (ready); 390 } 391 392 static int 393 mqfilter_write(struct knote *kn, long hint) 394 { 395 struct mqueue *mq = (struct mqueue *)kn->kn_hook; 396 struct mq_attr *mqattr; 397 int ready = 0; 398 399 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 400 mqattr = &mq->mq_attrib; 401 /* Ready for sending, if the message queue is not full */ 402 if (mqattr->mq_curmsgs < mqattr->mq_maxmsg) 403 ready = 1; 404 lockmgr(&mq->mq_mtx, LK_RELEASE); 405 406 return (ready); 407 } 408 409 static int 410 mq_close_fop(file_t *fp) 411 { 412 struct proc *p = curproc; 413 struct mqueue *mq = fp->f_data; 414 bool destroy; 415 416 lockmgr(&mqlist_mtx, LK_EXCLUSIVE); 417 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 418 419 /* Decrease the counters */ 420 p->p_mqueue_cnt--; 421 mq->mq_refcnt--; 422 423 /* Remove notification if registered for this process */ 424 if (mq->mq_notify_proc == p) 425 mq->mq_notify_proc = NULL; 426 427 /* 428 * If this is the last reference and mqueue is marked for unlink, 429 * remove and later destroy the message queue. 
430 */ 431 if (mq->mq_refcnt == 0 && (mq->mq_attrib.mq_flags & MQ_UNLINK)) { 432 LIST_REMOVE(mq, mq_list); 433 destroy = true; 434 } else 435 destroy = false; 436 437 lockmgr(&mq->mq_mtx, LK_RELEASE); 438 lockmgr(&mqlist_mtx, LK_RELEASE); 439 440 if (destroy) 441 mqueue_destroy(mq); 442 443 return 0; 444 } 445 446 /* 447 * General mqueue system calls. 448 */ 449 450 int 451 sys_mq_open(struct mq_open_args *uap) 452 { 453 /* { 454 syscallarg(const char *) name; 455 syscallarg(int) oflag; 456 syscallarg(mode_t) mode; 457 syscallarg(struct mq_attr) attr; 458 } */ 459 struct thread *td = curthread; 460 struct proc *p = td->td_proc; 461 struct filedesc *fdp = p->p_fd; 462 struct mqueue *mq, *mq_new = NULL; 463 file_t *fp; 464 char *name; 465 int mqd, error, oflag; 466 467 /* Check access mode flags */ 468 oflag = SCARG(uap, oflag); 469 if ((oflag & O_ACCMODE) == (O_WRONLY | O_RDWR)) { 470 return EINVAL; 471 } 472 473 /* Get the name from the user-space */ 474 name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO); 475 error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL); 476 if (error) { 477 kfree(name, M_MQBUF); 478 return error; 479 } 480 481 if (oflag & O_CREAT) { 482 struct mq_attr attr; 483 u_int i; 484 485 /* Check the limit */ 486 if (p->p_mqueue_cnt == mq_open_max) { 487 kfree(name, M_MQBUF); 488 return EMFILE; 489 } 490 491 /* Empty name is invalid */ 492 if (name[0] == '\0') { 493 kfree(name, M_MQBUF); 494 return EINVAL; 495 } 496 497 /* Check for mqueue attributes */ 498 if (SCARG(uap, attr)) { 499 error = copyin(SCARG(uap, attr), &attr, 500 sizeof(struct mq_attr)); 501 if (error) { 502 kfree(name, M_MQBUF); 503 return error; 504 } 505 if (attr.mq_maxmsg <= 0 || 506 attr.mq_maxmsg > mq_max_maxmsg || 507 attr.mq_msgsize <= 0 || 508 attr.mq_msgsize > mq_max_msgsize) { 509 kfree(name, M_MQBUF); 510 return EINVAL; 511 } 512 attr.mq_curmsgs = 0; 513 } else { 514 memset(&attr, 0, sizeof(struct mq_attr)); 515 attr.mq_maxmsg = mq_def_maxmsg; 516 
attr.mq_msgsize = 517 MQ_DEF_MSGSIZE - sizeof(struct mq_msg); 518 } 519 520 /* 521 * Allocate new mqueue, initialize data structures, 522 * copy the name, attributes and set the flag. 523 */ 524 mq_new = kmalloc(sizeof(struct mqueue), M_MQBUF, M_WAITOK | M_ZERO); 525 526 lockinit(&mq_new->mq_mtx, "mq_new->mq_mtx", 0, LK_CANRECURSE); 527 for (i = 0; i < (MQ_PQSIZE + 1); i++) { 528 TAILQ_INIT(&mq_new->mq_head[i]); 529 } 530 531 strlcpy(mq_new->mq_name, name, MQ_NAMELEN); 532 memcpy(&mq_new->mq_attrib, &attr, sizeof(struct mq_attr)); 533 534 /*CTASSERT((O_MASK & (MQ_UNLINK | MQ_RECEIVE)) == 0);*/ 535 /* mq_new->mq_attrib.mq_flags = (O_MASK & oflag); */ 536 mq_new->mq_attrib.mq_flags = oflag; 537 538 /* Store mode and effective UID with GID */ 539 mq_new->mq_mode = ((SCARG(uap, mode) & 540 ~p->p_fd->fd_cmask) & ALLPERMS) & ~S_ISTXT; 541 mq_new->mq_euid = td->td_ucred->cr_uid; 542 mq_new->mq_egid = td->td_ucred->cr_svgid; 543 } 544 545 /* Allocate file structure and descriptor */ 546 error = falloc(td->td_lwp, &fp, &mqd); 547 if (error) { 548 if (mq_new) 549 mqueue_destroy(mq_new); 550 kfree(name, M_MQBUF); 551 return error; 552 } 553 fp->f_type = DTYPE_MQUEUE; 554 fp->f_flag = FFLAGS(oflag) & (FREAD | FWRITE); 555 fp->f_ops = &mqops; 556 557 /* Look up for mqueue with such name */ 558 lockmgr(&mqlist_mtx, LK_EXCLUSIVE); 559 mq = mqueue_lookup(name); 560 if (mq) { 561 int acc_mode; 562 563 KKASSERT(lockstatus(&mq->mq_mtx, curthread)); 564 565 /* Check if mqueue is not marked as unlinking */ 566 if (mq->mq_attrib.mq_flags & MQ_UNLINK) { 567 error = EACCES; 568 goto exit; 569 } 570 /* Fail if O_EXCL is set, and mqueue already exists */ 571 if ((oflag & O_CREAT) && (oflag & O_EXCL)) { 572 error = EEXIST; 573 goto exit; 574 } 575 576 /* 577 * Check the permissions. Note the difference between 578 * VREAD/VWRITE and FREAD/FWRITE. 
579 */ 580 acc_mode = 0; 581 if (fp->f_flag & FREAD) { 582 acc_mode |= VREAD; 583 } 584 if (fp->f_flag & FWRITE) { 585 acc_mode |= VWRITE; 586 } 587 if (vaccess(VNON, mq->mq_mode, mq->mq_euid, mq->mq_egid, 588 acc_mode, td->td_ucred)) { 589 590 error = EACCES; 591 goto exit; 592 } 593 } else { 594 /* Fail if mqueue neither exists, nor we create it */ 595 if ((oflag & O_CREAT) == 0) { 596 lockmgr(&mqlist_mtx, LK_RELEASE); 597 KKASSERT(mq_new == NULL); 598 fsetfd(fdp, NULL, mqd); 599 fp->f_ops = &badfileops; 600 fdrop(fp); 601 kfree(name, M_MQBUF); 602 return ENOENT; 603 } 604 605 /* Check the limit */ 606 if (p->p_mqueue_cnt == mq_open_max) { 607 error = EMFILE; 608 goto exit; 609 } 610 611 /* Insert the queue to the list */ 612 mq = mq_new; 613 lockmgr(&mq->mq_mtx, LK_EXCLUSIVE); 614 LIST_INSERT_HEAD(&mqueue_head, mq, mq_list); 615 mq_new = NULL; 616 getnanotime(&mq->mq_btime); 617 mq->mq_atime = mq->mq_mtime = mq->mq_btime; 618 } 619 620 /* Increase the counters, and make descriptor ready */ 621 p->p_mqueue_cnt++; 622 mq->mq_refcnt++; 623 fp->f_data = mq; 624 exit: 625 lockmgr(&mq->mq_mtx, LK_RELEASE); 626 lockmgr(&mqlist_mtx, LK_RELEASE); 627 628 if (mq_new) 629 mqueue_destroy(mq_new); 630 if (error) { 631 fsetfd(fdp, NULL, mqd); 632 fp->f_ops = &badfileops; 633 } else { 634 fsetfd(fdp, fp, mqd); 635 uap->sysmsg_result = mqd; 636 } 637 fdrop(fp); 638 kfree(name, M_MQBUF); 639 640 return error; 641 } 642 643 int 644 sys_mq_close(struct mq_close_args *uap) 645 { 646 return sys_close((void *)uap); 647 } 648 649 /* 650 * Primary mq_receive1() function. 
 */
int
mq_receive1(struct lwp *l, mqd_t mqdes, void *msg_ptr, size_t msg_len,
    unsigned *msg_prio, struct timespec *ts, ssize_t *mlen)
{
	file_t *fp = NULL;
	struct mqueue *mq;
	struct mq_msg *msg = NULL;
	struct mq_attr *mqattr;
	u_int idx;
	int error;

	/* Get the message queue (returned locked, fp held) */
	error = mqueue_get(l, mqdes, &fp);
	if (error) {
		return error;
	}
	mq = fp->f_data;
	if ((fp->f_flag & FREAD) == 0) {
		error = EBADF;
		goto error;
	}
	getnanotime(&mq->mq_atime);
	mqattr = &mq->mq_attrib;

	/*
	 * Check the message size limits: the receive buffer must be able
	 * to hold a maximum-sized message.
	 */
	if (msg_len < mqattr->mq_msgsize) {
		error = EMSGSIZE;
		goto error;
	}

	/* Check if queue is empty */
	while (mqattr->mq_curmsgs == 0) {
		int t;

		if (mqattr->mq_flags & O_NONBLOCK) {
			error = EAGAIN;
			goto error;
		}
		if (ts) {
			error = abstimeout2timo(ts, &t);
			if (error)
				goto error;
		} else
			t = 0;
		/*
		 * Block until someone sends the message.
		 * While doing this, notification should not be sent.
		 */
		mqattr->mq_flags |= MQ_RECEIVE;
		error = lksleep(&mq->mq_send_cv, &mq->mq_mtx, PCATCH,
		    "mqsend", t);
		mqattr->mq_flags &= ~MQ_RECEIVE;
		if (error || (mqattr->mq_flags & MQ_UNLINK)) {
			/* Timed out, interrupted, or queue unlinked */
			error = (error == EWOULDBLOCK) ? ETIMEDOUT : EINTR;
			goto error;
		}
	}

	/*
	 * Find the highest priority message, and remove it from the queue.
	 * At first, reserved queue is checked, bitmap is next.
	 */
	msg = TAILQ_FIRST(&mq->mq_head[MQ_PQRESQ]);
	if (__predict_true(msg == NULL)) {
		idx = ffs(mq->mq_bitmap);
		msg = TAILQ_FIRST(&mq->mq_head[idx]);
		KKASSERT(msg != NULL);
	} else {
		idx = MQ_PQRESQ;
	}
	TAILQ_REMOVE(&mq->mq_head[idx], msg, msg_queue);

	/* Unmark the bit, if last message.
*/ 725 if (__predict_true(idx) && TAILQ_EMPTY(&mq->mq_head[idx])) { 726 KKASSERT((MQ_PQSIZE - idx) == msg->msg_prio); 727 mq->mq_bitmap &= ~(1 << --idx); 728 } 729 730 /* Decrement the counter and signal waiter, if any */ 731 mqattr->mq_curmsgs--; 732 wakeup_one(&mq->mq_recv_cv); 733 734 /* Ready for sending now */ 735 KNOTE(&mq->mq_wkq.ki_note, 0); 736 error: 737 lockmgr(&mq->mq_mtx, LK_RELEASE); 738 fdrop(fp); 739 if (error) 740 return error; 741 742 /* 743 * Copy the data to the user-space. 744 * Note: According to POSIX, no message should be removed from the 745 * queue in case of fail - this would be violated. 746 */ 747 *mlen = msg->msg_len; 748 error = copyout(msg->msg_ptr, msg_ptr, msg->msg_len); 749 if (error == 0 && msg_prio) 750 error = copyout(&msg->msg_prio, msg_prio, sizeof(unsigned)); 751 mqueue_freemsg(msg, sizeof(struct mq_msg) + msg->msg_len); 752 753 return error; 754 } 755 756 int 757 sys_mq_receive(struct mq_receive_args *uap) 758 { 759 /* { 760 syscallarg(mqd_t) mqdes; 761 syscallarg(char *) msg_ptr; 762 syscallarg(size_t) msg_len; 763 syscallarg(unsigned *) msg_prio; 764 } */ 765 ssize_t mlen; 766 int error; 767 768 error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), 769 SCARG(uap, msg_len), SCARG(uap, msg_prio), 0, &mlen); 770 if (error == 0) 771 uap->sysmsg_result = mlen; 772 773 return error; 774 } 775 776 int 777 sys_mq_timedreceive(struct mq_timedreceive_args *uap) 778 { 779 /* { 780 syscallarg(mqd_t) mqdes; 781 syscallarg(char *) msg_ptr; 782 syscallarg(size_t) msg_len; 783 syscallarg(unsigned *) msg_prio; 784 syscallarg(const struct timespec *) abs_timeout; 785 } */ 786 int error; 787 ssize_t mlen; 788 struct timespec ts, *tsp; 789 790 /* Get and convert time value */ 791 if (SCARG(uap, abs_timeout)) { 792 error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts)); 793 if (error) 794 return error; 795 tsp = &ts; 796 } else { 797 tsp = NULL; 798 } 799 800 error = mq_receive1(curthread->td_lwp, SCARG(uap, mqdes), 
SCARG(uap, msg_ptr), 801 SCARG(uap, msg_len), SCARG(uap, msg_prio), tsp, &mlen); 802 if (error == 0) 803 uap->sysmsg_result = mlen; 804 805 return error; 806 } 807 808 /* 809 * Primary mq_send1() function. 810 */ 811 int 812 mq_send1(struct lwp *l, mqd_t mqdes, const char *msg_ptr, size_t msg_len, 813 unsigned msg_prio, struct timespec *ts) 814 { 815 file_t *fp = NULL; 816 struct mqueue *mq; 817 struct mq_msg *msg; 818 struct mq_attr *mqattr; 819 struct proc *notify = NULL; 820 /*ksiginfo_t ksi;*/ 821 size_t size; 822 int error; 823 824 /* Check the priority range */ 825 if (msg_prio >= mq_prio_max) 826 return EINVAL; 827 828 /* Allocate a new message */ 829 size = sizeof(struct mq_msg) + msg_len; 830 if (size > mq_max_msgsize) 831 return EMSGSIZE; 832 833 if (size > MQ_DEF_MSGSIZE) { 834 msg = kmalloc(size, M_MQBUF, M_WAITOK); 835 } else { 836 msg = objcache_get(mqmsg_cache, M_WAITOK); 837 } 838 839 /* Get the data from user-space */ 840 error = copyin(msg_ptr, msg->msg_ptr, msg_len); 841 if (error) { 842 mqueue_freemsg(msg, size); 843 return error; 844 } 845 msg->msg_len = msg_len; 846 msg->msg_prio = msg_prio; 847 848 /* Get the mqueue */ 849 error = mqueue_get(l, mqdes, &fp); 850 if (error) { 851 mqueue_freemsg(msg, size); 852 return error; 853 } 854 mq = fp->f_data; 855 if ((fp->f_flag & FWRITE) == 0) { 856 error = EBADF; 857 goto error; 858 } 859 getnanotime(&mq->mq_mtime); 860 mqattr = &mq->mq_attrib; 861 862 /* Check the message size limit */ 863 if (msg_len <= 0 || msg_len > mqattr->mq_msgsize) { 864 error = EMSGSIZE; 865 goto error; 866 } 867 868 /* Check if queue is full */ 869 while (mqattr->mq_curmsgs >= mqattr->mq_maxmsg) { 870 int t; 871 872 if (mqattr->mq_flags & O_NONBLOCK) { 873 error = EAGAIN; 874 goto error; 875 } 876 if (ts) { 877 error = abstimeout2timo(ts, &t); 878 if (error) 879 goto error; 880 } else 881 t = 0; 882 /* Block until queue becomes available */ 883 error = lksleep(&mq->mq_recv_cv, &mq->mq_mtx, PCATCH, "mqrecv", t); 884 if (error 
|| (mqattr->mq_flags & MQ_UNLINK)) { 885 error = (error == EWOULDBLOCK) ? ETIMEDOUT : error; 886 goto error; 887 } 888 } 889 KKASSERT(mq->mq_attrib.mq_curmsgs < mq->mq_attrib.mq_maxmsg); 890 891 /* 892 * Insert message into the queue, according to the priority. 893 * Note the difference between index and priority. 894 */ 895 if (__predict_true(msg_prio < MQ_PQSIZE)) { 896 u_int idx = MQ_PQSIZE - msg_prio; 897 898 KKASSERT(idx != MQ_PQRESQ); 899 TAILQ_INSERT_TAIL(&mq->mq_head[idx], msg, msg_queue); 900 mq->mq_bitmap |= (1 << --idx); 901 } else { 902 mqueue_linear_insert(mq, msg); 903 } 904 905 /* Check for the notify */ 906 if (mqattr->mq_curmsgs == 0 && mq->mq_notify_proc && 907 (mqattr->mq_flags & MQ_RECEIVE) == 0 && 908 mq->mq_sig_notify.sigev_notify == SIGEV_SIGNAL) { 909 /* Initialize the signal */ 910 /*KSI_INIT(&ksi);*/ 911 /*ksi.ksi_signo = mq->mq_sig_notify.sigev_signo;*/ 912 /*ksi.ksi_code = SI_MESGQ;*/ 913 /*ksi.ksi_value = mq->mq_sig_notify.sigev_value;*/ 914 /* Unregister the process */ 915 notify = mq->mq_notify_proc; 916 mq->mq_notify_proc = NULL; 917 } 918 919 /* Increment the counter and signal waiter, if any */ 920 mqattr->mq_curmsgs++; 921 wakeup_one(&mq->mq_send_cv); 922 923 /* Ready for receiving now */ 924 KNOTE(&mq->mq_rkq.ki_note, 0); 925 error: 926 lockmgr(&mq->mq_mtx, LK_RELEASE); 927 fdrop(fp); 928 929 if (error) { 930 mqueue_freemsg(msg, size); 931 } else if (notify) { 932 /* Send the notify, if needed */ 933 lwkt_gettoken(&proc_token); 934 /*kpsignal(notify, &ksi, NULL);*/ 935 ksignal(notify, mq->mq_sig_notify.sigev_signo); 936 lwkt_reltoken(&proc_token); 937 } 938 939 return error; 940 } 941 942 int 943 sys_mq_send(struct mq_send_args *uap) 944 { 945 /* { 946 syscallarg(mqd_t) mqdes; 947 syscallarg(const char *) msg_ptr; 948 syscallarg(size_t) msg_len; 949 syscallarg(unsigned) msg_prio; 950 } */ 951 952 return mq_send1(curthread->td_lwp, SCARG(uap, mqdes), SCARG(uap, msg_ptr), 953 SCARG(uap, msg_len), SCARG(uap, msg_prio), 0); 954 } 955 
int
sys_mq_timedsend(struct mq_timedsend_args *uap)
{
	/* {
		syscallarg(mqd_t) mqdes;
		syscallarg(const char *) msg_ptr;
		syscallarg(size_t) msg_len;
		syscallarg(unsigned) msg_prio;
		syscallarg(const struct timespec *) abs_timeout;
	} */
	struct timespec ts, *tsp;
	int error;

	/* Get and convert time value */
	if (SCARG(uap, abs_timeout)) {
		error = copyin(SCARG(uap, abs_timeout), &ts, sizeof(ts));
		if (error)
			return error;
		tsp = &ts;
	} else {
		tsp = NULL;
	}

	return mq_send1(curthread->td_lwp, SCARG(uap, mqdes),
	    SCARG(uap, msg_ptr), SCARG(uap, msg_len), SCARG(uap, msg_prio),
	    tsp);
}

/*
 * Register or unregister SIGEV_SIGNAL notification for a queue.
 * Only one process may be registered at a time (EBUSY otherwise).
 */
int
sys_mq_notify(struct mq_notify_args *uap)
{
	/* {
		syscallarg(mqd_t) mqdes;
		syscallarg(const struct sigevent *) notification;
	} */
	file_t *fp = NULL;
	struct mqueue *mq;
	struct sigevent sig;
	int error;

	if (SCARG(uap, notification)) {
		/* Get the signal from user-space */
		error = copyin(SCARG(uap, notification), &sig,
		    sizeof(struct sigevent));
		if (error)
			return error;
		if (sig.sigev_notify == SIGEV_SIGNAL &&
		    (sig.sigev_signo <= 0 || sig.sigev_signo >= NSIG))
			return EINVAL;
	}

	/* mqueue_get() returns the queue locked and fp held */
	error = mqueue_get(curthread->td_lwp, SCARG(uap, mqdes), &fp);
	if (error)
		return error;
	mq = fp->f_data;

	if (SCARG(uap, notification)) {
		/* Register notification: set the signal and target process */
		if (mq->mq_notify_proc == NULL) {
			memcpy(&mq->mq_sig_notify, &sig,
			    sizeof(struct sigevent));
			mq->mq_notify_proc = curproc;
		} else {
			/* Fail if someone else already registered */
			error = EBUSY;
		}
	} else {
		/* Unregister the notification */
		mq->mq_notify_proc = NULL;
	}
	lockmgr(&mq->mq_mtx, LK_RELEASE);
	fdrop(fp);

	return error;
}

int
sys_mq_getattr(struct mq_getattr_args *uap)
{
	/* {
		syscallarg(mqd_t) mqdes;
		syscallarg(struct mq_attr *) mqstat;
	} */
	file_t *fp = NULL;
	struct mqueue *mq;
	struct mq_attr attr;
	int error;

	/* Get the message queue (locked) and snapshot its attributes */
	error = mqueue_get(curthread->td_lwp, SCARG(uap, mqdes), &fp);
	if (error)
		return error;
	mq = fp->f_data;
	memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
	lockmgr(&mq->mq_mtx, LK_RELEASE);
	fdrop(fp);

	/* Copy out after dropping the lock; attr is a local snapshot */
	return copyout(&attr, SCARG(uap, mqstat), sizeof(struct mq_attr));
}

int
sys_mq_setattr(struct mq_setattr_args *uap)
{
	/* {
		syscallarg(mqd_t) mqdes;
		syscallarg(const struct mq_attr *) mqstat;
		syscallarg(struct mq_attr *) omqstat;
	} */
	file_t *fp = NULL;
	struct mqueue *mq;
	struct mq_attr attr;
	int error, nonblock;

	error = copyin(SCARG(uap, mqstat), &attr, sizeof(struct mq_attr));
	if (error)
		return error;
	/* Latch the only honored bit before `attr' is reused below */
	nonblock = (attr.mq_flags & O_NONBLOCK);

	/* Get the message queue */
	error = mqueue_get(curthread->td_lwp, SCARG(uap, mqdes), &fp);
	if (error)
		return error;
	mq = fp->f_data;

	/* Copy the old attributes, if needed (attr now holds them) */
	if (SCARG(uap, omqstat)) {
		memcpy(&attr, &mq->mq_attrib, sizeof(struct mq_attr));
	}

	/* Ignore everything, except O_NONBLOCK */
	if (nonblock)
		mq->mq_attrib.mq_flags |= O_NONBLOCK;
	else
		mq->mq_attrib.mq_flags &= ~O_NONBLOCK;

	lockmgr(&mq->mq_mtx, LK_RELEASE);
	fdrop(fp);

	/*
	 * Copy the data to the user-space.
	 * Note: According to POSIX, the new attributes should not be set in
	 * case of fail - this would be violated.
	 */
	if (SCARG(uap, omqstat))
		error = copyout(&attr, SCARG(uap, omqstat),
		    sizeof(struct mq_attr));

	return error;
}

/*
 * Remove a message queue by name.  The queue is destroyed right away if
 * unreferenced; otherwise it is marked MQ_UNLINK and the last
 * mq_close() destroys it.
 */
int
sys_mq_unlink(struct mq_unlink_args *uap)
{
	/* {
		syscallarg(const char *) name;
	} */
	struct thread *td = curthread;
	struct mqueue *mq;
	char *name;
	int error, refcnt = 0;

	/* Get the name from the user-space */
	name = kmalloc(MQ_NAMELEN, M_MQBUF, M_WAITOK | M_ZERO);
	error = copyinstr(SCARG(uap, name), name, MQ_NAMELEN - 1, NULL);
	if (error) {
		kfree(name, M_MQBUF);
		return error;
	}

	/* Lookup for this file (returns the queue locked, if found) */
	lockmgr(&mqlist_mtx, LK_EXCLUSIVE);
	mq = mqueue_lookup(name);
	if (mq == NULL) {
		error = ENOENT;
		goto error;
	}

	/* Check the permissions: owner or root only */
	if (td->td_ucred->cr_uid != mq->mq_euid &&
	    priv_check(td, PRIV_ROOT) != 0) {
		lockmgr(&mq->mq_mtx, LK_RELEASE);
		error = EACCES;
		goto error;
	}

	/* Mark message queue as unlinking, before leaving the window */
	mq->mq_attrib.mq_flags |= MQ_UNLINK;

	/* Wake up all waiters, if there are such */
	wakeup(&mq->mq_send_cv);
	wakeup(&mq->mq_recv_cv);

	KNOTE(&mq->mq_rkq.ki_note, 0);
	KNOTE(&mq->mq_wkq.ki_note, 0);

	refcnt = mq->mq_refcnt;
	if (refcnt == 0)
		LIST_REMOVE(mq, mq_list);

	lockmgr(&mq->mq_mtx, LK_RELEASE);
error:
	lockmgr(&mqlist_mtx, LK_RELEASE);

	/*
	 * If there are no references - destroy the message
	 * queue, otherwise, the last mq_close() will do that.
	 */
	if (error == 0 && refcnt == 0)
		mqueue_destroy(mq);

	kfree(name, M_MQBUF);
	return error;
}

/*
 * SysCtl.
1171 */ 1172 SYSCTL_NODE(_kern, OID_AUTO, mqueue, 1173 CTLFLAG_RW, 0, "Message queue options"); 1174 1175 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_open_max, 1176 CTLFLAG_RW, &mq_open_max, 0, 1177 "Maximal number of message queue descriptors per process"); 1178 1179 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_prio_max, 1180 CTLFLAG_RW, &mq_prio_max, 0, 1181 "Maximal priority of the message"); 1182 1183 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_max_msgsize, 1184 CTLFLAG_RW, &mq_max_msgsize, 0, 1185 "Maximal allowed size of the message"); 1186 1187 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_def_maxmsg, 1188 CTLFLAG_RW, &mq_def_maxmsg, 0, 1189 "Default maximal message count"); 1190 1191 SYSCTL_INT(_kern_mqueue, OID_AUTO, mq_max_maxmsg, 1192 CTLFLAG_RW, &mq_max_maxmsg, 0, 1193 "Maximal allowed message count"); 1194 1195 SYSINIT(sys_mqueue_init, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, mqueue_sysinit, NULL); 1196