1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 * 37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.45 2008/03/05 13:03:29 sephe Exp $ 38 */ 39 40 #ifdef _KERNEL 41 42 #include <sys/param.h> 43 #include <sys/systm.h> 44 #include <sys/kernel.h> 45 #include <sys/proc.h> 46 #include <sys/rtprio.h> 47 #include <sys/queue.h> 48 #include <sys/sysctl.h> 49 #include <sys/kthread.h> 50 #include <sys/signalvar.h> 51 #include <sys/signal2.h> 52 #include <machine/cpu.h> 53 #include <sys/lock.h> 54 55 #include <vm/vm.h> 56 #include <vm/vm_param.h> 57 #include <vm/vm_kern.h> 58 #include <vm/vm_object.h> 59 #include <vm/vm_page.h> 60 #include <vm/vm_map.h> 61 #include <vm/vm_pager.h> 62 #include <vm/vm_extern.h> 63 #include <vm/vm_zone.h> 64 65 #include <sys/thread2.h> 66 #include <sys/msgport2.h> 67 #include <sys/spinlock2.h> 68 #include <sys/serialize.h> 69 70 #include <machine/stdarg.h> 71 #include <machine/cpufunc.h> 72 #ifdef SMP 73 #include <machine/smp.h> 74 #endif 75 76 #include <sys/malloc.h> 77 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 78 79 #else 80 81 #include <sys/stdint.h> 82 #include <libcaps/thread.h> 83 #include <sys/thread.h> 84 #include <sys/msgport.h> 85 #include <sys/errno.h> 86 #include <libcaps/globaldata.h> 87 #include <machine/cpufunc.h> 88 #include <sys/thread2.h> 89 #include <sys/msgport2.h> 90 #include <string.h> 91 92 #endif /* _KERNEL */ 93 94 95 /************************************************************************ 96 * MESSAGE FUNCTIONS * 97 ************************************************************************/ 98 99 /* 100 * lwkt_sendmsg() 101 * 102 * Request asynchronous completion and call lwkt_beginmsg(). The 103 * target port can opt to execute the message synchronously or 104 * asynchronously and this function will automatically queue the 105 * response if the target executes the message synchronously. 106 * 107 * NOTE: The message is in an indeterminant state until this call 108 * returns. The caller should not mess with it (e.g. try to abort it) 109 * until then. 110 */ 111 void 112 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 113 { 114 int error; 115 116 KKASSERT(msg->ms_reply_port != NULL && 117 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 118 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 119 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 120 lwkt_replymsg(msg, error); 121 } 122 } 123 124 /* 125 * lwkt_domsg() 126 * 127 * Request asynchronous completion and call lwkt_beginmsg(). The 128 * target port can opt to execute the message synchronously or 129 * asynchronously and this function will automatically queue the 130 * response if the target executes the message synchronously. 131 */ 132 int 133 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 134 { 135 int error; 136 137 KKASSERT(msg->ms_reply_port != NULL && 138 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 139 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 140 msg->ms_flags |= MSGF_SYNC; 141 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 142 error = lwkt_waitmsg(msg, flags); 143 } else { 144 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 145 } 146 return(error); 147 } 148 149 /* 150 * lwkt_forwardmsg() 151 * 152 * Forward a message received on one port to another port. 153 */ 154 int 155 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 156 { 157 int error; 158 159 crit_enter(); 160 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 161 if ((error = port->mp_putport(port, msg)) != EASYNC) 162 lwkt_replymsg(msg, error); 163 crit_exit(); 164 return(error); 165 } 166 167 /* 168 * lwkt_abortmsg() 169 * 170 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 171 * The caller must ensure that the message will not be both replied AND 172 * destroyed while the abort is in progress. 173 * 174 * This function issues a callback which might block! 175 */ 176 void 177 lwkt_abortmsg(lwkt_msg_t msg) 178 { 179 /* 180 * A critical section protects us from reply IPIs on this cpu. 181 */ 182 crit_enter(); 183 184 /* 185 * Shortcut the operation if the message has already been returned. 186 * The callback typically constructs a lwkt_msg with the abort request, 187 * issues it synchronously, and waits for completion. The callback 188 * is not required to actually abort the message and the target port, 189 * upon receiving an abort request message generated by the callback 190 * should check whether the original message has already completed or 191 * not. 192 */ 193 if (msg->ms_flags & MSGF_ABORTABLE) { 194 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 195 msg->ms_abortfn(msg); 196 } 197 crit_exit(); 198 } 199 200 /************************************************************************ 201 * PORT INITIALIZATION API * 202 ************************************************************************/ 203 204 static void *lwkt_thread_getport(lwkt_port_t port); 205 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 206 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 207 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 208 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 209 210 static void *lwkt_spin_getport(lwkt_port_t port); 211 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 212 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 213 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 214 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 215 216 static void *lwkt_serialize_getport(lwkt_port_t port); 217 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 218 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 219 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 220 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 221 222 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 223 static void *lwkt_panic_getport(lwkt_port_t port); 224 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 225 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 226 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 227 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 228 229 /* 230 * Core port initialization (internal) 231 */ 232 static __inline 233 void 234 _lwkt_initport(lwkt_port_t port, 235 void *(*gportfn)(lwkt_port_t), 236 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 237 int (*wmsgfn)(lwkt_msg_t, int), 238 void *(*wportfn)(lwkt_port_t, int), 239 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 240 { 241 bzero(port, sizeof(*port)); 242 TAILQ_INIT(&port->mp_msgq); 243 port->mp_getport = gportfn; 244 port->mp_putport = pportfn; 245 port->mp_waitmsg = wmsgfn; 246 port->mp_waitport = wportfn; 247 port->mp_replyport = rportfn; 248 } 249 250 /* 251 * lwkt_initport_thread() 252 * 253 * Initialize a port for use by a particular thread. The port may 254 * only be used by <td>. 255 */ 256 void 257 lwkt_initport_thread(lwkt_port_t port, thread_t td) 258 { 259 _lwkt_initport(port, 260 lwkt_thread_getport, 261 lwkt_thread_putport, 262 lwkt_thread_waitmsg, 263 lwkt_thread_waitport, 264 lwkt_thread_replyport); 265 port->mpu_td = td; 266 } 267 268 /* 269 * lwkt_initport_spin() 270 * 271 * Initialize a port for use with descriptors that might be accessed 272 * via multiple LWPs, processes, or threads. Has somewhat more 273 * overhead then thread ports. 274 */ 275 void 276 lwkt_initport_spin(lwkt_port_t port) 277 { 278 _lwkt_initport(port, 279 lwkt_spin_getport, 280 lwkt_spin_putport, 281 lwkt_spin_waitmsg, 282 lwkt_spin_waitport, 283 lwkt_spin_replyport); 284 spin_init(&port->mpu_spin); 285 } 286 287 void 288 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 289 { 290 _lwkt_initport(port, 291 lwkt_serialize_getport, 292 lwkt_serialize_putport, 293 lwkt_serialize_waitmsg, 294 lwkt_serialize_waitport, 295 lwkt_serialize_replyport); 296 port->mpu_serialize = slz; 297 } 298 299 /* 300 * Similar to the standard initport, this function simply marks the message 301 * as being done and does not attempt to return it to an originating port. 302 */ 303 void 304 lwkt_initport_replyonly_null(lwkt_port_t port) 305 { 306 _lwkt_initport(port, 307 lwkt_panic_getport, 308 lwkt_panic_putport, 309 lwkt_panic_waitmsg, 310 lwkt_panic_waitport, 311 lwkt_null_replyport); 312 } 313 314 /* 315 * Initialize a reply-only port, typically used as a message sink. Such 316 * ports can only be used as a reply port. 317 */ 318 void 319 lwkt_initport_replyonly(lwkt_port_t port, 320 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 321 { 322 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 323 lwkt_panic_waitmsg, lwkt_panic_waitport, 324 rportfn); 325 } 326 327 void 328 lwkt_initport_putonly(lwkt_port_t port, 329 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 330 { 331 _lwkt_initport(port, lwkt_panic_getport, pportfn, 332 lwkt_panic_waitmsg, lwkt_panic_waitport, 333 lwkt_panic_replyport); 334 } 335 336 void 337 lwkt_initport_panic(lwkt_port_t port) 338 { 339 _lwkt_initport(port, 340 lwkt_panic_getport, lwkt_panic_putport, 341 lwkt_panic_waitmsg, lwkt_panic_waitport, 342 lwkt_panic_replyport); 343 } 344 345 /* 346 * lwkt_getport() 347 * 348 * Retrieve the next message from the port's message queue, return NULL 349 * if no messages are pending. The retrieved message will either be a 350 * request or a reply based on the MSGF_REPLY bit. 351 * 352 * The calling thread MUST own the port. 353 */ 354 355 static __inline 356 void 357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 358 { 359 /* 360 * normal case, remove and return the message. 361 */ 362 TAILQ_REMOVE(&port->mp_msgq, msg, ms_node); 363 msg->ms_flags &= ~MSGF_QUEUED; 364 } 365 366 /************************************************************************ 367 * THREAD PORT BACKEND * 368 ************************************************************************ 369 * 370 * This backend is used when the port a message is retrieved from is owned 371 * by a single thread (the calling thread). Messages are IPId to the 372 * correct cpu before being enqueued to a port. Note that this is fairly 373 * optimal since scheduling would have had to do an IPI anyway if the 374 * message were headed to a different cpu. 375 */ 376 377 #ifdef SMP 378 379 /* 380 * This function completes reply processing for the default case in the 381 * context of the originating cpu. 382 */ 383 static 384 void 385 lwkt_thread_replyport_remote(lwkt_msg_t msg) 386 { 387 lwkt_port_t port = msg->ms_reply_port; 388 389 /* 390 * Chase any thread migration that occurs 391 */ 392 if (port->mpu_td->td_gd != mycpu) { 393 lwkt_send_ipiq(port->mpu_td->td_gd, 394 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 395 return; 396 } 397 398 /* 399 * Cleanup 400 */ 401 #ifdef INVARIANTS 402 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 403 msg->ms_flags &= ~MSGF_INTRANSIT; 404 #endif 405 if (msg->ms_flags & MSGF_SYNC) { 406 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 407 } else { 408 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 409 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 410 } 411 if (port->mp_flags & MSGPORTF_WAITING) 412 lwkt_schedule(port->mpu_td); 413 } 414 415 #endif 416 417 /* 418 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 419 * 420 * Called with the reply port as an argument but in the context of the 421 * original target port. Completion must occur on the target port's 422 * cpu. 423 * 424 * The critical section protects us from IPIs on the this CPU. 425 */ 426 void 427 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 428 { 429 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 430 431 if (msg->ms_flags & MSGF_SYNC) { 432 /* 433 * If a synchronous completion has been requested, just wakeup 434 * the message without bothering to queue it to the target port. 435 * 436 * Assume the target thread is non-preemptive, so no critical 437 * section is required. 438 */ 439 #ifdef SMP 440 if (port->mpu_td->td_gd == mycpu) { 441 #endif 442 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 443 if (port->mp_flags & MSGPORTF_WAITING) 444 lwkt_schedule(port->mpu_td); 445 #ifdef SMP 446 } else { 447 #ifdef INVARIANTS 448 msg->ms_flags |= MSGF_INTRANSIT; 449 #endif 450 msg->ms_flags |= MSGF_REPLY; 451 lwkt_send_ipiq(port->mpu_td->td_gd, 452 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 453 } 454 #endif 455 } else { 456 /* 457 * If an asynchronous completion has been requested the message 458 * must be queued to the reply port. 459 * 460 * A critical section is required to interlock the port queue. 461 */ 462 #ifdef SMP 463 if (port->mpu_td->td_gd == mycpu) { 464 #endif 465 crit_enter(); 466 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 467 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 468 if (port->mp_flags & MSGPORTF_WAITING) 469 lwkt_schedule(port->mpu_td); 470 crit_exit(); 471 #ifdef SMP 472 } else { 473 #ifdef INVARIANTS 474 msg->ms_flags |= MSGF_INTRANSIT; 475 #endif 476 msg->ms_flags |= MSGF_REPLY; 477 lwkt_send_ipiq(port->mpu_td->td_gd, 478 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 479 } 480 #endif 481 } 482 } 483 484 /* 485 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 486 * 487 * Called with the target port as an argument but in the context of the 488 * reply port. This function always implements an asynchronous put to 489 * the target message port, and thus returns EASYNC. 490 * 491 * The message must already have cleared MSGF_DONE and MSGF_REPLY 492 */ 493 494 #ifdef SMP 495 496 static 497 void 498 lwkt_thread_putport_remote(lwkt_msg_t msg) 499 { 500 lwkt_port_t port = msg->ms_target_port; 501 502 /* 503 * Chase any thread migration that occurs 504 */ 505 if (port->mpu_td->td_gd != mycpu) { 506 lwkt_send_ipiq(port->mpu_td->td_gd, 507 (ipifunc1_t)lwkt_thread_putport_remote, msg); 508 return; 509 } 510 511 /* 512 * Cleanup 513 */ 514 #ifdef INVARIANTS 515 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 516 msg->ms_flags &= ~MSGF_INTRANSIT; 517 #endif 518 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 519 msg->ms_flags |= MSGF_QUEUED; 520 if (port->mp_flags & MSGPORTF_WAITING) 521 lwkt_schedule(port->mpu_td); 522 } 523 524 #endif 525 526 static 527 int 528 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 529 { 530 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 531 532 msg->ms_target_port = port; 533 #ifdef SMP 534 if (port->mpu_td->td_gd == mycpu) { 535 #endif 536 crit_enter(); 537 msg->ms_flags |= MSGF_QUEUED; 538 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 539 if (port->mp_flags & MSGPORTF_WAITING) 540 lwkt_schedule(port->mpu_td); 541 crit_exit(); 542 #ifdef SMP 543 } else { 544 #ifdef INVARIANTS 545 msg->ms_flags |= MSGF_INTRANSIT; 546 #endif 547 lwkt_send_ipiq(port->mpu_td->td_gd, 548 (ipifunc1_t)lwkt_thread_putport_remote, msg); 549 } 550 #endif 551 return (EASYNC); 552 } 553 554 /* 555 * lwkt_thread_getport() 556 * 557 * Retrieve the next message from the port or NULL if no messages 558 * are ready. 559 */ 560 void * 561 lwkt_thread_getport(lwkt_port_t port) 562 { 563 lwkt_msg_t msg; 564 565 KKASSERT(port->mpu_td == curthread); 566 567 crit_enter_quick(port->mpu_td); 568 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 569 _lwkt_pullmsg(port, msg); 570 crit_exit_quick(port->mpu_td); 571 return(msg); 572 } 573 574 /* 575 * lwkt_thread_waitmsg() 576 * 577 * Wait for a particular message to be replied. We must be the only 578 * thread waiting on the message. The port must be owned by the 579 * caller. 580 */ 581 int 582 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 583 { 584 if ((msg->ms_flags & MSGF_DONE) == 0) { 585 /* 586 * If the done bit was not set we have to block until it is. 587 */ 588 lwkt_port_t port = msg->ms_reply_port; 589 thread_t td = curthread; 590 int sentabort; 591 592 KKASSERT(port->mpu_td == td); 593 crit_enter_quick(td); 594 sentabort = 0; 595 596 while ((msg->ms_flags & MSGF_DONE) == 0) { 597 port->mp_flags |= MSGPORTF_WAITING; 598 if (sentabort == 0) { 599 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 600 lwkt_abortmsg(msg); 601 } 602 } else { 603 lwkt_sleep("waitabt", 0); 604 } 605 port->mp_flags &= ~MSGPORTF_WAITING; 606 } 607 if (msg->ms_flags & MSGF_QUEUED) 608 _lwkt_pullmsg(port, msg); 609 crit_exit_quick(td); 610 } else { 611 /* 612 * If the done bit was set we only have to mess around with the 613 * message if it is queued on the reply port. 614 */ 615 if (msg->ms_flags & MSGF_QUEUED) { 616 lwkt_port_t port = msg->ms_reply_port; 617 thread_t td = curthread; 618 619 KKASSERT(port->mpu_td == td); 620 crit_enter_quick(td); 621 _lwkt_pullmsg(port, msg); 622 crit_exit_quick(td); 623 } 624 } 625 return(msg->ms_error); 626 } 627 628 void * 629 lwkt_thread_waitport(lwkt_port_t port, int flags) 630 { 631 thread_t td = curthread; 632 lwkt_msg_t msg; 633 int error; 634 635 KKASSERT(port->mpu_td == td); 636 crit_enter_quick(td); 637 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 638 port->mp_flags |= MSGPORTF_WAITING; 639 error = lwkt_sleep("waitport", flags); 640 port->mp_flags &= ~MSGPORTF_WAITING; 641 if (error) 642 goto done; 643 } 644 _lwkt_pullmsg(port, msg); 645 done: 646 crit_exit_quick(td); 647 return(msg); 648 } 649 650 /************************************************************************ 651 * SPIN PORT BACKEND * 652 ************************************************************************ 653 * 654 * This backend uses spinlocks instead of making assumptions about which 655 * thread is accessing the port. It must be used when a port is not owned 656 * by a particular thread. This is less optimal then thread ports but 657 * you don't have a choice if there are multiple threads accessing the port. 658 * 659 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 660 * on the message port, it is the responsibility of the code doing the 661 * wakeup to clear this flag rather then the blocked threads. Some 662 * superfluous wakeups may occur, which is ok. 663 * 664 * XXX synchronous message wakeups are not current optimized. 665 */ 666 667 static 668 void * 669 lwkt_spin_getport(lwkt_port_t port) 670 { 671 lwkt_msg_t msg; 672 673 spin_lock_wr(&port->mpu_spin); 674 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 675 _lwkt_pullmsg(port, msg); 676 spin_unlock_wr(&port->mpu_spin); 677 return(msg); 678 } 679 680 static 681 int 682 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 683 { 684 int dowakeup; 685 686 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 687 688 msg->ms_target_port = port; 689 spin_lock_wr(&port->mpu_spin); 690 msg->ms_flags |= MSGF_QUEUED; 691 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 692 dowakeup = 0; 693 if (port->mp_flags & MSGPORTF_WAITING) { 694 port->mp_flags &= ~MSGPORTF_WAITING; 695 dowakeup = 1; 696 } 697 spin_unlock_wr(&port->mpu_spin); 698 if (dowakeup) 699 wakeup(port); 700 return (EASYNC); 701 } 702 703 static 704 int 705 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 706 { 707 lwkt_port_t port; 708 int sentabort; 709 int error; 710 711 if ((msg->ms_flags & MSGF_DONE) == 0) { 712 port = msg->ms_reply_port; 713 sentabort = 0; 714 spin_lock_wr(&port->mpu_spin); 715 while ((msg->ms_flags & MSGF_DONE) == 0) { 716 void *won; 717 718 /* 719 * If message was sent synchronously from the beginning 720 * the wakeup will be on the message structure, else it 721 * will be on the port structure. 722 */ 723 if (msg->ms_flags & MSGF_SYNC) { 724 won = msg; 725 } else { 726 won = port; 727 port->mp_flags |= MSGPORTF_WAITING; 728 } 729 730 /* 731 * Only messages which support abort can be interrupted. 732 * We must still wait for message completion regardless. 733 */ 734 if ((flags & PCATCH) && sentabort == 0) { 735 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 736 if (error) { 737 sentabort = error; 738 spin_unlock_wr(&port->mpu_spin); 739 lwkt_abortmsg(msg); 740 spin_lock_wr(&port->mpu_spin); 741 } 742 } else { 743 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0); 744 } 745 /* see note at the top on the MSGPORTF_WAITING flag */ 746 } 747 /* 748 * Turn EINTR into ERESTART if the signal indicates. 749 */ 750 if (sentabort && msg->ms_error == EINTR) 751 msg->ms_error = sentabort; 752 if (msg->ms_flags & MSGF_QUEUED) 753 _lwkt_pullmsg(port, msg); 754 spin_unlock_wr(&port->mpu_spin); 755 } else { 756 if (msg->ms_flags & MSGF_QUEUED) { 757 port = msg->ms_reply_port; 758 spin_lock_wr(&port->mpu_spin); 759 _lwkt_pullmsg(port, msg); 760 spin_unlock_wr(&port->mpu_spin); 761 } 762 } 763 return(msg->ms_error); 764 } 765 766 static 767 void * 768 lwkt_spin_waitport(lwkt_port_t port, int flags) 769 { 770 lwkt_msg_t msg; 771 int error; 772 773 spin_lock_wr(&port->mpu_spin); 774 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 775 port->mp_flags |= MSGPORTF_WAITING; 776 error = msleep(port, &port->mpu_spin, flags, "waitport", 0); 777 /* see note at the top on the MSGPORTF_WAITING flag */ 778 if (error) { 779 spin_unlock_wr(&port->mpu_spin); 780 return(NULL); 781 } 782 } 783 _lwkt_pullmsg(port, msg); 784 spin_unlock_wr(&port->mpu_spin); 785 return(msg); 786 } 787 788 static 789 void 790 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 791 { 792 int dowakeup; 793 794 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 795 796 if (msg->ms_flags & MSGF_SYNC) { 797 /* 798 * If a synchronous completion has been requested, just wakeup 799 * the message without bothering to queue it to the target port. 800 */ 801 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 802 wakeup(msg); 803 } else { 804 /* 805 * If an asynchronous completion has been requested the message 806 * must be queued to the reply port. 807 */ 808 spin_lock_wr(&port->mpu_spin); 809 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 810 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 811 dowakeup = 0; 812 if (port->mp_flags & MSGPORTF_WAITING) { 813 port->mp_flags &= ~MSGPORTF_WAITING; 814 dowakeup = 1; 815 } 816 spin_unlock_wr(&port->mpu_spin); 817 if (dowakeup) 818 wakeup(port); 819 } 820 } 821 822 /************************************************************************ 823 * SERIALIZER PORT BACKEND * 824 ************************************************************************ 825 * 826 * This backend uses serializer to protect port accessing. Callers are 827 * assumed to have serializer held. This kind of port is usually created 828 * by network device driver along with _one_ lwkt thread to pipeline 829 * operations which may temporarily release serializer. 830 * 831 * Implementation is based on SPIN PORT BACKEND. 832 */ 833 834 static 835 void * 836 lwkt_serialize_getport(lwkt_port_t port) 837 { 838 lwkt_msg_t msg; 839 840 ASSERT_SERIALIZED(port->mpu_serialize); 841 842 if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) 843 _lwkt_pullmsg(port, msg); 844 return(msg); 845 } 846 847 static 848 int 849 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 850 { 851 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 852 ASSERT_SERIALIZED(port->mpu_serialize); 853 854 msg->ms_target_port = port; 855 msg->ms_flags |= MSGF_QUEUED; 856 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 857 if (port->mp_flags & MSGPORTF_WAITING) { 858 port->mp_flags &= ~MSGPORTF_WAITING; 859 wakeup(port); 860 } 861 return (EASYNC); 862 } 863 864 static 865 int 866 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 867 { 868 lwkt_port_t port; 869 int sentabort; 870 int error; 871 872 if ((msg->ms_flags & MSGF_DONE) == 0) { 873 port = msg->ms_reply_port; 874 875 ASSERT_SERIALIZED(port->mpu_serialize); 876 877 sentabort = 0; 878 while ((msg->ms_flags & MSGF_DONE) == 0) { 879 void *won; 880 881 /* 882 * If message was sent synchronously from the beginning 883 * the wakeup will be on the message structure, else it 884 * will be on the port structure. 885 */ 886 if (msg->ms_flags & MSGF_SYNC) { 887 won = msg; 888 } else { 889 won = port; 890 port->mp_flags |= MSGPORTF_WAITING; 891 } 892 893 /* 894 * Only messages which support abort can be interrupted. 895 * We must still wait for message completion regardless. 896 */ 897 if ((flags & PCATCH) && sentabort == 0) { 898 error = serialize_sleep(won, port->mpu_serialize, PCATCH, 899 "waitmsg", 0); 900 if (error) { 901 sentabort = error; 902 lwkt_serialize_exit(port->mpu_serialize); 903 lwkt_abortmsg(msg); 904 lwkt_serialize_enter(port->mpu_serialize); 905 } 906 } else { 907 error = serialize_sleep(won, port->mpu_serialize, 0, 908 "waitmsg", 0); 909 } 910 /* see note at the top on the MSGPORTF_WAITING flag */ 911 } 912 /* 913 * Turn EINTR into ERESTART if the signal indicates. 914 */ 915 if (sentabort && msg->ms_error == EINTR) 916 msg->ms_error = sentabort; 917 if (msg->ms_flags & MSGF_QUEUED) 918 _lwkt_pullmsg(port, msg); 919 } else { 920 if (msg->ms_flags & MSGF_QUEUED) { 921 port = msg->ms_reply_port; 922 923 ASSERT_SERIALIZED(port->mpu_serialize); 924 _lwkt_pullmsg(port, msg); 925 } 926 } 927 return(msg->ms_error); 928 } 929 930 static 931 void * 932 lwkt_serialize_waitport(lwkt_port_t port, int flags) 933 { 934 lwkt_msg_t msg; 935 int error; 936 937 ASSERT_SERIALIZED(port->mpu_serialize); 938 939 while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { 940 port->mp_flags |= MSGPORTF_WAITING; 941 error = serialize_sleep(port, port->mpu_serialize, flags, 942 "waitport", 0); 943 /* see note at the top on the MSGPORTF_WAITING flag */ 944 if (error) 945 return(NULL); 946 } 947 _lwkt_pullmsg(port, msg); 948 return(msg); 949 } 950 951 static 952 void 953 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 954 { 955 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 956 ASSERT_SERIALIZED(port->mpu_serialize); 957 958 if (msg->ms_flags & MSGF_SYNC) { 959 /* 960 * If a synchronous completion has been requested, just wakeup 961 * the message without bothering to queue it to the target port. 962 */ 963 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 964 wakeup(msg); 965 } else { 966 /* 967 * If an asynchronous completion has been requested the message 968 * must be queued to the reply port. 969 */ 970 TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); 971 msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; 972 if (port->mp_flags & MSGPORTF_WAITING) { 973 port->mp_flags &= ~MSGPORTF_WAITING; 974 wakeup(port); 975 } 976 } 977 } 978 979 /************************************************************************ 980 * PANIC AND SPECIAL PORT FUNCTIONS * 981 ************************************************************************/ 982 983 /* 984 * You can point a port's reply vector at this function if you just want 985 * the message marked done, without any queueing or signaling. This is 986 * often used for structure-embedded messages. 987 */ 988 static 989 void 990 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 991 { 992 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 993 } 994 995 static 996 void * 997 lwkt_panic_getport(lwkt_port_t port) 998 { 999 panic("lwkt_getport() illegal on port %p", port); 1000 } 1001 1002 static 1003 int 1004 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1005 { 1006 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1007 } 1008 1009 static 1010 int 1011 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1012 { 1013 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1014 } 1015 1016 static 1017 void * 1018 lwkt_panic_waitport(lwkt_port_t port, int flags) 1019 { 1020 panic("port %p cannot be waited on", port); 1021 } 1022 1023 static 1024 void 1025 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1026 { 1027 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1028 } 1029 1030