1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 * 37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.44 2007/07/04 19:40:35 dillon Exp $ 38 */ 39 40 #ifdef _KERNEL 41 42 #include <sys/param.h> 43 #include <sys/systm.h> 44 #include <sys/kernel.h> 45 #include <sys/proc.h> 46 #include <sys/rtprio.h> 47 #include <sys/queue.h> 48 #include <sys/sysctl.h> 49 #include <sys/kthread.h> 50 #include <sys/signalvar.h> 51 #include <sys/signal2.h> 52 #include <machine/cpu.h> 53 #include <sys/lock.h> 54 55 #include <vm/vm.h> 56 #include <vm/vm_param.h> 57 #include <vm/vm_kern.h> 58 #include <vm/vm_object.h> 59 #include <vm/vm_page.h> 60 #include <vm/vm_map.h> 61 #include <vm/vm_pager.h> 62 #include <vm/vm_extern.h> 63 #include <vm/vm_zone.h> 64 65 #include <sys/thread2.h> 66 #include <sys/msgport2.h> 67 #include <sys/spinlock2.h> 68 69 #include <machine/stdarg.h> 70 #include <machine/cpufunc.h> 71 #ifdef SMP 72 #include <machine/smp.h> 73 #endif 74 75 #include <sys/malloc.h> 76 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 77 78 #else 79 80 #include <sys/stdint.h> 81 #include <libcaps/thread.h> 82 #include <sys/thread.h> 83 #include <sys/msgport.h> 84 #include <sys/errno.h> 85 #include <libcaps/globaldata.h> 86 #include <machine/cpufunc.h> 87 #include <sys/thread2.h> 88 #include <sys/msgport2.h> 89 #include <string.h> 90 91 #endif /* _KERNEL */ 92 93 94 /************************************************************************ 95 * MESSAGE FUNCTIONS * 96 ************************************************************************/ 97 98 /* 99 * lwkt_sendmsg() 100 * 101 * Request asynchronous completion and call lwkt_beginmsg(). The 102 * target port can opt to execute the message synchronously or 103 * asynchronously and this function will automatically queue the 104 * response if the target executes the message synchronously. 105 * 106 * NOTE: The message is in an indeterminant state until this call 107 * returns. The caller should not mess with it (e.g. try to abort it) 108 * until then. 109 */ 110 void 111 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 112 { 113 int error; 114 115 KKASSERT(msg->ms_reply_port != NULL && 116 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 117 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 118 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 119 lwkt_replymsg(msg, error); 120 } 121 } 122 123 /* 124 * lwkt_domsg() 125 * 126 * Request asynchronous completion and call lwkt_beginmsg(). The 127 * target port can opt to execute the message synchronously or 128 * asynchronously and this function will automatically queue the 129 * response if the target executes the message synchronously. 130 */ 131 int 132 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 133 { 134 int error; 135 136 KKASSERT(msg->ms_reply_port != NULL && 137 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 138 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 139 msg->ms_flags |= MSGF_SYNC; 140 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 141 error = lwkt_waitmsg(msg, flags); 142 } else { 143 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 144 } 145 return(error); 146 } 147 148 /* 149 * lwkt_forwardmsg() 150 * 151 * Forward a message received on one port to another port. 152 */ 153 int 154 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 155 { 156 int error; 157 158 crit_enter(); 159 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 160 if ((error = port->mp_putport(port, msg)) != EASYNC) 161 lwkt_replymsg(msg, error); 162 crit_exit(); 163 return(error); 164 } 165 166 /* 167 * lwkt_abortmsg() 168 * 169 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 170 * The caller must ensure that the message will not be both replied AND 171 * destroyed while the abort is in progress. 172 * 173 * This function issues a callback which might block! 174 */ 175 void 176 lwkt_abortmsg(lwkt_msg_t msg) 177 { 178 /* 179 * A critical section protects us from reply IPIs on this cpu. 180 */ 181 crit_enter(); 182 183 /* 184 * Shortcut the operation if the message has already been returned. 185 * The callback typically constructs a lwkt_msg with the abort request, 186 * issues it synchronously, and waits for completion. The callback 187 * is not required to actually abort the message and the target port, 188 * upon receiving an abort request message generated by the callback 189 * should check whether the original message has already completed or 190 * not. 191 */ 192 if (msg->ms_flags & MSGF_ABORTABLE) { 193 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 194 msg->ms_abortfn(msg); 195 } 196 crit_exit(); 197 } 198 199 /************************************************************************ 200 * PORT INITIALIZATION API * 201 ************************************************************************/ 202 203 static void *lwkt_thread_getport(lwkt_port_t port); 204 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 205 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 206 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 207 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 208 209 static void *lwkt_spin_getport(lwkt_port_t port); 210 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 211 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 212 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 213 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 214 215 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 216 static void *lwkt_panic_getport(lwkt_port_t port); 217 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 218 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 219 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 220 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 221 222 /* 223 * Core port initialization (internal) 224 */ 225 static __inline 226 void 227 _lwkt_initport(lwkt_port_t port, 228 void *(*gportfn)(lwkt_port_t), 229 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 230 int (*wmsgfn)(lwkt_msg_t, int), 231 void *(*wportfn)(lwkt_port_t, int), 232 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 233 { 234 bzero(port, sizeof(*port)); 235 TAILQ_INIT(&port->mp_msgq); 236 port->mp_getport = gportfn; 237 port->mp_putport = pportfn; 238 port->mp_waitmsg = wmsgfn; 239 port->mp_waitport = wportfn; 240 port->mp_replyport = rportfn; 241 } 242 243 /* 244 * lwkt_initport_thread() 245 * 246 * Initialize a port for use by a particular thread. The port may 247 * only be used by <td>. 248 */ 249 void 250 lwkt_initport_thread(lwkt_port_t port, thread_t td) 251 { 252 _lwkt_initport(port, 253 lwkt_thread_getport, 254 lwkt_thread_putport, 255 lwkt_thread_waitmsg, 256 lwkt_thread_waitport, 257 lwkt_thread_replyport); 258 port->mpu_td = td; 259 } 260 261 /* 262 * lwkt_initport_spin() 263 * 264 * Initialize a port for use with descriptors that might be accessed 265 * via multiple LWPs, processes, or threads. Has somewhat more 266 * overhead then thread ports. 267 */ 268 void 269 lwkt_initport_spin(lwkt_port_t port) 270 { 271 _lwkt_initport(port, 272 lwkt_spin_getport, 273 lwkt_spin_putport, 274 lwkt_spin_waitmsg, 275 lwkt_spin_waitport, 276 lwkt_spin_replyport); 277 spin_init(&port->mpu_spin); 278 } 279 280 /* 281 * Similar to the standard initport, this function simply marks the message 282 * as being done and does not attempt to return it to an originating port. 283 */ 284 void 285 lwkt_initport_replyonly_null(lwkt_port_t port) 286 { 287 _lwkt_initport(port, 288 lwkt_panic_getport, 289 lwkt_panic_putport, 290 lwkt_panic_waitmsg, 291 lwkt_panic_waitport, 292 lwkt_null_replyport); 293 } 294 295 /* 296 * Initialize a reply-only port, typically used as a message sink. Such 297 * ports can only be used as a reply port. 298 */ 299 void 300 lwkt_initport_replyonly(lwkt_port_t port, 301 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 302 { 303 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 304 lwkt_panic_waitmsg, lwkt_panic_waitport, 305 rportfn); 306 } 307 308 void 309 lwkt_initport_putonly(lwkt_port_t port, 310 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 311 { 312 _lwkt_initport(port, lwkt_panic_getport, pportfn, 313 lwkt_panic_waitmsg, lwkt_panic_waitport, 314 lwkt_panic_replyport); 315 } 316 317 void 318 lwkt_initport_panic(lwkt_port_t port) 319 { 320 _lwkt_initport(port, 321 lwkt_panic_getport, lwkt_panic_putport, 322 lwkt_panic_waitmsg, lwkt_panic_waitport, 323 lwkt_panic_replyport); 324 } 325 326 /* 327 * lwkt_getport() 328 * 329 * Retrieve the next message from the port's message queue, return NULL 330 * if no messages are pending. The retrieved message will either be a 331 * request or a reply based on the MSGF_REPLY bit. 332 * 333 * The calling thread MUST own the port. 334 */ 335 336 static __inline 337 void 338 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 339 { 340 /* 341 * normal case, remove and return the message. 342 */ 343 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node); 344 msg->ms_flags &= ~MSGF_QUEUED; 345 } 346 347 /************************************************************************ 348 * THREAD PORT BACKEND * 349 ************************************************************************ 350 * 351 * This backend is used when the port a message is retrieved from is owned 352 * by a single thread (the calling thread). Messages are IPId to the 353 * correct cpu before being enqueued to a port. Note that this is fairly 354 * optimal since scheduling would have had to do an IPI anyway if the 355 * message were headed to a different cpu. 356 */ 357 358 #ifdef SMP 359 360 /* 361 * This function completes reply processing for the default case in the 362 * context of the originating cpu. 363 */ 364 static 365 void 366 lwkt_thread_replyport_remote(lwkt_msg_t msg) 367 { 368 lwkt_port_t port = msg->ms_reply_port; 369 370 /* 371 * Chase any thread migration that occurs 372 */ 373 if (port->mpu_td->td_gd != mycpu) { 374 lwkt_send_ipiq(port->mpu_td->td_gd, 375 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 376 return; 377 } 378 379 /* 380 * Cleanup 381 */ 382 #ifdef INVARIANTS 383 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 384 msg->ms_flags &= ~MSGF_INTRANSIT; 385 #endif 386 if (msg->ms_flags & MSGF_SYNC) { 387 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 388 } else { 389 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 390 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 391 } 392 if (port->mp_flags & MSGPORTF_WAITING) 393 lwkt_schedule(port->mpu_td); 394 } 395 396 #endif 397 398 /* 399 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 400 * 401 * Called with the reply port as an argument but in the context of the 402 * original target port. Completion must occur on the target port's 403 * cpu. 404 * 405 * The critical section protects us from IPIs on the this CPU. 406 */ 407 void 408 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 409 { 410 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 411 412 if (msg->ms_flags & MSGF_SYNC) { 413 /* 414 * If a synchronous completion has been requested, just wakeup 415 * the message without bothering to queue it to the target port. 416 * 417 * Assume the target thread is non-preemptive, so no critical 418 * section is required. 419 */ 420 #ifdef SMP 421 if (port->mpu_td->td_gd == mycpu) { 422 #endif 423 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 424 if (port->mp_flags & MSGPORTF_WAITING) 425 lwkt_schedule(port->mpu_td); 426 #ifdef SMP 427 } else { 428 #ifdef INVARIANTS 429 msg->ms_flags |= MSGF_INTRANSIT; 430 #endif 431 msg->ms_flags |= MSGF_REPLY; 432 lwkt_send_ipiq(port->mpu_td->td_gd, 433 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 434 } 435 #endif 436 } else { 437 /* 438 * If an asynchronous completion has been requested the message 439 * must be queued to the reply port. 440 * 441 * A critical section is required to interlock the port queue. 442 */ 443 #ifdef SMP 444 if (port->mpu_td->td_gd == mycpu) { 445 #endif 446 crit_enter(); 447 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 448 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 449 if (port->mp_flags & MSGPORTF_WAITING) 450 lwkt_schedule(port->mpu_td); 451 crit_exit(); 452 #ifdef SMP 453 } else { 454 #ifdef INVARIANTS 455 msg->ms_flags |= MSGF_INTRANSIT; 456 #endif 457 msg->ms_flags |= MSGF_REPLY; 458 lwkt_send_ipiq(port->mpu_td->td_gd, 459 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 460 } 461 #endif 462 } 463 } 464 465 /* 466 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 467 * 468 * Called with the target port as an argument but in the context of the 469 * reply port. This function always implements an asynchronous put to 470 * the target message port, and thus returns EASYNC. 471 * 472 * The message must already have cleared MSGF_DONE and MSGF_REPLY 473 */ 474 475 #ifdef SMP 476 477 static 478 void 479 lwkt_thread_putport_remote(lwkt_msg_t msg) 480 { 481 lwkt_port_t port = msg->ms_target_port; 482 483 /* 484 * Chase any thread migration that occurs 485 */ 486 if (port->mpu_td->td_gd != mycpu) { 487 lwkt_send_ipiq(port->mpu_td->td_gd, 488 (ipifunc1_t)lwkt_thread_putport_remote, msg); 489 return; 490 } 491 492 /* 493 * Cleanup 494 */ 495 #ifdef INVARIANTS 496 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 497 msg->ms_flags &= ~MSGF_INTRANSIT; 498 #endif 499 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 500 msg->ms_flags |= MSGF_QUEUED; 501 if (port->mp_flags & MSGPORTF_WAITING) 502 lwkt_schedule(port->mpu_td); 503 } 504 505 #endif 506 507 static 508 int 509 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 510 { 511 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 512 513 msg->ms_target_port = port; 514 #ifdef SMP 515 if (port->mpu_td->td_gd == mycpu) { 516 #endif 517 crit_enter(); 518 msg->ms_flags |= MSGF_QUEUED; 519 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 520 if (port->mp_flags & MSGPORTF_WAITING) 521 lwkt_schedule(port->mpu_td); 522 crit_exit(); 523 #ifdef SMP 524 } else { 525 #ifdef INVARIANTS 526 msg->ms_flags |= MSGF_INTRANSIT; 527 #endif 528 lwkt_send_ipiq(port->mpu_td->td_gd, 529 (ipifunc1_t)lwkt_thread_putport_remote, msg); 530 } 531 #endif 532 return (EASYNC); 533 } 534 535 /* 536 * lwkt_thread_getport() 537 * 538 * Retrieve the next message from the port or NULL if no messages 539 * are ready. 540 */ 541 void * 542 lwkt_thread_getport(lwkt_port_t port) 543 { 544 lwkt_msg_t msg; 545 546 KKASSERT(port->mpu_td == curthread); 547 548 crit_enter_quick(port->mpu_td); 549 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 550 _lwkt_pullmsg(port, msg); 551 crit_exit_quick(port->mpu_td); 552 return(msg); 553 } 554 555 /* 556 * lwkt_thread_waitmsg() 557 * 558 * Wait for a particular message to be replied. We must be the only 559 * thread waiting on the message. The port must be owned by the 560 * caller. 561 */ 562 int 563 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 564 { 565 if ((msg->ms_flags & MSGF_DONE) == 0) { 566 /* 567 * If the done bit was not set we have to block until it is. 568 */ 569 lwkt_port_t port = msg->ms_reply_port; 570 thread_t td = curthread; 571 int sentabort; 572 573 KKASSERT(port->mpu_td == td); 574 crit_enter_quick(td); 575 sentabort = 0; 576 577 while ((msg->ms_flags & MSGF_DONE) == 0) { 578 port->mp_flags |= MSGPORTF_WAITING; 579 if (sentabort == 0) { 580 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 581 lwkt_abortmsg(msg); 582 } 583 } else { 584 lwkt_sleep("waitabt", 0); 585 } 586 port->mp_flags &= ~MSGPORTF_WAITING; 587 } 588 if (msg->ms_flags & MSGF_QUEUED) 589 _lwkt_pullmsg(port, msg); 590 crit_exit_quick(td); 591 } else { 592 /* 593 * If the done bit was set we only have to mess around with the 594 * message if it is queued on the reply port. 595 */ 596 if (msg->ms_flags & MSGF_QUEUED) { 597 lwkt_port_t port = msg->ms_reply_port; 598 thread_t td = curthread; 599 600 KKASSERT(port->mpu_td == td); 601 crit_enter_quick(td); 602 _lwkt_pullmsg(port, msg); 603 crit_exit_quick(td); 604 } 605 } 606 return(msg->ms_error); 607 } 608 609 void * 610 lwkt_thread_waitport(lwkt_port_t port, int flags) 611 { 612 thread_t td = curthread; 613 lwkt_msg_t msg; 614 int error; 615 616 KKASSERT(port->mpu_td == td); 617 crit_enter_quick(td); 618 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 619 port->mp_flags |= MSGPORTF_WAITING; 620 error = lwkt_sleep("waitport", flags); 621 port->mp_flags &= ~MSGPORTF_WAITING; 622 if (error) 623 goto done; 624 } 625 _lwkt_pullmsg(port, msg); 626 done: 627 crit_exit_quick(td); 628 return(msg); 629 } 630 631 /************************************************************************ 632 * SPIN PORT BACKEND * 633 ************************************************************************ 634 * 635 * This backend uses spinlocks instead of making assumptions about which 636 * thread is accessing the port. It must be used when a port is not owned 637 * by a particular thread. This is less optimal then thread ports but 638 * you don't have a choice if there are multiple threads accessing the port. 639 * 640 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 641 * on the message port, it is the responsibility of the code doing the 642 * wakeup to clear this flag rather then the blocked threads. Some 643 * superfluous wakeups may occur, which is ok. 644 * 645 * XXX synchronous message wakeups are not current optimized. 646 */ 647 648 static 649 void * 650 lwkt_spin_getport(lwkt_port_t port) 651 { 652 lwkt_msg_t msg; 653 654 spin_lock_wr(&port->mpu_spin); 655 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 656 _lwkt_pullmsg(port, msg); 657 spin_unlock_wr(&port->mpu_spin); 658 return(msg); 659 } 660 661 static 662 int 663 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 664 { 665 int dowakeup; 666 667 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 668 669 msg->ms_target_port = port; 670 spin_lock_wr(&port->mpu_spin); 671 msg->ms_flags |= MSGF_QUEUED; 672 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 673 dowakeup = 0; 674 if (port->mp_flags & MSGPORTF_WAITING) { 675 port->mp_flags &= ~MSGPORTF_WAITING; 676 dowakeup = 1; 677 } 678 spin_unlock_wr(&port->mpu_spin); 679 if (dowakeup) 680 wakeup(port); 681 return (EASYNC); 682 } 683 684 static 685 int 686 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 687 { 688 lwkt_port_t port; 689 int sentabort; 690 int error; 691 692 if ((msg->ms_flags & MSGF_DONE) == 0) { 693 port = msg->ms_reply_port; 694 sentabort = 0; 695 spin_lock_wr(&port->mpu_spin); 696 while ((msg->ms_flags & MSGF_DONE) == 0) { 697 void *won; 698 699 /* 700 * If message was sent synchronously from the beginning 701 * the wakeup will be on the message structure, else it 702 * will be on the port structure. 703 */ 704 if (msg->ms_flags & MSGF_SYNC) { 705 won = msg; 706 } else { 707 won = port; 708 port->mp_flags |= MSGPORTF_WAITING; 709 } 710 711 /* 712 * Only messages which support abort can be interrupted. 713 * We must still wait for message completion regardless. 714 */ 715 if ((flags & PCATCH) && sentabort == 0) { 716 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 717 if (error) { 718 sentabort = error; 719 spin_unlock_wr(&port->mpu_spin); 720 lwkt_abortmsg(msg); 721 spin_lock_wr(&port->mpu_spin); 722 } 723 } else { 724 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0); 725 } 726 /* see note at the top on the MSGPORTF_WAITING flag */ 727 } 728 /* 729 * Turn EINTR into ERESTART if the signal indicates. 730 */ 731 if (sentabort && msg->ms_error == EINTR) 732 msg->ms_error = sentabort; 733 if (msg->ms_flags & MSGF_QUEUED) 734 _lwkt_pullmsg(port, msg); 735 spin_unlock_wr(&port->mpu_spin); 736 } else { 737 if (msg->ms_flags & MSGF_QUEUED) { 738 port = msg->ms_reply_port; 739 spin_lock_wr(&port->mpu_spin); 740 _lwkt_pullmsg(port, msg); 741 spin_unlock_wr(&port->mpu_spin); 742 } 743 } 744 return(msg->ms_error); 745 } 746 747 static 748 void * 749 lwkt_spin_waitport(lwkt_port_t port, int flags) 750 { 751 lwkt_msg_t msg; 752 int error; 753 754 spin_lock_wr(&port->mpu_spin); 755 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 756 port->mp_flags |= MSGPORTF_WAITING; 757 error = msleep(port, &port->mpu_spin, flags, "waitport", 0); 758 /* see note at the top on the MSGPORTF_WAITING flag */ 759 if (error) { 760 spin_unlock_wr(&port->mpu_spin); 761 return(NULL); 762 } 763 } 764 _lwkt_pullmsg(port, msg); 765 spin_unlock_wr(&port->mpu_spin); 766 return(msg); 767 } 768 769 static 770 void 771 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 772 { 773 int dowakeup; 774 775 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 776 777 if (msg->ms_flags & MSGF_SYNC) { 778 /* 779 * If a synchronous completion has been requested, just wakeup 780 * the message without bothering to queue it to the target port. 781 */ 782 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 783 wakeup(msg); 784 } else { 785 /* 786 * If an asynchronous completion has been requested the message 787 * must be queued to the reply port. 788 */ 789 spin_lock_wr(&port->mpu_spin); 790 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 791 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 792 dowakeup = 0; 793 if (port->mp_flags & MSGPORTF_WAITING) { 794 port->mp_flags &= ~MSGPORTF_WAITING; 795 dowakeup = 1; 796 } 797 spin_unlock_wr(&port->mpu_spin); 798 if (dowakeup) 799 wakeup(port); 800 } 801 } 802 803 /************************************************************************ 804 * PANIC AND SPECIAL PORT FUNCTIONS * 805 ************************************************************************/ 806 807 /* 808 * You can point a port's reply vector at this function if you just want 809 * the message marked done, without any queueing or signaling. This is 810 * often used for structure-embedded messages. 811 */ 812 static 813 void 814 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 815 { 816 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 817 } 818 819 static 820 void * 821 lwkt_panic_getport(lwkt_port_t port) 822 { 823 panic("lwkt_getport() illegal on port %p", port); 824 } 825 826 static 827 int 828 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 829 { 830 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 831 } 832 833 static 834 int 835 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 836 { 837 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 838 } 839 840 static 841 void * 842 lwkt_panic_waitport(lwkt_port_t port, int flags) 843 { 844 panic("port %p cannot be waited on", port); 845 } 846 847 static 848 void 849 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 850 { 851 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 852 } 853 854