xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 279dd846)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69 
70 #include <sys/malloc.h>
/*
 * Malloc type for LWKT message allocations.  Nothing in the visible part
 * of this file allocates with it; presumably it is used by message
 * producers elsewhere in the kernel — verify against callers.
 */
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72 
73 /************************************************************************
74  *				MESSAGE FUNCTIONS			*
75  ************************************************************************/
76 
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80     return port->mp_putport(port, msg);
81 }
82 
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86     return port->mp_putport_oncpu(port, msg);
87 }
88 
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     KKASSERT(msg->ms_reply_port != NULL &&
93 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96 
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100     int error;
101 
102     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103 	/*
104 	 * Target port opted to execute the message synchronously so
105 	 * queue the response.
106 	 */
107 	lwkt_replymsg(msg, error);
108     }
109 }
110 
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115 
116     if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117 	/*
118 	 * Target port opted to execute the message synchronously so
119 	 * queue the response.
120 	 */
121 	lwkt_replymsg(msg, error);
122     }
123 }
124 
125 /*
126  * lwkt_sendmsg()
127  *
128  *	Request asynchronous completion and call lwkt_beginmsg().  The
129  *	target port can opt to execute the message synchronously or
130  *	asynchronously and this function will automatically queue the
131  *	response if the target executes the message synchronously.
132  *
133  *	NOTE: The message is in an indeterminant state until this call
134  *	returns.  The caller should not mess with it (e.g. try to abort it)
135  *	until then.
136  *
137  *	NOTE: Do not use this function to forward a message as we might
138  *	clobber ms_flags in a SMP race.
139  */
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
142 {
143     _lwkt_sendmsg_prepare(port, msg);
144     _lwkt_sendmsg_start(port, msg);
145 }
146 
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
149 {
150     _lwkt_sendmsg_prepare(port, msg);
151     _lwkt_sendmsg_start_oncpu(port, msg);
152 }
153 
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
156 {
157     _lwkt_sendmsg_prepare(port, msg);
158 }
159 
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
162 {
163     _lwkt_sendmsg_start(port, msg);
164 }
165 
166 /*
167  * lwkt_domsg()
168  *
169  *	Request synchronous completion and call lwkt_beginmsg().  The
170  *	target port can opt to execute the message synchronously or
171  *	asynchronously and this function will automatically block and
172  *	wait for a response if the target executes the message
173  *	asynchronously.
174  *
175  *	NOTE: Do not use this function to forward a message as we might
176  *	clobber ms_flags in a SMP race.
177  */
178 int
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
180 {
181     int error;
182 
183     KKASSERT(msg->ms_reply_port != NULL &&
184 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186     msg->ms_flags |= MSGF_SYNC;
187     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
188 	/*
189 	 * Target port opted to execute the message asynchronously so
190 	 * block and wait for a reply.
191 	 */
192 	error = lwkt_waitmsg(msg, flags);
193     } else {
194 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
195     }
196     return(error);
197 }
198 
199 /*
200  * lwkt_forwardmsg()
201  *
202  * Forward a message received on one port to another port.
203  */
204 int
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
206 {
207     int error;
208 
209     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210     if ((error = port->mp_putport(port, msg)) != EASYNC)
211 	lwkt_replymsg(msg, error);
212     return(error);
213 }
214 
215 /*
216  * lwkt_abortmsg()
217  *
218  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
219  * The caller must ensure that the message will not be both replied AND
220  * destroyed while the abort is in progress.
221  *
222  * This function issues a callback which might block!
223  */
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
226 {
227     /*
228      * A critical section protects us from reply IPIs on this cpu.
229      */
230     crit_enter();
231 
232     /*
233      * Shortcut the operation if the message has already been returned.
234      * The callback typically constructs a lwkt_msg with the abort request,
235      * issues it synchronously, and waits for completion.  The callback
236      * is not required to actually abort the message and the target port,
237      * upon receiving an abort request message generated by the callback
238      * should check whether the original message has already completed or
239      * not.
240      */
241     if (msg->ms_flags & MSGF_ABORTABLE) {
242 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243 	    msg->ms_abortfn(msg);
244     }
245     crit_exit();
246 }
247 
248 /************************************************************************
249  *			PORT INITIALIZATION API				*
250  ************************************************************************/
251 
/*
 * Forward declarations of the port backends.  Each lwkt_initport_*()
 * function below installs one set of these as the port's operations
 * vector through _lwkt_initport().
 */

/* Thread ports: owned by, and drained by, a single thread. */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/* Spin ports: queues protected by the port spinlock (mpu_spin). */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

/* Serializer ports: caller is assumed to hold the port's serializer. */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Null reply backend plus stubs for unsupported operations.  The bodies
 * are outside this view; the lwkt_panic_* names suggest they panic when
 * called — confirm.
 */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
281 
282 /*
283  * Core port initialization (internal)
284  */
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288 	       void *(*gportfn)(lwkt_port_t),
289 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290 	       int (*wmsgfn)(lwkt_msg_t, int),
291 	       void *(*wportfn)(lwkt_port_t, int),
292 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293 	       int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294 	       int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
295 {
296     bzero(port, sizeof(*port));
297     port->mp_cpuid = -1;
298     TAILQ_INIT(&port->mp_msgq);
299     TAILQ_INIT(&port->mp_msgq_prio);
300     port->mp_getport = gportfn;
301     port->mp_putport = pportfn;
302     port->mp_waitmsg = wmsgfn;
303     port->mp_waitport = wportfn;
304     port->mp_replyport = rportfn;
305     port->mp_dropmsg = dmsgfn;
306     port->mp_putport_oncpu = pportfn_oncpu;
307 }
308 
309 /*
310  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
311  * we tell the scheduler not to reschedule if td is at a higher priority.
312  *
313  * This routine is called even if the thread is already scheduled.
314  */
315 static __inline
316 void
317 _lwkt_schedule_msg(thread_t td, int flags)
318 {
319     lwkt_schedule(td);
320 }
321 
322 /*
323  * lwkt_initport_thread()
324  *
325  *	Initialize a port for use by a particular thread.  The port may
326  *	only be used by <td>.
327  */
328 void
329 lwkt_initport_thread(lwkt_port_t port, thread_t td)
330 {
331     _lwkt_initport(port,
332 		   lwkt_thread_getport,
333 		   lwkt_thread_putport,
334 		   lwkt_thread_waitmsg,
335 		   lwkt_thread_waitport,
336 		   lwkt_thread_replyport,
337 		   lwkt_thread_dropmsg,
338 		   lwkt_thread_putport);
339     port->mpu_td = td;
340 }
341 
342 /*
343  * lwkt_initport_spin()
344  *
345  *	Initialize a port for use with descriptors that might be accessed
346  *	via multiple LWPs, processes, or threads.  Has somewhat more
347  *	overhead then thread ports.
348  */
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
351 {
352     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353     int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
354 
355     if (td == NULL)
356 	dmsgfn = lwkt_panic_dropmsg;
357     else
358 	dmsgfn = lwkt_spin_dropmsg;
359 
360     if (fixed_cpuid)
361 	pportfn_oncpu = lwkt_spin_putport_oncpu;
362     else
363 	pportfn_oncpu = lwkt_panic_putport_oncpu;
364 
365     _lwkt_initport(port,
366 		   lwkt_spin_getport,
367 		   lwkt_spin_putport,
368 		   lwkt_spin_waitmsg,
369 		   lwkt_spin_waitport,
370 		   lwkt_spin_replyport,
371 		   dmsgfn,
372 		   pportfn_oncpu);
373     spin_init(&port->mpu_spin, "lwktinitport");
374     port->mpu_td = td;
375     if (fixed_cpuid)
376 	port->mp_cpuid = td->td_gd->gd_cpuid;
377 }
378 
379 /*
380  * lwkt_initport_serialize()
381  *
382  *	Initialize a port for use with descriptors that might be accessed
383  *	via multiple LWPs, processes, or threads.  Callers are assumed to
384  *	have held the serializer (slz).
385  */
386 void
387 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
388 {
389     _lwkt_initport(port,
390 		   lwkt_serialize_getport,
391 		   lwkt_serialize_putport,
392 		   lwkt_serialize_waitmsg,
393 		   lwkt_serialize_waitport,
394 		   lwkt_serialize_replyport,
395 		   lwkt_panic_dropmsg,
396 		   lwkt_panic_putport_oncpu);
397     port->mpu_serialize = slz;
398 }
399 
400 /*
401  * Similar to the standard initport, this function simply marks the message
402  * as being done and does not attempt to return it to an originating port.
403  */
404 void
405 lwkt_initport_replyonly_null(lwkt_port_t port)
406 {
407     _lwkt_initport(port,
408 		   lwkt_panic_getport,
409 		   lwkt_panic_putport,
410 		   lwkt_panic_waitmsg,
411 		   lwkt_panic_waitport,
412 		   lwkt_null_replyport,
413 		   lwkt_panic_dropmsg,
414 		   lwkt_panic_putport_oncpu);
415 }
416 
417 /*
418  * Initialize a reply-only port, typically used as a message sink.  Such
419  * ports can only be used as a reply port.
420  */
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
424 {
425     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
427 			 rportfn, lwkt_panic_dropmsg,
428 			 lwkt_panic_putport_oncpu);
429 }
430 
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
434 {
435     _lwkt_initport(port, lwkt_panic_getport, pportfn,
436 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
437 			 lwkt_panic_replyport, lwkt_panic_dropmsg,
438 			 lwkt_panic_putport_oncpu);
439 }
440 
441 void
442 lwkt_initport_panic(lwkt_port_t port)
443 {
444     _lwkt_initport(port,
445 		   lwkt_panic_getport, lwkt_panic_putport,
446 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
447 		   lwkt_panic_replyport, lwkt_panic_dropmsg,
448 		   lwkt_panic_putport_oncpu);
449 }
450 
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
454 {
455     lwkt_msg_queue *queue;
456 
457     /*
458      * normal case, remove and return the message.
459      */
460     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461 	queue = &port->mp_msgq_prio;
462     else
463 	queue = &port->mp_msgq;
464     TAILQ_REMOVE(queue, msg, ms_node);
465 
466     /*
467      * atomic op needed for spin ports
468      */
469     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
470 }
471 
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
475 {
476     lwkt_msg_queue *queue;
477 
478     /*
479      * atomic op needed for spin ports
480      */
481     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483 	queue = &port->mp_msgq_prio;
484     else
485     	queue = &port->mp_msgq;
486     TAILQ_INSERT_TAIL(queue, msg, ms_node);
487 
488     if (msg->ms_flags & MSGF_RECEIPT) {
489 	/*
490 	 * In case this message is forwarded later, the receipt
491 	 * flag was cleared once the receipt function is called.
492 	 */
493 	atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT);
494 	msg->ms_receiptfn(msg, port);
495     }
496 }
497 
498 static __inline
499 lwkt_msg_t
500 _lwkt_pollmsg(lwkt_port_t port)
501 {
502     lwkt_msg_t msg;
503 
504     msg = TAILQ_FIRST(&port->mp_msgq_prio);
505     if (__predict_false(msg != NULL))
506 	return msg;
507 
508     /*
509      * Priority queue has no message, fallback to non-priority queue.
510      */
511     return TAILQ_FIRST(&port->mp_msgq);
512 }
513 
/*
 * Queue a reply on its reply port, then mark the message replied and
 * done.  The push happens before MSGF_REPLY|MSGF_DONE are set.  Callers
 * provide the port interlock: a critical section on the owning cpu for
 * thread ports, or the port spinlock for spin ports.
 */
static __inline
void
_lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
{
    /*
     * atomic op needed for spin ports (applies to the flag update below)
     */
    _lwkt_pushmsg(port, msg);
    atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
}
524 
525 /************************************************************************
526  *			THREAD PORT BACKEND				*
527  ************************************************************************
528  *
529  * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
531  * correct cpu before being enqueued to a port.  Note that this is fairly
532  * optimal since scheduling would have had to do an IPI anyway if the
533  * message were headed to a different cpu.
534  */
535 
/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.
 *
 * It runs as an IPI callback, queued by lwkt_thread_replyport(), for a
 * reply port that is a thread port.  If the owning thread has migrated
 * since the IPI was queued, the IPI is re-sent to the thread's new cpu.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /*
     * Snapshot the flags before MSGF_DONE is published; the snapshot,
     * not msg, is what gets passed to _lwkt_schedule_msg().
     * NOTE(review): presumably this avoids reading msg after a woken
     * waiter may already have reused it — confirm.
     */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * Synchronous waiter: no queueing, just publish completion.  The
	 * store fence orders earlier stores (e.g. the reply status) ahead
	 * of the MSGF_DONE store that lwkt_thread_waitmsg() polls for.
	 */
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
	/* Asynchronous completion: queue the reply on the reply port. */
	_lwkt_enqueue_reply(port, msg);
    }
    /* Wake the owner only if it is blocked waiting on this port. */
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, flags);
}
573 
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the reply port owner's
 * cpu.
 *
 * If the reply port's owner thread is on this cpu, the reply is completed
 * directly under a critical section.  Otherwise MSGF_REPLY is set (plus
 * MSGF_INTRANSIT with INVARIANTS), both atomically, and the rest of the
 * work is sent by IPI to lwkt_thread_replyport_remote() on the owner's
 * cpu.
 *
 * The critical section protects us from IPIs on this CPU.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * Assume the target thread is non-preemptive, so no critical
	 * section is required.
	 *
	 * NOTE(review): the local-cpu path below does enter a critical
	 * section anyway, so the sentence above may be stale — confirm.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    /* snapshot flags before MSGF_DONE lets the waiter proceed */
	    flags = msg->ms_flags;
	    /* order earlier stores ahead of publishing MSGF_DONE */
	    cpu_sfence();
	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, flags);
	    crit_exit();
	} else {
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 *
	 * A critical section is required to interlock the port queue.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    _lwkt_enqueue_reply(port, msg);
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	    crit_exit();
	} else {
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    }
}
638 
639 /*
640  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
641  *
642  * This function could _only_ be used when caller is in the same thread
643  * as the message's target port owner thread.
644  */
645 static int
646 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
647 {
648     int error;
649 
650     KASSERT(port->mpu_td == curthread,
651     	    ("message could only be dropped in the same thread "
652 	     "as the message target port thread"));
653     crit_enter_quick(port->mpu_td);
654     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
655 	    _lwkt_pullmsg(port, msg);
656 	    atomic_set_int(&msg->ms_flags, MSGF_DONE);
657 	    error = 0;
658     } else {
659 	    error = ENOENT;
660     }
661     crit_exit_quick(port->mpu_td);
662 
663     return (error);
664 }
665 
/*
 * lwkt_thread_putport_remote() - IPI half of lwkt_thread_putport()
 *
 * Runs via IPI on the cpu that the target port's owner thread was on when
 * the message was sent.  If the thread has since migrated, the IPI is
 * forwarded again.  Otherwise the message is queued on msg->ms_target_port
 * and the owner is scheduled if it is waiting on the port.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY;
 * lwkt_thread_putport() asserts this before sending the IPI.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
	return;
    }

    /*
     * An atomic op is needed on ms_flags vs originator.  Also
     * note that the originator might be using a different type
     * of msgport.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}
703 
/*
 * lwkt_thread_putport() - Backend to lwkt_beginmsg()
 *
 * Called with the target port as an argument but in the context of the
 * reply port.  This function always implements an asynchronous put to
 * the target message port, and thus returns EASYNC.
 *
 * The message must already have cleared MSGF_DONE and MSGF_REPLY.  If the
 * port's owner thread is on this cpu, the message is queued directly
 * under a critical section.  Otherwise it is sent by IPI to the owner's
 * cpu and queued there by lwkt_thread_putport_remote().
 */
static
int
lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
{
    KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);

    /* the IPI path finds the port through this field */
    msg->ms_target_port = port;
    if (port->mpu_td->td_gd == mycpu) {
	crit_enter();
	_lwkt_pushmsg(port, msg);
	if (port->mp_flags & MSGPORTF_WAITING)
	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	crit_exit();
    } else {
#ifdef INVARIANTS
	/*
	 * Cleanup.
	 *
	 * An atomic op is needed on ms_flags vs originator.  Also
	 * note that the originator might be using a different type
	 * of msgport.
	 */
	atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	lwkt_send_ipiq(port->mpu_td->td_gd,
			(ipifunc1_t)lwkt_thread_putport_remote, msg);
    }
    return (EASYNC);
}
733 
734 /*
735  * lwkt_thread_getport()
736  *
737  *	Retrieve the next message from the port or NULL if no messages
738  *	are ready.
739  */
740 static
741 void *
742 lwkt_thread_getport(lwkt_port_t port)
743 {
744     lwkt_msg_t msg;
745 
746     KKASSERT(port->mpu_td == curthread);
747 
748     crit_enter_quick(port->mpu_td);
749     if ((msg = _lwkt_pollmsg(port)) != NULL)
750 	_lwkt_pullmsg(port, msg);
751     crit_exit_quick(port->mpu_td);
752     return(msg);
753 }
754 
755 /*
756  * lwkt_thread_waitmsg()
757  *
758  *	Wait for a particular message to be replied.  We must be the only
759  *	thread waiting on the message.  The port must be owned by the
760  *	caller.
761  */
762 static
763 int
764 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
765 {
766     thread_t td = curthread;
767 
768     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
769 	    ("can't wait dropable message"));
770 
771     if ((msg->ms_flags & MSGF_DONE) == 0) {
772 	/*
773 	 * If the done bit was not set we have to block until it is.
774 	 */
775 	lwkt_port_t port = msg->ms_reply_port;
776 	int sentabort;
777 
778 	KKASSERT(port->mpu_td == td);
779 	crit_enter_quick(td);
780 	sentabort = 0;
781 
782 	while ((msg->ms_flags & MSGF_DONE) == 0) {
783 	    port->mp_flags |= MSGPORTF_WAITING;	/* same cpu */
784 	    if (sentabort == 0) {
785 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
786 		    lwkt_abortmsg(msg);
787 		}
788 	    } else {
789 		lwkt_sleep("waitabt", 0);
790 	    }
791 	    port->mp_flags &= ~MSGPORTF_WAITING;
792 	}
793 	if (msg->ms_flags & MSGF_QUEUED)
794 	    _lwkt_pullmsg(port, msg);
795 	crit_exit_quick(td);
796     } else {
797 	/*
798 	 * If the done bit was set we only have to mess around with the
799 	 * message if it is queued on the reply port.
800 	 */
801 	crit_enter_quick(td);
802 	if (msg->ms_flags & MSGF_QUEUED) {
803 	    lwkt_port_t port = msg->ms_reply_port;
804 	    thread_t td __debugvar = curthread;
805 
806 	    KKASSERT(port->mpu_td == td);
807 	    _lwkt_pullmsg(port, msg);
808 	}
809 	crit_exit_quick(td);
810     }
811     return(msg->ms_error);
812 }
813 
814 /*
815  * lwkt_thread_waitport()
816  *
817  *	Wait for a new message to be available on the port.  We must be the
818  *	the only thread waiting on the port.  The port must be owned by caller.
819  */
820 static
821 void *
822 lwkt_thread_waitport(lwkt_port_t port, int flags)
823 {
824     thread_t td = curthread;
825     lwkt_msg_t msg;
826     int error;
827 
828     KKASSERT(port->mpu_td == td);
829     crit_enter_quick(td);
830     while ((msg = _lwkt_pollmsg(port)) == NULL) {
831 	port->mp_flags |= MSGPORTF_WAITING;
832 	error = lwkt_sleep("waitport", flags);
833 	port->mp_flags &= ~MSGPORTF_WAITING;
834 	if (error)
835 	    goto done;
836     }
837     _lwkt_pullmsg(port, msg);
838 done:
839     crit_exit_quick(td);
840     return(msg);
841 }
842 
843 /************************************************************************
844  *			   SPIN PORT BACKEND				*
845  ************************************************************************
846  *
847  * This backend uses spinlocks instead of making assumptions about which
848  * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
851  *
852  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
853  * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
856  *
 * XXX synchronous message wakeups are not currently optimized.
858  */
859 
860 static
861 void *
862 lwkt_spin_getport(lwkt_port_t port)
863 {
864     lwkt_msg_t msg;
865 
866     spin_lock(&port->mpu_spin);
867     if ((msg = _lwkt_pollmsg(port)) != NULL)
868 	_lwkt_pullmsg(port, msg);
869     spin_unlock(&port->mpu_spin);
870     return(msg);
871 }
872 
873 static __inline int
874 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
875 {
876     int dowakeup;
877 
878     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
879 
880     msg->ms_target_port = port;
881     spin_lock(&port->mpu_spin);
882     _lwkt_pushmsg(port, msg);
883     dowakeup = 0;
884     if (port->mp_flags & MSGPORTF_WAITING) {
885 	port->mp_flags &= ~MSGPORTF_WAITING;
886 	dowakeup = 1;
887     }
888     spin_unlock(&port->mpu_spin);
889 
890     return dowakeup;
891 }
892 
893 static
894 int
895 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
896 {
897     if (lwkt_spin_putport_only(port, msg))
898 	wakeup(port);
899     return (EASYNC);
900 }
901 
902 static
903 int
904 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
905 {
906     KASSERT(port->mp_cpuid == mycpuid,
907         ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
908 	 port->mp_cpuid, mycpuid));
909     if (lwkt_spin_putport_only(port, msg))
910 	wakeup_mycpu(port);
911     return (EASYNC);
912 }
913 
/*
 * Spin port waitmsg backend: block until msg has been replied (MSGF_DONE),
 * then make sure it is no longer queued on its reply port.  Returns
 * msg->ms_error.  Dropable messages cannot be waited on.
 *
 * With PCATCH in flags, the first interrupted sleep triggers
 * lwkt_abortmsg(), and we still wait for the message to complete.  If it
 * then completed with EINTR, the sleep's error (which may be ERESTART) is
 * substituted.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));
    port = msg->ms_reply_port;

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	sentabort = 0;
	spin_lock(&port->mpu_spin);
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     *
	     * ms_flags needs atomic op originator vs target MSGF_QUEUED
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * Only messages which support abort can be interrupted.
	     * We must still wait for message completion regardless.
	     * The spinlock is dropped around lwkt_abortmsg(), whose
	     * callback may block.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
		if (error) {
		    sentabort = error;
		    spin_unlock(&port->mpu_spin);
		    lwkt_abortmsg(msg);
		    spin_lock(&port->mpu_spin);
		}
	    } else {
		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
	    }
	    /* see note at the top on the MSGPORTF_WAITING flag */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	spin_unlock(&port->mpu_spin);
    } else {
	/* Already done: only dequeue it if it sits on the reply port. */
	spin_lock(&port->mpu_spin);
	if (msg->ms_flags & MSGF_QUEUED) {
	    _lwkt_pullmsg(port, msg);
	}
	spin_unlock(&port->mpu_spin);
    }
    return(msg->ms_error);
}
981 
982 static
983 void *
984 lwkt_spin_waitport(lwkt_port_t port, int flags)
985 {
986     lwkt_msg_t msg;
987     int error;
988 
989     spin_lock(&port->mpu_spin);
990     while ((msg = _lwkt_pollmsg(port)) == NULL) {
991 	port->mp_flags |= MSGPORTF_WAITING;
992 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
993 	/* see note at the top on the MSGPORTF_WAITING flag */
994 	if (error) {
995 	    spin_unlock(&port->mpu_spin);
996 	    return(NULL);
997 	}
998     }
999     _lwkt_pullmsg(port, msg);
1000     spin_unlock(&port->mpu_spin);
1001     return(msg);
1002 }
1003 
/*
 * Spin port reply backend.  A synchronous message is marked done/replied
 * in place, and its waiter is woken on the message address.  An
 * asynchronous message is queued on the reply port, and port waiters are
 * woken on the port address.  Flags are updated under the reply port's
 * spinlock; wakeup() is called only after the spinlock is released.
 */
static
void
lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int dowakeup;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * ms_flags protected by reply port spinlock
	 */
	spin_lock(&port->mpu_spin);
	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	dowakeup = 0;
	/* MSGF_WAITING is set by lwkt_spin_waitmsg() before it sleeps */
	if (msg->ms_flags & MSGF_WAITING) {
		msg->ms_flags &= ~MSGF_WAITING;
		dowakeup = 1;
	}
	spin_unlock(&port->mpu_spin);
	if (dowakeup)
		wakeup(msg);
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 */
	spin_lock(&port->mpu_spin);
	_lwkt_enqueue_reply(port, msg);
	dowakeup = 0;
	if (port->mp_flags & MSGPORTF_WAITING) {
	    port->mp_flags &= ~MSGPORTF_WAITING;
	    dowakeup = 1;
	}
	spin_unlock(&port->mpu_spin);
	if (dowakeup)
	    wakeup(port);
    }
}
1046 
1047 /*
1048  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1049  *
1050  * This function could _only_ be used when caller is in the same thread
1051  * as the message's target port owner thread.
1052  */
1053 static int
1054 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1055 {
1056     int error;
1057 
1058     KASSERT(port->mpu_td == curthread,
1059     	    ("message could only be dropped in the same thread "
1060 	     "as the message target port thread\n"));
1061     spin_lock(&port->mpu_spin);
1062     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1063 	    _lwkt_pullmsg(port, msg);
1064 	    msg->ms_flags |= MSGF_DONE;
1065 	    error = 0;
1066     } else {
1067 	    error = ENOENT;
1068     }
1069     spin_unlock(&port->mpu_spin);
1070 
1071     return (error);
1072 }
1073 
1074 /************************************************************************
1075  *			  SERIALIZER PORT BACKEND			*
1076  ************************************************************************
1077  *
 * This backend uses a serializer to protect port access.  Callers are
 * assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread, to pipeline
 * operations which may temporarily release the serializer.
 *
 * The implementation is based on the SPIN PORT BACKEND.
1084  */
1085 
1086 static
1087 void *
1088 lwkt_serialize_getport(lwkt_port_t port)
1089 {
1090     lwkt_msg_t msg;
1091 
1092     ASSERT_SERIALIZED(port->mpu_serialize);
1093 
1094     if ((msg = _lwkt_pollmsg(port)) != NULL)
1095 	_lwkt_pullmsg(port, msg);
1096     return(msg);
1097 }
1098 
1099 static
1100 int
1101 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1102 {
1103     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1104     ASSERT_SERIALIZED(port->mpu_serialize);
1105 
1106     msg->ms_target_port = port;
1107     _lwkt_pushmsg(port, msg);
1108     if (port->mp_flags & MSGPORTF_WAITING) {
1109 	port->mp_flags &= ~MSGPORTF_WAITING;
1110 	wakeup(port);
1111     }
1112     return (EASYNC);
1113 }
1114 
1115 static
1116 int
1117 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1118 {
1119     lwkt_port_t port;
1120     int sentabort;
1121     int error;
1122 
1123     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1124 	    ("can't wait dropable message"));
1125 
1126     if ((msg->ms_flags & MSGF_DONE) == 0) {
1127 	port = msg->ms_reply_port;
1128 
1129 	ASSERT_SERIALIZED(port->mpu_serialize);
1130 
1131 	sentabort = 0;
1132 	while ((msg->ms_flags & MSGF_DONE) == 0) {
1133 	    void *won;
1134 
1135 	    /*
1136 	     * If message was sent synchronously from the beginning
1137 	     * the wakeup will be on the message structure, else it
1138 	     * will be on the port structure.
1139 	     */
1140 	    if (msg->ms_flags & MSGF_SYNC) {
1141 		won = msg;
1142 	    } else {
1143 		won = port;
1144 		port->mp_flags |= MSGPORTF_WAITING;
1145 	    }
1146 
1147 	    /*
1148 	     * Only messages which support abort can be interrupted.
1149 	     * We must still wait for message completion regardless.
1150 	     */
1151 	    if ((flags & PCATCH) && sentabort == 0) {
1152 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1153 		if (error) {
1154 		    sentabort = error;
1155 		    lwkt_serialize_exit(port->mpu_serialize);
1156 		    lwkt_abortmsg(msg);
1157 		    lwkt_serialize_enter(port->mpu_serialize);
1158 		}
1159 	    } else {
1160 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1161 	    }
1162 	    /* see note at the top on the MSGPORTF_WAITING flag */
1163 	}
1164 	/*
1165 	 * Turn EINTR into ERESTART if the signal indicates.
1166 	 */
1167 	if (sentabort && msg->ms_error == EINTR)
1168 	    msg->ms_error = sentabort;
1169 	if (msg->ms_flags & MSGF_QUEUED)
1170 	    _lwkt_pullmsg(port, msg);
1171     } else {
1172 	if (msg->ms_flags & MSGF_QUEUED) {
1173 	    port = msg->ms_reply_port;
1174 
1175 	    ASSERT_SERIALIZED(port->mpu_serialize);
1176 	    _lwkt_pullmsg(port, msg);
1177 	}
1178     }
1179     return(msg->ms_error);
1180 }
1181 
1182 static
1183 void *
1184 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1185 {
1186     lwkt_msg_t msg;
1187     int error;
1188 
1189     ASSERT_SERIALIZED(port->mpu_serialize);
1190 
1191     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1192 	port->mp_flags |= MSGPORTF_WAITING;
1193 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1194 	/* see note at the top on the MSGPORTF_WAITING flag */
1195 	if (error)
1196 	    return(NULL);
1197     }
1198     _lwkt_pullmsg(port, msg);
1199     return(msg);
1200 }
1201 
1202 static
1203 void
1204 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1205 {
1206     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1207     ASSERT_SERIALIZED(port->mpu_serialize);
1208 
1209     if (msg->ms_flags & MSGF_SYNC) {
1210 	/*
1211 	 * If a synchronous completion has been requested, just wakeup
1212 	 * the message without bothering to queue it to the target port.
1213 	 *
1214 	 * (both sides synchronized via serialized reply port)
1215 	 */
1216 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1217 	wakeup(msg);
1218     } else {
1219 	/*
1220 	 * If an asynchronous completion has been requested the message
1221 	 * must be queued to the reply port.
1222 	 */
1223 	_lwkt_enqueue_reply(port, msg);
1224 	if (port->mp_flags & MSGPORTF_WAITING) {
1225 	    port->mp_flags &= ~MSGPORTF_WAITING;
1226 	    wakeup(port);
1227 	}
1228     }
1229 }
1230 
1231 /************************************************************************
1232  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1233  ************************************************************************/
1234 
1235 /*
1236  * You can point a port's reply vector at this function if you just want
1237  * the message marked done, without any queueing or signaling.  This is
1238  * often used for structure-embedded messages.
1239  */
1240 static
1241 void
1242 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1243 {
1244     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1245 }
1246 
1247 static
1248 void *
1249 lwkt_panic_getport(lwkt_port_t port)
1250 {
1251     panic("lwkt_getport() illegal on port %p", port);
1252 }
1253 
1254 static
1255 int
1256 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1257 {
1258     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1259 }
1260 
1261 static
1262 int
1263 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1264 {
1265     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1266 }
1267 
1268 static
1269 void *
1270 lwkt_panic_waitport(lwkt_port_t port, int flags)
1271 {
1272     panic("port %p cannot be waited on", port);
1273 }
1274 
1275 static
1276 void
1277 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1278 {
1279     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1280 }
1281 
1282 static
1283 int
1284 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1285 {
1286     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1287     /* NOT REACHED */
1288     return (ENOENT);
1289 }
1290 
1291 static
1292 int
1293 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1294 {
1295     panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1296         port, msg);
1297     /* NOT REACHED */
1298     return (ENOENT);
1299 }
1300