xref: /dragonfly/sys/kern/lwkt_msgport.c (revision e2f5ccfb)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.46 2008/05/18 20:57:56 nth Exp $
38  */
39 
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52 
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62 
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67 
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73 
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76 
77 /************************************************************************
78  *				MESSAGE FUNCTIONS			*
79  ************************************************************************/
80 
81 /*
82  * lwkt_sendmsg()
83  *
84  *	Request asynchronous completion and call lwkt_beginmsg().  The
85  *	target port can opt to execute the message synchronously or
86  *	asynchronously and this function will automatically queue the
87  *	response if the target executes the message synchronously.
88  *
89  *	NOTE: The message is in an indeterminant state until this call
90  *	returns.  The caller should not mess with it (e.g. try to abort it)
91  *	until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97 
98     KKASSERT(msg->ms_reply_port != NULL &&
99 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 	lwkt_replymsg(msg, error);
103     }
104 }
105 
106 /*
107  * lwkt_domsg()
108  *
109  *	Request asynchronous completion and call lwkt_beginmsg().  The
110  *	target port can opt to execute the message synchronously or
111  *	asynchronously and this function will automatically queue the
112  *	response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118 
119     KKASSERT(msg->ms_reply_port != NULL &&
120 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 	error = lwkt_waitmsg(msg, flags);
125     } else {
126 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130 
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {
139     int error;
140 
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144 	lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148 
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165 
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 	    msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181 
182 /************************************************************************
183  *			PORT INITIALIZATION API				*
184  ************************************************************************/
185 
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 
192 static void *lwkt_spin_getport(lwkt_port_t port);
193 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
194 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
195 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
196 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
197 
198 static void *lwkt_serialize_getport(lwkt_port_t port);
199 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
200 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
201 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
202 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
203 
204 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void *lwkt_panic_getport(lwkt_port_t port);
206 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
207 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
208 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
209 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
210 
211 /*
212  * Core port initialization (internal)
213  */
214 static __inline
215 void
216 _lwkt_initport(lwkt_port_t port,
217 	       void *(*gportfn)(lwkt_port_t),
218 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
219 	       int (*wmsgfn)(lwkt_msg_t, int),
220 	       void *(*wportfn)(lwkt_port_t, int),
221 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t))
222 {
223     bzero(port, sizeof(*port));
224     TAILQ_INIT(&port->mp_msgq);
225     port->mp_getport = gportfn;
226     port->mp_putport = pportfn;
227     port->mp_waitmsg =  wmsgfn;
228     port->mp_waitport =  wportfn;
229     port->mp_replyport = rportfn;
230 }
231 
232 /*
233  * lwkt_initport_thread()
234  *
235  *	Initialize a port for use by a particular thread.  The port may
236  *	only be used by <td>.
237  */
238 void
239 lwkt_initport_thread(lwkt_port_t port, thread_t td)
240 {
241     _lwkt_initport(port,
242 		   lwkt_thread_getport,
243 		   lwkt_thread_putport,
244 		   lwkt_thread_waitmsg,
245 		   lwkt_thread_waitport,
246 		   lwkt_thread_replyport);
247     port->mpu_td = td;
248 }
249 
250 /*
251  * lwkt_initport_spin()
252  *
253  *	Initialize a port for use with descriptors that might be accessed
254  *	via multiple LWPs, processes, or threads.  Has somewhat more
255  *	overhead then thread ports.
256  */
257 void
258 lwkt_initport_spin(lwkt_port_t port)
259 {
260     _lwkt_initport(port,
261 		   lwkt_spin_getport,
262 		   lwkt_spin_putport,
263 		   lwkt_spin_waitmsg,
264 		   lwkt_spin_waitport,
265 		   lwkt_spin_replyport);
266     spin_init(&port->mpu_spin);
267 }
268 
269 void
270 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
271 {
272     _lwkt_initport(port,
273 		   lwkt_serialize_getport,
274 		   lwkt_serialize_putport,
275 		   lwkt_serialize_waitmsg,
276 		   lwkt_serialize_waitport,
277 		   lwkt_serialize_replyport);
278     port->mpu_serialize = slz;
279 }
280 
281 /*
282  * Similar to the standard initport, this function simply marks the message
283  * as being done and does not attempt to return it to an originating port.
284  */
285 void
286 lwkt_initport_replyonly_null(lwkt_port_t port)
287 {
288     _lwkt_initport(port,
289 		   lwkt_panic_getport,
290 		   lwkt_panic_putport,
291 		   lwkt_panic_waitmsg,
292 		   lwkt_panic_waitport,
293 		   lwkt_null_replyport);
294 }
295 
296 /*
297  * Initialize a reply-only port, typically used as a message sink.  Such
298  * ports can only be used as a reply port.
299  */
300 void
301 lwkt_initport_replyonly(lwkt_port_t port,
302 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
303 {
304     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
305 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
306 			 rportfn);
307 }
308 
309 void
310 lwkt_initport_putonly(lwkt_port_t port,
311 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
312 {
313     _lwkt_initport(port, lwkt_panic_getport, pportfn,
314 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
315 			 lwkt_panic_replyport);
316 }
317 
318 void
319 lwkt_initport_panic(lwkt_port_t port)
320 {
321     _lwkt_initport(port,
322 		   lwkt_panic_getport, lwkt_panic_putport,
323 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
324 		   lwkt_panic_replyport);
325 }
326 
327 /*
328  * lwkt_getport()
329  *
330  *	Retrieve the next message from the port's message queue, return NULL
331  *	if no messages are pending.  The retrieved message will either be a
332  *	request or a reply based on the MSGF_REPLY bit.
333  *
334  *	The calling thread MUST own the port.
335  */
336 
337 static __inline
338 void
339 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
340 {
341     /*
342      * normal case, remove and return the message.
343      */
344     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
345     msg->ms_flags &= ~MSGF_QUEUED;
346 }
347 
348 /************************************************************************
349  *			THREAD PORT BACKEND				*
350  ************************************************************************
351  *
352  * This backend is used when the port a message is retrieved from is owned
353  * by a single thread (the calling thread).  Messages are IPId to the
354  * correct cpu before being enqueued to a port.  Note that this is fairly
355  * optimal since scheduling would have had to do an IPI anyway if the
356  * message were headed to a different cpu.
357  */
358 
359 #ifdef SMP
360 
361 /*
362  * This function completes reply processing for the default case in the
363  * context of the originating cpu.
364  */
365 static
366 void
367 lwkt_thread_replyport_remote(lwkt_msg_t msg)
368 {
369     lwkt_port_t port = msg->ms_reply_port;
370 
371     /*
372      * Chase any thread migration that occurs
373      */
374     if (port->mpu_td->td_gd != mycpu) {
375 	lwkt_send_ipiq(port->mpu_td->td_gd,
376 		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
377 	return;
378     }
379 
380     /*
381      * Cleanup
382      */
383 #ifdef INVARIANTS
384     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
385     msg->ms_flags &= ~MSGF_INTRANSIT;
386 #endif
387     if (msg->ms_flags & MSGF_SYNC) {
388 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
389     } else {
390 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
391 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
392     }
393     if (port->mp_flags & MSGPORTF_WAITING)
394 	lwkt_schedule(port->mpu_td);
395 }
396 
397 #endif
398 
399 /*
400  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
401  *
402  * Called with the reply port as an argument but in the context of the
403  * original target port.  Completion must occur on the target port's
404  * cpu.
405  *
406  * The critical section protects us from IPIs on the this CPU.
407  */
408 void
409 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
410 {
411     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
412 
413     if (msg->ms_flags & MSGF_SYNC) {
414 	/*
415 	 * If a synchronous completion has been requested, just wakeup
416 	 * the message without bothering to queue it to the target port.
417 	 *
418 	 * Assume the target thread is non-preemptive, so no critical
419 	 * section is required.
420 	 */
421 #ifdef SMP
422 	if (port->mpu_td->td_gd == mycpu) {
423 #endif
424 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
425 	    if (port->mp_flags & MSGPORTF_WAITING)
426 		lwkt_schedule(port->mpu_td);
427 #ifdef SMP
428 	} else {
429 #ifdef INVARIANTS
430 	    msg->ms_flags |= MSGF_INTRANSIT;
431 #endif
432 	    msg->ms_flags |= MSGF_REPLY;
433 	    lwkt_send_ipiq(port->mpu_td->td_gd,
434 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
435 	}
436 #endif
437     } else {
438 	/*
439 	 * If an asynchronous completion has been requested the message
440 	 * must be queued to the reply port.
441 	 *
442 	 * A critical section is required to interlock the port queue.
443 	 */
444 #ifdef SMP
445 	if (port->mpu_td->td_gd == mycpu) {
446 #endif
447 	    crit_enter();
448 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
449 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
450 	    if (port->mp_flags & MSGPORTF_WAITING)
451 		lwkt_schedule(port->mpu_td);
452 	    crit_exit();
453 #ifdef SMP
454 	} else {
455 #ifdef INVARIANTS
456 	    msg->ms_flags |= MSGF_INTRANSIT;
457 #endif
458 	    msg->ms_flags |= MSGF_REPLY;
459 	    lwkt_send_ipiq(port->mpu_td->td_gd,
460 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
461 	}
462 #endif
463     }
464 }
465 
466 /*
467  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
468  *
469  * Called with the target port as an argument but in the context of the
470  * reply port.  This function always implements an asynchronous put to
471  * the target message port, and thus returns EASYNC.
472  *
473  * The message must already have cleared MSGF_DONE and MSGF_REPLY
474  */
475 
476 #ifdef SMP
477 
478 static
479 void
480 lwkt_thread_putport_remote(lwkt_msg_t msg)
481 {
482     lwkt_port_t port = msg->ms_target_port;
483 
484     /*
485      * Chase any thread migration that occurs
486      */
487     if (port->mpu_td->td_gd != mycpu) {
488 	lwkt_send_ipiq(port->mpu_td->td_gd,
489 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
490 	return;
491     }
492 
493     /*
494      * Cleanup
495      */
496 #ifdef INVARIANTS
497     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
498     msg->ms_flags &= ~MSGF_INTRANSIT;
499 #endif
500     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
501     msg->ms_flags |= MSGF_QUEUED;
502     if (port->mp_flags & MSGPORTF_WAITING)
503 	lwkt_schedule(port->mpu_td);
504 }
505 
506 #endif
507 
508 static
509 int
510 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
511 {
512     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
513 
514     msg->ms_target_port = port;
515 #ifdef SMP
516     if (port->mpu_td->td_gd == mycpu) {
517 #endif
518 	crit_enter();
519 	msg->ms_flags |= MSGF_QUEUED;
520 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
521 	if (port->mp_flags & MSGPORTF_WAITING)
522 	    lwkt_schedule(port->mpu_td);
523 	crit_exit();
524 #ifdef SMP
525     } else {
526 #ifdef INVARIANTS
527 	msg->ms_flags |= MSGF_INTRANSIT;
528 #endif
529 	lwkt_send_ipiq(port->mpu_td->td_gd,
530 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
531     }
532 #endif
533     return (EASYNC);
534 }
535 
536 /*
537  * lwkt_thread_getport()
538  *
539  *	Retrieve the next message from the port or NULL if no messages
540  *	are ready.
541  */
542 void *
543 lwkt_thread_getport(lwkt_port_t port)
544 {
545     lwkt_msg_t msg;
546 
547     KKASSERT(port->mpu_td == curthread);
548 
549     crit_enter_quick(port->mpu_td);
550     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
551 	_lwkt_pullmsg(port, msg);
552     crit_exit_quick(port->mpu_td);
553     return(msg);
554 }
555 
556 /*
557  * lwkt_thread_waitmsg()
558  *
559  *	Wait for a particular message to be replied.  We must be the only
560  *	thread waiting on the message.  The port must be owned by the
561  *	caller.
562  */
563 int
564 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
565 {
566     if ((msg->ms_flags & MSGF_DONE) == 0) {
567 	/*
568 	 * If the done bit was not set we have to block until it is.
569 	 */
570 	lwkt_port_t port = msg->ms_reply_port;
571 	thread_t td = curthread;
572 	int sentabort;
573 
574 	KKASSERT(port->mpu_td == td);
575 	crit_enter_quick(td);
576 	sentabort = 0;
577 
578 	while ((msg->ms_flags & MSGF_DONE) == 0) {
579 	    port->mp_flags |= MSGPORTF_WAITING;
580 	    if (sentabort == 0) {
581 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
582 		    lwkt_abortmsg(msg);
583 		}
584 	    } else {
585 		lwkt_sleep("waitabt", 0);
586 	    }
587 	    port->mp_flags &= ~MSGPORTF_WAITING;
588 	}
589 	if (msg->ms_flags & MSGF_QUEUED)
590 	    _lwkt_pullmsg(port, msg);
591 	crit_exit_quick(td);
592     } else {
593 	/*
594 	 * If the done bit was set we only have to mess around with the
595 	 * message if it is queued on the reply port.
596 	 */
597 	if (msg->ms_flags & MSGF_QUEUED) {
598 	    lwkt_port_t port = msg->ms_reply_port;
599 	    thread_t td = curthread;
600 
601 	    KKASSERT(port->mpu_td == td);
602 	    crit_enter_quick(td);
603 	    _lwkt_pullmsg(port, msg);
604 	    crit_exit_quick(td);
605 	}
606     }
607     return(msg->ms_error);
608 }
609 
610 void *
611 lwkt_thread_waitport(lwkt_port_t port, int flags)
612 {
613     thread_t td = curthread;
614     lwkt_msg_t msg;
615     int error;
616 
617     KKASSERT(port->mpu_td == td);
618     crit_enter_quick(td);
619     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
620 	port->mp_flags |= MSGPORTF_WAITING;
621 	error = lwkt_sleep("waitport", flags);
622 	port->mp_flags &= ~MSGPORTF_WAITING;
623 	if (error)
624 		goto done;
625     }
626     _lwkt_pullmsg(port, msg);
627 done:
628     crit_exit_quick(td);
629     return(msg);
630 }
631 
632 /************************************************************************
633  *			   SPIN PORT BACKEND				*
634  ************************************************************************
635  *
636  * This backend uses spinlocks instead of making assumptions about which
637  * thread is accessing the port.  It must be used when a port is not owned
638  * by a particular thread.  This is less optimal than thread ports but
639  * you don't have a choice if there are multiple threads accessing the port.
640  *
641  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
642  * on the message port, it is the responsibility of the code doing the
643  * wakeup to clear this flag rather than the blocked threads.  Some
644  * superfluous wakeups may occur, which is ok.
645  *
646  * XXX synchronous message wakeups are not currently optimized.
647  */
648 
649 static
650 void *
651 lwkt_spin_getport(lwkt_port_t port)
652 {
653     lwkt_msg_t msg;
654 
655     spin_lock_wr(&port->mpu_spin);
656     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
657 	_lwkt_pullmsg(port, msg);
658     spin_unlock_wr(&port->mpu_spin);
659     return(msg);
660 }
661 
662 static
663 int
664 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
665 {
666     int dowakeup;
667 
668     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
669 
670     msg->ms_target_port = port;
671     spin_lock_wr(&port->mpu_spin);
672     msg->ms_flags |= MSGF_QUEUED;
673     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
674     dowakeup = 0;
675     if (port->mp_flags & MSGPORTF_WAITING) {
676 	port->mp_flags &= ~MSGPORTF_WAITING;
677 	dowakeup = 1;
678     }
679     spin_unlock_wr(&port->mpu_spin);
680     if (dowakeup)
681 	wakeup(port);
682     return (EASYNC);
683 }
684 
685 static
686 int
687 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
688 {
689     lwkt_port_t port;
690     int sentabort;
691     int error;
692 
693     if ((msg->ms_flags & MSGF_DONE) == 0) {
694 	port = msg->ms_reply_port;
695 	sentabort = 0;
696 	spin_lock_wr(&port->mpu_spin);
697 	while ((msg->ms_flags & MSGF_DONE) == 0) {
698 	    void *won;
699 
700 	    /*
701 	     * If message was sent synchronously from the beginning
702 	     * the wakeup will be on the message structure, else it
703 	     * will be on the port structure.
704 	     */
705 	    if (msg->ms_flags & MSGF_SYNC) {
706 		won = msg;
707 	    } else {
708 		won = port;
709 		port->mp_flags |= MSGPORTF_WAITING;
710 	    }
711 
712 	    /*
713 	     * Only messages which support abort can be interrupted.
714 	     * We must still wait for message completion regardless.
715 	     */
716 	    if ((flags & PCATCH) && sentabort == 0) {
717 		error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
718 		if (error) {
719 		    sentabort = error;
720 		    spin_unlock_wr(&port->mpu_spin);
721 		    lwkt_abortmsg(msg);
722 		    spin_lock_wr(&port->mpu_spin);
723 		}
724 	    } else {
725 		error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
726 	    }
727 	    /* see note at the top on the MSGPORTF_WAITING flag */
728 	}
729 	/*
730 	 * Turn EINTR into ERESTART if the signal indicates.
731 	 */
732 	if (sentabort && msg->ms_error == EINTR)
733 	    msg->ms_error = sentabort;
734 	if (msg->ms_flags & MSGF_QUEUED)
735 		_lwkt_pullmsg(port, msg);
736 	spin_unlock_wr(&port->mpu_spin);
737     } else {
738 	if (msg->ms_flags & MSGF_QUEUED) {
739 	    port = msg->ms_reply_port;
740 	    spin_lock_wr(&port->mpu_spin);
741 	    _lwkt_pullmsg(port, msg);
742 	    spin_unlock_wr(&port->mpu_spin);
743 	}
744     }
745     return(msg->ms_error);
746 }
747 
748 static
749 void *
750 lwkt_spin_waitport(lwkt_port_t port, int flags)
751 {
752     lwkt_msg_t msg;
753     int error;
754 
755     spin_lock_wr(&port->mpu_spin);
756     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
757 	port->mp_flags |= MSGPORTF_WAITING;
758 	error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
759 	/* see note at the top on the MSGPORTF_WAITING flag */
760 	if (error) {
761 	    spin_unlock_wr(&port->mpu_spin);
762 	    return(NULL);
763 	}
764     }
765     _lwkt_pullmsg(port, msg);
766     spin_unlock_wr(&port->mpu_spin);
767     return(msg);
768 }
769 
770 static
771 void
772 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
773 {
774     int dowakeup;
775 
776     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
777 
778     if (msg->ms_flags & MSGF_SYNC) {
779 	/*
780 	 * If a synchronous completion has been requested, just wakeup
781 	 * the message without bothering to queue it to the target port.
782 	 */
783 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
784 	wakeup(msg);
785     } else {
786 	/*
787 	 * If an asynchronous completion has been requested the message
788 	 * must be queued to the reply port.
789 	 */
790 	spin_lock_wr(&port->mpu_spin);
791 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
792 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
793 	dowakeup = 0;
794 	if (port->mp_flags & MSGPORTF_WAITING) {
795 	    port->mp_flags &= ~MSGPORTF_WAITING;
796 	    dowakeup = 1;
797 	}
798 	spin_unlock_wr(&port->mpu_spin);
799 	if (dowakeup)
800 	    wakeup(port);
801     }
802 }
803 
804 /************************************************************************
805  *			  SERIALIZER PORT BACKEND			*
806  ************************************************************************
807  *
808  * This backend uses serializer to protect port accessing.  Callers are
809  * assumed to have serializer held.  This kind of port is usually created
810  * by network device driver along with _one_ lwkt thread to pipeline
811  * operations which may temporarily release serializer.
812  *
813  * Implementation is based on SPIN PORT BACKEND.
814  */
815 
816 static
817 void *
818 lwkt_serialize_getport(lwkt_port_t port)
819 {
820     lwkt_msg_t msg;
821 
822     ASSERT_SERIALIZED(port->mpu_serialize);
823 
824     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
825 	_lwkt_pullmsg(port, msg);
826     return(msg);
827 }
828 
829 static
830 int
831 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
832 {
833     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
834     ASSERT_SERIALIZED(port->mpu_serialize);
835 
836     msg->ms_target_port = port;
837     msg->ms_flags |= MSGF_QUEUED;
838     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
839     if (port->mp_flags & MSGPORTF_WAITING) {
840 	port->mp_flags &= ~MSGPORTF_WAITING;
841 	wakeup(port);
842     }
843     return (EASYNC);
844 }
845 
846 static
847 int
848 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
849 {
850     lwkt_port_t port;
851     int sentabort;
852     int error;
853 
854     if ((msg->ms_flags & MSGF_DONE) == 0) {
855 	port = msg->ms_reply_port;
856 
857 	ASSERT_SERIALIZED(port->mpu_serialize);
858 
859 	sentabort = 0;
860 	while ((msg->ms_flags & MSGF_DONE) == 0) {
861 	    void *won;
862 
863 	    /*
864 	     * If message was sent synchronously from the beginning
865 	     * the wakeup will be on the message structure, else it
866 	     * will be on the port structure.
867 	     */
868 	    if (msg->ms_flags & MSGF_SYNC) {
869 		won = msg;
870 	    } else {
871 		won = port;
872 		port->mp_flags |= MSGPORTF_WAITING;
873 	    }
874 
875 	    /*
876 	     * Only messages which support abort can be interrupted.
877 	     * We must still wait for message completion regardless.
878 	     */
879 	    if ((flags & PCATCH) && sentabort == 0) {
880 		error = serialize_sleep(won, port->mpu_serialize, PCATCH,
881 					"waitmsg", 0);
882 		if (error) {
883 		    sentabort = error;
884 		    lwkt_serialize_exit(port->mpu_serialize);
885 		    lwkt_abortmsg(msg);
886 		    lwkt_serialize_enter(port->mpu_serialize);
887 		}
888 	    } else {
889 		error = serialize_sleep(won, port->mpu_serialize, 0,
890 					"waitmsg", 0);
891 	    }
892 	    /* see note at the top on the MSGPORTF_WAITING flag */
893 	}
894 	/*
895 	 * Turn EINTR into ERESTART if the signal indicates.
896 	 */
897 	if (sentabort && msg->ms_error == EINTR)
898 	    msg->ms_error = sentabort;
899 	if (msg->ms_flags & MSGF_QUEUED)
900 		_lwkt_pullmsg(port, msg);
901     } else {
902 	if (msg->ms_flags & MSGF_QUEUED) {
903 	    port = msg->ms_reply_port;
904 
905 	    ASSERT_SERIALIZED(port->mpu_serialize);
906 	    _lwkt_pullmsg(port, msg);
907 	}
908     }
909     return(msg->ms_error);
910 }
911 
912 static
913 void *
914 lwkt_serialize_waitport(lwkt_port_t port, int flags)
915 {
916     lwkt_msg_t msg;
917     int error;
918 
919     ASSERT_SERIALIZED(port->mpu_serialize);
920 
921     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
922 	port->mp_flags |= MSGPORTF_WAITING;
923 	error = serialize_sleep(port, port->mpu_serialize, flags,
924 				"waitport", 0);
925 	/* see note at the top on the MSGPORTF_WAITING flag */
926 	if (error)
927 	    return(NULL);
928     }
929     _lwkt_pullmsg(port, msg);
930     return(msg);
931 }
932 
933 static
934 void
935 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
936 {
937     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
938     ASSERT_SERIALIZED(port->mpu_serialize);
939 
940     if (msg->ms_flags & MSGF_SYNC) {
941 	/*
942 	 * If a synchronous completion has been requested, just wakeup
943 	 * the message without bothering to queue it to the target port.
944 	 */
945 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
946 	wakeup(msg);
947     } else {
948 	/*
949 	 * If an asynchronous completion has been requested the message
950 	 * must be queued to the reply port.
951 	 */
952 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
953 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
954 	if (port->mp_flags & MSGPORTF_WAITING) {
955 	    port->mp_flags &= ~MSGPORTF_WAITING;
956 	    wakeup(port);
957 	}
958     }
959 }
960 
961 /************************************************************************
962  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
963  ************************************************************************/
964 
965 /*
966  * You can point a port's reply vector at this function if you just want
967  * the message marked done, without any queueing or signaling.  This is
968  * often used for structure-embedded messages.
969  */
970 static
971 void
972 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
973 {
974     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
975 }
976 
977 static
978 void *
979 lwkt_panic_getport(lwkt_port_t port)
980 {
981     panic("lwkt_getport() illegal on port %p", port);
982 }
983 
984 static
985 int
986 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
987 {
988     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
989 }
990 
991 static
992 int
993 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
994 {
995     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
996 }
997 
998 static
999 void *
1000 lwkt_panic_waitport(lwkt_port_t port, int flags)
1001 {
1002     panic("port %p cannot be waited on", port);
1003 }
1004 
1005 static
1006 void
1007 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1008 {
1009     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1010 }
1011 
1012