xref: /dragonfly/sys/kern/lwkt_msgport.c (revision 92fc8b5c)
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * NOTE! This file may be compiled for userland libraries as well as for
35  * the kernel.
36  *
37  * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.54 2008/11/26 15:05:42 sephe Exp $
38  */
39 
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/proc.h>
44 #include <sys/rtprio.h>
45 #include <sys/queue.h>
46 #include <sys/sysctl.h>
47 #include <sys/kthread.h>
48 #include <sys/signalvar.h>
49 #include <sys/signal2.h>
50 #include <machine/cpu.h>
51 #include <sys/lock.h>
52 
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_object.h>
57 #include <vm/vm_page.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_pager.h>
60 #include <vm/vm_extern.h>
61 #include <vm/vm_zone.h>
62 
63 #include <sys/thread2.h>
64 #include <sys/msgport2.h>
65 #include <sys/spinlock2.h>
66 #include <sys/serialize.h>
67 
68 #include <machine/stdarg.h>
69 #include <machine/cpufunc.h>
70 #ifdef SMP
71 #include <machine/smp.h>
72 #endif
73 
74 #include <sys/malloc.h>
75 MALLOC_DEFINE(M_LWKTMSG, "lwkt message", "lwkt message");
76 
77 /************************************************************************
78  *				MESSAGE FUNCTIONS			*
79  ************************************************************************/
80 
81 /*
82  * lwkt_sendmsg()
83  *
84  *	Request asynchronous completion and call lwkt_beginmsg().  The
85  *	target port can opt to execute the message synchronously or
86  *	asynchronously and this function will automatically queue the
87  *	response if the target executes the message synchronously.
88  *
89  *	NOTE: The message is in an indeterminant state until this call
90  *	returns.  The caller should not mess with it (e.g. try to abort it)
91  *	until then.
92  */
93 void
94 lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
95 {
96     int error;
97 
98     KKASSERT(msg->ms_reply_port != NULL &&
99 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
100     msg->ms_flags &= ~(MSGF_REPLY | MSGF_SYNC | MSGF_DONE);
101     if ((error = lwkt_beginmsg(port, msg)) != EASYNC) {
102 	lwkt_replymsg(msg, error);
103     }
104 }
105 
106 /*
107  * lwkt_domsg()
108  *
109  *	Request asynchronous completion and call lwkt_beginmsg().  The
110  *	target port can opt to execute the message synchronously or
111  *	asynchronously and this function will automatically queue the
112  *	response if the target executes the message synchronously.
113  */
114 int
115 lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg, int flags)
116 {
117     int error;
118 
119     KKASSERT(msg->ms_reply_port != NULL &&
120 	     (msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == MSGF_DONE);
121     msg->ms_flags &= ~(MSGF_REPLY | MSGF_DONE);
122     msg->ms_flags |= MSGF_SYNC;
123     if ((error = lwkt_beginmsg(port, msg)) == EASYNC) {
124 	error = lwkt_waitmsg(msg, flags);
125     } else {
126 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
127     }
128     return(error);
129 }
130 
131 /*
132  * lwkt_forwardmsg()
133  *
134  * Forward a message received on one port to another port.
135  */
136 int
137 lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
138 {
139     int error;
140 
141     crit_enter();
142     KKASSERT((msg->ms_flags & (MSGF_QUEUED|MSGF_DONE|MSGF_REPLY)) == 0);
143     if ((error = port->mp_putport(port, msg)) != EASYNC)
144 	lwkt_replymsg(msg, error);
145     crit_exit();
146     return(error);
147 }
148 
149 /*
150  * lwkt_abortmsg()
151  *
152  * Attempt to abort a message.  This only works if MSGF_ABORTABLE is set.
153  * The caller must ensure that the message will not be both replied AND
154  * destroyed while the abort is in progress.
155  *
156  * This function issues a callback which might block!
157  */
158 void
159 lwkt_abortmsg(lwkt_msg_t msg)
160 {
161     /*
162      * A critical section protects us from reply IPIs on this cpu.
163      */
164     crit_enter();
165 
166     /*
167      * Shortcut the operation if the message has already been returned.
168      * The callback typically constructs a lwkt_msg with the abort request,
169      * issues it synchronously, and waits for completion.  The callback
170      * is not required to actually abort the message and the target port,
171      * upon receiving an abort request message generated by the callback
172      * should check whether the original message has already completed or
173      * not.
174      */
175     if (msg->ms_flags & MSGF_ABORTABLE) {
176 	if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == 0)
177 	    msg->ms_abortfn(msg);
178     }
179     crit_exit();
180 }
181 
182 /************************************************************************
183  *			PORT INITIALIZATION API				*
184  ************************************************************************/
185 
186 static void *lwkt_thread_getport(lwkt_port_t port);
187 static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
188 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
189 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
190 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
191 static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
192 
193 static void *lwkt_spin_getport(lwkt_port_t port);
194 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
195 static int lwkt_spin_waitmsg(lwkt_msg_t msg, int flags);
196 static void *lwkt_spin_waitport(lwkt_port_t port, int flags);
197 static void lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg);
198 
199 static void *lwkt_serialize_getport(lwkt_port_t port);
200 static int lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg);
201 static int lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags);
202 static void *lwkt_serialize_waitport(lwkt_port_t port, int flags);
203 static void lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg);
204 
205 static void lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg);
206 static void *lwkt_panic_getport(lwkt_port_t port);
207 static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
208 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
209 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
210 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
211 static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
212 
213 /*
214  * Core port initialization (internal)
215  */
216 static __inline
217 void
218 _lwkt_initport(lwkt_port_t port,
219 	       void *(*gportfn)(lwkt_port_t),
220 	       int (*pportfn)(lwkt_port_t, lwkt_msg_t),
221 	       int (*wmsgfn)(lwkt_msg_t, int),
222 	       void *(*wportfn)(lwkt_port_t, int),
223 	       void (*rportfn)(lwkt_port_t, lwkt_msg_t),
224 	       void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
225 {
226     bzero(port, sizeof(*port));
227     TAILQ_INIT(&port->mp_msgq);
228     TAILQ_INIT(&port->mp_msgq_prio);
229     port->mp_getport = gportfn;
230     port->mp_putport = pportfn;
231     port->mp_waitmsg =  wmsgfn;
232     port->mp_waitport =  wportfn;
233     port->mp_replyport = rportfn;
234     port->mp_dropmsg = dmsgfn;
235 }
236 
237 /*
238  * Schedule the target thread.  If the message flags contains MSGF_NORESCHED
239  * we tell the scheduler not to reschedule if td is at a higher priority.
240  *
241  * This routine is called even if the thread is already scheduled so messages
242  * without NORESCHED will cause the target thread to be rescheduled even if
243  * prior messages did not.
244  */
245 static __inline
246 void
247 _lwkt_schedule_msg(thread_t td, int flags)
248 {
249     if (flags & MSGF_NORESCHED)
250 	lwkt_schedule_noresched(td);
251     else
252 	lwkt_schedule(td);
253 }
254 
255 /*
256  * lwkt_initport_thread()
257  *
258  *	Initialize a port for use by a particular thread.  The port may
259  *	only be used by <td>.
260  */
261 void
262 lwkt_initport_thread(lwkt_port_t port, thread_t td)
263 {
264     _lwkt_initport(port,
265 		   lwkt_thread_getport,
266 		   lwkt_thread_putport,
267 		   lwkt_thread_waitmsg,
268 		   lwkt_thread_waitport,
269 		   lwkt_thread_replyport,
270 		   lwkt_thread_dropmsg);
271     port->mpu_td = td;
272 }
273 
274 /*
275  * lwkt_initport_spin()
276  *
277  *	Initialize a port for use with descriptors that might be accessed
278  *	via multiple LWPs, processes, or threads.  Has somewhat more
279  *	overhead then thread ports.
280  */
281 void
282 lwkt_initport_spin(lwkt_port_t port)
283 {
284     _lwkt_initport(port,
285 		   lwkt_spin_getport,
286 		   lwkt_spin_putport,
287 		   lwkt_spin_waitmsg,
288 		   lwkt_spin_waitport,
289 		   lwkt_spin_replyport,
290 		   lwkt_panic_dropmsg);
291     spin_init(&port->mpu_spin);
292 }
293 
294 /*
295  * lwkt_initport_serialize()
296  *
297  *	Initialize a port for use with descriptors that might be accessed
298  *	via multiple LWPs, processes, or threads.  Callers are assumed to
299  *	have held the serializer (slz).
300  */
301 void
302 lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
303 {
304     _lwkt_initport(port,
305 		   lwkt_serialize_getport,
306 		   lwkt_serialize_putport,
307 		   lwkt_serialize_waitmsg,
308 		   lwkt_serialize_waitport,
309 		   lwkt_serialize_replyport,
310 		   lwkt_panic_dropmsg);
311     port->mpu_serialize = slz;
312 }
313 
314 /*
315  * Similar to the standard initport, this function simply marks the message
316  * as being done and does not attempt to return it to an originating port.
317  */
318 void
319 lwkt_initport_replyonly_null(lwkt_port_t port)
320 {
321     _lwkt_initport(port,
322 		   lwkt_panic_getport,
323 		   lwkt_panic_putport,
324 		   lwkt_panic_waitmsg,
325 		   lwkt_panic_waitport,
326 		   lwkt_null_replyport,
327 		   lwkt_panic_dropmsg);
328 }
329 
330 /*
331  * Initialize a reply-only port, typically used as a message sink.  Such
332  * ports can only be used as a reply port.
333  */
334 void
335 lwkt_initport_replyonly(lwkt_port_t port,
336 			void (*rportfn)(lwkt_port_t, lwkt_msg_t))
337 {
338     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
339 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
340 			 rportfn, lwkt_panic_dropmsg);
341 }
342 
343 void
344 lwkt_initport_putonly(lwkt_port_t port,
345 		      int (*pportfn)(lwkt_port_t, lwkt_msg_t))
346 {
347     _lwkt_initport(port, lwkt_panic_getport, pportfn,
348 			 lwkt_panic_waitmsg, lwkt_panic_waitport,
349 			 lwkt_panic_replyport, lwkt_panic_dropmsg);
350 }
351 
352 void
353 lwkt_initport_panic(lwkt_port_t port)
354 {
355     _lwkt_initport(port,
356 		   lwkt_panic_getport, lwkt_panic_putport,
357 		   lwkt_panic_waitmsg, lwkt_panic_waitport,
358 		   lwkt_panic_replyport, lwkt_panic_dropmsg);
359 }
360 
361 static __inline
362 void
363 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
364 {
365     lwkt_msg_queue *queue;
366 
367     /*
368      * normal case, remove and return the message.
369      */
370     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
371 	queue = &port->mp_msgq_prio;
372     else
373 	queue = &port->mp_msgq;
374     TAILQ_REMOVE(queue, msg, ms_node);
375     msg->ms_flags &= ~MSGF_QUEUED;
376 }
377 
378 static __inline
379 void
380 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
381 {
382     lwkt_msg_queue *queue;
383 
384     msg->ms_flags |= MSGF_QUEUED;
385     if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
386 	queue = &port->mp_msgq_prio;
387     else
388     	queue = &port->mp_msgq;
389     TAILQ_INSERT_TAIL(queue, msg, ms_node);
390 }
391 
392 static __inline
393 lwkt_msg_t
394 _lwkt_pollmsg(lwkt_port_t port)
395 {
396     lwkt_msg_t msg;
397 
398     msg = TAILQ_FIRST(&port->mp_msgq_prio);
399     if (__predict_false(msg != NULL))
400 	return msg;
401 
402     /*
403      * Priority queue has no message, fallback to non-priority queue.
404      */
405     return TAILQ_FIRST(&port->mp_msgq);
406 }
407 
408 static __inline
409 void
410 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
411 {
412     _lwkt_pushmsg(port, msg);
413     msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
414 }
415 
416 /************************************************************************
417  *			THREAD PORT BACKEND				*
418  ************************************************************************
419  *
420  * This backend is used when the port a message is retrieved from is owned
 * by a single thread (the calling thread).  Messages are IPI'd to the
422  * correct cpu before being enqueued to a port.  Note that this is fairly
423  * optimal since scheduling would have had to do an IPI anyway if the
424  * message were headed to a different cpu.
425  */
426 
#ifdef SMP

/*
 * IPI handler that completes reply processing for the default case in
 * the context of the cpu owning the reply port's thread.
 *
 * If the owning thread has moved to another cpu in the meantime the IPI
 * is simply re-sent to its current cpu.  Otherwise a MSGF_SYNC message is
 * marked MSGF_REPLY | MSGF_DONE in place, any other message is queued on
 * the reply port, and the owner is scheduled if it is waiting.
 */
static
void
lwkt_thread_replyport_remote(lwkt_msg_t msg)
{
    lwkt_port_t rport = msg->ms_reply_port;
    thread_t owner = rport->mpu_td;
    int saved_flags;

    /*
     * Chase any thread migration that occurs
     */
    if (owner->td_gd != mycpu) {
	lwkt_send_ipiq(owner->td_gd,
		       (ipifunc1_t)lwkt_thread_replyport_remote, msg);
	return;
    }

    /*
     * The message has arrived, it is no longer in transit.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif

    /*
     * The flags are sampled before the message is completed and the
     * sampled copy is what gets passed to the scheduler below.
     */
    saved_flags = msg->ms_flags;
    if ((saved_flags & MSGF_SYNC) == 0) {
	_lwkt_enqueue_reply(rport, msg);
    } else {
	cpu_sfence();
	msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
    }
    if (rport->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(owner, saved_flags);
}

#endif
468 
469 /*
470  * lwkt_thread_replyport() - Backend to lwkt_replymsg()
471  *
472  * Called with the reply port as an argument but in the context of the
473  * original target port.  Completion must occur on the target port's
474  * cpu.
475  *
476  * The critical section protects us from IPIs on the this CPU.
477  */
478 static
479 void
480 lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
481 {
482     int flags;
483 
484     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED|MSGF_INTRANSIT)) == 0);
485 
486     if (msg->ms_flags & MSGF_SYNC) {
487 	/*
488 	 * If a synchronous completion has been requested, just wakeup
489 	 * the message without bothering to queue it to the target port.
490 	 *
491 	 * Assume the target thread is non-preemptive, so no critical
492 	 * section is required.
493 	 */
494 #ifdef SMP
495 	if (port->mpu_td->td_gd == mycpu) {
496 #endif
497 	    flags = msg->ms_flags;
498 	    cpu_sfence();
499 	    msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
500 	    if (port->mp_flags & MSGPORTF_WAITING)
501 		_lwkt_schedule_msg(port->mpu_td, flags);
502 #ifdef SMP
503 	} else {
504 #ifdef INVARIANTS
505 	    msg->ms_flags |= MSGF_INTRANSIT;
506 #endif
507 	    msg->ms_flags |= MSGF_REPLY;
508 	    lwkt_send_ipiq(port->mpu_td->td_gd,
509 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
510 	}
511 #endif
512     } else {
513 	/*
514 	 * If an asynchronous completion has been requested the message
515 	 * must be queued to the reply port.
516 	 *
517 	 * A critical section is required to interlock the port queue.
518 	 */
519 #ifdef SMP
520 	if (port->mpu_td->td_gd == mycpu) {
521 #endif
522 	    crit_enter();
523 	    _lwkt_enqueue_reply(port, msg);
524 	    if (port->mp_flags & MSGPORTF_WAITING)
525 		_lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
526 	    crit_exit();
527 #ifdef SMP
528 	} else {
529 #ifdef INVARIANTS
530 	    msg->ms_flags |= MSGF_INTRANSIT;
531 #endif
532 	    msg->ms_flags |= MSGF_REPLY;
533 	    lwkt_send_ipiq(port->mpu_td->td_gd,
534 			   (ipifunc1_t)lwkt_thread_replyport_remote, msg);
535 	}
536 #endif
537     }
538 }
539 
540 /*
541  * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
542  *
543  * This function could _only_ be used when caller is in the same thread
544  * as the message's target port owner thread.
545  */
546 static void
547 lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
548 {
549     KASSERT(port->mpu_td == curthread,
550     	    ("message could only be dropped in the same thread "
551 	     "as the message target port thread\n"));
552     crit_enter_quick(port->mpu_td);
553     _lwkt_pullmsg(port, msg);
554     msg->ms_flags |= MSGF_DONE;
555     crit_exit_quick(port->mpu_td);
556 }
557 
558 /*
559  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
560  *
561  * Called with the target port as an argument but in the context of the
562  * reply port.  This function always implements an asynchronous put to
563  * the target message port, and thus returns EASYNC.
564  *
565  * The message must already have cleared MSGF_DONE and MSGF_REPLY
566  */
567 
#ifdef SMP

/*
 * IPI handler that queues a message on its target port in the context of
 * the cpu owning the port's thread.  If the owning thread has moved to
 * another cpu in the meantime the IPI is re-sent to its current cpu.
 */
static
void
lwkt_thread_putport_remote(lwkt_msg_t msg)
{
    lwkt_port_t tport = msg->ms_target_port;
    thread_t owner = tport->mpu_td;

    /*
     * Chase any thread migration that occurs
     */
    if (owner->td_gd != mycpu) {
	lwkt_send_ipiq(owner->td_gd,
		       (ipifunc1_t)lwkt_thread_putport_remote, msg);
	return;
    }

    /*
     * The message has arrived, it is no longer in transit.
     */
#ifdef INVARIANTS
    KKASSERT(msg->ms_flags & MSGF_INTRANSIT);
    msg->ms_flags &= ~MSGF_INTRANSIT;
#endif

    _lwkt_pushmsg(tport, msg);
    if (tport->mp_flags & MSGPORTF_WAITING)
	_lwkt_schedule_msg(owner, msg->ms_flags);
}

#endif
598 
599 static
600 int
601 lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg)
602 {
603     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
604 
605     msg->ms_target_port = port;
606 #ifdef SMP
607     if (port->mpu_td->td_gd == mycpu) {
608 #endif
609 	crit_enter();
610 	_lwkt_pushmsg(port, msg);
611 	if (port->mp_flags & MSGPORTF_WAITING)
612 	    _lwkt_schedule_msg(port->mpu_td, msg->ms_flags);
613 	crit_exit();
614 #ifdef SMP
615     } else {
616 #ifdef INVARIANTS
617 	msg->ms_flags |= MSGF_INTRANSIT;
618 #endif
619 	lwkt_send_ipiq(port->mpu_td->td_gd,
620 			(ipifunc1_t)lwkt_thread_putport_remote, msg);
621     }
622 #endif
623     return (EASYNC);
624 }
625 
626 /*
627  * lwkt_thread_getport()
628  *
629  *	Retrieve the next message from the port or NULL if no messages
630  *	are ready.
631  */
632 static
633 void *
634 lwkt_thread_getport(lwkt_port_t port)
635 {
636     lwkt_msg_t msg;
637 
638     KKASSERT(port->mpu_td == curthread);
639 
640     crit_enter_quick(port->mpu_td);
641     if ((msg = _lwkt_pollmsg(port)) != NULL)
642 	_lwkt_pullmsg(port, msg);
643     crit_exit_quick(port->mpu_td);
644     return(msg);
645 }
646 
647 /*
648  * lwkt_thread_waitmsg()
649  *
650  *	Wait for a particular message to be replied.  We must be the only
651  *	thread waiting on the message.  The port must be owned by the
652  *	caller.
653  */
654 static
655 int
656 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
657 {
658     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
659     	    ("can't wait dropable message\n"));
660 
661     if ((msg->ms_flags & MSGF_DONE) == 0) {
662 	/*
663 	 * If the done bit was not set we have to block until it is.
664 	 */
665 	lwkt_port_t port = msg->ms_reply_port;
666 	thread_t td = curthread;
667 	int sentabort;
668 
669 	KKASSERT(port->mpu_td == td);
670 	crit_enter_quick(td);
671 	sentabort = 0;
672 
673 	while ((msg->ms_flags & MSGF_DONE) == 0) {
674 	    port->mp_flags |= MSGPORTF_WAITING;
675 	    if (sentabort == 0) {
676 		if ((sentabort = lwkt_sleep("waitmsg", flags)) != 0) {
677 		    lwkt_abortmsg(msg);
678 		}
679 	    } else {
680 		lwkt_sleep("waitabt", 0);
681 	    }
682 	    port->mp_flags &= ~MSGPORTF_WAITING;
683 	}
684 	if (msg->ms_flags & MSGF_QUEUED)
685 	    _lwkt_pullmsg(port, msg);
686 	crit_exit_quick(td);
687     } else {
688 	/*
689 	 * If the done bit was set we only have to mess around with the
690 	 * message if it is queued on the reply port.
691 	 */
692 	if (msg->ms_flags & MSGF_QUEUED) {
693 	    lwkt_port_t port = msg->ms_reply_port;
694 	    thread_t td = curthread;
695 
696 	    KKASSERT(port->mpu_td == td);
697 	    crit_enter_quick(td);
698 	    _lwkt_pullmsg(port, msg);
699 	    crit_exit_quick(td);
700 	}
701     }
702     return(msg->ms_error);
703 }
704 
705 static
706 void *
707 lwkt_thread_waitport(lwkt_port_t port, int flags)
708 {
709     thread_t td = curthread;
710     lwkt_msg_t msg;
711     int error;
712 
713     KKASSERT(port->mpu_td == td);
714     crit_enter_quick(td);
715     while ((msg = _lwkt_pollmsg(port)) == NULL) {
716 	port->mp_flags |= MSGPORTF_WAITING;
717 	error = lwkt_sleep("waitport", flags);
718 	port->mp_flags &= ~MSGPORTF_WAITING;
719 	if (error)
720 	    goto done;
721     }
722     _lwkt_pullmsg(port, msg);
723 done:
724     crit_exit_quick(td);
725     return(msg);
726 }
727 
728 /************************************************************************
729  *			   SPIN PORT BACKEND				*
730  ************************************************************************
731  *
 * This backend uses spinlocks instead of making assumptions about which
 * thread is accessing the port.  It must be used when a port is not owned
 * by a particular thread.  This is less optimal than thread ports but
 * you don't have a choice if there are multiple threads accessing the port.
 *
 * Note on MSGPORTF_WAITING - because there may be multiple threads blocked
 * on the message port, it is the responsibility of the code doing the
 * wakeup to clear this flag rather than the blocked threads.  Some
 * superfluous wakeups may occur, which is ok.
 *
 * XXX synchronous message wakeups are not currently optimized.
743  */
744 
745 static
746 void *
747 lwkt_spin_getport(lwkt_port_t port)
748 {
749     lwkt_msg_t msg;
750 
751     spin_lock(&port->mpu_spin);
752     if ((msg = _lwkt_pollmsg(port)) != NULL)
753 	_lwkt_pullmsg(port, msg);
754     spin_unlock(&port->mpu_spin);
755     return(msg);
756 }
757 
758 static
759 int
760 lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg)
761 {
762     int dowakeup;
763 
764     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
765 
766     msg->ms_target_port = port;
767     spin_lock(&port->mpu_spin);
768     _lwkt_pushmsg(port, msg);
769     dowakeup = 0;
770     if (port->mp_flags & MSGPORTF_WAITING) {
771 	port->mp_flags &= ~MSGPORTF_WAITING;
772 	dowakeup = 1;
773     }
774     spin_unlock(&port->mpu_spin);
775     if (dowakeup)
776 	wakeup(port);
777     return (EASYNC);
778 }
779 
780 static
781 int
782 lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
783 {
784     lwkt_port_t port;
785     int sentabort;
786     int error;
787 
788     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
789     	    ("can't wait dropable message\n"));
790 
791     if ((msg->ms_flags & MSGF_DONE) == 0) {
792 	port = msg->ms_reply_port;
793 	sentabort = 0;
794 	spin_lock(&port->mpu_spin);
795 	while ((msg->ms_flags & MSGF_DONE) == 0) {
796 	    void *won;
797 
798 	    /*
799 	     * If message was sent synchronously from the beginning
800 	     * the wakeup will be on the message structure, else it
801 	     * will be on the port structure.
802 	     */
803 	    if (msg->ms_flags & MSGF_SYNC) {
804 		won = msg;
805 	    } else {
806 		won = port;
807 		port->mp_flags |= MSGPORTF_WAITING;
808 	    }
809 
810 	    /*
811 	     * Only messages which support abort can be interrupted.
812 	     * We must still wait for message completion regardless.
813 	     */
814 	    if ((flags & PCATCH) && sentabort == 0) {
815 		error = ssleep(won, &port->mpu_spin, PCATCH, "waitmsg", 0);
816 		if (error) {
817 		    sentabort = error;
818 		    spin_unlock(&port->mpu_spin);
819 		    lwkt_abortmsg(msg);
820 		    spin_lock(&port->mpu_spin);
821 		}
822 	    } else {
823 		error = ssleep(won, &port->mpu_spin, 0, "waitmsg", 0);
824 	    }
825 	    /* see note at the top on the MSGPORTF_WAITING flag */
826 	}
827 	/*
828 	 * Turn EINTR into ERESTART if the signal indicates.
829 	 */
830 	if (sentabort && msg->ms_error == EINTR)
831 	    msg->ms_error = sentabort;
832 	if (msg->ms_flags & MSGF_QUEUED)
833 	    _lwkt_pullmsg(port, msg);
834 	spin_unlock(&port->mpu_spin);
835     } else {
836 	if (msg->ms_flags & MSGF_QUEUED) {
837 	    port = msg->ms_reply_port;
838 	    spin_lock(&port->mpu_spin);
839 	    _lwkt_pullmsg(port, msg);
840 	    spin_unlock(&port->mpu_spin);
841 	}
842     }
843     return(msg->ms_error);
844 }
845 
846 static
847 void *
848 lwkt_spin_waitport(lwkt_port_t port, int flags)
849 {
850     lwkt_msg_t msg;
851     int error;
852 
853     spin_lock(&port->mpu_spin);
854     while ((msg = _lwkt_pollmsg(port)) == NULL) {
855 	port->mp_flags |= MSGPORTF_WAITING;
856 	error = ssleep(port, &port->mpu_spin, flags, "waitport", 0);
857 	/* see note at the top on the MSGPORTF_WAITING flag */
858 	if (error) {
859 	    spin_unlock(&port->mpu_spin);
860 	    return(NULL);
861 	}
862     }
863     _lwkt_pullmsg(port, msg);
864     spin_unlock(&port->mpu_spin);
865     return(msg);
866 }
867 
868 static
869 void
870 lwkt_spin_replyport(lwkt_port_t port, lwkt_msg_t msg)
871 {
872     int dowakeup;
873 
874     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
875 
876     if (msg->ms_flags & MSGF_SYNC) {
877 	/*
878 	 * If a synchronous completion has been requested, just wakeup
879 	 * the message without bothering to queue it to the target port.
880 	 */
881 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
882 	wakeup(msg);
883     } else {
884 	/*
885 	 * If an asynchronous completion has been requested the message
886 	 * must be queued to the reply port.
887 	 */
888 	spin_lock(&port->mpu_spin);
889 	_lwkt_enqueue_reply(port, msg);
890 	dowakeup = 0;
891 	if (port->mp_flags & MSGPORTF_WAITING) {
892 	    port->mp_flags &= ~MSGPORTF_WAITING;
893 	    dowakeup = 1;
894 	}
895 	spin_unlock(&port->mpu_spin);
896 	if (dowakeup)
897 	    wakeup(port);
898     }
899 }
900 
901 /************************************************************************
902  *			  SERIALIZER PORT BACKEND			*
903  ************************************************************************
904  *
905  * This backend uses serializer to protect port accessing.  Callers are
906  * assumed to have serializer held.  This kind of port is usually created
907  * by network device driver along with _one_ lwkt thread to pipeline
908  * operations which may temporarily release serializer.
909  *
910  * Implementation is based on SPIN PORT BACKEND.
911  */
912 
913 static
914 void *
915 lwkt_serialize_getport(lwkt_port_t port)
916 {
917     lwkt_msg_t msg;
918 
919     ASSERT_SERIALIZED(port->mpu_serialize);
920 
921     if ((msg = _lwkt_pollmsg(port)) != NULL)
922 	_lwkt_pullmsg(port, msg);
923     return(msg);
924 }
925 
926 static
927 int
928 lwkt_serialize_putport(lwkt_port_t port, lwkt_msg_t msg)
929 {
930     KKASSERT((msg->ms_flags & (MSGF_DONE | MSGF_REPLY)) == 0);
931     ASSERT_SERIALIZED(port->mpu_serialize);
932 
933     msg->ms_target_port = port;
934     _lwkt_pushmsg(port, msg);
935     if (port->mp_flags & MSGPORTF_WAITING) {
936 	port->mp_flags &= ~MSGPORTF_WAITING;
937 	wakeup(port);
938     }
939     return (EASYNC);
940 }
941 
942 static
943 int
944 lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
945 {
946     lwkt_port_t port;
947     int sentabort;
948     int error;
949 
950     KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
951     	    ("can't wait dropable message\n"));
952 
953     if ((msg->ms_flags & MSGF_DONE) == 0) {
954 	port = msg->ms_reply_port;
955 
956 	ASSERT_SERIALIZED(port->mpu_serialize);
957 
958 	sentabort = 0;
959 	while ((msg->ms_flags & MSGF_DONE) == 0) {
960 	    void *won;
961 
962 	    /*
963 	     * If message was sent synchronously from the beginning
964 	     * the wakeup will be on the message structure, else it
965 	     * will be on the port structure.
966 	     */
967 	    if (msg->ms_flags & MSGF_SYNC) {
968 		won = msg;
969 	    } else {
970 		won = port;
971 		port->mp_flags |= MSGPORTF_WAITING;
972 	    }
973 
974 	    /*
975 	     * Only messages which support abort can be interrupted.
976 	     * We must still wait for message completion regardless.
977 	     */
978 	    if ((flags & PCATCH) && sentabort == 0) {
979 		error = zsleep(won, port->mpu_serialize, PCATCH, "waitmsg", 0);
980 		if (error) {
981 		    sentabort = error;
982 		    lwkt_serialize_exit(port->mpu_serialize);
983 		    lwkt_abortmsg(msg);
984 		    lwkt_serialize_enter(port->mpu_serialize);
985 		}
986 	    } else {
987 		error = zsleep(won, port->mpu_serialize, 0, "waitmsg", 0);
988 	    }
989 	    /* see note at the top on the MSGPORTF_WAITING flag */
990 	}
991 	/*
992 	 * Turn EINTR into ERESTART if the signal indicates.
993 	 */
994 	if (sentabort && msg->ms_error == EINTR)
995 	    msg->ms_error = sentabort;
996 	if (msg->ms_flags & MSGF_QUEUED)
997 	    _lwkt_pullmsg(port, msg);
998     } else {
999 	if (msg->ms_flags & MSGF_QUEUED) {
1000 	    port = msg->ms_reply_port;
1001 
1002 	    ASSERT_SERIALIZED(port->mpu_serialize);
1003 	    _lwkt_pullmsg(port, msg);
1004 	}
1005     }
1006     return(msg->ms_error);
1007 }
1008 
1009 static
1010 void *
1011 lwkt_serialize_waitport(lwkt_port_t port, int flags)
1012 {
1013     lwkt_msg_t msg;
1014     int error;
1015 
1016     ASSERT_SERIALIZED(port->mpu_serialize);
1017 
1018     while ((msg = _lwkt_pollmsg(port)) == NULL) {
1019 	port->mp_flags |= MSGPORTF_WAITING;
1020 	error = zsleep(port, port->mpu_serialize, flags, "waitport", 0);
1021 	/* see note at the top on the MSGPORTF_WAITING flag */
1022 	if (error)
1023 	    return(NULL);
1024     }
1025     _lwkt_pullmsg(port, msg);
1026     return(msg);
1027 }
1028 
1029 static
1030 void
1031 lwkt_serialize_replyport(lwkt_port_t port, lwkt_msg_t msg)
1032 {
1033     KKASSERT((msg->ms_flags & (MSGF_DONE|MSGF_QUEUED)) == 0);
1034     ASSERT_SERIALIZED(port->mpu_serialize);
1035 
1036     if (msg->ms_flags & MSGF_SYNC) {
1037 	/*
1038 	 * If a synchronous completion has been requested, just wakeup
1039 	 * the message without bothering to queue it to the target port.
1040 	 */
1041 	msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1042 	wakeup(msg);
1043     } else {
1044 	/*
1045 	 * If an asynchronous completion has been requested the message
1046 	 * must be queued to the reply port.
1047 	 */
1048 	_lwkt_enqueue_reply(port, msg);
1049 	if (port->mp_flags & MSGPORTF_WAITING) {
1050 	    port->mp_flags &= ~MSGPORTF_WAITING;
1051 	    wakeup(port);
1052 	}
1053     }
1054 }
1055 
1056 /************************************************************************
1057  *		     PANIC AND SPECIAL PORT FUNCTIONS			*
1058  ************************************************************************/
1059 
1060 /*
1061  * You can point a port's reply vector at this function if you just want
1062  * the message marked done, without any queueing or signaling.  This is
1063  * often used for structure-embedded messages.
1064  */
1065 static
1066 void
1067 lwkt_null_replyport(lwkt_port_t port, lwkt_msg_t msg)
1068 {
1069     msg->ms_flags |= MSGF_DONE | MSGF_REPLY;
1070 }
1071 
1072 static
1073 void *
1074 lwkt_panic_getport(lwkt_port_t port)
1075 {
1076     panic("lwkt_getport() illegal on port %p", port);
1077 }
1078 
1079 static
1080 int
1081 lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg)
1082 {
1083     panic("lwkt_begin/do/sendmsg() illegal on port %p msg %p", port, msg);
1084 }
1085 
1086 static
1087 int
1088 lwkt_panic_waitmsg(lwkt_msg_t msg, int flags)
1089 {
1090     panic("port %p msg %p cannot be waited on", msg->ms_reply_port, msg);
1091 }
1092 
1093 static
1094 void *
1095 lwkt_panic_waitport(lwkt_port_t port, int flags)
1096 {
1097     panic("port %p cannot be waited on", port);
1098 }
1099 
1100 static
1101 void
1102 lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
1103 {
1104     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
1105 }
1106 
1107 static
1108 void
1109 lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
1110 {
1111     panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
1112 }
1113