1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #include <machine/smp.h> 69 70 #include <sys/malloc.h> 71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 72 73 /************************************************************************ 74 * MESSAGE FUNCTIONS * 75 ************************************************************************/ 76 77 /* 78 * lwkt_sendmsg() 79 * 80 * Request asynchronous completion and call lwkt_beginmsg(). The 81 * target port can opt to execute the message synchronously or 82 * asynchronously and this function will automatically queue the 83 * response if the target executes the message synchronously. 84 * 85 * NOTE: The message is in an indeterminant state until this call 86 * returns. The caller should not mess with it (e.g. try to abort it) 87 * until then. 88 * 89 * NOTE: Do not use this function to forward a message as we might 90 * clobber ms_flags in a SMP race. 91 */ 92 void 93 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 94 { 95 int error; 96 97 KKASSERT(msg->ms_reply_port != NULL && 98 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 99 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 100 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 101 /* 102 * Target port opted to execute the message synchronously so 103 * queue the response. 104 */ 105 lwkt_replymsg(msg, error); 106 } 107 } 108 109 void 110 lwkt_sendmsg_stage1(lwkt_port_t port, lwkt_msg_t msg) 111 { 112 KKASSERT(msg->ms_reply_port != NULL && 113 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 114 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 115 } 116 117 void 118 lwkt_sendmsg_stage2(lwkt_port_t port, lwkt_msg_t msg) 119 { 120 int error; 121 122 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 123 /* 124 * Target port opted to execute the message synchronously so 125 * queue the response. 126 */ 127 lwkt_replymsg(msg, error); 128 } 129 } 130 131 /* 132 * lwkt_domsg() 133 * 134 * Request synchronous completion and call lwkt_beginmsg(). The 135 * target port can opt to execute the message synchronously or 136 * asynchronously and this function will automatically block and 137 * wait for a response if the target executes the message 138 * asynchronously. 139 * 140 * NOTE: Do not use this function to forward a message as we might 141 * clobber ms_flags in a SMP race. 142 */ 143 int 144 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 145 { 146 int error; 147 148 KKASSERT(msg->ms_reply_port != NULL && 149 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 150 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 151 msg->ms_flags |= MSGF_SYNC; 152 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 153 /* 154 * Target port opted to execute the message asynchronously so 155 * block and wait for a reply. 156 */ 157 error = lwkt_waitmsg(msg, flags); 158 } else { 159 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 160 } 161 return(error); 162 } 163 164 /* 165 * lwkt_forwardmsg() 166 * 167 * Forward a message received on one port to another port. 168 */ 169 int 170 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 171 { 172 int error; 173 174 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 175 if ((error = port->mp_putport(port, msg)) != EASYNC) 176 lwkt_replymsg(msg, error); 177 return(error); 178 } 179 180 /* 181 * lwkt_abortmsg() 182 * 183 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 184 * The caller must ensure that the message will not be both replied AND 185 * destroyed while the abort is in progress. 186 * 187 * This function issues a callback which might block! 188 */ 189 void 190 lwkt_abortmsg(lwkt_msg_t msg) 191 { 192 /* 193 * A critical section protects us from reply IPIs on this cpu. 194 */ 195 crit_enter(); 196 197 /* 198 * Shortcut the operation if the message has already been returned. 199 * The callback typically constructs a lwkt_msg with the abort request, 200 * issues it synchronously, and waits for completion. The callback 201 * is not required to actually abort the message and the target port, 202 * upon receiving an abort request message generated by the callback 203 * should check whether the original message has already completed or 204 * not. 205 */ 206 if (msg->ms_flags & MSGF_ABORTABLE) { 207 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 208 msg->ms_abortfn(msg); 209 } 210 crit_exit(); 211 } 212 213 /************************************************************************ 214 * PORT INITIALIZATION API * 215 ************************************************************************/ 216 217 static void *lwkt_thread_getport(lwkt_port_t port); 218 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 219 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 220 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 221 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 222 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 223 224 static void *lwkt_spin_getport(lwkt_port_t port); 225 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 226 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 227 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 228 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 229 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 230 231 static void *lwkt_serialize_getport(lwkt_port_t port); 232 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 233 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 234 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 235 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 236 237 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 238 static void *lwkt_panic_getport(lwkt_port_t port); 239 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 240 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 241 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 242 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 243 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 244 245 /* 246 * Core port initialization (internal) 247 */ 248 static __inline 249 void 250 _lwkt_initport(lwkt_port_t port, 251 void *(*gportfn)(lwkt_port_t), 252 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 253 int (*wmsgfn)(lwkt_msg_t, int), 254 void *(*wportfn)(lwkt_port_t, int), 255 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 256 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) 257 { 258 bzero(port, sizeof(*port)); 259 TAILQ_INIT(&port->mp_msgq); 260 TAILQ_INIT(&port->mp_msgq_prio); 261 port->mp_getport = gportfn; 262 port->mp_putport = pportfn; 263 port->mp_waitmsg = wmsgfn; 264 port->mp_waitport = wportfn; 265 port->mp_replyport = rportfn; 266 port->mp_dropmsg = dmsgfn; 267 } 268 269 /* 270 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 271 * we tell the scheduler not to reschedule if td is at a higher priority. 272 * 273 * This routine is called even if the thread is already scheduled. 274 */ 275 static __inline 276 void 277 _lwkt_schedule_msg(thread_t td, int flags) 278 { 279 lwkt_schedule(td); 280 } 281 282 /* 283 * lwkt_initport_thread() 284 * 285 * Initialize a port for use by a particular thread. The port may 286 * only be used by <td>. 287 */ 288 void 289 lwkt_initport_thread(lwkt_port_t port, thread_t td) 290 { 291 _lwkt_initport(port, 292 lwkt_thread_getport, 293 lwkt_thread_putport, 294 lwkt_thread_waitmsg, 295 lwkt_thread_waitport, 296 lwkt_thread_replyport, 297 lwkt_thread_dropmsg); 298 port->mpu_td = td; 299 } 300 301 /* 302 * lwkt_initport_spin() 303 * 304 * Initialize a port for use with descriptors that might be accessed 305 * via multiple LWPs, processes, or threads. Has somewhat more 306 * overhead then thread ports. 307 */ 308 void 309 lwkt_initport_spin(lwkt_port_t port, thread_t td) 310 { 311 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 312 313 if (td == NULL) 314 dmsgfn = lwkt_panic_dropmsg; 315 else 316 dmsgfn = lwkt_spin_dropmsg; 317 318 _lwkt_initport(port, 319 lwkt_spin_getport, 320 lwkt_spin_putport, 321 lwkt_spin_waitmsg, 322 lwkt_spin_waitport, 323 lwkt_spin_replyport, 324 dmsgfn); 325 spin_init(&port->mpu_spin); 326 port->mpu_td = td; 327 } 328 329 /* 330 * lwkt_initport_serialize() 331 * 332 * Initialize a port for use with descriptors that might be accessed 333 * via multiple LWPs, processes, or threads. Callers are assumed to 334 * have held the serializer (slz). 335 */ 336 void 337 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 338 { 339 _lwkt_initport(port, 340 lwkt_serialize_getport, 341 lwkt_serialize_putport, 342 lwkt_serialize_waitmsg, 343 lwkt_serialize_waitport, 344 lwkt_serialize_replyport, 345 lwkt_panic_dropmsg); 346 port->mpu_serialize = slz; 347 } 348 349 /* 350 * Similar to the standard initport, this function simply marks the message 351 * as being done and does not attempt to return it to an originating port. 352 */ 353 void 354 lwkt_initport_replyonly_null(lwkt_port_t port) 355 { 356 _lwkt_initport(port, 357 lwkt_panic_getport, 358 lwkt_panic_putport, 359 lwkt_panic_waitmsg, 360 lwkt_panic_waitport, 361 lwkt_null_replyport, 362 lwkt_panic_dropmsg); 363 } 364 365 /* 366 * Initialize a reply-only port, typically used as a message sink. Such 367 * ports can only be used as a reply port. 368 */ 369 void 370 lwkt_initport_replyonly(lwkt_port_t port, 371 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 372 { 373 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 374 lwkt_panic_waitmsg, lwkt_panic_waitport, 375 rportfn, lwkt_panic_dropmsg); 376 } 377 378 void 379 lwkt_initport_putonly(lwkt_port_t port, 380 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 381 { 382 _lwkt_initport(port, lwkt_panic_getport, pportfn, 383 lwkt_panic_waitmsg, lwkt_panic_waitport, 384 lwkt_panic_replyport, lwkt_panic_dropmsg); 385 } 386 387 void 388 lwkt_initport_panic(lwkt_port_t port) 389 { 390 _lwkt_initport(port, 391 lwkt_panic_getport, lwkt_panic_putport, 392 lwkt_panic_waitmsg, lwkt_panic_waitport, 393 lwkt_panic_replyport, lwkt_panic_dropmsg); 394 } 395 396 static __inline 397 void 398 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 399 { 400 lwkt_msg_queue *queue; 401 402 /* 403 * normal case, remove and return the message. 404 */ 405 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 406 queue = &port->mp_msgq_prio; 407 else 408 queue = &port->mp_msgq; 409 TAILQ_REMOVE(queue, msg, ms_node); 410 411 /* 412 * atomic op needed for spin ports 413 */ 414 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 415 } 416 417 static __inline 418 void 419 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 420 { 421 lwkt_msg_queue *queue; 422 423 /* 424 * atomic op needed for spin ports 425 */ 426 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 427 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 428 queue = &port->mp_msgq_prio; 429 else 430 queue = &port->mp_msgq; 431 TAILQ_INSERT_TAIL(queue, msg, ms_node); 432 } 433 434 static __inline 435 lwkt_msg_t 436 _lwkt_pollmsg(lwkt_port_t port) 437 { 438 lwkt_msg_t msg; 439 440 msg = TAILQ_FIRST(&port->mp_msgq_prio); 441 if (__predict_false(msg != NULL)) 442 return msg; 443 444 /* 445 * Priority queue has no message, fallback to non-priority queue. 446 */ 447 return TAILQ_FIRST(&port->mp_msgq); 448 } 449 450 static __inline 451 void 452 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 453 { 454 /* 455 * atomic op needed for spin ports 456 */ 457 _lwkt_pushmsg(port, msg); 458 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 459 } 460 461 /************************************************************************ 462 * THREAD PORT BACKEND * 463 ************************************************************************ 464 * 465 * This backend is used when the port a message is retrieved from is owned 466 * by a single thread (the calling thread). Messages are IPId to the 467 * correct cpu before being enqueued to a port. Note that this is fairly 468 * optimal since scheduling would have had to do an IPI anyway if the 469 * message were headed to a different cpu. 470 */ 471 472 /* 473 * This function completes reply processing for the default case in the 474 * context of the originating cpu. 475 */ 476 static 477 void 478 lwkt_thread_replyport_remote(lwkt_msg_t msg) 479 { 480 lwkt_port_t port = msg->ms_reply_port; 481 int flags; 482 483 /* 484 * Chase any thread migration that occurs 485 */ 486 if (port->mpu_td->td_gd != mycpu) { 487 lwkt_send_ipiq(port->mpu_td->td_gd, 488 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 489 return; 490 } 491 492 /* 493 * Cleanup (in critical section, IPI on same cpu, atomic op not needed) 494 */ 495 #ifdef INVARIANTS 496 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 497 msg->ms_flags &= ~MSGF_INTRANSIT; 498 #endif 499 flags = msg->ms_flags; 500 if (msg->ms_flags & MSGF_SYNC) { 501 cpu_sfence(); 502 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 503 } else { 504 _lwkt_enqueue_reply(port, msg); 505 } 506 if (port->mp_flags & MSGPORTF_WAITING) 507 _lwkt_schedule_msg(port->mpu_td, flags); 508 } 509 510 /* 511 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 512 * 513 * Called with the reply port as an argument but in the context of the 514 * original target port. Completion must occur on the target port's 515 * cpu. 516 * 517 * The critical section protects us from IPIs on the this CPU. 518 */ 519 static 520 void 521 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 522 { 523 int flags; 524 525 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 526 527 if (msg->ms_flags & MSGF_SYNC) { 528 /* 529 * If a synchronous completion has been requested, just wakeup 530 * the message without bothering to queue it to the target port. 531 * 532 * Assume the target thread is non-preemptive, so no critical 533 * section is required. 534 */ 535 if (port->mpu_td->td_gd == mycpu) { 536 crit_enter(); 537 flags = msg->ms_flags; 538 cpu_sfence(); 539 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 540 if (port->mp_flags & MSGPORTF_WAITING) 541 _lwkt_schedule_msg(port->mpu_td, flags); 542 crit_exit(); 543 } else { 544 #ifdef INVARIANTS 545 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT); 546 #endif 547 atomic_set_int(&msg->ms_flags, MSGF_REPLY); 548 lwkt_send_ipiq(port->mpu_td->td_gd, 549 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 550 } 551 } else { 552 /* 553 * If an asynchronous completion has been requested the message 554 * must be queued to the reply port. 555 * 556 * A critical section is required to interlock the port queue. 557 */ 558 if (port->mpu_td->td_gd == mycpu) { 559 crit_enter(); 560 _lwkt_enqueue_reply(port, msg); 561 if (port->mp_flags & MSGPORTF_WAITING) 562 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 563 crit_exit(); 564 } else { 565 #ifdef INVARIANTS 566 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT); 567 #endif 568 atomic_set_int(&msg->ms_flags, MSGF_REPLY); 569 lwkt_send_ipiq(port->mpu_td->td_gd, 570 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 571 } 572 } 573 } 574 575 /* 576 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() 577 * 578 * This function could _only_ be used when caller is in the same thread 579 * as the message's target port owner thread. 580 */ 581 static int 582 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 583 { 584 int error; 585 586 KASSERT(port->mpu_td == curthread, 587 ("message could only be dropped in the same thread " 588 "as the message target port thread")); 589 crit_enter_quick(port->mpu_td); 590 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) { 591 _lwkt_pullmsg(port, msg); 592 atomic_set_int(&msg->ms_flags, MSGF_DONE); 593 error = 0; 594 } else { 595 error = ENOENT; 596 } 597 crit_exit_quick(port->mpu_td); 598 599 return (error); 600 } 601 602 /* 603 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 604 * 605 * Called with the target port as an argument but in the context of the 606 * reply port. This function always implements an asynchronous put to 607 * the target message port, and thus returns EASYNC. 608 * 609 * The message must already have cleared MSGF_DONE and MSGF_REPLY 610 */ 611 static 612 void 613 lwkt_thread_putport_remote(lwkt_msg_t msg) 614 { 615 lwkt_port_t port = msg->ms_target_port; 616 617 /* 618 * Chase any thread migration that occurs 619 */ 620 if (port->mpu_td->td_gd != mycpu) { 621 lwkt_send_ipiq(port->mpu_td->td_gd, 622 (ipifunc1_t)lwkt_thread_putport_remote, msg); 623 return; 624 } 625 626 /* 627 * An atomic op is needed on ms_flags vs originator. Also 628 * note that the originator might be using a different type 629 * of msgport. 630 */ 631 #ifdef INVARIANTS 632 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 633 atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT); 634 #endif 635 _lwkt_pushmsg(port, msg); 636 if (port->mp_flags & MSGPORTF_WAITING) 637 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 638 } 639 640 static 641 int 642 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 643 { 644 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 645 646 msg->ms_target_port = port; 647 if (port->mpu_td->td_gd == mycpu) { 648 crit_enter(); 649 _lwkt_pushmsg(port, msg); 650 if (port->mp_flags & MSGPORTF_WAITING) 651 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 652 crit_exit(); 653 } else { 654 #ifdef INVARIANTS 655 /* 656 * Cleanup. 657 * 658 * An atomic op is needed on ms_flags vs originator. Also 659 * note that the originator might be using a different type 660 * of msgport. 661 */ 662 atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT); 663 #endif 664 lwkt_send_ipiq(port->mpu_td->td_gd, 665 (ipifunc1_t)lwkt_thread_putport_remote, msg); 666 } 667 return (EASYNC); 668 } 669 670 /* 671 * lwkt_thread_getport() 672 * 673 * Retrieve the next message from the port or NULL if no messages 674 * are ready. 675 */ 676 static 677 void * 678 lwkt_thread_getport(lwkt_port_t port) 679 { 680 lwkt_msg_t msg; 681 682 KKASSERT(port->mpu_td == curthread); 683 684 crit_enter_quick(port->mpu_td); 685 if ((msg = _lwkt_pollmsg(port)) != NULL) 686 _lwkt_pullmsg(port, msg); 687 crit_exit_quick(port->mpu_td); 688 return(msg); 689 } 690 691 /* 692 * lwkt_thread_waitmsg() 693 * 694 * Wait for a particular message to be replied. We must be the only 695 * thread waiting on the message. The port must be owned by the 696 * caller. 697 */ 698 static 699 int 700 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 701 { 702 thread_t td = curthread; 703 704 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 705 ("can't wait dropable message")); 706 707 if ((msg->ms_flags & MSGF_DONE) == 0) { 708 /* 709 * If the done bit was not set we have to block until it is. 710 */ 711 lwkt_port_t port = msg->ms_reply_port; 712 int sentabort; 713 714 KKASSERT(port->mpu_td == td); 715 crit_enter_quick(td); 716 sentabort = 0; 717 718 while ((msg->ms_flags & MSGF_DONE) == 0) { 719 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */ 720 if (sentabort == 0) { 721 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 722 lwkt_abortmsg(msg); 723 } 724 } else { 725 lwkt_sleep("waitabt", 0); 726 } 727 port->mp_flags &= ~MSGPORTF_WAITING; 728 } 729 if (msg->ms_flags & MSGF_QUEUED) 730 _lwkt_pullmsg(port, msg); 731 crit_exit_quick(td); 732 } else { 733 /* 734 * If the done bit was set we only have to mess around with the 735 * message if it is queued on the reply port. 736 */ 737 crit_enter_quick(td); 738 if (msg->ms_flags & MSGF_QUEUED) { 739 lwkt_port_t port = msg->ms_reply_port; 740 thread_t td __debugvar = curthread; 741 742 KKASSERT(port->mpu_td == td); 743 _lwkt_pullmsg(port, msg); 744 } 745 crit_exit_quick(td); 746 } 747 return(msg->ms_error); 748 } 749 750 /* 751 * lwkt_thread_waitport() 752 * 753 * Wait for a new message to be available on the port. We must be the 754 * the only thread waiting on the port. The port must be owned by caller. 755 */ 756 static 757 void * 758 lwkt_thread_waitport(lwkt_port_t port, int flags) 759 { 760 thread_t td = curthread; 761 lwkt_msg_t msg; 762 int error; 763 764 KKASSERT(port->mpu_td == td); 765 crit_enter_quick(td); 766 while ((msg = _lwkt_pollmsg(port)) == NULL) { 767 port->mp_flags |= MSGPORTF_WAITING; 768 error = lwkt_sleep("waitport", flags); 769 port->mp_flags &= ~MSGPORTF_WAITING; 770 if (error) 771 goto done; 772 } 773 _lwkt_pullmsg(port, msg); 774 done: 775 crit_exit_quick(td); 776 return(msg); 777 } 778 779 /************************************************************************ 780 * SPIN PORT BACKEND * 781 ************************************************************************ 782 * 783 * This backend uses spinlocks instead of making assumptions about which 784 * thread is accessing the port. It must be used when a port is not owned 785 * by a particular thread. This is less optimal then thread ports but 786 * you don't have a choice if there are multiple threads accessing the port. 787 * 788 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 789 * on the message port, it is the responsibility of the code doing the 790 * wakeup to clear this flag rather then the blocked threads. Some 791 * superfluous wakeups may occur, which is ok. 792 * 793 * XXX synchronous message wakeups are not current optimized. 794 */ 795 796 static 797 void * 798 lwkt_spin_getport(lwkt_port_t port) 799 { 800 lwkt_msg_t msg; 801 802 spin_lock(&port->mpu_spin); 803 if ((msg = _lwkt_pollmsg(port)) != NULL) 804 _lwkt_pullmsg(port, msg); 805 spin_unlock(&port->mpu_spin); 806 return(msg); 807 } 808 809 static 810 int 811 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 812 { 813 int dowakeup; 814 815 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 816 817 msg->ms_target_port = port; 818 spin_lock(&port->mpu_spin); 819 _lwkt_pushmsg(port, msg); 820 dowakeup = 0; 821 if (port->mp_flags & MSGPORTF_WAITING) { 822 port->mp_flags &= ~MSGPORTF_WAITING; 823 dowakeup = 1; 824 } 825 spin_unlock(&port->mpu_spin); 826 if (dowakeup) 827 wakeup(port); 828 return (EASYNC); 829 } 830 831 static 832 int 833 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 834 { 835 lwkt_port_t port; 836 int sentabort; 837 int error; 838 839 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 840 ("can't wait dropable message")); 841 port = msg->ms_reply_port; 842 843 if ((msg->ms_flags & MSGF_DONE) == 0) { 844 sentabort = 0; 845 spin_lock(&port->mpu_spin); 846 while ((msg->ms_flags & MSGF_DONE) == 0) { 847 void *won; 848 849 /* 850 * If message was sent synchronously from the beginning 851 * the wakeup will be on the message structure, else it 852 * will be on the port structure. 853 * 854 * ms_flags needs atomic op originator vs target MSGF_QUEUED 855 */ 856 if (msg->ms_flags & MSGF_SYNC) { 857 won = msg; 858 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 859 } else { 860 won = port; 861 port->mp_flags |= MSGPORTF_WAITING; 862 } 863 864 /* 865 * Only messages which support abort can be interrupted. 866 * We must still wait for message completion regardless. 867 */ 868 if ((flags & PCATCH) && sentabort == 0) { 869 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 870 if (error) { 871 sentabort = error; 872 spin_unlock(&port->mpu_spin); 873 lwkt_abortmsg(msg); 874 spin_lock(&port->mpu_spin); 875 } 876 } else { 877 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 878 } 879 /* see note at the top on the MSGPORTF_WAITING flag */ 880 } 881 /* 882 * Turn EINTR into ERESTART if the signal indicates. 883 */ 884 if (sentabort && msg->ms_error == EINTR) 885 msg->ms_error = sentabort; 886 if (msg->ms_flags & MSGF_QUEUED) 887 _lwkt_pullmsg(port, msg); 888 spin_unlock(&port->mpu_spin); 889 } else { 890 spin_lock(&port->mpu_spin); 891 if (msg->ms_flags & MSGF_QUEUED) { 892 _lwkt_pullmsg(port, msg); 893 } 894 spin_unlock(&port->mpu_spin); 895 } 896 return(msg->ms_error); 897 } 898 899 static 900 void * 901 lwkt_spin_waitport(lwkt_port_t port, int flags) 902 { 903 lwkt_msg_t msg; 904 int error; 905 906 spin_lock(&port->mpu_spin); 907 while ((msg = _lwkt_pollmsg(port)) == NULL) { 908 port->mp_flags |= MSGPORTF_WAITING; 909 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 910 /* see note at the top on the MSGPORTF_WAITING flag */ 911 if (error) { 912 spin_unlock(&port->mpu_spin); 913 return(NULL); 914 } 915 } 916 _lwkt_pullmsg(port, msg); 917 spin_unlock(&port->mpu_spin); 918 return(msg); 919 } 920 921 static 922 void 923 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 924 { 925 int dowakeup; 926 927 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 928 929 if (msg->ms_flags & MSGF_SYNC) { 930 /* 931 * If a synchronous completion has been requested, just wakeup 932 * the message without bothering to queue it to the target port. 933 * 934 * ms_flags protected by reply port spinlock 935 */ 936 spin_lock(&port->mpu_spin); 937 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 938 dowakeup = 0; 939 if (msg->ms_flags & MSGF_WAITING) { 940 msg->ms_flags &= ~MSGF_WAITING; 941 dowakeup = 1; 942 } 943 spin_unlock(&port->mpu_spin); 944 if (dowakeup) 945 wakeup(msg); 946 } else { 947 /* 948 * If an asynchronous completion has been requested the message 949 * must be queued to the reply port. 950 */ 951 spin_lock(&port->mpu_spin); 952 _lwkt_enqueue_reply(port, msg); 953 dowakeup = 0; 954 if (port->mp_flags & MSGPORTF_WAITING) { 955 port->mp_flags &= ~MSGPORTF_WAITING; 956 dowakeup = 1; 957 } 958 spin_unlock(&port->mpu_spin); 959 if (dowakeup) 960 wakeup(port); 961 } 962 } 963 964 /* 965 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 966 * 967 * This function could _only_ be used when caller is in the same thread 968 * as the message's target port owner thread. 969 */ 970 static int 971 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 972 { 973 int error; 974 975 KASSERT(port->mpu_td == curthread, 976 ("message could only be dropped in the same thread " 977 "as the message target port thread\n")); 978 spin_lock(&port->mpu_spin); 979 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) { 980 _lwkt_pullmsg(port, msg); 981 msg->ms_flags |= MSGF_DONE; 982 error = 0; 983 } else { 984 error = ENOENT; 985 } 986 spin_unlock(&port->mpu_spin); 987 988 return (error); 989 } 990 991 /************************************************************************ 992 * SERIALIZER PORT BACKEND * 993 ************************************************************************ 994 * 995 * This backend uses serializer to protect port accessing. Callers are 996 * assumed to have serializer held. This kind of port is usually created 997 * by network device driver along with _one_ lwkt thread to pipeline 998 * operations which may temporarily release serializer. 999 * 1000 * Implementation is based on SPIN PORT BACKEND. 1001 */ 1002 1003 static 1004 void * 1005 lwkt_serialize_getport(lwkt_port_t port) 1006 { 1007 lwkt_msg_t msg; 1008 1009 ASSERT_SERIALIZED(port->mpu_serialize); 1010 1011 if ((msg = _lwkt_pollmsg(port)) != NULL) 1012 _lwkt_pullmsg(port, msg); 1013 return(msg); 1014 } 1015 1016 static 1017 int 1018 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 1019 { 1020 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 1021 ASSERT_SERIALIZED(port->mpu_serialize); 1022 1023 msg->ms_target_port = port; 1024 _lwkt_pushmsg(port, msg); 1025 if (port->mp_flags & MSGPORTF_WAITING) { 1026 port->mp_flags &= ~MSGPORTF_WAITING; 1027 wakeup(port); 1028 } 1029 return (EASYNC); 1030 } 1031 1032 static 1033 int 1034 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 1035 { 1036 lwkt_port_t port; 1037 int sentabort; 1038 int error; 1039 1040 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 1041 ("can't wait dropable message")); 1042 1043 if ((msg->ms_flags & MSGF_DONE) == 0) { 1044 port = msg->ms_reply_port; 1045 1046 ASSERT_SERIALIZED(port->mpu_serialize); 1047 1048 sentabort = 0; 1049 while ((msg->ms_flags & MSGF_DONE) == 0) { 1050 void *won; 1051 1052 /* 1053 * If message was sent synchronously from the beginning 1054 * the wakeup will be on the message structure, else it 1055 * will be on the port structure. 1056 */ 1057 if (msg->ms_flags & MSGF_SYNC) { 1058 won = msg; 1059 } else { 1060 won = port; 1061 port->mp_flags |= MSGPORTF_WAITING; 1062 } 1063 1064 /* 1065 * Only messages which support abort can be interrupted. 1066 * We must still wait for message completion regardless. 1067 */ 1068 if ((flags & PCATCH) && sentabort == 0) { 1069 error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0); 1070 if (error) { 1071 sentabort = error; 1072 lwkt_serialize_exit(port->mpu_serialize); 1073 lwkt_abortmsg(msg); 1074 lwkt_serialize_enter(port->mpu_serialize); 1075 } 1076 } else { 1077 error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0); 1078 } 1079 /* see note at the top on the MSGPORTF_WAITING flag */ 1080 } 1081 /* 1082 * Turn EINTR into ERESTART if the signal indicates. 1083 */ 1084 if (sentabort && msg->ms_error == EINTR) 1085 msg->ms_error = sentabort; 1086 if (msg->ms_flags & MSGF_QUEUED) 1087 _lwkt_pullmsg(port, msg); 1088 } else { 1089 if (msg->ms_flags & MSGF_QUEUED) { 1090 port = msg->ms_reply_port; 1091 1092 ASSERT_SERIALIZED(port->mpu_serialize); 1093 _lwkt_pullmsg(port, msg); 1094 } 1095 } 1096 return(msg->ms_error); 1097 } 1098 1099 static 1100 void * 1101 lwkt_serialize_waitport(lwkt_port_t port, int flags) 1102 { 1103 lwkt_msg_t msg; 1104 int error; 1105 1106 ASSERT_SERIALIZED(port->mpu_serialize); 1107 1108 while ((msg = _lwkt_pollmsg(port)) == NULL) { 1109 port->mp_flags |= MSGPORTF_WAITING; 1110 error = zsleep(port, port->mpu_serialize, flags, "waitport", 0); 1111 /* see note at the top on the MSGPORTF_WAITING flag */ 1112 if (error) 1113 return(NULL); 1114 } 1115 _lwkt_pullmsg(port, msg); 1116 return(msg); 1117 } 1118 1119 static 1120 void 1121 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 1122 { 1123 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1124 ASSERT_SERIALIZED(port->mpu_serialize); 1125 1126 if (msg->ms_flags & MSGF_SYNC) { 1127 /* 1128 * If a synchronous completion has been requested, just wakeup 1129 * the message without bothering to queue it to the target port. 1130 * 1131 * (both sides synchronized via serialized reply port) 1132 */ 1133 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1134 wakeup(msg); 1135 } else { 1136 /* 1137 * If an asynchronous completion has been requested the message 1138 * must be queued to the reply port. 1139 */ 1140 _lwkt_enqueue_reply(port, msg); 1141 if (port->mp_flags & MSGPORTF_WAITING) { 1142 port->mp_flags &= ~MSGPORTF_WAITING; 1143 wakeup(port); 1144 } 1145 } 1146 } 1147 1148 /************************************************************************ 1149 * PANIC AND SPECIAL PORT FUNCTIONS * 1150 ************************************************************************/ 1151 1152 /* 1153 * You can point a port's reply vector at this function if you just want 1154 * the message marked done, without any queueing or signaling. This is 1155 * often used for structure-embedded messages. 1156 */ 1157 static 1158 void 1159 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 1160 { 1161 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1162 } 1163 1164 static 1165 void * 1166 lwkt_panic_getport(lwkt_port_t port) 1167 { 1168 panic("lwkt_getport() illegal on port %p", port); 1169 } 1170 1171 static 1172 int 1173 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1174 { 1175 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1176 } 1177 1178 static 1179 int 1180 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1181 { 1182 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1183 } 1184 1185 static 1186 void * 1187 lwkt_panic_waitport(lwkt_port_t port, int flags) 1188 { 1189 panic("port %p cannot be waited on", port); 1190 } 1191 1192 static 1193 void 1194 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1195 { 1196 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1197 } 1198 1199 static 1200 int 1201 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1202 { 1203 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); 1204 /* NOT REACHED */ 1205 return (ENOENT); 1206 } 1207