1 /* 2 * Copyright (c) 2003,2004 The DragonFly Project. All rights reserved. 3 * 4 * This code is derived from software contributed to The DragonFly Project 5 * by Matthew Dillon <dillon@backplane.com> 6 * 7 * Redistribution and use in source and binary forms, with or without 8 * modification, are permitted provided that the following conditions 9 * are met: 10 * 11 * 1. Redistributions of source code must retain the above copyright 12 * notice, this list of conditions and the following disclaimer. 13 * 2. Redistributions in binary form must reproduce the above copyright 14 * notice, this list of conditions and the following disclaimer in 15 * the documentation and/or other materials provided with the 16 * distribution. 17 * 3. Neither the name of The DragonFly Project nor the names of its 18 * contributors may be used to endorse or promote products derived 19 * from this software without specific, prior written permission. 20 * 21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, 27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED 29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, 30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT 31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF 32 * SUCH DAMAGE. 33 * 34 * NOTE! This file may be compiled for userland libraries as well as for 35 * the kernel. 36 * 37 * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $ 38 */ 39 40 #include <sys/param.h> 41 #include <sys/systm.h> 42 #include <sys/kernel.h> 43 #include <sys/proc.h> 44 #include <sys/rtprio.h> 45 #include <sys/queue.h> 46 #include <sys/sysctl.h> 47 #include <sys/kthread.h> 48 #include <sys/signalvar.h> 49 #include <sys/signal2.h> 50 #include <machine/cpu.h> 51 #include <sys/lock.h> 52 53 #include <vm/vm.h> 54 #include <vm/vm_param.h> 55 #include <vm/vm_kern.h> 56 #include <vm/vm_object.h> 57 #include <vm/vm_page.h> 58 #include <vm/vm_map.h> 59 #include <vm/vm_pager.h> 60 #include <vm/vm_extern.h> 61 #include <vm/vm_zone.h> 62 63 #include <sys/thread2.h> 64 #include <sys/msgport2.h> 65 #include <sys/spinlock2.h> 66 #include <sys/serialize.h> 67 68 #include <machine/stdarg.h> 69 #include <machine/cpufunc.h> 70 #ifdef SMP 71 #include <machine/smp.h> 72 #endif 73 74 #include <sys/malloc.h> 75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message"); 76 77 /************************************************************************ 78 * MESSAGE FUNCTIONS * 79 ************************************************************************/ 80 81 /* 82 * lwkt_sendmsg() 83 * 84 * Request asynchronous completion and call lwkt_beginmsg(). The 85 * target port can opt to execute the message synchronously or 86 * asynchronously and this function will automatically queue the 87 * response if the target executes the message synchronously. 88 * 89 * NOTE: The message is in an indeterminant state until this call 90 * returns. The caller should not mess with it (e.g. try to abort it) 91 * until then. 92 */ 93 void 94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg) 95 { 96 int error; 97 98 KKASSERT(msg->ms_reply_port != NULL && 99 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 100 msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE); 101 if ((error = lwkt_beginmsg(port, msg)) != EASYNC) { 102 lwkt_replymsg(msg, error); 103 } 104 } 105 106 /* 107 * lwkt_domsg() 108 * 109 * Request asynchronous completion and call lwkt_beginmsg(). The 110 * target port can opt to execute the message synchronously or 111 * asynchronously and this function will automatically queue the 112 * response if the target executes the message synchronously. 113 */ 114 int 115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags) 116 { 117 int error; 118 119 KKASSERT(msg->ms_reply_port != NULL && 120 (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE); 121 msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE); 122 msg->ms_flags |= MSGF_SYNC; 123 if ((error = lwkt_beginmsg(port, msg)) == EASYNC) { 124 error = lwkt_waitmsg(msg, flags); 125 } else { 126 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 127 } 128 return(error); 129 } 130 131 /* 132 * lwkt_forwardmsg() 133 * 134 * Forward a message received on one port to another port. 135 */ 136 int 137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg) 138 { 139 int error; 140 141 crit_enter(); 142 KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0); 143 if ((error = port->mp_putport(port, msg)) != EASYNC) 144 lwkt_replymsg(msg, error); 145 crit_exit(); 146 return(error); 147 } 148 149 /* 150 * lwkt_abortmsg() 151 * 152 * Attempt to abort a message. This only works if MSGF_ABORTABLE is set. 153 * The caller must ensure that the message will not be both replied AND 154 * destroyed while the abort is in progress. 155 * 156 * This function issues a callback which might block! 157 */ 158 void 159 lwkt_abortmsg(lwkt_msg_t msg) 160 { 161 /* 162 * A critical section protects us from reply IPIs on this cpu. 163 */ 164 crit_enter(); 165 166 /* 167 * Shortcut the operation if the message has already been returned. 168 * The callback typically constructs a lwkt_msg with the abort request, 169 * issues it synchronously, and waits for completion. The callback 170 * is not required to actually abort the message and the target port, 171 * upon receiving an abort request message generated by the callback 172 * should check whether the original message has already completed or 173 * not. 174 */ 175 if (msg->ms_flags & MSGF_ABORTABLE) { 176 if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0) 177 msg->ms_abortfn(msg); 178 } 179 crit_exit(); 180 } 181 182 /************************************************************************ 183 * PORT INITIALIZATION API * 184 ************************************************************************/ 185 186 static void *lwkt_thread_getport(lwkt_port_t port); 187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); 188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); 189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags); 190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); 191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 192 193 static void *lwkt_spin_getport(lwkt_port_t port); 194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); 195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags); 196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags); 197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg); 198 199 static void *lwkt_serialize_getport(lwkt_port_t port); 200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg); 201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags); 202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags); 203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg); 204 205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg); 206 static void *lwkt_panic_getport(lwkt_port_t port); 207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); 208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); 209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags); 210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); 211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); 212 213 /* 214 * Core port initialization (internal) 215 */ 216 static __inline 217 void 218 _lwkt_initport(lwkt_port_t port, 219 void *(*gportfn)(lwkt_port_t), 220 int (*pportfn)(lwkt_port_t, lwkt_msg_t), 221 int (*wmsgfn)(lwkt_msg_t, int), 222 void *(*wportfn)(lwkt_port_t, int), 223 void (*rportfn)(lwkt_port_t, lwkt_msg_t), 224 void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) 225 { 226 bzero(port, sizeof(*port)); 227 TAILQ_INIT(&port->mp_msgq); 228 TAILQ_INIT(&port->mp_msgq_prio); 229 port->mp_getport = gportfn; 230 port->mp_putport = pportfn; 231 port->mp_waitmsg = wmsgfn; 232 port->mp_waitport = wportfn; 233 port->mp_replyport = rportfn; 234 port->mp_dropmsg = dmsgfn; 235 } 236 237 /* 238 * Schedule the target thread. If the message flags contains MSGF_NORESCHED 239 * we tell the scheduler not to reschedule if td is at a higher priority. 240 * 241 * This routine is called even if the thread is already scheduled so messages 242 * without NORESCHED will cause the target thread to be rescheduled even if 243 * prior messages did not. 244 */ 245 static __inline 246 void 247 _lwkt_schedule_msg(thread_t td, int flags) 248 { 249 if (flags & MSGF_NORESCHED) 250 lwkt_schedule_noresched(td); 251 else 252 lwkt_schedule(td); 253 } 254 255 /* 256 * lwkt_initport_thread() 257 * 258 * Initialize a port for use by a particular thread. The port may 259 * only be used by <td>. 260 */ 261 void 262 lwkt_initport_thread(lwkt_port_t port, thread_t td) 263 { 264 _lwkt_initport(port, 265 lwkt_thread_getport, 266 lwkt_thread_putport, 267 lwkt_thread_waitmsg, 268 lwkt_thread_waitport, 269 lwkt_thread_replyport, 270 lwkt_thread_dropmsg); 271 port->mpu_td = td; 272 } 273 274 /* 275 * lwkt_initport_spin() 276 * 277 * Initialize a port for use with descriptors that might be accessed 278 * via multiple LWPs, processes, or threads. Has somewhat more 279 * overhead then thread ports. 280 */ 281 void 282 lwkt_initport_spin(lwkt_port_t port) 283 { 284 _lwkt_initport(port, 285 lwkt_spin_getport, 286 lwkt_spin_putport, 287 lwkt_spin_waitmsg, 288 lwkt_spin_waitport, 289 lwkt_spin_replyport, 290 lwkt_panic_dropmsg); 291 spin_init(&port->mpu_spin); 292 } 293 294 /* 295 * lwkt_initport_serialize() 296 * 297 * Initialize a port for use with descriptors that might be accessed 298 * via multiple LWPs, processes, or threads. Callers are assumed to 299 * have held the serializer (slz). 300 */ 301 void 302 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) 303 { 304 _lwkt_initport(port, 305 lwkt_serialize_getport, 306 lwkt_serialize_putport, 307 lwkt_serialize_waitmsg, 308 lwkt_serialize_waitport, 309 lwkt_serialize_replyport, 310 lwkt_panic_dropmsg); 311 port->mpu_serialize = slz; 312 } 313 314 /* 315 * Similar to the standard initport, this function simply marks the message 316 * as being done and does not attempt to return it to an originating port. 317 */ 318 void 319 lwkt_initport_replyonly_null(lwkt_port_t port) 320 { 321 _lwkt_initport(port, 322 lwkt_panic_getport, 323 lwkt_panic_putport, 324 lwkt_panic_waitmsg, 325 lwkt_panic_waitport, 326 lwkt_null_replyport, 327 lwkt_panic_dropmsg); 328 } 329 330 /* 331 * Initialize a reply-only port, typically used as a message sink. Such 332 * ports can only be used as a reply port. 333 */ 334 void 335 lwkt_initport_replyonly(lwkt_port_t port, 336 void (*rportfn)(lwkt_port_t, lwkt_msg_t)) 337 { 338 _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, 339 lwkt_panic_waitmsg, lwkt_panic_waitport, 340 rportfn, lwkt_panic_dropmsg); 341 } 342 343 void 344 lwkt_initport_putonly(lwkt_port_t port, 345 int (*pportfn)(lwkt_port_t, lwkt_msg_t)) 346 { 347 _lwkt_initport(port, lwkt_panic_getport, pportfn, 348 lwkt_panic_waitmsg, lwkt_panic_waitport, 349 lwkt_panic_replyport, lwkt_panic_dropmsg); 350 } 351 352 void 353 lwkt_initport_panic(lwkt_port_t port) 354 { 355 _lwkt_initport(port, 356 lwkt_panic_getport, lwkt_panic_putport, 357 lwkt_panic_waitmsg, lwkt_panic_waitport, 358 lwkt_panic_replyport, lwkt_panic_dropmsg); 359 } 360 361 static __inline 362 void 363 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) 364 { 365 lwkt_msg_queue *queue; 366 367 /* 368 * normal case, remove and return the message. 369 */ 370 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 371 queue = &port->mp_msgq_prio; 372 else 373 queue = &port->mp_msgq; 374 TAILQ_REMOVE(queue, msg, ms_node); 375 msg->ms_flags &= ~MSGF_QUEUED; 376 } 377 378 static __inline 379 void 380 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) 381 { 382 lwkt_msg_queue *queue; 383 384 msg->ms_flags |= MSGF_QUEUED; 385 if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) 386 queue = &port->mp_msgq_prio; 387 else 388 queue = &port->mp_msgq; 389 TAILQ_INSERT_TAIL(queue, msg, ms_node); 390 } 391 392 static __inline 393 lwkt_msg_t 394 _lwkt_pollmsg(lwkt_port_t port) 395 { 396 lwkt_msg_t msg; 397 398 msg = TAILQ_FIRST(&port->mp_msgq_prio); 399 if (__predict_false(msg != NULL)) 400 return msg; 401 402 /* 403 * Priority queue has no message, fallback to non-priority queue. 404 */ 405 return TAILQ_FIRST(&port->mp_msgq); 406 } 407 408 static __inline 409 void 410 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) 411 { 412 _lwkt_pushmsg(port, msg); 413 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 414 } 415 416 /************************************************************************ 417 * THREAD PORT BACKEND * 418 ************************************************************************ 419 * 420 * This backend is used when the port a message is retrieved from is owned 421 * by a single thread (the calling thread). Messages are IPId to the 422 * correct cpu before being enqueued to a port. Note that this is fairly 423 * optimal since scheduling would have had to do an IPI anyway if the 424 * message were headed to a different cpu. 425 */ 426 427 #ifdef SMP 428 429 /* 430 * This function completes reply processing for the default case in the 431 * context of the originating cpu. 432 */ 433 static 434 void 435 lwkt_thread_replyport_remote(lwkt_msg_t msg) 436 { 437 lwkt_port_t port = msg->ms_reply_port; 438 int flags; 439 440 /* 441 * Chase any thread migration that occurs 442 */ 443 if (port->mpu_td->td_gd != mycpu) { 444 lwkt_send_ipiq(port->mpu_td->td_gd, 445 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 446 return; 447 } 448 449 /* 450 * Cleanup 451 */ 452 #ifdef INVARIANTS 453 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 454 msg->ms_flags &= ~MSGF_INTRANSIT; 455 #endif 456 flags = msg->ms_flags; 457 if (msg->ms_flags & MSGF_SYNC) { 458 cpu_sfence(); 459 msg->ms_flags |= MSGF_REPLY | MSGF_DONE; 460 } else { 461 _lwkt_enqueue_reply(port, msg); 462 } 463 if (port->mp_flags & MSGPORTF_WAITING) 464 _lwkt_schedule_msg(port->mpu_td, flags); 465 } 466 467 #endif 468 469 /* 470 * lwkt_thread_replyport() - Backend to lwkt_replymsg() 471 * 472 * Called with the reply port as an argument but in the context of the 473 * original target port. Completion must occur on the target port's 474 * cpu. 475 * 476 * The critical section protects us from IPIs on the this CPU. 477 */ 478 static 479 void 480 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) 481 { 482 int flags; 483 484 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0); 485 486 if (msg->ms_flags & MSGF_SYNC) { 487 /* 488 * If a synchronous completion has been requested, just wakeup 489 * the message without bothering to queue it to the target port. 490 * 491 * Assume the target thread is non-preemptive, so no critical 492 * section is required. 493 */ 494 #ifdef SMP 495 if (port->mpu_td->td_gd == mycpu) { 496 #endif 497 flags = msg->ms_flags; 498 cpu_sfence(); 499 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 500 if (port->mp_flags & MSGPORTF_WAITING) 501 _lwkt_schedule_msg(port->mpu_td, flags); 502 #ifdef SMP 503 } else { 504 #ifdef INVARIANTS 505 msg->ms_flags |= MSGF_INTRANSIT; 506 #endif 507 msg->ms_flags |= MSGF_REPLY; 508 lwkt_send_ipiq(port->mpu_td->td_gd, 509 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 510 } 511 #endif 512 } else { 513 /* 514 * If an asynchronous completion has been requested the message 515 * must be queued to the reply port. 516 * 517 * A critical section is required to interlock the port queue. 518 */ 519 #ifdef SMP 520 if (port->mpu_td->td_gd == mycpu) { 521 #endif 522 crit_enter(); 523 _lwkt_enqueue_reply(port, msg); 524 if (port->mp_flags & MSGPORTF_WAITING) 525 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 526 crit_exit(); 527 #ifdef SMP 528 } else { 529 #ifdef INVARIANTS 530 msg->ms_flags |= MSGF_INTRANSIT; 531 #endif 532 msg->ms_flags |= MSGF_REPLY; 533 lwkt_send_ipiq(port->mpu_td->td_gd, 534 (ipifunc1_t)lwkt_thread_replyport_remote, msg); 535 } 536 #endif 537 } 538 } 539 540 /* 541 * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() 542 * 543 * This function could _only_ be used when caller is in the same thread 544 * as the message's target port owner thread. 545 */ 546 static void 547 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 548 { 549 KASSERT(port->mpu_td == curthread, 550 ("message could only be dropped in the same thread " 551 "as the message target port thread\n")); 552 crit_enter_quick(port->mpu_td); 553 _lwkt_pullmsg(port, msg); 554 msg->ms_flags |= MSGF_DONE; 555 crit_exit_quick(port->mpu_td); 556 } 557 558 /* 559 * lwkt_thread_putport() - Backend to lwkt_beginmsg() 560 * 561 * Called with the target port as an argument but in the context of the 562 * reply port. This function always implements an asynchronous put to 563 * the target message port, and thus returns EASYNC. 564 * 565 * The message must already have cleared MSGF_DONE and MSGF_REPLY 566 */ 567 568 #ifdef SMP 569 570 static 571 void 572 lwkt_thread_putport_remote(lwkt_msg_t msg) 573 { 574 lwkt_port_t port = msg->ms_target_port; 575 576 /* 577 * Chase any thread migration that occurs 578 */ 579 if (port->mpu_td->td_gd != mycpu) { 580 lwkt_send_ipiq(port->mpu_td->td_gd, 581 (ipifunc1_t)lwkt_thread_putport_remote, msg); 582 return; 583 } 584 585 /* 586 * Cleanup 587 */ 588 #ifdef INVARIANTS 589 KKASSERT(msg->ms_flags & MSGF_INTRANSIT); 590 msg->ms_flags &= ~MSGF_INTRANSIT; 591 #endif 592 _lwkt_pushmsg(port, msg); 593 if (port->mp_flags & MSGPORTF_WAITING) 594 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 595 } 596 597 #endif 598 599 static 600 int 601 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg) 602 { 603 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 604 605 msg->ms_target_port = port; 606 #ifdef SMP 607 if (port->mpu_td->td_gd == mycpu) { 608 #endif 609 crit_enter(); 610 _lwkt_pushmsg(port, msg); 611 if (port->mp_flags & MSGPORTF_WAITING) 612 _lwkt_schedule_msg(port->mpu_td, msg->ms_flags); 613 crit_exit(); 614 #ifdef SMP 615 } else { 616 #ifdef INVARIANTS 617 msg->ms_flags |= MSGF_INTRANSIT; 618 #endif 619 lwkt_send_ipiq(port->mpu_td->td_gd, 620 (ipifunc1_t)lwkt_thread_putport_remote, msg); 621 } 622 #endif 623 return (EASYNC); 624 } 625 626 /* 627 * lwkt_thread_getport() 628 * 629 * Retrieve the next message from the port or NULL if no messages 630 * are ready. 631 */ 632 static 633 void * 634 lwkt_thread_getport(lwkt_port_t port) 635 { 636 lwkt_msg_t msg; 637 638 KKASSERT(port->mpu_td == curthread); 639 640 crit_enter_quick(port->mpu_td); 641 if ((msg = _lwkt_pollmsg(port)) != NULL) 642 _lwkt_pullmsg(port, msg); 643 crit_exit_quick(port->mpu_td); 644 return(msg); 645 } 646 647 /* 648 * lwkt_thread_waitmsg() 649 * 650 * Wait for a particular message to be replied. We must be the only 651 * thread waiting on the message. The port must be owned by the 652 * caller. 653 */ 654 static 655 int 656 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) 657 { 658 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 659 ("can't wait dropable message\n")); 660 661 if ((msg->ms_flags & MSGF_DONE) == 0) { 662 /* 663 * If the done bit was not set we have to block until it is. 664 */ 665 lwkt_port_t port = msg->ms_reply_port; 666 thread_t td = curthread; 667 int sentabort; 668 669 KKASSERT(port->mpu_td == td); 670 crit_enter_quick(td); 671 sentabort = 0; 672 673 while ((msg->ms_flags & MSGF_DONE) == 0) { 674 port->mp_flags |= MSGPORTF_WAITING; 675 if (sentabort == 0) { 676 if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) { 677 lwkt_abortmsg(msg); 678 } 679 } else { 680 lwkt_sleep("waitabt", 0); 681 } 682 port->mp_flags &= ~MSGPORTF_WAITING; 683 } 684 if (msg->ms_flags & MSGF_QUEUED) 685 _lwkt_pullmsg(port, msg); 686 crit_exit_quick(td); 687 } else { 688 /* 689 * If the done bit was set we only have to mess around with the 690 * message if it is queued on the reply port. 691 */ 692 if (msg->ms_flags & MSGF_QUEUED) { 693 lwkt_port_t port = msg->ms_reply_port; 694 thread_t td = curthread; 695 696 KKASSERT(port->mpu_td == td); 697 crit_enter_quick(td); 698 _lwkt_pullmsg(port, msg); 699 crit_exit_quick(td); 700 } 701 } 702 return(msg->ms_error); 703 } 704 705 static 706 void * 707 lwkt_thread_waitport(lwkt_port_t port, int flags) 708 { 709 thread_t td = curthread; 710 lwkt_msg_t msg; 711 int error; 712 713 KKASSERT(port->mpu_td == td); 714 crit_enter_quick(td); 715 while ((msg = _lwkt_pollmsg(port)) == NULL) { 716 port->mp_flags |= MSGPORTF_WAITING; 717 error = lwkt_sleep("waitport", flags); 718 port->mp_flags &= ~MSGPORTF_WAITING; 719 if (error) 720 goto done; 721 } 722 _lwkt_pullmsg(port, msg); 723 done: 724 crit_exit_quick(td); 725 return(msg); 726 } 727 728 /************************************************************************ 729 * SPIN PORT BACKEND * 730 ************************************************************************ 731 * 732 * This backend uses spinlocks instead of making assumptions about which 733 * thread is accessing the port. It must be used when a port is not owned 734 * by a particular thread. This is less optimal then thread ports but 735 * you don't have a choice if there are multiple threads accessing the port. 736 * 737 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked 738 * on the message port, it is the responsibility of the code doing the 739 * wakeup to clear this flag rather then the blocked threads. Some 740 * superfluous wakeups may occur, which is ok. 741 * 742 * XXX synchronous message wakeups are not current optimized. 743 */ 744 745 static 746 void * 747 lwkt_spin_getport(lwkt_port_t port) 748 { 749 lwkt_msg_t msg; 750 751 spin_lock_wr(&port->mpu_spin); 752 if ((msg = _lwkt_pollmsg(port)) != NULL) 753 _lwkt_pullmsg(port, msg); 754 spin_unlock_wr(&port->mpu_spin); 755 return(msg); 756 } 757 758 static 759 int 760 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg) 761 { 762 int dowakeup; 763 764 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 765 766 msg->ms_target_port = port; 767 spin_lock_wr(&port->mpu_spin); 768 _lwkt_pushmsg(port, msg); 769 dowakeup = 0; 770 if (port->mp_flags & MSGPORTF_WAITING) { 771 port->mp_flags &= ~MSGPORTF_WAITING; 772 dowakeup = 1; 773 } 774 spin_unlock_wr(&port->mpu_spin); 775 if (dowakeup) 776 wakeup(port); 777 return (EASYNC); 778 } 779 780 static 781 int 782 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) 783 { 784 lwkt_port_t port; 785 int sentabort; 786 int error; 787 788 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 789 ("can't wait dropable message\n")); 790 791 if ((msg->ms_flags & MSGF_DONE) == 0) { 792 port = msg->ms_reply_port; 793 sentabort = 0; 794 spin_lock_wr(&port->mpu_spin); 795 while ((msg->ms_flags & MSGF_DONE) == 0) { 796 void *won; 797 798 /* 799 * If message was sent synchronously from the beginning 800 * the wakeup will be on the message structure, else it 801 * will be on the port structure. 802 */ 803 if (msg->ms_flags & MSGF_SYNC) { 804 won = msg; 805 } else { 806 won = port; 807 port->mp_flags |= MSGPORTF_WAITING; 808 } 809 810 /* 811 * Only messages which support abort can be interrupted. 812 * We must still wait for message completion regardless. 813 */ 814 if ((flags & PCATCH) && sentabort == 0) { 815 error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0); 816 if (error) { 817 sentabort = error; 818 spin_unlock_wr(&port->mpu_spin); 819 lwkt_abortmsg(msg); 820 spin_lock_wr(&port->mpu_spin); 821 } 822 } else { 823 error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0); 824 } 825 /* see note at the top on the MSGPORTF_WAITING flag */ 826 } 827 /* 828 * Turn EINTR into ERESTART if the signal indicates. 829 */ 830 if (sentabort && msg->ms_error == EINTR) 831 msg->ms_error = sentabort; 832 if (msg->ms_flags & MSGF_QUEUED) 833 _lwkt_pullmsg(port, msg); 834 spin_unlock_wr(&port->mpu_spin); 835 } else { 836 if (msg->ms_flags & MSGF_QUEUED) { 837 port = msg->ms_reply_port; 838 spin_lock_wr(&port->mpu_spin); 839 _lwkt_pullmsg(port, msg); 840 spin_unlock_wr(&port->mpu_spin); 841 } 842 } 843 return(msg->ms_error); 844 } 845 846 static 847 void * 848 lwkt_spin_waitport(lwkt_port_t port, int flags) 849 { 850 lwkt_msg_t msg; 851 int error; 852 853 spin_lock_wr(&port->mpu_spin); 854 while ((msg = _lwkt_pollmsg(port)) == NULL) { 855 port->mp_flags |= MSGPORTF_WAITING; 856 error = msleep(port, &port->mpu_spin, flags, "waitport", 0); 857 /* see note at the top on the MSGPORTF_WAITING flag */ 858 if (error) { 859 spin_unlock_wr(&port->mpu_spin); 860 return(NULL); 861 } 862 } 863 _lwkt_pullmsg(port, msg); 864 spin_unlock_wr(&port->mpu_spin); 865 return(msg); 866 } 867 868 static 869 void 870 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg) 871 { 872 int dowakeup; 873 874 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 875 876 if (msg->ms_flags & MSGF_SYNC) { 877 /* 878 * If a synchronous completion has been requested, just wakeup 879 * the message without bothering to queue it to the target port. 880 */ 881 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 882 wakeup(msg); 883 } else { 884 /* 885 * If an asynchronous completion has been requested the message 886 * must be queued to the reply port. 887 */ 888 spin_lock_wr(&port->mpu_spin); 889 _lwkt_enqueue_reply(port, msg); 890 dowakeup = 0; 891 if (port->mp_flags & MSGPORTF_WAITING) { 892 port->mp_flags &= ~MSGPORTF_WAITING; 893 dowakeup = 1; 894 } 895 spin_unlock_wr(&port->mpu_spin); 896 if (dowakeup) 897 wakeup(port); 898 } 899 } 900 901 /************************************************************************ 902 * SERIALIZER PORT BACKEND * 903 ************************************************************************ 904 * 905 * This backend uses serializer to protect port accessing. Callers are 906 * assumed to have serializer held. This kind of port is usually created 907 * by network device driver along with _one_ lwkt thread to pipeline 908 * operations which may temporarily release serializer. 909 * 910 * Implementation is based on SPIN PORT BACKEND. 911 */ 912 913 static 914 void * 915 lwkt_serialize_getport(lwkt_port_t port) 916 { 917 lwkt_msg_t msg; 918 919 ASSERT_SERIALIZED(port->mpu_serialize); 920 921 if ((msg = _lwkt_pollmsg(port)) != NULL) 922 _lwkt_pullmsg(port, msg); 923 return(msg); 924 } 925 926 static 927 int 928 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg) 929 { 930 KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0); 931 ASSERT_SERIALIZED(port->mpu_serialize); 932 933 msg->ms_target_port = port; 934 _lwkt_pushmsg(port, msg); 935 if (port->mp_flags & MSGPORTF_WAITING) { 936 port->mp_flags &= ~MSGPORTF_WAITING; 937 wakeup(port); 938 } 939 return (EASYNC); 940 } 941 942 static 943 int 944 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) 945 { 946 lwkt_port_t port; 947 int sentabort; 948 int error; 949 950 KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, 951 ("can't wait dropable message\n")); 952 953 if ((msg->ms_flags & MSGF_DONE) == 0) { 954 port = msg->ms_reply_port; 955 956 ASSERT_SERIALIZED(port->mpu_serialize); 957 958 sentabort = 0; 959 while ((msg->ms_flags & MSGF_DONE) == 0) { 960 void *won; 961 962 /* 963 * If message was sent synchronously from the beginning 964 * the wakeup will be on the message structure, else it 965 * will be on the port structure. 966 */ 967 if (msg->ms_flags & MSGF_SYNC) { 968 won = msg; 969 } else { 970 won = port; 971 port->mp_flags |= MSGPORTF_WAITING; 972 } 973 974 /* 975 * Only messages which support abort can be interrupted. 976 * We must still wait for message completion regardless. 977 */ 978 if ((flags & PCATCH) && sentabort == 0) { 979 error = serialize_sleep(won, port->mpu_serialize, PCATCH, 980 "waitmsg", 0); 981 if (error) { 982 sentabort = error; 983 lwkt_serialize_exit(port->mpu_serialize); 984 lwkt_abortmsg(msg); 985 lwkt_serialize_enter(port->mpu_serialize); 986 } 987 } else { 988 error = serialize_sleep(won, port->mpu_serialize, 0, 989 "waitmsg", 0); 990 } 991 /* see note at the top on the MSGPORTF_WAITING flag */ 992 } 993 /* 994 * Turn EINTR into ERESTART if the signal indicates. 995 */ 996 if (sentabort && msg->ms_error == EINTR) 997 msg->ms_error = sentabort; 998 if (msg->ms_flags & MSGF_QUEUED) 999 _lwkt_pullmsg(port, msg); 1000 } else { 1001 if (msg->ms_flags & MSGF_QUEUED) { 1002 port = msg->ms_reply_port; 1003 1004 ASSERT_SERIALIZED(port->mpu_serialize); 1005 _lwkt_pullmsg(port, msg); 1006 } 1007 } 1008 return(msg->ms_error); 1009 } 1010 1011 static 1012 void * 1013 lwkt_serialize_waitport(lwkt_port_t port, int flags) 1014 { 1015 lwkt_msg_t msg; 1016 int error; 1017 1018 ASSERT_SERIALIZED(port->mpu_serialize); 1019 1020 while ((msg = _lwkt_pollmsg(port)) == NULL) { 1021 port->mp_flags |= MSGPORTF_WAITING; 1022 error = serialize_sleep(port, port->mpu_serialize, flags, 1023 "waitport", 0); 1024 /* see note at the top on the MSGPORTF_WAITING flag */ 1025 if (error) 1026 return(NULL); 1027 } 1028 _lwkt_pullmsg(port, msg); 1029 return(msg); 1030 } 1031 1032 static 1033 void 1034 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg) 1035 { 1036 KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0); 1037 ASSERT_SERIALIZED(port->mpu_serialize); 1038 1039 if (msg->ms_flags & MSGF_SYNC) { 1040 /* 1041 * If a synchronous completion has been requested, just wakeup 1042 * the message without bothering to queue it to the target port. 1043 */ 1044 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1045 wakeup(msg); 1046 } else { 1047 /* 1048 * If an asynchronous completion has been requested the message 1049 * must be queued to the reply port. 1050 */ 1051 _lwkt_enqueue_reply(port, msg); 1052 if (port->mp_flags & MSGPORTF_WAITING) { 1053 port->mp_flags &= ~MSGPORTF_WAITING; 1054 wakeup(port); 1055 } 1056 } 1057 } 1058 1059 /************************************************************************ 1060 * PANIC AND SPECIAL PORT FUNCTIONS * 1061 ************************************************************************/ 1062 1063 /* 1064 * You can point a port's reply vector at this function if you just want 1065 * the message marked done, without any queueing or signaling. This is 1066 * often used for structure-embedded messages. 1067 */ 1068 static 1069 void 1070 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg) 1071 { 1072 msg->ms_flags |= MSGF_DONE | MSGF_REPLY; 1073 } 1074 1075 static 1076 void * 1077 lwkt_panic_getport(lwkt_port_t port) 1078 { 1079 panic("lwkt_getport() illegal on port %p", port); 1080 } 1081 1082 static 1083 int 1084 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg) 1085 { 1086 panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg); 1087 } 1088 1089 static 1090 int 1091 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags) 1092 { 1093 panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg); 1094 } 1095 1096 static 1097 void * 1098 lwkt_panic_waitport(lwkt_port_t port, int flags) 1099 { 1100 panic("port %p cannot be waited on", port); 1101 } 1102 1103 static 1104 void 1105 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) 1106 { 1107 panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); 1108 } 1109 1110 static 1111 void 1112 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) 1113 { 1114 panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); 1115 } 1116