1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 
36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #include <machine/smp.h> 69 70 #include <sys/malloc.h> 71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 72 73 /************************************************************************ 74 * MESSAGE FUNCTIONS * 75 ************************************************************************/ 76 77 static __inline int 78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg) 79 { 80 return port->mp_putport(port, msg); 81 } 82 83 static __inline int 84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 85 { 86 return port->mp_putport_oncpu(port, msg); 87 } 88 89 static __inline void 90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 91 { 92 KKASSERT(msg->ms_reply_port != NULL && 93 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 94 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 95 } 96 97 static __inline void 98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 99 { 100 int error; 101 102 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 103 /* 104 * Target port opted to execute the message synchronously so 105 * queue the response. 
106 */ 107 lwkt_replymsg(msg, error); 108 } 109 } 110 111 static __inline void 112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg) 113 { 114 int error; 115 116 if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) { 117 /* 118 * Target port opted to execute the message synchronously so 119 * queue the response. 120 */ 121 lwkt_replymsg(msg, error); 122 } 123 } 124 125 /* 126 * lwkt_sendmsg() 127 * 128 * Request asynchronous completion and call lwkt_beginmsg(). The 129 * target port can opt to execute the message synchronously or 130 * asynchronously and this function will automatically queue the 131 * response if the target executes the message synchronously. 132 * 133 * NOTE: The message is in an indeterminant state until this call 134 * returns. The caller should not mess with it (e.g. try to abort it) 135 * until then. 136 * 137 * NOTE: Do not use this function to forward a message as we might 138 * clobber ms_flags in a SMP race. 139 */ 140 void 141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 142 { 143 _lwkt_sendmsg_prepare(port, msg); 144 _lwkt_sendmsg_start(port, msg); 145 } 146 147 void 148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 149 { 150 _lwkt_sendmsg_prepare(port, msg); 151 _lwkt_sendmsg_start_oncpu(port, msg); 152 } 153 154 void 155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 156 { 157 _lwkt_sendmsg_prepare(port, msg); 158 } 159 160 void 161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 162 { 163 _lwkt_sendmsg_start(port, msg); 164 } 165 166 void 167 lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg) 168 { 169 _lwkt_sendmsg_start_oncpu(port, msg); 170 } 171 172 /* 173 * lwkt_domsg() 174 * 175 * Request synchronous completion and call lwkt_beginmsg(). The 176 * target port can opt to execute the message synchronously or 177 * asynchronously and this function will automatically block and 178 * wait for a response if the target executes the message 179 * asynchronously. 
 *
 * NOTE: Do not use this function to forward a message as we might
 * clobber ms_flags in a SMP race.
 *
 * Returns the message's completion error.  flags is passed on to
 * lwkt_waitmsg() when the target completes the message asynchronously.
 */
int
lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
{
    int error;

    KKASSERT(msg->ms_reply_port != NULL &&
             (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
    msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
    msg->ms_flags |= MSGF_SYNC;
    if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
        /*
         * Target port opted to execute the message asynchronously so
         * block and wait for a reply.
         */
        error = lwkt_waitmsg(msg, flags);
    } else {
        /*
         * Target executed the message synchronously: mark it done and
         * replied directly rather than going through lwkt_replymsg().
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
    }
    return(error);
}

/*
 * lwkt_forwardmsg()
 *
 * Forward a message received on one port to another port.  The message
 * must not be queued, done, or replied.  Returns the target put
 * backend's result; when that is not EASYNC the message is replied here
 * with that error.
 */
int
lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
    if ((error = port->mp_putport(port, msg)) != EASYNC)
        lwkt_replymsg(msg, error);
    return(error);
}

/*
 * lwkt_abortmsg()
 *
 * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
 * The caller must ensure that the message will not be both replied AND
 * destroyed while the abort is in progress.
 *
 * This function issues a callback which might block!
 */
void
lwkt_abortmsg(lwkt_msg_t msg)
{
    /*
     * A critical section protects us from reply IPIs on this cpu.
     */
    crit_enter();

    /*
     * Shortcut the operation if the message has already been returned.
     * The callback typically constructs a lwkt_msg with the abort request,
     * issues it synchronously, and waits for completion.  The callback
     * is not required to actually abort the message and the target port,
     * upon receiving an abort request message generated by the callback
     * should check whether the original message has already completed or
     * not.
     */
    if (msg->ms_flags & MSGF_ABORTABLE) {
        if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
            msg->ms_abortfn(msg);
    }
    crit_exit();
}

/************************************************************************
 *                      PORT INITIALIZATION API                         *
 ************************************************************************/

static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Core port initialization (internal).  Zeroes the port, marks it as not
 * bound to any cpu (mp_cpuid = -1), initializes both the normal and the
 * priority message queues, and installs the backend function vector.
 */
static __inline
void
_lwkt_initport(lwkt_port_t port,
               void *(*gportfn)(lwkt_port_t),
               int (*pportfn)(lwkt_port_t, lwkt_msg_t),
               int (*wmsgfn)(lwkt_msg_t, int),
               void *(*wportfn)(lwkt_port_t, int),
               void (*rportfn)(lwkt_port_t, lwkt_msg_t),
               int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
               int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
{
    bzero(port, sizeof(*port));
    port->mp_cpuid = -1;
    TAILQ_INIT(&port->mp_msgq);
    TAILQ_INIT(&port->mp_msgq_prio);
    port->mp_getport = gportfn;
    port->mp_putport = pportfn;
    port->mp_waitmsg = wmsgfn;
    port->mp_waitport = wportfn;
    port->mp_replyport = rportfn;
    port->mp_dropmsg = dmsgfn;
    port->mp_putport_oncpu = pportfn_oncpu;
}

/*
 * Schedule the target thread.  This routine is called even if the
 * thread is already scheduled.
 *
 * NOTE: flags (the message's ms_flags) is currently ignored.  In
 * particular MSGF_NORESCHED is NOT honored here; lwkt_schedule() is
 * always called unconditionally.
 */
static __inline
void
_lwkt_schedule_msg(thread_t td, int flags)
{
    lwkt_schedule(td);
}

/*
 * lwkt_initport_thread()
 *
 * Initialize a port for use by a particular thread.  The port may
 * only be used by <td>.  The thread put backend is installed for both
 * the generic and the oncpu put paths.
 */
void
lwkt_initport_thread(lwkt_port_t port, thread_t td)
{
    _lwkt_initport(port,
                   lwkt_thread_getport,
                   lwkt_thread_putport,
                   lwkt_thread_waitmsg,
                   lwkt_thread_waitport,
                   lwkt_thread_replyport,
                   lwkt_thread_dropmsg,
                   lwkt_thread_putport);
    port->mpu_td = td;
}

/*
 * lwkt_initport_spin()
 *
 * Initialize a port for use with descriptors that might be accessed
 * via multiple LWPs, processes, or threads.  Has somewhat more
 * overhead than thread ports.
354 */ 355 void 356 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid) 357 { 358 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 359 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t); 360 361 if (td == NULL) 362 dmsgfn = lwkt_panic_dropmsg; 363 else 364 dmsgfn = lwkt_spin_dropmsg; 365 366 if (fixed_cpuid) 367 pportfn_oncpu = lwkt_spin_putport_oncpu; 368 else 369 pportfn_oncpu = lwkt_panic_putport_oncpu; 370 371 _lwkt_initport(port, 372 lwkt_spin_getport, 373 lwkt_spin_putport, 374 lwkt_spin_waitmsg, 375 lwkt_spin_waitport, 376 lwkt_spin_replyport, 377 dmsgfn, 378 pportfn_oncpu); 379 spin_init(&port->mpu_spin, "lwktinitport"); 380 port->mpu_td = td; 381 if (fixed_cpuid) 382 port->mp_cpuid = td->td_gd->gd_cpuid; 383 } 384 385 /* 386 * lwkt_initport_serialize() 387 * 388 * Initialize a port for use with descriptors that might be accessed 389 * via multiple LWPs, processes, or threads. Callers are assumed to 390 * have held the serializer (slz). 391 */ 392 void 393 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 394 { 395 _lwkt_initport(port, 396 lwkt_serialize_getport, 397 lwkt_serialize_putport, 398 lwkt_serialize_waitmsg, 399 lwkt_serialize_waitport, 400 lwkt_serialize_replyport, 401 lwkt_panic_dropmsg, 402 lwkt_panic_putport_oncpu); 403 port->mpu_serialize = slz; 404 } 405 406 /* 407 * Similar to the standard initport, this function simply marks the message 408 * as being done and does not attempt to return it to an originating port. 409 */ 410 void 411 lwkt_initport_replyonly_null(lwkt_port_t port) 412 { 413 _lwkt_initport(port, 414 lwkt_panic_getport, 415 lwkt_panic_putport, 416 lwkt_panic_waitmsg, 417 lwkt_panic_waitport, 418 lwkt_null_replyport, 419 lwkt_panic_dropmsg, 420 lwkt_panic_putport_oncpu); 421 } 422 423 /* 424 * Initialize a reply-only port, typically used as a message sink. Such 425 * ports can only be used as a reply port. 
426 */ 427 void 428 lwkt_initport_replyonly(lwkt_port_t port, 429 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 430 { 431 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 432 lwkt_panic_waitmsg, lwkt_panic_waitport, 433 rportfn, lwkt_panic_dropmsg, 434 lwkt_panic_putport_oncpu); 435 } 436 437 void 438 lwkt_initport_putonly(lwkt_port_t port, 439 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 440 { 441 _lwkt_initport(port, lwkt_panic_getport, pportfn, 442 lwkt_panic_waitmsg, lwkt_panic_waitport, 443 lwkt_panic_replyport, lwkt_panic_dropmsg, 444 lwkt_panic_putport_oncpu); 445 } 446 447 void 448 lwkt_initport_panic(lwkt_port_t port) 449 { 450 _lwkt_initport(port, 451 lwkt_panic_getport, lwkt_panic_putport, 452 lwkt_panic_waitmsg, lwkt_panic_waitport, 453 lwkt_panic_replyport, lwkt_panic_dropmsg, 454 lwkt_panic_putport_oncpu); 455 } 456 457 static __inline 458 void 459 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 460 { 461 lwkt_msg_queue *queue; 462 463 /* 464 * normal case, remove and return the message. 465 */ 466 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 467 queue = &port->mp_msgq_prio; 468 else 469 queue = &port->mp_msgq; 470 TAILQ_REMOVE(queue, msg, ms_node); 471 472 /* 473 * atomic op needed for spin ports 474 */ 475 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 476 } 477 478 static __inline 479 void 480 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 481 { 482 lwkt_msg_queue *queue; 483 484 /* 485 * atomic op needed for spin ports 486 */ 487 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 488 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 489 queue = &port->mp_msgq_prio; 490 else 491 queue = &port->mp_msgq; 492 TAILQ_INSERT_TAIL(queue, msg, ms_node); 493 494 if (msg->ms_flags & MSGF_RECEIPT) { 495 /* 496 * In case this message is forwarded later, the receipt 497 * flag was cleared once the receipt function is called. 
498 */ 499 atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT); 500 msg->ms_receiptfn(msg, port); 501 } 502 } 503 504 static __inline 505 lwkt_msg_t 506 _lwkt_pollmsg(lwkt_port_t port) 507 { 508 lwkt_msg_t msg; 509 510 msg = TAILQ_FIRST(&port->mp_msgq_prio); 511 if (__predict_false(msg != NULL)) 512 return msg; 513 514 /* 515 * Priority queue has no message, fallback to non-priority queue. 516 */ 517 return TAILQ_FIRST(&port->mp_msgq); 518 } 519 520 static __inline 521 void 522 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 523 { 524 /* 525 * atomic op needed for spin ports 526 */ 527 _lwkt_pushmsg(port, msg); 528 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 529 } 530 531 /************************************************************************ 532 * THREAD PORT BACKEND * 533 ************************************************************************ 534 * 535 * This backend is used when the port a message is retrieved from is owned 536 * by a single thread (the calling thread). Messages are IPId to the 537 * correct cpu before being enqueued to a port. Note that this is fairly 538 * optimal since scheduling would have had to do an IPI anyway if the 539 * message were headed to a different cpu. 540 */ 541 542 /* 543 * This function completes reply processing for the default case in the 544 * context of the originating cpu. 
 *
 * Runs via IPI (lwkt_send_ipiq) and re-forwards itself if the reply
 * port's owning thread has migrated to another cpu in the meantime.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /* snapshot flags before the reply state changes them */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * Store fence before publishing MSGF_DONE; presumably so the
         * waiter observes the completed message state first.
         */
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        _lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, flags);
}

/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the reply port's
 * owning cpu; if that is not the current cpu the work is shipped there
 * via lwkt_thread_replyport_remote().
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * When the reply port's owner is on this cpu, a critical section
         * keeps the flag update and the MSGPORTF_WAITING check/schedule
         * from being interleaved with IPIs on this cpu.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            flags = msg->ms_flags;
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         *
         * A critical section is required to interlock the port queue.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            _lwkt_enqueue_reply(port, msg);
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    }
}

/*
 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
 *
 * This function could _only_ be used when caller is in the same thread
 * as the message's target port owner thread.
 *
 * Returns 0 if msg was still queued (and not yet replied) and has been
 * removed from the port and marked MSGF_DONE, ENOENT otherwise.
 */
static int
lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread"));
    crit_enter_quick(port->mpu_td);
    if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
        _lwkt_pullmsg(port, msg);
        atomic_set_int(&msg->ms_flags, MSGF_DONE);
        error = 0;
    } else {
        error = ENOENT;
    }
    crit_exit_quick(port->mpu_td);

    return (error);
}

/*
 * lwkt_thread_putport_remote()
 *
 * IPI handler that completes lwkt_thread_putport() on the cpu owning
 * the target port's thread, re-forwarding itself if that thread has
 * migrated in the meantime.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * An atomic op is needed on ms_flags vs originator.  Also
     * note that the originator might be using a different type
     * of msgport.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY
 */
static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    msg->ms_target_port = port;
    if (port->mpu_td->td_gd == mycpu) {
        crit_enter();
        _lwkt_pushmsg(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING)
            _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
        crit_exit();
    } else {
#ifdef INVARIANTS
        /*
         * Mark the message in transit to the remote cpu (debug builds
         * only); lwkt_thread_putport_remote() asserts and clears it.
         *
         * An atomic op is needed on ms_flags vs originator.  Also
         * note that the originator might be using a different type
         * of msgport.
         */
        atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
    return (EASYNC);
}

/*
 * lwkt_thread_getport()
 *
 * Retrieve the next message from the port or NULL if no messages
 * are ready.  Must be called by the port's owning thread.
 */
static
void *
lwkt_thread_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    KKASSERT(port->mpu_td == curthread);

    crit_enter_quick(port->mpu_td);
    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    crit_exit_quick(port->mpu_td);
    return(msg);
}

/*
 * lwkt_thread_waitmsg()
 *
 * Wait for a particular message to be replied.  We must be the only
 * thread waiting on the message.  The port must be owned by the
 * caller.
767 */ 768 static 769 int 770 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 771 { 772 thread_t td = curthread; 773 774 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 775 ("can't wait dropable message")); 776 777 if ((msg->ms_flags & MSGF_DONE) == 0) { 778 /* 779 * If the done bit was not set we have to block until it is. 780 */ 781 lwkt_port_t port = msg->ms_reply_port; 782 int sentabort; 783 784 KKASSERT(port->mpu_td == td); 785 crit_enter_quick(td); 786 sentabort = 0; 787 788 while ((msg->ms_flags & MSGF_DONE) == 0) { 789 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */ 790 if (sentabort == 0) { 791 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 792 lwkt_abortmsg(msg); 793 } 794 } else { 795 lwkt_sleep("waitabt", 0); 796 } 797 port->mp_flags &= ~MSGPORTF_WAITING; 798 } 799 if (msg->ms_flags & MSGF_QUEUED) 800 _lwkt_pullmsg(port, msg); 801 crit_exit_quick(td); 802 } else { 803 /* 804 * If the done bit was set we only have to mess around with the 805 * message if it is queued on the reply port. 806 */ 807 crit_enter_quick(td); 808 if (msg->ms_flags & MSGF_QUEUED) { 809 lwkt_port_t port = msg->ms_reply_port; 810 thread_t td __debugvar = curthread; 811 812 KKASSERT(port->mpu_td == td); 813 _lwkt_pullmsg(port, msg); 814 } 815 crit_exit_quick(td); 816 } 817 return(msg->ms_error); 818 } 819 820 /* 821 * lwkt_thread_waitport() 822 * 823 * Wait for a new message to be available on the port. We must be the 824 * the only thread waiting on the port. The port must be owned by caller. 
825 */ 826 static 827 void * 828 lwkt_thread_waitport(lwkt_port_t port, int flags) 829 { 830 thread_t td = curthread; 831 lwkt_msg_t msg; 832 int error; 833 834 KKASSERT(port->mpu_td == td); 835 crit_enter_quick(td); 836 while ((msg = _lwkt_pollmsg(port)) == NULL) { 837 port->mp_flags |= MSGPORTF_WAITING; 838 error = lwkt_sleep("waitport", flags); 839 port->mp_flags &= ~MSGPORTF_WAITING; 840 if (error) 841 goto done; 842 } 843 _lwkt_pullmsg(port, msg); 844 done: 845 crit_exit_quick(td); 846 return(msg); 847 } 848 849 /************************************************************************ 850 * SPIN PORT BACKEND * 851 ************************************************************************ 852 * 853 * This backend uses spinlocks instead of making assumptions about which 854 * thread is accessing the port. It must be used when a port is not owned 855 * by a particular thread. This is less optimal then thread ports but 856 * you don't have a choice if there are multiple threads accessing the port. 857 * 858 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 859 * on the message port, it is the responsibility of the code doing the 860 * wakeup to clear this flag rather then the blocked threads. Some 861 * superfluous wakeups may occur, which is ok. 862 * 863 * XXX synchronous message wakeups are not current optimized. 
864 */ 865 866 static 867 void * 868 lwkt_spin_getport(lwkt_port_t port) 869 { 870 lwkt_msg_t msg; 871 872 spin_lock(&port->mpu_spin); 873 if ((msg = _lwkt_pollmsg(port)) != NULL) 874 _lwkt_pullmsg(port, msg); 875 spin_unlock(&port->mpu_spin); 876 return(msg); 877 } 878 879 static __inline int 880 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg) 881 { 882 int dowakeup; 883 884 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 885 886 msg->ms_target_port = port; 887 spin_lock(&port->mpu_spin); 888 _lwkt_pushmsg(port, msg); 889 dowakeup = 0; 890 if (port->mp_flags & MSGPORTF_WAITING) { 891 port->mp_flags &= ~MSGPORTF_WAITING; 892 dowakeup = 1; 893 } 894 spin_unlock(&port->mpu_spin); 895 896 return dowakeup; 897 } 898 899 static 900 int 901 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 902 { 903 if (lwkt_spin_putport_only(port, msg)) 904 wakeup(port); 905 return (EASYNC); 906 } 907 908 static 909 int 910 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 911 { 912 KASSERT(port->mp_cpuid == mycpuid, 913 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d", 914 port->mp_cpuid, mycpuid)); 915 if (lwkt_spin_putport_only(port, msg)) 916 wakeup_mycpu(port); 917 return (EASYNC); 918 } 919 920 static 921 int 922 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 923 { 924 lwkt_port_t port; 925 int sentabort; 926 int error; 927 928 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 929 ("can't wait dropable message")); 930 port = msg->ms_reply_port; 931 932 if ((msg->ms_flags & MSGF_DONE) == 0) { 933 sentabort = 0; 934 spin_lock(&port->mpu_spin); 935 while ((msg->ms_flags & MSGF_DONE) == 0) { 936 void *won; 937 938 /* 939 * If message was sent synchronously from the beginning 940 * the wakeup will be on the message structure, else it 941 * will be on the port structure. 
942 * 943 * ms_flags needs atomic op originator vs target MSGF_QUEUED 944 */ 945 if (msg->ms_flags & MSGF_SYNC) { 946 won = msg; 947 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 948 } else { 949 won = port; 950 port->mp_flags |= MSGPORTF_WAITING; 951 } 952 953 /* 954 * Only messages which support abort can be interrupted. 955 * We must still wait for message completion regardless. 956 */ 957 if ((flags & PCATCH) && sentabort == 0) { 958 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 959 if (error) { 960 sentabort = error; 961 spin_unlock(&port->mpu_spin); 962 lwkt_abortmsg(msg); 963 spin_lock(&port->mpu_spin); 964 } 965 } else { 966 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 967 } 968 /* see note at the top on the MSGPORTF_WAITING flag */ 969 } 970 /* 971 * Turn EINTR into ERESTART if the signal indicates. 972 */ 973 if (sentabort && msg->ms_error == EINTR) 974 msg->ms_error = sentabort; 975 if (msg->ms_flags & MSGF_QUEUED) 976 _lwkt_pullmsg(port, msg); 977 spin_unlock(&port->mpu_spin); 978 } else { 979 spin_lock(&port->mpu_spin); 980 if (msg->ms_flags & MSGF_QUEUED) { 981 _lwkt_pullmsg(port, msg); 982 } 983 spin_unlock(&port->mpu_spin); 984 } 985 return(msg->ms_error); 986 } 987 988 static 989 void * 990 lwkt_spin_waitport(lwkt_port_t port, int flags) 991 { 992 lwkt_msg_t msg; 993 int error; 994 995 spin_lock(&port->mpu_spin); 996 while ((msg = _lwkt_pollmsg(port)) == NULL) { 997 port->mp_flags |= MSGPORTF_WAITING; 998 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 999 /* see note at the top on the MSGPORTF_WAITING flag */ 1000 if (error) { 1001 spin_unlock(&port->mpu_spin); 1002 return(NULL); 1003 } 1004 } 1005 _lwkt_pullmsg(port, msg); 1006 spin_unlock(&port->mpu_spin); 1007 return(msg); 1008 } 1009 1010 static 1011 void 1012 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 1013 { 1014 int dowakeup; 1015 1016 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1017 1018 if (msg->ms_flags & MSGF_SYNC) { 1019 /* 
1020 * If a synchronous completion has been requested, just wakeup 1021 * the message without bothering to queue it to the target port. 1022 * 1023 * ms_flags protected by reply port spinlock 1024 */ 1025 spin_lock(&port->mpu_spin); 1026 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1027 dowakeup = 0; 1028 if (msg->ms_flags & MSGF_WAITING) { 1029 msg->ms_flags &= ~MSGF_WAITING; 1030 dowakeup = 1; 1031 } 1032 spin_unlock(&port->mpu_spin); 1033 if (dowakeup) 1034 wakeup(msg); 1035 } else { 1036 /* 1037 * If an asynchronous completion has been requested the message 1038 * must be queued to the reply port. 1039 */ 1040 spin_lock(&port->mpu_spin); 1041 _lwkt_enqueue_reply(port, msg); 1042 dowakeup = 0; 1043 if (port->mp_flags & MSGPORTF_WAITING) { 1044 port->mp_flags &= ~MSGPORTF_WAITING; 1045 dowakeup = 1; 1046 } 1047 spin_unlock(&port->mpu_spin); 1048 if (dowakeup) 1049 wakeup(port); 1050 } 1051 } 1052 1053 /* 1054 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 1055 * 1056 * This function could _only_ be used when caller is in the same thread 1057 * as the message's target port owner thread. 1058 */ 1059 static int 1060 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1061 { 1062 int error; 1063 1064 KASSERT(port->mpu_td == curthread, 1065 ("message could only be dropped in the same thread " 1066 "as the message target port thread\n")); 1067 spin_lock(&port->mpu_spin); 1068 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) { 1069 _lwkt_pullmsg(port, msg); 1070 msg->ms_flags |= MSGF_DONE; 1071 error = 0; 1072 } else { 1073 error = ENOENT; 1074 } 1075 spin_unlock(&port->mpu_spin); 1076 1077 return (error); 1078 } 1079 1080 /************************************************************************ 1081 * SERIALIZER PORT BACKEND * 1082 ************************************************************************ 1083 * 1084 * This backend uses serializer to protect port accessing. Callers are 1085 * assumed to have serializer held. 
 * This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
 *
 * Implementation is based on SPIN PORT BACKEND.
 */

/*
 * Dequeue and return the next message on port, or NULL if none.
 * The port's serializer must be held.
 */
static
void *
lwkt_serialize_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    ASSERT_SERIALIZED(port->mpu_serialize);

    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Put backend: queue msg and wake a waiter if MSGPORTF_WAITING was set.
 * The port's serializer must be held.  Always returns EASYNC.
 */
static
int
lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    msg->ms_target_port = port;
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING) {
        port->mp_flags &= ~MSGPORTF_WAITING;
        wakeup(port);
    }
    return (EASYNC);
}

/*
 * Wait for msg to be replied (sleeping via zsleep() on the reply port's
 * serializer), then dequeue it if it was queued.  Returns msg->ms_error.
 * With PCATCH, the first interrupted sleep aborts the message with the
 * serializer temporarily released, and waiting continues uninterruptibly.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;

        ASSERT_SERIALIZED(port->mpu_serialize);

        sentabort = 0;
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    lwkt_serialize_exit(port->mpu_serialize);
                    lwkt_abortmsg(msg);
                    lwkt_serialize_enter(port->mpu_serialize);
                }
            } else {
                error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
            }
            /* see the SPIN PORT BACKEND note on the MSGPORTF_WAITING flag */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
    } else {
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;

            ASSERT_SERIALIZED(port->mpu_serialize);
            _lwkt_pullmsg(port, msg);
        }
    }
    return(msg->ms_error);
}

/*
 * Block (zsleep on the port's serializer) until a message is available
 * and dequeue it.  Returns NULL if zsleep() fails.
 */
static
void *
lwkt_serialize_waitport(lwkt_port_t port, int flags)
{
    lwkt_msg_t msg;
    int error;

    ASSERT_SERIALIZED(port->mpu_serialize);

    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
        /* see the SPIN PORT BACKEND note on the MSGPORTF_WAITING flag */
        if (error)
            return(NULL);
    }
    _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Reply backend.  Synchronous messages are marked done and woken on the
 * message unconditionally; asynchronous messages are queued on the reply
 * port and a port waiter is woken if one is flagged.
 */
static
void
lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * (both sides synchronized via serialized reply port)
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        wakeup(msg);
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         */
        _lwkt_enqueue_reply(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING) {
            port->mp_flags &= ~MSGPORTF_WAITING;
            wakeup(port);
        }
    }
}

/************************************************************************
 *                  PANIC AND SPECIAL PORT FUNCTIONS                    *
 ************************************************************************/

/*
 * You can point a port's reply vector at this function if you just want
 * the message marked done, without any queueing or signaling.  This is
 * often used for structure-embedded messages.
 */
static
void
lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
}

/*
 * The lwkt_panic_* backends below fill vector slots for operations
 * that are illegal on a given port type; each one simply panics.
 */
static
void *
lwkt_panic_getport(lwkt_port_t port)
{
    panic("lwkt_getport() illegal on port %p", port);
}

static
int
lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
{
    panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
}

static
void *
lwkt_panic_waitport(lwkt_port_t port, int flags)
{
    panic("port %p cannot be waited on", port);
}

static
void
lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
    /* NOT REACHED */
return (ENOENT); 1295 } 1296 1297 static 1298 int 1299 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 1300 { 1301 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p", 1302 port, msg); 1303 /* NOT REACHED */ 1304 return (ENOENT); 1305 } 1306