xref: /dragonfly/lib/libc/rpc/clnt_vc.c (revision cfd1aba3)
1 /*
2  * Sun RPC is a product of Sun Microsystems, Inc. and is provided for
3  * unrestricted use provided that this legend is included on all tape
4  * media and as a part of the software program in whole or part.  Users
5  * may copy or modify Sun RPC without charge, but are not authorized
6  * to license or distribute it to anyone else except as part of a product or
7  * program developed by the user.
8  *
9  * SUN RPC IS PROVIDED AS IS WITH NO WARRANTIES OF ANY KIND INCLUDING THE
10  * WARRANTIES OF DESIGN, MERCHANTIBILITY AND FITNESS FOR A PARTICULAR
11  * PURPOSE, OR ARISING FROM A COURSE OF DEALING, USAGE OR TRADE PRACTICE.
12  *
13  * Sun RPC is provided with no support and without any obligation on the
14  * part of Sun Microsystems, Inc. to assist in its use, correction,
15  * modification or enhancement.
16  *
17  * SUN MICROSYSTEMS, INC. SHALL HAVE NO LIABILITY WITH RESPECT TO THE
18  * INFRINGEMENT OF COPYRIGHTS, TRADE SECRETS OR ANY PATENTS BY SUN RPC
19  * OR ANY PART THEREOF.
20  *
21  * In no event will Sun Microsystems, Inc. be liable for any lost revenue
22  * or profits or other special, indirect and consequential damages, even if
23  * Sun has been advised of the possibility of such damages.
24  *
25  * Sun Microsystems, Inc.
26  * 2550 Garcia Avenue
27  * Mountain View, California  94043
28  * @(#)clnt_tcp.c 1.37 87/10/05 Copyr 1984 Sun Micro;	2.2 88/08/01 4.0 RPCSRC
29  * @(#)clnt_vc.c 1.19 89/03/16 Copyr 1988 Sun Micro
30  * $NetBSD: clnt_vc.c,v 1.4 2000/07/14 08:40:42 fvdl Exp $
31  * $FreeBSD: src/lib/libc/rpc/clnt_vc.c,v 1.20 2006/09/09 22:18:57 mbr Exp $
32  */
33 
34 /*
35  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
36  *
37  * Copyright (C) 1984, Sun Microsystems, Inc.
38  *
39  * TCP based RPC supports 'batched calls'.
40  * A sequence of calls may be batched-up in a send buffer.  The rpc call
41  * return immediately to the client even though the call was not necessarily
42  * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
43  * the rpc timeout value is zero (see clnt.h, rpc).
44  *
45  * Clients should NOT casually batch calls that in fact return results; that is,
46  * the server side should be aware that a call is batched and not produce any
47  * return message.  Batched calls that produce many result messages can
48  * deadlock (netlock) the client and the server....
49  *
50  * Now go hang yourself.
51  */
52 
53 #include "namespace.h"
54 #include "reentrant.h"
55 #include <sys/types.h>
56 #include <sys/poll.h>
57 #include <sys/syslog.h>
58 #include <sys/socket.h>
59 #include <sys/un.h>
60 #include <sys/uio.h>
61 
62 #include <arpa/inet.h>
63 #include <assert.h>
64 #include <err.h>
65 #include <errno.h>
66 #include <netdb.h>
67 #include <stdio.h>
68 #include <stdlib.h>
69 #include <string.h>
70 #include <unistd.h>
71 #include <signal.h>
72 
73 #include <rpc/rpc.h>
74 #include "un-namespace.h"
75 #include "rpc_com.h"
76 #include "mt_misc.h"
77 
78 #define MCALL_MSG_SIZE 24
79 
80 struct cmessage {
81         struct cmsghdr cmsg;
82         struct cmsgcred cmcred;
83 };
84 
85 static void		 clnt_vc_abort(CLIENT *);
86 static enum clnt_stat	 clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
87 				      xdrproc_t, void *, struct timeval);
88 static bool_t		 clnt_vc_control(CLIENT *, u_int, void *);
89 static void		 clnt_vc_destroy(CLIENT *);
90 static bool_t		 clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
91 static void		 clnt_vc_geterr(CLIENT *, struct rpc_err *);
92 static struct clnt_ops	*clnt_vc_ops(void);
93 static int		 __msgread(int, void *, size_t);
94 static int		 __msgwrite(int, void *, size_t);
95 static int		 read_vc(void *, void *, int);
96 static bool_t		 time_not_ok(struct timeval *);
97 static int		 write_vc(void *, void *, int);
98 
99 struct ct_data {
100 	int		ct_fd;		/* connection's fd */
101 	bool_t		ct_closeit;	/* close it on destroy */
102 	struct timeval	ct_wait;	/* wait interval in milliseconds */
103 	bool_t          ct_waitset;	/* wait set by clnt_control? */
104 	struct netbuf	ct_addr;	/* remote addr */
105 	struct rpc_err	ct_error;
106 	union {
107 		char	ct_mcallc[MCALL_MSG_SIZE];	/* marshalled callmsg */
108 		u_int32_t ct_mcalli;
109 	} ct_u;
110 	u_int		ct_mpos;	/* pos after marshal */
111 	XDR		ct_xdrs;	/* XDR stream */
112 };
113 
114 /*
115  *      This machinery implements per-fd locks for MT-safety.  It is not
116  *      sufficient to do per-CLIENT handle locks for MT-safety because a
117  *      user may create more than one CLIENT handle with the same fd behind
118  *      it.  Therfore, we allocate an array of flags (vc_fd_locks), protected
119  *      by the clnt_fd_lock mutex, and an array (vc_cv) of condition variables
120  *      similarly protected.  Vc_fd_lock[fd] == 1 => a call is activte on some
121  *      CLIENT handle created for that fd.
122  *      The current implementation holds locks across the entire RPC and reply.
123  *      Yes, this is silly, and as soon as this code is proven to work, this
124  *      should be the first thing fixed.  One step at a time.
125  */
126 static int      *vc_fd_locks;
127 static cond_t   *vc_cv;
128 #define release_fd_lock(fd, mask) {	\
129 	mutex_lock(&clnt_fd_lock);	\
130 	vc_fd_locks[fd] = 0;		\
131 	mutex_unlock(&clnt_fd_lock);	\
132 	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);	\
133 	cond_signal(&vc_cv[fd]);	\
134 }
135 
136 static const char clnt_vc_errstr[] = "%s : %s";
137 static const char clnt_vc_str[] = "clnt_vc_create";
138 static const char clnt_read_vc_str[] = "read_vc";
139 static const char __no_mem_str[] = "out of memory";
140 
141 /*
142  * Create a client handle for a connection.
143  * Default options are set, which the user can change using clnt_control()'s.
144  * The rpc/vc package does buffering similar to stdio, so the client
145  * must pick send and receive buffer sizes, 0 => use the default.
146  * NB: fd is copied into a private area.
147  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
148  * set this something more useful.
149  *
150  * fd should be an open socket
151  */
152 CLIENT *
153 clnt_vc_create(int fd,			/* open file descriptor */
154 	const struct netbuf *raddr,	/* servers address */
155 	const rpcprog_t prog,		/* program number */
156 	const rpcvers_t vers,		/* version number */
157 	u_int sendsz,			/* buffer recv size */
158 	u_int recvsz)			/* buffer send size */
159 {
160 	CLIENT *cl;			/* client handle */
161 	struct ct_data *ct = NULL;	/* client handle */
162 	struct timeval now;
163 	struct rpc_msg call_msg;
164 	static u_int32_t disrupt;
165 	sigset_t mask;
166 	sigset_t newmask;
167 	struct sockaddr_storage ss;
168 	socklen_t slen;
169 	struct __rpc_sockinfo si;
170 
171 	if (disrupt == 0)
172 		disrupt = (u_int32_t)(long)raddr;
173 
174 	cl = (CLIENT *)mem_alloc(sizeof (*cl));
175 	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
176 	if ((cl == NULL) || (ct == NULL)) {
177 		syslog(LOG_ERR, clnt_vc_errstr, clnt_vc_str, __no_mem_str);
178 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
179 		rpc_createerr.cf_error.re_errno = errno;
180 		goto err;
181 	}
182 	ct->ct_addr.buf = NULL;
183 	sigfillset(&newmask);
184 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
185 	mutex_lock(&clnt_fd_lock);
186 	if (vc_fd_locks == NULL) {
187 		int cv_allocsz, fd_allocsz;
188 		int dtbsize = __rpc_dtbsize();
189 
190 		fd_allocsz = dtbsize * sizeof (int);
191 		vc_fd_locks = (int *) mem_alloc(fd_allocsz);
192 		if (vc_fd_locks == NULL) {
193 			mutex_unlock(&clnt_fd_lock);
194 			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
195 			goto err;
196 		} else
197 			memset(vc_fd_locks, '\0', fd_allocsz);
198 
199 		assert(vc_cv == NULL);
200 		cv_allocsz = dtbsize * sizeof (cond_t);
201 		vc_cv = (cond_t *) mem_alloc(cv_allocsz);
202 		if (vc_cv == NULL) {
203 			mem_free(vc_fd_locks, fd_allocsz);
204 			vc_fd_locks = NULL;
205 			mutex_unlock(&clnt_fd_lock);
206 			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
207 			goto err;
208 		} else {
209 			int i;
210 
211 			for (i = 0; i < dtbsize; i++)
212 				cond_init(&vc_cv[i], 0, NULL);
213 		}
214 	} else
215 		assert(vc_cv != NULL);
216 
217 	/*
218 	 * XXX - fvdl connecting while holding a mutex?
219 	 */
220 	slen = sizeof ss;
221 	if (_getpeername(fd, (struct sockaddr *)(void *)&ss, &slen) < 0) {
222 		if (errno != ENOTCONN) {
223 			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
224 			rpc_createerr.cf_error.re_errno = errno;
225 			mutex_unlock(&clnt_fd_lock);
226 			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
227 			goto err;
228 		}
229 		if (_connect(fd, (struct sockaddr *)raddr->buf, raddr->len) < 0){
230 			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
231 			rpc_createerr.cf_error.re_errno = errno;
232 			mutex_unlock(&clnt_fd_lock);
233 			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
234 			goto err;
235 		}
236 	}
237 	mutex_unlock(&clnt_fd_lock);
238 	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
239 	if (!__rpc_fd2sockinfo(fd, &si))
240 		goto err;
241 
242 	ct->ct_closeit = FALSE;
243 
244 	/*
245 	 * Set up private data struct
246 	 */
247 	ct->ct_fd = fd;
248 	ct->ct_wait.tv_usec = 0;
249 	ct->ct_waitset = FALSE;
250 	ct->ct_addr.buf = malloc(raddr->maxlen);
251 	if (ct->ct_addr.buf == NULL)
252 		goto err;
253 	memcpy(ct->ct_addr.buf, raddr->buf, raddr->len);
254 	ct->ct_addr.len = raddr->maxlen;
255 	ct->ct_addr.maxlen = raddr->maxlen;
256 
257 	/*
258 	 * Initialize call message
259 	 */
260 	gettimeofday(&now, NULL);
261 	call_msg.rm_xid = ((u_int32_t)++disrupt) ^ __RPC_GETXID(&now);
262 	call_msg.rm_direction = CALL;
263 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
264 	call_msg.rm_call.cb_prog = (u_int32_t)prog;
265 	call_msg.rm_call.cb_vers = (u_int32_t)vers;
266 
267 	/*
268 	 * pre-serialize the static part of the call msg and stash it away
269 	 */
270 	xdrmem_create(&(ct->ct_xdrs), ct->ct_u.ct_mcallc, MCALL_MSG_SIZE,
271 	    XDR_ENCODE);
272 	if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
273 		if (ct->ct_closeit) {
274 			_close(fd);
275 		}
276 		goto err;
277 	}
278 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
279 	XDR_DESTROY(&(ct->ct_xdrs));
280 
281 	/*
282 	 * Create a client handle which uses xdrrec for serialization
283 	 * and authnone for authentication.
284 	 */
285 	cl->cl_ops = clnt_vc_ops();
286 	cl->cl_private = ct;
287 	cl->cl_auth = authnone_create();
288 	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
289 	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
290 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz,
291 	    cl->cl_private, read_vc, write_vc);
292 	return (cl);
293 
294 err:
295 	if (cl) {
296 		if (ct) {
297 			if (ct->ct_addr.len)
298 				mem_free(ct->ct_addr.buf, ct->ct_addr.len);
299 			mem_free(ct, sizeof (struct ct_data));
300 		}
301 		if (cl)
302 			mem_free(cl, sizeof (CLIENT));
303 	}
304 	return (NULL);
305 }
306 
307 static enum clnt_stat
308 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, void *args_ptr,
309 	     xdrproc_t xdr_results, void *results_ptr, struct timeval timeout)
310 {
311 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
312 	XDR *xdrs = &(ct->ct_xdrs);
313 	struct rpc_msg reply_msg;
314 	u_int32_t x_id;
315 	u_int32_t *msg_x_id = &ct->ct_u.ct_mcalli;    /* yuk */
316 	bool_t shipnow;
317 	int refreshes = 2;
318 	sigset_t mask, newmask;
319 	int rpc_lock_value;
320 
321 	assert(cl != NULL);
322 
323 	sigfillset(&newmask);
324 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
325 	mutex_lock(&clnt_fd_lock);
326 	while (vc_fd_locks[ct->ct_fd])
327 		cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
328 	if (__isthreaded)
329                 rpc_lock_value = 1;
330         else
331                 rpc_lock_value = 0;
332 	vc_fd_locks[ct->ct_fd] = rpc_lock_value;
333 	mutex_unlock(&clnt_fd_lock);
334 	if (!ct->ct_waitset) {
335 		/* If time is not within limits, we ignore it. */
336 		if (time_not_ok(&timeout) == FALSE)
337 			ct->ct_wait = timeout;
338 	}
339 
340 	shipnow =
341 	    (xdr_results == NULL && timeout.tv_sec == 0
342 	    && timeout.tv_usec == 0) ? FALSE : TRUE;
343 
344 call_again:
345 	xdrs->x_op = XDR_ENCODE;
346 	ct->ct_error.re_status = RPC_SUCCESS;
347 	x_id = ntohl(--(*msg_x_id));
348 
349 	if ((! XDR_PUTBYTES(xdrs, ct->ct_u.ct_mcallc, ct->ct_mpos)) ||
350 	    (! XDR_PUTINT32(xdrs, &proc)) ||
351 	    (! AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
352 	    (! (*xdr_args)(xdrs, args_ptr))) {
353 		if (ct->ct_error.re_status == RPC_SUCCESS)
354 			ct->ct_error.re_status = RPC_CANTENCODEARGS;
355 		xdrrec_endofrecord(xdrs, TRUE);
356 		release_fd_lock(ct->ct_fd, mask);
357 		return (ct->ct_error.re_status);
358 	}
359 	if (! xdrrec_endofrecord(xdrs, shipnow)) {
360 		release_fd_lock(ct->ct_fd, mask);
361 		return (ct->ct_error.re_status = RPC_CANTSEND);
362 	}
363 	if (! shipnow) {
364 		release_fd_lock(ct->ct_fd, mask);
365 		return (RPC_SUCCESS);
366 	}
367 	/*
368 	 * Hack to provide rpc-based message passing
369 	 */
370 	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
371 		release_fd_lock(ct->ct_fd, mask);
372 		return(ct->ct_error.re_status = RPC_TIMEDOUT);
373 	}
374 
375 
376 	/*
377 	 * Keep receiving until we get a valid transaction id
378 	 */
379 	xdrs->x_op = XDR_DECODE;
380 	while (TRUE) {
381 		reply_msg.acpted_rply.ar_verf = _null_auth;
382 		reply_msg.acpted_rply.ar_results.where = NULL;
383 		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
384 		if (! xdrrec_skiprecord(xdrs)) {
385 			release_fd_lock(ct->ct_fd, mask);
386 			return (ct->ct_error.re_status);
387 		}
388 		/* now decode and validate the response header */
389 		if (! xdr_replymsg(xdrs, &reply_msg)) {
390 			if (ct->ct_error.re_status == RPC_SUCCESS)
391 				continue;
392 			release_fd_lock(ct->ct_fd, mask);
393 			return (ct->ct_error.re_status);
394 		}
395 		if (reply_msg.rm_xid == x_id)
396 			break;
397 	}
398 
399 	/*
400 	 * process header
401 	 */
402 	_seterr_reply(&reply_msg, &(ct->ct_error));
403 	if (ct->ct_error.re_status == RPC_SUCCESS) {
404 		if (! AUTH_VALIDATE(cl->cl_auth,
405 		    &reply_msg.acpted_rply.ar_verf)) {
406 			ct->ct_error.re_status = RPC_AUTHERROR;
407 			ct->ct_error.re_why = AUTH_INVALIDRESP;
408 		} else if (! (*xdr_results)(xdrs, results_ptr)) {
409 			if (ct->ct_error.re_status == RPC_SUCCESS)
410 				ct->ct_error.re_status = RPC_CANTDECODERES;
411 		}
412 		/* free verifier ... */
413 		if (reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
414 			xdrs->x_op = XDR_FREE;
415 			xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
416 		}
417 	}  /* end successful completion */
418 	else {
419 		/* maybe our credentials need to be refreshed ... */
420 		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
421 			goto call_again;
422 	}  /* end of unsuccessful completion */
423 	release_fd_lock(ct->ct_fd, mask);
424 	return (ct->ct_error.re_status);
425 }
426 
427 static void
428 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
429 {
430 	struct ct_data *ct;
431 
432 	assert(cl != NULL);
433 	assert(errp != NULL);
434 
435 	ct = (struct ct_data *) cl->cl_private;
436 	*errp = ct->ct_error;
437 }
438 
439 static bool_t
440 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, void *res_ptr)
441 {
442 	struct ct_data *ct;
443 	XDR *xdrs;
444 	bool_t dummy;
445 	sigset_t mask;
446 	sigset_t newmask;
447 
448 	assert(cl != NULL);
449 
450 	ct = (struct ct_data *)cl->cl_private;
451 	xdrs = &(ct->ct_xdrs);
452 
453 	sigfillset(&newmask);
454 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
455 	mutex_lock(&clnt_fd_lock);
456 	while (vc_fd_locks[ct->ct_fd])
457 		cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
458 	xdrs->x_op = XDR_FREE;
459 	dummy = (*xdr_res)(xdrs, res_ptr);
460 	mutex_unlock(&clnt_fd_lock);
461 	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
462 	cond_signal(&vc_cv[ct->ct_fd]);
463 
464 	return dummy;
465 }
466 
467 /*ARGSUSED*/
468 static void
469 clnt_vc_abort(CLIENT *cl __unused)
470 {
471 }
472 
473 static bool_t
474 clnt_vc_control(CLIENT *cl, u_int request, void *info)
475 {
476 	struct ct_data *ct;
477 	void *infop = info;
478 	sigset_t mask;
479 	sigset_t newmask;
480 	int rpc_lock_value;
481 
482 	assert(cl != NULL);
483 
484 	ct = (struct ct_data *)cl->cl_private;
485 
486 	sigfillset(&newmask);
487 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
488 	mutex_lock(&clnt_fd_lock);
489 	while (vc_fd_locks[ct->ct_fd])
490 		cond_wait(&vc_cv[ct->ct_fd], &clnt_fd_lock);
491 	if (__isthreaded)
492                 rpc_lock_value = 1;
493         else
494                 rpc_lock_value = 0;
495 	vc_fd_locks[ct->ct_fd] = rpc_lock_value;
496 	mutex_unlock(&clnt_fd_lock);
497 
498 	switch (request) {
499 	case CLSET_FD_CLOSE:
500 		ct->ct_closeit = TRUE;
501 		release_fd_lock(ct->ct_fd, mask);
502 		return (TRUE);
503 	case CLSET_FD_NCLOSE:
504 		ct->ct_closeit = FALSE;
505 		release_fd_lock(ct->ct_fd, mask);
506 		return (TRUE);
507 	default:
508 		break;
509 	}
510 
511 	/* for other requests which use info */
512 	if (info == NULL) {
513 		release_fd_lock(ct->ct_fd, mask);
514 		return (FALSE);
515 	}
516 	switch (request) {
517 	case CLSET_TIMEOUT:
518 		if (time_not_ok((struct timeval *)info)) {
519 			release_fd_lock(ct->ct_fd, mask);
520 			return (FALSE);
521 		}
522 		ct->ct_wait = *(struct timeval *)infop;
523 		ct->ct_waitset = TRUE;
524 		break;
525 	case CLGET_TIMEOUT:
526 		*(struct timeval *)infop = ct->ct_wait;
527 		break;
528 	case CLGET_SERVER_ADDR:
529 		memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
530 		break;
531 	case CLGET_FD:
532 		*(int *)info = ct->ct_fd;
533 		break;
534 	case CLGET_SVC_ADDR:
535 		/* The caller should not free this memory area */
536 		*(struct netbuf *)info = ct->ct_addr;
537 		break;
538 	case CLSET_SVC_ADDR:		/* set to new address */
539 		release_fd_lock(ct->ct_fd, mask);
540 		return (FALSE);
541 	case CLGET_XID:
542 		/*
543 		 * use the knowledge that xid is the
544 		 * first element in the call structure
545 		 * This will get the xid of the PREVIOUS call
546 		 */
547 		*(u_int32_t *)info =
548 		    ntohl(*(u_int32_t *)(void *)&ct->ct_u.ct_mcalli);
549 		break;
550 	case CLSET_XID:
551 		/* This will set the xid of the NEXT call */
552 		*(u_int32_t *)(void *)&ct->ct_u.ct_mcalli =
553 		    htonl(*((u_int32_t *)info) + 1);
554 		/* increment by 1 as clnt_vc_call() decrements once */
555 		break;
556 	case CLGET_VERS:
557 		/*
558 		 * This RELIES on the information that, in the call body,
559 		 * the version number field is the fifth field from the
560 		 * begining of the RPC header. MUST be changed if the
561 		 * call_struct is changed
562 		 */
563 		*(u_int32_t *)info =
564 		    ntohl(*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
565 		    4 * BYTES_PER_XDR_UNIT));
566 		break;
567 
568 	case CLSET_VERS:
569 		*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
570 		    4 * BYTES_PER_XDR_UNIT) =
571 		    htonl(*(u_int32_t *)info);
572 		break;
573 
574 	case CLGET_PROG:
575 		/*
576 		 * This RELIES on the information that, in the call body,
577 		 * the program number field is the fourth field from the
578 		 * begining of the RPC header. MUST be changed if the
579 		 * call_struct is changed
580 		 */
581 		*(u_int32_t *)info =
582 		    ntohl(*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
583 		    3 * BYTES_PER_XDR_UNIT));
584 		break;
585 
586 	case CLSET_PROG:
587 		*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
588 		    3 * BYTES_PER_XDR_UNIT) =
589 		    htonl(*(u_int32_t *)info);
590 		break;
591 
592 	default:
593 		release_fd_lock(ct->ct_fd, mask);
594 		return (FALSE);
595 	}
596 	release_fd_lock(ct->ct_fd, mask);
597 	return (TRUE);
598 }
599 
600 
601 static void
602 clnt_vc_destroy(CLIENT *cl)
603 {
604 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
605 	int ct_fd = ct->ct_fd;
606 	sigset_t mask;
607 	sigset_t newmask;
608 
609 	assert(cl != NULL);
610 
611 	ct = (struct ct_data *) cl->cl_private;
612 
613 	sigfillset(&newmask);
614 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
615 	mutex_lock(&clnt_fd_lock);
616 	while (vc_fd_locks[ct_fd])
617 		cond_wait(&vc_cv[ct_fd], &clnt_fd_lock);
618 	if (ct->ct_closeit && ct->ct_fd != -1) {
619 		_close(ct->ct_fd);
620 	}
621 	XDR_DESTROY(&(ct->ct_xdrs));
622 	if (ct->ct_addr.buf)
623 		free(ct->ct_addr.buf);
624 	mem_free(ct, sizeof(struct ct_data));
625 	mem_free(cl, sizeof(CLIENT));
626 	mutex_unlock(&clnt_fd_lock);
627 	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
628 	cond_signal(&vc_cv[ct_fd]);
629 }
630 
631 /*
632  * Interface between xdr serializer and tcp connection.
633  * Behaves like the system calls, read & write, but keeps some error state
634  * around for the rpc level.
635  */
636 static int
637 read_vc(void *ctp, void *buf, int len)
638 {
639 	struct sockaddr sa;
640 	socklen_t sal;
641 	struct ct_data *ct = (struct ct_data *)ctp;
642 	struct pollfd fd;
643 	int milliseconds = (int)((ct->ct_wait.tv_sec * 1000) +
644 	    (ct->ct_wait.tv_usec / 1000));
645 
646 	if (len == 0)
647 		return (0);
648 	fd.fd = ct->ct_fd;
649 	fd.events = POLLIN;
650 	for (;;) {
651 		switch (_poll(&fd, 1, milliseconds)) {
652 		case 0:
653 			ct->ct_error.re_status = RPC_TIMEDOUT;
654 			return (-1);
655 
656 		case -1:
657 			if (errno == EINTR)
658 				continue;
659 			ct->ct_error.re_status = RPC_CANTRECV;
660 			ct->ct_error.re_errno = errno;
661 			return (-1);
662 		}
663 		break;
664 	}
665 
666 	sal = sizeof(sa);
667 	if ((_getpeername(ct->ct_fd, &sa, &sal) == 0) &&
668 	    (sa.sa_family == AF_LOCAL)) {
669 		len = __msgread(ct->ct_fd, buf, (size_t)len);
670 	} else {
671 		len = _read(ct->ct_fd, buf, (size_t)len);
672 	}
673 
674 	switch (len) {
675 	case 0:
676 		/* premature eof */
677 		ct->ct_error.re_errno = ECONNRESET;
678 		ct->ct_error.re_status = RPC_CANTRECV;
679 		len = -1;  /* it's really an error */
680 		break;
681 
682 	case -1:
683 		ct->ct_error.re_errno = errno;
684 		ct->ct_error.re_status = RPC_CANTRECV;
685 		break;
686 	}
687 	return (len);
688 }
689 
690 static int
691 write_vc(void *ctp, void *buf, int len)
692 {
693 	struct sockaddr sa;
694 	socklen_t sal;
695 	struct ct_data *ct = (struct ct_data *)ctp;
696 	int i, cnt;
697 
698 	sal = sizeof(sa);
699 	if ((_getpeername(ct->ct_fd, &sa, &sal) == 0) &&
700 	    (sa.sa_family == AF_LOCAL)) {
701 		for (cnt = len; cnt > 0; cnt -= i, buf = (char *)buf + i) {
702 			if ((i = __msgwrite(ct->ct_fd, buf,
703 			     (size_t)cnt)) == -1) {
704 				ct->ct_error.re_errno = errno;
705 				ct->ct_error.re_status = RPC_CANTSEND;
706 				return (-1);
707 			}
708 		}
709 	} else {
710 		for (cnt = len; cnt > 0; cnt -= i, buf = (char *)buf + i) {
711 			if ((i = _write(ct->ct_fd, buf, (size_t)cnt)) == -1) {
712 				ct->ct_error.re_errno = errno;
713 				ct->ct_error.re_status = RPC_CANTSEND;
714 				return (-1);
715 			}
716 		}
717 	}
718 	return (len);
719 }
720 
721 static struct clnt_ops *
722 clnt_vc_ops(void)
723 {
724 	static struct clnt_ops ops;
725 	sigset_t mask, newmask;
726 
727 	/* VARIABLES PROTECTED BY ops_lock: ops */
728 
729 	sigfillset(&newmask);
730 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
731 	mutex_lock(&ops_lock);
732 	if (ops.cl_call == NULL) {
733 		ops.cl_call = clnt_vc_call;
734 		ops.cl_abort = clnt_vc_abort;
735 		ops.cl_geterr = clnt_vc_geterr;
736 		ops.cl_freeres = clnt_vc_freeres;
737 		ops.cl_destroy = clnt_vc_destroy;
738 		ops.cl_control = clnt_vc_control;
739 	}
740 	mutex_unlock(&ops_lock);
741 	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
742 	return (&ops);
743 }
744 
745 /*
746  * Make sure that the time is not garbage.   -1 value is disallowed.
747  * Note this is different from time_not_ok in clnt_dg.c
748  */
749 static bool_t
750 time_not_ok(struct timeval *t)
751 {
752 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
753 		t->tv_usec <= -1 || t->tv_usec > 1000000);
754 }
755 
756 static int
757 __msgread(int sock, void *buf, size_t cnt)
758 {
759 	struct iovec iov[1];
760 	struct msghdr msg;
761 	union {
762 		struct cmsghdr cmsg;
763 		char control[CMSG_SPACE(sizeof(struct cmsgcred))];
764 	} cm;
765 
766 	bzero((char *)&cm, sizeof(cm));
767 	iov[0].iov_base = buf;
768 	iov[0].iov_len = cnt;
769 
770 	msg.msg_iov = iov;
771 	msg.msg_iovlen = 1;
772 	msg.msg_name = NULL;
773 	msg.msg_namelen = 0;
774 	msg.msg_control = (caddr_t)&cm;
775 	msg.msg_controllen = CMSG_SPACE(sizeof(struct cmsgcred));
776 	msg.msg_flags = 0;
777 
778 	return(_recvmsg(sock, &msg, 0));
779 }
780 
781 static int
782 __msgwrite(int sock, void *buf, size_t cnt)
783 {
784 	struct iovec iov[1];
785 	struct msghdr msg;
786 	union {
787 		struct cmsghdr cmsg;
788 		char control[CMSG_SPACE(sizeof(struct cmsgcred))];
789 	} cm;
790 
791 	bzero((char *)&cm, sizeof(cm));
792 	iov[0].iov_base = buf;
793 	iov[0].iov_len = cnt;
794 
795 	cm.cmsg.cmsg_type = SCM_CREDS;
796 	cm.cmsg.cmsg_level = SOL_SOCKET;
797 	cm.cmsg.cmsg_len = CMSG_LEN(sizeof(struct cmsgcred));
798 
799 	msg.msg_iov = iov;
800 	msg.msg_iovlen = 1;
801 	msg.msg_name = NULL;
802 	msg.msg_namelen = 0;
803 	msg.msg_control = (caddr_t)&cm;
804 	msg.msg_controllen = CMSG_SPACE(sizeof(struct cmsgcred));
805 	msg.msg_flags = 0;
806 
807 	return(_sendmsg(sock, &msg, 0));
808 }
809