1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 * 37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $ 38 */ 39 40 #include <sys/param.h> 41 #include <sys/systm.h> 42 #include <sys/kernel.h> 43 #include <sys/proc.h> 44 #include <sys/rtprio.h> 45 #include <sys/queue.h> 46 #include <sys/sysctl.h> 47 #include <sys/kthread.h> 48 #include <sys/signalvar.h> 49 #include <sys/signal2.h> 50 #include <machine/cpu.h> 51 #include <sys/lock.h> 52 53 #include <vm/vm.h> 54 #include <vm/vm_param.h> 55 #include <vm/vm_kern.h> 56 #include <vm/vm_object.h> 57 #include <vm/vm_page.h> 58 #include <vm/vm_map.h> 59 #include <vm/vm_pager.h> 60 #include <vm/vm_extern.h> 61 #include <vm/vm_zone.h> 62 63 #include <sys/thread2.h> 64 #include <sys/msgport2.h> 65 #include <sys/spinlock2.h> 66 #include <sys/serialize.h> 67 68 #include <machine/stdarg.h> 69 #include <machine/cpufunc.h> 70 #ifdef SMP 71 #include <machine/smp.h> 72 #endif 73 74 #include <sys/malloc.h> 75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 76 77 /************************************************************************ 78 * MESSAGE FUNCTIONS * 79 ************************************************************************/ 80 81 /* 82 * lwkt_sendmsg() 83 * 84 * Request asynchronous completion and call lwkt_beginmsg(). The 85 * target port can opt to execute the message synchronously or 86 * asynchronously and this function will automatically queue the 87 * response if the target executes the message synchronously. 88 * 89 * NOTE: The message is in an indeterminant state until this call 90 * returns. The caller should not mess with it (e.g. try to abort it) 91 * until then. 92 */ 93 void 94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 95 { 96 int error; 97 98 KKASSERT(msg->ms_reply_port != NULL && 99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 102 lwkt_replymsg(msg, error); 103 } 104 } 105 106 /* 107 * lwkt_domsg() 108 * 109 * Request asynchronous completion and call lwkt_beginmsg(). The 110 * target port can opt to execute the message synchronously or 111 * asynchronously and this function will automatically queue the 112 * response if the target executes the message synchronously. 113 */ 114 int 115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 116 { 117 int error; 118 119 KKASSERT(msg->ms_reply_port != NULL && 120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 122 msg->ms_flags |= MSGF_SYNC; 123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 124 error = lwkt_waitmsg(msg, flags); 125 } else { 126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 127 } 128 return(error); 129 } 130 131 /* 132 * lwkt_forwardmsg() 133 * 134 * Forward a message received on one port to another port. 135 */ 136 int 137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 138 { 139 int error; 140 141 crit_enter(); 142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 143 if ((error = port->mp_putport(port, msg)) != EASYNC) 144 lwkt_replymsg(msg, error); 145 crit_exit(); 146 return(error); 147 } 148 149 /* 150 * lwkt_abortmsg() 151 * 152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 153 * The caller must ensure that the message will not be both replied AND 154 * destroyed while the abort is in progress. 155 * 156 * This function issues a callback which might block! 157 */ 158 void 159 lwkt_abortmsg(lwkt_msg_t msg) 160 { 161 /* 162 * A critical section protects us from reply IPIs on this cpu. 163 */ 164 crit_enter(); 165 166 /* 167 * Shortcut the operation if the message has already been returned. 168 * The callback typically constructs a lwkt_msg with the abort request, 169 * issues it synchronously, and waits for completion. The callback 170 * is not required to actually abort the message and the target port, 171 * upon receiving an abort request message generated by the callback 172 * should check whether the original message has already completed or 173 * not. 174 */ 175 if (msg->ms_flags & MSGF_ABORTABLE) { 176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 177 msg->ms_abortfn(msg); 178 } 179 crit_exit(); 180 } 181 182 /************************************************************************ 183 * PORT INITIALIZATION API * 184 ************************************************************************/ 185 186 static void *lwkt_thread_getport(lwkt_port_t port); 187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 192 193 static void *lwkt_spin_getport(lwkt_port_t port); 194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 198 199 static void *lwkt_serialize_getport(lwkt_port_t port); 200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 204 205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 206 static void *lwkt_panic_getport(lwkt_port_t port); 207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 212 213 /* 214 * Core port initialization (internal) 215 */ 216 static __inline 217 void 218 _lwkt_initport(lwkt_port_t port, 219 void *(*gportfn)(lwkt_port_t), 220 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 221 int (*wmsgfn)(lwkt_msg_t, int), 222 void *(*wportfn)(lwkt_port_t, int), 223 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 224 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) 225 { 226 bzero(port, sizeof(*port)); 227 TAILQ_INIT(&port->mp_msgq); 228 TAILQ_INIT(&port->mp_msgq_prio); 229 port->mp_getport = gportfn; 230 port->mp_putport = pportfn; 231 port->mp_waitmsg = wmsgfn; 232 port->mp_waitport = wportfn; 233 port->mp_replyport = rportfn; 234 port->mp_dropmsg = dmsgfn; 235 } 236 237 /* 238 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 239 * we tell the scheduler not to reschedule if td is at a higher priority. 240 * 241 * This routine is called even if the thread is already scheduled. 242 */ 243 static __inline 244 void 245 _lwkt_schedule_msg(thread_t td, int flags) 246 { 247 lwkt_schedule(td); 248 } 249 250 /* 251 * lwkt_initport_thread() 252 * 253 * Initialize a port for use by a particular thread. The port may 254 * only be used by <td>. 255 */ 256 void 257 lwkt_initport_thread(lwkt_port_t port, thread_t td) 258 { 259 _lwkt_initport(port, 260 lwkt_thread_getport, 261 lwkt_thread_putport, 262 lwkt_thread_waitmsg, 263 lwkt_thread_waitport, 264 lwkt_thread_replyport, 265 lwkt_thread_dropmsg); 266 port->mpu_td = td; 267 } 268 269 /* 270 * lwkt_initport_spin() 271 * 272 * Initialize a port for use with descriptors that might be accessed 273 * via multiple LWPs, processes, or threads. Has somewhat more 274 * overhead then thread ports. 275 */ 276 void 277 lwkt_initport_spin(lwkt_port_t port) 278 { 279 _lwkt_initport(port, 280 lwkt_spin_getport, 281 lwkt_spin_putport, 282 lwkt_spin_waitmsg, 283 lwkt_spin_waitport, 284 lwkt_spin_replyport, 285 lwkt_panic_dropmsg); 286 spin_init(&port->mpu_spin); 287 } 288 289 /* 290 * lwkt_initport_serialize() 291 * 292 * Initialize a port for use with descriptors that might be accessed 293 * via multiple LWPs, processes, or threads. Callers are assumed to 294 * have held the serializer (slz). 295 */ 296 void 297 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 298 { 299 _lwkt_initport(port, 300 lwkt_serialize_getport, 301 lwkt_serialize_putport, 302 lwkt_serialize_waitmsg, 303 lwkt_serialize_waitport, 304 lwkt_serialize_replyport, 305 lwkt_panic_dropmsg); 306 port->mpu_serialize = slz; 307 } 308 309 /* 310 * Similar to the standard initport, this function simply marks the message 311 * as being done and does not attempt to return it to an originating port. 312 */ 313 void 314 lwkt_initport_replyonly_null(lwkt_port_t port) 315 { 316 _lwkt_initport(port, 317 lwkt_panic_getport, 318 lwkt_panic_putport, 319 lwkt_panic_waitmsg, 320 lwkt_panic_waitport, 321 lwkt_null_replyport, 322 lwkt_panic_dropmsg); 323 } 324 325 /* 326 * Initialize a reply-only port, typically used as a message sink. Such 327 * ports can only be used as a reply port. 328 */ 329 void 330 lwkt_initport_replyonly(lwkt_port_t port, 331 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 332 { 333 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 334 lwkt_panic_waitmsg, lwkt_panic_waitport, 335 rportfn, lwkt_panic_dropmsg); 336 } 337 338 void 339 lwkt_initport_putonly(lwkt_port_t port, 340 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 341 { 342 _lwkt_initport(port, lwkt_panic_getport, pportfn, 343 lwkt_panic_waitmsg, lwkt_panic_waitport, 344 lwkt_panic_replyport, lwkt_panic_dropmsg); 345 } 346 347 void 348 lwkt_initport_panic(lwkt_port_t port) 349 { 350 _lwkt_initport(port, 351 lwkt_panic_getport, lwkt_panic_putport, 352 lwkt_panic_waitmsg, lwkt_panic_waitport, 353 lwkt_panic_replyport, lwkt_panic_dropmsg); 354 } 355 356 static __inline 357 void 358 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 359 { 360 lwkt_msg_queue *queue; 361 362 /* 363 * normal case, remove and return the message. 364 */ 365 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 366 queue = &port->mp_msgq_prio; 367 else 368 queue = &port->mp_msgq; 369 TAILQ_REMOVE(queue, msg, ms_node); 370 371 /* 372 * atomic op needed for spin ports 373 */ 374 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 375 } 376 377 static __inline 378 void 379 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 380 { 381 lwkt_msg_queue *queue; 382 383 /* 384 * atomic op needed for spin ports 385 */ 386 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 387 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 388 queue = &port->mp_msgq_prio; 389 else 390 queue = &port->mp_msgq; 391 TAILQ_INSERT_TAIL(queue, msg, ms_node); 392 } 393 394 static __inline 395 lwkt_msg_t 396 _lwkt_pollmsg(lwkt_port_t port) 397 { 398 lwkt_msg_t msg; 399 400 msg = TAILQ_FIRST(&port->mp_msgq_prio); 401 if (__predict_false(msg != NULL)) 402 return msg; 403 404 /* 405 * Priority queue has no message, fallback to non-priority queue. 406 */ 407 return TAILQ_FIRST(&port->mp_msgq); 408 } 409 410 static __inline 411 void 412 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 413 { 414 /* 415 * atomic op needed for spin ports 416 */ 417 _lwkt_pushmsg(port, msg); 418 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 419 } 420 421 /************************************************************************ 422 * THREAD PORT BACKEND * 423 ************************************************************************ 424 * 425 * This backend is used when the port a message is retrieved from is owned 426 * by a single thread (the calling thread). Messages are IPId to the 427 * correct cpu before being enqueued to a port. Note that this is fairly 428 * optimal since scheduling would have had to do an IPI anyway if the 429 * message were headed to a different cpu. 430 */ 431 432 #ifdef SMP 433 434 /* 435 * This function completes reply processing for the default case in the 436 * context of the originating cpu. 437 */ 438 static 439 void 440 lwkt_thread_replyport_remote(lwkt_msg_t msg) 441 { 442 lwkt_port_t port = msg->ms_reply_port; 443 int flags; 444 445 /* 446 * Chase any thread migration that occurs 447 */ 448 if (port->mpu_td->td_gd != mycpu) { 449 lwkt_send_ipiq(port->mpu_td->td_gd, 450 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 451 return; 452 } 453 454 /* 455 * Cleanup 456 */ 457 #ifdef INVARIANTS 458 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 459 msg->ms_flags &= ~MSGF_INTRANSIT; 460 #endif 461 flags = msg->ms_flags; 462 if (msg->ms_flags & MSGF_SYNC) { 463 cpu_sfence(); 464 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 465 } else { 466 _lwkt_enqueue_reply(port, msg); 467 } 468 if (port->mp_flags & MSGPORTF_WAITING) 469 _lwkt_schedule_msg(port->mpu_td, flags); 470 } 471 472 #endif 473 474 /* 475 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 476 * 477 * Called with the reply port as an argument but in the context of the 478 * original target port. Completion must occur on the target port's 479 * cpu. 480 * 481 * The critical section protects us from IPIs on the this CPU. 482 */ 483 static 484 void 485 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 486 { 487 int flags; 488 489 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 490 491 if (msg->ms_flags & MSGF_SYNC) { 492 /* 493 * If a synchronous completion has been requested, just wakeup 494 * the message without bothering to queue it to the target port. 495 * 496 * Assume the target thread is non-preemptive, so no critical 497 * section is required. 498 */ 499 #ifdef SMP 500 if (port->mpu_td->td_gd == mycpu) { 501 #endif 502 flags = msg->ms_flags; 503 cpu_sfence(); 504 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 505 if (port->mp_flags & MSGPORTF_WAITING) 506 _lwkt_schedule_msg(port->mpu_td, flags); 507 #ifdef SMP 508 } else { 509 #ifdef INVARIANTS 510 msg->ms_flags |= MSGF_INTRANSIT; 511 #endif 512 msg->ms_flags |= MSGF_REPLY; 513 lwkt_send_ipiq(port->mpu_td->td_gd, 514 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 515 } 516 #endif 517 } else { 518 /* 519 * If an asynchronous completion has been requested the message 520 * must be queued to the reply port. 521 * 522 * A critical section is required to interlock the port queue. 523 */ 524 #ifdef SMP 525 if (port->mpu_td->td_gd == mycpu) { 526 #endif 527 crit_enter(); 528 _lwkt_enqueue_reply(port, msg); 529 if (port->mp_flags & MSGPORTF_WAITING) 530 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 531 crit_exit(); 532 #ifdef SMP 533 } else { 534 #ifdef INVARIANTS 535 msg->ms_flags |= MSGF_INTRANSIT; 536 #endif 537 msg->ms_flags |= MSGF_REPLY; 538 lwkt_send_ipiq(port->mpu_td->td_gd, 539 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 540 } 541 #endif 542 } 543 } 544 545 /* 546 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() 547 * 548 * This function could _only_ be used when caller is in the same thread 549 * as the message's target port owner thread. 550 */ 551 static void 552 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 553 { 554 KASSERT(port->mpu_td == curthread, 555 ("message could only be dropped in the same thread " 556 "as the message target port thread\n")); 557 crit_enter_quick(port->mpu_td); 558 _lwkt_pullmsg(port, msg); 559 msg->ms_flags |= MSGF_DONE; 560 crit_exit_quick(port->mpu_td); 561 } 562 563 /* 564 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 565 * 566 * Called with the target port as an argument but in the context of the 567 * reply port. This function always implements an asynchronous put to 568 * the target message port, and thus returns EASYNC. 569 * 570 * The message must already have cleared MSGF_DONE and MSGF_REPLY 571 */ 572 573 #ifdef SMP 574 575 static 576 void 577 lwkt_thread_putport_remote(lwkt_msg_t msg) 578 { 579 lwkt_port_t port = msg->ms_target_port; 580 581 /* 582 * Chase any thread migration that occurs 583 */ 584 if (port->mpu_td->td_gd != mycpu) { 585 lwkt_send_ipiq(port->mpu_td->td_gd, 586 (ipifunc1_t)lwkt_thread_putport_remote, msg); 587 return; 588 } 589 590 /* 591 * Cleanup 592 */ 593 #ifdef INVARIANTS 594 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 595 msg->ms_flags &= ~MSGF_INTRANSIT; 596 #endif 597 _lwkt_pushmsg(port, msg); 598 if (port->mp_flags & MSGPORTF_WAITING) 599 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 600 } 601 602 #endif 603 604 static 605 int 606 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 607 { 608 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 609 610 msg->ms_target_port = port; 611 #ifdef SMP 612 if (port->mpu_td->td_gd == mycpu) { 613 #endif 614 crit_enter(); 615 _lwkt_pushmsg(port, msg); 616 if (port->mp_flags & MSGPORTF_WAITING) 617 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 618 crit_exit(); 619 #ifdef SMP 620 } else { 621 #ifdef INVARIANTS 622 msg->ms_flags |= MSGF_INTRANSIT; 623 #endif 624 lwkt_send_ipiq(port->mpu_td->td_gd, 625 (ipifunc1_t)lwkt_thread_putport_remote, msg); 626 } 627 #endif 628 return (EASYNC); 629 } 630 631 /* 632 * lwkt_thread_getport() 633 * 634 * Retrieve the next message from the port or NULL if no messages 635 * are ready. 636 */ 637 static 638 void * 639 lwkt_thread_getport(lwkt_port_t port) 640 { 641 lwkt_msg_t msg; 642 643 KKASSERT(port->mpu_td == curthread); 644 645 crit_enter_quick(port->mpu_td); 646 if ((msg = _lwkt_pollmsg(port)) != NULL) 647 _lwkt_pullmsg(port, msg); 648 crit_exit_quick(port->mpu_td); 649 return(msg); 650 } 651 652 /* 653 * lwkt_thread_waitmsg() 654 * 655 * Wait for a particular message to be replied. We must be the only 656 * thread waiting on the message. The port must be owned by the 657 * caller. 658 */ 659 static 660 int 661 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 662 { 663 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 664 ("can't wait dropable message\n")); 665 666 if ((msg->ms_flags & MSGF_DONE) == 0) { 667 /* 668 * If the done bit was not set we have to block until it is. 669 */ 670 lwkt_port_t port = msg->ms_reply_port; 671 thread_t td = curthread; 672 int sentabort; 673 674 KKASSERT(port->mpu_td == td); 675 crit_enter_quick(td); 676 sentabort = 0; 677 678 while ((msg->ms_flags & MSGF_DONE) == 0) { 679 port->mp_flags |= MSGPORTF_WAITING; 680 if (sentabort == 0) { 681 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 682 lwkt_abortmsg(msg); 683 } 684 } else { 685 lwkt_sleep("waitabt", 0); 686 } 687 port->mp_flags &= ~MSGPORTF_WAITING; 688 } 689 if (msg->ms_flags & MSGF_QUEUED) 690 _lwkt_pullmsg(port, msg); 691 crit_exit_quick(td); 692 } else { 693 /* 694 * If the done bit was set we only have to mess around with the 695 * message if it is queued on the reply port. 696 */ 697 if (msg->ms_flags & MSGF_QUEUED) { 698 lwkt_port_t port = msg->ms_reply_port; 699 thread_t td = curthread; 700 701 KKASSERT(port->mpu_td == td); 702 crit_enter_quick(td); 703 _lwkt_pullmsg(port, msg); 704 crit_exit_quick(td); 705 } 706 } 707 return(msg->ms_error); 708 } 709 710 static 711 void * 712 lwkt_thread_waitport(lwkt_port_t port, int flags) 713 { 714 thread_t td = curthread; 715 lwkt_msg_t msg; 716 int error; 717 718 KKASSERT(port->mpu_td == td); 719 crit_enter_quick(td); 720 while ((msg = _lwkt_pollmsg(port)) == NULL) { 721 port->mp_flags |= MSGPORTF_WAITING; 722 error = lwkt_sleep("waitport", flags); 723 port->mp_flags &= ~MSGPORTF_WAITING; 724 if (error) 725 goto done; 726 } 727 _lwkt_pullmsg(port, msg); 728 done: 729 crit_exit_quick(td); 730 return(msg); 731 } 732 733 /************************************************************************ 734 * SPIN PORT BACKEND * 735 ************************************************************************ 736 * 737 * This backend uses spinlocks instead of making assumptions about which 738 * thread is accessing the port. It must be used when a port is not owned 739 * by a particular thread. This is less optimal then thread ports but 740 * you don't have a choice if there are multiple threads accessing the port. 741 * 742 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 743 * on the message port, it is the responsibility of the code doing the 744 * wakeup to clear this flag rather then the blocked threads. Some 745 * superfluous wakeups may occur, which is ok. 746 * 747 * XXX synchronous message wakeups are not current optimized. 748 */ 749 750 static 751 void * 752 lwkt_spin_getport(lwkt_port_t port) 753 { 754 lwkt_msg_t msg; 755 756 spin_lock(&port->mpu_spin); 757 if ((msg = _lwkt_pollmsg(port)) != NULL) 758 _lwkt_pullmsg(port, msg); 759 spin_unlock(&port->mpu_spin); 760 return(msg); 761 } 762 763 static 764 int 765 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 766 { 767 int dowakeup; 768 769 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 770 771 msg->ms_target_port = port; 772 spin_lock(&port->mpu_spin); 773 _lwkt_pushmsg(port, msg); 774 dowakeup = 0; 775 if (port->mp_flags & MSGPORTF_WAITING) { 776 port->mp_flags &= ~MSGPORTF_WAITING; 777 dowakeup = 1; 778 } 779 spin_unlock(&port->mpu_spin); 780 if (dowakeup) 781 wakeup(port); 782 return (EASYNC); 783 } 784 785 static 786 int 787 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 788 { 789 lwkt_port_t port; 790 int sentabort; 791 int error; 792 793 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 794 ("can't wait dropable message\n")); 795 796 if ((msg->ms_flags & MSGF_DONE) == 0) { 797 port = msg->ms_reply_port; 798 sentabort = 0; 799 spin_lock(&port->mpu_spin); 800 while ((msg->ms_flags & MSGF_DONE) == 0) { 801 void *won; 802 803 /* 804 * If message was sent synchronously from the beginning 805 * the wakeup will be on the message structure, else it 806 * will be on the port structure. 807 */ 808 if (msg->ms_flags & MSGF_SYNC) { 809 won = msg; 810 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 811 } else { 812 won = port; 813 port->mp_flags |= MSGPORTF_WAITING; 814 } 815 816 /* 817 * Only messages which support abort can be interrupted. 818 * We must still wait for message completion regardless. 819 */ 820 if ((flags & PCATCH) && sentabort == 0) { 821 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 822 if (error) { 823 sentabort = error; 824 spin_unlock(&port->mpu_spin); 825 lwkt_abortmsg(msg); 826 spin_lock(&port->mpu_spin); 827 } 828 } else { 829 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 830 } 831 /* see note at the top on the MSGPORTF_WAITING flag */ 832 } 833 /* 834 * Turn EINTR into ERESTART if the signal indicates. 835 */ 836 if (sentabort && msg->ms_error == EINTR) 837 msg->ms_error = sentabort; 838 if (msg->ms_flags & MSGF_QUEUED) 839 _lwkt_pullmsg(port, msg); 840 spin_unlock(&port->mpu_spin); 841 } else { 842 if (msg->ms_flags & MSGF_QUEUED) { 843 port = msg->ms_reply_port; 844 spin_lock(&port->mpu_spin); 845 _lwkt_pullmsg(port, msg); 846 spin_unlock(&port->mpu_spin); 847 } 848 } 849 return(msg->ms_error); 850 } 851 852 static 853 void * 854 lwkt_spin_waitport(lwkt_port_t port, int flags) 855 { 856 lwkt_msg_t msg; 857 int error; 858 859 spin_lock(&port->mpu_spin); 860 while ((msg = _lwkt_pollmsg(port)) == NULL) { 861 port->mp_flags |= MSGPORTF_WAITING; 862 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 863 /* see note at the top on the MSGPORTF_WAITING flag */ 864 if (error) { 865 spin_unlock(&port->mpu_spin); 866 return(NULL); 867 } 868 } 869 _lwkt_pullmsg(port, msg); 870 spin_unlock(&port->mpu_spin); 871 return(msg); 872 } 873 874 static 875 void 876 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 877 { 878 int dowakeup; 879 880 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 881 882 if (msg->ms_flags & MSGF_SYNC) { 883 /* 884 * If a synchronous completion has been requested, just wakeup 885 * the message without bothering to queue it to the target port. 886 */ 887 spin_lock(&port->mpu_spin); 888 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 889 dowakeup = 0; 890 if (msg->ms_flags & MSGF_WAITING) { 891 atomic_clear_int(&msg->ms_flags, MSGF_WAITING); 892 dowakeup = 1; 893 } 894 spin_unlock(&port->mpu_spin); 895 if (dowakeup) 896 wakeup(msg); 897 } else { 898 /* 899 * If an asynchronous completion has been requested the message 900 * must be queued to the reply port. 901 */ 902 spin_lock(&port->mpu_spin); 903 _lwkt_enqueue_reply(port, msg); 904 dowakeup = 0; 905 if (port->mp_flags & MSGPORTF_WAITING) { 906 port->mp_flags &= ~MSGPORTF_WAITING; 907 dowakeup = 1; 908 } 909 spin_unlock(&port->mpu_spin); 910 if (dowakeup) 911 wakeup(port); 912 } 913 } 914 915 /************************************************************************ 916 * SERIALIZER PORT BACKEND * 917 ************************************************************************ 918 * 919 * This backend uses serializer to protect port accessing. Callers are 920 * assumed to have serializer held. This kind of port is usually created 921 * by network device driver along with _one_ lwkt thread to pipeline 922 * operations which may temporarily release serializer. 923 * 924 * Implementation is based on SPIN PORT BACKEND. 925 */ 926 927 static 928 void * 929 lwkt_serialize_getport(lwkt_port_t port) 930 { 931 lwkt_msg_t msg; 932 933 ASSERT_SERIALIZED(port->mpu_serialize); 934 935 if ((msg = _lwkt_pollmsg(port)) != NULL) 936 _lwkt_pullmsg(port, msg); 937 return(msg); 938 } 939 940 static 941 int 942 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 943 { 944 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 945 ASSERT_SERIALIZED(port->mpu_serialize); 946 947 msg->ms_target_port = port; 948 _lwkt_pushmsg(port, msg); 949 if (port->mp_flags & MSGPORTF_WAITING) { 950 port->mp_flags &= ~MSGPORTF_WAITING; 951 wakeup(port); 952 } 953 return (EASYNC); 954 } 955 956 static 957 int 958 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 959 { 960 lwkt_port_t port; 961 int sentabort; 962 int error; 963 964 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 965 ("can't wait dropable message\n")); 966 967 if ((msg->ms_flags & MSGF_DONE) == 0) { 968 port = msg->ms_reply_port; 969 970 ASSERT_SERIALIZED(port->mpu_serialize); 971 972 sentabort = 0; 973 while ((msg->ms_flags & MSGF_DONE) == 0) { 974 void *won; 975 976 /* 977 * If message was sent synchronously from the beginning 978 * the wakeup will be on the message structure, else it 979 * will be on the port structure. 980 */ 981 if (msg->ms_flags & MSGF_SYNC) { 982 won = msg; 983 } else { 984 won = port; 985 port->mp_flags |= MSGPORTF_WAITING; 986 } 987 988 /* 989 * Only messages which support abort can be interrupted. 990 * We must still wait for message completion regardless. 991 */ 992 if ((flags & PCATCH) && sentabort == 0) { 993 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0); 994 if (error) { 995 sentabort = error; 996 lwkt_serialize_exit(port->mpu_serialize); 997 lwkt_abortmsg(msg); 998 lwkt_serialize_enter(port->mpu_serialize); 999 } 1000 } else { 1001 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0); 1002 } 1003 /* see note at the top on the MSGPORTF_WAITING flag */ 1004 } 1005 /* 1006 * Turn EINTR into ERESTART if the signal indicates. 1007 */ 1008 if (sentabort && msg->ms_error == EINTR) 1009 msg->ms_error = sentabort; 1010 if (msg->ms_flags & MSGF_QUEUED) 1011 _lwkt_pullmsg(port, msg); 1012 } else { 1013 if (msg->ms_flags & MSGF_QUEUED) { 1014 port = msg->ms_reply_port; 1015 1016 ASSERT_SERIALIZED(port->mpu_serialize); 1017 _lwkt_pullmsg(port, msg); 1018 } 1019 } 1020 return(msg->ms_error); 1021 } 1022 1023 static 1024 void * 1025 lwkt_serialize_waitport(lwkt_port_t port, int flags) 1026 { 1027 lwkt_msg_t msg; 1028 int error; 1029 1030 ASSERT_SERIALIZED(port->mpu_serialize); 1031 1032 while ((msg = _lwkt_pollmsg(port)) == NULL) { 1033 port->mp_flags |= MSGPORTF_WAITING; 1034 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0); 1035 /* see note at the top on the MSGPORTF_WAITING flag */ 1036 if (error) 1037 return(NULL); 1038 } 1039 _lwkt_pullmsg(port, msg); 1040 return(msg); 1041 } 1042 1043 static 1044 void 1045 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 1046 { 1047 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1048 ASSERT_SERIALIZED(port->mpu_serialize); 1049 1050 if (msg->ms_flags & MSGF_SYNC) { 1051 /* 1052 * If a synchronous completion has been requested, just wakeup 1053 * the message without bothering to queue it to the target port. 1054 */ 1055 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1056 wakeup(msg); 1057 } else { 1058 /* 1059 * If an asynchronous completion has been requested the message 1060 * must be queued to the reply port. 1061 */ 1062 _lwkt_enqueue_reply(port, msg); 1063 if (port->mp_flags & MSGPORTF_WAITING) { 1064 port->mp_flags &= ~MSGPORTF_WAITING; 1065 wakeup(port); 1066 } 1067 } 1068 } 1069 1070 /************************************************************************ 1071 * PANIC AND SPECIAL PORT FUNCTIONS * 1072 ************************************************************************/ 1073 1074 /* 1075 * You can point a port's reply vector at this function if you just want 1076 * the message marked done, without any queueing or signaling. This is 1077 * often used for structure-embedded messages. 1078 */ 1079 static 1080 void 1081 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 1082 { 1083 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1084 } 1085 1086 static 1087 void * 1088 lwkt_panic_getport(lwkt_port_t port) 1089 { 1090 panic("lwkt_getport() illegal on port %p", port); 1091 } 1092 1093 static 1094 int 1095 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1096 { 1097 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1098 } 1099 1100 static 1101 int 1102 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1103 { 1104 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1105 } 1106 1107 static 1108 void * 1109 lwkt_panic_waitport(lwkt_port_t port, int flags) 1110 { 1111 panic("port %p cannot be waited on", port); 1112 } 1113 1114 static 1115 void 1116 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1117 { 1118 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1119 } 1120 1121 static 1122 void 1123 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1124 { 1125 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); 1126 } 1127