xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 3bafb5c1)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
38  */
39 
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52 
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62 
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67 
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73 
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76 
77 /************************************************************************
78  *				MESSAGE FUNCTIONS			*
79  ************************************************************************/
80 
81 /*
82  * lwkt_sendmsg()
83  *
84  *	Request asynchronous completion and call lwkt_beginmsg().  The
85  *	target port can opt to execute the message synchronously or
86  *	asynchronously and this function will automatically queue the
87  *	response if the target executes the message synchronously.
88  *
89  *	NOTE: The message is in an indeterminant state until this call
90  *	returns.  The caller should not mess with it (e.g. try to abort it)
91  *	until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97 
98     KKASSERT(msg->ms_reply_port != NULL &&
99 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 	lwkt_replymsg(msg, error);
103     }
104 }
105 
106 /*
107  * lwkt_domsg()
108  *
109  *	Request asynchronous completion and call lwkt_beginmsg().  The
110  *	target port can opt to execute the message synchronously or
111  *	asynchronously and this function will automatically queue the
112  *	response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118 
119     KKASSERT(msg->ms_reply_port != NULL &&
120 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 	error = lwkt_waitmsg(msg, flags);
125     } else {
126 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130 
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {
139     int error;
140 
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144 	lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148 
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165 
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 	    msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181 
182 /************************************************************************
183  *			PORT INITIALIZATION API				*
184  ************************************************************************/
185 
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
192 
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 
199 static void *lwkt_serialize_getport(lwkt_port_t port);
200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204 
205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
206 static void *lwkt_panic_getport(lwkt_port_t port);
207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
212 
213 /*
214  * Core port initialization (internal)
215  */
216 static __inline
217 void
218 _lwkt_initport(lwkt_port_t port,
219 	       void *(*gportfn)(lwkt_port_t),
220 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
221 	       int (*wmsgfn)(lwkt_msg_t, int),
222 	       void *(*wportfn)(lwkt_port_t, int),
223 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
224 	       void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
225 {
226     bzero(port, sizeof(*port));
227     TAILQ_INIT(&port->mp_msgq);
228     TAILQ_INIT(&port->mp_msgq_prio);
229     port->mp_getport = gportfn;
230     port->mp_putport = pportfn;
231     port->mp_waitmsg =  wmsgfn;
232     port->mp_waitport =  wportfn;
233     port->mp_replyport = rportfn;
234     port->mp_dropmsg = dmsgfn;
235 }
236 
237 /*
238  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
239  * we tell the scheduler not to reschedule if td is at a higher priority.
240  *
241  * This routine is called even if the thread is already scheduled.
242  */
243 static __inline
244 void
245 _lwkt_schedule_msg(thread_t td, int flags)
246 {
247     lwkt_schedule(td);
248 }
249 
250 /*
251  * lwkt_initport_thread()
252  *
253  *	Initialize a port for use by a particular thread.  The port may
254  *	only be used by <td>.
255  */
256 void
257 lwkt_initport_thread(lwkt_port_t port, thread_t td)
258 {
259     _lwkt_initport(port,
260 		   lwkt_thread_getport,
261 		   lwkt_thread_putport,
262 		   lwkt_thread_waitmsg,
263 		   lwkt_thread_waitport,
264 		   lwkt_thread_replyport,
265 		   lwkt_thread_dropmsg);
266     port->mpu_td = td;
267 }
268 
269 /*
270  * lwkt_initport_spin()
271  *
272  *	Initialize a port for use with descriptors that might be accessed
273  *	via multiple LWPs, processes, or threads.  Has somewhat more
274  *	overhead then thread ports.
275  */
276 void
277 lwkt_initport_spin(lwkt_port_t port)
278 {
279     _lwkt_initport(port,
280 		   lwkt_spin_getport,
281 		   lwkt_spin_putport,
282 		   lwkt_spin_waitmsg,
283 		   lwkt_spin_waitport,
284 		   lwkt_spin_replyport,
285 		   lwkt_panic_dropmsg);
286     spin_init(&port->mpu_spin);
287 }
288 
289 /*
290  * lwkt_initport_serialize()
291  *
292  *	Initialize a port for use with descriptors that might be accessed
293  *	via multiple LWPs, processes, or threads.  Callers are assumed to
294  *	have held the serializer (slz).
295  */
296 void
297 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
298 {
299     _lwkt_initport(port,
300 		   lwkt_serialize_getport,
301 		   lwkt_serialize_putport,
302 		   lwkt_serialize_waitmsg,
303 		   lwkt_serialize_waitport,
304 		   lwkt_serialize_replyport,
305 		   lwkt_panic_dropmsg);
306     port->mpu_serialize = slz;
307 }
308 
309 /*
310  * Similar to the standard initport, this function simply marks the message
311  * as being done and does not attempt to return it to an originating port.
312  */
313 void
314 lwkt_initport_replyonly_null(lwkt_port_t port)
315 {
316     _lwkt_initport(port,
317 		   lwkt_panic_getport,
318 		   lwkt_panic_putport,
319 		   lwkt_panic_waitmsg,
320 		   lwkt_panic_waitport,
321 		   lwkt_null_replyport,
322 		   lwkt_panic_dropmsg);
323 }
324 
325 /*
326  * Initialize a reply-only port, typically used as a message sink.  Such
327  * ports can only be used as a reply port.
328  */
329 void
330 lwkt_initport_replyonly(lwkt_port_t port,
331 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
332 {
333     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
334 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
335 			 rportfn, lwkt_panic_dropmsg);
336 }
337 
338 void
339 lwkt_initport_putonly(lwkt_port_t port,
340 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
341 {
342     _lwkt_initport(port, lwkt_panic_getport, pportfn,
343 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
344 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
345 }
346 
347 void
348 lwkt_initport_panic(lwkt_port_t port)
349 {
350     _lwkt_initport(port,
351 		   lwkt_panic_getport, lwkt_panic_putport,
352 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
353 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
354 }
355 
356 static __inline
357 void
358 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
359 {
360     lwkt_msg_queue *queue;
361 
362     /*
363      * normal case, remove and return the message.
364      */
365     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
366 	queue = &port->mp_msgq_prio;
367     else
368 	queue = &port->mp_msgq;
369     TAILQ_REMOVE(queue, msg, ms_node);
370 
371     /*
372      * atomic op needed for spin ports
373      */
374     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
375 }
376 
377 static __inline
378 void
379 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
380 {
381     lwkt_msg_queue *queue;
382 
383     /*
384      * atomic op needed for spin ports
385      */
386     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
387     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
388 	queue = &port->mp_msgq_prio;
389     else
390     	queue = &port->mp_msgq;
391     TAILQ_INSERT_TAIL(queue, msg, ms_node);
392 }
393 
394 static __inline
395 lwkt_msg_t
396 _lwkt_pollmsg(lwkt_port_t port)
397 {
398     lwkt_msg_t msg;
399 
400     msg = TAILQ_FIRST(&port->mp_msgq_prio);
401     if (__predict_false(msg != NULL))
402 	return msg;
403 
404     /*
405      * Priority queue has no message, fallback to non-priority queue.
406      */
407     return TAILQ_FIRST(&port->mp_msgq);
408 }
409 
410 static __inline
411 void
412 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
413 {
414     /*
415      * atomic op needed for spin ports
416      */
417     _lwkt_pushmsg(port, msg);
418     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
419 }
420 
421 /************************************************************************
422  *			THREAD PORT BACKEND				*
423  ************************************************************************
424  *
425  * This backend is used when the port a message is retrieved from is owned
426  * by a single thread (the calling thread).  Messages are IPId to the
427  * correct cpu before being enqueued to a port.  Note that this is fairly
428  * optimal since scheduling would have had to do an IPI anyway if the
429  * message were headed to a different cpu.
430  */
431 
#ifdef SMP

/*
 * lwkt_thread_replyport_remote()
 *
 *	IPI handler completing a thread-port reply on the cpu of the thread
 *	owning the reply port.  If that thread migrated after the IPI was
 *	queued, the IPI is forwarded to its current cpu.
 *
 *	A synchronous message is just flagged MSGF_REPLY | MSGF_DONE; an
 *	asynchronous one is queued on the reply port.  The owning thread is
 *	scheduled if it is waiting on the port.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t rport = msg->ms_reply_port;
    int saved_flags;

    /* Chase any thread migration that occurred. */
    if (rport->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(rport->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

#ifdef INVARIANTS
    /* The sending side marked the message in transit before the IPI. */
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif

    /* Snapshot the flags before completion; they go to the scheduler. */
    saved_flags = msg->ms_flags;
    if ((msg->ms_flags & MSGF_SYNC) == 0) {
	_lwkt_enqueue_reply(rport, msg);
    } else {
	/* Store fence before MSGF_DONE becomes visible. */
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    }
    if (rport->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(rport->mpu_td, saved_flags);
}

#endif	/* SMP */
473 
474 /*
475  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
476  *
477  * Called with the reply port as an argument but in the context of the
478  * original target port.  Completion must occur on the target port's
479  * cpu.
480  *
481  * The critical section protects us from IPIs on the this CPU.
482  */
483 static
484 void
485 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
486 {
487     int flags;
488 
489     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
490 
491     if (msg->ms_flags & MSGF_SYNC) {
492 	/*
493 	 * If a synchronous completion has been requested, just wakeup
494 	 * the message without bothering to queue it to the target port.
495 	 *
496 	 * Assume the target thread is non-preemptive, so no critical
497 	 * section is required.
498 	 */
499 #ifdef SMP
500 	if (port->mpu_td->td_gd == mycpu) {
501 #endif
502 	    flags = msg->ms_flags;
503 	    cpu_sfence();
504 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
505 	    if (port->mp_flags & MSGPORTF_WAITING)
506 		_lwkt_schedule_msg(port->mpu_td, flags);
507 #ifdef SMP
508 	} else {
509 #ifdef INVARIANTS
510 	    msg->ms_flags |= MSGF_INTRANSIT;
511 #endif
512 	    msg->ms_flags |= MSGF_REPLY;
513 	    lwkt_send_ipiq(port->mpu_td->td_gd,
514 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
515 	}
516 #endif
517     } else {
518 	/*
519 	 * If an asynchronous completion has been requested the message
520 	 * must be queued to the reply port.
521 	 *
522 	 * A critical section is required to interlock the port queue.
523 	 */
524 #ifdef SMP
525 	if (port->mpu_td->td_gd == mycpu) {
526 #endif
527 	    crit_enter();
528 	    _lwkt_enqueue_reply(port, msg);
529 	    if (port->mp_flags & MSGPORTF_WAITING)
530 		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
531 	    crit_exit();
532 #ifdef SMP
533 	} else {
534 #ifdef INVARIANTS
535 	    msg->ms_flags |= MSGF_INTRANSIT;
536 #endif
537 	    msg->ms_flags |= MSGF_REPLY;
538 	    lwkt_send_ipiq(port->mpu_td->td_gd,
539 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
540 	}
541 #endif
542     }
543 }
544 
545 /*
546  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
547  *
548  * This function could _only_ be used when caller is in the same thread
549  * as the message's target port owner thread.
550  */
551 static void
552 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
553 {
554     KASSERT(port->mpu_td == curthread,
555     	    ("message could only be dropped in the same thread "
556 	     "as the message target port thread\n"));
557     crit_enter_quick(port->mpu_td);
558     _lwkt_pullmsg(port, msg);
559     msg->ms_flags |= MSGF_DONE;
560     crit_exit_quick(port->mpu_td);
561 }
562 
563 /*
564  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
565  *
566  * Called with the target port as an argument but in the context of the
567  * reply port.  This function always implements an asynchronous put to
568  * the target message port, and thus returns EASYNC.
569  *
570  * The message must already have cleared MSGF_DONE and MSGF_REPLY
571  */
572 
573 #ifdef SMP
574 
575 static
576 void
577 lwkt_thread_putport_remote(lwkt_msg_t msg)
578 {
579     lwkt_port_t port = msg->ms_target_port;
580 
581     /*
582      * Chase any thread migration that occurs
583      */
584     if (port->mpu_td->td_gd != mycpu) {
585 	lwkt_send_ipiq(port->mpu_td->td_gd,
586 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
587 	return;
588     }
589 
590     /*
591      * Cleanup
592      */
593 #ifdef INVARIANTS
594     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
595     msg->ms_flags &= ~MSGF_INTRANSIT;
596 #endif
597     _lwkt_pushmsg(port, msg);
598     if (port->mp_flags & MSGPORTF_WAITING)
599 	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
600 }
601 
602 #endif
603 
604 static
605 int
606 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
607 {
608     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
609 
610     msg->ms_target_port = port;
611 #ifdef SMP
612     if (port->mpu_td->td_gd == mycpu) {
613 #endif
614 	crit_enter();
615 	_lwkt_pushmsg(port, msg);
616 	if (port->mp_flags & MSGPORTF_WAITING)
617 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
618 	crit_exit();
619 #ifdef SMP
620     } else {
621 #ifdef INVARIANTS
622 	msg->ms_flags |= MSGF_INTRANSIT;
623 #endif
624 	lwkt_send_ipiq(port->mpu_td->td_gd,
625 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
626     }
627 #endif
628     return (EASYNC);
629 }
630 
631 /*
632  * lwkt_thread_getport()
633  *
634  *	Retrieve the next message from the port or NULL if no messages
635  *	are ready.
636  */
637 static
638 void *
639 lwkt_thread_getport(lwkt_port_t port)
640 {
641     lwkt_msg_t msg;
642 
643     KKASSERT(port->mpu_td == curthread);
644 
645     crit_enter_quick(port->mpu_td);
646     if ((msg = _lwkt_pollmsg(port)) != NULL)
647 	_lwkt_pullmsg(port, msg);
648     crit_exit_quick(port->mpu_td);
649     return(msg);
650 }
651 
652 /*
653  * lwkt_thread_waitmsg()
654  *
655  *	Wait for a particular message to be replied.  We must be the only
656  *	thread waiting on the message.  The port must be owned by the
657  *	caller.
658  */
659 static
660 int
661 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
662 {
663     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
664     	    ("can't wait dropable message\n"));
665 
666     if ((msg->ms_flags & MSGF_DONE) == 0) {
667 	/*
668 	 * If the done bit was not set we have to block until it is.
669 	 */
670 	lwkt_port_t port = msg->ms_reply_port;
671 	thread_t td = curthread;
672 	int sentabort;
673 
674 	KKASSERT(port->mpu_td == td);
675 	crit_enter_quick(td);
676 	sentabort = 0;
677 
678 	while ((msg->ms_flags & MSGF_DONE) == 0) {
679 	    port->mp_flags |= MSGPORTF_WAITING;
680 	    if (sentabort == 0) {
681 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
682 		    lwkt_abortmsg(msg);
683 		}
684 	    } else {
685 		lwkt_sleep("waitabt", 0);
686 	    }
687 	    port->mp_flags &= ~MSGPORTF_WAITING;
688 	}
689 	if (msg->ms_flags & MSGF_QUEUED)
690 	    _lwkt_pullmsg(port, msg);
691 	crit_exit_quick(td);
692     } else {
693 	/*
694 	 * If the done bit was set we only have to mess around with the
695 	 * message if it is queued on the reply port.
696 	 */
697 	if (msg->ms_flags & MSGF_QUEUED) {
698 	    lwkt_port_t port = msg->ms_reply_port;
699 	    thread_t td = curthread;
700 
701 	    KKASSERT(port->mpu_td == td);
702 	    crit_enter_quick(td);
703 	    _lwkt_pullmsg(port, msg);
704 	    crit_exit_quick(td);
705 	}
706     }
707     return(msg->ms_error);
708 }
709 
710 static
711 void *
712 lwkt_thread_waitport(lwkt_port_t port, int flags)
713 {
714     thread_t td = curthread;
715     lwkt_msg_t msg;
716     int error;
717 
718     KKASSERT(port->mpu_td == td);
719     crit_enter_quick(td);
720     while ((msg = _lwkt_pollmsg(port)) == NULL) {
721 	port->mp_flags |= MSGPORTF_WAITING;
722 	error = lwkt_sleep("waitport", flags);
723 	port->mp_flags &= ~MSGPORTF_WAITING;
724 	if (error)
725 	    goto done;
726     }
727     _lwkt_pullmsg(port, msg);
728 done:
729     crit_exit_quick(td);
730     return(msg);
731 }
732 
733 /************************************************************************
734  *			   SPIN PORT BACKEND				*
735  ************************************************************************
736  *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports, but
 * there is no choice when multiple threads access the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag, rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
748  */
749 
750 static
751 void *
752 lwkt_spin_getport(lwkt_port_t port)
753 {
754     lwkt_msg_t msg;
755 
756     spin_lock(&port->mpu_spin);
757     if ((msg = _lwkt_pollmsg(port)) != NULL)
758 	_lwkt_pullmsg(port, msg);
759     spin_unlock(&port->mpu_spin);
760     return(msg);
761 }
762 
763 static
764 int
765 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
766 {
767     int dowakeup;
768 
769     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
770 
771     msg->ms_target_port = port;
772     spin_lock(&port->mpu_spin);
773     _lwkt_pushmsg(port, msg);
774     dowakeup = 0;
775     if (port->mp_flags & MSGPORTF_WAITING) {
776 	port->mp_flags &= ~MSGPORTF_WAITING;
777 	dowakeup = 1;
778     }
779     spin_unlock(&port->mpu_spin);
780     if (dowakeup)
781 	wakeup(port);
782     return (EASYNC);
783 }
784 
785 static
786 int
787 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
788 {
789     lwkt_port_t port;
790     int sentabort;
791     int error;
792 
793     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
794     	    ("can't wait dropable message\n"));
795 
796     if ((msg->ms_flags & MSGF_DONE) == 0) {
797 	port = msg->ms_reply_port;
798 	sentabort = 0;
799 	spin_lock(&port->mpu_spin);
800 	while ((msg->ms_flags & MSGF_DONE) == 0) {
801 	    void *won;
802 
803 	    /*
804 	     * If message was sent synchronously from the beginning
805 	     * the wakeup will be on the message structure, else it
806 	     * will be on the port structure.
807 	     */
808 	    if (msg->ms_flags & MSGF_SYNC) {
809 		won = msg;
810 		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
811 	    } else {
812 		won = port;
813 		port->mp_flags |= MSGPORTF_WAITING;
814 	    }
815 
816 	    /*
817 	     * Only messages which support abort can be interrupted.
818 	     * We must still wait for message completion regardless.
819 	     */
820 	    if ((flags & PCATCH) && sentabort == 0) {
821 		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
822 		if (error) {
823 		    sentabort = error;
824 		    spin_unlock(&port->mpu_spin);
825 		    lwkt_abortmsg(msg);
826 		    spin_lock(&port->mpu_spin);
827 		}
828 	    } else {
829 		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
830 	    }
831 	    /* see note at the top on the MSGPORTF_WAITING flag */
832 	}
833 	/*
834 	 * Turn EINTR into ERESTART if the signal indicates.
835 	 */
836 	if (sentabort && msg->ms_error == EINTR)
837 	    msg->ms_error = sentabort;
838 	if (msg->ms_flags & MSGF_QUEUED)
839 	    _lwkt_pullmsg(port, msg);
840 	spin_unlock(&port->mpu_spin);
841     } else {
842 	if (msg->ms_flags & MSGF_QUEUED) {
843 	    port = msg->ms_reply_port;
844 	    spin_lock(&port->mpu_spin);
845 	    _lwkt_pullmsg(port, msg);
846 	    spin_unlock(&port->mpu_spin);
847 	}
848     }
849     return(msg->ms_error);
850 }
851 
852 static
853 void *
854 lwkt_spin_waitport(lwkt_port_t port, int flags)
855 {
856     lwkt_msg_t msg;
857     int error;
858 
859     spin_lock(&port->mpu_spin);
860     while ((msg = _lwkt_pollmsg(port)) == NULL) {
861 	port->mp_flags |= MSGPORTF_WAITING;
862 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
863 	/* see note at the top on the MSGPORTF_WAITING flag */
864 	if (error) {
865 	    spin_unlock(&port->mpu_spin);
866 	    return(NULL);
867 	}
868     }
869     _lwkt_pullmsg(port, msg);
870     spin_unlock(&port->mpu_spin);
871     return(msg);
872 }
873 
874 static
875 void
876 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
877 {
878     int dowakeup;
879 
880     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
881 
882     if (msg->ms_flags & MSGF_SYNC) {
883 	/*
884 	 * If a synchronous completion has been requested, just wakeup
885 	 * the message without bothering to queue it to the target port.
886 	 */
887 	spin_lock(&port->mpu_spin);
888 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
889 	dowakeup = 0;
890 	if (msg->ms_flags & MSGF_WAITING) {
891 		atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
892 		dowakeup = 1;
893 	}
894 	spin_unlock(&port->mpu_spin);
895 	if (dowakeup)
896 		wakeup(msg);
897     } else {
898 	/*
899 	 * If an asynchronous completion has been requested the message
900 	 * must be queued to the reply port.
901 	 */
902 	spin_lock(&port->mpu_spin);
903 	_lwkt_enqueue_reply(port, msg);
904 	dowakeup = 0;
905 	if (port->mp_flags & MSGPORTF_WAITING) {
906 	    port->mp_flags &= ~MSGPORTF_WAITING;
907 	    dowakeup = 1;
908 	}
909 	spin_unlock(&port->mpu_spin);
910 	if (dowakeup)
911 	    wakeup(port);
912     }
913 }
914 
915 /************************************************************************
916  *			  SERIALIZER PORT BACKEND			*
917  ************************************************************************
918  *
 * This backend uses a serializer to protect access to the port.  Callers
 * are assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver, along with _one_ lwkt thread, to pipeline
 * operations which may temporarily release the serializer.
923  *
924  * Implementation is based on SPIN PORT BACKEND.
925  */
926 
927 static
928 void *
929 lwkt_serialize_getport(lwkt_port_t port)
930 {
931     lwkt_msg_t msg;
932 
933     ASSERT_SERIALIZED(port->mpu_serialize);
934 
935     if ((msg = _lwkt_pollmsg(port)) != NULL)
936 	_lwkt_pullmsg(port, msg);
937     return(msg);
938 }
939 
940 static
941 int
942 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
943 {
944     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
945     ASSERT_SERIALIZED(port->mpu_serialize);
946 
947     msg->ms_target_port = port;
948     _lwkt_pushmsg(port, msg);
949     if (port->mp_flags & MSGPORTF_WAITING) {
950 	port->mp_flags &= ~MSGPORTF_WAITING;
951 	wakeup(port);
952     }
953     return (EASYNC);
954 }
955 
956 static
957 int
958 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
959 {
960     lwkt_port_t port;
961     int sentabort;
962     int error;
963 
964     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
965     	    ("can't wait dropable message\n"));
966 
967     if ((msg->ms_flags & MSGF_DONE) == 0) {
968 	port = msg->ms_reply_port;
969 
970 	ASSERT_SERIALIZED(port->mpu_serialize);
971 
972 	sentabort = 0;
973 	while ((msg->ms_flags & MSGF_DONE) == 0) {
974 	    void *won;
975 
976 	    /*
977 	     * If message was sent synchronously from the beginning
978 	     * the wakeup will be on the message structure, else it
979 	     * will be on the port structure.
980 	     */
981 	    if (msg->ms_flags & MSGF_SYNC) {
982 		won = msg;
983 	    } else {
984 		won = port;
985 		port->mp_flags |= MSGPORTF_WAITING;
986 	    }
987 
988 	    /*
989 	     * Only messages which support abort can be interrupted.
990 	     * We must still wait for message completion regardless.
991 	     */
992 	    if ((flags & PCATCH) && sentabort == 0) {
993 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
994 		if (error) {
995 		    sentabort = error;
996 		    lwkt_serialize_exit(port->mpu_serialize);
997 		    lwkt_abortmsg(msg);
998 		    lwkt_serialize_enter(port->mpu_serialize);
999 		}
1000 	    } else {
1001 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1002 	    }
1003 	    /* see note at the top on the MSGPORTF_WAITING flag */
1004 	}
1005 	/*
1006 	 * Turn EINTR into ERESTART if the signal indicates.
1007 	 */
1008 	if (sentabort && msg->ms_error == EINTR)
1009 	    msg->ms_error = sentabort;
1010 	if (msg->ms_flags & MSGF_QUEUED)
1011 	    _lwkt_pullmsg(port, msg);
1012     } else {
1013 	if (msg->ms_flags & MSGF_QUEUED) {
1014 	    port = msg->ms_reply_port;
1015 
1016 	    ASSERT_SERIALIZED(port->mpu_serialize);
1017 	    _lwkt_pullmsg(port, msg);
1018 	}
1019     }
1020     return(msg->ms_error);
1021 }
1022 
1023 static
1024 void *
1025 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1026 {
1027     lwkt_msg_t msg;
1028     int error;
1029 
1030     ASSERT_SERIALIZED(port->mpu_serialize);
1031 
1032     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1033 	port->mp_flags |= MSGPORTF_WAITING;
1034 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1035 	/* see note at the top on the MSGPORTF_WAITING flag */
1036 	if (error)
1037 	    return(NULL);
1038     }
1039     _lwkt_pullmsg(port, msg);
1040     return(msg);
1041 }
1042 
1043 static
1044 void
1045 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1046 {
1047     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1048     ASSERT_SERIALIZED(port->mpu_serialize);
1049 
1050     if (msg->ms_flags & MSGF_SYNC) {
1051 	/*
1052 	 * If a synchronous completion has been requested, just wakeup
1053 	 * the message without bothering to queue it to the target port.
1054 	 */
1055 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1056 	wakeup(msg);
1057     } else {
1058 	/*
1059 	 * If an asynchronous completion has been requested the message
1060 	 * must be queued to the reply port.
1061 	 */
1062 	_lwkt_enqueue_reply(port, msg);
1063 	if (port->mp_flags & MSGPORTF_WAITING) {
1064 	    port->mp_flags &= ~MSGPORTF_WAITING;
1065 	    wakeup(port);
1066 	}
1067     }
1068 }
1069 
1070 /************************************************************************
1071  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1072  ************************************************************************/
1073 
1074 /*
1075  * You can point a port's reply vector at this function if you just want
1076  * the message marked done, without any queueing or signaling.  This is
1077  * often used for structure-embedded messages.
1078  */
1079 static
1080 void
1081 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1082 {
1083     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1084 }
1085 
1086 static
1087 void *
1088 lwkt_panic_getport(lwkt_port_t port)
1089 {
1090     panic("lwkt_getport() illegal on port %p", port);
1091 }
1092 
1093 static
1094 int
1095 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1096 {
1097     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1098 }
1099 
1100 static
1101 int
1102 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1103 {
1104     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1105 }
1106 
1107 static
1108 void *
1109 lwkt_panic_waitport(lwkt_port_t port, int flags)
1110 {
1111     panic("port %p cannot be waited on", port);
1112 }
1113 
1114 static
1115 void
1116 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1117 {
1118     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1119 }
1120 
1121 static
1122 void
1123 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1124 {
1125     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1126 }
1127