1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 
36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #include <machine/smp.h> 69 70 #include <sys/malloc.h> 71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 72 73 /************************************************************************ 74 * MESSAGE FUNCTIONS * 75 ************************************************************************/ 76 77 static __inline int 78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg) 79 { 80 return port->mp_putport(port, msg); 81 } 82 83 static __inline int 84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 85 { 86 return port->mp_putport_oncpu(port, msg); 87 } 88 89 static __inline void 90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 91 { 92 KKASSERT(msg->ms_reply_port != NULL && 93 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 94 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 95 } 96 97 static __inline void 98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 99 { 100 int error; 101 102 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 103 /* 104 * Target port opted to execute the message synchronously so 105 * queue the response. 
106 */ 107 lwkt_replymsg(msg, error); 108 } 109 } 110 111 static __inline void 112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg) 113 { 114 int error; 115 116 if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) { 117 /* 118 * Target port opted to execute the message synchronously so 119 * queue the response. 120 */ 121 lwkt_replymsg(msg, error); 122 } 123 } 124 125 /* 126 * lwkt_sendmsg() 127 * 128 * Request asynchronous completion and call lwkt_beginmsg(). The 129 * target port can opt to execute the message synchronously or 130 * asynchronously and this function will automatically queue the 131 * response if the target executes the message synchronously. 132 * 133 * NOTE: The message is in an indeterminant state until this call 134 * returns. The caller should not mess with it (e.g. try to abort it) 135 * until then. 136 * 137 * NOTE: Do not use this function to forward a message as we might 138 * clobber ms_flags in a SMP race. 139 */ 140 void 141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 142 { 143 _lwkt_sendmsg_prepare(port, msg); 144 _lwkt_sendmsg_start(port, msg); 145 } 146 147 void 148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 149 { 150 _lwkt_sendmsg_prepare(port, msg); 151 _lwkt_sendmsg_start_oncpu(port, msg); 152 } 153 154 void 155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 156 { 157 _lwkt_sendmsg_prepare(port, msg); 158 } 159 160 void 161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 162 { 163 _lwkt_sendmsg_start(port, msg); 164 } 165 166 /* 167 * lwkt_domsg() 168 * 169 * Request synchronous completion and call lwkt_beginmsg(). The 170 * target port can opt to execute the message synchronously or 171 * asynchronously and this function will automatically block and 172 * wait for a response if the target executes the message 173 * asynchronously. 174 * 175 * NOTE: Do not use this function to forward a message as we might 176 * clobber ms_flags in a SMP race. 
177 */ 178 int 179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 180 { 181 int error; 182 183 KKASSERT(msg->ms_reply_port != NULL && 184 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 185 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 186 msg->ms_flags |= MSGF_SYNC; 187 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 188 /* 189 * Target port opted to execute the message asynchronously so 190 * block and wait for a reply. 191 */ 192 error = lwkt_waitmsg(msg, flags); 193 } else { 194 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 195 } 196 return(error); 197 } 198 199 /* 200 * lwkt_forwardmsg() 201 * 202 * Forward a message received on one port to another port. 203 */ 204 int 205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 206 { 207 int error; 208 209 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 210 if ((error = port->mp_putport(port, msg)) != EASYNC) 211 lwkt_replymsg(msg, error); 212 return(error); 213 } 214 215 /* 216 * lwkt_abortmsg() 217 * 218 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 219 * The caller must ensure that the message will not be both replied AND 220 * destroyed while the abort is in progress. 221 * 222 * This function issues a callback which might block! 223 */ 224 void 225 lwkt_abortmsg(lwkt_msg_t msg) 226 { 227 /* 228 * A critical section protects us from reply IPIs on this cpu. 229 */ 230 crit_enter(); 231 232 /* 233 * Shortcut the operation if the message has already been returned. 234 * The callback typically constructs a lwkt_msg with the abort request, 235 * issues it synchronously, and waits for completion. The callback 236 * is not required to actually abort the message and the target port, 237 * upon receiving an abort request message generated by the callback 238 * should check whether the original message has already completed or 239 * not. 
240 */ 241 if (msg->ms_flags & MSGF_ABORTABLE) { 242 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 243 msg->ms_abortfn(msg); 244 } 245 crit_exit(); 246 } 247 248 /************************************************************************ 249 * PORT INITIALIZATION API * 250 ************************************************************************/ 251 252 static void *lwkt_thread_getport(lwkt_port_t port); 253 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 254 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 255 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 256 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 257 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 258 259 static void *lwkt_spin_getport(lwkt_port_t port); 260 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 261 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 262 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 263 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 264 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 265 static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg); 266 267 static void *lwkt_serialize_getport(lwkt_port_t port); 268 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 269 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 270 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 271 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 272 273 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 274 static void *lwkt_panic_getport(lwkt_port_t port); 275 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 276 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 277 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 278 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 279 static int 
lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 280 static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg); 281 282 /* 283 * Core port initialization (internal) 284 */ 285 static __inline 286 void 287 _lwkt_initport(lwkt_port_t port, 288 void *(*gportfn)(lwkt_port_t), 289 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 290 int (*wmsgfn)(lwkt_msg_t, int), 291 void *(*wportfn)(lwkt_port_t, int), 292 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 293 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t), 294 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t)) 295 { 296 bzero(port, sizeof(*port)); 297 port->mp_cpuid = -1; 298 TAILQ_INIT(&port->mp_msgq); 299 TAILQ_INIT(&port->mp_msgq_prio); 300 port->mp_getport = gportfn; 301 port->mp_putport = pportfn; 302 port->mp_waitmsg = wmsgfn; 303 port->mp_waitport = wportfn; 304 port->mp_replyport = rportfn; 305 port->mp_dropmsg = dmsgfn; 306 port->mp_putport_oncpu = pportfn_oncpu; 307 } 308 309 /* 310 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 311 * we tell the scheduler not to reschedule if td is at a higher priority. 312 * 313 * This routine is called even if the thread is already scheduled. 314 */ 315 static __inline 316 void 317 _lwkt_schedule_msg(thread_t td, int flags) 318 { 319 lwkt_schedule(td); 320 } 321 322 /* 323 * lwkt_initport_thread() 324 * 325 * Initialize a port for use by a particular thread. The port may 326 * only be used by <td>. 327 */ 328 void 329 lwkt_initport_thread(lwkt_port_t port, thread_t td) 330 { 331 _lwkt_initport(port, 332 lwkt_thread_getport, 333 lwkt_thread_putport, 334 lwkt_thread_waitmsg, 335 lwkt_thread_waitport, 336 lwkt_thread_replyport, 337 lwkt_thread_dropmsg, 338 lwkt_thread_putport); 339 port->mpu_td = td; 340 } 341 342 /* 343 * lwkt_initport_spin() 344 * 345 * Initialize a port for use with descriptors that might be accessed 346 * via multiple LWPs, processes, or threads. Has somewhat more 347 * overhead then thread ports. 
348 */ 349 void 350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid) 351 { 352 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 353 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t); 354 355 if (td == NULL) 356 dmsgfn = lwkt_panic_dropmsg; 357 else 358 dmsgfn = lwkt_spin_dropmsg; 359 360 if (fixed_cpuid) 361 pportfn_oncpu = lwkt_spin_putport_oncpu; 362 else 363 pportfn_oncpu = lwkt_panic_putport_oncpu; 364 365 _lwkt_initport(port, 366 lwkt_spin_getport, 367 lwkt_spin_putport, 368 lwkt_spin_waitmsg, 369 lwkt_spin_waitport, 370 lwkt_spin_replyport, 371 dmsgfn, 372 pportfn_oncpu); 373 spin_init(&port->mpu_spin, "lwktinitport"); 374 port->mpu_td = td; 375 if (fixed_cpuid) 376 port->mp_cpuid = td->td_gd->gd_cpuid; 377 } 378 379 /* 380 * lwkt_initport_serialize() 381 * 382 * Initialize a port for use with descriptors that might be accessed 383 * via multiple LWPs, processes, or threads. Callers are assumed to 384 * have held the serializer (slz). 385 */ 386 void 387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 388 { 389 _lwkt_initport(port, 390 lwkt_serialize_getport, 391 lwkt_serialize_putport, 392 lwkt_serialize_waitmsg, 393 lwkt_serialize_waitport, 394 lwkt_serialize_replyport, 395 lwkt_panic_dropmsg, 396 lwkt_panic_putport_oncpu); 397 port->mpu_serialize = slz; 398 } 399 400 /* 401 * Similar to the standard initport, this function simply marks the message 402 * as being done and does not attempt to return it to an originating port. 403 */ 404 void 405 lwkt_initport_replyonly_null(lwkt_port_t port) 406 { 407 _lwkt_initport(port, 408 lwkt_panic_getport, 409 lwkt_panic_putport, 410 lwkt_panic_waitmsg, 411 lwkt_panic_waitport, 412 lwkt_null_replyport, 413 lwkt_panic_dropmsg, 414 lwkt_panic_putport_oncpu); 415 } 416 417 /* 418 * Initialize a reply-only port, typically used as a message sink. Such 419 * ports can only be used as a reply port. 
420 */ 421 void 422 lwkt_initport_replyonly(lwkt_port_t port, 423 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 424 { 425 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 426 lwkt_panic_waitmsg, lwkt_panic_waitport, 427 rportfn, lwkt_panic_dropmsg, 428 lwkt_panic_putport_oncpu); 429 } 430 431 void 432 lwkt_initport_putonly(lwkt_port_t port, 433 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 434 { 435 _lwkt_initport(port, lwkt_panic_getport, pportfn, 436 lwkt_panic_waitmsg, lwkt_panic_waitport, 437 lwkt_panic_replyport, lwkt_panic_dropmsg, 438 lwkt_panic_putport_oncpu); 439 } 440 441 void 442 lwkt_initport_panic(lwkt_port_t port) 443 { 444 _lwkt_initport(port, 445 lwkt_panic_getport, lwkt_panic_putport, 446 lwkt_panic_waitmsg, lwkt_panic_waitport, 447 lwkt_panic_replyport, lwkt_panic_dropmsg, 448 lwkt_panic_putport_oncpu); 449 } 450 451 static __inline 452 void 453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 454 { 455 lwkt_msg_queue *queue; 456 457 /* 458 * normal case, remove and return the message. 459 */ 460 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 461 queue = &port->mp_msgq_prio; 462 else 463 queue = &port->mp_msgq; 464 TAILQ_REMOVE(queue, msg, ms_node); 465 466 /* 467 * atomic op needed for spin ports 468 */ 469 atomic_clear_int(&msg->ms_flags, MSGF_QUEUED); 470 } 471 472 static __inline 473 void 474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 475 { 476 lwkt_msg_queue *queue; 477 478 /* 479 * atomic op needed for spin ports 480 */ 481 atomic_set_int(&msg->ms_flags, MSGF_QUEUED); 482 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 483 queue = &port->mp_msgq_prio; 484 else 485 queue = &port->mp_msgq; 486 TAILQ_INSERT_TAIL(queue, msg, ms_node); 487 488 if (msg->ms_flags & MSGF_RECEIPT) { 489 /* 490 * In case this message is forwarded later, the receipt 491 * flag was cleared once the receipt function is called. 
492 */ 493 atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT); 494 msg->ms_receiptfn(msg, port); 495 } 496 } 497 498 static __inline 499 lwkt_msg_t 500 _lwkt_pollmsg(lwkt_port_t port) 501 { 502 lwkt_msg_t msg; 503 504 msg = TAILQ_FIRST(&port->mp_msgq_prio); 505 if (__predict_false(msg != NULL)) 506 return msg; 507 508 /* 509 * Priority queue has no message, fallback to non-priority queue. 510 */ 511 return TAILQ_FIRST(&port->mp_msgq); 512 } 513 514 static __inline 515 void 516 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 517 { 518 /* 519 * atomic op needed for spin ports 520 */ 521 _lwkt_pushmsg(port, msg); 522 atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE); 523 } 524 525 /************************************************************************ 526 * THREAD PORT BACKEND * 527 ************************************************************************ 528 * 529 * This backend is used when the port a message is retrieved from is owned 530 * by a single thread (the calling thread). Messages are IPId to the 531 * correct cpu before being enqueued to a port. Note that this is fairly 532 * optimal since scheduling would have had to do an IPI anyway if the 533 * message were headed to a different cpu. 534 */ 535 536 /* 537 * This function completes reply processing for the default case in the 538 * context of the originating cpu. 
*/
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs: if the reply port's owner
     * is no longer on this cpu, re-send the IPI to its current cpu.
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /* snapshot the flags before completion modifies them */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * Synchronous message: mark it complete without queueing.
         * NOTE(review): the fence presumably orders earlier stores to
         * the message ahead of the DONE flag -- confirm.
         */
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        /* asynchronous message: queue it on the reply port */
        _lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, flags);
}

/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion is performed on the cpu of the reply
 * port's owner thread (port->mpu_td->td_gd): directly when that is the
 * current cpu, otherwise via an IPI to lwkt_thread_replyport_remote().
 *
 * The critical section protects us from IPIs on this cpu.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * NOTE(review): an earlier comment here said no critical section
         * is required; the local-cpu path below does enter one.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            flags = msg->ms_flags;
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            /* checked and cleared again by the remote handler */
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         *
         * A critical section is required to interlock the port queue.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            _lwkt_enqueue_reply(port, msg);
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
            crit_exit();
        } else {
#ifdef INVARIANTS
            /* checked and cleared again by the remote handler */
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    }
}

/*
 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
 *
 * This function may _only_ be used when the caller is the thread that
 * owns the message's target port.
*/
static int
lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread"));
    crit_enter_quick(port->mpu_td);
    /*
     * Only a message that is queued and not yet replied can be dropped:
     * unlink it and mark it DONE.  Anything else yields ENOENT.
     */
    if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
        _lwkt_pullmsg(port, msg);
        atomic_set_int(&msg->ms_flags, MSGF_DONE);
        error = 0;
    } else {
        error = ENOENT;
    }
    crit_exit_quick(port->mpu_td);

    return (error);
}

/*
 * lwkt_thread_putport_remote() - IPI handler for lwkt_thread_putport()
 *
 * Runs on the cpu the IPI was sent to and queues the message on its
 * target port (msg->ms_target_port), scheduling the owner thread if the
 * port is flagged as waiting.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * An atomic op is needed on ms_flags vs originator.  Also
     * note that the originator might be using a different type
     * of msgport.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY
 */
static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    msg->ms_target_port = port;
    if (port->mpu_td->td_gd == mycpu) {
        /* owner thread is on this cpu: queue directly */
        crit_enter();
        _lwkt_pushmsg(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING)
            _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
        crit_exit();
    } else {
#ifdef INVARIANTS
        /*
         * Mark the message as in transit; the remote handler asserts
         * and clears this flag.
         *
         * An atomic op is needed on ms_flags vs originator.  Also
         * note that the originator might be using a different type
         * of msgport.
         */
        atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
    return (EASYNC);
}

/*
 * lwkt_thread_getport()
 *
 *  Retrieve the next message from the port or NULL if no messages
 *  are ready.  Must be called by the thread that owns the port.
 */
static
void *
lwkt_thread_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    KKASSERT(port->mpu_td == curthread);

    crit_enter_quick(port->mpu_td);
    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    crit_exit_quick(port->mpu_td);
    return(msg);
}

/*
 * lwkt_thread_waitmsg()
 *
 *  Wait for a particular message to be replied.  We must be the only
 *  thread waiting on the message.  The port must be owned by the
 *  caller.
761 */ 762 static 763 int 764 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 765 { 766 thread_t td = curthread; 767 768 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 769 ("can't wait dropable message")); 770 771 if ((msg->ms_flags & MSGF_DONE) == 0) { 772 /* 773 * If the done bit was not set we have to block until it is. 774 */ 775 lwkt_port_t port = msg->ms_reply_port; 776 int sentabort; 777 778 KKASSERT(port->mpu_td == td); 779 crit_enter_quick(td); 780 sentabort = 0; 781 782 while ((msg->ms_flags & MSGF_DONE) == 0) { 783 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */ 784 if (sentabort == 0) { 785 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 786 lwkt_abortmsg(msg); 787 } 788 } else { 789 lwkt_sleep("waitabt", 0); 790 } 791 port->mp_flags &= ~MSGPORTF_WAITING; 792 } 793 if (msg->ms_flags & MSGF_QUEUED) 794 _lwkt_pullmsg(port, msg); 795 crit_exit_quick(td); 796 } else { 797 /* 798 * If the done bit was set we only have to mess around with the 799 * message if it is queued on the reply port. 800 */ 801 crit_enter_quick(td); 802 if (msg->ms_flags & MSGF_QUEUED) { 803 lwkt_port_t port = msg->ms_reply_port; 804 thread_t td __debugvar = curthread; 805 806 KKASSERT(port->mpu_td == td); 807 _lwkt_pullmsg(port, msg); 808 } 809 crit_exit_quick(td); 810 } 811 return(msg->ms_error); 812 } 813 814 /* 815 * lwkt_thread_waitport() 816 * 817 * Wait for a new message to be available on the port. We must be the 818 * the only thread waiting on the port. The port must be owned by caller. 
819 */ 820 static 821 void * 822 lwkt_thread_waitport(lwkt_port_t port, int flags) 823 { 824 thread_t td = curthread; 825 lwkt_msg_t msg; 826 int error; 827 828 KKASSERT(port->mpu_td == td); 829 crit_enter_quick(td); 830 while ((msg = _lwkt_pollmsg(port)) == NULL) { 831 port->mp_flags |= MSGPORTF_WAITING; 832 error = lwkt_sleep("waitport", flags); 833 port->mp_flags &= ~MSGPORTF_WAITING; 834 if (error) 835 goto done; 836 } 837 _lwkt_pullmsg(port, msg); 838 done: 839 crit_exit_quick(td); 840 return(msg); 841 } 842 843 /************************************************************************ 844 * SPIN PORT BACKEND * 845 ************************************************************************ 846 * 847 * This backend uses spinlocks instead of making assumptions about which 848 * thread is accessing the port. It must be used when a port is not owned 849 * by a particular thread. This is less optimal then thread ports but 850 * you don't have a choice if there are multiple threads accessing the port. 851 * 852 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 853 * on the message port, it is the responsibility of the code doing the 854 * wakeup to clear this flag rather then the blocked threads. Some 855 * superfluous wakeups may occur, which is ok. 856 * 857 * XXX synchronous message wakeups are not current optimized. 
858 */ 859 860 static 861 void * 862 lwkt_spin_getport(lwkt_port_t port) 863 { 864 lwkt_msg_t msg; 865 866 spin_lock(&port->mpu_spin); 867 if ((msg = _lwkt_pollmsg(port)) != NULL) 868 _lwkt_pullmsg(port, msg); 869 spin_unlock(&port->mpu_spin); 870 return(msg); 871 } 872 873 static __inline int 874 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg) 875 { 876 int dowakeup; 877 878 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 879 880 msg->ms_target_port = port; 881 spin_lock(&port->mpu_spin); 882 _lwkt_pushmsg(port, msg); 883 dowakeup = 0; 884 if (port->mp_flags & MSGPORTF_WAITING) { 885 port->mp_flags &= ~MSGPORTF_WAITING; 886 dowakeup = 1; 887 } 888 spin_unlock(&port->mpu_spin); 889 890 return dowakeup; 891 } 892 893 static 894 int 895 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 896 { 897 if (lwkt_spin_putport_only(port, msg)) 898 wakeup(port); 899 return (EASYNC); 900 } 901 902 static 903 int 904 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 905 { 906 KASSERT(port->mp_cpuid == mycpuid, 907 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d", 908 port->mp_cpuid, mycpuid)); 909 if (lwkt_spin_putport_only(port, msg)) 910 wakeup_mycpu(port); 911 return (EASYNC); 912 } 913 914 static 915 int 916 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 917 { 918 lwkt_port_t port; 919 int sentabort; 920 int error; 921 922 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 923 ("can't wait dropable message")); 924 port = msg->ms_reply_port; 925 926 if ((msg->ms_flags & MSGF_DONE) == 0) { 927 sentabort = 0; 928 spin_lock(&port->mpu_spin); 929 while ((msg->ms_flags & MSGF_DONE) == 0) { 930 void *won; 931 932 /* 933 * If message was sent synchronously from the beginning 934 * the wakeup will be on the message structure, else it 935 * will be on the port structure. 
936 * 937 * ms_flags needs atomic op originator vs target MSGF_QUEUED 938 */ 939 if (msg->ms_flags & MSGF_SYNC) { 940 won = msg; 941 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 942 } else { 943 won = port; 944 port->mp_flags |= MSGPORTF_WAITING; 945 } 946 947 /* 948 * Only messages which support abort can be interrupted. 949 * We must still wait for message completion regardless. 950 */ 951 if ((flags & PCATCH) && sentabort == 0) { 952 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 953 if (error) { 954 sentabort = error; 955 spin_unlock(&port->mpu_spin); 956 lwkt_abortmsg(msg); 957 spin_lock(&port->mpu_spin); 958 } 959 } else { 960 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 961 } 962 /* see note at the top on the MSGPORTF_WAITING flag */ 963 } 964 /* 965 * Turn EINTR into ERESTART if the signal indicates. 966 */ 967 if (sentabort && msg->ms_error == EINTR) 968 msg->ms_error = sentabort; 969 if (msg->ms_flags & MSGF_QUEUED) 970 _lwkt_pullmsg(port, msg); 971 spin_unlock(&port->mpu_spin); 972 } else { 973 spin_lock(&port->mpu_spin); 974 if (msg->ms_flags & MSGF_QUEUED) { 975 _lwkt_pullmsg(port, msg); 976 } 977 spin_unlock(&port->mpu_spin); 978 } 979 return(msg->ms_error); 980 } 981 982 static 983 void * 984 lwkt_spin_waitport(lwkt_port_t port, int flags) 985 { 986 lwkt_msg_t msg; 987 int error; 988 989 spin_lock(&port->mpu_spin); 990 while ((msg = _lwkt_pollmsg(port)) == NULL) { 991 port->mp_flags |= MSGPORTF_WAITING; 992 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 993 /* see note at the top on the MSGPORTF_WAITING flag */ 994 if (error) { 995 spin_unlock(&port->mpu_spin); 996 return(NULL); 997 } 998 } 999 _lwkt_pullmsg(port, msg); 1000 spin_unlock(&port->mpu_spin); 1001 return(msg); 1002 } 1003 1004 static 1005 void 1006 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 1007 { 1008 int dowakeup; 1009 1010 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1011 1012 if (msg->ms_flags & MSGF_SYNC) { 1013 /* 1014 * 
If a synchronous completion has been requested, just wakeup 1015 * the message without bothering to queue it to the target port. 1016 * 1017 * ms_flags protected by reply port spinlock 1018 */ 1019 spin_lock(&port->mpu_spin); 1020 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1021 dowakeup = 0; 1022 if (msg->ms_flags & MSGF_WAITING) { 1023 msg->ms_flags &= ~MSGF_WAITING; 1024 dowakeup = 1; 1025 } 1026 spin_unlock(&port->mpu_spin); 1027 if (dowakeup) 1028 wakeup(msg); 1029 } else { 1030 /* 1031 * If an asynchronous completion has been requested the message 1032 * must be queued to the reply port. 1033 */ 1034 spin_lock(&port->mpu_spin); 1035 _lwkt_enqueue_reply(port, msg); 1036 dowakeup = 0; 1037 if (port->mp_flags & MSGPORTF_WAITING) { 1038 port->mp_flags &= ~MSGPORTF_WAITING; 1039 dowakeup = 1; 1040 } 1041 spin_unlock(&port->mpu_spin); 1042 if (dowakeup) 1043 wakeup(port); 1044 } 1045 } 1046 1047 /* 1048 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 1049 * 1050 * This function could _only_ be used when caller is in the same thread 1051 * as the message's target port owner thread. 1052 */ 1053 static int 1054 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1055 { 1056 int error; 1057 1058 KASSERT(port->mpu_td == curthread, 1059 ("message could only be dropped in the same thread " 1060 "as the message target port thread\n")); 1061 spin_lock(&port->mpu_spin); 1062 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) { 1063 _lwkt_pullmsg(port, msg); 1064 msg->ms_flags |= MSGF_DONE; 1065 error = 0; 1066 } else { 1067 error = ENOENT; 1068 } 1069 spin_unlock(&port->mpu_spin); 1070 1071 return (error); 1072 } 1073 1074 /************************************************************************ 1075 * SERIALIZER PORT BACKEND * 1076 ************************************************************************ 1077 * 1078 * This backend uses serializer to protect port accessing. Callers are 1079 * assumed to have serializer held. 
This kind of port is usually created
 * by network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release serializer.
 *
 * Implementation is based on SPIN PORT BACKEND.
 */

/*
 * Dequeue and return the next message on the port, or NULL if none is
 * queued.  The port's serializer must be held.
 */
static
void *
lwkt_serialize_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    ASSERT_SERIALIZED(port->mpu_serialize);

    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Put vector for serializer ports: queue the message and wake up the
 * port if it is flagged as waiting.  Always returns EASYNC.
 */
static
int
lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    msg->ms_target_port = port;
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING) {
        /* the waker, not the sleeper, clears the waiting flag */
        port->mp_flags &= ~MSGPORTF_WAITING;
        wakeup(port);
    }
    return (EASYNC);
}

/*
 * Wait for <msg> to be completed on its reply port and return its
 * ms_error.  If the completed message is still queued on the reply port
 * it is dequeued.  Only the PCATCH bit of <flags> is consulted.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;

        ASSERT_SERIALIZED(port->mpu_serialize);

        sentabort = 0;
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
                if (error) {
                    /*
                     * Interrupted: request the abort once, with the
                     * serializer released around the callback.
                     */
                    sentabort = error;
                    lwkt_serialize_exit(port->mpu_serialize);
                    lwkt_abortmsg(msg);
                    lwkt_serialize_enter(port->mpu_serialize);
                }
            } else {
                error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
            }
            /* see note at the top on the MSGPORTF_WAITING flag */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
    } else {
        /* already done: just dequeue it if it is still on the port */
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;

            ASSERT_SERIALIZED(port->mpu_serialize);
            _lwkt_pullmsg(port, msg);
        }
    }
    return(msg->ms_error);
}

/*
 * Wait for a message to become available on the port, dequeue it and
 * return it.  Returns NULL if the sleep reports an error.
 */
static
void *
lwkt_serialize_waitport(lwkt_port_t port, int flags)
{
    lwkt_msg_t msg;
    int error;

    ASSERT_SERIALIZED(port->mpu_serialize);

    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
        /* see note at the top on the MSGPORTF_WAITING flag */
        if (error)
            return(NULL);
    }
    _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Reply vector for serializer ports.  <port> is the reply port and its
 * serializer must be held.
 */
static
void
lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * (both sides synchronized via serialized reply port)
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        wakeup(msg);
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         */
        _lwkt_enqueue_reply(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING) {
            port->mp_flags &= ~MSGPORTF_WAITING;
            wakeup(port);
        }
    }
}

/************************************************************************
 *                 PANIC AND SPECIAL PORT FUNCTIONS                     *
 ************************************************************************/

/*
 * You can point a port's reply vector at this function if you just want
 * the message marked done, without any queueing or signaling.  This is
 * often used for structure-embedded messages.
 */
static
void
lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
}

/*
 * The lwkt_panic_*() functions below are installed as the vectors for
 * operations a given port type does not support; each one panics.
 * NOTE(review): the non-void ones have no return statement, presumably
 * because panic() does not return -- confirm.
 */
static
void *
lwkt_panic_getport(lwkt_port_t port)
{
    panic("lwkt_getport() illegal on port %p", port);
}

static
int
lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
{
    panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
}

static
void *
lwkt_panic_waitport(lwkt_port_t port, int flags)
{
    panic("port %p cannot be waited on", port);
}

static
void
lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
    /* NOT REACHED */
return (ENOENT); 1289 } 1290 1291 static 1292 int 1293 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 1294 { 1295 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p", 1296 port, msg); 1297 /* NOT REACHED */ 1298 return (ENOENT); 1299 } 1300