xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 77b0c609)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  */
37 
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/proc.h>
42 #include <sys/rtprio.h>
43 #include <sys/queue.h>
44 #include <sys/sysctl.h>
45 #include <sys/kthread.h>
46 #include <sys/signalvar.h>
47 #include <sys/signal2.h>
48 #include <machine/cpu.h>
49 #include <sys/lock.h>
50 
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <vm/vm_kern.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_page.h>
56 #include <vm/vm_map.h>
57 #include <vm/vm_pager.h>
58 #include <vm/vm_extern.h>
59 #include <vm/vm_zone.h>
60 
61 #include <sys/thread2.h>
62 #include <sys/msgport2.h>
63 #include <sys/spinlock2.h>
64 #include <sys/serialize.h>
65 
66 #include <machine/stdarg.h>
67 #include <machine/cpufunc.h>
68 #ifdef SMP
69 #include <machine/smp.h>
70 #endif
71 
72 #include <sys/malloc.h>
/*
 * Define the M_LWKTMSG malloc type; both of its description strings are
 * "lwkt message".  Nothing in the visible part of this file allocates
 * with it -- presumably it is used by message allocators elsewhere
 * (TODO confirm).
 */
73 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
74 
75 /************************************************************************
76  *				MESSAGE FUNCTIONS			*
77  ************************************************************************/
78 
79 /*
80  * lwkt_sendmsg()
81  *
82  *	Request asynchronous completion and call lwkt_beginmsg().  The
83  *	target port can opt to execute the message synchronously or
84  *	asynchronously and this function will automatically queue the
85  *	response if the target executes the message synchronously.
86  *
87  *	NOTE: The message is in an indeterminant state until this call
88  *	returns.  The caller should not mess with it (e.g. try to abort it)
89  *	until then.
90  */
91 void
92 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
93 {
94     int error;
95 
96     KKASSERT(msg->ms_reply_port != NULL &&
97 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
98     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
99     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
100 	/*
101 	 * Target port opted to execute the message synchronously so
102 	 * queue the response.
103 	 */
104 	lwkt_replymsg(msg, error);
105     }
106 }
107 
108 /*
109  * lwkt_domsg()
110  *
111  *	Request synchronous completion and call lwkt_beginmsg().  The
112  *	target port can opt to execute the message synchronously or
113  *	asynchronously and this function will automatically block and
114  *	wait for a response if the target executes the message
115  *	asynchronously.
116  */
117 int
118 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
119 {
120     int error;
121 
122     KKASSERT(msg->ms_reply_port != NULL &&
123 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
124     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
125     msg->ms_flags |= MSGF_SYNC;
126     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
127 	/*
128 	 * Target port opted to execute the message asynchronously so
129 	 * block and wait for a reply.
130 	 */
131 	error = lwkt_waitmsg(msg, flags);
132     } else {
133 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
134     }
135     return(error);
136 }
137 
138 /*
139  * lwkt_forwardmsg()
140  *
141  * Forward a message received on one port to another port.
142  */
143 int
144 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
145 {
146     int error;
147 
148     crit_enter();
149     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
150     if ((error = port->mp_putport(port, msg)) != EASYNC)
151 	lwkt_replymsg(msg, error);
152     crit_exit();
153     return(error);
154 }
155 
156 /*
157  * lwkt_abortmsg()
158  *
159  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
160  * The caller must ensure that the message will not be both replied AND
161  * destroyed while the abort is in progress.
162  *
163  * This function issues a callback which might block!
164  */
165 void
166 lwkt_abortmsg(lwkt_msg_t msg)
167 {
168     /*
169      * A critical section protects us from reply IPIs on this cpu.
170      */
171     crit_enter();
172 
173     /*
174      * Shortcut the operation if the message has already been returned.
175      * The callback typically constructs a lwkt_msg with the abort request,
176      * issues it synchronously, and waits for completion.  The callback
177      * is not required to actually abort the message and the target port,
178      * upon receiving an abort request message generated by the callback
179      * should check whether the original message has already completed or
180      * not.
181      */
182     if (msg->ms_flags & MSGF_ABORTABLE) {
183 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
184 	    msg->ms_abortfn(msg);
185     }
186     crit_exit();
187 }
188 
189 /************************************************************************
190  *			PORT INITIALIZATION API				*
191  ************************************************************************/
192 
193 static void *lwkt_thread_getport(lwkt_port_t port);
194 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
197 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
199 
200 static void *lwkt_spin_getport(lwkt_port_t port);
201 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
202 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
203 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
204 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
205 static void lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
206 
207 static void *lwkt_serialize_getport(lwkt_port_t port);
208 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
209 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
210 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
211 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
212 
213 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
214 static void *lwkt_panic_getport(lwkt_port_t port);
215 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
216 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
217 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
218 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
219 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
220 
221 /*
222  * Core port initialization (internal)
223  */
224 static __inline
225 void
226 _lwkt_initport(lwkt_port_t port,
227 	       void *(*gportfn)(lwkt_port_t),
228 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
229 	       int (*wmsgfn)(lwkt_msg_t, int),
230 	       void *(*wportfn)(lwkt_port_t, int),
231 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
232 	       void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
233 {
234     bzero(port, sizeof(*port));
235     TAILQ_INIT(&port->mp_msgq);
236     TAILQ_INIT(&port->mp_msgq_prio);
237     port->mp_getport = gportfn;
238     port->mp_putport = pportfn;
239     port->mp_waitmsg =  wmsgfn;
240     port->mp_waitport =  wportfn;
241     port->mp_replyport = rportfn;
242     port->mp_dropmsg = dmsgfn;
243 }
244 
245 /*
246  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
247  * we tell the scheduler not to reschedule if td is at a higher priority.
248  *
249  * This routine is called even if the thread is already scheduled.
250  */
251 static __inline
252 void
253 _lwkt_schedule_msg(thread_t td, int flags)
254 {
255     lwkt_schedule(td);
256 }
257 
258 /*
259  * lwkt_initport_thread()
260  *
261  *	Initialize a port for use by a particular thread.  The port may
262  *	only be used by <td>.
263  */
264 void
265 lwkt_initport_thread(lwkt_port_t port, thread_t td)
266 {
267     _lwkt_initport(port,
268 		   lwkt_thread_getport,
269 		   lwkt_thread_putport,
270 		   lwkt_thread_waitmsg,
271 		   lwkt_thread_waitport,
272 		   lwkt_thread_replyport,
273 		   lwkt_thread_dropmsg);
274     port->mpu_td = td;
275 }
276 
277 /*
278  * lwkt_initport_spin()
279  *
280  *	Initialize a port for use with descriptors that might be accessed
281  *	via multiple LWPs, processes, or threads.  Has somewhat more
282  *	overhead then thread ports.
283  */
284 void
285 lwkt_initport_spin(lwkt_port_t port, thread_t td)
286 {
287     void (*dmsgfn)(lwkt_port_t, lwkt_msg_t);
288 
289     if (td == NULL)
290 	dmsgfn = lwkt_panic_dropmsg;
291     else
292 	dmsgfn = lwkt_spin_dropmsg;
293 
294     _lwkt_initport(port,
295 		   lwkt_spin_getport,
296 		   lwkt_spin_putport,
297 		   lwkt_spin_waitmsg,
298 		   lwkt_spin_waitport,
299 		   lwkt_spin_replyport,
300 		   dmsgfn);
301     spin_init(&port->mpu_spin);
302     port->mpu_td = td;
303 }
304 
305 /*
306  * lwkt_initport_serialize()
307  *
308  *	Initialize a port for use with descriptors that might be accessed
309  *	via multiple LWPs, processes, or threads.  Callers are assumed to
310  *	have held the serializer (slz).
311  */
312 void
313 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
314 {
315     _lwkt_initport(port,
316 		   lwkt_serialize_getport,
317 		   lwkt_serialize_putport,
318 		   lwkt_serialize_waitmsg,
319 		   lwkt_serialize_waitport,
320 		   lwkt_serialize_replyport,
321 		   lwkt_panic_dropmsg);
322     port->mpu_serialize = slz;
323 }
324 
325 /*
326  * Similar to the standard initport, this function simply marks the message
327  * as being done and does not attempt to return it to an originating port.
328  */
329 void
330 lwkt_initport_replyonly_null(lwkt_port_t port)
331 {
332     _lwkt_initport(port,
333 		   lwkt_panic_getport,
334 		   lwkt_panic_putport,
335 		   lwkt_panic_waitmsg,
336 		   lwkt_panic_waitport,
337 		   lwkt_null_replyport,
338 		   lwkt_panic_dropmsg);
339 }
340 
341 /*
342  * Initialize a reply-only port, typically used as a message sink.  Such
343  * ports can only be used as a reply port.
344  */
345 void
346 lwkt_initport_replyonly(lwkt_port_t port,
347 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
348 {
349     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
350 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
351 			 rportfn, lwkt_panic_dropmsg);
352 }
353 
354 void
355 lwkt_initport_putonly(lwkt_port_t port,
356 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
357 {
358     _lwkt_initport(port, lwkt_panic_getport, pportfn,
359 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
360 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
361 }
362 
363 void
364 lwkt_initport_panic(lwkt_port_t port)
365 {
366     _lwkt_initport(port,
367 		   lwkt_panic_getport, lwkt_panic_putport,
368 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
369 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
370 }
371 
372 static __inline
373 void
374 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
375 {
376     lwkt_msg_queue *queue;
377 
378     /*
379      * normal case, remove and return the message.
380      */
381     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
382 	queue = &port->mp_msgq_prio;
383     else
384 	queue = &port->mp_msgq;
385     TAILQ_REMOVE(queue, msg, ms_node);
386 
387     /*
388      * atomic op needed for spin ports
389      */
390     atomic_clear_int(&msg->ms_flags, MSGF_QUEUED);
391 }
392 
393 static __inline
394 void
395 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
396 {
397     lwkt_msg_queue *queue;
398 
399     /*
400      * atomic op needed for spin ports
401      */
402     atomic_set_int(&msg->ms_flags, MSGF_QUEUED);
403     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
404 	queue = &port->mp_msgq_prio;
405     else
406     	queue = &port->mp_msgq;
407     TAILQ_INSERT_TAIL(queue, msg, ms_node);
408 }
409 
410 static __inline
411 lwkt_msg_t
412 _lwkt_pollmsg(lwkt_port_t port)
413 {
414     lwkt_msg_t msg;
415 
416     msg = TAILQ_FIRST(&port->mp_msgq_prio);
417     if (__predict_false(msg != NULL))
418 	return msg;
419 
420     /*
421      * Priority queue has no message, fallback to non-priority queue.
422      */
423     return TAILQ_FIRST(&port->mp_msgq);
424 }
425 
426 static __inline
427 void
428 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
429 {
430     /*
431      * atomic op needed for spin ports
432      */
433     _lwkt_pushmsg(port, msg);
434     atomic_set_int(&msg->ms_flags, MSGF_REPLY | MSGF_DONE);
435 }
436 
437 /************************************************************************
438  *			THREAD PORT BACKEND				*
439  ************************************************************************
440  *
441  * This backend is used when the port a message is retrieved from is owned
442  * by a single thread (the calling thread).  Messages are IPId to the
443  * correct cpu before being enqueued to a port.  Note that this is fairly
444  * optimal since scheduling would have had to do an IPI anyway if the
445  * message were headed to a different cpu.
446  */
447 
#ifdef SMP

/*
 * IPI handler that completes reply processing on the cpu of the reply
 * port's owning thread.
 *
 * If the owner is no longer on this cpu the IPI is re-sent to the
 * owner's current cpu.  Otherwise a MSGF_SYNC message is simply marked
 * REPLY|DONE, any other message is queued on the reply port, and the
 * owner is scheduled if the port is flagged MSGPORTF_WAITING.
 */
static void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_reply_port;
    int oflags;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
        return;
    }

    /*
     * The message has arrived; drop the in-transit marker.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif

    /* Snapshot the flags before completion modifies them. */
    oflags = msg->ms_flags;

    if ((msg->ms_flags & MSGF_SYNC) == 0) {
        _lwkt_enqueue_reply(port, msg);
    } else {
        cpu_sfence();
        msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    }

    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, oflags);
}

#endif
489 
490 /*
491  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
492  *
493  * Called with the reply port as an argument but in the context of the
494  * original target port.  Completion must occur on the target port's
495  * cpu.
496  *
497  * The critical section protects us from IPIs on the this CPU.
498  */
499 static
500 void
501 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
502 {
503     int flags;
504 
505     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
506 
507     if (msg->ms_flags & MSGF_SYNC) {
508 	/*
509 	 * If a synchronous completion has been requested, just wakeup
510 	 * the message without bothering to queue it to the target port.
511 	 *
512 	 * Assume the target thread is non-preemptive, so no critical
513 	 * section is required.
514 	 */
515 #ifdef SMP
516 	if (port->mpu_td->td_gd == mycpu) {
517 #endif
518 	    flags = msg->ms_flags;
519 	    cpu_sfence();
520 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
521 	    if (port->mp_flags & MSGPORTF_WAITING)
522 		_lwkt_schedule_msg(port->mpu_td, flags);
523 #ifdef SMP
524 	} else {
525 #ifdef INVARIANTS
526 	    msg->ms_flags |= MSGF_INTRANSIT;
527 #endif
528 	    msg->ms_flags |= MSGF_REPLY;
529 	    lwkt_send_ipiq(port->mpu_td->td_gd,
530 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
531 	}
532 #endif
533     } else {
534 	/*
535 	 * If an asynchronous completion has been requested the message
536 	 * must be queued to the reply port.
537 	 *
538 	 * A critical section is required to interlock the port queue.
539 	 */
540 #ifdef SMP
541 	if (port->mpu_td->td_gd == mycpu) {
542 #endif
543 	    crit_enter();
544 	    _lwkt_enqueue_reply(port, msg);
545 	    if (port->mp_flags & MSGPORTF_WAITING)
546 		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
547 	    crit_exit();
548 #ifdef SMP
549 	} else {
550 #ifdef INVARIANTS
551 	    msg->ms_flags |= MSGF_INTRANSIT;
552 #endif
553 	    msg->ms_flags |= MSGF_REPLY;
554 	    lwkt_send_ipiq(port->mpu_td->td_gd,
555 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
556 	}
557 #endif
558     }
559 }
560 
561 /*
562  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
563  *
564  * This function could _only_ be used when caller is in the same thread
565  * as the message's target port owner thread.
566  */
567 static void
568 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
569 {
570     KASSERT(port->mpu_td == curthread,
571     	    ("message could only be dropped in the same thread "
572 	     "as the message target port thread"));
573     crit_enter_quick(port->mpu_td);
574     _lwkt_pullmsg(port, msg);
575     msg->ms_flags |= MSGF_DONE;
576     crit_exit_quick(port->mpu_td);
577 }
578 
579 /*
580  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
581  *
582  * Called with the target port as an argument but in the context of the
583  * reply port.  This function always implements an asynchronous put to
584  * the target message port, and thus returns EASYNC.
585  *
586  * The message must already have cleared MSGF_DONE and MSGF_REPLY
587  */
588 
#ifdef SMP

/*
 * IPI handler that finishes a put on the cpu of the target port's
 * owning thread.
 *
 * If the owner is no longer on this cpu the IPI is re-sent to the
 * owner's current cpu.  Otherwise the message is queued on the target
 * port and the owner is scheduled if the port is flagged
 * MSGPORTF_WAITING.
 */
static void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t port = msg->ms_target_port;

    /*
     * Chase any thread migration that occurs
     */
    if (port->mpu_td->td_gd != mycpu) {
        lwkt_send_ipiq(port->mpu_td->td_gd,
                       (ipifunc1_t)lwkt_thread_putport_remote, msg);
        return;
    }

    /*
     * The message has arrived; drop the in-transit marker.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif

    _lwkt_pushmsg(port, msg);
    if (port->mp_flags & MSGPORTF_WAITING)
        _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
}

#endif
619 
620 static
621 int
622 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
623 {
624     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
625 
626     msg->ms_target_port = port;
627 #ifdef SMP
628     if (port->mpu_td->td_gd == mycpu) {
629 #endif
630 	crit_enter();
631 	_lwkt_pushmsg(port, msg);
632 	if (port->mp_flags & MSGPORTF_WAITING)
633 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
634 	crit_exit();
635 #ifdef SMP
636     } else {
637 #ifdef INVARIANTS
638 	msg->ms_flags |= MSGF_INTRANSIT;
639 #endif
640 	lwkt_send_ipiq(port->mpu_td->td_gd,
641 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
642     }
643 #endif
644     return (EASYNC);
645 }
646 
647 /*
648  * lwkt_thread_getport()
649  *
650  *	Retrieve the next message from the port or NULL if no messages
651  *	are ready.
652  */
653 static
654 void *
655 lwkt_thread_getport(lwkt_port_t port)
656 {
657     lwkt_msg_t msg;
658 
659     KKASSERT(port->mpu_td == curthread);
660 
661     crit_enter_quick(port->mpu_td);
662     if ((msg = _lwkt_pollmsg(port)) != NULL)
663 	_lwkt_pullmsg(port, msg);
664     crit_exit_quick(port->mpu_td);
665     return(msg);
666 }
667 
668 /*
669  * lwkt_thread_waitmsg()
670  *
671  *	Wait for a particular message to be replied.  We must be the only
672  *	thread waiting on the message.  The port must be owned by the
673  *	caller.
674  */
675 static
676 int
677 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
678 {
679     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
680 	    ("can't wait dropable message"));
681 
682     if ((msg->ms_flags & MSGF_DONE) == 0) {
683 	/*
684 	 * If the done bit was not set we have to block until it is.
685 	 */
686 	lwkt_port_t port = msg->ms_reply_port;
687 	thread_t td = curthread;
688 	int sentabort;
689 
690 	KKASSERT(port->mpu_td == td);
691 	crit_enter_quick(td);
692 	sentabort = 0;
693 
694 	while ((msg->ms_flags & MSGF_DONE) == 0) {
695 	    port->mp_flags |= MSGPORTF_WAITING;
696 	    if (sentabort == 0) {
697 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
698 		    lwkt_abortmsg(msg);
699 		}
700 	    } else {
701 		lwkt_sleep("waitabt", 0);
702 	    }
703 	    port->mp_flags &= ~MSGPORTF_WAITING;
704 	}
705 	if (msg->ms_flags & MSGF_QUEUED)
706 	    _lwkt_pullmsg(port, msg);
707 	crit_exit_quick(td);
708     } else {
709 	/*
710 	 * If the done bit was set we only have to mess around with the
711 	 * message if it is queued on the reply port.
712 	 */
713 	if (msg->ms_flags & MSGF_QUEUED) {
714 	    lwkt_port_t port = msg->ms_reply_port;
715 	    thread_t td = curthread;
716 
717 	    KKASSERT(port->mpu_td == td);
718 	    crit_enter_quick(td);
719 	    _lwkt_pullmsg(port, msg);
720 	    crit_exit_quick(td);
721 	}
722     }
723     return(msg->ms_error);
724 }
725 
726 /*
727  * lwkt_thread_waitport()
728  *
729  *	Wait for a new message to be available on the port.  We must be the
730  *	the only thread waiting on the port.  The port must be owned by caller.
731  */
732 static
733 void *
734 lwkt_thread_waitport(lwkt_port_t port, int flags)
735 {
736     thread_t td = curthread;
737     lwkt_msg_t msg;
738     int error;
739 
740     KKASSERT(port->mpu_td == td);
741     crit_enter_quick(td);
742     while ((msg = _lwkt_pollmsg(port)) == NULL) {
743 	port->mp_flags |= MSGPORTF_WAITING;
744 	error = lwkt_sleep("waitport", flags);
745 	port->mp_flags &= ~MSGPORTF_WAITING;
746 	if (error)
747 	    goto done;
748     }
749     _lwkt_pullmsg(port, msg);
750 done:
751     crit_exit_quick(td);
752     return(msg);
753 }
754 
755 /************************************************************************
756  *			   SPIN PORT BACKEND				*
757  ************************************************************************
758  *
759  * This backend uses spinlocks instead of making assumptions about which
760  * thread is accessing the port.  It must be used when a port is not owned
761  * by a particular thread.  This is less optimal than thread ports but
762  * you don't have a choice if there are multiple threads accessing the port.
763  *
764  * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
765  * on the message port, it is the responsibility of the code doing the
766  * wakeup to clear this flag rather than the blocked threads.  Some
767  * superfluous wakeups may occur, which is ok.
768  *
769  * XXX synchronous message wakeups are not currently optimized.
770  */
771 
772 static
773 void *
774 lwkt_spin_getport(lwkt_port_t port)
775 {
776     lwkt_msg_t msg;
777 
778     spin_lock(&port->mpu_spin);
779     if ((msg = _lwkt_pollmsg(port)) != NULL)
780 	_lwkt_pullmsg(port, msg);
781     spin_unlock(&port->mpu_spin);
782     return(msg);
783 }
784 
785 static
786 int
787 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
788 {
789     int dowakeup;
790 
791     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
792 
793     msg->ms_target_port = port;
794     spin_lock(&port->mpu_spin);
795     _lwkt_pushmsg(port, msg);
796     dowakeup = 0;
797     if (port->mp_flags & MSGPORTF_WAITING) {
798 	port->mp_flags &= ~MSGPORTF_WAITING;
799 	dowakeup = 1;
800     }
801     spin_unlock(&port->mpu_spin);
802     if (dowakeup)
803 	wakeup(port);
804     return (EASYNC);
805 }
806 
807 static
808 int
809 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
810 {
811     lwkt_port_t port;
812     int sentabort;
813     int error;
814 
815     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
816 	    ("can't wait dropable message"));
817 
818     if ((msg->ms_flags & MSGF_DONE) == 0) {
819 	port = msg->ms_reply_port;
820 	sentabort = 0;
821 	spin_lock(&port->mpu_spin);
822 	while ((msg->ms_flags & MSGF_DONE) == 0) {
823 	    void *won;
824 
825 	    /*
826 	     * If message was sent synchronously from the beginning
827 	     * the wakeup will be on the message structure, else it
828 	     * will be on the port structure.
829 	     */
830 	    if (msg->ms_flags & MSGF_SYNC) {
831 		won = msg;
832 		atomic_set_int(&msg->ms_flags, MSGF_WAITING);
833 	    } else {
834 		won = port;
835 		port->mp_flags |= MSGPORTF_WAITING;
836 	    }
837 
838 	    /*
839 	     * Only messages which support abort can be interrupted.
840 	     * We must still wait for message completion regardless.
841 	     */
842 	    if ((flags & PCATCH) && sentabort == 0) {
843 		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
844 		if (error) {
845 		    sentabort = error;
846 		    spin_unlock(&port->mpu_spin);
847 		    lwkt_abortmsg(msg);
848 		    spin_lock(&port->mpu_spin);
849 		}
850 	    } else {
851 		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
852 	    }
853 	    /* see note at the top on the MSGPORTF_WAITING flag */
854 	}
855 	/*
856 	 * Turn EINTR into ERESTART if the signal indicates.
857 	 */
858 	if (sentabort && msg->ms_error == EINTR)
859 	    msg->ms_error = sentabort;
860 	if (msg->ms_flags & MSGF_QUEUED)
861 	    _lwkt_pullmsg(port, msg);
862 	spin_unlock(&port->mpu_spin);
863     } else {
864 	if (msg->ms_flags & MSGF_QUEUED) {
865 	    port = msg->ms_reply_port;
866 	    spin_lock(&port->mpu_spin);
867 	    _lwkt_pullmsg(port, msg);
868 	    spin_unlock(&port->mpu_spin);
869 	}
870     }
871     return(msg->ms_error);
872 }
873 
874 static
875 void *
876 lwkt_spin_waitport(lwkt_port_t port, int flags)
877 {
878     lwkt_msg_t msg;
879     int error;
880 
881     spin_lock(&port->mpu_spin);
882     while ((msg = _lwkt_pollmsg(port)) == NULL) {
883 	port->mp_flags |= MSGPORTF_WAITING;
884 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
885 	/* see note at the top on the MSGPORTF_WAITING flag */
886 	if (error) {
887 	    spin_unlock(&port->mpu_spin);
888 	    return(NULL);
889 	}
890     }
891     _lwkt_pullmsg(port, msg);
892     spin_unlock(&port->mpu_spin);
893     return(msg);
894 }
895 
896 static
897 void
898 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
899 {
900     int dowakeup;
901 
902     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
903 
904     if (msg->ms_flags & MSGF_SYNC) {
905 	/*
906 	 * If a synchronous completion has been requested, just wakeup
907 	 * the message without bothering to queue it to the target port.
908 	 */
909 	spin_lock(&port->mpu_spin);
910 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
911 	dowakeup = 0;
912 	if (msg->ms_flags & MSGF_WAITING) {
913 		atomic_clear_int(&msg->ms_flags, MSGF_WAITING);
914 		dowakeup = 1;
915 	}
916 	spin_unlock(&port->mpu_spin);
917 	if (dowakeup)
918 		wakeup(msg);
919     } else {
920 	/*
921 	 * If an asynchronous completion has been requested the message
922 	 * must be queued to the reply port.
923 	 */
924 	spin_lock(&port->mpu_spin);
925 	_lwkt_enqueue_reply(port, msg);
926 	dowakeup = 0;
927 	if (port->mp_flags & MSGPORTF_WAITING) {
928 	    port->mp_flags &= ~MSGPORTF_WAITING;
929 	    dowakeup = 1;
930 	}
931 	spin_unlock(&port->mpu_spin);
932 	if (dowakeup)
933 	    wakeup(port);
934     }
935 }
936 
937 /*
938  * lwkt_spin_dropmsg() - Backend to lwkt_dropmsg()
939  *
940  * This function could _only_ be used when caller is in the same thread
941  * as the message's target port owner thread.
942  */
943 static void
944 lwkt_spin_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
945 {
946     KASSERT(port->mpu_td == curthread,
947     	    ("message could only be dropped in the same thread "
948 	     "as the message target port thread\n"));
949     spin_lock(&port->mpu_spin);
950     _lwkt_pullmsg(port, msg);
951     msg->ms_flags |= MSGF_DONE;
952     spin_unlock(&port->mpu_spin);
953 }
954 
955 /************************************************************************
956  *			  SERIALIZER PORT BACKEND			*
957  ************************************************************************
958  *
959  * This backend uses serializer to protect port accessing.  Callers are
960  * assumed to have serializer held.  This kind of port is usually created
961  * by network device driver along with _one_ lwkt thread to pipeline
962  * operations which may temporarily release serializer.
963  *
964  * Implementation is based on SPIN PORT BACKEND.
965  */
966 
967 static
968 void *
969 lwkt_serialize_getport(lwkt_port_t port)
970 {
971     lwkt_msg_t msg;
972 
973     ASSERT_SERIALIZED(port->mpu_serialize);
974 
975     if ((msg = _lwkt_pollmsg(port)) != NULL)
976 	_lwkt_pullmsg(port, msg);
977     return(msg);
978 }
979 
980 static
981 int
982 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
983 {
984     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
985     ASSERT_SERIALIZED(port->mpu_serialize);
986 
987     msg->ms_target_port = port;
988     _lwkt_pushmsg(port, msg);
989     if (port->mp_flags & MSGPORTF_WAITING) {
990 	port->mp_flags &= ~MSGPORTF_WAITING;
991 	wakeup(port);
992     }
993     return (EASYNC);
994 }
995 
996 static
997 int
998 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
999 {
1000     lwkt_port_t port;
1001     int sentabort;
1002     int error;
1003 
1004     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
1005 	    ("can't wait dropable message"));
1006 
1007     if ((msg->ms_flags & MSGF_DONE) == 0) {
1008 	port = msg->ms_reply_port;
1009 
1010 	ASSERT_SERIALIZED(port->mpu_serialize);
1011 
1012 	sentabort = 0;
1013 	while ((msg->ms_flags & MSGF_DONE) == 0) {
1014 	    void *won;
1015 
1016 	    /*
1017 	     * If message was sent synchronously from the beginning
1018 	     * the wakeup will be on the message structure, else it
1019 	     * will be on the port structure.
1020 	     */
1021 	    if (msg->ms_flags & MSGF_SYNC) {
1022 		won = msg;
1023 	    } else {
1024 		won = port;
1025 		port->mp_flags |= MSGPORTF_WAITING;
1026 	    }
1027 
1028 	    /*
1029 	     * Only messages which support abort can be interrupted.
1030 	     * We must still wait for message completion regardless.
1031 	     */
1032 	    if ((flags & PCATCH) && sentabort == 0) {
1033 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
1034 		if (error) {
1035 		    sentabort = error;
1036 		    lwkt_serialize_exit(port->mpu_serialize);
1037 		    lwkt_abortmsg(msg);
1038 		    lwkt_serialize_enter(port->mpu_serialize);
1039 		}
1040 	    } else {
1041 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
1042 	    }
1043 	    /* see note at the top on the MSGPORTF_WAITING flag */
1044 	}
1045 	/*
1046 	 * Turn EINTR into ERESTART if the signal indicates.
1047 	 */
1048 	if (sentabort && msg->ms_error == EINTR)
1049 	    msg->ms_error = sentabort;
1050 	if (msg->ms_flags & MSGF_QUEUED)
1051 	    _lwkt_pullmsg(port, msg);
1052     } else {
1053 	if (msg->ms_flags & MSGF_QUEUED) {
1054 	    port = msg->ms_reply_port;
1055 
1056 	    ASSERT_SERIALIZED(port->mpu_serialize);
1057 	    _lwkt_pullmsg(port, msg);
1058 	}
1059     }
1060     return(msg->ms_error);
1061 }
1062 
1063 static
1064 void *
1065 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1066 {
1067     lwkt_msg_t msg;
1068     int error;
1069 
1070     ASSERT_SERIALIZED(port->mpu_serialize);
1071 
1072     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1073 	port->mp_flags |= MSGPORTF_WAITING;
1074 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1075 	/* see note at the top on the MSGPORTF_WAITING flag */
1076 	if (error)
1077 	    return(NULL);
1078     }
1079     _lwkt_pullmsg(port, msg);
1080     return(msg);
1081 }
1082 
1083 static
1084 void
1085 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1086 {
1087     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1088     ASSERT_SERIALIZED(port->mpu_serialize);
1089 
1090     if (msg->ms_flags & MSGF_SYNC) {
1091 	/*
1092 	 * If a synchronous completion has been requested, just wakeup
1093 	 * the message without bothering to queue it to the target port.
1094 	 */
1095 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1096 	wakeup(msg);
1097     } else {
1098 	/*
1099 	 * If an asynchronous completion has been requested the message
1100 	 * must be queued to the reply port.
1101 	 */
1102 	_lwkt_enqueue_reply(port, msg);
1103 	if (port->mp_flags & MSGPORTF_WAITING) {
1104 	    port->mp_flags &= ~MSGPORTF_WAITING;
1105 	    wakeup(port);
1106 	}
1107     }
1108 }
1109 
1110 /************************************************************************
1111  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1112  ************************************************************************/
1113 
1114 /*
1115  * You can point a port's reply vector at this function if you just want
1116  * the message marked done, without any queueing or signaling.  This is
1117  * often used for structure-embedded messages.
1118  */
1119 static
1120 void
1121 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1122 {
1123     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1124 }
1125 
1126 static
1127 void *
1128 lwkt_panic_getport(lwkt_port_t port)
1129 {
1130     panic("lwkt_getport() illegal on port %p", port);
1131 }
1132 
1133 static
1134 int
1135 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1136 {
1137     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1138 }
1139 
1140 static
1141 int
1142 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1143 {
1144     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1145 }
1146 
1147 static
1148 void *
1149 lwkt_panic_waitport(lwkt_port_t port, int flags)
1150 {
1151     panic("port %p cannot be waited on", port);
1152 }
1153 
1154 static
1155 void
1156 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1157 {
1158     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1159 }
1160 
1161 static
1162 void
1163 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1164 {
1165     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1166 }
1167