1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 * 37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.46 2008/05/18 20:57:56 nth Exp $ 38 */ 39 40 #include <sys/param.h> 41 #include <sys/systm.h> 42 #include <sys/kernel.h> 43 #include <sys/proc.h> 44 #include <sys/rtprio.h> 45 #include <sys/queue.h> 46 #include <sys/sysctl.h> 47 #include <sys/kthread.h> 48 #include <sys/signalvar.h> 49 #include <sys/signal2.h> 50 #include <machine/cpu.h> 51 #include <sys/lock.h> 52 53 #include <vm/vm.h> 54 #include <vm/vm_param.h> 55 #include <vm/vm_kern.h> 56 #include <vm/vm_object.h> 57 #include <vm/vm_page.h> 58 #include <vm/vm_map.h> 59 #include <vm/vm_pager.h> 60 #include <vm/vm_extern.h> 61 #include <vm/vm_zone.h> 62 63 #include <sys/thread2.h> 64 #include <sys/msgport2.h> 65 #include <sys/spinlock2.h> 66 #include <sys/serialize.h> 67 68 #include <machine/stdarg.h> 69 #include <machine/cpufunc.h> 70 #ifdef SMP 71 #include <machine/smp.h> 72 #endif 73 74 #include <sys/malloc.h> 75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 76 77 /************************************************************************ 78 * MESSAGE FUNCTIONS * 79 ************************************************************************/ 80 81 /* 82 * lwkt_sendmsg() 83 * 84 * Request asynchronous completion and call lwkt_beginmsg(). The 85 * target port can opt to execute the message synchronously or 86 * asynchronously and this function will automatically queue the 87 * response if the target executes the message synchronously. 88 * 89 * NOTE: The message is in an indeterminant state until this call 90 * returns. The caller should not mess with it (e.g. try to abort it) 91 * until then. 92 */ 93 void 94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 95 { 96 int error; 97 98 KKASSERT(msg->ms_reply_port != NULL && 99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 102 lwkt_replymsg(msg, error); 103 } 104 } 105 106 /* 107 * lwkt_domsg() 108 * 109 * Request asynchronous completion and call lwkt_beginmsg(). The 110 * target port can opt to execute the message synchronously or 111 * asynchronously and this function will automatically queue the 112 * response if the target executes the message synchronously. 113 */ 114 int 115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 116 { 117 int error; 118 119 KKASSERT(msg->ms_reply_port != NULL && 120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 122 msg->ms_flags |= MSGF_SYNC; 123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 124 error = lwkt_waitmsg(msg, flags); 125 } else { 126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 127 } 128 return(error); 129 } 130 131 /* 132 * lwkt_forwardmsg() 133 * 134 * Forward a message received on one port to another port. 135 */ 136 int 137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 138 { 139 int error; 140 141 crit_enter(); 142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 143 if ((error = port->mp_putport(port, msg)) != EASYNC) 144 lwkt_replymsg(msg, error); 145 crit_exit(); 146 return(error); 147 } 148 149 /* 150 * lwkt_abortmsg() 151 * 152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 153 * The caller must ensure that the message will not be both replied AND 154 * destroyed while the abort is in progress. 155 * 156 * This function issues a callback which might block! 157 */ 158 void 159 lwkt_abortmsg(lwkt_msg_t msg) 160 { 161 /* 162 * A critical section protects us from reply IPIs on this cpu. 163 */ 164 crit_enter(); 165 166 /* 167 * Shortcut the operation if the message has already been returned. 168 * The callback typically constructs a lwkt_msg with the abort request, 169 * issues it synchronously, and waits for completion. The callback 170 * is not required to actually abort the message and the target port, 171 * upon receiving an abort request message generated by the callback 172 * should check whether the original message has already completed or 173 * not. 174 */ 175 if (msg->ms_flags & MSGF_ABORTABLE) { 176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 177 msg->ms_abortfn(msg); 178 } 179 crit_exit(); 180 } 181 182 /************************************************************************ 183 * PORT INITIALIZATION API * 184 ************************************************************************/ 185 186 static void *lwkt_thread_getport(lwkt_port_t port); 187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 191 192 static void *lwkt_spin_getport(lwkt_port_t port); 193 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 194 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 195 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 196 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 197 198 static void *lwkt_serialize_getport(lwkt_port_t port); 199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 203 204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 205 static void *lwkt_panic_getport(lwkt_port_t port); 206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 210 211 /* 212 * Core port initialization (internal) 213 */ 214 static __inline 215 void 216 _lwkt_initport(lwkt_port_t port, 217 void *(*gportfn)(lwkt_port_t), 218 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 219 int (*wmsgfn)(lwkt_msg_t, int), 220 void *(*wportfn)(lwkt_port_t, int), 221 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 222 { 223 bzero(port, sizeof(*port)); 224 TAILQ_INIT(&port->mp_msgq); 225 port->mp_getport = gportfn; 226 port->mp_putport = pportfn; 227 port->mp_waitmsg = wmsgfn; 228 port->mp_waitport = wportfn; 229 port->mp_replyport = rportfn; 230 } 231 232 /* 233 * lwkt_initport_thread() 234 * 235 * Initialize a port for use by a particular thread. The port may 236 * only be used by <td>. 237 */ 238 void 239 lwkt_initport_thread(lwkt_port_t port, thread_t td) 240 { 241 _lwkt_initport(port, 242 lwkt_thread_getport, 243 lwkt_thread_putport, 244 lwkt_thread_waitmsg, 245 lwkt_thread_waitport, 246 lwkt_thread_replyport); 247 port->mpu_td = td; 248 } 249 250 /* 251 * lwkt_initport_spin() 252 * 253 * Initialize a port for use with descriptors that might be accessed 254 * via multiple LWPs, processes, or threads. Has somewhat more 255 * overhead then thread ports. 256 */ 257 void 258 lwkt_initport_spin(lwkt_port_t port) 259 { 260 _lwkt_initport(port, 261 lwkt_spin_getport, 262 lwkt_spin_putport, 263 lwkt_spin_waitmsg, 264 lwkt_spin_waitport, 265 lwkt_spin_replyport); 266 spin_init(&port->mpu_spin); 267 } 268 269 void 270 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 271 { 272 _lwkt_initport(port, 273 lwkt_serialize_getport, 274 lwkt_serialize_putport, 275 lwkt_serialize_waitmsg, 276 lwkt_serialize_waitport, 277 lwkt_serialize_replyport); 278 port->mpu_serialize = slz; 279 } 280 281 /* 282 * Similar to the standard initport, this function simply marks the message 283 * as being done and does not attempt to return it to an originating port. 284 */ 285 void 286 lwkt_initport_replyonly_null(lwkt_port_t port) 287 { 288 _lwkt_initport(port, 289 lwkt_panic_getport, 290 lwkt_panic_putport, 291 lwkt_panic_waitmsg, 292 lwkt_panic_waitport, 293 lwkt_null_replyport); 294 } 295 296 /* 297 * Initialize a reply-only port, typically used as a message sink. Such 298 * ports can only be used as a reply port. 299 */ 300 void 301 lwkt_initport_replyonly(lwkt_port_t port, 302 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 303 { 304 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 305 lwkt_panic_waitmsg, lwkt_panic_waitport, 306 rportfn); 307 } 308 309 void 310 lwkt_initport_putonly(lwkt_port_t port, 311 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 312 { 313 _lwkt_initport(port, lwkt_panic_getport, pportfn, 314 lwkt_panic_waitmsg, lwkt_panic_waitport, 315 lwkt_panic_replyport); 316 } 317 318 void 319 lwkt_initport_panic(lwkt_port_t port) 320 { 321 _lwkt_initport(port, 322 lwkt_panic_getport, lwkt_panic_putport, 323 lwkt_panic_waitmsg, lwkt_panic_waitport, 324 lwkt_panic_replyport); 325 } 326 327 /* 328 * lwkt_getport() 329 * 330 * Retrieve the next message from the port's message queue, return NULL 331 * if no messages are pending. The retrieved message will either be a 332 * request or a reply based on the MSGF_REPLY bit. 333 * 334 * The calling thread MUST own the port. 335 */ 336 337 static __inline 338 void 339 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 340 { 341 /* 342 * normal case, remove and return the message. 343 */ 344 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node); 345 msg->ms_flags &= ~MSGF_QUEUED; 346 } 347 348 /************************************************************************ 349 * THREAD PORT BACKEND * 350 ************************************************************************ 351 * 352 * This backend is used when the port a message is retrieved from is owned 353 * by a single thread (the calling thread). Messages are IPId to the 354 * correct cpu before being enqueued to a port. Note that this is fairly 355 * optimal since scheduling would have had to do an IPI anyway if the 356 * message were headed to a different cpu. 357 */ 358 359 #ifdef SMP 360 361 /* 362 * This function completes reply processing for the default case in the 363 * context of the originating cpu. 364 */ 365 static 366 void 367 lwkt_thread_replyport_remote(lwkt_msg_t msg) 368 { 369 lwkt_port_t port = msg->ms_reply_port; 370 371 /* 372 * Chase any thread migration that occurs 373 */ 374 if (port->mpu_td->td_gd != mycpu) { 375 lwkt_send_ipiq(port->mpu_td->td_gd, 376 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 377 return; 378 } 379 380 /* 381 * Cleanup 382 */ 383 #ifdef INVARIANTS 384 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 385 msg->ms_flags &= ~MSGF_INTRANSIT; 386 #endif 387 if (msg->ms_flags & MSGF_SYNC) { 388 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 389 } else { 390 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 391 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 392 } 393 if (port->mp_flags & MSGPORTF_WAITING) 394 lwkt_schedule(port->mpu_td); 395 } 396 397 #endif 398 399 /* 400 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 401 * 402 * Called with the reply port as an argument but in the context of the 403 * original target port. Completion must occur on the target port's 404 * cpu. 405 * 406 * The critical section protects us from IPIs on the this CPU. 407 */ 408 void 409 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 410 { 411 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 412 413 if (msg->ms_flags & MSGF_SYNC) { 414 /* 415 * If a synchronous completion has been requested, just wakeup 416 * the message without bothering to queue it to the target port. 417 * 418 * Assume the target thread is non-preemptive, so no critical 419 * section is required. 420 */ 421 #ifdef SMP 422 if (port->mpu_td->td_gd == mycpu) { 423 #endif 424 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 425 if (port->mp_flags & MSGPORTF_WAITING) 426 lwkt_schedule(port->mpu_td); 427 #ifdef SMP 428 } else { 429 #ifdef INVARIANTS 430 msg->ms_flags |= MSGF_INTRANSIT; 431 #endif 432 msg->ms_flags |= MSGF_REPLY; 433 lwkt_send_ipiq(port->mpu_td->td_gd, 434 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 435 } 436 #endif 437 } else { 438 /* 439 * If an asynchronous completion has been requested the message 440 * must be queued to the reply port. 441 * 442 * A critical section is required to interlock the port queue. 443 */ 444 #ifdef SMP 445 if (port->mpu_td->td_gd == mycpu) { 446 #endif 447 crit_enter(); 448 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 449 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 450 if (port->mp_flags & MSGPORTF_WAITING) 451 lwkt_schedule(port->mpu_td); 452 crit_exit(); 453 #ifdef SMP 454 } else { 455 #ifdef INVARIANTS 456 msg->ms_flags |= MSGF_INTRANSIT; 457 #endif 458 msg->ms_flags |= MSGF_REPLY; 459 lwkt_send_ipiq(port->mpu_td->td_gd, 460 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 461 } 462 #endif 463 } 464 } 465 466 /* 467 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 468 * 469 * Called with the target port as an argument but in the context of the 470 * reply port. This function always implements an asynchronous put to 471 * the target message port, and thus returns EASYNC. 472 * 473 * The message must already have cleared MSGF_DONE and MSGF_REPLY 474 */ 475 476 #ifdef SMP 477 478 static 479 void 480 lwkt_thread_putport_remote(lwkt_msg_t msg) 481 { 482 lwkt_port_t port = msg->ms_target_port; 483 484 /* 485 * Chase any thread migration that occurs 486 */ 487 if (port->mpu_td->td_gd != mycpu) { 488 lwkt_send_ipiq(port->mpu_td->td_gd, 489 (ipifunc1_t)lwkt_thread_putport_remote, msg); 490 return; 491 } 492 493 /* 494 * Cleanup 495 */ 496 #ifdef INVARIANTS 497 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 498 msg->ms_flags &= ~MSGF_INTRANSIT; 499 #endif 500 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 501 msg->ms_flags |= MSGF_QUEUED; 502 if (port->mp_flags & MSGPORTF_WAITING) 503 lwkt_schedule(port->mpu_td); 504 } 505 506 #endif 507 508 static 509 int 510 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 511 { 512 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 513 514 msg->ms_target_port = port; 515 #ifdef SMP 516 if (port->mpu_td->td_gd == mycpu) { 517 #endif 518 crit_enter(); 519 msg->ms_flags |= MSGF_QUEUED; 520 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 521 if (port->mp_flags & MSGPORTF_WAITING) 522 lwkt_schedule(port->mpu_td); 523 crit_exit(); 524 #ifdef SMP 525 } else { 526 #ifdef INVARIANTS 527 msg->ms_flags |= MSGF_INTRANSIT; 528 #endif 529 lwkt_send_ipiq(port->mpu_td->td_gd, 530 (ipifunc1_t)lwkt_thread_putport_remote, msg); 531 } 532 #endif 533 return (EASYNC); 534 } 535 536 /* 537 * lwkt_thread_getport() 538 * 539 * Retrieve the next message from the port or NULL if no messages 540 * are ready. 541 */ 542 void * 543 lwkt_thread_getport(lwkt_port_t port) 544 { 545 lwkt_msg_t msg; 546 547 KKASSERT(port->mpu_td == curthread); 548 549 crit_enter_quick(port->mpu_td); 550 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 551 _lwkt_pullmsg(port, msg); 552 crit_exit_quick(port->mpu_td); 553 return(msg); 554 } 555 556 /* 557 * lwkt_thread_waitmsg() 558 * 559 * Wait for a particular message to be replied. We must be the only 560 * thread waiting on the message. The port must be owned by the 561 * caller. 562 */ 563 int 564 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 565 { 566 if ((msg->ms_flags & MSGF_DONE) == 0) { 567 /* 568 * If the done bit was not set we have to block until it is. 569 */ 570 lwkt_port_t port = msg->ms_reply_port; 571 thread_t td = curthread; 572 int sentabort; 573 574 KKASSERT(port->mpu_td == td); 575 crit_enter_quick(td); 576 sentabort = 0; 577 578 while ((msg->ms_flags & MSGF_DONE) == 0) { 579 port->mp_flags |= MSGPORTF_WAITING; 580 if (sentabort == 0) { 581 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 582 lwkt_abortmsg(msg); 583 } 584 } else { 585 lwkt_sleep("waitabt", 0); 586 } 587 port->mp_flags &= ~MSGPORTF_WAITING; 588 } 589 if (msg->ms_flags & MSGF_QUEUED) 590 _lwkt_pullmsg(port, msg); 591 crit_exit_quick(td); 592 } else { 593 /* 594 * If the done bit was set we only have to mess around with the 595 * message if it is queued on the reply port. 596 */ 597 if (msg->ms_flags & MSGF_QUEUED) { 598 lwkt_port_t port = msg->ms_reply_port; 599 thread_t td = curthread; 600 601 KKASSERT(port->mpu_td == td); 602 crit_enter_quick(td); 603 _lwkt_pullmsg(port, msg); 604 crit_exit_quick(td); 605 } 606 } 607 return(msg->ms_error); 608 } 609 610 void * 611 lwkt_thread_waitport(lwkt_port_t port, int flags) 612 { 613 thread_t td = curthread; 614 lwkt_msg_t msg; 615 int error; 616 617 KKASSERT(port->mpu_td == td); 618 crit_enter_quick(td); 619 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 620 port->mp_flags |= MSGPORTF_WAITING; 621 error = lwkt_sleep("waitport", flags); 622 port->mp_flags &= ~MSGPORTF_WAITING; 623 if (error) 624 goto done; 625 } 626 _lwkt_pullmsg(port, msg); 627 done: 628 crit_exit_quick(td); 629 return(msg); 630 } 631 632 /************************************************************************ 633 * SPIN PORT BACKEND * 634 ************************************************************************ 635 * 636 * This backend uses spinlocks instead of making assumptions about which 637 * thread is accessing the port. It must be used when a port is not owned 638 * by a particular thread. This is less optimal then thread ports but 639 * you don't have a choice if there are multiple threads accessing the port. 640 * 641 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 642 * on the message port, it is the responsibility of the code doing the 643 * wakeup to clear this flag rather then the blocked threads. Some 644 * superfluous wakeups may occur, which is ok. 645 * 646 * XXX synchronous message wakeups are not current optimized. 647 */ 648 649 static 650 void * 651 lwkt_spin_getport(lwkt_port_t port) 652 { 653 lwkt_msg_t msg; 654 655 spin_lock_wr(&port->mpu_spin); 656 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 657 _lwkt_pullmsg(port, msg); 658 spin_unlock_wr(&port->mpu_spin); 659 return(msg); 660 } 661 662 static 663 int 664 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 665 { 666 int dowakeup; 667 668 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 669 670 msg->ms_target_port = port; 671 spin_lock_wr(&port->mpu_spin); 672 msg->ms_flags |= MSGF_QUEUED; 673 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 674 dowakeup = 0; 675 if (port->mp_flags & MSGPORTF_WAITING) { 676 port->mp_flags &= ~MSGPORTF_WAITING; 677 dowakeup = 1; 678 } 679 spin_unlock_wr(&port->mpu_spin); 680 if (dowakeup) 681 wakeup(port); 682 return (EASYNC); 683 } 684 685 static 686 int 687 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 688 { 689 lwkt_port_t port; 690 int sentabort; 691 int error; 692 693 if ((msg->ms_flags & MSGF_DONE) == 0) { 694 port = msg->ms_reply_port; 695 sentabort = 0; 696 spin_lock_wr(&port->mpu_spin); 697 while ((msg->ms_flags & MSGF_DONE) == 0) { 698 void *won; 699 700 /* 701 * If message was sent synchronously from the beginning 702 * the wakeup will be on the message structure, else it 703 * will be on the port structure. 704 */ 705 if (msg->ms_flags & MSGF_SYNC) { 706 won = msg; 707 } else { 708 won = port; 709 port->mp_flags |= MSGPORTF_WAITING; 710 } 711 712 /* 713 * Only messages which support abort can be interrupted. 714 * We must still wait for message completion regardless. 715 */ 716 if ((flags & PCATCH) && sentabort == 0) { 717 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 718 if (error) { 719 sentabort = error; 720 spin_unlock_wr(&port->mpu_spin); 721 lwkt_abortmsg(msg); 722 spin_lock_wr(&port->mpu_spin); 723 } 724 } else { 725 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0); 726 } 727 /* see note at the top on the MSGPORTF_WAITING flag */ 728 } 729 /* 730 * Turn EINTR into ERESTART if the signal indicates. 731 */ 732 if (sentabort && msg->ms_error == EINTR) 733 msg->ms_error = sentabort; 734 if (msg->ms_flags & MSGF_QUEUED) 735 _lwkt_pullmsg(port, msg); 736 spin_unlock_wr(&port->mpu_spin); 737 } else { 738 if (msg->ms_flags & MSGF_QUEUED) { 739 port = msg->ms_reply_port; 740 spin_lock_wr(&port->mpu_spin); 741 _lwkt_pullmsg(port, msg); 742 spin_unlock_wr(&port->mpu_spin); 743 } 744 } 745 return(msg->ms_error); 746 } 747 748 static 749 void * 750 lwkt_spin_waitport(lwkt_port_t port, int flags) 751 { 752 lwkt_msg_t msg; 753 int error; 754 755 spin_lock_wr(&port->mpu_spin); 756 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 757 port->mp_flags |= MSGPORTF_WAITING; 758 error = msleep(port, &port->mpu_spin, flags, "waitport", 0); 759 /* see note at the top on the MSGPORTF_WAITING flag */ 760 if (error) { 761 spin_unlock_wr(&port->mpu_spin); 762 return(NULL); 763 } 764 } 765 _lwkt_pullmsg(port, msg); 766 spin_unlock_wr(&port->mpu_spin); 767 return(msg); 768 } 769 770 static 771 void 772 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 773 { 774 int dowakeup; 775 776 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 777 778 if (msg->ms_flags & MSGF_SYNC) { 779 /* 780 * If a synchronous completion has been requested, just wakeup 781 * the message without bothering to queue it to the target port. 782 */ 783 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 784 wakeup(msg); 785 } else { 786 /* 787 * If an asynchronous completion has been requested the message 788 * must be queued to the reply port. 789 */ 790 spin_lock_wr(&port->mpu_spin); 791 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 792 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 793 dowakeup = 0; 794 if (port->mp_flags & MSGPORTF_WAITING) { 795 port->mp_flags &= ~MSGPORTF_WAITING; 796 dowakeup = 1; 797 } 798 spin_unlock_wr(&port->mpu_spin); 799 if (dowakeup) 800 wakeup(port); 801 } 802 } 803 804 /************************************************************************ 805 * SERIALIZER PORT BACKEND * 806 ************************************************************************ 807 * 808 * This backend uses serializer to protect port accessing. Callers are 809 * assumed to have serializer held. This kind of port is usually created 810 * by network device driver along with _one_ lwkt thread to pipeline 811 * operations which may temporarily release serializer. 812 * 813 * Implementation is based on SPIN PORT BACKEND. 814 */ 815 816 static 817 void * 818 lwkt_serialize_getport(lwkt_port_t port) 819 { 820 lwkt_msg_t msg; 821 822 ASSERT_SERIALIZED(port->mpu_serialize); 823 824 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 825 _lwkt_pullmsg(port, msg); 826 return(msg); 827 } 828 829 static 830 int 831 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 832 { 833 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 834 ASSERT_SERIALIZED(port->mpu_serialize); 835 836 msg->ms_target_port = port; 837 msg->ms_flags |= MSGF_QUEUED; 838 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 839 if (port->mp_flags & MSGPORTF_WAITING) { 840 port->mp_flags &= ~MSGPORTF_WAITING; 841 wakeup(port); 842 } 843 return (EASYNC); 844 } 845 846 static 847 int 848 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 849 { 850 lwkt_port_t port; 851 int sentabort; 852 int error; 853 854 if ((msg->ms_flags & MSGF_DONE) == 0) { 855 port = msg->ms_reply_port; 856 857 ASSERT_SERIALIZED(port->mpu_serialize); 858 859 sentabort = 0; 860 while ((msg->ms_flags & MSGF_DONE) == 0) { 861 void *won; 862 863 /* 864 * If message was sent synchronously from the beginning 865 * the wakeup will be on the message structure, else it 866 * will be on the port structure. 867 */ 868 if (msg->ms_flags & MSGF_SYNC) { 869 won = msg; 870 } else { 871 won = port; 872 port->mp_flags |= MSGPORTF_WAITING; 873 } 874 875 /* 876 * Only messages which support abort can be interrupted. 877 * We must still wait for message completion regardless. 878 */ 879 if ((flags & PCATCH) && sentabort == 0) { 880 error = serialize_sleep(won, port->mpu_serialize, PCATCH, 881 "waitmsg", 0); 882 if (error) { 883 sentabort = error; 884 lwkt_serialize_exit(port->mpu_serialize); 885 lwkt_abortmsg(msg); 886 lwkt_serialize_enter(port->mpu_serialize); 887 } 888 } else { 889 error = serialize_sleep(won, port->mpu_serialize, 0, 890 "waitmsg", 0); 891 } 892 /* see note at the top on the MSGPORTF_WAITING flag */ 893 } 894 /* 895 * Turn EINTR into ERESTART if the signal indicates. 896 */ 897 if (sentabort && msg->ms_error == EINTR) 898 msg->ms_error = sentabort; 899 if (msg->ms_flags & MSGF_QUEUED) 900 _lwkt_pullmsg(port, msg); 901 } else { 902 if (msg->ms_flags & MSGF_QUEUED) { 903 port = msg->ms_reply_port; 904 905 ASSERT_SERIALIZED(port->mpu_serialize); 906 _lwkt_pullmsg(port, msg); 907 } 908 } 909 return(msg->ms_error); 910 } 911 912 static 913 void * 914 lwkt_serialize_waitport(lwkt_port_t port, int flags) 915 { 916 lwkt_msg_t msg; 917 int error; 918 919 ASSERT_SERIALIZED(port->mpu_serialize); 920 921 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 922 port->mp_flags |= MSGPORTF_WAITING; 923 error = serialize_sleep(port, port->mpu_serialize, flags, 924 "waitport", 0); 925 /* see note at the top on the MSGPORTF_WAITING flag */ 926 if (error) 927 return(NULL); 928 } 929 _lwkt_pullmsg(port, msg); 930 return(msg); 931 } 932 933 static 934 void 935 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 936 { 937 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 938 ASSERT_SERIALIZED(port->mpu_serialize); 939 940 if (msg->ms_flags & MSGF_SYNC) { 941 /* 942 * If a synchronous completion has been requested, just wakeup 943 * the message without bothering to queue it to the target port. 944 */ 945 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 946 wakeup(msg); 947 } else { 948 /* 949 * If an asynchronous completion has been requested the message 950 * must be queued to the reply port. 951 */ 952 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 953 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 954 if (port->mp_flags & MSGPORTF_WAITING) { 955 port->mp_flags &= ~MSGPORTF_WAITING; 956 wakeup(port); 957 } 958 } 959 } 960 961 /************************************************************************ 962 * PANIC AND SPECIAL PORT FUNCTIONS * 963 ************************************************************************/ 964 965 /* 966 * You can point a port's reply vector at this function if you just want 967 * the message marked done, without any queueing or signaling. This is 968 * often used for structure-embedded messages. 969 */ 970 static 971 void 972 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 973 { 974 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 975 } 976 977 static 978 void * 979 lwkt_panic_getport(lwkt_port_t port) 980 { 981 panic("lwkt_getport() illegal on port %p", port); 982 } 983 984 static 985 int 986 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 987 { 988 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 989 } 990 991 static 992 int 993 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 994 { 995 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 996 } 997 998 static 999 void * 1000 lwkt_panic_waitport(lwkt_port_t port, int flags) 1001 { 1002 panic("port %p cannot be waited on", port); 1003 } 1004 1005 static 1006 void 1007 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1008 { 1009 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1010 } 1011 1012