xref: /freebsd/sys/rpc/svc.c (revision 16038816)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*-
4  * SPDX-License-Identifier: BSD-3-Clause
5  *
6  * Copyright (c) 2009, Sun Microsystems, Inc.
7  * All rights reserved.
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions are met:
11  * - Redistributions of source code must retain the above copyright notice,
12  *   this list of conditions and the following disclaimer.
13  * - Redistributions in binary form must reproduce the above copyright notice,
14  *   this list of conditions and the following disclaimer in the documentation
15  *   and/or other materials provided with the distribution.
16  * - Neither the name of Sun Microsystems, Inc. nor the names of its
17  *   contributors may be used to endorse or promote products derived
18  *   from this software without specific prior written permission.
19  *
20  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
23  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
24  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
25  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
26  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
27  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
28  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
29  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
30  * POSSIBILITY OF SUCH DAMAGE.
31  */
32 
33 #if defined(LIBC_SCCS) && !defined(lint)
34 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
35 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39 
40 /*
41  * svc.c, Server-side remote procedure call interface.
42  *
43  * There are two sets of procedures here.  The xprt routines are
44  * for handling transport handles.  The svc routines handle the
45  * list of service routines.
46  *
47  * Copyright (C) 1984, Sun Microsystems, Inc.
48  */
49 
50 #include <sys/param.h>
51 #include <sys/lock.h>
52 #include <sys/kernel.h>
53 #include <sys/kthread.h>
54 #include <sys/malloc.h>
55 #include <sys/mbuf.h>
56 #include <sys/mutex.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/socketvar.h>
60 #include <sys/systm.h>
61 #include <sys/smp.h>
62 #include <sys/sx.h>
63 #include <sys/ucred.h>
64 
65 #include <rpc/rpc.h>
66 #include <rpc/rpcb_clnt.h>
67 #include <rpc/replay.h>
68 
69 #include <rpc/rpc_com.h>
70 
71 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
72 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
73 
74 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
75     char *);
76 static void svc_new_thread(SVCGROUP *grp);
77 static void xprt_unregister_locked(SVCXPRT *xprt);
78 static void svc_change_space_used(SVCPOOL *pool, long delta);
79 static bool_t svc_request_space_available(SVCPOOL *pool);
80 static void svcpool_cleanup(SVCPOOL *pool);
81 
82 /* ***************  SVCXPRT related stuff **************** */
83 
84 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
85 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
86 static int svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS);
87 
88 SVCPOOL*
89 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
90 {
91 	SVCPOOL *pool;
92 	SVCGROUP *grp;
93 	int g;
94 
95 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
96 
97 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
98 	pool->sp_name = name;
99 	pool->sp_state = SVCPOOL_INIT;
100 	pool->sp_proc = NULL;
101 	TAILQ_INIT(&pool->sp_callouts);
102 	TAILQ_INIT(&pool->sp_lcallouts);
103 	pool->sp_minthreads = 1;
104 	pool->sp_maxthreads = 1;
105 	pool->sp_groupcount = 1;
106 	for (g = 0; g < SVC_MAXGROUPS; g++) {
107 		grp = &pool->sp_groups[g];
108 		mtx_init(&grp->sg_lock, "sg_lock", NULL, MTX_DEF);
109 		grp->sg_pool = pool;
110 		grp->sg_state = SVCPOOL_ACTIVE;
111 		TAILQ_INIT(&grp->sg_xlist);
112 		TAILQ_INIT(&grp->sg_active);
113 		LIST_INIT(&grp->sg_idlethreads);
114 		grp->sg_minthreads = 1;
115 		grp->sg_maxthreads = 1;
116 	}
117 
118 	/*
119 	 * Don't use more than a quarter of mbuf clusters.  Nota bene:
120 	 * nmbclusters is an int, but nmbclusters*MCLBYTES may overflow
121 	 * on LP64 architectures, so cast to u_long to avoid undefined
122 	 * behavior.  (ILP32 architectures cannot have nmbclusters
123 	 * large enough to overflow for other reasons.)
124 	 */
125 	pool->sp_space_high = (u_long)nmbclusters * MCLBYTES / 4;
126 	pool->sp_space_low = (pool->sp_space_high / 3) * 2;
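	/*
	 * Illustrative arithmetic (hypothetical numbers, not taken from
	 * this file): with nmbclusters around 1000000 and the usual
	 * MCLBYTES of 2048, the cluster pool spans roughly 2 GB, giving
	 * an sp_space_high of about 512 MB and an sp_space_low of about
	 * 341 MB.  Request parsing is throttled once sp_space_used rises
	 * past the high mark and resumes when it drops below the low
	 * mark; see svc_change_space_used() below.
	 */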
127 
128 	sysctl_ctx_init(&pool->sp_sysctl);
129 	if (sysctl_base) {
130 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
131 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
132 		    pool, 0, svcpool_minthread_sysctl, "I",
133 		    "Minimal number of threads");
134 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
135 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW | CTLFLAG_MPSAFE,
136 		    pool, 0, svcpool_maxthread_sysctl, "I",
137 		    "Maximal number of threads");
138 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
139 		    "threads", CTLTYPE_INT | CTLFLAG_RD | CTLFLAG_MPSAFE,
140 		    pool, 0, svcpool_threads_sysctl, "I",
141 		    "Current number of threads");
142 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
143 		    "groups", CTLFLAG_RD, &pool->sp_groupcount, 0,
144 		    "Number of thread groups");
145 
146 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 		    "request_space_used", CTLFLAG_RD,
148 		    &pool->sp_space_used,
149 		    "Space in parsed but not handled requests.");
150 
151 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
152 		    "request_space_used_highest", CTLFLAG_RD,
153 		    &pool->sp_space_used_highest,
154 		    "Highest space used since reboot.");
155 
156 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
157 		    "request_space_high", CTLFLAG_RW,
158 		    &pool->sp_space_high,
159 		    "Maximum space in parsed but not handled requests.");
160 
161 		SYSCTL_ADD_ULONG(&pool->sp_sysctl, sysctl_base, OID_AUTO,
162 		    "request_space_low", CTLFLAG_RW,
163 		    &pool->sp_space_low,
164 		    "Low water mark for request space.");
165 
166 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
167 		    "request_space_throttled", CTLFLAG_RD,
168 		    &pool->sp_space_throttled, 0,
169 		    "Whether nfs requests are currently throttled");
170 
171 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
172 		    "request_space_throttle_count", CTLFLAG_RD,
173 		    &pool->sp_space_throttle_count, 0,
174 		    "Count of times throttling based on request space has occurred");
175 	}
176 
177 	return pool;
178 }
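
/*
 * Hedged usage sketch (hypothetical caller, not part of this file): a
 * kernel RPC service would typically create its pool once and size the
 * thread limits before calling svc_run(), roughly along these lines:
 *
 *	SVCPOOL *pool;
 *
 *	pool = svcpool_create("myrpc", NULL);
 *	pool->sp_minthreads = 4;
 *	pool->sp_maxthreads = 64;
 *
 * Passing a sysctl_oid_list as the second argument additionally exposes
 * the minthreads/maxthreads and request_space_* knobs set up above under
 * that sysctl node.
 */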
179 
180 /*
181  * Code common to svcpool_destroy() and svcpool_close(), which cleans up
182  * the pool data structures.
183  */
184 static void
185 svcpool_cleanup(SVCPOOL *pool)
186 {
187 	SVCGROUP *grp;
188 	SVCXPRT *xprt, *nxprt;
189 	struct svc_callout *s;
190 	struct svc_loss_callout *sl;
191 	struct svcxprt_list cleanup;
192 	int g;
193 
194 	TAILQ_INIT(&cleanup);
195 
196 	for (g = 0; g < SVC_MAXGROUPS; g++) {
197 		grp = &pool->sp_groups[g];
198 		mtx_lock(&grp->sg_lock);
199 		while ((xprt = TAILQ_FIRST(&grp->sg_xlist)) != NULL) {
200 			xprt_unregister_locked(xprt);
201 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
202 		}
203 		mtx_unlock(&grp->sg_lock);
204 	}
205 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
206 		if (xprt->xp_socket != NULL)
207 			soshutdown(xprt->xp_socket, SHUT_WR);
208 		SVC_RELEASE(xprt);
209 	}
210 
211 	mtx_lock(&pool->sp_lock);
212 	while ((s = TAILQ_FIRST(&pool->sp_callouts)) != NULL) {
213 		mtx_unlock(&pool->sp_lock);
214 		svc_unreg(pool, s->sc_prog, s->sc_vers);
215 		mtx_lock(&pool->sp_lock);
216 	}
217 	while ((sl = TAILQ_FIRST(&pool->sp_lcallouts)) != NULL) {
218 		mtx_unlock(&pool->sp_lock);
219 		svc_loss_unreg(pool, sl->slc_dispatch);
220 		mtx_lock(&pool->sp_lock);
221 	}
222 	mtx_unlock(&pool->sp_lock);
223 }
224 
225 void
226 svcpool_destroy(SVCPOOL *pool)
227 {
228 	SVCGROUP *grp;
229 	int g;
230 
231 	svcpool_cleanup(pool);
232 
233 	for (g = 0; g < SVC_MAXGROUPS; g++) {
234 		grp = &pool->sp_groups[g];
235 		mtx_destroy(&grp->sg_lock);
236 	}
237 	mtx_destroy(&pool->sp_lock);
238 
239 	if (pool->sp_rcache)
240 		replay_freecache(pool->sp_rcache);
241 
242 	sysctl_ctx_free(&pool->sp_sysctl);
243 	free(pool, M_RPC);
244 }
245 
246 /*
247  * Similar to svcpool_destroy(), except that it does not destroy the actual
248  * data structures.  As such, "pool" may be used again.
249  */
250 void
251 svcpool_close(SVCPOOL *pool)
252 {
253 	SVCGROUP *grp;
254 	int g;
255 
256 	svcpool_cleanup(pool);
257 
258 	/* Now, initialize the pool's state for a fresh svc_run() call. */
259 	mtx_lock(&pool->sp_lock);
260 	pool->sp_state = SVCPOOL_INIT;
261 	mtx_unlock(&pool->sp_lock);
262 	for (g = 0; g < SVC_MAXGROUPS; g++) {
263 		grp = &pool->sp_groups[g];
264 		mtx_lock(&grp->sg_lock);
265 		grp->sg_state = SVCPOOL_ACTIVE;
266 		mtx_unlock(&grp->sg_lock);
267 	}
268 }
269 
270 /*
271  * Sysctl handler to get the present thread count on a pool
272  */
273 static int
274 svcpool_threads_sysctl(SYSCTL_HANDLER_ARGS)
275 {
276 	SVCPOOL *pool;
277 	int threads, error, g;
278 
279 	pool = oidp->oid_arg1;
280 	threads = 0;
281 	mtx_lock(&pool->sp_lock);
282 	for (g = 0; g < pool->sp_groupcount; g++)
283 		threads += pool->sp_groups[g].sg_threadcount;
284 	mtx_unlock(&pool->sp_lock);
285 	error = sysctl_handle_int(oidp, &threads, 0, req);
286 	return (error);
287 }
288 
289 /*
290  * Sysctl handler to set the minimum thread count on a pool
291  */
292 static int
293 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
294 {
295 	SVCPOOL *pool;
296 	int newminthreads, error, g;
297 
298 	pool = oidp->oid_arg1;
299 	newminthreads = pool->sp_minthreads;
300 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
301 	if (error == 0 && newminthreads != pool->sp_minthreads) {
302 		if (newminthreads > pool->sp_maxthreads)
303 			return (EINVAL);
304 		mtx_lock(&pool->sp_lock);
305 		pool->sp_minthreads = newminthreads;
306 		for (g = 0; g < pool->sp_groupcount; g++) {
307 			pool->sp_groups[g].sg_minthreads = max(1,
308 			    pool->sp_minthreads / pool->sp_groupcount);
309 		}
310 		mtx_unlock(&pool->sp_lock);
311 	}
312 	return (error);
313 }
314 
315 /*
316  * Sysctl handler to set the maximum thread count on a pool
317  */
318 static int
319 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
320 {
321 	SVCPOOL *pool;
322 	int newmaxthreads, error, g;
323 
324 	pool = oidp->oid_arg1;
325 	newmaxthreads = pool->sp_maxthreads;
326 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
327 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
328 		if (newmaxthreads < pool->sp_minthreads)
329 			return (EINVAL);
330 		mtx_lock(&pool->sp_lock);
331 		pool->sp_maxthreads = newmaxthreads;
332 		for (g = 0; g < pool->sp_groupcount; g++) {
333 			pool->sp_groups[g].sg_maxthreads = max(1,
334 			    pool->sp_maxthreads / pool->sp_groupcount);
335 		}
336 		mtx_unlock(&pool->sp_lock);
337 	}
338 	return (error);
339 }
340 
341 /*
342  * Activate a transport handle.
343  */
344 void
345 xprt_register(SVCXPRT *xprt)
346 {
347 	SVCPOOL *pool = xprt->xp_pool;
348 	SVCGROUP *grp;
349 	int g;
350 
351 	SVC_ACQUIRE(xprt);
352 	g = atomic_fetchadd_int(&pool->sp_nextgroup, 1) % pool->sp_groupcount;
353 	xprt->xp_group = grp = &pool->sp_groups[g];
354 	mtx_lock(&grp->sg_lock);
355 	xprt->xp_registered = TRUE;
356 	xprt->xp_active = FALSE;
357 	TAILQ_INSERT_TAIL(&grp->sg_xlist, xprt, xp_link);
358 	mtx_unlock(&grp->sg_lock);
359 }
360 
361 /*
362  * De-activate a transport handle. Note: the locked version doesn't
363  * release the transport - the caller must do that after dropping the
364  * group lock.
365  */
366 static void
367 xprt_unregister_locked(SVCXPRT *xprt)
368 {
369 	SVCGROUP *grp = xprt->xp_group;
370 
371 	mtx_assert(&grp->sg_lock, MA_OWNED);
372 	KASSERT(xprt->xp_registered == TRUE,
373 	    ("xprt_unregister_locked: not registered"));
374 	xprt_inactive_locked(xprt);
375 	TAILQ_REMOVE(&grp->sg_xlist, xprt, xp_link);
376 	xprt->xp_registered = FALSE;
377 }
378 
379 void
380 xprt_unregister(SVCXPRT *xprt)
381 {
382 	SVCGROUP *grp = xprt->xp_group;
383 
384 	mtx_lock(&grp->sg_lock);
385 	if (xprt->xp_registered == FALSE) {
386 		/* Already unregistered by another thread */
387 		mtx_unlock(&grp->sg_lock);
388 		return;
389 	}
390 	xprt_unregister_locked(xprt);
391 	mtx_unlock(&grp->sg_lock);
392 
393 	if (xprt->xp_socket != NULL)
394 		soshutdown(xprt->xp_socket, SHUT_WR);
395 	SVC_RELEASE(xprt);
396 }
397 
398 /*
399  * Attempt to assign a service thread to this transport.
400  */
401 static int
402 xprt_assignthread(SVCXPRT *xprt)
403 {
404 	SVCGROUP *grp = xprt->xp_group;
405 	SVCTHREAD *st;
406 
407 	mtx_assert(&grp->sg_lock, MA_OWNED);
408 	st = LIST_FIRST(&grp->sg_idlethreads);
409 	if (st) {
410 		LIST_REMOVE(st, st_ilink);
411 		SVC_ACQUIRE(xprt);
412 		xprt->xp_thread = st;
413 		st->st_xprt = xprt;
414 		cv_signal(&st->st_cond);
415 		return (TRUE);
416 	} else {
417 		/*
418 		 * See if we can create a new thread. The
419 		 * actual thread creation happens in
420 		 * svc_run_internal because our locking state
421 		 * is poorly defined (we are typically called
422 		 * from a socket upcall). Don't create more
423 		 * than one thread per second.
424 		 */
425 		if (grp->sg_state == SVCPOOL_ACTIVE
426 		    && grp->sg_lastcreatetime < time_uptime
427 		    && grp->sg_threadcount < grp->sg_maxthreads) {
428 			grp->sg_state = SVCPOOL_THREADWANTED;
429 		}
430 	}
431 	return (FALSE);
432 }
433 
434 void
435 xprt_active(SVCXPRT *xprt)
436 {
437 	SVCGROUP *grp = xprt->xp_group;
438 
439 	mtx_lock(&grp->sg_lock);
440 
441 	if (!xprt->xp_registered) {
442 		/*
443 		 * Race with xprt_unregister - we lose.
444 		 */
445 		mtx_unlock(&grp->sg_lock);
446 		return;
447 	}
448 
449 	if (!xprt->xp_active) {
450 		xprt->xp_active = TRUE;
451 		if (xprt->xp_thread == NULL) {
452 			if (!svc_request_space_available(xprt->xp_pool) ||
453 			    !xprt_assignthread(xprt))
454 				TAILQ_INSERT_TAIL(&grp->sg_active, xprt,
455 				    xp_alink);
456 		}
457 	}
458 
459 	mtx_unlock(&grp->sg_lock);
460 }
461 
462 void
463 xprt_inactive_locked(SVCXPRT *xprt)
464 {
465 	SVCGROUP *grp = xprt->xp_group;
466 
467 	mtx_assert(&grp->sg_lock, MA_OWNED);
468 	if (xprt->xp_active) {
469 		if (xprt->xp_thread == NULL)
470 			TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
471 		xprt->xp_active = FALSE;
472 	}
473 }
474 
475 void
476 xprt_inactive(SVCXPRT *xprt)
477 {
478 	SVCGROUP *grp = xprt->xp_group;
479 
480 	mtx_lock(&grp->sg_lock);
481 	xprt_inactive_locked(xprt);
482 	mtx_unlock(&grp->sg_lock);
483 }
484 
485 /*
486  * Variant of xprt_inactive() for use only when the caller is sure that the
487  * transport is assigned to a thread, for example within receive handlers.
488  */
489 void
490 xprt_inactive_self(SVCXPRT *xprt)
491 {
492 
493 	KASSERT(xprt->xp_thread != NULL,
494 	    ("xprt_inactive_self(%p) with NULL xp_thread", xprt));
495 	xprt->xp_active = FALSE;
496 }
497 
498 /*
499  * Add a service program to the callout list.
500  * The dispatch routine will be called when an RPC request for this
501  * program number comes in.
502  */
503 bool_t
504 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
505     void (*dispatch)(struct svc_req *, SVCXPRT *),
506     const struct netconfig *nconf)
507 {
508 	SVCPOOL *pool = xprt->xp_pool;
509 	struct svc_callout *s;
510 	char *netid = NULL;
511 	int flag = 0;
512 
513 /* VARIABLES PROTECTED BY svc_lock: s, svc_head */
514 
515 	if (xprt->xp_netid) {
516 		netid = strdup(xprt->xp_netid, M_RPC);
517 		flag = 1;
518 	} else if (nconf && nconf->nc_netid) {
519 		netid = strdup(nconf->nc_netid, M_RPC);
520 		flag = 1;
521 	} /* must have been created with svc_raw_create */
522 	if ((netid == NULL) && (flag == 1)) {
523 		return (FALSE);
524 	}
525 
526 	mtx_lock(&pool->sp_lock);
527 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
528 		if (netid)
529 			free(netid, M_RPC);
530 		if (s->sc_dispatch == dispatch)
531 			goto rpcb_it; /* the caller is registering another xprt */
532 		mtx_unlock(&pool->sp_lock);
533 		return (FALSE);
534 	}
535 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
536 	if (s == NULL) {
537 		if (netid)
538 			free(netid, M_RPC);
539 		mtx_unlock(&pool->sp_lock);
540 		return (FALSE);
541 	}
542 
543 	s->sc_prog = prog;
544 	s->sc_vers = vers;
545 	s->sc_dispatch = dispatch;
546 	s->sc_netid = netid;
547 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
548 
549 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
550 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
551 
552 rpcb_it:
553 	mtx_unlock(&pool->sp_lock);
554 	/* now register the information with the local binder service */
555 	if (nconf) {
556 		bool_t dummy;
557 		struct netconfig tnc;
558 		struct netbuf nb;
559 		tnc = *nconf;
560 		nb.buf = &xprt->xp_ltaddr;
561 		nb.len = xprt->xp_ltaddr.ss_len;
562 		dummy = rpcb_set(prog, vers, &tnc, &nb);
563 		return (dummy);
564 	}
565 	return (TRUE);
566 }
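
/*
 * Hedged usage sketch (MYPROG, MYVERS and myprog_dispatch are
 * hypothetical, not from this file): a service registers one dispatch
 * routine per program/version pair on each transport it creates,
 * roughly as
 *
 *	static void myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt);
 *	bool_t ok;
 *
 *	ok = svc_reg(xprt, MYPROG, MYVERS, myprog_dispatch, nconf);
 *
 * With a non-NULL netconfig the registration is also advertised to the
 * local binder via rpcb_set(); svc_unreg() below removes it again.
 */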
567 
568 /*
569  * Remove a service program from the callout list.
570  */
571 void
572 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
573 {
574 	struct svc_callout *s;
575 
576 	/* unregister the information anyway */
577 	(void) rpcb_unset(prog, vers, NULL);
578 	mtx_lock(&pool->sp_lock);
579 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
580 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
581 		if (s->sc_netid)
582 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
583 		mem_free(s, sizeof (struct svc_callout));
584 	}
585 	mtx_unlock(&pool->sp_lock);
586 }
587 
588 /*
589  * Add a service connection loss program to the callout list.
590  * The dispatch routine will be called when some transport in this pool dies.
591  */
592 bool_t
593 svc_loss_reg(SVCXPRT *xprt, void (*dispatch)(SVCXPRT *))
594 {
595 	SVCPOOL *pool = xprt->xp_pool;
596 	struct svc_loss_callout *s;
597 
598 	mtx_lock(&pool->sp_lock);
599 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
600 		if (s->slc_dispatch == dispatch)
601 			break;
602 	}
603 	if (s != NULL) {
604 		mtx_unlock(&pool->sp_lock);
605 		return (TRUE);
606 	}
607 	s = malloc(sizeof(struct svc_loss_callout), M_RPC, M_NOWAIT);
608 	if (s == NULL) {
609 		mtx_unlock(&pool->sp_lock);
610 		return (FALSE);
611 	}
612 	s->slc_dispatch = dispatch;
613 	TAILQ_INSERT_TAIL(&pool->sp_lcallouts, s, slc_link);
614 	mtx_unlock(&pool->sp_lock);
615 	return (TRUE);
616 }
617 
618 /*
619  * Remove a service connection loss program from the callout list.
620  */
621 void
622 svc_loss_unreg(SVCPOOL *pool, void (*dispatch)(SVCXPRT *))
623 {
624 	struct svc_loss_callout *s;
625 
626 	mtx_lock(&pool->sp_lock);
627 	TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link) {
628 		if (s->slc_dispatch == dispatch) {
629 			TAILQ_REMOVE(&pool->sp_lcallouts, s, slc_link);
630 			free(s, M_RPC);
631 			break;
632 		}
633 	}
634 	mtx_unlock(&pool->sp_lock);
635 }
636 
637 /* ********************** CALLOUT list related stuff ************* */
638 
639 /*
640  * Search the callout list for a program number, return the callout
641  * struct.
642  */
643 static struct svc_callout *
644 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
645 {
646 	struct svc_callout *s;
647 
648 	mtx_assert(&pool->sp_lock, MA_OWNED);
649 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
650 		if (s->sc_prog == prog && s->sc_vers == vers
651 		    && (netid == NULL || s->sc_netid == NULL ||
652 			strcmp(netid, s->sc_netid) == 0))
653 			break;
654 	}
655 
656 	return (s);
657 }
658 
659 /* ******************* REPLY GENERATION ROUTINES  ************ */
660 
661 static bool_t
662 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
663     struct mbuf *body)
664 {
665 	SVCXPRT *xprt = rqstp->rq_xprt;
666 	bool_t ok;
667 
668 	if (rqstp->rq_args) {
669 		m_freem(rqstp->rq_args);
670 		rqstp->rq_args = NULL;
671 	}
672 
673 	if (xprt->xp_pool->sp_rcache)
674 		replay_setreply(xprt->xp_pool->sp_rcache,
675 		    rply, svc_getrpccaller(rqstp), body);
676 
677 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
678 		return (FALSE);
679 
680 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body, &rqstp->rq_reply_seq);
681 	if (rqstp->rq_addr) {
682 		free(rqstp->rq_addr, M_SONAME);
683 		rqstp->rq_addr = NULL;
684 	}
685 
686 	return (ok);
687 }
688 
689 /*
690  * Send a reply to an RPC request
691  */
692 bool_t
693 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void * xdr_location)
694 {
695 	struct rpc_msg rply;
696 	struct mbuf *m;
697 	XDR xdrs;
698 	bool_t ok;
699 
700 	rply.rm_xid = rqstp->rq_xid;
701 	rply.rm_direction = REPLY;
702 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
703 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
704 	rply.acpted_rply.ar_stat = SUCCESS;
705 	rply.acpted_rply.ar_results.where = NULL;
706 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
707 
708 	m = m_getcl(M_WAITOK, MT_DATA, 0);
709 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
710 	ok = xdr_results(&xdrs, xdr_location);
711 	XDR_DESTROY(&xdrs);
712 
713 	if (ok) {
714 		return (svc_sendreply_common(rqstp, &rply, m));
715 	} else {
716 		m_freem(m);
717 		return (FALSE);
718 	}
719 }
720 
721 bool_t
722 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
723 {
724 	struct rpc_msg rply;
725 
726 	rply.rm_xid = rqstp->rq_xid;
727 	rply.rm_direction = REPLY;
728 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
729 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
730 	rply.acpted_rply.ar_stat = SUCCESS;
731 	rply.acpted_rply.ar_results.where = NULL;
732 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
733 
734 	return (svc_sendreply_common(rqstp, &rply, m));
735 }
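
/*
 * Hedged sketch of a dispatch routine (hypothetical types and XDR
 * routines, not part of this file), showing how the reply helpers above
 * normally pair with svc_getargs(), svc_freeargs() and svc_freereq()
 * defined later in this file:
 *
 *	static void
 *	myprog_dispatch(struct svc_req *rqstp, SVCXPRT *xprt)
 *	{
 *		my_args args;
 *		my_result result;
 *
 *		memset(&args, 0, sizeof(args));
 *		if (!svc_getargs(rqstp, (xdrproc_t)xdr_my_args, &args)) {
 *			svcerr_decode(rqstp);
 *			svc_freereq(rqstp);
 *			return;
 *		}
 *		... service the call, filling in result ...
 *		if (!svc_sendreply(rqstp, (xdrproc_t)xdr_my_result, &result))
 *			svcerr_systemerr(rqstp);
 *		svc_freeargs(rqstp, (xdrproc_t)xdr_my_args, &args);
 *		svc_freereq(rqstp);
 *	}
 *
 * As noted in svc_executereq() below, ownership of the request passes to
 * the dispatch method, which must eventually call svc_freereq().
 */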
736 
737 /*
738  * No procedure error reply
739  */
740 void
741 svcerr_noproc(struct svc_req *rqstp)
742 {
743 	SVCXPRT *xprt = rqstp->rq_xprt;
744 	struct rpc_msg rply;
745 
746 	rply.rm_xid = rqstp->rq_xid;
747 	rply.rm_direction = REPLY;
748 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
749 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
750 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
751 
752 	if (xprt->xp_pool->sp_rcache)
753 		replay_setreply(xprt->xp_pool->sp_rcache,
754 		    &rply, svc_getrpccaller(rqstp), NULL);
755 
756 	svc_sendreply_common(rqstp, &rply, NULL);
757 }
758 
759 /*
760  * Can't decode args error reply
761  */
762 void
763 svcerr_decode(struct svc_req *rqstp)
764 {
765 	SVCXPRT *xprt = rqstp->rq_xprt;
766 	struct rpc_msg rply;
767 
768 	rply.rm_xid = rqstp->rq_xid;
769 	rply.rm_direction = REPLY;
770 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
771 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
772 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
773 
774 	if (xprt->xp_pool->sp_rcache)
775 		replay_setreply(xprt->xp_pool->sp_rcache,
776 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
777 
778 	svc_sendreply_common(rqstp, &rply, NULL);
779 }
780 
781 /*
782  * Some system error
783  */
784 void
785 svcerr_systemerr(struct svc_req *rqstp)
786 {
787 	SVCXPRT *xprt = rqstp->rq_xprt;
788 	struct rpc_msg rply;
789 
790 	rply.rm_xid = rqstp->rq_xid;
791 	rply.rm_direction = REPLY;
792 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
793 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
794 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
795 
796 	if (xprt->xp_pool->sp_rcache)
797 		replay_setreply(xprt->xp_pool->sp_rcache,
798 		    &rply, svc_getrpccaller(rqstp), NULL);
799 
800 	svc_sendreply_common(rqstp, &rply, NULL);
801 }
802 
803 /*
804  * Authentication error reply
805  */
806 void
807 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
808 {
809 	SVCXPRT *xprt = rqstp->rq_xprt;
810 	struct rpc_msg rply;
811 
812 	rply.rm_xid = rqstp->rq_xid;
813 	rply.rm_direction = REPLY;
814 	rply.rm_reply.rp_stat = MSG_DENIED;
815 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
816 	rply.rjcted_rply.rj_why = why;
817 
818 	if (xprt->xp_pool->sp_rcache)
819 		replay_setreply(xprt->xp_pool->sp_rcache,
820 		    &rply, svc_getrpccaller(rqstp), NULL);
821 
822 	svc_sendreply_common(rqstp, &rply, NULL);
823 }
824 
825 /*
826  * Auth too weak error reply
827  */
828 void
829 svcerr_weakauth(struct svc_req *rqstp)
830 {
831 
832 	svcerr_auth(rqstp, AUTH_TOOWEAK);
833 }
834 
835 /*
836  * Program unavailable error reply
837  */
838 void
839 svcerr_noprog(struct svc_req *rqstp)
840 {
841 	SVCXPRT *xprt = rqstp->rq_xprt;
842 	struct rpc_msg rply;
843 
844 	rply.rm_xid = rqstp->rq_xid;
845 	rply.rm_direction = REPLY;
846 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
847 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
848 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
849 
850 	if (xprt->xp_pool->sp_rcache)
851 		replay_setreply(xprt->xp_pool->sp_rcache,
852 		    &rply, svc_getrpccaller(rqstp), NULL);
853 
854 	svc_sendreply_common(rqstp, &rply, NULL);
855 }
856 
857 /*
858  * Program version mismatch error reply
859  */
860 void
861 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
862 {
863 	SVCXPRT *xprt = rqstp->rq_xprt;
864 	struct rpc_msg rply;
865 
866 	rply.rm_xid = rqstp->rq_xid;
867 	rply.rm_direction = REPLY;
868 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
869 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
870 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
871 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
872 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
873 
874 	if (xprt->xp_pool->sp_rcache)
875 		replay_setreply(xprt->xp_pool->sp_rcache,
876 		    &rply, svc_getrpccaller(rqstp), NULL);
877 
878 	svc_sendreply_common(rqstp, &rply, NULL);
879 }
880 
881 /*
882  * Allocate a new server transport structure. All fields are
883  * initialized to zero and xp_p3 is initialized to point at an
884  * extension structure to hold various flags and authentication
885  * parameters.
886  */
887 SVCXPRT *
888 svc_xprt_alloc(void)
889 {
890 	SVCXPRT *xprt;
891 	SVCXPRT_EXT *ext;
892 
893 	xprt = mem_alloc(sizeof(SVCXPRT));
894 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
895 	xprt->xp_p3 = ext;
896 	refcount_init(&xprt->xp_refs, 1);
897 
898 	return (xprt);
899 }
900 
901 /*
902  * Free a server transport structure.
903  */
904 void
905 svc_xprt_free(SVCXPRT *xprt)
906 {
907 
908 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
909 	/* The size argument is ignored, so 0 is ok. */
910 	mem_free(xprt->xp_gidp, 0);
911 	mem_free(xprt, sizeof(SVCXPRT));
912 }
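
/*
 * Hedged sketch (assumed transport-side usage, not shown in this file):
 * a transport backend such as the socket-based implementations in
 * svc_vc.c or svc_dg.c typically allocates its handle with
 * svc_xprt_alloc(), fills in at least xp_pool and xp_socket along with
 * its method table, and then makes the handle visible to the pool:
 *
 *	xprt = svc_xprt_alloc();
 *	xprt->xp_pool = pool;
 *	xprt->xp_socket = so;
 *	... install the transport's methods and addresses ...
 *	xprt_register(xprt);
 *
 * xprt_register() takes its own reference on the handle, which is
 * dropped by the SVC_RELEASE() in xprt_unregister() or
 * svcpool_cleanup().
 */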
913 
914 /* ******************* SERVER INPUT STUFF ******************* */
915 
916 /*
917  * Read RPC requests from a transport and queue them to be
918  * executed. We handle authentication and replay cache replies here.
919  * Actually dispatching the RPC is deferred until svc_executereq().
920  */
921 static enum xprt_stat
922 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
923 {
924 	SVCPOOL *pool = xprt->xp_pool;
925 	struct svc_req *r;
926 	struct rpc_msg msg;
927 	struct mbuf *args;
928 	struct svc_loss_callout *s;
929 	enum xprt_stat stat;
930 
931 	/* now receive msgs from xprt (support batch calls) */
932 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
933 
934 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
935 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
936 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
937 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
938 		enum auth_stat why;
939 
940 		/*
941 		 * Handle replays and authenticate before queuing the
942 		 * request to be executed.
943 		 */
944 		SVC_ACQUIRE(xprt);
945 		r->rq_xprt = xprt;
946 		if (pool->sp_rcache) {
947 			struct rpc_msg repmsg;
948 			struct mbuf *repbody;
949 			enum replay_state rs;
950 			rs = replay_find(pool->sp_rcache, &msg,
951 			    svc_getrpccaller(r), &repmsg, &repbody);
952 			switch (rs) {
953 			case RS_NEW:
954 				break;
955 			case RS_DONE:
956 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
957 				    repbody, &r->rq_reply_seq);
958 				if (r->rq_addr) {
959 					free(r->rq_addr, M_SONAME);
960 					r->rq_addr = NULL;
961 				}
962 				m_freem(args);
963 				goto call_done;
964 
965 			default:
966 				m_freem(args);
967 				goto call_done;
968 			}
969 		}
970 
971 		r->rq_xid = msg.rm_xid;
972 		r->rq_prog = msg.rm_call.cb_prog;
973 		r->rq_vers = msg.rm_call.cb_vers;
974 		r->rq_proc = msg.rm_call.cb_proc;
975 		r->rq_size = sizeof(*r) + m_length(args, NULL);
976 		r->rq_args = args;
977 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
978 			/*
979 			 * RPCSEC_GSS uses this return code
980 			 * for requests that form part of its
981 			 * context establishment protocol and
982 			 * should not be dispatched to the
983 			 * application.
984 			 */
985 			if (why != RPCSEC_GSS_NODISPATCH)
986 				svcerr_auth(r, why);
987 			goto call_done;
988 		}
989 
990 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
991 			svcerr_decode(r);
992 			goto call_done;
993 		}
994 
995 		/*
996 		 * Everything checks out, return request to caller.
997 		 */
998 		*rqstp_ret = r;
999 		r = NULL;
1000 	}
1001 call_done:
1002 	if (r) {
1003 		svc_freereq(r);
1004 		r = NULL;
1005 	}
1006 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
1007 		TAILQ_FOREACH(s, &pool->sp_lcallouts, slc_link)
1008 			(*s->slc_dispatch)(xprt);
1009 		xprt_unregister(xprt);
1010 	}
1011 
1012 	return (stat);
1013 }
1014 
1015 static void
1016 svc_executereq(struct svc_req *rqstp)
1017 {
1018 	SVCXPRT *xprt = rqstp->rq_xprt;
1019 	SVCPOOL *pool = xprt->xp_pool;
1020 	int prog_found;
1021 	rpcvers_t low_vers;
1022 	rpcvers_t high_vers;
1023 	struct svc_callout *s;
1024 
1025 	/* now match message with a registered service */
1026 	prog_found = FALSE;
1027 	low_vers = (rpcvers_t) -1L;
1028 	high_vers = (rpcvers_t) 0L;
1029 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
1030 		if (s->sc_prog == rqstp->rq_prog) {
1031 			if (s->sc_vers == rqstp->rq_vers) {
1032 				/*
1033 				 * We hand ownership of r to the
1034 				 * dispatch method - they must call
1035 				 * svc_freereq.
1036 				 */
1037 				(*s->sc_dispatch)(rqstp, xprt);
1038 				return;
1039 			}  /* found correct version */
1040 			prog_found = TRUE;
1041 			if (s->sc_vers < low_vers)
1042 				low_vers = s->sc_vers;
1043 			if (s->sc_vers > high_vers)
1044 				high_vers = s->sc_vers;
1045 		}   /* found correct program */
1046 	}
1047 
1048 	/*
1049 	 * if we got here, the program or version
1050 	 * is not served ...
1051 	 */
1052 	if (prog_found)
1053 		svcerr_progvers(rqstp, low_vers, high_vers);
1054 	else
1055 		svcerr_noprog(rqstp);
1056 
1057 	svc_freereq(rqstp);
1058 }
1059 
1060 static void
1061 svc_checkidle(SVCGROUP *grp)
1062 {
1063 	SVCXPRT *xprt, *nxprt;
1064 	time_t timo;
1065 	struct svcxprt_list cleanup;
1066 
1067 	TAILQ_INIT(&cleanup);
1068 	TAILQ_FOREACH_SAFE(xprt, &grp->sg_xlist, xp_link, nxprt) {
1069 		/*
1070 		 * Only some transports have idle timers. Don't time
1071 		 * something out which is just waking up.
1072 		 */
1073 		if (!xprt->xp_idletimeout || xprt->xp_thread)
1074 			continue;
1075 
1076 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
1077 		if (time_uptime > timo) {
1078 			xprt_unregister_locked(xprt);
1079 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
1080 		}
1081 	}
1082 
1083 	mtx_unlock(&grp->sg_lock);
1084 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
1085 		soshutdown(xprt->xp_socket, SHUT_WR);
1086 		SVC_RELEASE(xprt);
1087 	}
1088 	mtx_lock(&grp->sg_lock);
1089 }
1090 
1091 static void
1092 svc_assign_waiting_sockets(SVCPOOL *pool)
1093 {
1094 	SVCGROUP *grp;
1095 	SVCXPRT *xprt;
1096 	int g;
1097 
1098 	for (g = 0; g < pool->sp_groupcount; g++) {
1099 		grp = &pool->sp_groups[g];
1100 		mtx_lock(&grp->sg_lock);
1101 		while ((xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1102 			if (xprt_assignthread(xprt))
1103 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1104 			else
1105 				break;
1106 		}
1107 		mtx_unlock(&grp->sg_lock);
1108 	}
1109 }
1110 
1111 static void
1112 svc_change_space_used(SVCPOOL *pool, long delta)
1113 {
1114 	unsigned long value;
1115 
1116 	value = atomic_fetchadd_long(&pool->sp_space_used, delta) + delta;
1117 	if (delta > 0) {
1118 		if (value >= pool->sp_space_high && !pool->sp_space_throttled) {
1119 			pool->sp_space_throttled = TRUE;
1120 			pool->sp_space_throttle_count++;
1121 		}
1122 		if (value > pool->sp_space_used_highest)
1123 			pool->sp_space_used_highest = value;
1124 	} else {
1125 		if (value < pool->sp_space_low && pool->sp_space_throttled) {
1126 			pool->sp_space_throttled = FALSE;
1127 			svc_assign_waiting_sockets(pool);
1128 		}
1129 	}
1130 }
1131 
1132 static bool_t
1133 svc_request_space_available(SVCPOOL *pool)
1134 {
1135 
1136 	if (pool->sp_space_throttled)
1137 		return (FALSE);
1138 	return (TRUE);
1139 }
1140 
1141 static void
1142 svc_run_internal(SVCGROUP *grp, bool_t ismaster)
1143 {
1144 	SVCPOOL *pool = grp->sg_pool;
1145 	SVCTHREAD *st, *stpref;
1146 	SVCXPRT *xprt;
1147 	enum xprt_stat stat;
1148 	struct svc_req *rqstp;
1149 	struct proc *p;
1150 	long sz;
1151 	int error;
1152 
1153 	st = mem_alloc(sizeof(*st));
1154 	mtx_init(&st->st_lock, "st_lock", NULL, MTX_DEF);
1155 	st->st_pool = pool;
1156 	st->st_xprt = NULL;
1157 	STAILQ_INIT(&st->st_reqs);
1158 	cv_init(&st->st_cond, "rpcsvc");
1159 
1160 	mtx_lock(&grp->sg_lock);
1161 
1162 	/*
1163 	 * If we are a new thread which was spawned to cope with
1164 	 * increased load, set the state back to SVCPOOL_ACTIVE.
1165 	 */
1166 	if (grp->sg_state == SVCPOOL_THREADSTARTING)
1167 		grp->sg_state = SVCPOOL_ACTIVE;
1168 
1169 	while (grp->sg_state != SVCPOOL_CLOSING) {
1170 		/*
1171 		 * Create new thread if requested.
1172 		 */
1173 		if (grp->sg_state == SVCPOOL_THREADWANTED) {
1174 			grp->sg_state = SVCPOOL_THREADSTARTING;
1175 			grp->sg_lastcreatetime = time_uptime;
1176 			mtx_unlock(&grp->sg_lock);
1177 			svc_new_thread(grp);
1178 			mtx_lock(&grp->sg_lock);
1179 			continue;
1180 		}
1181 
1182 		/*
1183 		 * Check for idle transports once per second.
1184 		 */
1185 		if (time_uptime > grp->sg_lastidlecheck) {
1186 			grp->sg_lastidlecheck = time_uptime;
1187 			svc_checkidle(grp);
1188 		}
1189 
1190 		xprt = st->st_xprt;
1191 		if (!xprt) {
1192 			/*
1193 			 * Enforce maxthreads count.
1194 			 */
1195 			if (!ismaster && grp->sg_threadcount >
1196 			    grp->sg_maxthreads)
1197 				break;
1198 
1199 			/*
1200 			 * Before sleeping, see if we can find an
1201 			 * active transport which isn't being serviced
1202 			 * by a thread.
1203 			 */
1204 			if (svc_request_space_available(pool) &&
1205 			    (xprt = TAILQ_FIRST(&grp->sg_active)) != NULL) {
1206 				TAILQ_REMOVE(&grp->sg_active, xprt, xp_alink);
1207 				SVC_ACQUIRE(xprt);
1208 				xprt->xp_thread = st;
1209 				st->st_xprt = xprt;
1210 				continue;
1211 			}
1212 
1213 			LIST_INSERT_HEAD(&grp->sg_idlethreads, st, st_ilink);
1214 			if (ismaster || (!ismaster &&
1215 			    grp->sg_threadcount > grp->sg_minthreads))
1216 				error = cv_timedwait_sig(&st->st_cond,
1217 				    &grp->sg_lock, 5 * hz);
1218 			else
1219 				error = cv_wait_sig(&st->st_cond,
1220 				    &grp->sg_lock);
1221 			if (st->st_xprt == NULL)
1222 				LIST_REMOVE(st, st_ilink);
1223 
1224 			/*
1225 			 * Reduce worker thread count when idle.
1226 			 */
1227 			if (error == EWOULDBLOCK) {
1228 				if (!ismaster
1229 				    && (grp->sg_threadcount
1230 					> grp->sg_minthreads)
1231 					&& !st->st_xprt)
1232 					break;
1233 			} else if (error != 0) {
1234 				KASSERT(error == EINTR || error == ERESTART,
1235 				    ("non-signal error %d", error));
1236 				mtx_unlock(&grp->sg_lock);
1237 				p = curproc;
1238 				PROC_LOCK(p);
1239 				if (P_SHOULDSTOP(p) ||
1240 				    (p->p_flag & P_TOTAL_STOP) != 0) {
1241 					thread_suspend_check(0);
1242 					PROC_UNLOCK(p);
1243 					mtx_lock(&grp->sg_lock);
1244 				} else {
1245 					PROC_UNLOCK(p);
1246 					svc_exit(pool);
1247 					mtx_lock(&grp->sg_lock);
1248 					break;
1249 				}
1250 			}
1251 			continue;
1252 		}
1253 		mtx_unlock(&grp->sg_lock);
1254 
1255 		/*
1256 		 * Drain the transport socket and queue up any RPCs.
1257 		 */
1258 		xprt->xp_lastactive = time_uptime;
1259 		do {
1260 			if (!svc_request_space_available(pool))
1261 				break;
1262 			rqstp = NULL;
1263 			stat = svc_getreq(xprt, &rqstp);
1264 			if (rqstp) {
1265 				svc_change_space_used(pool, rqstp->rq_size);
1266 				/*
1267 				 * See if the application has a preference
1268 				 * for some other thread.
1269 				 */
1270 				if (pool->sp_assign) {
1271 					stpref = pool->sp_assign(st, rqstp);
1272 					rqstp->rq_thread = stpref;
1273 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1274 					    rqstp, rq_link);
1275 					mtx_unlock(&stpref->st_lock);
1276 					if (stpref != st)
1277 						rqstp = NULL;
1278 				} else {
1279 					rqstp->rq_thread = st;
1280 					STAILQ_INSERT_TAIL(&st->st_reqs,
1281 					    rqstp, rq_link);
1282 				}
1283 			}
1284 		} while (rqstp == NULL && stat == XPRT_MOREREQS
1285 		    && grp->sg_state != SVCPOOL_CLOSING);
1286 
1287 		/*
1288 		 * Move this transport to the end of the active list to
1289 		 * ensure fairness when multiple transports are active.
1290 		 * If this was the last queued request, svc_getreq will end
1291 		 * up calling xprt_inactive to remove from the active list.
1292 		 */
1293 		mtx_lock(&grp->sg_lock);
1294 		xprt->xp_thread = NULL;
1295 		st->st_xprt = NULL;
1296 		if (xprt->xp_active) {
1297 			if (!svc_request_space_available(pool) ||
1298 			    !xprt_assignthread(xprt))
1299 				TAILQ_INSERT_TAIL(&grp->sg_active,
1300 				    xprt, xp_alink);
1301 		}
1302 		mtx_unlock(&grp->sg_lock);
1303 		SVC_RELEASE(xprt);
1304 
1305 		/*
1306 		 * Execute what we have queued.
1307 		 */
1308 		mtx_lock(&st->st_lock);
1309 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1310 			STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1311 			mtx_unlock(&st->st_lock);
1312 			sz = (long)rqstp->rq_size;
1313 			svc_executereq(rqstp);
1314 			svc_change_space_used(pool, -sz);
1315 			mtx_lock(&st->st_lock);
1316 		}
1317 		mtx_unlock(&st->st_lock);
1318 		mtx_lock(&grp->sg_lock);
1319 	}
1320 
1321 	if (st->st_xprt) {
1322 		xprt = st->st_xprt;
1323 		st->st_xprt = NULL;
1324 		SVC_RELEASE(xprt);
1325 	}
1326 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1327 	mtx_destroy(&st->st_lock);
1328 	cv_destroy(&st->st_cond);
1329 	mem_free(st, sizeof(*st));
1330 
1331 	grp->sg_threadcount--;
1332 	if (!ismaster)
1333 		wakeup(grp);
1334 	mtx_unlock(&grp->sg_lock);
1335 }
1336 
1337 static void
1338 svc_thread_start(void *arg)
1339 {
1340 
1341 	svc_run_internal((SVCGROUP *) arg, FALSE);
1342 	kthread_exit();
1343 }
1344 
1345 static void
1346 svc_new_thread(SVCGROUP *grp)
1347 {
1348 	SVCPOOL *pool = grp->sg_pool;
1349 	struct thread *td;
1350 
1351 	mtx_lock(&grp->sg_lock);
1352 	grp->sg_threadcount++;
1353 	mtx_unlock(&grp->sg_lock);
1354 	kthread_add(svc_thread_start, grp, pool->sp_proc, &td, 0, 0,
1355 	    "%s: service", pool->sp_name);
1356 }
1357 
1358 void
1359 svc_run(SVCPOOL *pool)
1360 {
1361 	int g, i;
1362 	struct proc *p;
1363 	struct thread *td;
1364 	SVCGROUP *grp;
1365 
1366 	p = curproc;
1367 	td = curthread;
1368 	snprintf(td->td_name, sizeof(td->td_name),
1369 	    "%s: master", pool->sp_name);
1370 	pool->sp_state = SVCPOOL_ACTIVE;
1371 	pool->sp_proc = p;
1372 
1373 	/* Choose group count based on number of threads and CPUs. */
1374 	pool->sp_groupcount = max(1, min(SVC_MAXGROUPS,
1375 	    min(pool->sp_maxthreads / 2, mp_ncpus) / 6));
1376 	for (g = 0; g < pool->sp_groupcount; g++) {
1377 		grp = &pool->sp_groups[g];
1378 		grp->sg_minthreads = max(1,
1379 		    pool->sp_minthreads / pool->sp_groupcount);
1380 		grp->sg_maxthreads = max(1,
1381 		    pool->sp_maxthreads / pool->sp_groupcount);
1382 		grp->sg_lastcreatetime = time_uptime;
1383 	}
1384 
1385 	/* Starting threads */
1386 	pool->sp_groups[0].sg_threadcount++;
1387 	for (g = 0; g < pool->sp_groupcount; g++) {
1388 		grp = &pool->sp_groups[g];
1389 		for (i = ((g == 0) ? 1 : 0); i < grp->sg_minthreads; i++)
1390 			svc_new_thread(grp);
1391 	}
1392 	svc_run_internal(&pool->sp_groups[0], TRUE);
1393 
1394 	/* Waiting for threads to stop. */
1395 	for (g = 0; g < pool->sp_groupcount; g++) {
1396 		grp = &pool->sp_groups[g];
1397 		mtx_lock(&grp->sg_lock);
1398 		while (grp->sg_threadcount > 0)
1399 			msleep(grp, &grp->sg_lock, 0, "svcexit", 0);
1400 		mtx_unlock(&grp->sg_lock);
1401 	}
1402 }
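
/*
 * Hedged lifecycle sketch (hypothetical caller, not part of this file):
 * the thread that owns the pool typically blocks in svc_run() until some
 * other context requests shutdown with svc_exit() below:
 *
 *	pool = svcpool_create("myrpc", NULL);
 *	... create transports and svc_reg() the programs ...
 *	svc_run(pool);		(returns after svc_exit(pool))
 *	svcpool_destroy(pool);	(or svcpool_close() to reuse the pool)
 *
 * svc_run() becomes the "master" thread of group 0 and spawns the
 * remaining worker threads; it returns only once every group's thread
 * count has dropped back to zero.
 */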
1403 
1404 void
1405 svc_exit(SVCPOOL *pool)
1406 {
1407 	SVCGROUP *grp;
1408 	SVCTHREAD *st;
1409 	int g;
1410 
1411 	pool->sp_state = SVCPOOL_CLOSING;
1412 	for (g = 0; g < pool->sp_groupcount; g++) {
1413 		grp = &pool->sp_groups[g];
1414 		mtx_lock(&grp->sg_lock);
1415 		if (grp->sg_state != SVCPOOL_CLOSING) {
1416 			grp->sg_state = SVCPOOL_CLOSING;
1417 			LIST_FOREACH(st, &grp->sg_idlethreads, st_ilink)
1418 				cv_signal(&st->st_cond);
1419 		}
1420 		mtx_unlock(&grp->sg_lock);
1421 	}
1422 }
1423 
1424 bool_t
1425 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1426 {
1427 	struct mbuf *m;
1428 	XDR xdrs;
1429 	bool_t stat;
1430 
1431 	m = rqstp->rq_args;
1432 	rqstp->rq_args = NULL;
1433 
1434 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1435 	stat = xargs(&xdrs, args);
1436 	XDR_DESTROY(&xdrs);
1437 
1438 	return (stat);
1439 }
1440 
1441 bool_t
1442 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1443 {
1444 	XDR xdrs;
1445 
1446 	if (rqstp->rq_addr) {
1447 		free(rqstp->rq_addr, M_SONAME);
1448 		rqstp->rq_addr = NULL;
1449 	}
1450 
1451 	xdrs.x_op = XDR_FREE;
1452 	return (xargs(&xdrs, args));
1453 }
1454 
1455 void
1456 svc_freereq(struct svc_req *rqstp)
1457 {
1458 	SVCTHREAD *st;
1459 	SVCPOOL *pool;
1460 
1461 	st = rqstp->rq_thread;
1462 	if (st) {
1463 		pool = st->st_pool;
1464 		if (pool->sp_done)
1465 			pool->sp_done(st, rqstp);
1466 	}
1467 
1468 	if (rqstp->rq_auth.svc_ah_ops)
1469 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1470 
1471 	if (rqstp->rq_xprt) {
1472 		SVC_RELEASE(rqstp->rq_xprt);
1473 	}
1474 
1475 	if (rqstp->rq_addr)
1476 		free(rqstp->rq_addr, M_SONAME);
1477 
1478 	if (rqstp->rq_args)
1479 		m_freem(rqstp->rq_args);
1480 
1481 	free(rqstp, M_RPC);
1482 }
1483