xref: /freebsd/sys/rpc/clnt_vc.c (revision 2be1a816)
1 /*	$NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $	*/
2 
3 /*
4  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
5  * unrestricted use provided that this legend is included on all tape
6  * media and as a part of the software program in whole or part.  Users
7  * may copy or modify Sun RPC without charge, but are not authorized
8  * to license or distribute it to anyone else except as part of a product or
9  * program developed by the user.
10  *
11  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
12  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
13  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
14  *
15  * Sun RPC is provided with no support and without any obligation on the
16  * part of Sun Microsystems, Inc. to assist in its use, correction,
17  * modification or enhancement.
18  *
19  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
20  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
21  * OR ANY PART THEREOF.
22  *
23  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
24  * or profits or other special, indirect and consequential damages, even if
25  * Sun has been advised of the possibility of such damages.
26  *
27  * Sun Microsystems, Inc.
28  * 2550 Garcia Avenue
29  * Mountain View, California  94043
30  */
31 
32 #if defined(LIBC_SCCS) && !defined(lint)
33 static char *sccsid2 = "@(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro";
34 static char *sccsid = "@(#)clnt_tcp.c	2.2 88/08/01 4.0 RPCSRC";
35 static char sccsid3[] = "@(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro";
36 #endif
37 #include <sys/cdefs.h>
38 __FBSDID("$FreeBSD$");
39 
40 /*
41  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
42  *
43  * Copyright (C) 1984, Sun Microsystems, Inc.
44  *
45  * TCP based RPC supports 'batched calls'.
 * A sequence of calls may be batched up in a send buffer.  The RPC call
 * returns immediately to the client even though the call was not necessarily
 * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
 * the rpc timeout value is zero (see clnt.h, rpc).
50  *
51  * Clients should NOT casually batch calls that in fact return results; that is,
52  * the server side should be aware that a call is batched and not produce any
53  * return message.  Batched calls that produce many result messages can
54  * deadlock (netlock) the client and the server....
55  *
56  * Now go hang yourself.
57  */
58 
59 #include <sys/param.h>
60 #include <sys/systm.h>
61 #include <sys/lock.h>
62 #include <sys/malloc.h>
63 #include <sys/mbuf.h>
64 #include <sys/mutex.h>
65 #include <sys/pcpu.h>
66 #include <sys/proc.h>
67 #include <sys/socket.h>
68 #include <sys/socketvar.h>
69 #include <sys/syslog.h>
70 #include <sys/time.h>
71 #include <sys/uio.h>
72 
73 #include <rpc/rpc.h>
74 #include <rpc/rpc_com.h>
75 
/*
 * Size in bytes of the ct_mcallc buffer that holds the pre-marshalled
 * call header; also the limit handed to xdrmem_create() when that header
 * is encoded.
 */
#define MCALL_MSG_SIZE 24
77 
/*
 * A control message header followed by a cmsgcred payload.
 * NOTE(review): nothing in the visible part of this file references this
 * structure - confirm whether it is still needed.
 */
struct cmessage {
        struct cmsghdr cmsg;
        struct cmsgcred cmcred;
};
82 
/*
 * Forward declarations: the CLIENT operations installed in clnt_vc_ops,
 * the timeout validator used by clnt_vc_control(), and the socket
 * receive upcall installed by clnt_vc_create().
 */
static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
    xdrproc_t, void *, struct timeval);
static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
static void clnt_vc_abort(CLIENT *);
static bool_t clnt_vc_control(CLIENT *, u_int, void *);
static void clnt_vc_destroy(CLIENT *);
static bool_t time_not_ok(struct timeval *);
static void clnt_vc_soupcall(struct socket *so, void *arg, int waitflag);
92 
/*
 * Operations vector for connection-oriented clients; clnt_vc_create()
 * stores its address in cl->cl_ops.
 */
static struct clnt_ops clnt_vc_ops = {
	.cl_call =	clnt_vc_call,
	.cl_abort =	clnt_vc_abort,
	.cl_geterr =	clnt_vc_geterr,
	.cl_freeres =	clnt_vc_freeres,
	.cl_destroy =	clnt_vc_destroy,
	.cl_control =	clnt_vc_control
};
101 
/*
 * A pending RPC request which awaits a reply.
 *
 * clnt_vc_call() keeps one of these on its stack and links it onto
 * ct_data.ct_pending while it waits; clnt_vc_soupcall() fills in
 * cr_mrep or cr_error and wakes the sleeper using the request's
 * address as the wait channel.
 */
struct ct_request {
	TAILQ_ENTRY(ct_request) cr_link;
	/* The upcall zeroes cr_xid when it dequeues a matched request. */
	uint32_t		cr_xid;		/* XID of request */
	struct mbuf		*cr_mrep;	/* reply received by upcall */
	int			cr_error;	/* any error from upcall */
};
111 
/* List head type for the queue of requests awaiting replies. */
TAILQ_HEAD(ct_request_list, ct_request);
113 
/*
 * Private per-client state, hung off CLIENT.cl_private.
 */
struct ct_data {
	struct mtx	ct_lock;	/* taken around ct_pending, ct_xid, ct_error updates */
	struct socket	*ct_socket;	/* connection socket */
	bool_t		ct_closeit;	/* close it on destroy */
	/*
	 * Default call timeout as a timeval (seconds/microseconds).
	 * tv_usec == -1 means none is set and clnt_vc_call() uses the
	 * timeout supplied by its caller.
	 */
	struct timeval	ct_wait;
	struct sockaddr_storage	ct_addr; /* remote addr */
	struct rpc_err	ct_error;	/* status of the most recent call */
	uint32_t	ct_xid;		/* last XID handed out */
	char		ct_mcallc[MCALL_MSG_SIZE]; /* marshalled callmsg */
	size_t		ct_mpos;	/* pos after marshal */
	const char	*ct_waitchan;	/* wait message passed to msleep() */
	int		ct_waitflag;	/* msleep() priority arg: 0 or PCATCH */
	struct mbuf	*ct_record;	/* current reply record */
	size_t		ct_record_resid; /* how much left of reply to read */
	bool_t		ct_record_eor;	 /* true if reading last fragment */
	struct ct_request_list ct_pending; /* requests awaiting a reply */
};
131 
/*
 * Message strings.
 * NOTE(review): none of these are referenced in the visible part of this
 * file - confirm whether they can be retired.
 */
static const char clnt_vc_errstr[] = "%s : %s";
static const char clnt_vc_str[] = "clnt_vc_create";
static const char clnt_read_vc_str[] = "read_vc";
static const char __no_mem_str[] = "out of memory";
136 
137 /*
138  * Create a client handle for a connection.
139  * Default options are set, which the user can change using clnt_control()'s.
140  * The rpc/vc package does buffering similar to stdio, so the client
141  * must pick send and receive buffer sizes, 0 => use the default.
142  * NB: fd is copied into a private area.
143  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
144  * set this something more useful.
145  *
146  * fd should be an open socket
147  */
148 CLIENT *
149 clnt_vc_create(
150 	struct socket *so,		/* open file descriptor */
151 	struct sockaddr *raddr,		/* servers address */
152 	const rpcprog_t prog,		/* program number */
153 	const rpcvers_t vers,		/* version number */
154 	size_t sendsz,			/* buffer recv size */
155 	size_t recvsz)			/* buffer send size */
156 {
157 	CLIENT *cl;			/* client handle */
158 	struct ct_data *ct = NULL;	/* client handle */
159 	struct timeval now;
160 	struct rpc_msg call_msg;
161 	static uint32_t disrupt;
162 	struct __rpc_sockinfo si;
163 	XDR xdrs;
164 	int error;
165 
166 	if (disrupt == 0)
167 		disrupt = (uint32_t)(long)raddr;
168 
169 	cl = (CLIENT *)mem_alloc(sizeof (*cl));
170 	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
171 
172 	mtx_init(&ct->ct_lock, "ct->ct_lock", NULL, MTX_DEF);
173 
174 	if ((so->so_state & (SS_ISCONNECTED|SS_ISCONFIRMING)) == 0) {
175 		error = soconnect(so, raddr, curthread);
176 		if (error) {
177 			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
178 			rpc_createerr.cf_error.re_errno = error;
179 			goto err;
180 		}
181 	}
182 
183 	if (!__rpc_socket2sockinfo(so, &si))
184 		goto err;
185 
186 	ct->ct_closeit = FALSE;
187 
188 	/*
189 	 * Set up private data struct
190 	 */
191 	ct->ct_socket = so;
192 	ct->ct_wait.tv_sec = -1;
193 	ct->ct_wait.tv_usec = -1;
194 	memcpy(&ct->ct_addr, raddr, raddr->sa_len);
195 
196 	/*
197 	 * Initialize call message
198 	 */
199 	getmicrotime(&now);
200 	ct->ct_xid = ((uint32_t)++disrupt) ^ __RPC_GETXID(&now);
201 	call_msg.rm_xid = ct->ct_xid;
202 	call_msg.rm_direction = CALL;
203 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
204 	call_msg.rm_call.cb_prog = (uint32_t)prog;
205 	call_msg.rm_call.cb_vers = (uint32_t)vers;
206 
207 	/*
208 	 * pre-serialize the static part of the call msg and stash it away
209 	 */
210 	xdrmem_create(&xdrs, ct->ct_mcallc, MCALL_MSG_SIZE,
211 	    XDR_ENCODE);
212 	if (! xdr_callhdr(&xdrs, &call_msg)) {
213 		if (ct->ct_closeit) {
214 			soclose(ct->ct_socket);
215 		}
216 		goto err;
217 	}
218 	ct->ct_mpos = XDR_GETPOS(&xdrs);
219 	XDR_DESTROY(&xdrs);
220 	ct->ct_waitchan = "rpcrecv";
221 	ct->ct_waitflag = 0;
222 
223 	/*
224 	 * Create a client handle which uses xdrrec for serialization
225 	 * and authnone for authentication.
226 	 */
227 	cl->cl_ops = &clnt_vc_ops;
228 	cl->cl_private = ct;
229 	cl->cl_auth = authnone_create();
230 	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
231 	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
232 
233 	SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
234 	ct->ct_socket->so_upcallarg = ct;
235 	ct->ct_socket->so_upcall = clnt_vc_soupcall;
236 	ct->ct_socket->so_rcv.sb_flags |= SB_UPCALL;
237 	SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
238 
239 	ct->ct_record = NULL;
240 	ct->ct_record_resid = 0;
241 	TAILQ_INIT(&ct->ct_pending);
242 	return (cl);
243 
244 err:
245 	if (cl) {
246 		if (ct) {
247 			mem_free(ct, sizeof (struct ct_data));
248 		}
249 		if (cl)
250 			mem_free(cl, sizeof (CLIENT));
251 	}
252 	return ((CLIENT *)NULL);
253 }
254 
255 static enum clnt_stat
256 clnt_vc_call(
257 	CLIENT *cl,
258 	rpcproc_t proc,
259 	xdrproc_t xdr_args,
260 	void *args_ptr,
261 	xdrproc_t xdr_results,
262 	void *results_ptr,
263 	struct timeval utimeout)
264 {
265 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
266 	XDR xdrs;
267 	struct rpc_msg reply_msg;
268 	bool_t ok;
269 	int nrefreshes = 2;		/* number of times to refresh cred */
270 	struct timeval timeout;
271 	uint32_t xid;
272 	struct mbuf *mreq = NULL;
273 	struct ct_request cr;
274 	int error;
275 
276 	mtx_lock(&ct->ct_lock);
277 
278 	cr.cr_mrep = NULL;
279 	cr.cr_error = 0;
280 
281 	if (ct->ct_wait.tv_usec == -1) {
282 		timeout = utimeout;	/* use supplied timeout */
283 	} else {
284 		timeout = ct->ct_wait;	/* use default timeout */
285 	}
286 
287 call_again:
288 	mtx_assert(&ct->ct_lock, MA_OWNED);
289 
290 	ct->ct_xid++;
291 	xid = ct->ct_xid;
292 
293 	mtx_unlock(&ct->ct_lock);
294 
295 	/*
296 	 * Leave space to pre-pend the record mark.
297 	 */
298 	MGETHDR(mreq, M_WAIT, MT_DATA);
299 	MCLGET(mreq, M_WAIT);
300 	mreq->m_len = 0;
301 	mreq->m_data += sizeof(uint32_t);
302 	m_append(mreq, ct->ct_mpos, ct->ct_mcallc);
303 
304 	/*
305 	 * The XID is the first thing in the request.
306 	 */
307 	*mtod(mreq, uint32_t *) = htonl(xid);
308 
309 	xdrmbuf_create(&xdrs, mreq, XDR_ENCODE);
310 
311 	ct->ct_error.re_status = RPC_SUCCESS;
312 
313 	if ((! XDR_PUTINT32(&xdrs, &proc)) ||
314 	    (! AUTH_MARSHALL(cl->cl_auth, &xdrs)) ||
315 	    (! (*xdr_args)(&xdrs, args_ptr))) {
316 		if (ct->ct_error.re_status == RPC_SUCCESS)
317 			ct->ct_error.re_status = RPC_CANTENCODEARGS;
318 		m_freem(mreq);
319 		return (ct->ct_error.re_status);
320 	}
321 	m_fixhdr(mreq);
322 
323 	/*
324 	 * Prepend a record marker containing the packet length.
325 	 */
326 	M_PREPEND(mreq, sizeof(uint32_t), M_WAIT);
327 	*mtod(mreq, uint32_t *) =
328 		htonl(0x80000000 | (mreq->m_pkthdr.len - sizeof(uint32_t)));
329 
330 	cr.cr_xid = xid;
331 	mtx_lock(&ct->ct_lock);
332 	TAILQ_INSERT_TAIL(&ct->ct_pending, &cr, cr_link);
333 	mtx_unlock(&ct->ct_lock);
334 
335 	/*
336 	 * sosend consumes mreq.
337 	 */
338 	error = sosend(ct->ct_socket, NULL, NULL, mreq, NULL, 0, curthread);
339 	mreq = NULL;
340 
341 	reply_msg.acpted_rply.ar_verf = _null_auth;
342 	reply_msg.acpted_rply.ar_results.where = results_ptr;
343 	reply_msg.acpted_rply.ar_results.proc = xdr_results;
344 
345 	mtx_lock(&ct->ct_lock);
346 
347 	if (error) {
348 		TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
349 
350 		ct->ct_error.re_errno = error;
351 		ct->ct_error.re_status = RPC_CANTSEND;
352 		goto out;
353 	}
354 
355 	/*
356 	 * Check to see if we got an upcall while waiting for the
357 	 * lock. In both these cases, the request has been removed
358 	 * from ct->ct_pending.
359 	 */
360 	if (cr.cr_error) {
361 		ct->ct_error.re_errno = cr.cr_error;
362 		ct->ct_error.re_status = RPC_CANTRECV;
363 		goto out;
364 	}
365 	if (cr.cr_mrep) {
366 		goto got_reply;
367 	}
368 
369 	/*
370 	 * Hack to provide rpc-based message passing
371 	 */
372 	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
373 		if (cr.cr_xid)
374 			TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
375 		ct->ct_error.re_status = RPC_TIMEDOUT;
376 		goto out;
377 	}
378 
379 	error = msleep(&cr, &ct->ct_lock, ct->ct_waitflag, ct->ct_waitchan,
380 	    tvtohz(&timeout));
381 
382 	if (error) {
383 		/*
384 		 * The sleep returned an error so our request is still
385 		 * on the list. Turn the error code into an
386 		 * appropriate client status.
387 		 */
388 		if (cr.cr_xid)
389 			TAILQ_REMOVE(&ct->ct_pending, &cr, cr_link);
390 		ct->ct_error.re_errno = error;
391 		switch (error) {
392 		case EINTR:
393 			ct->ct_error.re_status = RPC_INTR;
394 			break;
395 		case EWOULDBLOCK:
396 			ct->ct_error.re_status = RPC_TIMEDOUT;
397 			break;
398 		default:
399 			ct->ct_error.re_status = RPC_CANTRECV;
400 		}
401 		goto out;
402 	} else {
403 		/*
404 		 * We were woken up by the upcall.  If the
405 		 * upcall had a receive error, report that,
406 		 * otherwise we have a reply.
407 		 */
408 		if (cr.cr_error) {
409 			ct->ct_error.re_errno = cr.cr_error;
410 			ct->ct_error.re_status = RPC_CANTRECV;
411 			goto out;
412 		}
413 	}
414 
415 got_reply:
416 	/*
417 	 * Now decode and validate the response. We need to drop the
418 	 * lock since xdr_replymsg may end up sleeping in malloc.
419 	 */
420 	mtx_unlock(&ct->ct_lock);
421 
422 	xdrmbuf_create(&xdrs, cr.cr_mrep, XDR_DECODE);
423 	ok = xdr_replymsg(&xdrs, &reply_msg);
424 	XDR_DESTROY(&xdrs);
425 	cr.cr_mrep = NULL;
426 
427 	mtx_lock(&ct->ct_lock);
428 
429 	if (ok) {
430 		if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
431 			(reply_msg.acpted_rply.ar_stat == SUCCESS))
432 			ct->ct_error.re_status = RPC_SUCCESS;
433 		else
434 			_seterr_reply(&reply_msg, &(ct->ct_error));
435 
436 		if (ct->ct_error.re_status == RPC_SUCCESS) {
437 			if (! AUTH_VALIDATE(cl->cl_auth,
438 					    &reply_msg.acpted_rply.ar_verf)) {
439 				ct->ct_error.re_status = RPC_AUTHERROR;
440 				ct->ct_error.re_why = AUTH_INVALIDRESP;
441 			}
442 			if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
443 				xdrs.x_op = XDR_FREE;
444 				(void) xdr_opaque_auth(&xdrs,
445 					&(reply_msg.acpted_rply.ar_verf));
446 			}
447 		}		/* end successful completion */
448 		/*
449 		 * If unsuccesful AND error is an authentication error
450 		 * then refresh credentials and try again, else break
451 		 */
452 		else if (ct->ct_error.re_status == RPC_AUTHERROR)
453 			/* maybe our credentials need to be refreshed ... */
454 			if (nrefreshes > 0 &&
455 			    AUTH_REFRESH(cl->cl_auth, &reply_msg)) {
456 				nrefreshes--;
457 				goto call_again;
458 			}
459 		/* end of unsuccessful completion */
460 	}	/* end of valid reply message */
461 	else {
462 		ct->ct_error.re_status = RPC_CANTDECODERES;
463 	}
464 out:
465 	mtx_assert(&ct->ct_lock, MA_OWNED);
466 
467 	if (mreq)
468 		m_freem(mreq);
469 	if (cr.cr_mrep)
470 		m_freem(cr.cr_mrep);
471 
472 	mtx_unlock(&ct->ct_lock);
473 	return (ct->ct_error.re_status);
474 }
475 
476 static void
477 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
478 {
479 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
480 
481 	*errp = ct->ct_error;
482 }
483 
484 static bool_t
485 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
486 {
487 	XDR xdrs;
488 	bool_t dummy;
489 
490 	xdrs.x_op = XDR_FREE;
491 	dummy = (*xdr_res)(&xdrs, res_ptr);
492 
493 	return (dummy);
494 }
495 
496 /*ARGSUSED*/
497 static void
498 clnt_vc_abort(CLIENT *cl)
499 {
500 }
501 
502 static bool_t
503 clnt_vc_control(CLIENT *cl, u_int request, void *info)
504 {
505 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
506 	void *infop = info;
507 
508 	mtx_lock(&ct->ct_lock);
509 
510 	switch (request) {
511 	case CLSET_FD_CLOSE:
512 		ct->ct_closeit = TRUE;
513 		mtx_unlock(&ct->ct_lock);
514 		return (TRUE);
515 	case CLSET_FD_NCLOSE:
516 		ct->ct_closeit = FALSE;
517 		mtx_unlock(&ct->ct_lock);
518 		return (TRUE);
519 	default:
520 		break;
521 	}
522 
523 	/* for other requests which use info */
524 	if (info == NULL) {
525 		mtx_unlock(&ct->ct_lock);
526 		return (FALSE);
527 	}
528 	switch (request) {
529 	case CLSET_TIMEOUT:
530 		if (time_not_ok((struct timeval *)info)) {
531 			mtx_unlock(&ct->ct_lock);
532 			return (FALSE);
533 		}
534 		ct->ct_wait = *(struct timeval *)infop;
535 		break;
536 	case CLGET_TIMEOUT:
537 		*(struct timeval *)infop = ct->ct_wait;
538 		break;
539 	case CLGET_SERVER_ADDR:
540 		(void) memcpy(info, &ct->ct_addr, (size_t)ct->ct_addr.ss_len);
541 		break;
542 	case CLGET_SVC_ADDR:
543 		/*
544 		 * Slightly different semantics to userland - we use
545 		 * sockaddr instead of netbuf.
546 		 */
547 		memcpy(info, &ct->ct_addr, ct->ct_addr.ss_len);
548 		break;
549 	case CLSET_SVC_ADDR:		/* set to new address */
550 		mtx_unlock(&ct->ct_lock);
551 		return (FALSE);
552 	case CLGET_XID:
553 		*(uint32_t *)info = ct->ct_xid;
554 		break;
555 	case CLSET_XID:
556 		/* This will set the xid of the NEXT call */
557 		/* decrement by 1 as clnt_vc_call() increments once */
558 		ct->ct_xid = *(uint32_t *)info - 1;
559 		break;
560 	case CLGET_VERS:
561 		/*
562 		 * This RELIES on the information that, in the call body,
563 		 * the version number field is the fifth field from the
564 		 * begining of the RPC header. MUST be changed if the
565 		 * call_struct is changed
566 		 */
567 		*(uint32_t *)info =
568 		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
569 		    4 * BYTES_PER_XDR_UNIT));
570 		break;
571 
572 	case CLSET_VERS:
573 		*(uint32_t *)(void *)(ct->ct_mcallc +
574 		    4 * BYTES_PER_XDR_UNIT) =
575 		    htonl(*(uint32_t *)info);
576 		break;
577 
578 	case CLGET_PROG:
579 		/*
580 		 * This RELIES on the information that, in the call body,
581 		 * the program number field is the fourth field from the
582 		 * begining of the RPC header. MUST be changed if the
583 		 * call_struct is changed
584 		 */
585 		*(uint32_t *)info =
586 		    ntohl(*(uint32_t *)(void *)(ct->ct_mcallc +
587 		    3 * BYTES_PER_XDR_UNIT));
588 		break;
589 
590 	case CLSET_PROG:
591 		*(uint32_t *)(void *)(ct->ct_mcallc +
592 		    3 * BYTES_PER_XDR_UNIT) =
593 		    htonl(*(uint32_t *)info);
594 		break;
595 
596 	case CLSET_WAITCHAN:
597 		ct->ct_waitchan = *(const char **)info;
598 		break;
599 
600 	case CLGET_WAITCHAN:
601 		*(const char **) info = ct->ct_waitchan;
602 		break;
603 
604 	case CLSET_INTERRUPTIBLE:
605 		if (*(int *) info)
606 			ct->ct_waitflag = PCATCH;
607 		else
608 			ct->ct_waitflag = 0;
609 		break;
610 
611 	case CLGET_INTERRUPTIBLE:
612 		if (ct->ct_waitflag)
613 			*(int *) info = TRUE;
614 		else
615 			*(int *) info = FALSE;
616 		break;
617 
618 	default:
619 		mtx_unlock(&ct->ct_lock);
620 		return (FALSE);
621 	}
622 
623 	mtx_unlock(&ct->ct_lock);
624 	return (TRUE);
625 }
626 
627 static void
628 clnt_vc_destroy(CLIENT *cl)
629 {
630 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
631 	struct socket *so = NULL;
632 
633 	mtx_lock(&ct->ct_lock);
634 
635 	if (ct->ct_socket) {
636 		SOCKBUF_LOCK(&ct->ct_socket->so_rcv);
637 		ct->ct_socket->so_upcallarg = NULL;
638 		ct->ct_socket->so_upcall = NULL;
639 		ct->ct_socket->so_rcv.sb_flags &= ~SB_UPCALL;
640 		SOCKBUF_UNLOCK(&ct->ct_socket->so_rcv);
641 
642 		KASSERT(!TAILQ_FIRST(&ct->ct_pending),
643 		    ("Destroying RPC client with pending RPC requests"));
644 
645 		if (ct->ct_closeit) {
646 			so = ct->ct_socket;
647 		}
648 	}
649 
650 	mtx_unlock(&ct->ct_lock);
651 
652 	mtx_destroy(&ct->ct_lock);
653 	if (so) {
654 		soshutdown(so, SHUT_WR);
655 		soclose(so);
656 	}
657 	mem_free(ct, sizeof(struct ct_data));
658 	mem_free(cl, sizeof(CLIENT));
659 }
660 
661 /*
662  * Make sure that the time is not garbage.   -1 value is disallowed.
663  * Note this is different from time_not_ok in clnt_dg.c
664  */
665 static bool_t
666 time_not_ok(struct timeval *t)
667 {
668 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
669 		t->tv_usec <= -1 || t->tv_usec > 1000000);
670 }
671 
672 void
673 clnt_vc_soupcall(struct socket *so, void *arg, int waitflag)
674 {
675 	struct ct_data *ct = (struct ct_data *) arg;
676 	struct uio uio;
677 	struct mbuf *m;
678 	struct ct_request *cr;
679 	int error, rcvflag, foundreq;
680 	uint32_t xid, header;
681 
682 	uio.uio_td = curthread;
683 	do {
684 		/*
685 		 * If ct_record_resid is zero, we are waiting for a
686 		 * record mark.
687 		 */
688 		if (ct->ct_record_resid == 0) {
689 			bool_t do_read;
690 
691 			/*
692 			 * Make sure there is either a whole record
693 			 * mark in the buffer or there is some other
694 			 * error condition
695 			 */
696 			do_read = FALSE;
697 			SOCKBUF_LOCK(&so->so_rcv);
698 			if (so->so_rcv.sb_cc >= sizeof(uint32_t)
699 			    || (so->so_rcv.sb_state & SBS_CANTRCVMORE)
700 			    || so->so_error)
701 				do_read = TRUE;
702 			SOCKBUF_UNLOCK(&so->so_rcv);
703 
704 			if (!do_read)
705 				return;
706 
707 			uio.uio_resid = sizeof(uint32_t);
708 			m = NULL;
709 			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
710 			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
711 
712 			if (error == EWOULDBLOCK)
713 				break;
714 
715 			/*
716 			 * If there was an error, wake up all pending
717 			 * requests.
718 			 */
719 			if (error || uio.uio_resid > 0) {
720 			wakeup_all:
721 				mtx_lock(&ct->ct_lock);
722 				if (!error) {
723 					/*
724 					 * We must have got EOF trying
725 					 * to read from the stream.
726 					 */
727 					error = ECONNRESET;
728 				}
729 				ct->ct_error.re_status = RPC_CANTRECV;
730 				ct->ct_error.re_errno = error;
731 				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
732 					cr->cr_error = error;
733 					wakeup(cr);
734 				}
735 				TAILQ_INIT(&ct->ct_pending);
736 				mtx_unlock(&ct->ct_lock);
737 				break;
738 			}
739 			memcpy(&header, mtod(m, uint32_t *), sizeof(uint32_t));
740 			header = ntohl(header);
741 			ct->ct_record = NULL;
742 			ct->ct_record_resid = header & 0x7fffffff;
743 			ct->ct_record_eor = ((header & 0x80000000) != 0);
744 			m_freem(m);
745 		} else {
746 			/*
747 			 * We have the record mark. Read as much as
748 			 * the socket has buffered up to the end of
749 			 * this record.
750 			 */
751 			uio.uio_resid = ct->ct_record_resid;
752 			m = NULL;
753 			rcvflag = MSG_DONTWAIT | MSG_SOCALLBCK;
754 			error = soreceive(so, NULL, &uio, &m, NULL, &rcvflag);
755 
756 			if (error == EWOULDBLOCK)
757 				break;
758 
759 			if (error || uio.uio_resid == ct->ct_record_resid)
760 				goto wakeup_all;
761 
762 			/*
763 			 * If we have part of the record already,
764 			 * chain this bit onto the end.
765 			 */
766 			if (ct->ct_record)
767 				m_last(ct->ct_record)->m_next = m;
768 			else
769 				ct->ct_record = m;
770 
771 			ct->ct_record_resid = uio.uio_resid;
772 
773 			/*
774 			 * If we have the entire record, see if we can
775 			 * match it to a request.
776 			 */
777 			if (ct->ct_record_resid == 0
778 			    && ct->ct_record_eor) {
779 				/*
780 				 * The XID is in the first uint32_t of
781 				 * the reply.
782 				 */
783 				ct->ct_record =
784 					m_pullup(ct->ct_record, sizeof(xid));
785 				if (!ct->ct_record)
786 					break;
787 				memcpy(&xid,
788 				    mtod(ct->ct_record, uint32_t *),
789 				    sizeof(uint32_t));
790 				xid = ntohl(xid);
791 
792 				mtx_lock(&ct->ct_lock);
793 				foundreq = 0;
794 				TAILQ_FOREACH(cr, &ct->ct_pending, cr_link) {
795 					if (cr->cr_xid == xid) {
796 						/*
797 						 * This one
798 						 * matches. We snip it
799 						 * out of the pending
800 						 * list and leave the
801 						 * reply mbuf in
802 						 * cr->cr_mrep. Set
803 						 * the XID to zero so
804 						 * that clnt_vc_call
805 						 * can know not to
806 						 * repeat the
807 						 * TAILQ_REMOVE.
808 						 */
809 						TAILQ_REMOVE(&ct->ct_pending,
810 						    cr, cr_link);
811 						cr->cr_xid = 0;
812 						cr->cr_mrep = ct->ct_record;
813 						cr->cr_error = 0;
814 						foundreq = 1;
815 						wakeup(cr);
816 						break;
817 					}
818 				}
819 				mtx_unlock(&ct->ct_lock);
820 
821 				if (!foundreq)
822 					m_freem(ct->ct_record);
823 				ct->ct_record = NULL;
824 			}
825 		}
826 	} while (m);
827 }
828