xref: /freebsd/sys/rpc/svc.c (revision 39beb93c)
1 /*	$NetBSD: svc.c,v 1.21 2000/07/06 03:10:35 christos Exp $	*/
2 
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31 
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)svc.c 1.44 88/02/08 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)svc.c	2.4 88/08/11 4.0 RPCSRC";
35 #endif
36 #include <sys/cdefs.h>
37 __FBSDID("$FreeBSD$");
38 
39 /*
40  * svc.c, Server-side remote procedure call interface.
41  *
42  * There are two sets of procedures here.  The xprt routines are
43  * for handling transport handles.  The svc routines handle the
44  * list of service routines.
45  *
46  * Copyright (C) 1984, Sun Microsystems, Inc.
47  */
48 
49 #include <sys/param.h>
50 #include <sys/lock.h>
51 #include <sys/kernel.h>
52 #include <sys/kthread.h>
53 #include <sys/malloc.h>
54 #include <sys/mbuf.h>
55 #include <sys/mutex.h>
56 #include <sys/proc.h>
57 #include <sys/queue.h>
58 #include <sys/socketvar.h>
59 #include <sys/systm.h>
60 #include <sys/ucred.h>
61 
62 #include <rpc/rpc.h>
63 #include <rpc/rpcb_clnt.h>
64 #include <rpc/replay.h>
65 
66 #include <rpc/rpc_com.h>
67 
68 #define SVC_VERSQUIET 0x0001		/* keep quiet about vers mismatch */
69 #define version_keepquiet(xp) (SVC_EXT(xp)->xp_flags & SVC_VERSQUIET)
70 
71 static struct svc_callout *svc_find(SVCPOOL *pool, rpcprog_t, rpcvers_t,
72     char *);
73 static void svc_new_thread(SVCPOOL *pool);
74 static void xprt_unregister_locked(SVCXPRT *xprt);
75 
76 /* ***************  SVCXPRT related stuff **************** */
77 
78 static int svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS);
79 static int svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS);
80 
81 SVCPOOL*
82 svcpool_create(const char *name, struct sysctl_oid_list *sysctl_base)
83 {
84 	SVCPOOL *pool;
85 
86 	pool = malloc(sizeof(SVCPOOL), M_RPC, M_WAITOK|M_ZERO);
87 
88 	mtx_init(&pool->sp_lock, "sp_lock", NULL, MTX_DEF);
89 	pool->sp_name = name;
90 	pool->sp_state = SVCPOOL_INIT;
91 	pool->sp_proc = NULL;
92 	TAILQ_INIT(&pool->sp_xlist);
93 	TAILQ_INIT(&pool->sp_active);
94 	TAILQ_INIT(&pool->sp_callouts);
95 	LIST_INIT(&pool->sp_threads);
96 	LIST_INIT(&pool->sp_idlethreads);
97 	pool->sp_minthreads = 1;
98 	pool->sp_maxthreads = 1;
99 	pool->sp_threadcount = 0;
100 
101 	/*
102 	 * Don't use more than a quarter of mbuf clusters, or more than
103 	 * 45MB, for buffering requests.
104 	 */
105 	pool->sp_space_high = nmbclusters * MCLBYTES / 4;
106 	if (pool->sp_space_high > 45 << 20)
107 		pool->sp_space_high = 45 << 20;
108 	pool->sp_space_low = 2 * pool->sp_space_high / 3;
109 
110 	sysctl_ctx_init(&pool->sp_sysctl);
111 	if (sysctl_base) {
112 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
113 		    "minthreads", CTLTYPE_INT | CTLFLAG_RW,
114 		    pool, 0, svcpool_minthread_sysctl, "I", "Minimum number of service threads");
115 		SYSCTL_ADD_PROC(&pool->sp_sysctl, sysctl_base, OID_AUTO,
116 		    "maxthreads", CTLTYPE_INT | CTLFLAG_RW,
117 		    pool, 0, svcpool_maxthread_sysctl, "I", "Maximum number of service threads");
118 		SYSCTL_ADD_INT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
119 		    "threads", CTLFLAG_RD, &pool->sp_threadcount, 0, "Current number of service threads");
120 
121 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
122 		    "request_space_used", CTLFLAG_RD,
123 		    &pool->sp_space_used, 0,
124 		    "Space in parsed but not handled requests.");
125 
126 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
127 		    "request_space_used_highest", CTLFLAG_RD,
128 		    &pool->sp_space_used_highest, 0,
129 		    "Highest space used since reboot.");
130 
131 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
132 		    "request_space_high", CTLFLAG_RW,
133 		    &pool->sp_space_high, 0,
134 		    "Maximum space in parsed but not handled requests.");
135 
136 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
137 		    "request_space_low", CTLFLAG_RW,
138 		    &pool->sp_space_low, 0,
139 		    "Low water mark for request space.");
140 
141 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
142 		    "request_space_throttled", CTLFLAG_RD,
143 		    &pool->sp_space_throttled, 0,
144 		    "Whether RPC requests are currently throttled");
145 
146 		SYSCTL_ADD_UINT(&pool->sp_sysctl, sysctl_base, OID_AUTO,
147 		    "request_space_throttle_count", CTLFLAG_RD,
148 		    &pool->sp_space_throttle_count, 0,
149 		    "Count of times throttling based on request space has occurred");
150 	}
151 
152 	return pool;
153 }
154 
155 void
156 svcpool_destroy(SVCPOOL *pool)
157 {
158 	SVCXPRT *xprt, *nxprt;
159 	struct svc_callout *s;
160 	struct svcxprt_list cleanup;
161 
162 	TAILQ_INIT(&cleanup);
163 	mtx_lock(&pool->sp_lock);
164 
165 	while (TAILQ_FIRST(&pool->sp_xlist)) {
166 		xprt = TAILQ_FIRST(&pool->sp_xlist);
167 		xprt_unregister_locked(xprt);
168 		TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
169 	}
170 
171 	while (TAILQ_FIRST(&pool->sp_callouts)) {
172 		s = TAILQ_FIRST(&pool->sp_callouts);
173 		mtx_unlock(&pool->sp_lock);
174 		svc_unreg(pool, s->sc_prog, s->sc_vers);
175 		mtx_lock(&pool->sp_lock);
176 	}
177 
178 	mtx_destroy(&pool->sp_lock);
179 
180 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
181 		SVC_RELEASE(xprt);
182 	}
183 
184 	if (pool->sp_rcache)
185 		replay_freecache(pool->sp_rcache);
186 
187 	sysctl_ctx_free(&pool->sp_sysctl);
188 	free(pool, M_RPC);
189 }
190 
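/*
 * Return TRUE if the pool has been started and is not shutting down,
 * i.e. worker threads are expected to be running.
 */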
191 static bool_t
192 svcpool_active(SVCPOOL *pool)
193 {
194 	enum svcpool_state state = pool->sp_state;
195 
196 	if (state == SVCPOOL_INIT || state == SVCPOOL_CLOSING)
197 		return (FALSE);
198 	return (TRUE);
199 }
200 
201 /*
202  * Sysctl handler to set the minimum thread count on a pool
203  */
204 static int
205 svcpool_minthread_sysctl(SYSCTL_HANDLER_ARGS)
206 {
207 	SVCPOOL *pool;
208 	int newminthreads, error, n;
209 
210 	pool = oidp->oid_arg1;
211 	newminthreads = pool->sp_minthreads;
212 	error = sysctl_handle_int(oidp, &newminthreads, 0, req);
213 	if (error == 0 && newminthreads != pool->sp_minthreads) {
214 		if (newminthreads > pool->sp_maxthreads)
215 			return (EINVAL);
216 		mtx_lock(&pool->sp_lock);
217 		if (newminthreads > pool->sp_minthreads
218 		    && svcpool_active(pool)) {
219 			/*
220 			 * If the pool is running and we are
221 			 * increasing, create some more threads now.
222 			 */
223 			n = newminthreads - pool->sp_threadcount;
224 			if (n > 0) {
225 				mtx_unlock(&pool->sp_lock);
226 				while (n--)
227 					svc_new_thread(pool);
228 				mtx_lock(&pool->sp_lock);
229 			}
230 		}
231 		pool->sp_minthreads = newminthreads;
232 		mtx_unlock(&pool->sp_lock);
233 	}
234 	return (error);
235 }
236 
237 /*
238  * Sysctl handler to set the maximum thread count on a pool
239  */
240 static int
241 svcpool_maxthread_sysctl(SYSCTL_HANDLER_ARGS)
242 {
243 	SVCPOOL *pool;
244 	SVCTHREAD *st;
245 	int newmaxthreads, error;
246 
247 	pool = oidp->oid_arg1;
248 	newmaxthreads = pool->sp_maxthreads;
249 	error = sysctl_handle_int(oidp, &newmaxthreads, 0, req);
250 	if (error == 0 && newmaxthreads != pool->sp_maxthreads) {
251 		if (newmaxthreads < pool->sp_minthreads)
252 			return (EINVAL);
253 		mtx_lock(&pool->sp_lock);
254 		if (newmaxthreads < pool->sp_maxthreads
255 		    && svcpool_active(pool)) {
256 			/*
257 			 * If the pool is running and we are
258 			 * decreasing, wake up some idle threads to
259 			 * encourage them to exit.
260 			 */
261 			LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
262 				cv_signal(&st->st_cond);
263 		}
264 		pool->sp_maxthreads = newmaxthreads;
265 		mtx_unlock(&pool->sp_lock);
266 	}
267 	return (error);
268 }
269 
270 /*
271  * Activate a transport handle.
272  */
273 void
274 xprt_register(SVCXPRT *xprt)
275 {
276 	SVCPOOL *pool = xprt->xp_pool;
277 
278 	mtx_lock(&pool->sp_lock);
279 	xprt->xp_registered = TRUE;
280 	xprt->xp_active = FALSE;
281 	TAILQ_INSERT_TAIL(&pool->sp_xlist, xprt, xp_link);
282 	mtx_unlock(&pool->sp_lock);
283 }
284 
285 /*
286  * De-activate a transport handle. Note: the locked version doesn't
287  * release the transport - caller must do that after dropping the pool
288  * lock.
289  */
290 static void
291 xprt_unregister_locked(SVCXPRT *xprt)
292 {
293 	SVCPOOL *pool = xprt->xp_pool;
294 
295 	if (xprt->xp_active) {
296 		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
297 		xprt->xp_active = FALSE;
298 	}
299 	TAILQ_REMOVE(&pool->sp_xlist, xprt, xp_link);
300 	xprt->xp_registered = FALSE;
301 }
302 
303 void
304 xprt_unregister(SVCXPRT *xprt)
305 {
306 	SVCPOOL *pool = xprt->xp_pool;
307 
308 	mtx_lock(&pool->sp_lock);
309 	xprt_unregister_locked(xprt);
310 	mtx_unlock(&pool->sp_lock);
311 
312 	SVC_RELEASE(xprt);
313 }
314 
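/*
 * Hand an active transport to an idle service thread if one is
 * available; otherwise, if the pool's limits allow, note that a new
 * thread is wanted so that svc_run_internal can create one.
 * Called with the pool lock held.
 */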
315 static void
316 xprt_assignthread(SVCXPRT *xprt)
317 {
318 	SVCPOOL *pool = xprt->xp_pool;
319 	SVCTHREAD *st;
320 
321 	/*
322 	 * Attempt to assign a service thread to this
323 	 * transport.
324 	 */
325 	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink) {
326 		if (st->st_xprt == NULL && STAILQ_EMPTY(&st->st_reqs))
327 			break;
328 	}
329 	if (st) {
330 		SVC_ACQUIRE(xprt);
331 		xprt->xp_thread = st;
332 		st->st_xprt = xprt;
333 		cv_signal(&st->st_cond);
334 	} else {
335 		/*
336 		 * See if we can create a new thread. The
337 		 * actual thread creation happens in
338 		 * svc_run_internal because our locking state
339 		 * is poorly defined (we are typically called
340 		 * from a socket upcall). Don't create more
341 		 * than one thread per second.
342 		 */
343 		if (pool->sp_state == SVCPOOL_ACTIVE
344 		    && pool->sp_lastcreatetime < time_uptime
345 		    && pool->sp_threadcount < pool->sp_maxthreads) {
346 			pool->sp_state = SVCPOOL_THREADWANTED;
347 		}
348 	}
349 }
350 
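/*
 * Mark a transport as active (e.g. because data has arrived on its
 * socket), add it to the pool's active list and try to assign a
 * thread to service it.
 */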
351 void
352 xprt_active(SVCXPRT *xprt)
353 {
354 	SVCPOOL *pool = xprt->xp_pool;
355 
356 	if (!xprt->xp_registered) {
357 		/*
358 		 * Race with xprt_unregister - we lose.
359 		 */
360 		return;
361 	}
362 
363 	mtx_lock(&pool->sp_lock);
364 
365 	if (!xprt->xp_active) {
366 		TAILQ_INSERT_TAIL(&pool->sp_active, xprt, xp_alink);
367 		xprt->xp_active = TRUE;
368 		xprt_assignthread(xprt);
369 	}
370 
371 	mtx_unlock(&pool->sp_lock);
372 }
373 
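/*
 * Remove a transport from the pool's active list. The caller must
 * hold the pool lock; xprt_inactive() below is the unlocked wrapper.
 */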
374 void
375 xprt_inactive_locked(SVCXPRT *xprt)
376 {
377 	SVCPOOL *pool = xprt->xp_pool;
378 
379 	if (xprt->xp_active) {
380 		TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
381 		xprt->xp_active = FALSE;
382 	}
383 }
384 
385 void
386 xprt_inactive(SVCXPRT *xprt)
387 {
388 	SVCPOOL *pool = xprt->xp_pool;
389 
390 	mtx_lock(&pool->sp_lock);
391 	xprt_inactive_locked(xprt);
392 	mtx_unlock(&pool->sp_lock);
393 }
394 
395 /*
396  * Add a service program to the callout list.
397  * The dispatch routine will be called when an RPC request for this
398  * program number comes in.
399  */
400 bool_t
401 svc_reg(SVCXPRT *xprt, const rpcprog_t prog, const rpcvers_t vers,
402     void (*dispatch)(struct svc_req *, SVCXPRT *),
403     const struct netconfig *nconf)
404 {
405 	SVCPOOL *pool = xprt->xp_pool;
406 	struct svc_callout *s;
407 	char *netid = NULL;
408 	int flag = 0;
409 
410 /* VARIABLES PROTECTED BY pool->sp_lock: s, sp_callouts */
411 
412 	if (xprt->xp_netid) {
413 		netid = strdup(xprt->xp_netid, M_RPC);
414 		flag = 1;
415 	} else if (nconf && nconf->nc_netid) {
416 		netid = strdup(nconf->nc_netid, M_RPC);
417 		flag = 1;
418 	} /* must have been created with svc_raw_create */
419 	if ((netid == NULL) && (flag == 1)) {
420 		return (FALSE);
421 	}
422 
423 	mtx_lock(&pool->sp_lock);
424 	if ((s = svc_find(pool, prog, vers, netid)) != NULL) {
425 		if (netid)
426 			free(netid, M_RPC);
427 		if (s->sc_dispatch == dispatch)
428 			goto rpcb_it; /* they are registering another xprt */
429 		mtx_unlock(&pool->sp_lock);
430 		return (FALSE);
431 	}
432 	s = malloc(sizeof (struct svc_callout), M_RPC, M_NOWAIT);
433 	if (s == NULL) {
434 		if (netid)
435 			free(netid, M_RPC);
436 		mtx_unlock(&pool->sp_lock);
437 		return (FALSE);
438 	}
439 
440 	s->sc_prog = prog;
441 	s->sc_vers = vers;
442 	s->sc_dispatch = dispatch;
443 	s->sc_netid = netid;
444 	TAILQ_INSERT_TAIL(&pool->sp_callouts, s, sc_link);
445 
446 	if ((xprt->xp_netid == NULL) && (flag == 1) && netid)
447 		((SVCXPRT *) xprt)->xp_netid = strdup(netid, M_RPC);
448 
449 rpcb_it:
450 	mtx_unlock(&pool->sp_lock);
451 	/* now register the information with the local binder service */
452 	if (nconf) {
453 		bool_t dummy;
454 		struct netconfig tnc;
455 		struct netbuf nb;
456 		tnc = *nconf;
457 		nb.buf = &xprt->xp_ltaddr;
458 		nb.len = xprt->xp_ltaddr.ss_len;
459 		dummy = rpcb_set(prog, vers, &tnc, &nb);
460 		return (dummy);
461 	}
462 	return (TRUE);
463 }
464 
465 /*
466  * Remove a service program from the callout list.
467  */
468 void
469 svc_unreg(SVCPOOL *pool, const rpcprog_t prog, const rpcvers_t vers)
470 {
471 	struct svc_callout *s;
472 
473 	/* unregister the information anyway */
474 	(void) rpcb_unset(prog, vers, NULL);
475 	mtx_lock(&pool->sp_lock);
476 	while ((s = svc_find(pool, prog, vers, NULL)) != NULL) {
477 		TAILQ_REMOVE(&pool->sp_callouts, s, sc_link);
478 		if (s->sc_netid)
479 			mem_free(s->sc_netid, sizeof (s->sc_netid) + 1);
480 		mem_free(s, sizeof (struct svc_callout));
481 	}
482 	mtx_unlock(&pool->sp_lock);
483 }
484 
485 /* ********************** CALLOUT list related stuff ************* */
486 
487 /*
488  * Search the callout list for a program number, return the callout
489  * struct.
490  */
491 static struct svc_callout *
492 svc_find(SVCPOOL *pool, rpcprog_t prog, rpcvers_t vers, char *netid)
493 {
494 	struct svc_callout *s;
495 
496 	mtx_assert(&pool->sp_lock, MA_OWNED);
497 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
498 		if (s->sc_prog == prog && s->sc_vers == vers
499 		    && (netid == NULL || s->sc_netid == NULL ||
500 			strcmp(netid, s->sc_netid) == 0))
501 			break;
502 	}
503 
504 	return (s);
505 }
506 
507 /* ******************* REPLY GENERATION ROUTINES  ************ */
508 
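/*
 * Common reply path: free the request arguments, record the reply in
 * the replay cache (if any), wrap the body through the authentication
 * layer and hand it to the transport's reply method.
 */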
509 static bool_t
510 svc_sendreply_common(struct svc_req *rqstp, struct rpc_msg *rply,
511     struct mbuf *body)
512 {
513 	SVCXPRT *xprt = rqstp->rq_xprt;
514 	bool_t ok;
515 
516 	if (rqstp->rq_args) {
517 		m_freem(rqstp->rq_args);
518 		rqstp->rq_args = NULL;
519 	}
520 
521 	if (xprt->xp_pool->sp_rcache)
522 		replay_setreply(xprt->xp_pool->sp_rcache,
523 		    rply, svc_getrpccaller(rqstp), body);
524 
525 	if (!SVCAUTH_WRAP(&rqstp->rq_auth, &body))
526 		return (FALSE);
527 
528 	ok = SVC_REPLY(xprt, rply, rqstp->rq_addr, body);
529 	if (rqstp->rq_addr) {
530 		free(rqstp->rq_addr, M_SONAME);
531 		rqstp->rq_addr = NULL;
532 	}
533 
534 	return (ok);
535 }
536 
537 /*
538  * Send a reply to an RPC request
539  */
540 bool_t
541 svc_sendreply(struct svc_req *rqstp, xdrproc_t xdr_results, void *xdr_location)
542 {
543 	struct rpc_msg rply;
544 	struct mbuf *m;
545 	XDR xdrs;
546 	bool_t ok;
547 
548 	rply.rm_xid = rqstp->rq_xid;
549 	rply.rm_direction = REPLY;
550 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
551 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
552 	rply.acpted_rply.ar_stat = SUCCESS;
553 	rply.acpted_rply.ar_results.where = NULL;
554 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
555 
556 	MGET(m, M_WAIT, MT_DATA);
557 	MCLGET(m, M_WAIT);
558 	m->m_len = 0;
559 	xdrmbuf_create(&xdrs, m, XDR_ENCODE);
560 	ok = xdr_results(&xdrs, xdr_location);
561 	XDR_DESTROY(&xdrs);
562 
563 	if (ok) {
564 		return (svc_sendreply_common(rqstp, &rply, m));
565 	} else {
566 		m_freem(m);
567 		return (FALSE);
568 	}
569 }
570 
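/*
 * Send a reply whose results have already been encoded into an mbuf
 * chain by the caller.
 */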
571 bool_t
572 svc_sendreply_mbuf(struct svc_req *rqstp, struct mbuf *m)
573 {
574 	struct rpc_msg rply;
575 
576 	rply.rm_xid = rqstp->rq_xid;
577 	rply.rm_direction = REPLY;
578 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
579 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
580 	rply.acpted_rply.ar_stat = SUCCESS;
581 	rply.acpted_rply.ar_results.where = NULL;
582 	rply.acpted_rply.ar_results.proc = (xdrproc_t) xdr_void;
583 
584 	return (svc_sendreply_common(rqstp, &rply, m));
585 }
586 
587 /*
588  * No procedure error reply
589  */
590 void
591 svcerr_noproc(struct svc_req *rqstp)
592 {
593 	SVCXPRT *xprt = rqstp->rq_xprt;
594 	struct rpc_msg rply;
595 
596 	rply.rm_xid = rqstp->rq_xid;
597 	rply.rm_direction = REPLY;
598 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
599 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
600 	rply.acpted_rply.ar_stat = PROC_UNAVAIL;
601 
602 	if (xprt->xp_pool->sp_rcache)
603 		replay_setreply(xprt->xp_pool->sp_rcache,
604 		    &rply, svc_getrpccaller(rqstp), NULL);
605 
606 	svc_sendreply_common(rqstp, &rply, NULL);
607 }
608 
609 /*
610  * Can't decode args error reply
611  */
612 void
613 svcerr_decode(struct svc_req *rqstp)
614 {
615 	SVCXPRT *xprt = rqstp->rq_xprt;
616 	struct rpc_msg rply;
617 
618 	rply.rm_xid = rqstp->rq_xid;
619 	rply.rm_direction = REPLY;
620 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
621 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
622 	rply.acpted_rply.ar_stat = GARBAGE_ARGS;
623 
624 	if (xprt->xp_pool->sp_rcache)
625 		replay_setreply(xprt->xp_pool->sp_rcache,
626 		    &rply, (struct sockaddr *) &xprt->xp_rtaddr, NULL);
627 
628 	svc_sendreply_common(rqstp, &rply, NULL);
629 }
630 
631 /*
632  * Some system error
633  */
634 void
635 svcerr_systemerr(struct svc_req *rqstp)
636 {
637 	SVCXPRT *xprt = rqstp->rq_xprt;
638 	struct rpc_msg rply;
639 
640 	rply.rm_xid = rqstp->rq_xid;
641 	rply.rm_direction = REPLY;
642 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
643 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
644 	rply.acpted_rply.ar_stat = SYSTEM_ERR;
645 
646 	if (xprt->xp_pool->sp_rcache)
647 		replay_setreply(xprt->xp_pool->sp_rcache,
648 		    &rply, svc_getrpccaller(rqstp), NULL);
649 
650 	svc_sendreply_common(rqstp, &rply, NULL);
651 }
652 
653 /*
654  * Authentication error reply
655  */
656 void
657 svcerr_auth(struct svc_req *rqstp, enum auth_stat why)
658 {
659 	SVCXPRT *xprt = rqstp->rq_xprt;
660 	struct rpc_msg rply;
661 
662 	rply.rm_xid = rqstp->rq_xid;
663 	rply.rm_direction = REPLY;
664 	rply.rm_reply.rp_stat = MSG_DENIED;
665 	rply.rjcted_rply.rj_stat = AUTH_ERROR;
666 	rply.rjcted_rply.rj_why = why;
667 
668 	if (xprt->xp_pool->sp_rcache)
669 		replay_setreply(xprt->xp_pool->sp_rcache,
670 		    &rply, svc_getrpccaller(rqstp), NULL);
671 
672 	svc_sendreply_common(rqstp, &rply, NULL);
673 }
674 
675 /*
676  * Auth too weak error reply
677  */
678 void
679 svcerr_weakauth(struct svc_req *rqstp)
680 {
681 
682 	svcerr_auth(rqstp, AUTH_TOOWEAK);
683 }
684 
685 /*
686  * Program unavailable error reply
687  */
688 void
689 svcerr_noprog(struct svc_req *rqstp)
690 {
691 	SVCXPRT *xprt = rqstp->rq_xprt;
692 	struct rpc_msg rply;
693 
694 	rply.rm_xid = rqstp->rq_xid;
695 	rply.rm_direction = REPLY;
696 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
697 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
698 	rply.acpted_rply.ar_stat = PROG_UNAVAIL;
699 
700 	if (xprt->xp_pool->sp_rcache)
701 		replay_setreply(xprt->xp_pool->sp_rcache,
702 		    &rply, svc_getrpccaller(rqstp), NULL);
703 
704 	svc_sendreply_common(rqstp, &rply, NULL);
705 }
706 
707 /*
708  * Program version mismatch error reply
709  */
710 void
711 svcerr_progvers(struct svc_req *rqstp, rpcvers_t low_vers, rpcvers_t high_vers)
712 {
713 	SVCXPRT *xprt = rqstp->rq_xprt;
714 	struct rpc_msg rply;
715 
716 	rply.rm_xid = rqstp->rq_xid;
717 	rply.rm_direction = REPLY;
718 	rply.rm_reply.rp_stat = MSG_ACCEPTED;
719 	rply.acpted_rply.ar_verf = rqstp->rq_verf;
720 	rply.acpted_rply.ar_stat = PROG_MISMATCH;
721 	rply.acpted_rply.ar_vers.low = (uint32_t)low_vers;
722 	rply.acpted_rply.ar_vers.high = (uint32_t)high_vers;
723 
724 	if (xprt->xp_pool->sp_rcache)
725 		replay_setreply(xprt->xp_pool->sp_rcache,
726 		    &rply, svc_getrpccaller(rqstp), NULL);
727 
728 	svc_sendreply_common(rqstp, &rply, NULL);
729 }
730 
731 /*
732  * Allocate a new server transport structure. All fields are
733  * initialized to zero and xp_p3 is initialized to point at an
734  * extension structure to hold various flags and authentication
735  * parameters.
736  */
737 SVCXPRT *
738 svc_xprt_alloc(void)
739 {
740 	SVCXPRT *xprt;
741 	SVCXPRT_EXT *ext;
742 
743 	xprt = mem_alloc(sizeof(SVCXPRT));
744 	memset(xprt, 0, sizeof(SVCXPRT));
745 	ext = mem_alloc(sizeof(SVCXPRT_EXT));
746 	memset(ext, 0, sizeof(SVCXPRT_EXT));
747 	xprt->xp_p3 = ext;
748 	refcount_init(&xprt->xp_refs, 1);
749 
750 	return (xprt);
751 }
752 
753 /*
754  * Free a server transport structure.
755  */
756 void
757 svc_xprt_free(SVCXPRT *xprt)
759 {
760 
761 	mem_free(xprt->xp_p3, sizeof(SVCXPRT_EXT));
762 	mem_free(xprt, sizeof(SVCXPRT));
763 }
764 
765 /* ******************* SERVER INPUT STUFF ******************* */
766 
767 /*
768  * Read RPC requests from a transport and queue them to be
769  * executed. We handle authentication and replay cache replies here.
770  * Actually dispatching the RPC is deferred till svc_executereq.
771  */
772 static enum xprt_stat
773 svc_getreq(SVCXPRT *xprt, struct svc_req **rqstp_ret)
774 {
775 	SVCPOOL *pool = xprt->xp_pool;
776 	struct svc_req *r;
777 	struct rpc_msg msg;
778 	struct mbuf *args;
779 	enum xprt_stat stat;
780 
781 	/* now receive msgs from xprt (supports batch calls) */
782 	r = malloc(sizeof(*r), M_RPC, M_WAITOK|M_ZERO);
783 
784 	msg.rm_call.cb_cred.oa_base = r->rq_credarea;
785 	msg.rm_call.cb_verf.oa_base = &r->rq_credarea[MAX_AUTH_BYTES];
786 	r->rq_clntcred = &r->rq_credarea[2*MAX_AUTH_BYTES];
787 	if (SVC_RECV(xprt, &msg, &r->rq_addr, &args)) {
788 		enum auth_stat why;
789 
790 		/*
791 		 * Handle replays and authenticate before queuing the
792 		 * request to be executed.
793 		 */
794 		SVC_ACQUIRE(xprt);
795 		r->rq_xprt = xprt;
796 		if (pool->sp_rcache) {
797 			struct rpc_msg repmsg;
798 			struct mbuf *repbody;
799 			enum replay_state rs;
800 			rs = replay_find(pool->sp_rcache, &msg,
801 			    svc_getrpccaller(r), &repmsg, &repbody);
802 			switch (rs) {
803 			case RS_NEW:
804 				break;
805 			case RS_DONE:
806 				SVC_REPLY(xprt, &repmsg, r->rq_addr,
807 				    repbody);
808 				if (r->rq_addr) {
809 					free(r->rq_addr, M_SONAME);
810 					r->rq_addr = NULL;
811 				}
812 				goto call_done;
813 
814 			default:
815 				goto call_done;
816 			}
817 		}
818 
819 		r->rq_xid = msg.rm_xid;
820 		r->rq_prog = msg.rm_call.cb_prog;
821 		r->rq_vers = msg.rm_call.cb_vers;
822 		r->rq_proc = msg.rm_call.cb_proc;
823 		r->rq_size = sizeof(*r) + m_length(args, NULL);
824 		r->rq_args = args;
825 		if ((why = _authenticate(r, &msg)) != AUTH_OK) {
826 			/*
827 			 * RPCSEC_GSS uses this return code
828 			 * for requests that form part of its
829 			 * context establishment protocol and
830 			 * should not be dispatched to the
831 			 * application.
832 			 */
833 			if (why != RPCSEC_GSS_NODISPATCH)
834 				svcerr_auth(r, why);
835 			goto call_done;
836 		}
837 
838 		if (!SVCAUTH_UNWRAP(&r->rq_auth, &r->rq_args)) {
839 			svcerr_decode(r);
840 			goto call_done;
841 		}
842 
843 		/*
844 		 * Everything checks out, return request to caller.
845 		 */
846 		*rqstp_ret = r;
847 		r = NULL;
848 	}
849 call_done:
850 	if (r) {
851 		svc_freereq(r);
852 		r = NULL;
853 	}
854 	if ((stat = SVC_STAT(xprt)) == XPRT_DIED) {
855 		xprt_unregister(xprt);
856 	}
857 
858 	return (stat);
859 }
860 
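/*
 * Dispatch a queued request to the matching service callout, or send
 * the appropriate error reply if no registered program or version
 * matches.
 */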
861 static void
862 svc_executereq(struct svc_req *rqstp)
863 {
864 	SVCXPRT *xprt = rqstp->rq_xprt;
865 	SVCPOOL *pool = xprt->xp_pool;
866 	int prog_found;
867 	rpcvers_t low_vers;
868 	rpcvers_t high_vers;
869 	struct svc_callout *s;
870 
871 	/* now match message with a registered service */
872 	prog_found = FALSE;
873 	low_vers = (rpcvers_t) -1L;
874 	high_vers = (rpcvers_t) 0L;
875 	TAILQ_FOREACH(s, &pool->sp_callouts, sc_link) {
876 		if (s->sc_prog == rqstp->rq_prog) {
877 			if (s->sc_vers == rqstp->rq_vers) {
878 				/*
879 				 * We hand ownership of the request to the
880 				 * dispatch method; it must call
881 				 * svc_freereq.
882 				 */
883 				(*s->sc_dispatch)(rqstp, xprt);
884 				return;
885 			}  /* found correct version */
886 			prog_found = TRUE;
887 			if (s->sc_vers < low_vers)
888 				low_vers = s->sc_vers;
889 			if (s->sc_vers > high_vers)
890 				high_vers = s->sc_vers;
891 		}   /* found correct program */
892 	}
893 
894 	/*
895 	 * If we got here, the program or version
896 	 * is not served.
897 	 */
898 	if (prog_found)
899 		svcerr_progvers(rqstp, low_vers, high_vers);
900 	else
901 		svcerr_noprog(rqstp);
902 
903 	svc_freereq(rqstp);
904 }
905 
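/*
 * Unregister any transports which have exceeded their idle timeout.
 * Called with the pool lock held; the lock is dropped briefly while
 * the unregistered transports are released.
 */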
906 static void
907 svc_checkidle(SVCPOOL *pool)
908 {
909 	SVCXPRT *xprt, *nxprt;
910 	time_t timo;
911 	struct svcxprt_list cleanup;
912 
913 	TAILQ_INIT(&cleanup);
914 	TAILQ_FOREACH_SAFE(xprt, &pool->sp_xlist, xp_link, nxprt) {
915 		/*
916 		 * Only some transports have idle timers. Don't time out a
917 		 * transport which currently has a thread working on it.
918 		 */
919 		if (!xprt->xp_idletimeout || xprt->xp_thread)
920 			continue;
921 
922 		timo = xprt->xp_lastactive + xprt->xp_idletimeout;
923 		if (time_uptime > timo) {
924 			xprt_unregister_locked(xprt);
925 			TAILQ_INSERT_TAIL(&cleanup, xprt, xp_link);
926 		}
927 	}
928 
929 	mtx_unlock(&pool->sp_lock);
930 	TAILQ_FOREACH_SAFE(xprt, &cleanup, xp_link, nxprt) {
931 		SVC_RELEASE(xprt);
932 	}
933 	mtx_lock(&pool->sp_lock);
935 }
936 
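/*
 * Walk the active list and try to assign a service thread to any
 * transport which doesn't have one yet.
 */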
937 static void
938 svc_assign_waiting_sockets(SVCPOOL *pool)
939 {
940 	SVCXPRT *xprt;
941 
942 	TAILQ_FOREACH(xprt, &pool->sp_active, xp_alink) {
943 		if (!xprt->xp_thread) {
944 			xprt_assignthread(xprt);
945 		}
946 	}
947 }
948 
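/*
 * Check whether there is room to buffer another request, updating the
 * pool's throttling state as usage crosses the high and low water
 * marks. Called with the pool lock held.
 */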
949 static bool_t
950 svc_request_space_available(SVCPOOL *pool)
951 {
952 
953 	mtx_assert(&pool->sp_lock, MA_OWNED);
954 
955 	if (pool->sp_space_throttled) {
956 		/*
957 		 * Below the low-water mark yet? If so, assign any waiting sockets.
958 		 */
959 		if (pool->sp_space_used < pool->sp_space_low) {
960 			pool->sp_space_throttled = FALSE;
961 			svc_assign_waiting_sockets(pool);
962 			return TRUE;
963 		}
964 
965 		return FALSE;
966 	} else {
967 		if (pool->sp_space_used
968 		    >= pool->sp_space_high) {
969 			pool->sp_space_throttled = TRUE;
970 			pool->sp_space_throttle_count++;
971 			return FALSE;
972 		}
973 
974 		return TRUE;
975 	}
976 }
977 
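/*
 * Main loop for a service thread: wait for work, drain requests from
 * an assigned transport and execute them. Both the master thread
 * (from svc_run) and dynamically created worker threads run here.
 */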
978 static void
979 svc_run_internal(SVCPOOL *pool, bool_t ismaster)
980 {
981 	SVCTHREAD *st, *stpref;
982 	SVCXPRT *xprt;
983 	enum xprt_stat stat;
984 	struct svc_req *rqstp;
985 	int error;
986 
987 	st = mem_alloc(sizeof(*st));
988 	st->st_xprt = NULL;
989 	STAILQ_INIT(&st->st_reqs);
990 	cv_init(&st->st_cond, "rpcsvc");
991 
992 	mtx_lock(&pool->sp_lock);
993 	LIST_INSERT_HEAD(&pool->sp_threads, st, st_link);
994 
995 	/*
996 	 * If we are a new thread which was spawned to cope with
997 	 * increased load, set the state back to SVCPOOL_ACTIVE.
998 	 */
999 	if (pool->sp_state == SVCPOOL_THREADSTARTING)
1000 		pool->sp_state = SVCPOOL_ACTIVE;
1001 
1002 	while (pool->sp_state != SVCPOOL_CLOSING) {
1003 		/*
1004 		 * Check for idle transports once per second.
1005 		 */
1006 		if (time_uptime > pool->sp_lastidlecheck) {
1007 			pool->sp_lastidlecheck = time_uptime;
1008 			svc_checkidle(pool);
1009 		}
1010 
1011 		xprt = st->st_xprt;
1012 		if (!xprt && STAILQ_EMPTY(&st->st_reqs)) {
1013 			/*
1014 			 * Enforce maxthreads count.
1015 			 */
1016 			if (pool->sp_threadcount > pool->sp_maxthreads)
1017 				break;
1018 
1019 			/*
1020 			 * Before sleeping, see if we can find an
1021 			 * active transport which isn't being serviced
1022 			 * by a thread.
1023 			 */
1024 			if (svc_request_space_available(pool)) {
1025 				TAILQ_FOREACH(xprt, &pool->sp_active,
1026 				    xp_alink) {
1027 					if (!xprt->xp_thread) {
1028 						SVC_ACQUIRE(xprt);
1029 						xprt->xp_thread = st;
1030 						st->st_xprt = xprt;
1031 						break;
1032 					}
1033 				}
1034 			}
1035 			if (st->st_xprt)
1036 				continue;
1037 
1038 			LIST_INSERT_HEAD(&pool->sp_idlethreads, st, st_ilink);
1039 			error = cv_timedwait_sig(&st->st_cond, &pool->sp_lock,
1040 				5 * hz);
1041 			LIST_REMOVE(st, st_ilink);
1042 
1043 			/*
1044 			 * Reduce worker thread count when idle.
1045 			 */
1046 			if (error == EWOULDBLOCK) {
1047 				if (!ismaster
1048 				    && (pool->sp_threadcount
1049 					> pool->sp_minthreads)
1050 					&& !st->st_xprt
1051 					&& STAILQ_EMPTY(&st->st_reqs))
1052 					break;
1053 			}
1054 			if (error == EWOULDBLOCK)
1055 				continue;
1056 			if (error) {
1057 				if (pool->sp_state != SVCPOOL_CLOSING) {
1058 					mtx_unlock(&pool->sp_lock);
1059 					svc_exit(pool);
1060 					mtx_lock(&pool->sp_lock);
1061 				}
1062 				break;
1063 			}
1064 
1065 			if (pool->sp_state == SVCPOOL_THREADWANTED) {
1066 				pool->sp_state = SVCPOOL_THREADSTARTING;
1067 				pool->sp_lastcreatetime = time_uptime;
1068 				mtx_unlock(&pool->sp_lock);
1069 				svc_new_thread(pool);
1070 				mtx_lock(&pool->sp_lock);
1071 			}
1072 			continue;
1073 		}
1074 
1075 		if (xprt) {
1076 			/*
1077 			 * Drain the transport socket and queue up any
1078 			 * RPCs.
1079 			 */
1080 			xprt->xp_lastactive = time_uptime;
1081 			stat = XPRT_IDLE;
1082 			do {
1083 				if (!svc_request_space_available(pool))
1084 					break;
1085 				rqstp = NULL;
1086 				mtx_unlock(&pool->sp_lock);
1087 				stat = svc_getreq(xprt, &rqstp);
1088 				mtx_lock(&pool->sp_lock);
1089 				if (rqstp) {
1090 					/*
1091 					 * See if the application has
1092 					 * a preference for some other
1093 					 * thread.
1094 					 */
1095 					stpref = st;
1096 					if (pool->sp_assign)
1097 						stpref = pool->sp_assign(st,
1098 						    rqstp);
1099 
1100 					pool->sp_space_used +=
1101 						rqstp->rq_size;
1102 					if (pool->sp_space_used
1103 					    > pool->sp_space_used_highest)
1104 						pool->sp_space_used_highest =
1105 							pool->sp_space_used;
1106 					rqstp->rq_thread = stpref;
1107 					STAILQ_INSERT_TAIL(&stpref->st_reqs,
1108 					    rqstp, rq_link);
1109 					stpref->st_reqcount++;
1110 
1111 					/*
1112 					 * If we assigned the request
1113 					 * to another thread, make
1114 					 * sure it is awake and continue
1115 					 * reading from the
1116 					 * socket. Otherwise, try to
1117 					 * find some other thread to
1118 					 * read from the socket and
1119 					 * execute the request
1120 					 * immediately.
1121 					 */
1122 					if (stpref != st) {
1123 						cv_signal(&stpref->st_cond);
1124 						continue;
1125 					} else {
1126 						break;
1127 					}
1128 				}
1129 			} while (stat == XPRT_MOREREQS
1130 			    && pool->sp_state != SVCPOOL_CLOSING);
1131 
1132 			/*
1133 			 * Move this transport to the end of the
1134 			 * active list to ensure fairness when
1135 			 * multiple transports are active. If this was
1136 			 * the last queued request, svc_getreq will
1137 			 * end up calling xprt_inactive to remove from
1138 			 * the active list.
1139 			 */
1140 			xprt->xp_thread = NULL;
1141 			st->st_xprt = NULL;
1142 			if (xprt->xp_active) {
1143 				xprt_assignthread(xprt);
1144 				TAILQ_REMOVE(&pool->sp_active, xprt, xp_alink);
1145 				TAILQ_INSERT_TAIL(&pool->sp_active, xprt,
1146 				    xp_alink);
1147 			}
1148 			mtx_unlock(&pool->sp_lock);
1149 			SVC_RELEASE(xprt);
1150 			mtx_lock(&pool->sp_lock);
1151 		}
1152 
1153 		/*
1154 		 * Execute what we have queued.
1155 		 */
1156 		while ((rqstp = STAILQ_FIRST(&st->st_reqs)) != NULL) {
1157 			size_t sz = rqstp->rq_size;
1158 			mtx_unlock(&pool->sp_lock);
1159 			svc_executereq(rqstp);
1160 			mtx_lock(&pool->sp_lock);
1161 			pool->sp_space_used -= sz;
1162 		}
1163 	}
1164 
1165 	if (st->st_xprt) {
1166 		xprt = st->st_xprt;
1167 		st->st_xprt = NULL;
1168 		SVC_RELEASE(xprt);
1169 	}
1170 
1171 	KASSERT(STAILQ_EMPTY(&st->st_reqs), ("stray reqs on exit"));
1172 	LIST_REMOVE(st, st_link);
1173 	pool->sp_threadcount--;
1174 
1175 	mtx_unlock(&pool->sp_lock);
1176 
1177 	cv_destroy(&st->st_cond);
1178 	mem_free(st, sizeof(*st));
1179 
1180 	if (!ismaster)
1181 		wakeup(pool);
1182 }
1183 
1184 static void
1185 svc_thread_start(void *arg)
1186 {
1187 
1188 	svc_run_internal((SVCPOOL *) arg, FALSE);
1189 	kthread_exit();
1190 }
1191 
1192 static void
1193 svc_new_thread(SVCPOOL *pool)
1194 {
1195 	struct thread *td;
1196 
1197 	pool->sp_threadcount++;
1198 	kthread_add(svc_thread_start, pool,
1199 	    pool->sp_proc, &td, 0, 0,
1200 	    "%s: service", pool->sp_name);
1201 }
1202 
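/*
 * Run the RPC service. The calling thread becomes the pool's master
 * thread and additional threads are created up to sp_minthreads.
 * This does not return until svc_exit() has been called and all
 * service threads have finished.
 */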
1203 void
1204 svc_run(SVCPOOL *pool)
1205 {
1206 	int i;
1207 	struct proc *p;
1208 	struct thread *td;
1209 
1210 	p = curproc;
1211 	td = curthread;
1212 	snprintf(td->td_name, sizeof(td->td_name),
1213 	    "%s: master", pool->sp_name);
1214 	pool->sp_state = SVCPOOL_ACTIVE;
1215 	pool->sp_proc = p;
1216 	pool->sp_lastcreatetime = time_uptime;
1217 	pool->sp_threadcount = 1;
1218 
1219 	for (i = 1; i < pool->sp_minthreads; i++) {
1220 		svc_new_thread(pool);
1221 	}
1222 
1223 	svc_run_internal(pool, TRUE);
1224 
1225 	mtx_lock(&pool->sp_lock);
1226 	while (pool->sp_threadcount > 0)
1227 		msleep(pool, &pool->sp_lock, 0, "svcexit", 0);
1228 	mtx_unlock(&pool->sp_lock);
1229 }
1230 
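/*
 * Request an orderly shutdown of the pool: mark it as closing and
 * wake any idle threads so that they notice and exit.
 */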
1231 void
1232 svc_exit(SVCPOOL *pool)
1233 {
1234 	SVCTHREAD *st;
1235 
1236 	mtx_lock(&pool->sp_lock);
1237 
1238 	pool->sp_state = SVCPOOL_CLOSING;
1239 	LIST_FOREACH(st, &pool->sp_idlethreads, st_ilink)
1240 		cv_signal(&st->st_cond);
1241 
1242 	mtx_unlock(&pool->sp_lock);
1243 }
1244 
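/*
 * Decode the arguments of a request with the supplied XDR routine,
 * consuming the request's argument mbuf chain.
 */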
1245 bool_t
1246 svc_getargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1247 {
1248 	struct mbuf *m;
1249 	XDR xdrs;
1250 	bool_t stat;
1251 
1252 	m = rqstp->rq_args;
1253 	rqstp->rq_args = NULL;
1254 
1255 	xdrmbuf_create(&xdrs, m, XDR_DECODE);
1256 	stat = xargs(&xdrs, args);
1257 	XDR_DESTROY(&xdrs);
1258 
1259 	return (stat);
1260 }
1261 
1262 bool_t
1263 svc_freeargs(struct svc_req *rqstp, xdrproc_t xargs, void *args)
1264 {
1265 	XDR xdrs;
1266 
1267 	if (rqstp->rq_addr) {
1268 		free(rqstp->rq_addr, M_SONAME);
1269 		rqstp->rq_addr = NULL;
1270 	}
1271 
1272 	xdrs.x_op = XDR_FREE;
1273 	return (xargs(&xdrs, args));
1274 }
1275 
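/*
 * Release all resources associated with a request: its place on the
 * owning thread's queue, authentication state, transport reference,
 * caller address and argument mbufs.
 */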
1276 void
1277 svc_freereq(struct svc_req *rqstp)
1278 {
1279 	SVCTHREAD *st;
1280 	SVCXPRT *xprt;
1281 	SVCPOOL *pool;
1282 
1283 	st = rqstp->rq_thread;
1284 	xprt = rqstp->rq_xprt;
1285 	if (xprt)
1286 		pool = xprt->xp_pool;
1287 	else
1288 		pool = NULL;
1289 	if (st) {
1290 		mtx_lock(&pool->sp_lock);
1291 		KASSERT(rqstp == STAILQ_FIRST(&st->st_reqs),
1292 		    ("Freeing request out of order"));
1293 		STAILQ_REMOVE_HEAD(&st->st_reqs, rq_link);
1294 		st->st_reqcount--;
1295 		if (pool->sp_done)
1296 			pool->sp_done(st, rqstp);
1297 		mtx_unlock(&pool->sp_lock);
1298 	}
1299 
1300 	if (rqstp->rq_auth.svc_ah_ops)
1301 		SVCAUTH_RELEASE(&rqstp->rq_auth);
1302 
1303 	if (rqstp->rq_xprt) {
1304 		SVC_RELEASE(rqstp->rq_xprt);
1305 	}
1306 
1307 	if (rqstp->rq_addr)
1308 		free(rqstp->rq_addr, M_SONAME);
1309 
1310 	if (rqstp->rq_args)
1311 		m_freem(rqstp->rq_args);
1312 
1313 	free(rqstp, M_RPC);
1314 }
1315