xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 82730a9c)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #include <machine/smp.h>
69 
70 #include <sys/malloc.h>
/*
 * Malloc type for this file; its short and long descriptions are both
 * "lwkt message".
 * NOTE(review): nothing in the visible part of this file allocates with
 * M_LWKTMSG -- confirm its users before changing or removing it.
 */
MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
72 
73 /************************************************************************
74  *				MESSAGE FUNCTIONS			*
75  ************************************************************************/
76 
77 static __inline int
78 lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
79 {
80     return port->mp_putport(port, msg);
81 }
82 
83 static __inline int
84 lwkt_beginmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
85 {
86     return port->mp_putport_oncpu(port, msg);
87 }
88 
89 static __inline void
90 _lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
91 {
92     KKASSERT(msg->ms_reply_port != NULL &&
93 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
94     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
95 }
96 
97 static __inline void
98 _lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
99 {
100     int error;
101 
102     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
103 	/*
104 	 * Target port opted to execute the message synchronously so
105 	 * queue the response.
106 	 */
107 	lwkt_replymsg(msg, error);
108     }
109 }
110 
111 static __inline void
112 _lwkt_sendmsg_start_oncpu(lwkt_port_t port, lwkt_msg_t msg)
113 {
114     int error;
115 
116     if ((error = lwkt_beginmsg_oncpu(port, msg)) != EASYNC) {
117 	/*
118 	 * Target port opted to execute the message synchronously so
119 	 * queue the response.
120 	 */
121 	lwkt_replymsg(msg, error);
122     }
123 }
124 
/*
 * lwkt_sendmsg()
 *
 *	Request asynchronous completion and call lwkt_beginmsg().  The
 *	target port can opt to execute the message synchronously or
 *	asynchronously and this function will automatically queue the
 *	response if the target executes the message synchronously.
 *
 *	The message must be idle (MSGF_DONE set, not queued) and must have
 *	a reply port; this is asserted by the prepare step.
 *
 *	NOTE: The message is in an indeterminate state until this call
 *	returns.  The caller should not mess with it (e.g. try to abort it)
 *	until then.
 *
 *	NOTE: Do not use this function to forward a message as we might
 *	clobber ms_flags in a SMP race.
 */
void
lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
{
    _lwkt_sendmsg_prepare(port, msg);
    _lwkt_sendmsg_start(port, msg);
}
146 
/*
 * lwkt_sendmsg_oncpu()
 *
 *	Same as lwkt_sendmsg() except that the message is started through
 *	the port's mp_putport_oncpu method.  Ports initialized without an
 *	on-cpu put method have lwkt_panic_putport_oncpu installed in that
 *	slot.
 */
void
lwkt_sendmsg_oncpu(lwkt_port_t port, lwkt_msg_t msg)
{
    _lwkt_sendmsg_prepare(port, msg);
    _lwkt_sendmsg_start_oncpu(port, msg);
}
153 
/*
 * lwkt_sendmsg_prepare()
 *
 *	Exported first half of lwkt_sendmsg(): assert that the message is
 *	idle and has a reply port, then clear its REPLY, SYNC and DONE
 *	flags.  Nothing is sent until lwkt_sendmsg_start() is called.
 */
void
lwkt_sendmsg_prepare(lwkt_port_t port, lwkt_msg_t msg)
{
    _lwkt_sendmsg_prepare(port, msg);
}
159 
/*
 * lwkt_sendmsg_start()
 *
 *	Exported second half of lwkt_sendmsg(): hand the message to the
 *	target port and queue the response if the target executed it
 *	synchronously.  No flag preparation is done here.
 */
void
lwkt_sendmsg_start(lwkt_port_t port, lwkt_msg_t msg)
{
    _lwkt_sendmsg_start(port, msg);
}
165 
166 /*
167  * lwkt_domsg()
168  *
169  *	Request synchronous completion and call lwkt_beginmsg().  The
170  *	target port can opt to execute the message synchronously or
171  *	asynchronously and this function will automatically block and
172  *	wait for a response if the target executes the message
173  *	asynchronously.
174  *
175  *	NOTE: Do not use this function to forward a message as we might
176  *	clobber ms_flags in a SMP race.
177  */
178 int
179 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
180 {
181     int error;
182 
183     KKASSERT(msg->ms_reply_port != NULL &&
184 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
185     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
186     msg->ms_flags |= MSGF_SYNC;
187     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
188 	/*
189 	 * Target port opted to execute the message asynchronously so
190 	 * block and wait for a reply.
191 	 */
192 	error = lwkt_waitmsg(msg, flags);
193     } else {
194 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
195     }
196     return(error);
197 }
198 
199 /*
200  * lwkt_forwardmsg()
201  *
202  * Forward a message received on one port to another port.
203  */
204 int
205 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
206 {
207     int error;
208 
209     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
210     if ((error = port->mp_putport(port, msg)) != EASYNC)
211 	lwkt_replymsg(msg, error);
212     return(error);
213 }
214 
215 /*
216  * lwkt_abortmsg()
217  *
218  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
219  * The caller must ensure that the message will not be both replied AND
220  * destroyed while the abort is in progress.
221  *
222  * This function issues a callback which might block!
223  */
224 void
225 lwkt_abortmsg(lwkt_msg_t msg)
226 {
227     /*
228      * A critical section protects us from reply IPIs on this cpu.
229      */
230     crit_enter();
231 
232     /*
233      * Shortcut the operation if the message has already been returned.
234      * The callback typically constructs a lwkt_msg with the abort request,
235      * issues it synchronously, and waits for completion.  The callback
236      * is not required to actually abort the message and the target port,
237      * upon receiving an abort request message generated by the callback
238      * should check whether the original message has already completed or
239      * not.
240      */
241     if (msg->ms_flags & MSGF_ABORTABLE) {
242 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
243 	    msg->ms_abortfn(msg);
244     }
245     crit_exit();
246 }
247 
248 /************************************************************************
249  *			PORT INITIALIZATION API				*
250  ************************************************************************/
251 
/*
 * Thread port backend: the port is owned by a single thread (mpu_td).
 */
static void *lwkt_thread_getport(lwkt_port_t port);
static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Spin port backend: the port is protected by a spinlock (mpu_spin).
 */
static void *lwkt_spin_getport(lwkt_port_t port);
static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Serializer port backend: the port is protected by a serializer
 * (mpu_serialize).  It has no dropmsg or putport_oncpu method of its own;
 * lwkt_initport_serialize() installs the panic variants in those slots.
 */
static void *lwkt_serialize_getport(lwkt_port_t port);
static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);

/*
 * Null reply method and the panic methods, installed in the slots of
 * ports which do not support the corresponding operation.
 */
static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
static void *lwkt_panic_getport(lwkt_port_t port);
static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
static int lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg);
281 
282 /*
283  * Core port initialization (internal)
284  */
285 static __inline
286 void
287 _lwkt_initport(lwkt_port_t port,
288 	       void *(*gportfn)(lwkt_port_t),
289 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
290 	       int (*wmsgfn)(lwkt_msg_t, int),
291 	       void *(*wportfn)(lwkt_port_t, int),
292 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
293 	       int (*dmsgfn)(lwkt_port_t, lwkt_msg_t),
294 	       int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t))
295 {
296     bzero(port, sizeof(*port));
297     port->mp_cpuid = -1;
298     TAILQ_INIT(&port->mp_msgq);
299     TAILQ_INIT(&port->mp_msgq_prio);
300     port->mp_getport = gportfn;
301     port->mp_putport = pportfn;
302     port->mp_waitmsg = wmsgfn;
303     port->mp_waitport = wportfn;
304     port->mp_replyport = rportfn;
305     port->mp_dropmsg = dmsgfn;
306     port->mp_putport_oncpu = pportfn_oncpu;
307 }
308 
/*
 * Schedule the target thread.
 *
 * <flags> carries the message flags supplied by the caller but is not
 * examined here; the thread is unconditionally passed to lwkt_schedule().
 * NOTE(review): this comment used to say that MSGF_NORESCHED in <flags>
 * suppresses rescheduling -- nothing in this function implements that.
 *
 * This routine is called even if the thread is already scheduled.
 */
static __inline
void
_lwkt_schedule_msg(thread_t td, int flags)
{
    lwkt_schedule(td);
}
321 
/*
 * lwkt_initport_thread()
 *
 *	Initialize a port for use by a particular thread.  The port may
 *	only be used by <td>.
 *
 *	All methods come from the thread backend.  The on-cpu put slot
 *	reuses lwkt_thread_putport, and <td> is recorded in mpu_td.
 */
void
lwkt_initport_thread(lwkt_port_t port, thread_t td)
{
    _lwkt_initport(port,
		   lwkt_thread_getport,
		   lwkt_thread_putport,
		   lwkt_thread_waitmsg,
		   lwkt_thread_waitport,
		   lwkt_thread_replyport,
		   lwkt_thread_dropmsg,
		   lwkt_thread_putport);
    port->mpu_td = td;
}
341 
342 /*
343  * lwkt_initport_spin()
344  *
345  *	Initialize a port for use with descriptors that might be accessed
346  *	via multiple LWPs, processes, or threads.  Has somewhat more
347  *	overhead then thread ports.
348  */
349 void
350 lwkt_initport_spin(lwkt_port_t port, thread_t td, boolean_t fixed_cpuid)
351 {
352     int (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
353     int (*pportfn_oncpu)(lwkt_port_t, lwkt_msg_t);
354 
355     if (td == NULL)
356 	dmsgfn = lwkt_panic_dropmsg;
357     else
358 	dmsgfn = lwkt_spin_dropmsg;
359 
360     if (fixed_cpuid)
361 	pportfn_oncpu = lwkt_spin_putport_oncpu;
362     else
363 	pportfn_oncpu = lwkt_panic_putport_oncpu;
364 
365     _lwkt_initport(port,
366 		   lwkt_spin_getport,
367 		   lwkt_spin_putport,
368 		   lwkt_spin_waitmsg,
369 		   lwkt_spin_waitport,
370 		   lwkt_spin_replyport,
371 		   dmsgfn,
372 		   pportfn_oncpu);
373     spin_init(&port->mpu_spin);
374     port->mpu_td = td;
375     if (fixed_cpuid)
376 	port->mp_cpuid = td->td_gd->gd_cpuid;
377 }
378 
/*
 * lwkt_initport_serialize()
 *
 *	Initialize a port for use with descriptors that might be accessed
 *	via multiple LWPs, processes, or threads.  Callers are assumed to
 *	have held the serializer (slz).
 *
 *	The drop and on-cpu put slots get the panic methods, and <slz> is
 *	recorded in mpu_serialize.
 */
void
lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
{
    _lwkt_initport(port,
		   lwkt_serialize_getport,
		   lwkt_serialize_putport,
		   lwkt_serialize_waitmsg,
		   lwkt_serialize_waitport,
		   lwkt_serialize_replyport,
		   lwkt_panic_dropmsg,
		   lwkt_panic_putport_oncpu);
    port->mpu_serialize = slz;
}
399 
/*
 * Similar to the standard initport, this function simply marks the message
 * as being done and does not attempt to return it to an originating port.
 *
 * Only the reply slot is usable (lwkt_null_replyport); every other slot
 * gets the corresponding panic method.
 * NOTE(review): the "marks the message as being done" behavior lives in
 * lwkt_null_replyport(), which is outside the visible part of this file.
 */
void
lwkt_initport_replyonly_null(lwkt_port_t port)
{
    _lwkt_initport(port,
		   lwkt_panic_getport,
		   lwkt_panic_putport,
		   lwkt_panic_waitmsg,
		   lwkt_panic_waitport,
		   lwkt_null_replyport,
		   lwkt_panic_dropmsg,
		   lwkt_panic_putport_oncpu);
}
416 
417 /*
418  * Initialize a reply-only port, typically used as a message sink.  Such
419  * ports can only be used as a reply port.
420  */
421 void
422 lwkt_initport_replyonly(lwkt_port_t port,
423 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
424 {
425     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
426 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
427 			 rportfn, lwkt_panic_dropmsg,
428 			 lwkt_panic_putport_oncpu);
429 }
430 
431 void
432 lwkt_initport_putonly(lwkt_port_t port,
433 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
434 {
435     _lwkt_initport(port, lwkt_panic_getport, pportfn,
436 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
437 			 lwkt_panic_replyport, lwkt_panic_dropmsg,
438 			 lwkt_panic_putport_oncpu);
439 }
440 
441 void
442 lwkt_initport_panic(lwkt_port_t port)
443 {
444     _lwkt_initport(port,
445 		   lwkt_panic_getport, lwkt_panic_putport,
446 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
447 		   lwkt_panic_replyport, lwkt_panic_dropmsg,
448 		   lwkt_panic_putport_oncpu);
449 }
450 
451 static __inline
452 void
453 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
454 {
455     lwkt_msg_queue *queue;
456 
457     /*
458      * normal case, remove and return the message.
459      */
460     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
461 	queue = &port->mp_msgq_prio;
462     else
463 	queue = &port->mp_msgq;
464     TAILQ_REMOVE(queue, msg, ms_node);
465 
466     /*
467      * atomic op needed for spin ports
468      */
469     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
470 }
471 
472 static __inline
473 void
474 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
475 {
476     lwkt_msg_queue *queue;
477 
478     /*
479      * atomic op needed for spin ports
480      */
481     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
482     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
483 	queue = &port->mp_msgq_prio;
484     else
485     	queue = &port->mp_msgq;
486     TAILQ_INSERT_TAIL(queue, msg, ms_node);
487 }
488 
489 static __inline
490 lwkt_msg_t
491 _lwkt_pollmsg(lwkt_port_t port)
492 {
493     lwkt_msg_t msg;
494 
495     msg = TAILQ_FIRST(&port->mp_msgq_prio);
496     if (__predict_false(msg != NULL))
497 	return msg;
498 
499     /*
500      * Priority queue has no message, fallback to non-priority queue.
501      */
502     return TAILQ_FIRST(&port->mp_msgq);
503 }
504 
/*
 * Queue a reply on <port>: the message is first pushed onto the port
 * queue (which sets MSGF_QUEUED) and only then flagged as replied and
 * done.
 */
static __inline
void
_lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
{
    /*
     * atomic op needed for spin ports (applies to the flag update below,
     * _lwkt_pushmsg() uses an atomic op of its own for MSGF_QUEUED)
     */
    _lwkt_pushmsg(port, msg);
    atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
}
515 
516 /************************************************************************
517  *			THREAD PORT BACKEND				*
518  ************************************************************************
519  *
520  * This backend is used when the port a message is retrieved from is owned
521  * by a single thread (the calling thread).  Messages are IPId to the
522  * correct cpu before being enqueued to a port.  Note that this is fairly
523  * optimal since scheduling would have had to do an IPI anyway if the
524  * message were headed to a different cpu.
525  */
526 
/*
 * This function completes reply processing for the default case in the
 * context of the originating cpu.
 *
 * It is dispatched through lwkt_send_ipiq() by lwkt_thread_replyport(),
 * and by itself when it finds that the reply port's owner thread is no
 * longer on the current cpu.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int flags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
	lwkt_send_ipiq(port->mpu_td->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

    /*
     * Cleanup (in critical section, IPI on same cpu, atomic op not needed)
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif
    /*
     * Snapshot the flags before DONE is set; the snapshot is what gets
     * passed to _lwkt_schedule_msg() below.
     */
    flags = msg->ms_flags;
    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * Synchronous message, flag it done without queueing it.
	 * NOTE(review): the fence presumably orders earlier stores to the
	 * message ahead of the MSGF_DONE update -- confirm.
	 */
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    } else {
	_lwkt_enqueue_reply(port, msg);
    }
    if (port->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(port->mpu_td, flags);
}
564 
/*
 * lwkt_thread_replyport() - Backend to lwkt_replymsg()
 *
 * Called with the reply port as an argument but in the context of the
 * original target port.  Completion must occur on the cpu of the reply
 * port's owner thread (port->mpu_td->td_gd): if that is the current cpu
 * the reply is completed here, otherwise it is forwarded with an IPI to
 * lwkt_thread_replyport_remote().
 *
 * The critical section protects us from IPIs on this cpu.
 */
static
void
lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
{
    int flags;

    KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);

    if (msg->ms_flags & MSGF_SYNC) {
	/*
	 * If a synchronous completion has been requested, just wakeup
	 * the message without bothering to queue it to the target port.
	 *
	 * The same-cpu case updates the flags and schedules the waiter
	 * inside a critical section.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    /* flags as they were before DONE|REPLY, passed to the wakeup */
	    flags = msg->ms_flags;
	    cpu_sfence();
	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, flags);
	    crit_exit();
	} else {
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    /* MSGF_DONE is set by the remote function on the owner's cpu */
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    } else {
	/*
	 * If an asynchronous completion has been requested the message
	 * must be queued to the reply port.
	 *
	 * A critical section is required to interlock the port queue.
	 */
	if (port->mpu_td->td_gd == mycpu) {
	    crit_enter();
	    _lwkt_enqueue_reply(port, msg);
	    if (port->mp_flags & MSGPORTF_WAITING)
		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
	    crit_exit();
	} else {
#ifdef INVARIANTS
	    atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
#endif
	    /* queueing is done by the remote function on the owner's cpu */
	    atomic_set_int(&msg->ms_flags, MSGF_REPLY);
	    lwkt_send_ipiq(port->mpu_td->td_gd,
			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	}
    }
}
629 
630 /*
631  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
632  *
633  * This function could _only_ be used when caller is in the same thread
634  * as the message's target port owner thread.
635  */
636 static int
637 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
638 {
639     int error;
640 
641     KASSERT(port->mpu_td == curthread,
642     	    ("message could only be dropped in the same thread "
643 	     "as the message target port thread"));
644     crit_enter_quick(port->mpu_td);
645     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
646 	    _lwkt_pullmsg(port, msg);
647 	    atomic_set_int(&msg->ms_flags, MSGF_DONE);
648 	    error = 0;
649     } else {
650 	    error = ENOENT;
651     }
652     crit_exit_quick(port->mpu_td);
653 
654     return (error);
655 }
656 
657 /*
658  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
659  *
660  * Called with the target port as an argument but in the context of the
661  * reply port.  This function always implements an asynchronous put to
662  * the target message port, and thus returns EASYNC.
663  *
664  * The message must already have cleared MSGF_DONE and MSGF_REPLY
665  */
666 static
667 void
668 lwkt_thread_putport_remote(lwkt_msg_t msg)
669 {
670     lwkt_port_t port = msg->ms_target_port;
671 
672     /*
673      * Chase any thread migration that occurs
674      */
675     if (port->mpu_td->td_gd != mycpu) {
676 	lwkt_send_ipiq(port->mpu_td->td_gd,
677 		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
678 	return;
679     }
680 
681     /*
682      * An atomic op is needed on ms_flags vs originator.  Also
683      * note that the originator might be using a different type
684      * of msgport.
685      */
686 #ifdef INVARIANTS
687     KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
688     atomic_clear_int(&msg->ms_flags, MSGF_INTRANSIT);
689 #endif
690     _lwkt_pushmsg(port, msg);
691     if (port->mp_flags & MSGPORTF_WAITING)
692 	_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
693 }
694 
695 static
696 int
697 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
698 {
699     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
700 
701     msg->ms_target_port = port;
702     if (port->mpu_td->td_gd == mycpu) {
703 	crit_enter();
704 	_lwkt_pushmsg(port, msg);
705 	if (port->mp_flags & MSGPORTF_WAITING)
706 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
707 	crit_exit();
708     } else {
709 #ifdef INVARIANTS
710 	/*
711 	 * Cleanup.
712 	 *
713 	 * An atomic op is needed on ms_flags vs originator.  Also
714 	 * note that the originator might be using a different type
715 	 * of msgport.
716 	 */
717 	atomic_set_int(&msg->ms_flags, MSGF_INTRANSIT);
718 #endif
719 	lwkt_send_ipiq(port->mpu_td->td_gd,
720 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
721     }
722     return (EASYNC);
723 }
724 
725 /*
726  * lwkt_thread_getport()
727  *
728  *	Retrieve the next message from the port or NULL if no messages
729  *	are ready.
730  */
731 static
732 void *
733 lwkt_thread_getport(lwkt_port_t port)
734 {
735     lwkt_msg_t msg;
736 
737     KKASSERT(port->mpu_td == curthread);
738 
739     crit_enter_quick(port->mpu_td);
740     if ((msg = _lwkt_pollmsg(port)) != NULL)
741 	_lwkt_pullmsg(port, msg);
742     crit_exit_quick(port->mpu_td);
743     return(msg);
744 }
745 
746 /*
747  * lwkt_thread_waitmsg()
748  *
749  *	Wait for a particular message to be replied.  We must be the only
750  *	thread waiting on the message.  The port must be owned by the
751  *	caller.
752  */
753 static
754 int
755 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
756 {
757     thread_t td = curthread;
758 
759     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
760 	    ("can't wait dropable message"));
761 
762     if ((msg->ms_flags & MSGF_DONE) == 0) {
763 	/*
764 	 * If the done bit was not set we have to block until it is.
765 	 */
766 	lwkt_port_t port = msg->ms_reply_port;
767 	int sentabort;
768 
769 	KKASSERT(port->mpu_td == td);
770 	crit_enter_quick(td);
771 	sentabort = 0;
772 
773 	while ((msg->ms_flags & MSGF_DONE) == 0) {
774 	    port->mp_flags |= MSGPORTF_WAITING;	/* same cpu */
775 	    if (sentabort == 0) {
776 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
777 		    lwkt_abortmsg(msg);
778 		}
779 	    } else {
780 		lwkt_sleep("waitabt", 0);
781 	    }
782 	    port->mp_flags &= ~MSGPORTF_WAITING;
783 	}
784 	if (msg->ms_flags & MSGF_QUEUED)
785 	    _lwkt_pullmsg(port, msg);
786 	crit_exit_quick(td);
787     } else {
788 	/*
789 	 * If the done bit was set we only have to mess around with the
790 	 * message if it is queued on the reply port.
791 	 */
792 	crit_enter_quick(td);
793 	if (msg->ms_flags & MSGF_QUEUED) {
794 	    lwkt_port_t port = msg->ms_reply_port;
795 	    thread_t td __debugvar = curthread;
796 
797 	    KKASSERT(port->mpu_td == td);
798 	    _lwkt_pullmsg(port, msg);
799 	}
800 	crit_exit_quick(td);
801     }
802     return(msg->ms_error);
803 }
804 
805 /*
806  * lwkt_thread_waitport()
807  *
808  *	Wait for a new message to be available on the port.  We must be the
809  *	the only thread waiting on the port.  The port must be owned by caller.
810  */
811 static
812 void *
813 lwkt_thread_waitport(lwkt_port_t port, int flags)
814 {
815     thread_t td = curthread;
816     lwkt_msg_t msg;
817     int error;
818 
819     KKASSERT(port->mpu_td == td);
820     crit_enter_quick(td);
821     while ((msg = _lwkt_pollmsg(port)) == NULL) {
822 	port->mp_flags |= MSGPORTF_WAITING;
823 	error = lwkt_sleep("waitport", flags);
824 	port->mp_flags &= ~MSGPORTF_WAITING;
825 	if (error)
826 	    goto done;
827     }
828     _lwkt_pullmsg(port, msg);
829 done:
830     crit_exit_quick(td);
831     return(msg);
832 }
833 
834 /************************************************************************
835  *			   SPIN PORT BACKEND				*
836  ************************************************************************
837  *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
849  */
850 
851 static
852 void *
853 lwkt_spin_getport(lwkt_port_t port)
854 {
855     lwkt_msg_t msg;
856 
857     spin_lock(&port->mpu_spin);
858     if ((msg = _lwkt_pollmsg(port)) != NULL)
859 	_lwkt_pullmsg(port, msg);
860     spin_unlock(&port->mpu_spin);
861     return(msg);
862 }
863 
864 static __inline int
865 lwkt_spin_putport_only(lwkt_port_t port, lwkt_msg_t msg)
866 {
867     int dowakeup;
868 
869     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
870 
871     msg->ms_target_port = port;
872     spin_lock(&port->mpu_spin);
873     _lwkt_pushmsg(port, msg);
874     dowakeup = 0;
875     if (port->mp_flags & MSGPORTF_WAITING) {
876 	port->mp_flags &= ~MSGPORTF_WAITING;
877 	dowakeup = 1;
878     }
879     spin_unlock(&port->mpu_spin);
880 
881     return dowakeup;
882 }
883 
884 static
885 int
886 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
887 {
888     if (lwkt_spin_putport_only(port, msg))
889 	wakeup(port);
890     return (EASYNC);
891 }
892 
893 static
894 int
895 lwkt_spin_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
896 {
897     KASSERT(port->mp_cpuid == mycpuid,
898         ("cpu mismatch, can't do oncpu putport; port cpu%d, curcpu cpu%d",
899 	 port->mp_cpuid, mycpuid));
900     if (lwkt_spin_putport_only(port, msg))
901 	wakeup_mycpu(port);
902     return (EASYNC);
903 }
904 
/*
 * lwkt_spin_waitmsg() - waitmsg method of the spin port backend
 *
 * Block until <msg> has MSGF_DONE set, pull it off its reply port if it
 * is queued there, and return its ms_error.  Dropable messages cannot be
 * waited on.
 *
 * Only the PCATCH bit of <flags> is examined.  With PCATCH, the first
 * sleep that returns an error triggers lwkt_abortmsg() (issued with the
 * spinlock released); all later sleeps are uninterruptible.
 */
static
int
lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
{
    lwkt_port_t port;
    int sentabort;
    int error;

    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
	    ("can't wait dropable message"));
    port = msg->ms_reply_port;

    if ((msg->ms_flags & MSGF_DONE) == 0) {
	sentabort = 0;
	spin_lock(&port->mpu_spin);
	while ((msg->ms_flags & MSGF_DONE) == 0) {
	    void *won;

	    /*
	     * If message was sent synchronously from the beginning
	     * the wakeup will be on the message structure, else it
	     * will be on the port structure.
	     *
	     * ms_flags needs atomic op originator vs target MSGF_QUEUED
	     */
	    if (msg->ms_flags & MSGF_SYNC) {
		won = msg;
		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
	    } else {
		won = port;
		port->mp_flags |= MSGPORTF_WAITING;
	    }

	    /*
	     * Only messages which support abort can be interrupted.
	     * We must still wait for message completion regardless.
	     */
	    if ((flags & PCATCH) && sentabort == 0) {
		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
		if (error) {
		    /* remember the sleep error, it may replace EINTR below */
		    sentabort = error;
		    spin_unlock(&port->mpu_spin);
		    lwkt_abortmsg(msg);
		    spin_lock(&port->mpu_spin);
		}
	    } else {
		/* the result of this sleep is not examined */
		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
	    }
	    /* see note at the top on the MSGPORTF_WAITING flag */
	}
	/*
	 * Turn EINTR into ERESTART if the signal indicates.
	 * (ms_error is replaced by the error of the interrupted sleep.)
	 */
	if (sentabort && msg->ms_error == EINTR)
	    msg->ms_error = sentabort;
	if (msg->ms_flags & MSGF_QUEUED)
	    _lwkt_pullmsg(port, msg);
	spin_unlock(&port->mpu_spin);
    } else {
	/*
	 * Already done, only dequeue the message if it sits on the
	 * reply port.
	 */
	spin_lock(&port->mpu_spin);
	if (msg->ms_flags & MSGF_QUEUED) {
	    _lwkt_pullmsg(port, msg);
	}
	spin_unlock(&port->mpu_spin);
    }
    return(msg->ms_error);
}
972 
973 static
974 void *
975 lwkt_spin_waitport(lwkt_port_t port, int flags)
976 {
977     lwkt_msg_t msg;
978     int error;
979 
980     spin_lock(&port->mpu_spin);
981     while ((msg = _lwkt_pollmsg(port)) == NULL) {
982 	port->mp_flags |= MSGPORTF_WAITING;
983 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
984 	/* see note at the top on the MSGPORTF_WAITING flag */
985 	if (error) {
986 	    spin_unlock(&port->mpu_spin);
987 	    return(NULL);
988 	}
989     }
990     _lwkt_pullmsg(port, msg);
991     spin_unlock(&port->mpu_spin);
992     return(msg);
993 }
994 
995 static
996 void
997 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
998 {
999     int dowakeup;
1000 
1001     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1002 
1003     if (msg->ms_flags & MSGF_SYNC) {
1004 	/*
1005 	 * If a synchronous completion has been requested, just wakeup
1006 	 * the message without bothering to queue it to the target port.
1007 	 *
1008 	 * ms_flags protected by reply port spinlock
1009 	 */
1010 	spin_lock(&port->mpu_spin);
1011 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1012 	dowakeup = 0;
1013 	if (msg->ms_flags & MSGF_WAITING) {
1014 		msg->ms_flags &= ~MSGF_WAITING;
1015 		dowakeup = 1;
1016 	}
1017 	spin_unlock(&port->mpu_spin);
1018 	if (dowakeup)
1019 		wakeup(msg);
1020     } else {
1021 	/*
1022 	 * If an asynchronous completion has been requested the message
1023 	 * must be queued to the reply port.
1024 	 */
1025 	spin_lock(&port->mpu_spin);
1026 	_lwkt_enqueue_reply(port, msg);
1027 	dowakeup = 0;
1028 	if (port->mp_flags & MSGPORTF_WAITING) {
1029 	    port->mp_flags &= ~MSGPORTF_WAITING;
1030 	    dowakeup = 1;
1031 	}
1032 	spin_unlock(&port->mpu_spin);
1033 	if (dowakeup)
1034 	    wakeup(port);
1035     }
1036 }
1037 
1038 /*
1039  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
1040  *
1041  * This function could _only_ be used when caller is in the same thread
1042  * as the message's target port owner thread.
1043  */
1044 static int
1045 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1046 {
1047     int error;
1048 
1049     KASSERT(port->mpu_td == curthread,
1050     	    ("message could only be dropped in the same thread "
1051 	     "as the message target port thread\n"));
1052     spin_lock(&port->mpu_spin);
1053     if ((msg->ms_flags & (MSGF_REPLY|MSGF_QUEUED)) == MSGF_QUEUED) {
1054 	    _lwkt_pullmsg(port, msg);
1055 	    msg->ms_flags |= MSGF_DONE;
1056 	    error = 0;
1057     } else {
1058 	    error = ENOENT;
1059     }
1060     spin_unlock(&port->mpu_spin);
1061 
1062     return (error);
1063 }
1064 
1065 /************************************************************************
1066  *			  SERIALIZER PORT BACKEND			*
1067  ************************************************************************
1068  *
1069  * This backend uses a serializer to protect access to the port.  Callers
1070  * are assumed to hold the serializer.  This kind of port is usually created
1071  * by a network device driver along with _one_ lwkt thread to pipeline
1072  * operations which may temporarily release the serializer.
1073  *
1074  * Implementation is based on SPIN PORT BACKEND.
1075  */
1076 
1077 static
1078 void *
1079 lwkt_serialize_getport(lwkt_port_t port)
1080 {
1081     lwkt_msg_t msg;
1082 
1083     ASSERT_SERIALIZED(port->mpu_serialize);
1084 
1085     if ((msg = _lwkt_pollmsg(port)) != NULL)
1086 	_lwkt_pullmsg(port, msg);
1087     return(msg);
1088 }
1089 
1090 static
1091 int
1092 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
1093 {
1094     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
1095     ASSERT_SERIALIZED(port->mpu_serialize);
1096 
1097     msg->ms_target_port = port;
1098     _lwkt_pushmsg(port, msg);
1099     if (port->mp_flags & MSGPORTF_WAITING) {
1100 	port->mp_flags &= ~MSGPORTF_WAITING;
1101 	wakeup(port);
1102     }
1103     return (EASYNC);
1104 }
1105 
1106 static
1107 int
1108 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
1109 {
1110     lwkt_port_t port;
1111     int sentabort;
1112     int error;
1113 
1114     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1115 	    ("can't wait dropable message"));
1116 
1117     if ((msg->ms_flags & MSGF_DONE) == 0) {
1118 	port = msg->ms_reply_port;
1119 
1120 	ASSERT_SERIALIZED(port->mpu_serialize);
1121 
1122 	sentabort = 0;
1123 	while ((msg->ms_flags & MSGF_DONE) == 0) {
1124 	    void *won;
1125 
1126 	    /*
1127 	     * If message was sent synchronously from the beginning
1128 	     * the wakeup will be on the message structure, else it
1129 	     * will be on the port structure.
1130 	     */
1131 	    if (msg->ms_flags & MSGF_SYNC) {
1132 		won = msg;
1133 	    } else {
1134 		won = port;
1135 		port->mp_flags |= MSGPORTF_WAITING;
1136 	    }
1137 
1138 	    /*
1139 	     * Only messages which support abort can be interrupted.
1140 	     * We must still wait for message completion regardless.
1141 	     */
1142 	    if ((flags & PCATCH) && sentabort == 0) {
1143 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1144 		if (error) {
1145 		    sentabort = error;
1146 		    lwkt_serialize_exit(port->mpu_serialize);
1147 		    lwkt_abortmsg(msg);
1148 		    lwkt_serialize_enter(port->mpu_serialize);
1149 		}
1150 	    } else {
1151 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1152 	    }
1153 	    /* see note at the top on the MSGPORTF_WAITING flag */
1154 	}
1155 	/*
1156 	 * Turn EINTR into ERESTART if the signal indicates.
1157 	 */
1158 	if (sentabort && msg->ms_error == EINTR)
1159 	    msg->ms_error = sentabort;
1160 	if (msg->ms_flags & MSGF_QUEUED)
1161 	    _lwkt_pullmsg(port, msg);
1162     } else {
1163 	if (msg->ms_flags & MSGF_QUEUED) {
1164 	    port = msg->ms_reply_port;
1165 
1166 	    ASSERT_SERIALIZED(port->mpu_serialize);
1167 	    _lwkt_pullmsg(port, msg);
1168 	}
1169     }
1170     return(msg->ms_error);
1171 }
1172 
1173 static
1174 void *
1175 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1176 {
1177     lwkt_msg_t msg;
1178     int error;
1179 
1180     ASSERT_SERIALIZED(port->mpu_serialize);
1181 
1182     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1183 	port->mp_flags |= MSGPORTF_WAITING;
1184 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1185 	/* see note at the top on the MSGPORTF_WAITING flag */
1186 	if (error)
1187 	    return(NULL);
1188     }
1189     _lwkt_pullmsg(port, msg);
1190     return(msg);
1191 }
1192 
1193 static
1194 void
1195 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1196 {
1197     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1198     ASSERT_SERIALIZED(port->mpu_serialize);
1199 
1200     if (msg->ms_flags & MSGF_SYNC) {
1201 	/*
1202 	 * If a synchronous completion has been requested, just wakeup
1203 	 * the message without bothering to queue it to the target port.
1204 	 *
1205 	 * (both sides synchronized via serialized reply port)
1206 	 */
1207 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1208 	wakeup(msg);
1209     } else {
1210 	/*
1211 	 * If an asynchronous completion has been requested the message
1212 	 * must be queued to the reply port.
1213 	 */
1214 	_lwkt_enqueue_reply(port, msg);
1215 	if (port->mp_flags & MSGPORTF_WAITING) {
1216 	    port->mp_flags &= ~MSGPORTF_WAITING;
1217 	    wakeup(port);
1218 	}
1219     }
1220 }
1221 
1222 /************************************************************************
1223  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1224  ************************************************************************/
1225 
1226 /*
1227  * You can point a port's reply vector at this function if you just want
1228  * the message marked done, without any queueing or signaling.  This is
1229  * often used for structure-embedded messages.
1230  */
1231 static
1232 void
1233 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1234 {
1235     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1236 }
1237 
1238 static
1239 void *
1240 lwkt_panic_getport(lwkt_port_t port)
1241 {
1242     panic("lwkt_getport() illegal on port %p", port);
1243 }
1244 
1245 static
1246 int
1247 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1248 {
1249     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1250 }
1251 
1252 static
1253 int
1254 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1255 {
1256     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1257 }
1258 
1259 static
1260 void *
1261 lwkt_panic_waitport(lwkt_port_t port, int flags)
1262 {
1263     panic("port %p cannot be waited on", port);
1264 }
1265 
1266 static
1267 void
1268 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1269 {
1270     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1271 }
1272 
1273 static
1274 int
1275 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1276 {
1277     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1278     /* NOT REACHED */
1279     return (ENOENT);
1280 }
1281 
1282 static
1283 int
1284 lwkt_panic_putport_oncpu(lwkt_port_t port, lwkt_msg_t msg)
1285 {
1286     panic("lwkt_begin_oncpu/sendmsg_oncpu() illegal on port %p msg %p",
1287         port, msg);
1288     /* NOT REACHED */
1289     return (ENOENT);
1290 }
1291