xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 62f7f702)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.45 2008/03/05 13:03:29 sephe Exp $
38  */
39 
40 #ifdef _KERNEL
41 
42 #include <sys/param.h>
43 #include <sys/systm.h>
44 #include <sys/kernel.h>
45 #include <sys/proc.h>
46 #include <sys/rtprio.h>
47 #include <sys/queue.h>
48 #include <sys/sysctl.h>
49 #include <sys/kthread.h>
50 #include <sys/signalvar.h>
51 #include <sys/signal2.h>
52 #include <machine/cpu.h>
53 #include <sys/lock.h>
54 
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_kern.h>
58 #include <vm/vm_object.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_map.h>
61 #include <vm/vm_pager.h>
62 #include <vm/vm_extern.h>
63 #include <vm/vm_zone.h>
64 
65 #include <sys/thread2.h>
66 #include <sys/msgport2.h>
67 #include <sys/spinlock2.h>
68 #include <sys/serialize.h>
69 
70 #include <machine/stdarg.h>
71 #include <machine/cpufunc.h>
72 #ifdef SMP
73 #include <machine/smp.h>
74 #endif
75 
76 #include <sys/malloc.h>
77 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
78 
79 #else
80 
81 #include <sys/stdint.h>
82 #include <libcaps/thread.h>
83 #include <sys/thread.h>
84 #include <sys/msgport.h>
85 #include <sys/errno.h>
86 #include <libcaps/globaldata.h>
87 #include <machine/cpufunc.h>
88 #include <sys/thread2.h>
89 #include <sys/msgport2.h>
90 #include <string.h>
91 
92 #endif /* _KERNEL */
93 
94 
95 /************************************************************************
96  *				MESSAGE FUNCTIONS			*
97  ************************************************************************/
98 
99 /*
100  * lwkt_sendmsg()
101  *
102  *	Request asynchronous completion and call lwkt_beginmsg().  The
103  *	target port can opt to execute the message synchronously or
104  *	asynchronously and this function will automatically queue the
105  *	response if the target executes the message synchronously.
106  *
107  *	NOTE: The message is in an indeterminant state until this call
108  *	returns.  The caller should not mess with it (e.g. try to abort it)
109  *	until then.
110  */
111 void
112 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115 
116     KKASSERT(msg->ms_reply_port != NULL &&
117 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
118     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
119     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
120 	lwkt_replymsg(msg, error);
121     }
122 }
123 
124 /*
125  * lwkt_domsg()
126  *
127  *	Request asynchronous completion and call lwkt_beginmsg().  The
128  *	target port can opt to execute the message synchronously or
129  *	asynchronously and this function will automatically queue the
130  *	response if the target executes the message synchronously.
131  */
132 int
133 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
134 {
135     int error;
136 
137     KKASSERT(msg->ms_reply_port != NULL &&
138 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
139     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
140     msg->ms_flags |= MSGF_SYNC;
141     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
142 	error = lwkt_waitmsg(msg, flags);
143     } else {
144 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
145     }
146     return(error);
147 }
148 
149 /*
150  * lwkt_forwardmsg()
151  *
152  * Forward a message received on one port to another port.
153  */
154 int
155 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
156 {
157     int error;
158 
159     crit_enter();
160     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
161     if ((error = port->mp_putport(port, msg)) != EASYNC)
162 	lwkt_replymsg(msg, error);
163     crit_exit();
164     return(error);
165 }
166 
167 /*
168  * lwkt_abortmsg()
169  *
170  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
171  * The caller must ensure that the message will not be both replied AND
172  * destroyed while the abort is in progress.
173  *
174  * This function issues a callback which might block!
175  */
176 void
177 lwkt_abortmsg(lwkt_msg_t msg)
178 {
179     /*
180      * A critical section protects us from reply IPIs on this cpu.
181      */
182     crit_enter();
183 
184     /*
185      * Shortcut the operation if the message has already been returned.
186      * The callback typically constructs a lwkt_msg with the abort request,
187      * issues it synchronously, and waits for completion.  The callback
188      * is not required to actually abort the message and the target port,
189      * upon receiving an abort request message generated by the callback
190      * should check whether the original message has already completed or
191      * not.
192      */
193     if (msg->ms_flags & MSGF_ABORTABLE) {
194 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
195 	    msg->ms_abortfn(msg);
196     }
197     crit_exit();
198 }
199 
200 /************************************************************************
201  *			PORT INITIALIZATION API				*
202  ************************************************************************/
203 
204 static void *lwkt_thread_getport(lwkt_port_t port);
205 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
206 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
207 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
208 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
209 
210 static void *lwkt_spin_getport(lwkt_port_t port);
211 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
212 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
213 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
214 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
215 
216 static void *lwkt_serialize_getport(lwkt_port_t port);
217 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
218 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
219 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
220 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
221 
222 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
223 static void *lwkt_panic_getport(lwkt_port_t port);
224 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
225 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
226 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
227 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
228 
229 /*
230  * Core port initialization (internal)
231  */
232 static __inline
233 void
234 _lwkt_initport(lwkt_port_t port,
235 	       void *(*gportfn)(lwkt_port_t),
236 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
237 	       int (*wmsgfn)(lwkt_msg_t, int),
238 	       void *(*wportfn)(lwkt_port_t, int),
239 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t))
240 {
241     bzero(port, sizeof(*port));
242     TAILQ_INIT(&port->mp_msgq);
243     port->mp_getport = gportfn;
244     port->mp_putport = pportfn;
245     port->mp_waitmsg =  wmsgfn;
246     port->mp_waitport =  wportfn;
247     port->mp_replyport = rportfn;
248 }
249 
250 /*
251  * lwkt_initport_thread()
252  *
253  *	Initialize a port for use by a particular thread.  The port may
254  *	only be used by <td>.
255  */
256 void
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
258 {
259     _lwkt_initport(port,
260 		   lwkt_thread_getport,
261 		   lwkt_thread_putport,
262 		   lwkt_thread_waitmsg,
263 		   lwkt_thread_waitport,
264 		   lwkt_thread_replyport);
265     port->mpu_td = td;
266 }
267 
268 /*
269  * lwkt_initport_spin()
270  *
271  *	Initialize a port for use with descriptors that might be accessed
272  *	via multiple LWPs, processes, or threads.  Has somewhat more
273  *	overhead then thread ports.
274  */
275 void
276 lwkt_initport_spin(lwkt_port_t port)
277 {
278     _lwkt_initport(port,
279 		   lwkt_spin_getport,
280 		   lwkt_spin_putport,
281 		   lwkt_spin_waitmsg,
282 		   lwkt_spin_waitport,
283 		   lwkt_spin_replyport);
284     spin_init(&port->mpu_spin);
285 }
286 
287 void
288 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
289 {
290     _lwkt_initport(port,
291 		   lwkt_serialize_getport,
292 		   lwkt_serialize_putport,
293 		   lwkt_serialize_waitmsg,
294 		   lwkt_serialize_waitport,
295 		   lwkt_serialize_replyport);
296     port->mpu_serialize = slz;
297 }
298 
299 /*
300  * Similar to the standard initport, this function simply marks the message
301  * as being done and does not attempt to return it to an originating port.
302  */
303 void
304 lwkt_initport_replyonly_null(lwkt_port_t port)
305 {
306     _lwkt_initport(port,
307 		   lwkt_panic_getport,
308 		   lwkt_panic_putport,
309 		   lwkt_panic_waitmsg,
310 		   lwkt_panic_waitport,
311 		   lwkt_null_replyport);
312 }
313 
314 /*
315  * Initialize a reply-only port, typically used as a message sink.  Such
316  * ports can only be used as a reply port.
317  */
318 void
319 lwkt_initport_replyonly(lwkt_port_t port,
320 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
321 {
322     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
323 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
324 			 rportfn);
325 }
326 
327 void
328 lwkt_initport_putonly(lwkt_port_t port,
329 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
330 {
331     _lwkt_initport(port, lwkt_panic_getport, pportfn,
332 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
333 			 lwkt_panic_replyport);
334 }
335 
336 void
337 lwkt_initport_panic(lwkt_port_t port)
338 {
339     _lwkt_initport(port,
340 		   lwkt_panic_getport, lwkt_panic_putport,
341 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
342 		   lwkt_panic_replyport);
343 }
344 
345 /*
346  * lwkt_getport()
347  *
348  *	Retrieve the next message from the port's message queue, return NULL
349  *	if no messages are pending.  The retrieved message will either be a
350  *	request or a reply based on the MSGF_REPLY bit.
351  *
352  *	The calling thread MUST own the port.
353  */
354 
355 static __inline
356 void
357 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
358 {
359     /*
360      * normal case, remove and return the message.
361      */
362     TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
363     msg->ms_flags &= ~MSGF_QUEUED;
364 }
365 
366 /************************************************************************
367  *			THREAD PORT BACKEND				*
368  ************************************************************************
369  *
370  * This backend is used when the port a message is retrieved from is owned
371  * by a single thread (the calling thread).  Messages are IPId to the
372  * correct cpu before being enqueued to a port.  Note that this is fairly
373  * optimal since scheduling would have had to do an IPI anyway if the
374  * message were headed to a different cpu.
375  */
376 
377 #ifdef SMP
378 
379 /*
380  * This function completes reply processing for the default case in the
381  * context of the originating cpu.
382  */
383 static
384 void
385 lwkt_thread_replyport_remote(lwkt_msg_t msg)
386 {
387     lwkt_port_t port = msg->ms_reply_port;
388 
389     /*
390      * Chase any thread migration that occurs
391      */
392     if (port->mpu_td->td_gd != mycpu) {
393 	lwkt_send_ipiq(port->mpu_td->td_gd,
394 		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
395 	return;
396     }
397 
398     /*
399      * Cleanup
400      */
401 #ifdef INVARIANTS
402     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
403     msg->ms_flags &= ~MSGF_INTRANSIT;
404 #endif
405     if (msg->ms_flags & MSGF_SYNC) {
406 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
407     } else {
408 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
409 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
410     }
411     if (port->mp_flags & MSGPORTF_WAITING)
412 	lwkt_schedule(port->mpu_td);
413 }
414 
415 #endif
416 
417 /*
418  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
419  *
420  * Called with the reply port as an argument but in the context of the
421  * original target port.  Completion must occur on the target port's
422  * cpu.
423  *
424  * The critical section protects us from IPIs on the this CPU.
425  */
426 void
427 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
428 {
429     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
430 
431     if (msg->ms_flags & MSGF_SYNC) {
432 	/*
433 	 * If a synchronous completion has been requested, just wakeup
434 	 * the message without bothering to queue it to the target port.
435 	 *
436 	 * Assume the target thread is non-preemptive, so no critical
437 	 * section is required.
438 	 */
439 #ifdef SMP
440 	if (port->mpu_td->td_gd == mycpu) {
441 #endif
442 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
443 	    if (port->mp_flags & MSGPORTF_WAITING)
444 		lwkt_schedule(port->mpu_td);
445 #ifdef SMP
446 	} else {
447 #ifdef INVARIANTS
448 	    msg->ms_flags |= MSGF_INTRANSIT;
449 #endif
450 	    msg->ms_flags |= MSGF_REPLY;
451 	    lwkt_send_ipiq(port->mpu_td->td_gd,
452 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
453 	}
454 #endif
455     } else {
456 	/*
457 	 * If an asynchronous completion has been requested the message
458 	 * must be queued to the reply port.
459 	 *
460 	 * A critical section is required to interlock the port queue.
461 	 */
462 #ifdef SMP
463 	if (port->mpu_td->td_gd == mycpu) {
464 #endif
465 	    crit_enter();
466 	    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
467 	    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
468 	    if (port->mp_flags & MSGPORTF_WAITING)
469 		lwkt_schedule(port->mpu_td);
470 	    crit_exit();
471 #ifdef SMP
472 	} else {
473 #ifdef INVARIANTS
474 	    msg->ms_flags |= MSGF_INTRANSIT;
475 #endif
476 	    msg->ms_flags |= MSGF_REPLY;
477 	    lwkt_send_ipiq(port->mpu_td->td_gd,
478 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
479 	}
480 #endif
481     }
482 }
483 
484 /*
485  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
486  *
487  * Called with the target port as an argument but in the context of the
488  * reply port.  This function always implements an asynchronous put to
489  * the target message port, and thus returns EASYNC.
490  *
491  * The message must already have cleared MSGF_DONE and MSGF_REPLY
492  */
493 
494 #ifdef SMP
495 
496 static
497 void
498 lwkt_thread_putport_remote(lwkt_msg_t msg)
499 {
500     lwkt_port_t port = msg->ms_target_port;
501 
502     /*
503      * Chase any thread migration that occurs
504      */
505     if (port->mpu_td->td_gd != mycpu) {
506 	lwkt_send_ipiq(port->mpu_td->td_gd,
507 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
508 	return;
509     }
510 
511     /*
512      * Cleanup
513      */
514 #ifdef INVARIANTS
515     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
516     msg->ms_flags &= ~MSGF_INTRANSIT;
517 #endif
518     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
519     msg->ms_flags |= MSGF_QUEUED;
520     if (port->mp_flags & MSGPORTF_WAITING)
521 	lwkt_schedule(port->mpu_td);
522 }
523 
524 #endif
525 
526 static
527 int
528 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
529 {
530     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
531 
532     msg->ms_target_port = port;
533 #ifdef SMP
534     if (port->mpu_td->td_gd == mycpu) {
535 #endif
536 	crit_enter();
537 	msg->ms_flags |= MSGF_QUEUED;
538 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
539 	if (port->mp_flags & MSGPORTF_WAITING)
540 	    lwkt_schedule(port->mpu_td);
541 	crit_exit();
542 #ifdef SMP
543     } else {
544 #ifdef INVARIANTS
545 	msg->ms_flags |= MSGF_INTRANSIT;
546 #endif
547 	lwkt_send_ipiq(port->mpu_td->td_gd,
548 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
549     }
550 #endif
551     return (EASYNC);
552 }
553 
554 /*
555  * lwkt_thread_getport()
556  *
557  *	Retrieve the next message from the port or NULL if no messages
558  *	are ready.
559  */
560 void *
561 lwkt_thread_getport(lwkt_port_t port)
562 {
563     lwkt_msg_t msg;
564 
565     KKASSERT(port->mpu_td == curthread);
566 
567     crit_enter_quick(port->mpu_td);
568     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
569 	_lwkt_pullmsg(port, msg);
570     crit_exit_quick(port->mpu_td);
571     return(msg);
572 }
573 
574 /*
575  * lwkt_thread_waitmsg()
576  *
577  *	Wait for a particular message to be replied.  We must be the only
578  *	thread waiting on the message.  The port must be owned by the
579  *	caller.
580  */
581 int
582 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
583 {
584     if ((msg->ms_flags & MSGF_DONE) == 0) {
585 	/*
586 	 * If the done bit was not set we have to block until it is.
587 	 */
588 	lwkt_port_t port = msg->ms_reply_port;
589 	thread_t td = curthread;
590 	int sentabort;
591 
592 	KKASSERT(port->mpu_td == td);
593 	crit_enter_quick(td);
594 	sentabort = 0;
595 
596 	while ((msg->ms_flags & MSGF_DONE) == 0) {
597 	    port->mp_flags |= MSGPORTF_WAITING;
598 	    if (sentabort == 0) {
599 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
600 		    lwkt_abortmsg(msg);
601 		}
602 	    } else {
603 		lwkt_sleep("waitabt", 0);
604 	    }
605 	    port->mp_flags &= ~MSGPORTF_WAITING;
606 	}
607 	if (msg->ms_flags & MSGF_QUEUED)
608 	    _lwkt_pullmsg(port, msg);
609 	crit_exit_quick(td);
610     } else {
611 	/*
612 	 * If the done bit was set we only have to mess around with the
613 	 * message if it is queued on the reply port.
614 	 */
615 	if (msg->ms_flags & MSGF_QUEUED) {
616 	    lwkt_port_t port = msg->ms_reply_port;
617 	    thread_t td = curthread;
618 
619 	    KKASSERT(port->mpu_td == td);
620 	    crit_enter_quick(td);
621 	    _lwkt_pullmsg(port, msg);
622 	    crit_exit_quick(td);
623 	}
624     }
625     return(msg->ms_error);
626 }
627 
628 void *
629 lwkt_thread_waitport(lwkt_port_t port, int flags)
630 {
631     thread_t td = curthread;
632     lwkt_msg_t msg;
633     int error;
634 
635     KKASSERT(port->mpu_td == td);
636     crit_enter_quick(td);
637     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
638 	port->mp_flags |= MSGPORTF_WAITING;
639 	error = lwkt_sleep("waitport", flags);
640 	port->mp_flags &= ~MSGPORTF_WAITING;
641 	if (error)
642 		goto done;
643     }
644     _lwkt_pullmsg(port, msg);
645 done:
646     crit_exit_quick(td);
647     return(msg);
648 }
649 
650 /************************************************************************
651  *			   SPIN PORT BACKEND				*
652  ************************************************************************
653  *
654  * This backend uses spinlocks instead of making assumptions about which
655  * thread is accessing the port.  It must be used when a port is not owned
656  * by a particular thread.  This is less optimal then thread ports but
657  * you don't have a choice if there are multiple threads accessing the port.
658  *
659  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
660  * on the message port, it is the responsibility of the code doing the
661  * wakeup to clear this flag rather then the blocked threads.  Some
662  * superfluous wakeups may occur, which is ok.
663  *
664  * XXX synchronous message wakeups are not current optimized.
665  */
666 
667 static
668 void *
669 lwkt_spin_getport(lwkt_port_t port)
670 {
671     lwkt_msg_t msg;
672 
673     spin_lock_wr(&port->mpu_spin);
674     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
675 	_lwkt_pullmsg(port, msg);
676     spin_unlock_wr(&port->mpu_spin);
677     return(msg);
678 }
679 
680 static
681 int
682 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
683 {
684     int dowakeup;
685 
686     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
687 
688     msg->ms_target_port = port;
689     spin_lock_wr(&port->mpu_spin);
690     msg->ms_flags |= MSGF_QUEUED;
691     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
692     dowakeup = 0;
693     if (port->mp_flags & MSGPORTF_WAITING) {
694 	port->mp_flags &= ~MSGPORTF_WAITING;
695 	dowakeup = 1;
696     }
697     spin_unlock_wr(&port->mpu_spin);
698     if (dowakeup)
699 	wakeup(port);
700     return (EASYNC);
701 }
702 
703 static
704 int
705 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
706 {
707     lwkt_port_t port;
708     int sentabort;
709     int error;
710 
711     if ((msg->ms_flags & MSGF_DONE) == 0) {
712 	port = msg->ms_reply_port;
713 	sentabort = 0;
714 	spin_lock_wr(&port->mpu_spin);
715 	while ((msg->ms_flags & MSGF_DONE) == 0) {
716 	    void *won;
717 
718 	    /*
719 	     * If message was sent synchronously from the beginning
720 	     * the wakeup will be on the message structure, else it
721 	     * will be on the port structure.
722 	     */
723 	    if (msg->ms_flags & MSGF_SYNC) {
724 		won = msg;
725 	    } else {
726 		won = port;
727 		port->mp_flags |= MSGPORTF_WAITING;
728 	    }
729 
730 	    /*
731 	     * Only messages which support abort can be interrupted.
732 	     * We must still wait for message completion regardless.
733 	     */
734 	    if ((flags & PCATCH) && sentabort == 0) {
735 		error = msleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
736 		if (error) {
737 		    sentabort = error;
738 		    spin_unlock_wr(&port->mpu_spin);
739 		    lwkt_abortmsg(msg);
740 		    spin_lock_wr(&port->mpu_spin);
741 		}
742 	    } else {
743 		error = msleep(won, &port->mpu_spin, 0, "waitmsg", 0);
744 	    }
745 	    /* see note at the top on the MSGPORTF_WAITING flag */
746 	}
747 	/*
748 	 * Turn EINTR into ERESTART if the signal indicates.
749 	 */
750 	if (sentabort && msg->ms_error == EINTR)
751 	    msg->ms_error = sentabort;
752 	if (msg->ms_flags & MSGF_QUEUED)
753 		_lwkt_pullmsg(port, msg);
754 	spin_unlock_wr(&port->mpu_spin);
755     } else {
756 	if (msg->ms_flags & MSGF_QUEUED) {
757 	    port = msg->ms_reply_port;
758 	    spin_lock_wr(&port->mpu_spin);
759 	    _lwkt_pullmsg(port, msg);
760 	    spin_unlock_wr(&port->mpu_spin);
761 	}
762     }
763     return(msg->ms_error);
764 }
765 
766 static
767 void *
768 lwkt_spin_waitport(lwkt_port_t port, int flags)
769 {
770     lwkt_msg_t msg;
771     int error;
772 
773     spin_lock_wr(&port->mpu_spin);
774     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
775 	port->mp_flags |= MSGPORTF_WAITING;
776 	error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
777 	/* see note at the top on the MSGPORTF_WAITING flag */
778 	if (error) {
779 	    spin_unlock_wr(&port->mpu_spin);
780 	    return(NULL);
781 	}
782     }
783     _lwkt_pullmsg(port, msg);
784     spin_unlock_wr(&port->mpu_spin);
785     return(msg);
786 }
787 
788 static
789 void
790 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
791 {
792     int dowakeup;
793 
794     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
795 
796     if (msg->ms_flags & MSGF_SYNC) {
797 	/*
798 	 * If a synchronous completion has been requested, just wakeup
799 	 * the message without bothering to queue it to the target port.
800 	 */
801 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
802 	wakeup(msg);
803     } else {
804 	/*
805 	 * If an asynchronous completion has been requested the message
806 	 * must be queued to the reply port.
807 	 */
808 	spin_lock_wr(&port->mpu_spin);
809 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
810 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
811 	dowakeup = 0;
812 	if (port->mp_flags & MSGPORTF_WAITING) {
813 	    port->mp_flags &= ~MSGPORTF_WAITING;
814 	    dowakeup = 1;
815 	}
816 	spin_unlock_wr(&port->mpu_spin);
817 	if (dowakeup)
818 	    wakeup(port);
819     }
820 }
821 
822 /************************************************************************
823  *			  SERIALIZER PORT BACKEND			*
824  ************************************************************************
825  *
826  * This backend uses serializer to protect port accessing.  Callers are
827  * assumed to have serializer held.  This kind of port is usually created
828  * by network device driver along with _one_ lwkt thread to pipeline
829  * operations which may temporarily release serializer.
830  *
831  * Implementation is based on SPIN PORT BACKEND.
832  */
833 
834 static
835 void *
836 lwkt_serialize_getport(lwkt_port_t port)
837 {
838     lwkt_msg_t msg;
839 
840     ASSERT_SERIALIZED(port->mpu_serialize);
841 
842     if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
843 	_lwkt_pullmsg(port, msg);
844     return(msg);
845 }
846 
847 static
848 int
849 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
850 {
851     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
852     ASSERT_SERIALIZED(port->mpu_serialize);
853 
854     msg->ms_target_port = port;
855     msg->ms_flags |= MSGF_QUEUED;
856     TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
857     if (port->mp_flags & MSGPORTF_WAITING) {
858 	port->mp_flags &= ~MSGPORTF_WAITING;
859 	wakeup(port);
860     }
861     return (EASYNC);
862 }
863 
864 static
865 int
866 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
867 {
868     lwkt_port_t port;
869     int sentabort;
870     int error;
871 
872     if ((msg->ms_flags & MSGF_DONE) == 0) {
873 	port = msg->ms_reply_port;
874 
875 	ASSERT_SERIALIZED(port->mpu_serialize);
876 
877 	sentabort = 0;
878 	while ((msg->ms_flags & MSGF_DONE) == 0) {
879 	    void *won;
880 
881 	    /*
882 	     * If message was sent synchronously from the beginning
883 	     * the wakeup will be on the message structure, else it
884 	     * will be on the port structure.
885 	     */
886 	    if (msg->ms_flags & MSGF_SYNC) {
887 		won = msg;
888 	    } else {
889 		won = port;
890 		port->mp_flags |= MSGPORTF_WAITING;
891 	    }
892 
893 	    /*
894 	     * Only messages which support abort can be interrupted.
895 	     * We must still wait for message completion regardless.
896 	     */
897 	    if ((flags & PCATCH) && sentabort == 0) {
898 		error = serialize_sleep(won, port->mpu_serialize, PCATCH,
899 					"waitmsg", 0);
900 		if (error) {
901 		    sentabort = error;
902 		    lwkt_serialize_exit(port->mpu_serialize);
903 		    lwkt_abortmsg(msg);
904 		    lwkt_serialize_enter(port->mpu_serialize);
905 		}
906 	    } else {
907 		error = serialize_sleep(won, port->mpu_serialize, 0,
908 					"waitmsg", 0);
909 	    }
910 	    /* see note at the top on the MSGPORTF_WAITING flag */
911 	}
912 	/*
913 	 * Turn EINTR into ERESTART if the signal indicates.
914 	 */
915 	if (sentabort && msg->ms_error == EINTR)
916 	    msg->ms_error = sentabort;
917 	if (msg->ms_flags & MSGF_QUEUED)
918 		_lwkt_pullmsg(port, msg);
919     } else {
920 	if (msg->ms_flags & MSGF_QUEUED) {
921 	    port = msg->ms_reply_port;
922 
923 	    ASSERT_SERIALIZED(port->mpu_serialize);
924 	    _lwkt_pullmsg(port, msg);
925 	}
926     }
927     return(msg->ms_error);
928 }
929 
930 static
931 void *
932 lwkt_serialize_waitport(lwkt_port_t port, int flags)
933 {
934     lwkt_msg_t msg;
935     int error;
936 
937     ASSERT_SERIALIZED(port->mpu_serialize);
938 
939     while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
940 	port->mp_flags |= MSGPORTF_WAITING;
941 	error = serialize_sleep(port, port->mpu_serialize, flags,
942 				"waitport", 0);
943 	/* see note at the top on the MSGPORTF_WAITING flag */
944 	if (error)
945 	    return(NULL);
946     }
947     _lwkt_pullmsg(port, msg);
948     return(msg);
949 }
950 
951 static
952 void
953 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
954 {
955     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
956     ASSERT_SERIALIZED(port->mpu_serialize);
957 
958     if (msg->ms_flags & MSGF_SYNC) {
959 	/*
960 	 * If a synchronous completion has been requested, just wakeup
961 	 * the message without bothering to queue it to the target port.
962 	 */
963 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
964 	wakeup(msg);
965     } else {
966 	/*
967 	 * If an asynchronous completion has been requested the message
968 	 * must be queued to the reply port.
969 	 */
970 	TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
971 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
972 	if (port->mp_flags & MSGPORTF_WAITING) {
973 	    port->mp_flags &= ~MSGPORTF_WAITING;
974 	    wakeup(port);
975 	}
976     }
977 }
978 
979 /************************************************************************
980  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
981  ************************************************************************/
982 
983 /*
984  * You can point a port's reply vector at this function if you just want
985  * the message marked done, without any queueing or signaling.  This is
986  * often used for structure-embedded messages.
987  */
988 static
989 void
990 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
991 {
992     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
993 }
994 
995 static
996 void *
997 lwkt_panic_getport(lwkt_port_t port)
998 {
999     panic("lwkt_getport() illegal on port %p", port);
1000 }
1001 
1002 static
1003 int
1004 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1005 {
1006     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1007 }
1008 
1009 static
1010 int
1011 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1012 {
1013     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1014 }
1015 
1016 static
1017 void *
1018 lwkt_panic_waitport(lwkt_port_t port, int flags)
1019 {
1020     panic("port %p cannot be waited on", port);
1021 }
1022 
1023 static
1024 void
1025 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1026 {
1027     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1028 }
1029 
1030