1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 
36 */ 37 38 #include <sys/param.h> 39 #include <sys/systm.h> 40 #include <sys/kernel.h> 41 #include <sys/proc.h> 42 #include <sys/rtprio.h> 43 #include <sys/queue.h> 44 #include <sys/sysctl.h> 45 #include <sys/kthread.h> 46 #include <sys/signalvar.h> 47 #include <sys/signal2.h> 48 #include <machine/cpu.h> 49 #include <sys/lock.h> 50 51 #include <vm/vm.h> 52 #include <vm/vm_param.h> 53 #include <vm/vm_kern.h> 54 #include <vm/vm_object.h> 55 #include <vm/vm_page.h> 56 #include <vm/vm_map.h> 57 #include <vm/vm_pager.h> 58 #include <vm/vm_extern.h> 59 #include <vm/vm_zone.h> 60 61 #include <sys/thread2.h> 62 #include <sys/msgport2.h> 63 #include <sys/spinlock2.h> 64 #include <sys/serialize.h> 65 66 #include <machine/stdarg.h> 67 #include <machine/cpufunc.h> 68 #include <machine/smp.h> 69 70 #include <sys/malloc.h> 71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 72 73 /************************************************************************ 74 * MESSAGE FUNCTIONS * 75 ************************************************************************/ 76 77 static __inline int 78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg) 79 { 80 return port->mp_putport(port, msg); 81 } 82 83 static __inline int 84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 85 { 86 return port->mp_putport_oncpu(port, msg); 87 } 88 89 static __inline void 90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 91 { 92 KKASSERT(msg->ms_reply_port != NULL && 93 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 94 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 95 } 96 97 static __inline void 98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 99 { 100 int error; 101 102 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 103 /* 104 * Target port opted to execute the message synchronously so 105 * queue the response. 
106 */ 107 lwkt_replymsg(msg, error); 108 } 109 } 110 111 static __inline void 112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg) 113 { 114 int error; 115 116 if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) { 117 /* 118 * Target port opted to execute the message synchronously so 119 * queue the response. 120 */ 121 lwkt_replymsg(msg, error); 122 } 123 } 124 125 /* 126 * lwkt_sendmsg() 127 * 128 * Request asynchronous completion and call lwkt_beginmsg(). The 129 * target port can opt to execute the message synchronously or 130 * asynchronously and this function will automatically queue the 131 * response if the target executes the message synchronously. 132 * 133 * NOTE: The message is in an indeterminant state until this call 134 * returns. The caller should not mess with it (e.g. try to abort it) 135 * until then. 136 * 137 * NOTE: Do not use this function to forward a message as we might 138 * clobber ms_flags in a SMP race. 139 */ 140 void 141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 142 { 143 _lwkt_sendmsg_prepare(port, msg); 144 _lwkt_sendmsg_start(port, msg); 145 } 146 147 void 148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg) 149 { 150 _lwkt_sendmsg_prepare(port, msg); 151 _lwkt_sendmsg_start_oncpu(port, msg); 152 } 153 154 void 155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg) 156 { 157 _lwkt_sendmsg_prepare(port, msg); 158 } 159 160 void 161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg) 162 { 163 _lwkt_sendmsg_start(port, msg); 164 } 165 166 /* 167 * lwkt_domsg() 168 * 169 * Request synchronous completion and call lwkt_beginmsg(). The 170 * target port can opt to execute the message synchronously or 171 * asynchronously and this function will automatically block and 172 * wait for a response if the target executes the message 173 * asynchronously. 174 * 175 * NOTE: Do not use this function to forward a message as we might 176 * clobber ms_flags in a SMP race. 
 *
 * Returns the target's synchronous result, or the value returned by
 * lwkt_waitmsg() when the target queued the message.
 */
int
lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
{
    int error;

    /* Must have a reply port and be idle (DONE set, not queued). */
    KKASSERT(msg->ms_reply_port != NULL &&
	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
    msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
    msg->ms_flags |= MSGF_SYNC;
    if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
        /*
         * Target port opted to execute the message asynchronously so
         * block and wait for a reply.
         */
        error = lwkt_waitmsg(msg, flags);
    } else {
        /* Executed synchronously: mark complete without a reply trip. */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
    }
    return(error);
}

/*
 * lwkt_forwardmsg()
 *
 * Forward a message received on one port to another port.  The message
 * must not be queued, done or replied.  Returns EASYNC if the new target
 * queued it; otherwise the synchronous result, which has already been
 * passed to lwkt_replymsg().
 */
int
lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
    if ((error = port->mp_putport(port, msg)) != EASYNC)
        lwkt_replymsg(msg, error);
    return(error);
}

/*
 * lwkt_abortmsg()
 *
 * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set,
 * in which case msg->ms_abortfn is invoked if the message has not yet
 * been marked DONE or REPLY.  The caller must ensure that the message
 * will not be both replied AND destroyed while the abort is in progress.
 *
 * This function issues a callback which might block!
 */
void
lwkt_abortmsg(lwkt_msg_t msg)
{
    /*
     * A critical section protects us from reply IPIs on this cpu.
     */
    crit_enter();

    /*
     * Shortcut the operation if the message has already been returned.
     * The callback typically constructs a lwkt_msg with the abort request,
     * issues it synchronously, and waits for completion.  The callback
     * is not required to actually abort the message and the target port,
     * upon receiving an abort request message generated by the callback,
     * should check whether the original message has already completed or
     * not.
     */
    if (msg->ms_flags & MSGF_ABORTABLE) {
        if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
            msg->ms_abortfn(msg);
    }
    crit_exit();
}

/************************************************************************
 *                      PORT INITIALIZATION API                         *
 ************************************************************************/

/* Backend vectors; each port type installs one set via _lwkt_initport(). */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Core port initialization (internal).  Zeroes the port, marks it as not
 * bound to any cpu (mp_cpuid = -1), initializes the normal and priority
 * message queues and installs the supplied backend functions.
 */
static __inline
void
_lwkt_initport(lwkt_port_t port,
               void *(*gportfn)(lwkt_port_t),
               int (*pportfn)(lwkt_port_t, lwkt_msg_t),
               int (*wmsgfn)(lwkt_msg_t, int),
               void *(*wportfn)(lwkt_port_t, int),
               void (*rportfn)(lwkt_port_t, lwkt_msg_t),
               int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
               int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
{
    bzero(port, sizeof(*port));
    port->mp_cpuid = -1;
    TAILQ_INIT(&port->mp_msgq);
    TAILQ_INIT(&port->mp_msgq_prio);
    port->mp_getport = gportfn;
    port->mp_putport = pportfn;
    port->mp_waitmsg = wmsgfn;
    port->mp_waitport = wportfn;
    port->mp_replyport = rportfn;
    port->mp_dropmsg = dmsgfn;
    port->mp_putport_oncpu = pportfn_oncpu;
}

/*
 * Schedule the target thread.  This routine is called even if the thread
 * is already scheduled.
 *
 * NOTE: flags (the message's ms_flags) is currently ignored; in
 * particular MSGF_NORESCHED is not honored here and lwkt_schedule(td) is
 * always called.
 */
static __inline
void
_lwkt_schedule_msg(thread_t td, int flags)
{
    lwkt_schedule(td);
}

/*
 * lwkt_initport_thread()
 *
 * Initialize a port for use by a particular thread.  The port may
 * only be used by <td>.
 */
void
lwkt_initport_thread(lwkt_port_t port, thread_t td)
{
    _lwkt_initport(port,
                   lwkt_thread_getport,
                   lwkt_thread_putport,
                   lwkt_thread_waitmsg,
                   lwkt_thread_waitport,
                   lwkt_thread_replyport,
                   lwkt_thread_dropmsg,
                   lwkt_thread_putport);
    port->mpu_td = td;
}

/*
 * lwkt_initport_spin()
 *
 * Initialize a port for use with descriptors that might be accessed
 * via multiple LWPs, processes, or threads.  Has somewhat more
 * overhead than thread ports.
348 */ 349 void 350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid) 351 { 352 int (*dmsgfn)(lwkt_port_t, lwkt_msg_t); 353 int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t); 354 355 if (td == NULL) 356 dmsgfn = lwkt_panic_dropmsg; 357 else 358 dmsgfn = lwkt_spin_dropmsg; 359 360 if (fixed_cpuid) 361 pportfn_oncpu = lwkt_spin_putport_oncpu; 362 else 363 pportfn_oncpu = lwkt_panic_putport_oncpu; 364 365 _lwkt_initport(port, 366 lwkt_spin_getport, 367 lwkt_spin_putport, 368 lwkt_spin_waitmsg, 369 lwkt_spin_waitport, 370 lwkt_spin_replyport, 371 dmsgfn, 372 pportfn_oncpu); 373 spin_init(&port->mpu_spin); 374 port->mpu_td = td; 375 if (fixed_cpuid) 376 port->mp_cpuid = td->td_gd->gd_cpuid; 377 } 378 379 /* 380 * lwkt_initport_serialize() 381 * 382 * Initialize a port for use with descriptors that might be accessed 383 * via multiple LWPs, processes, or threads. Callers are assumed to 384 * have held the serializer (slz). 385 */ 386 void 387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 388 { 389 _lwkt_initport(port, 390 lwkt_serialize_getport, 391 lwkt_serialize_putport, 392 lwkt_serialize_waitmsg, 393 lwkt_serialize_waitport, 394 lwkt_serialize_replyport, 395 lwkt_panic_dropmsg, 396 lwkt_panic_putport_oncpu); 397 port->mpu_serialize = slz; 398 } 399 400 /* 401 * Similar to the standard initport, this function simply marks the message 402 * as being done and does not attempt to return it to an originating port. 403 */ 404 void 405 lwkt_initport_replyonly_null(lwkt_port_t port) 406 { 407 _lwkt_initport(port, 408 lwkt_panic_getport, 409 lwkt_panic_putport, 410 lwkt_panic_waitmsg, 411 lwkt_panic_waitport, 412 lwkt_null_replyport, 413 lwkt_panic_dropmsg, 414 lwkt_panic_putport_oncpu); 415 } 416 417 /* 418 * Initialize a reply-only port, typically used as a message sink. Such 419 * ports can only be used as a reply port. 
 * rportfn is invoked for every reply; all other port operations panic.
 */
void
lwkt_initport_replyonly(lwkt_port_t port,
                        void (*rportfn)(lwkt_port_t, lwkt_msg_t))
{
    _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
                         lwkt_panic_waitmsg, lwkt_panic_waitport,
                         rportfn, lwkt_panic_dropmsg,
                         lwkt_panic_putport_oncpu);
}

/*
 * Initialize a put-only port: messages are delivered through the
 * caller-supplied pportfn; every other port operation panics.
 */
void
lwkt_initport_putonly(lwkt_port_t port,
                      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
{
    _lwkt_initport(port, lwkt_panic_getport, pportfn,
                         lwkt_panic_waitmsg, lwkt_panic_waitport,
                         lwkt_panic_replyport, lwkt_panic_dropmsg,
                         lwkt_panic_putport_oncpu);
}

/*
 * Initialize a port on which every operation panics.
 */
void
lwkt_initport_panic(lwkt_port_t port)
{
    _lwkt_initport(port,
                   lwkt_panic_getport, lwkt_panic_putport,
                   lwkt_panic_waitmsg, lwkt_panic_waitport,
                   lwkt_panic_replyport, lwkt_panic_dropmsg,
                   lwkt_panic_putport_oncpu);
}

/*
 * Remove msg from the port queue it sits on (the priority queue if
 * MSGF_PRIORITY is set, else the normal queue) and clear MSGF_QUEUED.
 * Every caller in this file holds the backend's interlock (critical
 * section, port spinlock, or serializer) around this call.
 */
static __inline
void
_lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    lwkt_msg_queue *queue;

    /*
     * normal case, remove and return the message.
     */
    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
        queue = &port->mp_msgq_prio;
    else
        queue = &port->mp_msgq;
    TAILQ_REMOVE(queue, msg, ms_node);

    /*
     * atomic op needed for spin ports
     */
    atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
}

/*
 * Append msg to the tail of the port's priority or normal queue
 * (chosen by MSGF_PRIORITY), setting MSGF_QUEUED first.
 */
static __inline
void
_lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    lwkt_msg_queue *queue;

    /*
     * atomic op needed for spin ports
     */
    atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
        queue = &port->mp_msgq_prio;
    else
        queue = &port->mp_msgq;
    TAILQ_INSERT_TAIL(queue, msg, ms_node);
}

/*
 * Peek at the next message without removing it.  The priority queue is
 * always drained before the normal queue.  Returns NULL if both are empty.
 */
static __inline
lwkt_msg_t
_lwkt_pollmsg(lwkt_port_t port)
{
    lwkt_msg_t msg;

    msg = TAILQ_FIRST(&port->mp_msgq_prio);
    if (__predict_false(msg != NULL))
        return msg;

    /*
     * Priority queue has no message, fallback to non-priority queue.
     */
    return TAILQ_FIRST(&port->mp_msgq);
}

/*
 * Queue a reply on its reply port and mark it REPLY|DONE.  The message is
 * queued (MSGF_QUEUED set) before MSGF_DONE is set; the waitmsg backends
 * check MSGF_QUEUED after observing MSGF_DONE to dequeue the reply.
 */
static __inline
void
_lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
{
    /*
     * atomic op needed for spin ports
     */
    _lwkt_pushmsg(port, msg);
    atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
}

/************************************************************************
 *                      THREAD PORT BACKEND                             *
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */

/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.  It runs as an IPI function on the cpu
 * of the reply port's owning thread, re-forwarding itself if that thread
 * has migrated in the meantime.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /* Snapshot flags before DONE is published (passed to the scheduler). */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
        /* Order prior stores to the message before setting DONE. */
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
        _lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, flags);
}

/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.
 * Completion must occur on the cpu of the thread that owns the reply
 * port (port->mpu_td); when called from another cpu the reply is
 * forwarded with an IPI to lwkt_thread_replyport_remote().
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * On the local cpu a critical section interlocks the flag update
         * and the wakeup against IPIs.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            flags = msg->ms_flags;
            /* Order prior stores to the message before setting DONE. */
            cpu_sfence();
            msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, flags);
            crit_exit();
        } else {
            /*
             * Remote: REPLY is set now; DONE is set later by
             * lwkt_thread_replyport_remote() on the owner's cpu.
             */
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         *
         * A critical section is required to interlock the port queue.
         */
        if (port->mpu_td->td_gd == mycpu) {
            crit_enter();
            _lwkt_enqueue_reply(port, msg);
            if (port->mp_flags & MSGPORTF_WAITING)
                _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
            crit_exit();
        } else {
            /* Remote: the owner's cpu queues the reply. */
#ifdef INVARIANTS
            atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
            atomic_set_int(&msg->ms_flags, MSGF_REPLY);
            lwkt_send_ipiq(port->mpu_td->td_gd,
                           (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        }
    }
}

/*
 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
 *
 * This function may _only_ be used when the caller is the thread that
 * owns the message's target port.
 *
 * Returns 0 if the message was still queued (not replied) and has been
 * removed and marked DONE, or ENOENT otherwise.
 */
static int
lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    int error;

    KASSERT(port->mpu_td == curthread,
            ("message could only be dropped in the same thread "
             "as the message target port thread"));
    crit_enter_quick(port->mpu_td);
    if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
        _lwkt_pullmsg(port, msg);
        atomic_set_int(&msg->ms_flags, MSGF_DONE);
        error = 0;
    } else {
        error = ENOENT;
    }
    crit_exit_quick(port->mpu_td);

    return (error);
}

/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY.
 *
 * lwkt_thread_putport_remote() below is the IPI half: it runs on the
 * cpu of the target port's owning thread (following migrations) and
 * queues the message there.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * An atomic op is needed on ms_flags vs originator.  Also
     * note that the originator might be using a different type
     * of msgport.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    msg->ms_target_port = port;
    if (port->mpu_td->td_gd == mycpu) {
        /* Local cpu: queue directly under a critical section. */
        crit_enter();
        _lwkt_pushmsg(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING)
            _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
        crit_exit();
    } else {
#ifdef INVARIANTS
        /*
         * An atomic op is needed on ms_flags vs originator.  Also
         * note that the originator might be using a different type
         * of msgport.  MSGF_INTRANSIT is cleared again by
         * lwkt_thread_putport_remote() on the target cpu.
         */
        atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
    return (EASYNC);
}

/*
 * lwkt_thread_getport()
 *
 * Retrieve the next message from the port or NULL if no messages
 * are ready.  Must be called by the port's owning thread (asserted).
 */
static
void *
lwkt_thread_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    KKASSERT(port->mpu_td == curthread);

    crit_enter_quick(port->mpu_td);
    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    crit_exit_quick(port->mpu_td);
    return(msg);
}

/*
 * lwkt_thread_waitmsg()
 *
 * Wait for a particular message to be replied.  We must be the only
 * thread waiting on the message.  The port must be owned by the
 * caller.
752 */ 753 static 754 int 755 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 756 { 757 thread_t td = curthread; 758 759 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 760 ("can't wait dropable message")); 761 762 if ((msg->ms_flags & MSGF_DONE) == 0) { 763 /* 764 * If the done bit was not set we have to block until it is. 765 */ 766 lwkt_port_t port = msg->ms_reply_port; 767 int sentabort; 768 769 KKASSERT(port->mpu_td == td); 770 crit_enter_quick(td); 771 sentabort = 0; 772 773 while ((msg->ms_flags & MSGF_DONE) == 0) { 774 port->mp_flags |= MSGPORTF_WAITING; /* same cpu */ 775 if (sentabort == 0) { 776 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 777 lwkt_abortmsg(msg); 778 } 779 } else { 780 lwkt_sleep("waitabt", 0); 781 } 782 port->mp_flags &= ~MSGPORTF_WAITING; 783 } 784 if (msg->ms_flags & MSGF_QUEUED) 785 _lwkt_pullmsg(port, msg); 786 crit_exit_quick(td); 787 } else { 788 /* 789 * If the done bit was set we only have to mess around with the 790 * message if it is queued on the reply port. 791 */ 792 crit_enter_quick(td); 793 if (msg->ms_flags & MSGF_QUEUED) { 794 lwkt_port_t port = msg->ms_reply_port; 795 thread_t td __debugvar = curthread; 796 797 KKASSERT(port->mpu_td == td); 798 _lwkt_pullmsg(port, msg); 799 } 800 crit_exit_quick(td); 801 } 802 return(msg->ms_error); 803 } 804 805 /* 806 * lwkt_thread_waitport() 807 * 808 * Wait for a new message to be available on the port. We must be the 809 * the only thread waiting on the port. The port must be owned by caller. 
810 */ 811 static 812 void * 813 lwkt_thread_waitport(lwkt_port_t port, int flags) 814 { 815 thread_t td = curthread; 816 lwkt_msg_t msg; 817 int error; 818 819 KKASSERT(port->mpu_td == td); 820 crit_enter_quick(td); 821 while ((msg = _lwkt_pollmsg(port)) == NULL) { 822 port->mp_flags |= MSGPORTF_WAITING; 823 error = lwkt_sleep("waitport", flags); 824 port->mp_flags &= ~MSGPORTF_WAITING; 825 if (error) 826 goto done; 827 } 828 _lwkt_pullmsg(port, msg); 829 done: 830 crit_exit_quick(td); 831 return(msg); 832 } 833 834 /************************************************************************ 835 * SPIN PORT BACKEND * 836 ************************************************************************ 837 * 838 * This backend uses spinlocks instead of making assumptions about which 839 * thread is accessing the port. It must be used when a port is not owned 840 * by a particular thread. This is less optimal then thread ports but 841 * you don't have a choice if there are multiple threads accessing the port. 842 * 843 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 844 * on the message port, it is the responsibility of the code doing the 845 * wakeup to clear this flag rather then the blocked threads. Some 846 * superfluous wakeups may occur, which is ok. 847 * 848 * XXX synchronous message wakeups are not current optimized. 
849 */ 850 851 static 852 void * 853 lwkt_spin_getport(lwkt_port_t port) 854 { 855 lwkt_msg_t msg; 856 857 spin_lock(&port->mpu_spin); 858 if ((msg = _lwkt_pollmsg(port)) != NULL) 859 _lwkt_pullmsg(port, msg); 860 spin_unlock(&port->mpu_spin); 861 return(msg); 862 } 863 864 static __inline int 865 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg) 866 { 867 int dowakeup; 868 869 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 870 871 msg->ms_target_port = port; 872 spin_lock(&port->mpu_spin); 873 _lwkt_pushmsg(port, msg); 874 dowakeup = 0; 875 if (port->mp_flags & MSGPORTF_WAITING) { 876 port->mp_flags &= ~MSGPORTF_WAITING; 877 dowakeup = 1; 878 } 879 spin_unlock(&port->mpu_spin); 880 881 return dowakeup; 882 } 883 884 static 885 int 886 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 887 { 888 if (lwkt_spin_putport_only(port, msg)) 889 wakeup(port); 890 return (EASYNC); 891 } 892 893 static 894 int 895 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 896 { 897 KASSERT(port->mp_cpuid == mycpuid, 898 ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d", 899 port->mp_cpuid, mycpuid)); 900 if (lwkt_spin_putport_only(port, msg)) 901 wakeup_mycpu(port); 902 return (EASYNC); 903 } 904 905 static 906 int 907 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 908 { 909 lwkt_port_t port; 910 int sentabort; 911 int error; 912 913 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 914 ("can't wait dropable message")); 915 port = msg->ms_reply_port; 916 917 if ((msg->ms_flags & MSGF_DONE) == 0) { 918 sentabort = 0; 919 spin_lock(&port->mpu_spin); 920 while ((msg->ms_flags & MSGF_DONE) == 0) { 921 void *won; 922 923 /* 924 * If message was sent synchronously from the beginning 925 * the wakeup will be on the message structure, else it 926 * will be on the port structure. 
927 * 928 * ms_flags needs atomic op originator vs target MSGF_QUEUED 929 */ 930 if (msg->ms_flags & MSGF_SYNC) { 931 won = msg; 932 atomic_set_int(&msg->ms_flags, MSGF_WAITING); 933 } else { 934 won = port; 935 port->mp_flags |= MSGPORTF_WAITING; 936 } 937 938 /* 939 * Only messages which support abort can be interrupted. 940 * We must still wait for message completion regardless. 941 */ 942 if ((flags & PCATCH) && sentabort == 0) { 943 error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 944 if (error) { 945 sentabort = error; 946 spin_unlock(&port->mpu_spin); 947 lwkt_abortmsg(msg); 948 spin_lock(&port->mpu_spin); 949 } 950 } else { 951 error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0); 952 } 953 /* see note at the top on the MSGPORTF_WAITING flag */ 954 } 955 /* 956 * Turn EINTR into ERESTART if the signal indicates. 957 */ 958 if (sentabort && msg->ms_error == EINTR) 959 msg->ms_error = sentabort; 960 if (msg->ms_flags & MSGF_QUEUED) 961 _lwkt_pullmsg(port, msg); 962 spin_unlock(&port->mpu_spin); 963 } else { 964 spin_lock(&port->mpu_spin); 965 if (msg->ms_flags & MSGF_QUEUED) { 966 _lwkt_pullmsg(port, msg); 967 } 968 spin_unlock(&port->mpu_spin); 969 } 970 return(msg->ms_error); 971 } 972 973 static 974 void * 975 lwkt_spin_waitport(lwkt_port_t port, int flags) 976 { 977 lwkt_msg_t msg; 978 int error; 979 980 spin_lock(&port->mpu_spin); 981 while ((msg = _lwkt_pollmsg(port)) == NULL) { 982 port->mp_flags |= MSGPORTF_WAITING; 983 error = ssleep(port, &port->mpu_spin, flags, "waitport", 0); 984 /* see note at the top on the MSGPORTF_WAITING flag */ 985 if (error) { 986 spin_unlock(&port->mpu_spin); 987 return(NULL); 988 } 989 } 990 _lwkt_pullmsg(port, msg); 991 spin_unlock(&port->mpu_spin); 992 return(msg); 993 } 994 995 static 996 void 997 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 998 { 999 int dowakeup; 1000 1001 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1002 1003 if (msg->ms_flags & MSGF_SYNC) { 1004 /* 1005 * If a 
synchronous completion has been requested, just wakeup 1006 * the message without bothering to queue it to the target port. 1007 * 1008 * ms_flags protected by reply port spinlock 1009 */ 1010 spin_lock(&port->mpu_spin); 1011 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1012 dowakeup = 0; 1013 if (msg->ms_flags & MSGF_WAITING) { 1014 msg->ms_flags &= ~MSGF_WAITING; 1015 dowakeup = 1; 1016 } 1017 spin_unlock(&port->mpu_spin); 1018 if (dowakeup) 1019 wakeup(msg); 1020 } else { 1021 /* 1022 * If an asynchronous completion has been requested the message 1023 * must be queued to the reply port. 1024 */ 1025 spin_lock(&port->mpu_spin); 1026 _lwkt_enqueue_reply(port, msg); 1027 dowakeup = 0; 1028 if (port->mp_flags & MSGPORTF_WAITING) { 1029 port->mp_flags &= ~MSGPORTF_WAITING; 1030 dowakeup = 1; 1031 } 1032 spin_unlock(&port->mpu_spin); 1033 if (dowakeup) 1034 wakeup(port); 1035 } 1036 } 1037 1038 /* 1039 * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg() 1040 * 1041 * This function could _only_ be used when caller is in the same thread 1042 * as the message's target port owner thread. 1043 */ 1044 static int 1045 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1046 { 1047 int error; 1048 1049 KASSERT(port->mpu_td == curthread, 1050 ("message could only be dropped in the same thread " 1051 "as the message target port thread\n")); 1052 spin_lock(&port->mpu_spin); 1053 if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) { 1054 _lwkt_pullmsg(port, msg); 1055 msg->ms_flags |= MSGF_DONE; 1056 error = 0; 1057 } else { 1058 error = ENOENT; 1059 } 1060 spin_unlock(&port->mpu_spin); 1061 1062 return (error); 1063 } 1064 1065 /************************************************************************ 1066 * SERIALIZER PORT BACKEND * 1067 ************************************************************************ 1068 * 1069 * This backend uses serializer to protect port accessing. Callers are 1070 * assumed to have serializer held. 
 * This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
 *
 * Implementation is based on SPIN PORT BACKEND.
 */

/*
 * Dequeue and return the next message, or NULL if the port is empty.
 * The port's serializer must be held (asserted).
 */
static
void *
lwkt_serialize_getport(lwkt_port_t port)
{
    lwkt_msg_t msg;

    ASSERT_SERIALIZED(port->mpu_serialize);

    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Backend to lwkt_beginmsg(): queue msg, wake any port waiter (clearing
 * MSGPORTF_WAITING), and return EASYNC.
 */
static
int
lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    msg->ms_target_port = port;
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING) {
        port->mp_flags &= ~MSGPORTF_WAITING;
        wakeup(port);
    }
    return (EASYNC);
}

/*
 * Wait for msg to be replied while holding the reply port's serializer.
 * zsleep() is given the serializer as its interlock (presumably releasing
 * it while asleep -- confirm against zsleep(9)).  With PCATCH an
 * interrupted sleep issues lwkt_abortmsg() once (serializer dropped
 * around the call), after which we keep waiting.  Returns msg->ms_error.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
            ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;

        ASSERT_SERIALIZED(port->mpu_serialize);

        sentabort = 0;
        while ((msg->ms_flags & MSGF_DONE) == 0) {
            void *won;

            /*
             * If message was sent synchronously from the beginning
             * the wakeup will be on the message structure, else it
             * will be on the port structure.  (Unlike the spin backend
             * no MSGF_WAITING handshake is used: the serializer reply
             * backend always wakes the message.)
             */
            if (msg->ms_flags & MSGF_SYNC) {
                won = msg;
            } else {
                won = port;
                port->mp_flags |= MSGPORTF_WAITING;
            }

            /*
             * Only messages which support abort can be interrupted.
             * We must still wait for message completion regardless.
             */
            if ((flags & PCATCH) && sentabort == 0) {
                error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
                if (error) {
                    sentabort = error;
                    lwkt_serialize_exit(port->mpu_serialize);
                    lwkt_abortmsg(msg);
                    lwkt_serialize_enter(port->mpu_serialize);
                }
            } else {
                error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
            }
            /*
             * MSGPORTF_WAITING is cleared by the waker, as described in
             * the SPIN PORT BACKEND section header.
             */
        }
        /*
         * Turn EINTR into ERESTART if the signal indicates.
         */
        if (sentabort && msg->ms_error == EINTR)
            msg->ms_error = sentabort;
        if (msg->ms_flags & MSGF_QUEUED)
            _lwkt_pullmsg(port, msg);
    } else {
        if (msg->ms_flags & MSGF_QUEUED) {
            port = msg->ms_reply_port;

            ASSERT_SERIALIZED(port->mpu_serialize);
            _lwkt_pullmsg(port, msg);
        }
    }
    return(msg->ms_error);
}

/*
 * Wait for and dequeue the next message with the serializer held.
 * Returns NULL if zsleep() reports an error.
 */
static
void *
lwkt_serialize_waitport(lwkt_port_t port, int flags)
{
    lwkt_msg_t msg;
    int error;

    ASSERT_SERIALIZED(port->mpu_serialize);

    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
        /*
         * MSGPORTF_WAITING is cleared by the waker, as described in
         * the SPIN PORT BACKEND section header.
         */
        if (error)
            return(NULL);
    }
    _lwkt_pullmsg(port, msg);
    return(msg);
}

/*
 * Backend to lwkt_replymsg() with the reply port's serializer held.
 */
static
void
lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
    ASSERT_SERIALIZED(port->mpu_serialize);

    if (msg->ms_flags & MSGF_SYNC) {
        /*
         * If a synchronous completion has been requested, just wakeup
         * the message without bothering to queue it to the target port.
         *
         * (both sides synchronized via serialized reply port)
         */
        msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
        wakeup(msg);
    } else {
        /*
         * If an asynchronous completion has been requested the message
         * must be queued to the reply port.
         */
        _lwkt_enqueue_reply(port, msg);
        if (port->mp_flags & MSGPORTF_WAITING) {
            port->mp_flags &= ~MSGPORTF_WAITING;
            wakeup(port);
        }
    }
}

/************************************************************************
 *                 PANIC AND SPECIAL PORT FUNCTIONS                     *
 ************************************************************************/

/*
 * You can point a port's reply vector at this function if you just want
 * the message marked done, without any queueing or signaling.  This is
 * often used for structure-embedded messages.
 */
static
void
lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
}

/* The lwkt_panic_* backends fill unsupported slots; each one panics. */
static
void *
lwkt_panic_getport(lwkt_port_t port)
{
    panic("lwkt_getport() illegal on port %p", port);
}

static
int
lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
{
    panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
}

static
void *
lwkt_panic_waitport(lwkt_port_t port, int flags)
{
    panic("port %p cannot be waited on", port);
}

static
void
lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
}

static
int
lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
    /* NOT REACHED */
return (ENOENT); 1280 } 1281 1282 static 1283 int 1284 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg) 1285 { 1286 panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p", 1287 port, msg); 1288 /* NOT REACHED */ 1289 return (ENOENT); 1290 } 1291