xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 25a2db75)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69 
70 #include <sys/malloc.h>
/*
 * Kernel malloc type M_LWKTMSG ("lwkt message") for lwkt message
 * allocations.  NOTE(review): no allocation using it is visible in this
 * part of the file.
 */
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72 
73 /************************************************************************
74  *				MESSAGE FUNCTIONS			*
75  ************************************************************************/
76 
77 /*
78  * lwkt_sendmsg()
79  *
80  *	Request asynchronous completion and call lwkt_beginmsg().  The
81  *	target port can opt to execute the message synchronously or
82  *	asynchronously and this function will automatically queue the
83  *	response if the target executes the message synchronously.
84  *
85  *	NOTE: The message is in an indeterminant state until this call
86  *	returns.  The caller should not mess with it (e.g. try to abort it)
87  *	until then.
88  *
89  *	NOTE: Do not use this function to forward a message as we might
90  *	clobber ms_flags in a SMP race.
91  */
92 void
93 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
94 {
95     int error;
96 
97     KKASSERT(msg->ms_reply_port != NULL &&
98 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
99     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
100     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
101 	/*
102 	 * Target port opted to execute the message synchronously so
103 	 * queue the response.
104 	 */
105 	lwkt_replymsg(msg, error);
106     }
107 }
108 
109 void
110 lwkt_sendmsg_stage1(lwkt_port_t port, lwkt_msg_t msg)
111 {
112     KKASSERT(msg->ms_reply_port != NULL &&
113 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
114     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
115 }
116 
117 void
118 lwkt_sendmsg_stage2(lwkt_port_t port, lwkt_msg_t msg)
119 {
120     int error;
121 
122     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
123 	/*
124 	 * Target port opted to execute the message synchronously so
125 	 * queue the response.
126 	 */
127 	lwkt_replymsg(msg, error);
128     }
129 }
130 
131 /*
132  * lwkt_domsg()
133  *
134  *	Request synchronous completion and call lwkt_beginmsg().  The
135  *	target port can opt to execute the message synchronously or
136  *	asynchronously and this function will automatically block and
137  *	wait for a response if the target executes the message
138  *	asynchronously.
139  *
140  *	NOTE: Do not use this function to forward a message as we might
141  *	clobber ms_flags in a SMP race.
142  */
143 int
144 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
145 {
146     int error;
147 
148     KKASSERT(msg->ms_reply_port != NULL &&
149 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
150     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
151     msg->ms_flags |= MSGF_SYNC;
152     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
153 	/*
154 	 * Target port opted to execute the message asynchronously so
155 	 * block and wait for a reply.
156 	 */
157 	error = lwkt_waitmsg(msg, flags);
158     } else {
159 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
160     }
161     return(error);
162 }
163 
164 /*
165  * lwkt_forwardmsg()
166  *
167  * Forward a message received on one port to another port.
168  */
169 int
170 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
171 {
172     int error;
173 
174     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
175     if ((error = port->mp_putport(port, msg)) != EASYNC)
176 	lwkt_replymsg(msg, error);
177     return(error);
178 }
179 
180 /*
181  * lwkt_abortmsg()
182  *
183  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
184  * The caller must ensure that the message will not be both replied AND
185  * destroyed while the abort is in progress.
186  *
187  * This function issues a callback which might block!
188  */
189 void
190 lwkt_abortmsg(lwkt_msg_t msg)
191 {
192     /*
193      * A critical section protects us from reply IPIs on this cpu.
194      */
195     crit_enter();
196 
197     /*
198      * Shortcut the operation if the message has already been returned.
199      * The callback typically constructs a lwkt_msg with the abort request,
200      * issues it synchronously, and waits for completion.  The callback
201      * is not required to actually abort the message and the target port,
202      * upon receiving an abort request message generated by the callback
203      * should check whether the original message has already completed or
204      * not.
205      */
206     if (msg->ms_flags & MSGF_ABORTABLE) {
207 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
208 	    msg->ms_abortfn(msg);
209     }
210     crit_exit();
211 }
212 
213 /************************************************************************
214  *			PORT INITIALIZATION API				*
215  ************************************************************************/
216 
/*
 * Forward declarations of the port backend operations.  The
 * lwkt_initport_*() functions below install these into a port's mp_*
 * function pointers via _lwkt_initport().
 */

/* Thread ports: owned by one thread; cross-cpu puts/replies go by IPI. */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Spin ports: queues protected by the port's spinlock (mpu_spin). */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Serializer ports: callers hold mpu_serialize; no dropmsg backend. */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Placeholder operations for reply-only / put-only / unusable ports.
 * NOTE(review): the lwkt_panic_*() variants presumably panic when
 * invoked; their definitions are not visible here.
 */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
244 
245 /*
246  * Core port initialization (internal)
247  */
248 static __inline
249 void
250 _lwkt_initport(lwkt_port_t port,
251 	       void *(*gportfn)(lwkt_port_t),
252 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
253 	       int (*wmsgfn)(lwkt_msg_t, int),
254 	       void *(*wportfn)(lwkt_port_t, int),
255 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
256 	       int (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
257 {
258     bzero(port, sizeof(*port));
259     TAILQ_INIT(&port->mp_msgq);
260     TAILQ_INIT(&port->mp_msgq_prio);
261     port->mp_getport = gportfn;
262     port->mp_putport = pportfn;
263     port->mp_waitmsg =  wmsgfn;
264     port->mp_waitport =  wportfn;
265     port->mp_replyport = rportfn;
266     port->mp_dropmsg = dmsgfn;
267 }
268 
269 /*
270  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
271  * we tell the scheduler not to reschedule if td is at a higher priority.
272  *
273  * This routine is called even if the thread is already scheduled.
274  */
275 static __inline
276 void
277 _lwkt_schedule_msg(thread_t td, int flags)
278 {
279     lwkt_schedule(td);
280 }
281 
282 /*
283  * lwkt_initport_thread()
284  *
285  *	Initialize a port for use by a particular thread.  The port may
286  *	only be used by <td>.
287  */
288 void
289 lwkt_initport_thread(lwkt_port_t port, thread_t td)
290 {
291     _lwkt_initport(port,
292 		   lwkt_thread_getport,
293 		   lwkt_thread_putport,
294 		   lwkt_thread_waitmsg,
295 		   lwkt_thread_waitport,
296 		   lwkt_thread_replyport,
297 		   lwkt_thread_dropmsg);
298     port->mpu_td = td;
299 }
300 
301 /*
302  * lwkt_initport_spin()
303  *
304  *	Initialize a port for use with descriptors that might be accessed
305  *	via multiple LWPs, processes, or threads.  Has somewhat more
306  *	overhead then thread ports.
307  */
308 void
309 lwkt_initport_spin(lwkt_port_t port, thread_t td)
310 {
311     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
312 
313     if (td == NULL)
314 	dmsgfn = lwkt_panic_dropmsg;
315     else
316 	dmsgfn = lwkt_spin_dropmsg;
317 
318     _lwkt_initport(port,
319 		   lwkt_spin_getport,
320 		   lwkt_spin_putport,
321 		   lwkt_spin_waitmsg,
322 		   lwkt_spin_waitport,
323 		   lwkt_spin_replyport,
324 		   dmsgfn);
325     spin_init(&port->mpu_spin);
326     port->mpu_td = td;
327 }
328 
329 /*
330  * lwkt_initport_serialize()
331  *
332  *	Initialize a port for use with descriptors that might be accessed
333  *	via multiple LWPs, processes, or threads.  Callers are assumed to
334  *	have held the serializer (slz).
335  */
336 void
337 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
338 {
339     _lwkt_initport(port,
340 		   lwkt_serialize_getport,
341 		   lwkt_serialize_putport,
342 		   lwkt_serialize_waitmsg,
343 		   lwkt_serialize_waitport,
344 		   lwkt_serialize_replyport,
345 		   lwkt_panic_dropmsg);
346     port->mpu_serialize = slz;
347 }
348 
349 /*
350  * Similar to the standard initport, this function simply marks the message
351  * as being done and does not attempt to return it to an originating port.
352  */
353 void
354 lwkt_initport_replyonly_null(lwkt_port_t port)
355 {
356     _lwkt_initport(port,
357 		   lwkt_panic_getport,
358 		   lwkt_panic_putport,
359 		   lwkt_panic_waitmsg,
360 		   lwkt_panic_waitport,
361 		   lwkt_null_replyport,
362 		   lwkt_panic_dropmsg);
363 }
364 
365 /*
366  * Initialize a reply-only port, typically used as a message sink.  Such
367  * ports can only be used as a reply port.
368  */
369 void
370 lwkt_initport_replyonly(lwkt_port_t port,
371 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
372 {
373     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
374 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
375 			 rportfn, lwkt_panic_dropmsg);
376 }
377 
378 void
379 lwkt_initport_putonly(lwkt_port_t port,
380 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
381 {
382     _lwkt_initport(port, lwkt_panic_getport, pportfn,
383 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
384 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
385 }
386 
387 void
388 lwkt_initport_panic(lwkt_port_t port)
389 {
390     _lwkt_initport(port,
391 		   lwkt_panic_getport, lwkt_panic_putport,
392 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
393 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
394 }
395 
396 static __inline
397 void
398 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
399 {
400     lwkt_msg_queue *queue;
401 
402     /*
403      * normal case, remove and return the message.
404      */
405     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
406 	queue = &port->mp_msgq_prio;
407     else
408 	queue = &port->mp_msgq;
409     TAILQ_REMOVE(queue, msg, ms_node);
410 
411     /*
412      * atomic op needed for spin ports
413      */
414     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
415 }
416 
417 static __inline
418 void
419 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
420 {
421     lwkt_msg_queue *queue;
422 
423     /*
424      * atomic op needed for spin ports
425      */
426     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
427     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
428 	queue = &port->mp_msgq_prio;
429     else
430     	queue = &port->mp_msgq;
431     TAILQ_INSERT_TAIL(queue, msg, ms_node);
432 }
433 
434 static __inline
435 lwkt_msg_t
436 _lwkt_pollmsg(lwkt_port_t port)
437 {
438     lwkt_msg_t msg;
439 
440     msg = TAILQ_FIRST(&port->mp_msgq_prio);
441     if (__predict_false(msg != NULL))
442 	return msg;
443 
444     /*
445      * Priority queue has no message, fallback to non-priority queue.
446      */
447     return TAILQ_FIRST(&port->mp_msgq);
448 }
449 
450 static __inline
451 void
452 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
453 {
454     /*
455      * atomic op needed for spin ports
456      */
457     _lwkt_pushmsg(port, msg);
458     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
459 }
460 
461 /************************************************************************
462  *			THREAD PORT BACKEND				*
463  ************************************************************************
464  *
465  * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
467  * correct cpu before being enqueued to a port.  Note that this is fairly
468  * optimal since scheduling would have had to do an IPI anyway if the
469  * message were headed to a different cpu.
470  */
471 
472 /*
473  * This function completes reply processing for the default case in the
474  * context of the originating cpu.
475  */
476 static
477 void
478 lwkt_thread_replyport_remote(lwkt_msg_t msg)
479 {
480     lwkt_port_t port = msg->ms_reply_port;
481     int flags;
482 
483     /*
484      * Chase any thread migration that occurs
485      */
486     if (port->mpu_td->td_gd != mycpu) {
487 	lwkt_send_ipiq(port->mpu_td->td_gd,
488 		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
489 	return;
490     }
491 
492     /*
493      * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
494      */
495 #ifdef INVARIANTS
496     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
497     msg->ms_flags &= ~MSGF_INTRANSIT;
498 #endif
499     flags = msg->ms_flags;
500     if (msg->ms_flags & MSGF_SYNC) {
501 	cpu_sfence();
502 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
503     } else {
504 	_lwkt_enqueue_reply(port, msg);
505     }
506     if (port->mp_flags & MSGPORTF_WAITING)
507 	_lwkt_schedule_msg(port->mpu_td, flags);
508 }
509 
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the cpu of the reply
 * port's owning thread:
 *   - If that is the current cpu, the work is done inline inside a
 *     critical section.
 *   - Otherwise the message is marked MSGF_REPLY (plus MSGF_INTRANSIT
 *     under INVARIANTS) and lwkt_thread_replyport_remote() is IPI'd to
 *     the owner's cpu to finish the job.
 *
 * The critical sections protect us from IPIs on this cpu.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    /* Must not already be done, queued, or in flight to another cpu. */
    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * On the local cpu, a critical section keeps IPIs from
	 * interleaving with the flag update and the schedule call.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    /* snapshot taken before DONE/REPLY are set; passed to
	     * _lwkt_schedule_msg() */
	    flags = msg->ms_flags;
	    /* store fence: order earlier stores before MSGF_DONE */
	    cpu_sfence();
	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, flags);
	    crit_exit();
	} else {
	    /*
	     * Remote cpu: flag the message and let
	     * lwkt_thread_replyport_remote() complete it there.  Atomic
	     * ops because the owner's cpu may touch ms_flags concurrently.
	     */
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 *
	 * A critical section is required to interlock the port queue.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    _lwkt_enqueue_reply(port, msg);
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	    crit_exit();
	} else {
	    /* Remote cpu: same hand-off as the synchronous case above. */
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    }
}
574 
575 /*
576  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
577  *
578  * This function could _only_ be used when caller is in the same thread
579  * as the message's target port owner thread.
580  */
581 static int
582 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
583 {
584     int error;
585 
586     KASSERT(port->mpu_td == curthread,
587     	    ("message could only be dropped in the same thread "
588 	     "as the message target port thread"));
589     crit_enter_quick(port->mpu_td);
590     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
591 	    _lwkt_pullmsg(port, msg);
592 	    atomic_set_int(&msg->ms_flags, MSGF_DONE);
593 	    error = 0;
594     } else {
595 	    error = ENOENT;
596     }
597     crit_exit_quick(port->mpu_td);
598 
599     return (error);
600 }
601 
602 /*
603  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
604  *
605  * Called with the target port as an argument but in the context of the
606  * reply port.  This function always implements an asynchronous put to
607  * the target message port, and thus returns EASYNC.
608  *
609  * The message must already have cleared MSGF_DONE and MSGF_REPLY
610  */
611 static
612 void
613 lwkt_thread_putport_remote(lwkt_msg_t msg)
614 {
615     lwkt_port_t port = msg->ms_target_port;
616 
617     /*
618      * Chase any thread migration that occurs
619      */
620     if (port->mpu_td->td_gd != mycpu) {
621 	lwkt_send_ipiq(port->mpu_td->td_gd,
622 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
623 	return;
624     }
625 
626     /*
627      * An atomic op is needed on ms_flags vs originator.  Also
628      * note that the originator might be using a different type
629      * of msgport.
630      */
631 #ifdef INVARIANTS
632     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
633     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
634 #endif
635     _lwkt_pushmsg(port, msg);
636     if (port->mp_flags & MSGPORTF_WAITING)
637 	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
638 }
639 
640 static
641 int
642 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
643 {
644     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
645 
646     msg->ms_target_port = port;
647     if (port->mpu_td->td_gd == mycpu) {
648 	crit_enter();
649 	_lwkt_pushmsg(port, msg);
650 	if (port->mp_flags & MSGPORTF_WAITING)
651 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
652 	crit_exit();
653     } else {
654 #ifdef INVARIANTS
655 	/*
656 	 * Cleanup.
657 	 *
658 	 * An atomic op is needed on ms_flags vs originator.  Also
659 	 * note that the originator might be using a different type
660 	 * of msgport.
661 	 */
662 	atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
663 #endif
664 	lwkt_send_ipiq(port->mpu_td->td_gd,
665 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
666     }
667     return (EASYNC);
668 }
669 
670 /*
671  * lwkt_thread_getport()
672  *
673  *	Retrieve the next message from the port or NULL if no messages
674  *	are ready.
675  */
676 static
677 void *
678 lwkt_thread_getport(lwkt_port_t port)
679 {
680     lwkt_msg_t msg;
681 
682     KKASSERT(port->mpu_td == curthread);
683 
684     crit_enter_quick(port->mpu_td);
685     if ((msg = _lwkt_pollmsg(port)) != NULL)
686 	_lwkt_pullmsg(port, msg);
687     crit_exit_quick(port->mpu_td);
688     return(msg);
689 }
690 
/*
 * lwkt_thread_waitmsg()
 *
 *	Wait for a particular message to be replied and return its
 *	ms_error.  We must be the only thread waiting on the message, and
 *	the reply port must be owned by the caller (asserted).  Dropable
 *	messages (MSGF_DROPABLE) cannot be waited on.
 *
 *	While the message is not MSGF_DONE we sleep with MSGPORTF_WAITING
 *	set on the reply port, so the reply path schedules us.  The first
 *	sleep uses the caller's <flags>.  If it returns non-zero
 *	(presumably interrupted, e.g. PCATCH and a signal),
 *	lwkt_abortmsg() is issued once, and all further sleeps use flags 0
 *	until the message completes.  An asynchronously replied message
 *	is left queued on the reply port and is pulled off it here.
 *
 *	NOTE(review): unlike lwkt_spin_waitmsg(), the lwkt_sleep() error
 *	is not folded into ms_error after an abort.
 */
static
int
lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
{
    thread_t td = curthread;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	/*
	 * If the done bit was not set we have to block until it is.
	 */
	lwkt_port_t port = msg->ms_reply_port;
	int sentabort;

	KKASSERT(port->mpu_td == td);
	crit_enter_quick(td);
	sentabort = 0;

	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    port->mp_flags |= MSGPORTF_WAITING;	/* same cpu */
	    if (sentabort == 0) {
		/* first sleep honors <flags>; on error, request abort */
		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
		    lwkt_abortmsg(msg);
		}
	    } else {
		/* abort already requested: wait for completion */
		lwkt_sleep("waitabt", 0);
	    }
	    port->mp_flags &= ~MSGPORTF_WAITING;
	}
	/* an async reply sits on our reply port's queue; dequeue it */
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	crit_exit_quick(td);
    } else {
	/*
	 * If the done bit was set we only have to mess around with the
	 * message if it is queued on the reply port.
	 */
	crit_enter_quick(td);
	if (msg->ms_flags & MSGF_QUEUED) {
	    lwkt_port_t port = msg->ms_reply_port;
	    /* shadows the outer td (same value); only used by KKASSERT */
	    thread_t td __debugvar = curthread;

	    KKASSERT(port->mpu_td == td);
	    _lwkt_pullmsg(port, msg);
	}
	crit_exit_quick(td);
    }
    return(msg->ms_error);
}
749 
750 /*
751  * lwkt_thread_waitport()
752  *
753  *	Wait for a new message to be available on the port.  We must be the
754  *	the only thread waiting on the port.  The port must be owned by caller.
755  */
756 static
757 void *
758 lwkt_thread_waitport(lwkt_port_t port, int flags)
759 {
760     thread_t td = curthread;
761     lwkt_msg_t msg;
762     int error;
763 
764     KKASSERT(port->mpu_td == td);
765     crit_enter_quick(td);
766     while ((msg = _lwkt_pollmsg(port)) == NULL) {
767 	port->mp_flags |= MSGPORTF_WAITING;
768 	error = lwkt_sleep("waitport", flags);
769 	port->mp_flags &= ~MSGPORTF_WAITING;
770 	if (error)
771 	    goto done;
772     }
773     _lwkt_pullmsg(port, msg);
774 done:
775     crit_exit_quick(td);
776     return(msg);
777 }
778 
779 /************************************************************************
780  *			   SPIN PORT BACKEND				*
781  ************************************************************************
782  *
783  * This backend uses spinlocks instead of making assumptions about which
784  * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
786  * you don't have a choice if there are multiple threads accessing the port.
787  *
788  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
789  * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
791  * superfluous wakeups may occur, which is ok.
792  *
 * XXX synchronous message wakeups are not currently optimized.
794  */
795 
796 static
797 void *
798 lwkt_spin_getport(lwkt_port_t port)
799 {
800     lwkt_msg_t msg;
801 
802     spin_lock(&port->mpu_spin);
803     if ((msg = _lwkt_pollmsg(port)) != NULL)
804 	_lwkt_pullmsg(port, msg);
805     spin_unlock(&port->mpu_spin);
806     return(msg);
807 }
808 
809 static
810 int
811 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
812 {
813     int dowakeup;
814 
815     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
816 
817     msg->ms_target_port = port;
818     spin_lock(&port->mpu_spin);
819     _lwkt_pushmsg(port, msg);
820     dowakeup = 0;
821     if (port->mp_flags & MSGPORTF_WAITING) {
822 	port->mp_flags &= ~MSGPORTF_WAITING;
823 	dowakeup = 1;
824     }
825     spin_unlock(&port->mpu_spin);
826     if (dowakeup)
827 	wakeup(port);
828     return (EASYNC);
829 }
830 
/*
 * lwkt_spin_waitmsg()
 *
 *	Wait for <msg> to be replied on its spinlock-protected reply port
 *	and return msg->ms_error.  Dropable messages (MSGF_DROPABLE)
 *	cannot be waited on.
 *
 *	The wait channel depends on how the message was sent:
 *	  - MSGF_SYNC messages sleep on the message itself, with
 *	    MSGF_WAITING set on it.
 *	  - Other messages sleep on the reply port, with MSGPORTF_WAITING
 *	    set on it.
 *	This matches what lwkt_spin_replyport() wakes up.
 *
 *	With PCATCH in <flags>, the first interrupted sleep issues
 *	lwkt_abortmsg(); mpu_spin is dropped around that call because the
 *	abort callback may block.  Later sleeps are uninterruptible.  If
 *	the message then completes with EINTR, ms_error is replaced by the
 *	sleep error (e.g. ERESTART).
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));
    port = msg->ms_reply_port;

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	sentabort = 0;
	spin_lock(&port->mpu_spin);
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     *
	     * ms_flags needs atomic op originator vs target MSGF_QUEUED
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * The sleep is interruptible only if PCATCH was passed and no
	     * abort has been requested yet.  lwkt_abortmsg() only acts on
	     * MSGF_ABORTABLE messages.  We must still wait for message
	     * completion regardless.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    spin_unlock(&port->mpu_spin);
		    lwkt_abortmsg(msg);
		    spin_lock(&port->mpu_spin);
		}
	    } else {
		/* result not needed: loop re-checks MSGF_DONE */
		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
	    }
	    /*
	     * The waker clears MSGPORTF_WAITING / MSGF_WAITING (see the
	     * note at the top of the spin port backend).
	     */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	spin_unlock(&port->mpu_spin);
    } else {
	/* Already done: only need to dequeue an async reply. */
	spin_lock(&port->mpu_spin);
	if (msg->ms_flags & MSGF_QUEUED) {
	    _lwkt_pullmsg(port, msg);
	}
	spin_unlock(&port->mpu_spin);
    }
    return(msg->ms_error);
}
898 
899 static
900 void *
901 lwkt_spin_waitport(lwkt_port_t port, int flags)
902 {
903     lwkt_msg_t msg;
904     int error;
905 
906     spin_lock(&port->mpu_spin);
907     while ((msg = _lwkt_pollmsg(port)) == NULL) {
908 	port->mp_flags |= MSGPORTF_WAITING;
909 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
910 	/* see note at the top on the MSGPORTF_WAITING flag */
911 	if (error) {
912 	    spin_unlock(&port->mpu_spin);
913 	    return(NULL);
914 	}
915     }
916     _lwkt_pullmsg(port, msg);
917     spin_unlock(&port->mpu_spin);
918     return(msg);
919 }
920 
921 static
922 void
923 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
924 {
925     int dowakeup;
926 
927     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
928 
929     if (msg->ms_flags & MSGF_SYNC) {
930 	/*
931 	 * If a synchronous completion has been requested, just wakeup
932 	 * the message without bothering to queue it to the target port.
933 	 *
934 	 * ms_flags protected by reply port spinlock
935 	 */
936 	spin_lock(&port->mpu_spin);
937 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
938 	dowakeup = 0;
939 	if (msg->ms_flags & MSGF_WAITING) {
940 		msg->ms_flags &= ~MSGF_WAITING;
941 		dowakeup = 1;
942 	}
943 	spin_unlock(&port->mpu_spin);
944 	if (dowakeup)
945 		wakeup(msg);
946     } else {
947 	/*
948 	 * If an asynchronous completion has been requested the message
949 	 * must be queued to the reply port.
950 	 */
951 	spin_lock(&port->mpu_spin);
952 	_lwkt_enqueue_reply(port, msg);
953 	dowakeup = 0;
954 	if (port->mp_flags & MSGPORTF_WAITING) {
955 	    port->mp_flags &= ~MSGPORTF_WAITING;
956 	    dowakeup = 1;
957 	}
958 	spin_unlock(&port->mpu_spin);
959 	if (dowakeup)
960 	    wakeup(port);
961     }
962 }
963 
964 /*
965  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
966  *
967  * This function could _only_ be used when caller is in the same thread
968  * as the message's target port owner thread.
969  */
970 static int
971 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
972 {
973     int error;
974 
975     KASSERT(port->mpu_td == curthread,
976     	    ("message could only be dropped in the same thread "
977 	     "as the message target port thread\n"));
978     spin_lock(&port->mpu_spin);
979     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
980 	    _lwkt_pullmsg(port, msg);
981 	    msg->ms_flags |= MSGF_DONE;
982 	    error = 0;
983     } else {
984 	    error = ENOENT;
985     }
986     spin_unlock(&port->mpu_spin);
987 
988     return (error);
989 }
990 
991 /************************************************************************
992  *			  SERIALIZER PORT BACKEND			*
993  ************************************************************************
994  *
995  * This backend uses serializer to protect port accessing.  Callers are
996  * assumed to have serializer held.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
998  * operations which may temporarily release serializer.
999  *
1000  * Implementation is based on SPIN PORT BACKEND.
1001  */
1002 
1003 static
1004 void *
1005 lwkt_serialize_getport(lwkt_port_t port)
1006 {
1007     lwkt_msg_t msg;
1008 
1009     ASSERT_SERIALIZED(port->mpu_serialize);
1010 
1011     if ((msg = _lwkt_pollmsg(port)) != NULL)
1012 	_lwkt_pullmsg(port, msg);
1013     return(msg);
1014 }
1015 
1016 static
1017 int
1018 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1019 {
1020     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1021     ASSERT_SERIALIZED(port->mpu_serialize);
1022 
1023     msg->ms_target_port = port;
1024     _lwkt_pushmsg(port, msg);
1025     if (port->mp_flags & MSGPORTF_WAITING) {
1026 	port->mp_flags &= ~MSGPORTF_WAITING;
1027 	wakeup(port);
1028     }
1029     return (EASYNC);
1030 }
1031 
/*
 * lwkt_serialize_waitmsg() - wait for a message to complete on a
 * serializer-protected reply port.
 *
 * The caller must hold the reply port's serializer (asserted below).
 * Sleeps via zsleep() with that serializer as the interlock until
 * MSGF_DONE is set on the message.  Dropable messages cannot be waited on.
 *
 * If PCATCH is set in flags, the first interrupted sleep triggers
 * lwkt_abortmsg() (with the serializer temporarily released).  Waiting
 * then continues without PCATCH until the message completes.  If the
 * message finally completes with EINTR after such an abort, its error is
 * replaced by the error zsleep() returned (e.g. ERESTART).
 *
 * If the completed message is still queued on the reply port, it is
 * pulled off.  Returns msg->ms_error.
 */
static
int
lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;	/* nonzero: zsleep error that caused us to abort */
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	port = msg->ms_reply_port;

	ASSERT_SERIALIZED(port->mpu_serialize);

	sentabort = 0;
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;	/* wait channel passed to zsleep() */

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * Only messages which support abort can be interrupted.
	     * We must still wait for message completion regardless.
	     * The serializer is dropped around lwkt_abortmsg() and then
	     * re-acquired before the loop re-checks MSGF_DONE.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    lwkt_serialize_exit(port->mpu_serialize);
		    lwkt_abortmsg(msg);
		    lwkt_serialize_enter(port->mpu_serialize);
		}
	    } else {
		/* NOTE(review): this result is never read (dead store). */
		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
	    }
	    /* see note at the top on the MSGPORTF_WAITING flag */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	/* An async reply may still sit on the reply port's queue. */
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
    } else {
	/* Already done; just dequeue it from the reply port if needed. */
	if (msg->ms_flags & MSGF_QUEUED) {
	    port = msg->ms_reply_port;

	    ASSERT_SERIALIZED(port->mpu_serialize);
	    _lwkt_pullmsg(port, msg);
	}
    }
    return(msg->ms_error);
}
1098 
1099 static
1100 void *
1101 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1102 {
1103     lwkt_msg_t msg;
1104     int error;
1105 
1106     ASSERT_SERIALIZED(port->mpu_serialize);
1107 
1108     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1109 	port->mp_flags |= MSGPORTF_WAITING;
1110 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1111 	/* see note at the top on the MSGPORTF_WAITING flag */
1112 	if (error)
1113 	    return(NULL);
1114     }
1115     _lwkt_pullmsg(port, msg);
1116     return(msg);
1117 }
1118 
1119 static
1120 void
1121 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1122 {
1123     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1124     ASSERT_SERIALIZED(port->mpu_serialize);
1125 
1126     if (msg->ms_flags & MSGF_SYNC) {
1127 	/*
1128 	 * If a synchronous completion has been requested, just wakeup
1129 	 * the message without bothering to queue it to the target port.
1130 	 *
1131 	 * (both sides synchronized via serialized reply port)
1132 	 */
1133 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1134 	wakeup(msg);
1135     } else {
1136 	/*
1137 	 * If an asynchronous completion has been requested the message
1138 	 * must be queued to the reply port.
1139 	 */
1140 	_lwkt_enqueue_reply(port, msg);
1141 	if (port->mp_flags & MSGPORTF_WAITING) {
1142 	    port->mp_flags &= ~MSGPORTF_WAITING;
1143 	    wakeup(port);
1144 	}
1145     }
1146 }
1147 
1148 /************************************************************************
1149  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1150  ************************************************************************/
1151 
1152 /*
1153  * You can point a port's reply vector at this function if you just want
1154  * the message marked done, without any queueing or signaling.  This is
1155  * often used for structure-embedded messages.
1156  */
1157 static
1158 void
1159 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1160 {
1161     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1162 }
1163 
1164 static
1165 void *
1166 lwkt_panic_getport(lwkt_port_t port)
1167 {
1168     panic("lwkt_getport() illegal on port %p", port);
1169 }
1170 
1171 static
1172 int
1173 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1174 {
1175     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1176 }
1177 
1178 static
1179 int
1180 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1181 {
1182     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1183 }
1184 
1185 static
1186 void *
1187 lwkt_panic_waitport(lwkt_port_t port, int flags)
1188 {
1189     panic("port %p cannot be waited on", port);
1190 }
1191 
1192 static
1193 void
1194 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1195 {
1196     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1197 }
1198 
1199 static
1200 int
1201 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1202 {
1203     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1204     /* NOT REACHED */
1205     return (ENOENT);
1206 }
1207