1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #ifdef SMP 69 #include <machine/smp.h> 70 #endif 71 72 #include <sys/malloc.h> 73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 74 75 /************************************************************************ 76 * MESSAGE FUNCTIONS * 77 ************************************************************************/ 78 79 /* 80 * lwkt_sendmsg() 81 * 82 * Request asynchronous completion and call lwkt_beginmsg(). The 83 * target port can opt to execute the message synchronously or 84 * asynchronously and this function will automatically queue the 85 * response if the target executes the message synchronously. 86 * 87 * NOTE: The message is in an indeterminant state until this call 88 * returns. The caller should not mess with it (e.g. try to abort it) 89 * until then. 90 */ 91 void 92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 93 { 94 int error; 95 96 KKASSERT(msg->ms_reply_port != NULL && 97 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 98 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 99 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 100 /* 101 * Target port opted to execute the message synchronously so 102 * queue the response. 103 */ 104 lwkt_replymsg(msg, error); 105 } 106 } 107 108 /* 109 * lwkt_domsg() 110 * 111 * Request synchronous completion and call lwkt_beginmsg(). The 112 * target port can opt to execute the message synchronously or 113 * asynchronously and this function will automatically block and 114 * wait for a response if the target executes the message 115 * asynchronously. 116 */ 117 int 118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 119 { 120 int error; 121 122 KKASSERT(msg->ms_reply_port != NULL && 123 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 124 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 125 msg->ms_flags |= MSGF_SYNC; 126 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 127 /* 128 * Target port opted to execute the message asynchronously so 129 * block and wait for a reply. 130 */ 131 error = lwkt_waitmsg(msg, flags); 132 } else { 133 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 134 } 135 return(error); 136 } 137 138 /* 139 * lwkt_forwardmsg() 140 * 141 * Forward a message received on one port to another port. 142 */ 143 int 144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 145 { 146 int error; 147 148 crit_enter(); 149 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 150 if ((error = port->mp_putport(port, msg)) != EASYNC) 151 lwkt_replymsg(msg, error); 152 crit_exit(); 153 return(error); 154 } 155 156 /* 157 * lwkt_abortmsg() 158 * 159 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 160 * The caller must ensure that the message will not be both replied AND 161 * destroyed while the abort is in progress. 162 * 163 * This function issues a callback which might block! 164 */ 165 void 166 lwkt_abortmsg(lwkt_msg_t msg) 167 { 168 /* 169 * A critical section protects us from reply IPIs on this cpu. 170 */ 171 crit_enter(); 172 173 /* 174 * Shortcut the operation if the message has already been returned. 175 * The callback typically constructs a lwkt_msg with the abort request, 176 * issues it synchronously, and waits for completion. The callback 177 * is not required to actually abort the message and the target port, 178 * upon receiving an abort request message generated by the callback 179 * should check whether the original message has already completed or 180 * not. 181 */ 182 if (msg->ms_flags & MSGF_ABORTABLE) { 183 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 184 msg->ms_abortfn(msg); 185 } 186 crit_exit(); 187 } 188 189 /************************************************************************ 190 * PORT INITIALIZATION API * 191 ************************************************************************/ 192 193 static void *lwkt_thread_getport(lwkt_port_t port); 194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 199 200 static void *lwkt_spin_getport(lwkt_port_t port); 201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 206 207 static void *lwkt_serialize_getport(lwkt_port_t port); 208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 212 213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 214 static void *lwkt_panic_getport(lwkt_port_t port); 215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 220 221 /* 222 * Core port initialization (internal) 223 */ 224 static __inline 225 void 226 _lwkt_initport(lwkt_port_t port, 227 void *(*gportfn)(lwkt_port_t), 228 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 229 int (*wmsgfn)(lwkt_msg_t, int), 230 void *(*wportfn)(lwkt_port_t, int), 231 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 232 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) 233 { 234 bzero(port, sizeof(*port)); 235 TAILQ_INIT(&port->mp_msgq); 236 TAILQ_INIT(&port->mp_msgq_prio); 237 port->mp_getport = gportfn; 238 port->mp_putport = pportfn; 239 port->mp_waitmsg = wmsgfn; 240 port->mp_waitport = wportfn; 241 port->mp_replyport = rportfn; 242 port->mp_dropmsg = dmsgfn; 243 } 244 245 /* 246 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 247 * we tell the scheduler not to reschedule if td is at a higher priority. 248 * 249 * This routine is called even if the thread is already scheduled. 250 */ 251 static __inline 252 void 253 _lwkt_schedule_msg(thread_t td, int flags) 254 { 255 lwkt_schedule(td); 256 } 257 258 /* 259 * lwkt_initport_thread() 260 * 261 * Initialize a port for use by a particular thread. The port may 262 * only be used by <td>. 263 */ 264 void 265 lwkt_initport_thread(lwkt_port_t port, thread_t td) 266 { 267 _lwkt_initport(port, 268 lwkt_thread_getport, 269 lwkt_thread_putport, 270 lwkt_thread_waitmsg, 271 lwkt_thread_waitport, 272 lwkt_thread_replyport, 273 lwkt_thread_dropmsg); 274 port->mpu_td = td; 275 } 276 277 /* 278 * lwkt_initport_spin() 279 * 280 * Initialize a port for use with descriptors that might be accessed 281 * via multiple LWPs, processes, or threads. Has somewhat more 282 * overhead then thread ports. 283 */ 284 void 285 lwkt_initport_spin(lwkt_port_t port, thread_t td) 286 { 287 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 288 289 if (td == NULL) 290 dmsgfn = lwkt_panic_dropmsg; 291 else 292 dmsgfn = lwkt_spin_dropmsg; 293 294 _lwkt_initport(port, 295 lwkt_spin_getport, 296 lwkt_spin_putport, 297 lwkt_spin_waitmsg, 298 lwkt_spin_waitport, 299 lwkt_spin_replyport, 300 dmsgfn); 301 spin_init(&port->mpu_spin); 302 port->mpu_td = td; 303 } 304 305 /* 306 * lwkt_initport_serialize() 307 * 308 * Initialize a port for use with descriptors that might be accessed 309 * via multiple LWPs, processes, or threads. Callers are assumed to 310 * have held the serializer (slz). 311 */ 312 void 313 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 314 { 315 _lwkt_initport(port, 316 lwkt_serialize_getport, 317 lwkt_serialize_putport, 318 lwkt_serialize_waitmsg, 319 lwkt_serialize_waitport, 320 lwkt_serialize_replyport, 321 lwkt_panic_dropmsg); 322 port->mpu_serialize = slz; 323 } 324 325 /* 326 * Similar to the standard initport, this function simply marks the message 327 * as being done and does not attempt to return it to an originating port. 328 */ 329 void 330 lwkt_initport_replyonly_null(lwkt_port_t port) 331 { 332 _lwkt_initport(port, 333 lwkt_panic_getport, 334 lwkt_panic_putport, 335 lwkt_panic_waitmsg, 336 lwkt_panic_waitport, 337 lwkt_null_replyport, 338 lwkt_panic_dropmsg); 339 } 340 341 /* 342 * Initialize a reply-only port, typically used as a message sink. Such 343 * ports can only be used as a reply port. 344 */ 345 void 346 lwkt_initport_replyonly(lwkt_port_t port, 347 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 348 { 349 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 350 lwkt_panic_waitmsg, lwkt_panic_waitport, 351 rportfn, lwkt_panic_dropmsg); 352 } 353 354 void 355 lwkt_initport_putonly(lwkt_port_t port, 356 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 357 { 358 _lwkt_initport(port, lwkt_panic_getport, pportfn, 359 lwkt_panic_waitmsg, lwkt_panic_waitport, 360 lwkt_panic_replyport, lwkt_panic_dropmsg); 361 } 362 363 void 364 lwkt_initport_panic(lwkt_port_t port) 365 { 366 _lwkt_initport(port, 367 lwkt_panic_getport, lwkt_panic_putport, 368 lwkt_panic_waitmsg, lwkt_panic_waitport, 369 lwkt_panic_replyport, lwkt_panic_dropmsg); 370 } 371 372 static __inline 373 void 374 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 375 { 376 lwkt_msg_queue *queue; 377 378 /* 379 * normal case, remove and return the message. 380 */ 381 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 382 queue = &port->mp_msgq_prio; 383 else 384 queue = &port->mp_msgq; 385 TAILQ_REMOVE(queue, msg, ms_node); 386 387 /* 388 * atomic op needed for spin ports 389 */ 390 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 391 } 392 393 static __inline 394 void 395 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 396 { 397 lwkt_msg_queue *queue; 398 399 /* 400 * atomic op needed for spin ports 401 */ 402 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 403 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 404 queue = &port->mp_msgq_prio; 405 else 406 queue = &port->mp_msgq; 407 TAILQ_INSERT_TAIL(queue, msg, ms_node); 408 } 409 410 static __inline 411 lwkt_msg_t 412 _lwkt_pollmsg(lwkt_port_t port) 413 { 414 lwkt_msg_t msg; 415 416 msg = TAILQ_FIRST(&port->mp_msgq_prio); 417 if (__predict_false(msg != NULL)) 418 return msg; 419 420 /* 421 * Priority queue has no message, fallback to non-priority queue. 422 */ 423 return TAILQ_FIRST(&port->mp_msgq); 424 } 425 426 static __inline 427 void 428 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 429 { 430 /* 431 * atomic op needed for spin ports 432 */ 433 _lwkt_pushmsg(port, msg); 434 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 435 } 436 437 /************************************************************************ 438 * THREAD PORT BACKEND * 439 ************************************************************************ 440 * 441 * This backend is used when the port a message is retrieved from is owned 442 * by a single thread (the calling thread). Messages are IPId to the 443 * correct cpu before being enqueued to a port. Note that this is fairly 444 * optimal since scheduling would have had to do an IPI anyway if the 445 * message were headed to a different cpu. 446 */ 447 448 #ifdef SMP 449 450 /* 451 * This function completes reply processing for the default case in the 452 * context of the originating cpu. 453 */ 454 static 455 void 456 lwkt_thread_replyport_remote(lwkt_msg_t msg) 457 { 458 lwkt_port_t port = msg->ms_reply_port; 459 int flags; 460 461 /* 462 * Chase any thread migration that occurs 463 */ 464 if (port->mpu_td->td_gd != mycpu) { 465 lwkt_send_ipiq(port->mpu_td->td_gd, 466 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 467 return; 468 } 469 470 /* 471 * Cleanup 472 */ 473 #ifdef INVARIANTS 474 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 475 msg->ms_flags &= ~MSGF_INTRANSIT; 476 #endif 477 flags = msg->ms_flags; 478 if (msg->ms_flags & MSGF_SYNC) { 479 cpu_sfence(); 480 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 481 } else { 482 _lwkt_enqueue_reply(port, msg); 483 } 484 if (port->mp_flags & MSGPORTF_WAITING) 485 _lwkt_schedule_msg(port->mpu_td, flags); 486 } 487 488 #endif 489 490 /* 491 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 492 * 493 * Called with the reply port as an argument but in the context of the 494 * original target port. Completion must occur on the target port's 495 * cpu. 496 * 497 * The critical section protects us from IPIs on the this CPU. 498 */ 499 static 500 void 501 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 502 { 503 int flags; 504 505 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 506 507 if (msg->ms_flags & MSGF_SYNC) { 508 /* 509 * If a synchronous completion has been requested, just wakeup 510 * the message without bothering to queue it to the target port. 511 * 512 * Assume the target thread is non-preemptive, so no critical 513 * section is required. 514 */ 515 #ifdef SMP 516 if (port->mpu_td->td_gd == mycpu) { 517 #endif 518 flags = msg->ms_flags; 519 cpu_sfence(); 520 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 521 if (port->mp_flags & MSGPORTF_WAITING) 522 _lwkt_schedule_msg(port->mpu_td, flags); 523 #ifdef SMP 524 } else { 525 #ifdef INVARIANTS 526 msg->ms_flags |= MSGF_INTRANSIT; 527 #endif 528 msg->ms_flags |= MSGF_REPLY; 529 lwkt_send_ipiq(port->mpu_td->td_gd, 530 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 531 } 532 #endif 533 } else { 534 /* 535 * If an asynchronous completion has been requested the message 536 * must be queued to the reply port. 537 * 538 * A critical section is required to interlock the port queue. 539 */ 540 #ifdef SMP 541 if (port->mpu_td->td_gd == mycpu) { 542 #endif 543 crit_enter(); 544 _lwkt_enqueue_reply(port, msg); 545 if (port->mp_flags & MSGPORTF_WAITING) 546 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 547 crit_exit(); 548 #ifdef SMP 549 } else { 550 #ifdef INVARIANTS 551 msg->ms_flags |= MSGF_INTRANSIT; 552 #endif 553 msg->ms_flags |= MSGF_REPLY; 554 lwkt_send_ipiq(port->mpu_td->td_gd, 555 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 556 } 557 #endif 558 } 559 } 560 561 /* 562 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() 563 * 564 * This function could _only_ be used when caller is in the same thread 565 * as the message's target port owner thread. 566 */ 567 static void 568 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 569 { 570 KASSERT(port->mpu_td == curthread, 571 ("message could only be dropped in the same thread " 572 "as the message target port thread")); 573 crit_enter_quick(port->mpu_td); 574 _lwkt_pullmsg(port, msg); 575 msg->ms_flags |= MSGF_DONE; 576 crit_exit_quick(port->mpu_td); 577 } 578 579 /* 580 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 581 * 582 * Called with the target port as an argument but in the context of the 583 * reply port. This function always implements an asynchronous put to 584 * the target message port, and thus returns EASYNC. 585 * 586 * The message must already have cleared MSGF_DONE and MSGF_REPLY 587 */ 588 589 #ifdef SMP 590 591 static 592 void 593 lwkt_thread_putport_remote(lwkt_msg_t msg) 594 { 595 lwkt_port_t port = msg->ms_target_port; 596 597 /* 598 * Chase any thread migration that occurs 599 */ 600 if (port->mpu_td->td_gd != mycpu) { 601 lwkt_send_ipiq(port->mpu_td->td_gd, 602 (ipifunc1_t)lwkt_thread_putport_remote, msg); 603 return; 604 } 605 606 /* 607 * Cleanup 608 */ 609 #ifdef INVARIANTS 610 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 611 msg->ms_flags &= ~MSGF_INTRANSIT; 612 #endif 613 _lwkt_pushmsg(port, msg); 614 if (port->mp_flags & MSGPORTF_WAITING) 615 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 616 } 617 618 #endif 619 620 static 621 int 622 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 623 { 624 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 625 626 msg->ms_target_port = port; 627 #ifdef SMP 628 if (port->mpu_td->td_gd == mycpu) { 629 #endif 630 crit_enter(); 631 _lwkt_pushmsg(port, msg); 632 if (port->mp_flags & MSGPORTF_WAITING) 633 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 634 crit_exit(); 635 #ifdef SMP 636 } else { 637 #ifdef INVARIANTS 638 msg->ms_flags |= MSGF_INTRANSIT; 639 #endif 640 lwkt_send_ipiq(port->mpu_td->td_gd, 641 (ipifunc1_t)lwkt_thread_putport_remote, msg); 642 } 643 #endif 644 return (EASYNC); 645 } 646 647 /* 648 * lwkt_thread_getport() 649 * 650 * Retrieve the next message from the port or NULL if no messages 651 * are ready. 652 */ 653 static 654 void * 655 lwkt_thread_getport(lwkt_port_t port) 656 { 657 lwkt_msg_t msg; 658 659 KKASSERT(port->mpu_td == curthread); 660 661 crit_enter_quick(port->mpu_td); 662 if ((msg = _lwkt_pollmsg(port)) != NULL) 663 _lwkt_pullmsg(port, msg); 664 crit_exit_quick(port->mpu_td); 665 return(msg); 666 } 667 668 /* 669 * lwkt_thread_waitmsg() 670 * 671 * Wait for a particular message to be replied. We must be the only 672 * thread waiting on the message. The port must be owned by the 673 * caller. 674 */ 675 static 676 int 677 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 678 { 679 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 680 ("can't wait dropable message")); 681 682 if ((msg->ms_flags & MSGF_DONE) == 0) { 683 /* 684 * If the done bit was not set we have to block until it is. 685 */ 686 lwkt_port_t port = msg->ms_reply_port; 687 thread_t td = curthread; 688 int sentabort; 689 690 KKASSERT(port->mpu_td == td); 691 crit_enter_quick(td); 692 sentabort = 0; 693 694 while ((msg->ms_flags & MSGF_DONE) == 0) { 695 port->mp_flags |= MSGPORTF_WAITING; 696 if (sentabort == 0) { 697 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 698 lwkt_abortmsg(msg); 699 } 700 } else { 701 lwkt_sleep("waitabt", 0); 702 } 703 port->mp_flags &= ~MSGPORTF_WAITING; 704 } 705 if (msg->ms_flags & MSGF_QUEUED) 706 _lwkt_pullmsg(port, msg); 707 crit_exit_quick(td); 708 } else { 709 /* 710 * If the done bit was set we only have to mess around with the 711 * message if it is queued on the reply port. 712 */ 713 if (msg->ms_flags & MSGF_QUEUED) { 714 lwkt_port_t port = msg->ms_reply_port; 715 thread_t td = curthread; 716 717 KKASSERT(port->mpu_td == td); 718 crit_enter_quick(td); 719 _lwkt_pullmsg(port, msg); 720 crit_exit_quick(td); 721 } 722 } 723 return(msg->ms_error); 724 } 725 726 /* 727 * lwkt_thread_waitport() 728 * 729 * Wait for a new message to be available on the port. We must be the 730 * the only thread waiting on the port. The port must be owned by caller. 731 */ 732 static 733 void * 734 lwkt_thread_waitport(lwkt_port_t port, int flags) 735 { 736 thread_t td = curthread; 737 lwkt_msg_t msg; 738 int error; 739 740 KKASSERT(port->mpu_td == td); 741 crit_enter_quick(td); 742 while ((msg = _lwkt_pollmsg(port)) == NULL) { 743 port->mp_flags |= MSGPORTF_WAITING; 744 error = lwkt_sleep("waitport", flags); 745 port->mp_flags &= ~MSGPORTF_WAITING; 746 if (error) 747 goto done; 748 } 749 _lwkt_pullmsg(port, msg); 750 done: 751 crit_exit_quick(td); 752 return(msg); 753 } 754 755 /************************************************************************ 756 * SPIN PORT BACKEND * 757 ************************************************************************ 758 * 759 * This backend uses spinlocks instead of making assumptions about which 760 * thread is accessing the port. It must be used when a port is not owned 761 * by a particular thread. This is less optimal then thread ports but 762 * you don't have a choice if there are multiple threads accessing the port. 763 * 764 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 765 * on the message port, it is the responsibility of the code doing the 766 * wakeup to clear this flag rather then the blocked threads. Some 767 * superfluous wakeups may occur, which is ok. 768 * 769 * XXX synchronous message wakeups are not current optimized. 770 */ 771 772 static 773 void * 774 lwkt_spin_getport(lwkt_port_t port) 775 { 776 lwkt_msg_t msg; 777 778 spin_lock(&port->mpu_spin); 779 if ((msg = _lwkt_pollmsg(port)) != NULL) 780 _lwkt_pullmsg(port, msg); 781 spin_unlock(&port->mpu_spin); 782 return(msg); 783 } 784 785 static 786 int 787 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 788 { 789 int dowakeup; 790 791 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 792 793 msg->ms_target_port = port; 794 spin_lock(&port->mpu_spin); 795 _lwkt_pushmsg(port, msg); 796 dowakeup = 0; 797 if (port->mp_flags & MSGPORTF_WAITING) { 798 port->mp_flags &= ~MSGPORTF_WAITING; 799 dowakeup = 1; 800 } 801 spin_unlock(&port->mpu_spin); 802 if (dowakeup) 803 wakeup(port); 804 return (EASYNC); 805 } 806 807 static 808 int 809 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 810 { 811 lwkt_port_t port; 812 int sentabort; 813 int error; 814 815 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 816 ("can't wait dropable message")); 817 818 if ((msg->ms_flags & MSGF_DONE) == 0) { 819 port = msg->ms_reply_port; 820 sentabort = 0; 821 spin_lock(&port->mpu_spin); 822 while ((msg->ms_flags & MSGF_DONE) == 0) { 823 void *won; 824 825 /* 826 * If message was sent synchronously from the beginning 827 * the wakeup will be on the message structure, else it 828 * will be on the port structure. 829 */ 830 if (msg->ms_flags & MSGF_SYNC) { 831 won = msg; 832 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 833 } else { 834 won = port; 835 port->mp_flags |= MSGPORTF_WAITING; 836 } 837 838 /* 839 * Only messages which support abort can be interrupted. 840 * We must still wait for message completion regardless. 841 */ 842 if ((flags & PCATCH) && sentabort == 0) { 843 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 844 if (error) { 845 sentabort = error; 846 spin_unlock(&port->mpu_spin); 847 lwkt_abortmsg(msg); 848 spin_lock(&port->mpu_spin); 849 } 850 } else { 851 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 852 } 853 /* see note at the top on the MSGPORTF_WAITING flag */ 854 } 855 /* 856 * Turn EINTR into ERESTART if the signal indicates. 857 */ 858 if (sentabort && msg->ms_error == EINTR) 859 msg->ms_error = sentabort; 860 if (msg->ms_flags & MSGF_QUEUED) 861 _lwkt_pullmsg(port, msg); 862 spin_unlock(&port->mpu_spin); 863 } else { 864 if (msg->ms_flags & MSGF_QUEUED) { 865 port = msg->ms_reply_port; 866 spin_lock(&port->mpu_spin); 867 _lwkt_pullmsg(port, msg); 868 spin_unlock(&port->mpu_spin); 869 } 870 } 871 return(msg->ms_error); 872 } 873 874 static 875 void * 876 lwkt_spin_waitport(lwkt_port_t port, int flags) 877 { 878 lwkt_msg_t msg; 879 int error; 880 881 spin_lock(&port->mpu_spin); 882 while ((msg = _lwkt_pollmsg(port)) == NULL) { 883 port->mp_flags |= MSGPORTF_WAITING; 884 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 885 /* see note at the top on the MSGPORTF_WAITING flag */ 886 if (error) { 887 spin_unlock(&port->mpu_spin); 888 return(NULL); 889 } 890 } 891 _lwkt_pullmsg(port, msg); 892 spin_unlock(&port->mpu_spin); 893 return(msg); 894 } 895 896 static 897 void 898 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 899 { 900 int dowakeup; 901 902 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 903 904 if (msg->ms_flags & MSGF_SYNC) { 905 /* 906 * If a synchronous completion has been requested, just wakeup 907 * the message without bothering to queue it to the target port. 908 */ 909 spin_lock(&port->mpu_spin); 910 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 911 dowakeup = 0; 912 if (msg->ms_flags & MSGF_WAITING) { 913 atomic_clear_int(&msg->ms_flags, MSGF_WAITING); 914 dowakeup = 1; 915 } 916 spin_unlock(&port->mpu_spin); 917 if (dowakeup) 918 wakeup(msg); 919 } else { 920 /* 921 * If an asynchronous completion has been requested the message 922 * must be queued to the reply port. 923 */ 924 spin_lock(&port->mpu_spin); 925 _lwkt_enqueue_reply(port, msg); 926 dowakeup = 0; 927 if (port->mp_flags & MSGPORTF_WAITING) { 928 port->mp_flags &= ~MSGPORTF_WAITING; 929 dowakeup = 1; 930 } 931 spin_unlock(&port->mpu_spin); 932 if (dowakeup) 933 wakeup(port); 934 } 935 } 936 937 /* 938 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 939 * 940 * This function could _only_ be used when caller is in the same thread 941 * as the message's target port owner thread. 942 */ 943 static void 944 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 945 { 946 KASSERT(port->mpu_td == curthread, 947 ("message could only be dropped in the same thread " 948 "as the message target port thread\n")); 949 spin_lock(&port->mpu_spin); 950 _lwkt_pullmsg(port, msg); 951 msg->ms_flags |= MSGF_DONE; 952 spin_unlock(&port->mpu_spin); 953 } 954 955 /************************************************************************ 956 * SERIALIZER PORT BACKEND * 957 ************************************************************************ 958 * 959 * This backend uses serializer to protect port accessing. Callers are 960 * assumed to have serializer held. This kind of port is usually created 961 * by network device driver along with _one_ lwkt thread to pipeline 962 * operations which may temporarily release serializer. 963 * 964 * Implementation is based on SPIN PORT BACKEND. 965 */ 966 967 static 968 void * 969 lwkt_serialize_getport(lwkt_port_t port) 970 { 971 lwkt_msg_t msg; 972 973 ASSERT_SERIALIZED(port->mpu_serialize); 974 975 if ((msg = _lwkt_pollmsg(port)) != NULL) 976 _lwkt_pullmsg(port, msg); 977 return(msg); 978 } 979 980 static 981 int 982 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 983 { 984 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 985 ASSERT_SERIALIZED(port->mpu_serialize); 986 987 msg->ms_target_port = port; 988 _lwkt_pushmsg(port, msg); 989 if (port->mp_flags & MSGPORTF_WAITING) { 990 port->mp_flags &= ~MSGPORTF_WAITING; 991 wakeup(port); 992 } 993 return (EASYNC); 994 } 995 996 static 997 int 998 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 999 { 1000 lwkt_port_t port; 1001 int sentabort; 1002 int error; 1003 1004 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 1005 ("can't wait dropable message")); 1006 1007 if ((msg->ms_flags & MSGF_DONE) == 0) { 1008 port = msg->ms_reply_port; 1009 1010 ASSERT_SERIALIZED(port->mpu_serialize); 1011 1012 sentabort = 0; 1013 while ((msg->ms_flags & MSGF_DONE) == 0) { 1014 void *won; 1015 1016 /* 1017 * If message was sent synchronously from the beginning 1018 * the wakeup will be on the message structure, else it 1019 * will be on the port structure. 1020 */ 1021 if (msg->ms_flags & MSGF_SYNC) { 1022 won = msg; 1023 } else { 1024 won = port; 1025 port->mp_flags |= MSGPORTF_WAITING; 1026 } 1027 1028 /* 1029 * Only messages which support abort can be interrupted. 1030 * We must still wait for message completion regardless. 1031 */ 1032 if ((flags & PCATCH) && sentabort == 0) { 1033 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0); 1034 if (error) { 1035 sentabort = error; 1036 lwkt_serialize_exit(port->mpu_serialize); 1037 lwkt_abortmsg(msg); 1038 lwkt_serialize_enter(port->mpu_serialize); 1039 } 1040 } else { 1041 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0); 1042 } 1043 /* see note at the top on the MSGPORTF_WAITING flag */ 1044 } 1045 /* 1046 * Turn EINTR into ERESTART if the signal indicates. 1047 */ 1048 if (sentabort && msg->ms_error == EINTR) 1049 msg->ms_error = sentabort; 1050 if (msg->ms_flags & MSGF_QUEUED) 1051 _lwkt_pullmsg(port, msg); 1052 } else { 1053 if (msg->ms_flags & MSGF_QUEUED) { 1054 port = msg->ms_reply_port; 1055 1056 ASSERT_SERIALIZED(port->mpu_serialize); 1057 _lwkt_pullmsg(port, msg); 1058 } 1059 } 1060 return(msg->ms_error); 1061 } 1062 1063 static 1064 void * 1065 lwkt_serialize_waitport(lwkt_port_t port, int flags) 1066 { 1067 lwkt_msg_t msg; 1068 int error; 1069 1070 ASSERT_SERIALIZED(port->mpu_serialize); 1071 1072 while ((msg = _lwkt_pollmsg(port)) == NULL) { 1073 port->mp_flags |= MSGPORTF_WAITING; 1074 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0); 1075 /* see note at the top on the MSGPORTF_WAITING flag */ 1076 if (error) 1077 return(NULL); 1078 } 1079 _lwkt_pullmsg(port, msg); 1080 return(msg); 1081 } 1082 1083 static 1084 void 1085 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 1086 { 1087 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1088 ASSERT_SERIALIZED(port->mpu_serialize); 1089 1090 if (msg->ms_flags & MSGF_SYNC) { 1091 /* 1092 * If a synchronous completion has been requested, just wakeup 1093 * the message without bothering to queue it to the target port. 1094 */ 1095 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1096 wakeup(msg); 1097 } else { 1098 /* 1099 * If an asynchronous completion has been requested the message 1100 * must be queued to the reply port. 1101 */ 1102 _lwkt_enqueue_reply(port, msg); 1103 if (port->mp_flags & MSGPORTF_WAITING) { 1104 port->mp_flags &= ~MSGPORTF_WAITING; 1105 wakeup(port); 1106 } 1107 } 1108 } 1109 1110 /************************************************************************ 1111 * PANIC AND SPECIAL PORT FUNCTIONS * 1112 ************************************************************************/ 1113 1114 /* 1115 * You can point a port's reply vector at this function if you just want 1116 * the message marked done, without any queueing or signaling. This is 1117 * often used for structure-embedded messages. 1118 */ 1119 static 1120 void 1121 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 1122 { 1123 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1124 } 1125 1126 static 1127 void * 1128 lwkt_panic_getport(lwkt_port_t port) 1129 { 1130 panic("lwkt_getport() illegal on port %p", port); 1131 } 1132 1133 static 1134 int 1135 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1136 { 1137 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1138 } 1139 1140 static 1141 int 1142 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1143 { 1144 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1145 } 1146 1147 static 1148 void * 1149 lwkt_panic_waitport(lwkt_port_t port, int flags) 1150 { 1151 panic("port %p cannot be waited on", port); 1152 } 1153 1154 static 1155 void 1156 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1157 { 1158 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1159 } 1160 1161 static 1162 void 1163 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1164 { 1165 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); 1166 } 1167