xref: /dragonfly/sys/kern/lwkt_msgport.c (revision d9f85b33)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69 
70 #include <sys/malloc.h>
71 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72 
73 /************************************************************************
74  *				MESSAGE FUNCTIONS			*
75  ************************************************************************/
76 
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80     return port->mp_putport(port, msg);
81 }
82 
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86     return port->mp_putport_oncpu(port, msg);
87 }
88 
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     KKASSERT(msg->ms_reply_port != NULL &&
93 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96 
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100     int error;
101 
102     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103 	/*
104 	 * Target port opted to execute the message synchronously so
105 	 * queue the response.
106 	 */
107 	lwkt_replymsg(msg, error);
108     }
109 }
110 
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115 
116     if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117 	/*
118 	 * Target port opted to execute the message synchronously so
119 	 * queue the response.
120 	 */
121 	lwkt_replymsg(msg, error);
122     }
123 }
124 
125 /*
126  * lwkt_sendmsg()
127  *
128  *	Request asynchronous completion and call lwkt_beginmsg().  The
129  *	target port can opt to execute the message synchronously or
130  *	asynchronously and this function will automatically queue the
131  *	response if the target executes the message synchronously.
132  *
133  *	NOTE: The message is in an indeterminant state until this call
134  *	returns.  The caller should not mess with it (e.g. try to abort it)
135  *	until then.
136  *
137  *	NOTE: Do not use this function to forward a message as we might
138  *	clobber ms_flags in a SMP race.
139  */
140 void
141 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
142 {
143     _lwkt_sendmsg_prepare(port, msg);
144     _lwkt_sendmsg_start(port, msg);
145 }
146 
147 void
148 lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
149 {
150     _lwkt_sendmsg_prepare(port, msg);
151     _lwkt_sendmsg_start_oncpu(port, msg);
152 }
153 
154 void
155 lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
156 {
157     _lwkt_sendmsg_prepare(port, msg);
158 }
159 
160 void
161 lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
162 {
163     _lwkt_sendmsg_start(port, msg);
164 }
165 
166 void
167 lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
168 {
169     _lwkt_sendmsg_start_oncpu(port, msg);
170 }
171 
172 /*
173  * lwkt_domsg()
174  *
175  *	Request synchronous completion and call lwkt_beginmsg().  The
176  *	target port can opt to execute the message synchronously or
177  *	asynchronously and this function will automatically block and
178  *	wait for a response if the target executes the message
179  *	asynchronously.
180  *
181  *	NOTE: Do not use this function to forward a message as we might
182  *	clobber ms_flags in a SMP race.
183  */
184 int
185 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
186 {
187     int error;
188 
189     KKASSERT(msg->ms_reply_port != NULL &&
190 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
191     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
192     msg->ms_flags |= MSGF_SYNC;
193     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
194 	/*
195 	 * Target port opted to execute the message asynchronously so
196 	 * block and wait for a reply.
197 	 */
198 	error = lwkt_waitmsg(msg, flags);
199     } else {
200 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
201     }
202     return(error);
203 }
204 
205 /*
206  * lwkt_forwardmsg()
207  *
208  * Forward a message received on one port to another port.
209  */
210 int
211 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
212 {
213     int error;
214 
215     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
216     if ((error = port->mp_putport(port, msg)) != EASYNC)
217 	lwkt_replymsg(msg, error);
218     return(error);
219 }
220 
221 /*
222  * lwkt_abortmsg()
223  *
224  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
225  * The caller must ensure that the message will not be both replied AND
226  * destroyed while the abort is in progress.
227  *
228  * This function issues a callback which might block!
229  */
230 void
231 lwkt_abortmsg(lwkt_msg_t msg)
232 {
233     /*
234      * A critical section protects us from reply IPIs on this cpu.
235      */
236     crit_enter();
237 
238     /*
239      * Shortcut the operation if the message has already been returned.
240      * The callback typically constructs a lwkt_msg with the abort request,
241      * issues it synchronously, and waits for completion.  The callback
242      * is not required to actually abort the message and the target port,
243      * upon receiving an abort request message generated by the callback
244      * should check whether the original message has already completed or
245      * not.
246      */
247     if (msg->ms_flags & MSGF_ABORTABLE) {
248 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
249 	    msg->ms_abortfn(msg);
250     }
251     crit_exit();
252 }
253 
254 /************************************************************************
255  *			PORT INITIALIZATION API				*
256  ************************************************************************/
257 
258 static void *lwkt_thread_getport(lwkt_port_t port);
259 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
260 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
261 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
262 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
263 static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
264 
265 static void *lwkt_spin_getport(lwkt_port_t port);
266 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
267 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
268 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
269 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
270 static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
271 static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
272 
273 static void *lwkt_serialize_getport(lwkt_port_t port);
274 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
275 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
276 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
277 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
278 
279 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
280 static void *lwkt_panic_getport(lwkt_port_t port);
281 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
282 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
283 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
284 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
285 static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
286 static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
287 
288 /*
289  * Core port initialization (internal)
290  */
291 static __inline
292 void
293 _lwkt_initport(lwkt_port_t port,
294 	       void *(*gportfn)(lwkt_port_t),
295 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
296 	       int (*wmsgfn)(lwkt_msg_t, int),
297 	       void *(*wportfn)(lwkt_port_t, int),
298 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
299 	       int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
300 	       int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
301 {
302     bzero(port, sizeof(*port));
303     port->mp_cpuid = -1;
304     TAILQ_INIT(&port->mp_msgq);
305     TAILQ_INIT(&port->mp_msgq_prio);
306     port->mp_getport = gportfn;
307     port->mp_putport = pportfn;
308     port->mp_waitmsg = wmsgfn;
309     port->mp_waitport = wportfn;
310     port->mp_replyport = rportfn;
311     port->mp_dropmsg = dmsgfn;
312     port->mp_putport_oncpu = pportfn_oncpu;
313 }
314 
315 /*
316  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
317  * we tell the scheduler not to reschedule if td is at a higher priority.
318  *
319  * This routine is called even if the thread is already scheduled.
320  */
321 static __inline
322 void
323 _lwkt_schedule_msg(thread_t td, int flags)
324 {
325     lwkt_schedule(td);
326 }
327 
328 /*
329  * lwkt_initport_thread()
330  *
331  *	Initialize a port for use by a particular thread.  The port may
332  *	only be used by <td>.
333  */
334 void
335 lwkt_initport_thread(lwkt_port_t port, thread_t td)
336 {
337     _lwkt_initport(port,
338 		   lwkt_thread_getport,
339 		   lwkt_thread_putport,
340 		   lwkt_thread_waitmsg,
341 		   lwkt_thread_waitport,
342 		   lwkt_thread_replyport,
343 		   lwkt_thread_dropmsg,
344 		   lwkt_thread_putport);
345     port->mpu_td = td;
346 }
347 
348 /*
349  * lwkt_initport_spin()
350  *
351  *	Initialize a port for use with descriptors that might be accessed
352  *	via multiple LWPs, processes, or threads.  Has somewhat more
353  *	overhead then thread ports.
354  */
355 void
356 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
357 {
358     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
359     int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
360 
361     if (td == NULL)
362 	dmsgfn = lwkt_panic_dropmsg;
363     else
364 	dmsgfn = lwkt_spin_dropmsg;
365 
366     if (fixed_cpuid)
367 	pportfn_oncpu = lwkt_spin_putport_oncpu;
368     else
369 	pportfn_oncpu = lwkt_panic_putport_oncpu;
370 
371     _lwkt_initport(port,
372 		   lwkt_spin_getport,
373 		   lwkt_spin_putport,
374 		   lwkt_spin_waitmsg,
375 		   lwkt_spin_waitport,
376 		   lwkt_spin_replyport,
377 		   dmsgfn,
378 		   pportfn_oncpu);
379     spin_init(&port->mpu_spin, "lwktinitport");
380     port->mpu_td = td;
381     if (fixed_cpuid)
382 	port->mp_cpuid = td->td_gd->gd_cpuid;
383 }
384 
385 /*
386  * lwkt_initport_serialize()
387  *
388  *	Initialize a port for use with descriptors that might be accessed
389  *	via multiple LWPs, processes, or threads.  Callers are assumed to
390  *	have held the serializer (slz).
391  */
392 void
393 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
394 {
395     _lwkt_initport(port,
396 		   lwkt_serialize_getport,
397 		   lwkt_serialize_putport,
398 		   lwkt_serialize_waitmsg,
399 		   lwkt_serialize_waitport,
400 		   lwkt_serialize_replyport,
401 		   lwkt_panic_dropmsg,
402 		   lwkt_panic_putport_oncpu);
403     port->mpu_serialize = slz;
404 }
405 
406 /*
407  * Similar to the standard initport, this function simply marks the message
408  * as being done and does not attempt to return it to an originating port.
409  */
410 void
411 lwkt_initport_replyonly_null(lwkt_port_t port)
412 {
413     _lwkt_initport(port,
414 		   lwkt_panic_getport,
415 		   lwkt_panic_putport,
416 		   lwkt_panic_waitmsg,
417 		   lwkt_panic_waitport,
418 		   lwkt_null_replyport,
419 		   lwkt_panic_dropmsg,
420 		   lwkt_panic_putport_oncpu);
421 }
422 
423 /*
424  * Initialize a reply-only port, typically used as a message sink.  Such
425  * ports can only be used as a reply port.
426  */
427 void
428 lwkt_initport_replyonly(lwkt_port_t port,
429 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
430 {
431     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
432 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
433 			 rportfn, lwkt_panic_dropmsg,
434 			 lwkt_panic_putport_oncpu);
435 }
436 
437 void
438 lwkt_initport_putonly(lwkt_port_t port,
439 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
440 {
441     _lwkt_initport(port, lwkt_panic_getport, pportfn,
442 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
443 			 lwkt_panic_replyport, lwkt_panic_dropmsg,
444 			 lwkt_panic_putport_oncpu);
445 }
446 
447 void
448 lwkt_initport_panic(lwkt_port_t port)
449 {
450     _lwkt_initport(port,
451 		   lwkt_panic_getport, lwkt_panic_putport,
452 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
453 		   lwkt_panic_replyport, lwkt_panic_dropmsg,
454 		   lwkt_panic_putport_oncpu);
455 }
456 
457 static __inline
458 void
459 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
460 {
461     lwkt_msg_queue *queue;
462 
463     /*
464      * normal case, remove and return the message.
465      */
466     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
467 	queue = &port->mp_msgq_prio;
468     else
469 	queue = &port->mp_msgq;
470     TAILQ_REMOVE(queue, msg, ms_node);
471 
472     /*
473      * atomic op needed for spin ports
474      */
475     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
476 }
477 
478 static __inline
479 void
480 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
481 {
482     lwkt_msg_queue *queue;
483 
484     /*
485      * atomic op needed for spin ports
486      */
487     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
488     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
489 	queue = &port->mp_msgq_prio;
490     else
491     	queue = &port->mp_msgq;
492     TAILQ_INSERT_TAIL(queue, msg, ms_node);
493 
494     if (msg->ms_flags & MSGF_RECEIPT) {
495 	/*
496 	 * In case this message is forwarded later, the receipt
497 	 * flag was cleared once the receipt function is called.
498 	 */
499 	atomic_clear_int(&msg->ms_flags, MSGF_RECEIPT);
500 	msg->ms_receiptfn(msg, port);
501     }
502 }
503 
504 static __inline
505 lwkt_msg_t
506 _lwkt_pollmsg(lwkt_port_t port)
507 {
508     lwkt_msg_t msg;
509 
510     msg = TAILQ_FIRST(&port->mp_msgq_prio);
511     if (__predict_false(msg != NULL))
512 	return msg;
513 
514     /*
515      * Priority queue has no message, fallback to non-priority queue.
516      */
517     return TAILQ_FIRST(&port->mp_msgq);
518 }
519 
520 static __inline
521 void
522 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
523 {
524     /*
525      * atomic op needed for spin ports
526      */
527     _lwkt_pushmsg(port, msg);
528     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
529 }
530 
/************************************************************************
 *			THREAD PORT BACKEND				*
 ************************************************************************
 *
 * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
 * correct cpu before being enqueued to a port.  Note that this is fairly
 * optimal since scheduling would have had to do an IPI anyway if the
 * message were headed to a different cpu.
 */
541 
542 /*
543  * This function completes reply processing for the default case in the
544  * context of the originating cpu.
545  */
546 static
547 void
548 lwkt_thread_replyport_remote(lwkt_msg_t msg)
549 {
550     lwkt_port_t port = msg->ms_reply_port;
551     int flags;
552 
553     /*
554      * Chase any thread migration that occurs
555      */
556     if (port->mpu_td->td_gd != mycpu) {
557 	lwkt_send_ipiq(port->mpu_td->td_gd,
558 		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
559 	return;
560     }
561 
562     /*
563      * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
564      */
565 #ifdef INVARIANTS
566     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
567     msg->ms_flags &= ~MSGF_INTRANSIT;
568 #endif
569     flags = msg->ms_flags;
570     if (msg->ms_flags & MSGF_SYNC) {
571 	cpu_sfence();
572 	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
573     } else {
574 	_lwkt_enqueue_reply(port, msg);
575     }
576     if (port->mp_flags & MSGPORTF_WAITING)
577 	_lwkt_schedule_msg(port->mpu_td, flags);
578 }
579 
580 /*
581  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
582  *
583  * Called with the reply port as an argument but in the context of the
584  * original target port.  Completion must occur on the target port's
585  * cpu.
586  *
587  * The critical section protects us from IPIs on the this CPU.
588  */
589 static
590 void
591 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
592 {
593     int flags;
594 
595     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
596 
597     if (msg->ms_flags & MSGF_SYNC) {
598 	/*
599 	 * If a synchronous completion has been requested, just wakeup
600 	 * the message without bothering to queue it to the target port.
601 	 *
602 	 * Assume the target thread is non-preemptive, so no critical
603 	 * section is required.
604 	 */
605 	if (port->mpu_td->td_gd == mycpu) {
606 	    crit_enter();
607 	    flags = msg->ms_flags;
608 	    cpu_sfence();
609 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
610 	    if (port->mp_flags & MSGPORTF_WAITING)
611 		_lwkt_schedule_msg(port->mpu_td, flags);
612 	    crit_exit();
613 	} else {
614 #ifdef INVARIANTS
615 	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
616 #endif
617 	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
618 	    lwkt_send_ipiq(port->mpu_td->td_gd,
619 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
620 	}
621     } else {
622 	/*
623 	 * If an asynchronous completion has been requested the message
624 	 * must be queued to the reply port.
625 	 *
626 	 * A critical section is required to interlock the port queue.
627 	 */
628 	if (port->mpu_td->td_gd == mycpu) {
629 	    crit_enter();
630 	    _lwkt_enqueue_reply(port, msg);
631 	    if (port->mp_flags & MSGPORTF_WAITING)
632 		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
633 	    crit_exit();
634 	} else {
635 #ifdef INVARIANTS
636 	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
637 #endif
638 	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
639 	    lwkt_send_ipiq(port->mpu_td->td_gd,
640 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
641 	}
642     }
643 }
644 
645 /*
646  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
647  *
648  * This function could _only_ be used when caller is in the same thread
649  * as the message's target port owner thread.
650  */
651 static int
652 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
653 {
654     int error;
655 
656     KASSERT(port->mpu_td == curthread,
657     	    ("message could only be dropped in the same thread "
658 	     "as the message target port thread"));
659     crit_enter_quick(port->mpu_td);
660     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
661 	    _lwkt_pullmsg(port, msg);
662 	    atomic_set_int(&msg->ms_flags, MSGF_DONE);
663 	    error = 0;
664     } else {
665 	    error = ENOENT;
666     }
667     crit_exit_quick(port->mpu_td);
668 
669     return (error);
670 }
671 
672 /*
673  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
674  *
675  * Called with the target port as an argument but in the context of the
676  * reply port.  This function always implements an asynchronous put to
677  * the target message port, and thus returns EASYNC.
678  *
679  * The message must already have cleared MSGF_DONE and MSGF_REPLY
680  */
681 static
682 void
683 lwkt_thread_putport_remote(lwkt_msg_t msg)
684 {
685     lwkt_port_t port = msg->ms_target_port;
686 
687     /*
688      * Chase any thread migration that occurs
689      */
690     if (port->mpu_td->td_gd != mycpu) {
691 	lwkt_send_ipiq(port->mpu_td->td_gd,
692 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
693 	return;
694     }
695 
696     /*
697      * An atomic op is needed on ms_flags vs originator.  Also
698      * note that the originator might be using a different type
699      * of msgport.
700      */
701 #ifdef INVARIANTS
702     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
703     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
704 #endif
705     _lwkt_pushmsg(port, msg);
706     if (port->mp_flags & MSGPORTF_WAITING)
707 	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
708 }
709 
710 static
711 int
712 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
713 {
714     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
715 
716     msg->ms_target_port = port;
717     if (port->mpu_td->td_gd == mycpu) {
718 	crit_enter();
719 	_lwkt_pushmsg(port, msg);
720 	if (port->mp_flags & MSGPORTF_WAITING)
721 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
722 	crit_exit();
723     } else {
724 #ifdef INVARIANTS
725 	/*
726 	 * Cleanup.
727 	 *
728 	 * An atomic op is needed on ms_flags vs originator.  Also
729 	 * note that the originator might be using a different type
730 	 * of msgport.
731 	 */
732 	atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
733 #endif
734 	lwkt_send_ipiq(port->mpu_td->td_gd,
735 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
736     }
737     return (EASYNC);
738 }
739 
740 /*
741  * lwkt_thread_getport()
742  *
743  *	Retrieve the next message from the port or NULL if no messages
744  *	are ready.
745  */
746 static
747 void *
748 lwkt_thread_getport(lwkt_port_t port)
749 {
750     lwkt_msg_t msg;
751 
752     KKASSERT(port->mpu_td == curthread);
753 
754     crit_enter_quick(port->mpu_td);
755     if ((msg = _lwkt_pollmsg(port)) != NULL)
756 	_lwkt_pullmsg(port, msg);
757     crit_exit_quick(port->mpu_td);
758     return(msg);
759 }
760 
761 /*
762  * lwkt_thread_waitmsg()
763  *
764  *	Wait for a particular message to be replied.  We must be the only
765  *	thread waiting on the message.  The port must be owned by the
766  *	caller.
767  */
768 static
769 int
770 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
771 {
772     thread_t td = curthread;
773 
774     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
775 	    ("can't wait dropable message"));
776 
777     if ((msg->ms_flags & MSGF_DONE) == 0) {
778 	/*
779 	 * If the done bit was not set we have to block until it is.
780 	 */
781 	lwkt_port_t port = msg->ms_reply_port;
782 	int sentabort;
783 
784 	KKASSERT(port->mpu_td == td);
785 	crit_enter_quick(td);
786 	sentabort = 0;
787 
788 	while ((msg->ms_flags & MSGF_DONE) == 0) {
789 	    port->mp_flags |= MSGPORTF_WAITING;	/* same cpu */
790 	    if (sentabort == 0) {
791 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
792 		    lwkt_abortmsg(msg);
793 		}
794 	    } else {
795 		lwkt_sleep("waitabt", 0);
796 	    }
797 	    port->mp_flags &= ~MSGPORTF_WAITING;
798 	}
799 	if (msg->ms_flags & MSGF_QUEUED)
800 	    _lwkt_pullmsg(port, msg);
801 	crit_exit_quick(td);
802     } else {
803 	/*
804 	 * If the done bit was set we only have to mess around with the
805 	 * message if it is queued on the reply port.
806 	 */
807 	crit_enter_quick(td);
808 	if (msg->ms_flags & MSGF_QUEUED) {
809 	    lwkt_port_t port = msg->ms_reply_port;
810 	    thread_t td __debugvar = curthread;
811 
812 	    KKASSERT(port->mpu_td == td);
813 	    _lwkt_pullmsg(port, msg);
814 	}
815 	crit_exit_quick(td);
816     }
817     return(msg->ms_error);
818 }
819 
820 /*
821  * lwkt_thread_waitport()
822  *
823  *	Wait for a new message to be available on the port.  We must be the
824  *	the only thread waiting on the port.  The port must be owned by caller.
825  */
826 static
827 void *
828 lwkt_thread_waitport(lwkt_port_t port, int flags)
829 {
830     thread_t td = curthread;
831     lwkt_msg_t msg;
832     int error;
833 
834     KKASSERT(port->mpu_td == td);
835     crit_enter_quick(td);
836     while ((msg = _lwkt_pollmsg(port)) == NULL) {
837 	port->mp_flags |= MSGPORTF_WAITING;
838 	error = lwkt_sleep("waitport", flags);
839 	port->mp_flags &= ~MSGPORTF_WAITING;
840 	if (error)
841 	    goto done;
842     }
843     _lwkt_pullmsg(port, msg);
844 done:
845     crit_exit_quick(td);
846     return(msg);
847 }
848 
/************************************************************************
 *			   SPIN PORT BACKEND				*
 ************************************************************************
 *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports, but
 * you don't have a choice if multiple threads access the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag, rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
 */
865 
866 static
867 void *
868 lwkt_spin_getport(lwkt_port_t port)
869 {
870     lwkt_msg_t msg;
871 
872     spin_lock(&port->mpu_spin);
873     if ((msg = _lwkt_pollmsg(port)) != NULL)
874 	_lwkt_pullmsg(port, msg);
875     spin_unlock(&port->mpu_spin);
876     return(msg);
877 }
878 
879 static __inline int
880 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
881 {
882     int dowakeup;
883 
884     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
885 
886     msg->ms_target_port = port;
887     spin_lock(&port->mpu_spin);
888     _lwkt_pushmsg(port, msg);
889     dowakeup = 0;
890     if (port->mp_flags & MSGPORTF_WAITING) {
891 	port->mp_flags &= ~MSGPORTF_WAITING;
892 	dowakeup = 1;
893     }
894     spin_unlock(&port->mpu_spin);
895 
896     return dowakeup;
897 }
898 
899 static
900 int
901 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
902 {
903     if (lwkt_spin_putport_only(port, msg))
904 	wakeup(port);
905     return (EASYNC);
906 }
907 
908 static
909 int
910 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
911 {
912     KASSERT(port->mp_cpuid == mycpuid,
913         ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
914 	 port->mp_cpuid, mycpuid));
915     if (lwkt_spin_putport_only(port, msg))
916 	wakeup_mycpu(port);
917     return (EASYNC);
918 }
919 
920 static
921 int
922 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
923 {
924     lwkt_port_t port;
925     int sentabort;
926     int error;
927 
928     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
929 	    ("can't wait dropable message"));
930     port = msg->ms_reply_port;
931 
932     if ((msg->ms_flags & MSGF_DONE) == 0) {
933 	sentabort = 0;
934 	spin_lock(&port->mpu_spin);
935 	while ((msg->ms_flags & MSGF_DONE) == 0) {
936 	    void *won;
937 
938 	    /*
939 	     * If message was sent synchronously from the beginning
940 	     * the wakeup will be on the message structure, else it
941 	     * will be on the port structure.
942 	     *
943 	     * ms_flags needs atomic op originator vs target MSGF_QUEUED
944 	     */
945 	    if (msg->ms_flags & MSGF_SYNC) {
946 		won = msg;
947 		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
948 	    } else {
949 		won = port;
950 		port->mp_flags |= MSGPORTF_WAITING;
951 	    }
952 
953 	    /*
954 	     * Only messages which support abort can be interrupted.
955 	     * We must still wait for message completion regardless.
956 	     */
957 	    if ((flags & PCATCH) && sentabort == 0) {
958 		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
959 		if (error) {
960 		    sentabort = error;
961 		    spin_unlock(&port->mpu_spin);
962 		    lwkt_abortmsg(msg);
963 		    spin_lock(&port->mpu_spin);
964 		}
965 	    } else {
966 		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
967 	    }
968 	    /* see note at the top on the MSGPORTF_WAITING flag */
969 	}
970 	/*
971 	 * Turn EINTR into ERESTART if the signal indicates.
972 	 */
973 	if (sentabort && msg->ms_error == EINTR)
974 	    msg->ms_error = sentabort;
975 	if (msg->ms_flags & MSGF_QUEUED)
976 	    _lwkt_pullmsg(port, msg);
977 	spin_unlock(&port->mpu_spin);
978     } else {
979 	spin_lock(&port->mpu_spin);
980 	if (msg->ms_flags & MSGF_QUEUED) {
981 	    _lwkt_pullmsg(port, msg);
982 	}
983 	spin_unlock(&port->mpu_spin);
984     }
985     return(msg->ms_error);
986 }
987 
988 static
989 void *
990 lwkt_spin_waitport(lwkt_port_t port, int flags)
991 {
992     lwkt_msg_t msg;
993     int error;
994 
995     spin_lock(&port->mpu_spin);
996     while ((msg = _lwkt_pollmsg(port)) == NULL) {
997 	port->mp_flags |= MSGPORTF_WAITING;
998 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
999 	/* see note at the top on the MSGPORTF_WAITING flag */
1000 	if (error) {
1001 	    spin_unlock(&port->mpu_spin);
1002 	    return(NULL);
1003 	}
1004     }
1005     _lwkt_pullmsg(port, msg);
1006     spin_unlock(&port->mpu_spin);
1007     return(msg);
1008 }
1009 
1010 static
1011 void
1012 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
1013 {
1014     int dowakeup;
1015 
1016     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1017 
1018     if (msg->ms_flags & MSGF_SYNC) {
1019 	/*
1020 	 * If a synchronous completion has been requested, just wakeup
1021 	 * the message without bothering to queue it to the target port.
1022 	 *
1023 	 * ms_flags protected by reply port spinlock
1024 	 */
1025 	spin_lock(&port->mpu_spin);
1026 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1027 	dowakeup = 0;
1028 	if (msg->ms_flags & MSGF_WAITING) {
1029 		msg->ms_flags &= ~MSGF_WAITING;
1030 		dowakeup = 1;
1031 	}
1032 	spin_unlock(&port->mpu_spin);
1033 	if (dowakeup)
1034 		wakeup(msg);
1035     } else {
1036 	/*
1037 	 * If an asynchronous completion has been requested the message
1038 	 * must be queued to the reply port.
1039 	 */
1040 	spin_lock(&port->mpu_spin);
1041 	_lwkt_enqueue_reply(port, msg);
1042 	dowakeup = 0;
1043 	if (port->mp_flags & MSGPORTF_WAITING) {
1044 	    port->mp_flags &= ~MSGPORTF_WAITING;
1045 	    dowakeup = 1;
1046 	}
1047 	spin_unlock(&port->mpu_spin);
1048 	if (dowakeup)
1049 	    wakeup(port);
1050     }
1051 }
1052 
1053 /*
1054  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1055  *
1056  * This function could _only_ be used when caller is in the same thread
1057  * as the message's target port owner thread.
1058  */
1059 static int
1060 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1061 {
1062     int error;
1063 
1064     KASSERT(port->mpu_td == curthread,
1065     	    ("message could only be dropped in the same thread "
1066 	     "as the message target port thread\n"));
1067     spin_lock(&port->mpu_spin);
1068     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1069 	    _lwkt_pullmsg(port, msg);
1070 	    msg->ms_flags |= MSGF_DONE;
1071 	    error = 0;
1072     } else {
1073 	    error = ENOENT;
1074     }
1075     spin_unlock(&port->mpu_spin);
1076 
1077     return (error);
1078 }
1079 
1080 /************************************************************************
1081  *			  SERIALIZER PORT BACKEND			*
1082  ************************************************************************
1083  *
 * This backend uses a serializer to protect port access.  Callers are
 * assumed to hold the serializer.  This kind of port is usually created
 * by a network device driver along with _one_ lwkt thread to pipeline
 * operations which may temporarily release the serializer.
 *
 * The implementation is based on the SPIN PORT BACKEND.
1090  */
1091 
1092 static
1093 void *
1094 lwkt_serialize_getport(lwkt_port_t port)
1095 {
1096     lwkt_msg_t msg;
1097 
1098     ASSERT_SERIALIZED(port->mpu_serialize);
1099 
1100     if ((msg = _lwkt_pollmsg(port)) != NULL)
1101 	_lwkt_pullmsg(port, msg);
1102     return(msg);
1103 }
1104 
1105 static
1106 int
1107 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1108 {
1109     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1110     ASSERT_SERIALIZED(port->mpu_serialize);
1111 
1112     msg->ms_target_port = port;
1113     _lwkt_pushmsg(port, msg);
1114     if (port->mp_flags & MSGPORTF_WAITING) {
1115 	port->mp_flags &= ~MSGPORTF_WAITING;
1116 	wakeup(port);
1117     }
1118     return (EASYNC);
1119 }
1120 
1121 static
1122 int
1123 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1124 {
1125     lwkt_port_t port;
1126     int sentabort;
1127     int error;
1128 
1129     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1130 	    ("can't wait dropable message"));
1131 
1132     if ((msg->ms_flags & MSGF_DONE) == 0) {
1133 	port = msg->ms_reply_port;
1134 
1135 	ASSERT_SERIALIZED(port->mpu_serialize);
1136 
1137 	sentabort = 0;
1138 	while ((msg->ms_flags & MSGF_DONE) == 0) {
1139 	    void *won;
1140 
1141 	    /*
1142 	     * If message was sent synchronously from the beginning
1143 	     * the wakeup will be on the message structure, else it
1144 	     * will be on the port structure.
1145 	     */
1146 	    if (msg->ms_flags & MSGF_SYNC) {
1147 		won = msg;
1148 	    } else {
1149 		won = port;
1150 		port->mp_flags |= MSGPORTF_WAITING;
1151 	    }
1152 
1153 	    /*
1154 	     * Only messages which support abort can be interrupted.
1155 	     * We must still wait for message completion regardless.
1156 	     */
1157 	    if ((flags & PCATCH) && sentabort == 0) {
1158 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1159 		if (error) {
1160 		    sentabort = error;
1161 		    lwkt_serialize_exit(port->mpu_serialize);
1162 		    lwkt_abortmsg(msg);
1163 		    lwkt_serialize_enter(port->mpu_serialize);
1164 		}
1165 	    } else {
1166 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1167 	    }
1168 	    /* see note at the top on the MSGPORTF_WAITING flag */
1169 	}
1170 	/*
1171 	 * Turn EINTR into ERESTART if the signal indicates.
1172 	 */
1173 	if (sentabort && msg->ms_error == EINTR)
1174 	    msg->ms_error = sentabort;
1175 	if (msg->ms_flags & MSGF_QUEUED)
1176 	    _lwkt_pullmsg(port, msg);
1177     } else {
1178 	if (msg->ms_flags & MSGF_QUEUED) {
1179 	    port = msg->ms_reply_port;
1180 
1181 	    ASSERT_SERIALIZED(port->mpu_serialize);
1182 	    _lwkt_pullmsg(port, msg);
1183 	}
1184     }
1185     return(msg->ms_error);
1186 }
1187 
1188 static
1189 void *
1190 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1191 {
1192     lwkt_msg_t msg;
1193     int error;
1194 
1195     ASSERT_SERIALIZED(port->mpu_serialize);
1196 
1197     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1198 	port->mp_flags |= MSGPORTF_WAITING;
1199 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1200 	/* see note at the top on the MSGPORTF_WAITING flag */
1201 	if (error)
1202 	    return(NULL);
1203     }
1204     _lwkt_pullmsg(port, msg);
1205     return(msg);
1206 }
1207 
1208 static
1209 void
1210 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1211 {
1212     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1213     ASSERT_SERIALIZED(port->mpu_serialize);
1214 
1215     if (msg->ms_flags & MSGF_SYNC) {
1216 	/*
1217 	 * If a synchronous completion has been requested, just wakeup
1218 	 * the message without bothering to queue it to the target port.
1219 	 *
1220 	 * (both sides synchronized via serialized reply port)
1221 	 */
1222 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1223 	wakeup(msg);
1224     } else {
1225 	/*
1226 	 * If an asynchronous completion has been requested the message
1227 	 * must be queued to the reply port.
1228 	 */
1229 	_lwkt_enqueue_reply(port, msg);
1230 	if (port->mp_flags & MSGPORTF_WAITING) {
1231 	    port->mp_flags &= ~MSGPORTF_WAITING;
1232 	    wakeup(port);
1233 	}
1234     }
1235 }
1236 
1237 /************************************************************************
1238  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1239  ************************************************************************/
1240 
1241 /*
1242  * You can point a port's reply vector at this function if you just want
1243  * the message marked done, without any queueing or signaling.  This is
1244  * often used for structure-embedded messages.
1245  */
1246 static
1247 void
1248 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1249 {
1250     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1251 }
1252 
1253 static
1254 void *
1255 lwkt_panic_getport(lwkt_port_t port)
1256 {
1257     panic("lwkt_getport() illegal on port %p", port);
1258 }
1259 
1260 static
1261 int
1262 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1263 {
1264     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1265 }
1266 
1267 static
1268 int
1269 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1270 {
1271     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1272 }
1273 
1274 static
1275 void *
1276 lwkt_panic_waitport(lwkt_port_t port, int flags)
1277 {
1278     panic("port %p cannot be waited on", port);
1279 }
1280 
1281 static
1282 void
1283 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1284 {
1285     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1286 }
1287 
1288 static
1289 int
1290 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1291 {
1292     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1293     /* NOT REACHED */
1294     return (ENOENT);
1295 }
1296 
1297 static
1298 int
1299 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1300 {
1301     panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1302         port, msg);
1303     /* NOT REACHED */
1304     return (ENOENT);
1305 }
1306