xref: /dragonfly/sys/kern/lwkt_msgport.c (revision b7367ef6)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.44 2007/07/04 19:40:35 dillon Exp $
38  */
39 
40 #ifdef _KERNEL
41 
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/proc.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
53 #include <sys/lock.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68 
69 #include <machine/stdarg.h>
70 #include <machine/cpufunc.h>
71 #ifdef SMP
72 #include <machine/smp.h>
73 #endif
74 
75 #include <sys/malloc.h>
76 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
77 
78 #else
79 
80 #include <sys/stdint.h>
81 #include <libcaps/thread.h>
82 #include <sys/thread.h>
83 #include <sys/msgport.h>
84 #include <sys/errno.h>
85 #include <libcaps/globaldata.h>
86 #include <machine/cpufunc.h>
87 #include <sys/thread2.h>
88 #include <sys/msgport2.h>
89 #include <string.h>
90 
91 #endif /* _KERNEL */
92 
93 
94 /************************************************************************
95  *				MESSAGE FUNCTIONS			*
96  ************************************************************************/
97 
98 /*
99  * lwkt_sendmsg()
100  *
101  *	Request asynchronous completion and call lwkt_beginmsg().  The
102  *	target port can opt to execute the message synchronously or
103  *	asynchronously and this function will automatically queue the
104  *	response if the target executes the message synchronously.
105  *
106  *	NOTE: The message is in an indeterminant state until this call
107  *	returns.  The caller should not mess with it (e.g. try to abort it)
108  *	until then.
109  */
110 void
111 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
112 {
113     int error;
114 
115     KKASSERT(msg->ms_reply_port != NULL &&
116 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
117     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
118     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
119 	lwkt_replymsg(msg, error);
120     }
121 }
122 
123 /*
124  * lwkt_domsg()
125  *
126  *	Request asynchronous completion and call lwkt_beginmsg().  The
127  *	target port can opt to execute the message synchronously or
128  *	asynchronously and this function will automatically queue the
129  *	response if the target executes the message synchronously.
130  */
131 int
132 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
133 {
134     int error;
135 
136     KKASSERT(msg->ms_reply_port != NULL &&
137 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
138     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
139     msg->ms_flags |= MSGF_SYNC;
140     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
141 	error = lwkt_waitmsg(msg, flags);
142     } else {
143 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
144     }
145     return(error);
146 }
147 
148 /*
149  * lwkt_forwardmsg()
150  *
151  * Forward a message received on one port to another port.
152  */
153 int
154 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
155 {
156     int error;
157 
158     crit_enter();
159     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
160     if ((error = port->mp_putport(port, msg)) != EASYNC)
161 	lwkt_replymsg(msg, error);
162     crit_exit();
163     return(error);
164 }
165 
166 /*
167  * lwkt_abortmsg()
168  *
169  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
170  * The caller must ensure that the message will not be both replied AND
171  * destroyed while the abort is in progress.
172  *
173  * This function issues a callback which might block!
174  */
175 void
176 lwkt_abortmsg(lwkt_msg_t msg)
177 {
178     /*
179      * A critical section protects us from reply IPIs on this cpu.
180      */
181     crit_enter();
182 
183     /*
184      * Shortcut the operation if the message has already been returned.
185      * The callback typically constructs a lwkt_msg with the abort request,
186      * issues it synchronously, and waits for completion.  The callback
187      * is not required to actually abort the message and the target port,
188      * upon receiving an abort request message generated by the callback
189      * should check whether the original message has already completed or
190      * not.
191      */
192     if (msg->ms_flags & MSGF_ABORTABLE) {
193 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
194 	    msg->ms_abortfn(msg);
195     }
196     crit_exit();
197 }
198 
199 /************************************************************************
200  *			PORT INITIALIZATION API				*
201  ************************************************************************/
202 
203 static void *lwkt_thread_getport(lwkt_port_t port);
204 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
205 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
206 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
207 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
208 
209 static void *lwkt_spin_getport(lwkt_port_t port);
210 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
211 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
212 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
213 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 
215 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
216 static void *lwkt_panic_getport(lwkt_port_t port);
217 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
218 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
219 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
220 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
221 
222 /*
223  * Core port initialization (internal)
224  */
225 static __inline
226 void
227 _lwkt_initport(lwkt_port_t port,
228 	       void *(*gportfn)(lwkt_port_t),
229 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
230 	       int (*wmsgfn)(lwkt_msg_t, int),
231 	       void *(*wportfn)(lwkt_port_t, int),
232 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     port->mp_getport = gportfn;
237     port->mp_putport = pportfn;
238     port->mp_waitmsg =  wmsgfn;
239     port->mp_waitport =  wportfn;
240     port->mp_replyport = rportfn;
241 }
242 
243 /*
244  * lwkt_initport_thread()
245  *
246  *	Initialize a port for use by a particular thread.  The port may
247  *	only be used by <td>.
248  */
249 void
250 lwkt_initport_thread(lwkt_port_t port, thread_t td)
251 {
252     _lwkt_initport(port,
253 		   lwkt_thread_getport,
254 		   lwkt_thread_putport,
255 		   lwkt_thread_waitmsg,
256 		   lwkt_thread_waitport,
257 		   lwkt_thread_replyport);
258     port->mpu_td = td;
259 }
260 
261 /*
262  * lwkt_initport_spin()
263  *
264  *	Initialize a port for use with descriptors that might be accessed
265  *	via multiple LWPs, processes, or threads.  Has somewhat more
266  *	overhead then thread ports.
267  */
268 void
269 lwkt_initport_spin(lwkt_port_t port)
270 {
271     _lwkt_initport(port,
272 		   lwkt_spin_getport,
273 		   lwkt_spin_putport,
274 		   lwkt_spin_waitmsg,
275 		   lwkt_spin_waitport,
276 		   lwkt_spin_replyport);
277     spin_init(&port->mpu_spin);
278 }
279 
280 /*
281  * Similar to the standard initport, this function simply marks the message
282  * as being done and does not attempt to return it to an originating port.
283  */
284 void
285 lwkt_initport_replyonly_null(lwkt_port_t port)
286 {
287     _lwkt_initport(port,
288 		   lwkt_panic_getport,
289 		   lwkt_panic_putport,
290 		   lwkt_panic_waitmsg,
291 		   lwkt_panic_waitport,
292 		   lwkt_null_replyport);
293 }
294 
295 /*
296  * Initialize a reply-only port, typically used as a message sink.  Such
297  * ports can only be used as a reply port.
298  */
299 void
300 lwkt_initport_replyonly(lwkt_port_t port,
301 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
302 {
303     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
304 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
305 			 rportfn);
306 }
307 
308 void
309 lwkt_initport_putonly(lwkt_port_t port,
310 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
311 {
312     _lwkt_initport(port, lwkt_panic_getport, pportfn,
313 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
314 			 lwkt_panic_replyport);
315 }
316 
317 void
318 lwkt_initport_panic(lwkt_port_t port)
319 {
320     _lwkt_initport(port,
321 		   lwkt_panic_getport, lwkt_panic_putport,
322 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
323 		   lwkt_panic_replyport);
324 }
325 
326 /*
327  * lwkt_getport()
328  *
329  *	Retrieve the next message from the port's message queue, return NULL
330  *	if no messages are pending.  The retrieved message will either be a
331  *	request or a reply based on the MSGF_REPLY bit.
332  *
333  *	The calling thread MUST own the port.
334  */
335 
336 static __inline
337 void
338 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
339 {
340     /*
341      * normal case, remove and return the message.
342      */
343     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
344     msg->ms_flags &= ~MSGF_QUEUED;
345 }
346 
347 /************************************************************************
348  *			THREAD PORT BACKEND				*
349  ************************************************************************
350  *
351  * This backend is used when the port a message is retrieved from is owned
352  * by a single thread (the calling thread).  Messages are IPId to the
353  * correct cpu before being enqueued to a port.  Note that this is fairly
354  * optimal since scheduling would have had to do an IPI anyway if the
355  * message were headed to a different cpu.
356  */
357 
358 #ifdef SMP
359 
360 /*
361  * This function completes reply processing for the default case in the
362  * context of the originating cpu.
363  */
364 static
365 void
366 lwkt_thread_replyport_remote(lwkt_msg_t msg)
367 {
368     lwkt_port_t port = msg->ms_reply_port;
369 
370     /*
371      * Chase any thread migration that occurs
372      */
373     if (port->mpu_td->td_gd != mycpu) {
374 	lwkt_send_ipiq(port->mpu_td->td_gd,
375 		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
376 	return;
377     }
378 
379     /*
380      * Cleanup
381      */
382 #ifdef INVARIANTS
383     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
384     msg->ms_flags &= ~MSGF_INTRANSIT;
385 #endif
386     if (msg->ms_flags & MSGF_SYNC) {
387 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
388     } else {
389 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
390 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
391     }
392     if (port->mp_flags & MSGPORTF_WAITING)
393 	lwkt_schedule(port->mpu_td);
394 }
395 
396 #endif
397 
398 /*
399  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
400  *
401  * Called with the reply port as an argument but in the context of the
402  * original target port.  Completion must occur on the target port's
403  * cpu.
404  *
405  * The critical section protects us from IPIs on the this CPU.
406  */
407 void
408 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
409 {
410     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
411 
412     if (msg->ms_flags & MSGF_SYNC) {
413 	/*
414 	 * If a synchronous completion has been requested, just wakeup
415 	 * the message without bothering to queue it to the target port.
416 	 *
417 	 * Assume the target thread is non-preemptive, so no critical
418 	 * section is required.
419 	 */
420 #ifdef SMP
421 	if (port->mpu_td->td_gd == mycpu) {
422 #endif
423 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
424 	    if (port->mp_flags & MSGPORTF_WAITING)
425 		lwkt_schedule(port->mpu_td);
426 #ifdef SMP
427 	} else {
428 #ifdef INVARIANTS
429 	    msg->ms_flags |= MSGF_INTRANSIT;
430 #endif
431 	    msg->ms_flags |= MSGF_REPLY;
432 	    lwkt_send_ipiq(port->mpu_td->td_gd,
433 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
434 	}
435 #endif
436     } else {
437 	/*
438 	 * If an asynchronous completion has been requested the message
439 	 * must be queued to the reply port.
440 	 *
441 	 * A critical section is required to interlock the port queue.
442 	 */
443 #ifdef SMP
444 	if (port->mpu_td->td_gd == mycpu) {
445 #endif
446 	    crit_enter();
447 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
448 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
449 	    if (port->mp_flags & MSGPORTF_WAITING)
450 		lwkt_schedule(port->mpu_td);
451 	    crit_exit();
452 #ifdef SMP
453 	} else {
454 #ifdef INVARIANTS
455 	    msg->ms_flags |= MSGF_INTRANSIT;
456 #endif
457 	    msg->ms_flags |= MSGF_REPLY;
458 	    lwkt_send_ipiq(port->mpu_td->td_gd,
459 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
460 	}
461 #endif
462     }
463 }
464 
465 /*
466  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
467  *
468  * Called with the target port as an argument but in the context of the
469  * reply port.  This function always implements an asynchronous put to
470  * the target message port, and thus returns EASYNC.
471  *
472  * The message must already have cleared MSGF_DONE and MSGF_REPLY
473  */
474 
475 #ifdef SMP
476 
477 static
478 void
479 lwkt_thread_putport_remote(lwkt_msg_t msg)
480 {
481     lwkt_port_t port = msg->ms_target_port;
482 
483     /*
484      * Chase any thread migration that occurs
485      */
486     if (port->mpu_td->td_gd != mycpu) {
487 	lwkt_send_ipiq(port->mpu_td->td_gd,
488 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
489 	return;
490     }
491 
492     /*
493      * Cleanup
494      */
495 #ifdef INVARIANTS
496     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
497     msg->ms_flags &= ~MSGF_INTRANSIT;
498 #endif
499     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
500     msg->ms_flags |= MSGF_QUEUED;
501     if (port->mp_flags & MSGPORTF_WAITING)
502 	lwkt_schedule(port->mpu_td);
503 }
504 
505 #endif
506 
507 static
508 int
509 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
510 {
511     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
512 
513     msg->ms_target_port = port;
514 #ifdef SMP
515     if (port->mpu_td->td_gd == mycpu) {
516 #endif
517 	crit_enter();
518 	msg->ms_flags |= MSGF_QUEUED;
519 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
520 	if (port->mp_flags & MSGPORTF_WAITING)
521 	    lwkt_schedule(port->mpu_td);
522 	crit_exit();
523 #ifdef SMP
524     } else {
525 #ifdef INVARIANTS
526 	msg->ms_flags |= MSGF_INTRANSIT;
527 #endif
528 	lwkt_send_ipiq(port->mpu_td->td_gd,
529 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
530     }
531 #endif
532     return (EASYNC);
533 }
534 
535 /*
536  * lwkt_thread_getport()
537  *
538  *	Retrieve the next message from the port or NULL if no messages
539  *	are ready.
540  */
541 void *
542 lwkt_thread_getport(lwkt_port_t port)
543 {
544     lwkt_msg_t msg;
545 
546     KKASSERT(port->mpu_td == curthread);
547 
548     crit_enter_quick(port->mpu_td);
549     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
550 	_lwkt_pullmsg(port, msg);
551     crit_exit_quick(port->mpu_td);
552     return(msg);
553 }
554 
555 /*
556  * lwkt_thread_waitmsg()
557  *
558  *	Wait for a particular message to be replied.  We must be the only
559  *	thread waiting on the message.  The port must be owned by the
560  *	caller.
561  */
562 int
563 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
564 {
565     if ((msg->ms_flags & MSGF_DONE) == 0) {
566 	/*
567 	 * If the done bit was not set we have to block until it is.
568 	 */
569 	lwkt_port_t port = msg->ms_reply_port;
570 	thread_t td = curthread;
571 	int sentabort;
572 
573 	KKASSERT(port->mpu_td == td);
574 	crit_enter_quick(td);
575 	sentabort = 0;
576 
577 	while ((msg->ms_flags & MSGF_DONE) == 0) {
578 	    port->mp_flags |= MSGPORTF_WAITING;
579 	    if (sentabort == 0) {
580 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
581 		    lwkt_abortmsg(msg);
582 		}
583 	    } else {
584 		lwkt_sleep("waitabt", 0);
585 	    }
586 	    port->mp_flags &= ~MSGPORTF_WAITING;
587 	}
588 	if (msg->ms_flags & MSGF_QUEUED)
589 	    _lwkt_pullmsg(port, msg);
590 	crit_exit_quick(td);
591     } else {
592 	/*
593 	 * If the done bit was set we only have to mess around with the
594 	 * message if it is queued on the reply port.
595 	 */
596 	if (msg->ms_flags & MSGF_QUEUED) {
597 	    lwkt_port_t port = msg->ms_reply_port;
598 	    thread_t td = curthread;
599 
600 	    KKASSERT(port->mpu_td == td);
601 	    crit_enter_quick(td);
602 	    _lwkt_pullmsg(port, msg);
603 	    crit_exit_quick(td);
604 	}
605     }
606     return(msg->ms_error);
607 }
608 
609 void *
610 lwkt_thread_waitport(lwkt_port_t port, int flags)
611 {
612     thread_t td = curthread;
613     lwkt_msg_t msg;
614     int error;
615 
616     KKASSERT(port->mpu_td == td);
617     crit_enter_quick(td);
618     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
619 	port->mp_flags |= MSGPORTF_WAITING;
620 	error = lwkt_sleep("waitport", flags);
621 	port->mp_flags &= ~MSGPORTF_WAITING;
622 	if (error)
623 		goto done;
624     }
625     _lwkt_pullmsg(port, msg);
626 done:
627     crit_exit_quick(td);
628     return(msg);
629 }
630 
631 /************************************************************************
632  *			   SPIN PORT BACKEND				*
633  ************************************************************************
634  *
635  * This backend uses spinlocks instead of making assumptions about which
636  * thread is accessing the port.  It must be used when a port is not owned
637  * by a particular thread.  This is less optimal then thread ports but
638  * you don't have a choice if there are multiple threads accessing the port.
639  *
640  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
641  * on the message port, it is the responsibility of the code doing the
642  * wakeup to clear this flag rather then the blocked threads.  Some
643  * superfluous wakeups may occur, which is ok.
644  *
645  * XXX synchronous message wakeups are not current optimized.
646  */
647 
648 static
649 void *
650 lwkt_spin_getport(lwkt_port_t port)
651 {
652     lwkt_msg_t msg;
653 
654     spin_lock_wr(&port->mpu_spin);
655     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
656 	_lwkt_pullmsg(port, msg);
657     spin_unlock_wr(&port->mpu_spin);
658     return(msg);
659 }
660 
661 static
662 int
663 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
664 {
665     int dowakeup;
666 
667     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
668 
669     msg->ms_target_port = port;
670     spin_lock_wr(&port->mpu_spin);
671     msg->ms_flags |= MSGF_QUEUED;
672     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
673     dowakeup = 0;
674     if (port->mp_flags & MSGPORTF_WAITING) {
675 	port->mp_flags &= ~MSGPORTF_WAITING;
676 	dowakeup = 1;
677     }
678     spin_unlock_wr(&port->mpu_spin);
679     if (dowakeup)
680 	wakeup(port);
681     return (EASYNC);
682 }
683 
684 static
685 int
686 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
687 {
688     lwkt_port_t port;
689     int sentabort;
690     int error;
691 
692     if ((msg->ms_flags & MSGF_DONE) == 0) {
693 	port = msg->ms_reply_port;
694 	sentabort = 0;
695 	spin_lock_wr(&port->mpu_spin);
696 	while ((msg->ms_flags & MSGF_DONE) == 0) {
697 	    void *won;
698 
699 	    /*
700 	     * If message was sent synchronously from the beginning
701 	     * the wakeup will be on the message structure, else it
702 	     * will be on the port structure.
703 	     */
704 	    if (msg->ms_flags & MSGF_SYNC) {
705 		won = msg;
706 	    } else {
707 		won = port;
708 		port->mp_flags |= MSGPORTF_WAITING;
709 	    }
710 
711 	    /*
712 	     * Only messages which support abort can be interrupted.
713 	     * We must still wait for message completion regardless.
714 	     */
715 	    if ((flags & PCATCH) && sentabort == 0) {
716 		error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
717 		if (error) {
718 		    sentabort = error;
719 		    spin_unlock_wr(&port->mpu_spin);
720 		    lwkt_abortmsg(msg);
721 		    spin_lock_wr(&port->mpu_spin);
722 		}
723 	    } else {
724 		error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
725 	    }
726 	    /* see note at the top on the MSGPORTF_WAITING flag */
727 	}
728 	/*
729 	 * Turn EINTR into ERESTART if the signal indicates.
730 	 */
731 	if (sentabort && msg->ms_error == EINTR)
732 	    msg->ms_error = sentabort;
733 	if (msg->ms_flags & MSGF_QUEUED)
734 		_lwkt_pullmsg(port, msg);
735 	spin_unlock_wr(&port->mpu_spin);
736     } else {
737 	if (msg->ms_flags & MSGF_QUEUED) {
738 	    port = msg->ms_reply_port;
739 	    spin_lock_wr(&port->mpu_spin);
740 	    _lwkt_pullmsg(port, msg);
741 	    spin_unlock_wr(&port->mpu_spin);
742 	}
743     }
744     return(msg->ms_error);
745 }
746 
747 static
748 void *
749 lwkt_spin_waitport(lwkt_port_t port, int flags)
750 {
751     lwkt_msg_t msg;
752     int error;
753 
754     spin_lock_wr(&port->mpu_spin);
755     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
756 	port->mp_flags |= MSGPORTF_WAITING;
757 	error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
758 	/* see note at the top on the MSGPORTF_WAITING flag */
759 	if (error) {
760 	    spin_unlock_wr(&port->mpu_spin);
761 	    return(NULL);
762 	}
763     }
764     _lwkt_pullmsg(port, msg);
765     spin_unlock_wr(&port->mpu_spin);
766     return(msg);
767 }
768 
769 static
770 void
771 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
772 {
773     int dowakeup;
774 
775     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
776 
777     if (msg->ms_flags & MSGF_SYNC) {
778 	/*
779 	 * If a synchronous completion has been requested, just wakeup
780 	 * the message without bothering to queue it to the target port.
781 	 */
782 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
783 	wakeup(msg);
784     } else {
785 	/*
786 	 * If an asynchronous completion has been requested the message
787 	 * must be queued to the reply port.
788 	 */
789 	spin_lock_wr(&port->mpu_spin);
790 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
791 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
792 	dowakeup = 0;
793 	if (port->mp_flags & MSGPORTF_WAITING) {
794 	    port->mp_flags &= ~MSGPORTF_WAITING;
795 	    dowakeup = 1;
796 	}
797 	spin_unlock_wr(&port->mpu_spin);
798 	if (dowakeup)
799 	    wakeup(port);
800     }
801 }
802 
803 /************************************************************************
804  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
805  ************************************************************************/
806 
807 /*
808  * You can point a port's reply vector at this function if you just want
809  * the message marked done, without any queueing or signaling.  This is
810  * often used for structure-embedded messages.
811  */
812 static
813 void
814 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
815 {
816     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
817 }
818 
819 static
820 void *
821 lwkt_panic_getport(lwkt_port_t port)
822 {
823     panic("lwkt_getport() illegal on port %p", port);
824 }
825 
826 static
827 int
828 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
829 {
830     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
831 }
832 
833 static
834 int
835 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
836 {
837     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
838 }
839 
840 static
841 void *
842 lwkt_panic_waitport(lwkt_port_t port, int flags)
843 {
844     panic("port %p cannot be waited on", port);
845 }
846 
847 static
848 void
849 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
850 {
851     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
852 }
853 
854