xref: /illumos-gate/usr/src/lib/libnsl/rpc/clnt_vc.c (revision 79033acb)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 
23 /*
24  * Copyright 2005 Sun Microsystems, Inc.  All rights reserved.
25  * Use is subject to license terms.
26  */
27 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
28 /* All Rights Reserved */
29 /*
30  * Portions of this source code were derived from Berkeley
31  * 4.3 BSD under license from the Regents of the University of
32  * California.
33  */
34 
35 #pragma ident	"%Z%%M%	%I%	%E% SMI"
36 
37 /*
38  * clnt_vc.c
39  *
40  * Implements a connectionful client side RPC.
41  *
42  * Connectionful RPC supports 'batched calls'.
43  * A sequence of calls may be batched-up in a send buffer. The rpc call
44  * return immediately to the client even though the call was not necessarily
45  * sent. The batching occurs if the results' xdr routine is NULL (0) AND
46  * the rpc timeout value is zero (see clnt.h, rpc).
47  *
48  * Clients should NOT casually batch calls that in fact return results; that
49  * is the server side should be aware that a call is batched and not produce
50  * any return message. Batched calls that produce many result messages can
51  * deadlock (netlock) the client and the server....
52  */
53 
54 
55 #include "mt.h"
56 #include "rpc_mt.h"
57 #include <assert.h>
58 #include <rpc/rpc.h>
59 #include <errno.h>
60 #include <sys/byteorder.h>
61 #include <sys/mkdev.h>
62 #include <sys/poll.h>
63 #include <syslog.h>
64 #include <stdlib.h>
65 #include <unistd.h>
66 #include <netinet/tcp.h>
67 
68 #define	MCALL_MSG_SIZE 24
69 #define	SECS_TO_MS 1000
70 #define	USECS_TO_MS 1/1000
71 #ifndef MIN
72 #define	MIN(a, b)	(((a) < (b)) ? (a) : (b))
73 #endif
74 
75 extern int __rpc_timeval_to_msec(struct timeval *);
76 extern int __rpc_compress_pollfd(int, pollfd_t *, pollfd_t *);
77 extern bool_t xdr_opaque_auth(XDR *, struct opaque_auth *);
78 extern bool_t __rpc_gss_wrap(AUTH *, char *, uint_t, XDR *, bool_t (*)(),
79 								caddr_t);
80 extern bool_t __rpc_gss_unwrap(AUTH *, XDR *, bool_t (*)(), caddr_t);
81 extern CLIENT *_clnt_vc_create_timed(int, struct netbuf *, rpcprog_t,
82 		rpcvers_t, uint_t, uint_t, const struct timeval *);
83 
84 static struct clnt_ops	*clnt_vc_ops(void);
85 static int		read_vc(void *, caddr_t, int);
86 static int		write_vc(void *, caddr_t, int);
87 static int		t_rcvall(int, char *, int);
88 static bool_t		time_not_ok(struct timeval *);
89 
90 struct ct_data;
91 static bool_t		set_up_connection(int, struct netbuf *,
92 				struct ct_data *, const struct timeval *);
93 static bool_t		set_io_mode(struct ct_data *, int);
94 
95 /*
96  * Lock table handle used by various MT sync. routines
97  */
98 static mutex_t	vctbl_lock = DEFAULTMUTEX;
99 static void	*vctbl = NULL;
100 
101 static const char clnt_vc_errstr[] = "%s : %s";
102 static const char clnt_vc_str[] = "clnt_vc_create";
103 static const char clnt_read_vc_str[] = "read_vc";
104 static const char __no_mem_str[] = "out of memory";
105 static const char no_fcntl_getfl_str[] = "could not get status flags and modes";
106 static const char no_nonblock_str[] = "could not set transport blocking mode";
107 
108 /*
109  * Private data structure
110  */
111 struct ct_data {
112 	int		ct_fd;		/* connection's fd */
113 	bool_t		ct_closeit;	/* close it on destroy */
114 	int		ct_tsdu;	/* size of tsdu */
115 	int		ct_wait;	/* wait interval in milliseconds */
116 	bool_t		ct_waitset;	/* wait set by clnt_control? */
117 	struct netbuf	ct_addr;	/* remote addr */
118 	struct rpc_err	ct_error;
119 	char		ct_mcall[MCALL_MSG_SIZE]; /* marshalled callmsg */
120 	uint_t		ct_mpos;	/* pos after marshal */
121 	XDR		ct_xdrs;	/* XDR stream */
122 
123 	/* NON STANDARD INFO - 00-08-31 */
124 	bool_t		ct_is_oneway; /* True if the current call is oneway. */
125 	bool_t		ct_is_blocking;
126 	ushort_t	ct_io_mode;
127 	ushort_t	ct_blocking_mode;
128 	uint_t		ct_bufferSize; /* Total size of the buffer. */
129 	uint_t		ct_bufferPendingSize; /* Size of unsent data. */
130 	char 		*ct_buffer; /* Pointer to the buffer. */
131 	char 		*ct_bufferWritePtr; /* Ptr to the first free byte. */
132 	char 		*ct_bufferReadPtr; /* Ptr to the first byte of data. */
133 };
134 
135 struct nb_reg_node {
136 	struct nb_reg_node *next;
137 	struct ct_data *ct;
138 };
139 
140 static struct nb_reg_node *nb_first = (struct nb_reg_node *)&nb_first;
141 static struct nb_reg_node *nb_free  = (struct nb_reg_node *)&nb_free;
142 
143 static bool_t exit_handler_set = FALSE;
144 
145 static mutex_t nb_list_mutex = DEFAULTMUTEX;
146 
147 
148 /* Define some macros to manage the linked list. */
149 #define	LIST_ISEMPTY(l) (l == (struct nb_reg_node *)&l)
150 #define	LIST_CLR(l) (l = (struct nb_reg_node *)&l)
151 #define	LIST_ADD(l, node) (node->next = l->next, l = node)
152 #define	LIST_EXTRACT(l, node) (node = l, l = l->next)
153 #define	LIST_FOR_EACH(l, node) \
154 	for (node = l; node != (struct nb_reg_node *)&l; node = node->next)
155 
156 
157 /* Default size of the IO buffer used in non blocking mode */
158 #define	DEFAULT_PENDING_ZONE_MAX_SIZE (16*1024)
159 
160 static int nb_send(struct ct_data *, void *, unsigned int);
161 static int do_flush(struct ct_data *, uint_t);
162 static bool_t set_flush_mode(struct ct_data *, int);
163 static bool_t set_blocking_connection(struct ct_data *, bool_t);
164 
165 static int register_nb(struct ct_data *);
166 static int unregister_nb(struct ct_data *);
167 
168 
169 /*
170  * Change the mode of the underlying fd.
171  */
172 static bool_t
173 set_blocking_connection(struct ct_data *ct, bool_t blocking)
174 {
175 	int flag;
176 
177 	/*
178 	 * If the underlying fd is already in the required mode,
179 	 * avoid the syscall.
180 	 */
181 	if (ct->ct_is_blocking == blocking)
182 		return (TRUE);
183 
184 	if ((flag = fcntl(ct->ct_fd, F_GETFL, 0)) < 0) {
185 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
186 		    no_fcntl_getfl_str);
187 		return (FALSE);
188 	}
189 
190 	flag = blocking? flag&~O_NONBLOCK : flag|O_NONBLOCK;
191 	if (fcntl(ct->ct_fd, F_SETFL, flag) != 0) {
192 		(void) syslog(LOG_ERR, "set_blocking_connection : %s",
193 		    no_nonblock_str);
194 		return (FALSE);
195 	}
196 	ct->ct_is_blocking = blocking;
197 	return (TRUE);
198 }
199 
200 /*
201  * Create a client handle for a connection.
202  * Default options are set, which the user can change using clnt_control()'s.
203  * The rpc/vc package does buffering similar to stdio, so the client
204  * must pick send and receive buffer sizes, 0 => use the default.
205  * NB: fd is copied into a private area.
206  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
207  * set this something more useful.
208  *
209  * fd should be open and bound.
210  */
211 CLIENT *
212 clnt_vc_create(const int fd, struct netbuf *svcaddr, const rpcprog_t prog,
213 	const rpcvers_t vers, const uint_t sendsz, const uint_t recvsz)
214 {
215 	return (_clnt_vc_create_timed(fd, svcaddr, prog, vers, sendsz,
216 			recvsz, NULL));
217 }
218 
219 /*
220  * This has the same definition as clnt_vc_create(), except it
221  * takes an additional parameter - a pointer to a timeval structure.
222  *
223  * Not a public interface. This is for clnt_create_timed,
224  * clnt_create_vers_timed, clnt_tp_create_timed to pass down the timeout
225  * value to control a tcp connection attempt.
226  * (for bug 4049792: clnt_create_timed does not time out)
227  *
228  * If tp is NULL, use default timeout to set up the connection.
229  */
230 CLIENT *
231 _clnt_vc_create_timed(int fd, struct netbuf *svcaddr, rpcprog_t prog,
232 	rpcvers_t vers, uint_t sendsz, uint_t recvsz, const struct timeval *tp)
233 {
234 	CLIENT *cl;			/* client handle */
235 	struct ct_data *ct;		/* private data */
236 	struct timeval now;
237 	struct rpc_msg call_msg;
238 	struct t_info tinfo;
239 	int flag;
240 
241 	cl = malloc(sizeof (*cl));
242 	ct = malloc(sizeof (*ct));
243 	if ((cl == NULL) || (ct == NULL)) {
244 		(void) syslog(LOG_ERR, clnt_vc_errstr,
245 				clnt_vc_str, __no_mem_str);
246 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
247 		rpc_createerr.cf_error.re_errno = errno;
248 		rpc_createerr.cf_error.re_terrno = 0;
249 		goto err;
250 	}
251 	ct->ct_addr.buf = NULL;
252 
253 	sig_mutex_lock(&vctbl_lock);
254 
255 	if ((vctbl == NULL) && ((vctbl = rpc_fd_init()) == NULL)) {
256 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
257 		rpc_createerr.cf_error.re_errno = errno;
258 		rpc_createerr.cf_error.re_terrno = 0;
259 		sig_mutex_unlock(&vctbl_lock);
260 		goto err;
261 	}
262 
263 	ct->ct_io_mode = RPC_CL_BLOCKING;
264 	ct->ct_blocking_mode = RPC_CL_BLOCKING_FLUSH;
265 
266 	ct->ct_buffer = NULL;	/* We allocate the buffer when needed. */
267 	ct->ct_bufferSize = DEFAULT_PENDING_ZONE_MAX_SIZE;
268 	ct->ct_bufferPendingSize = 0;
269 	ct->ct_bufferWritePtr = NULL;
270 	ct->ct_bufferReadPtr = NULL;
271 
272 	/* Check the current state of the fd. */
273 	if ((flag = fcntl(fd, F_GETFL, 0)) < 0) {
274 		(void) syslog(LOG_ERR, "_clnt_vc_create_timed : %s",
275 		    no_fcntl_getfl_str);
276 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
277 		rpc_createerr.cf_error.re_terrno = errno;
278 		rpc_createerr.cf_error.re_errno = 0;
279 		sig_mutex_unlock(&vctbl_lock);
280 		goto err;
281 	}
282 	ct->ct_is_blocking = flag & O_NONBLOCK ? FALSE : TRUE;
283 
284 	if (set_up_connection(fd, svcaddr, ct, tp) == FALSE) {
285 		sig_mutex_unlock(&vctbl_lock);
286 		goto err;
287 	}
288 	sig_mutex_unlock(&vctbl_lock);
289 
290 	/*
291 	 * Set up other members of private data struct
292 	 */
293 	ct->ct_fd = fd;
294 	/*
295 	 * The actual value will be set by clnt_call or clnt_control
296 	 */
297 	ct->ct_wait = 30000;
298 	ct->ct_waitset = FALSE;
299 	/*
300 	 * By default, closeit is always FALSE. It is users responsibility
301 	 * to do a t_close on it, else the user may use clnt_control
302 	 * to let clnt_destroy do it for him/her.
303 	 */
304 	ct->ct_closeit = FALSE;
305 
306 	/*
307 	 * Initialize call message
308 	 */
309 	(void) gettimeofday(&now, (struct timezone *)0);
310 	call_msg.rm_xid = getpid() ^ now.tv_sec ^ now.tv_usec;
311 	call_msg.rm_call.cb_prog = prog;
312 	call_msg.rm_call.cb_vers = vers;
313 
314 	/*
315 	 * pre-serialize the static part of the call msg and stash it away
316 	 */
317 	xdrmem_create(&(ct->ct_xdrs), ct->ct_mcall, MCALL_MSG_SIZE, XDR_ENCODE);
318 	if (!xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
319 		goto err;
320 	}
321 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
322 	XDR_DESTROY(&(ct->ct_xdrs));
323 
324 	if (t_getinfo(fd, &tinfo) == -1) {
325 		rpc_createerr.cf_stat = RPC_TLIERROR;
326 		rpc_createerr.cf_error.re_terrno = t_errno;
327 		rpc_createerr.cf_error.re_errno = 0;
328 		goto err;
329 	}
330 	/*
331 	 * Find the receive and the send size
332 	 */
333 	sendsz = __rpc_get_t_size((int)sendsz, tinfo.tsdu);
334 	recvsz = __rpc_get_t_size((int)recvsz, tinfo.tsdu);
335 	if ((sendsz == 0) || (recvsz == 0)) {
336 		rpc_createerr.cf_stat = RPC_TLIERROR;
337 		rpc_createerr.cf_error.re_terrno = 0;
338 		rpc_createerr.cf_error.re_errno = 0;
339 		goto err;
340 	}
341 	ct->ct_tsdu = tinfo.tsdu;
342 	/*
343 	 * Create a client handle which uses xdrrec for serialization
344 	 * and authnone for authentication.
345 	 */
346 	ct->ct_xdrs.x_ops = NULL;
347 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz, (caddr_t)ct,
348 			read_vc, write_vc);
349 	if (ct->ct_xdrs.x_ops == NULL) {
350 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
351 		rpc_createerr.cf_error.re_terrno = 0;
352 		rpc_createerr.cf_error.re_errno = ENOMEM;
353 		goto err;
354 	}
355 	cl->cl_ops = clnt_vc_ops();
356 	cl->cl_private = (caddr_t)ct;
357 	cl->cl_auth = authnone_create();
358 	cl->cl_tp = NULL;
359 	cl->cl_netid = NULL;
360 	return (cl);
361 
362 err:
363 	if (cl) {
364 		if (ct) {
365 			if (ct->ct_addr.len)
366 				free(ct->ct_addr.buf);
367 			free(ct);
368 		}
369 		free(cl);
370 	}
371 	return (NULL);
372 }
373 
374 #define	TCPOPT_BUFSIZE 128
375 
376 /*
377  * Set tcp connection timeout value.
378  * Retun 0 for success, -1 for failure.
379  */
380 static int
381 _set_tcp_conntime(int fd, int optval)
382 {
383 	struct t_optmgmt req, res;
384 	struct opthdr *opt;
385 	int *ip;
386 	char buf[TCPOPT_BUFSIZE];
387 
388 	/* LINTED pointer cast */
389 	opt = (struct opthdr *)buf;
390 	opt->level =  IPPROTO_TCP;
391 	opt->name = TCP_CONN_ABORT_THRESHOLD;
392 	opt->len = sizeof (int);
393 
394 	req.flags = T_NEGOTIATE;
395 	req.opt.len = sizeof (struct opthdr) + opt->len;
396 	req.opt.buf = (char *)opt;
397 	/* LINTED pointer cast */
398 	ip = (int *)((char *)buf + sizeof (struct opthdr));
399 	*ip = optval;
400 
401 	res.flags = 0;
402 	res.opt.buf = (char *)buf;
403 	res.opt.maxlen = sizeof (buf);
404 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
405 		return (-1);
406 	}
407 	return (0);
408 }
409 
410 /*
411  * Get current tcp connection timeout value.
412  * Retun 0 for success, -1 for failure.
413  */
414 static int
415 _get_tcp_conntime(int fd)
416 {
417 	struct t_optmgmt req, res;
418 	struct opthdr *opt;
419 	int *ip, retval;
420 	char buf[TCPOPT_BUFSIZE];
421 
422 	/* LINTED pointer cast */
423 	opt = (struct opthdr *)buf;
424 	opt->level =  IPPROTO_TCP;
425 	opt->name = TCP_CONN_ABORT_THRESHOLD;
426 	opt->len = sizeof (int);
427 
428 	req.flags = T_CURRENT;
429 	req.opt.len = sizeof (struct opthdr) + opt->len;
430 	req.opt.buf = (char *)opt;
431 	/* LINTED pointer cast */
432 	ip = (int *)((char *)buf + sizeof (struct opthdr));
433 	*ip = 0;
434 
435 	res.flags = 0;
436 	res.opt.buf = (char *)buf;
437 	res.opt.maxlen = sizeof (buf);
438 	if (t_optmgmt(fd, &req, &res) < 0 || res.flags != T_SUCCESS) {
439 		return (-1);
440 	}
441 
442 	/* LINTED pointer cast */
443 	ip = (int *)((char *)buf + sizeof (struct opthdr));
444 	retval = *ip;
445 	return (retval);
446 }
447 
448 static bool_t
449 set_up_connection(int fd, struct netbuf *svcaddr, struct ct_data *ct,
450 						const struct timeval *tp)
451 {
452 	int state;
453 	struct t_call sndcallstr, *rcvcall;
454 	int nconnect;
455 	bool_t connected, do_rcv_connect;
456 	int curr_time = 0;
457 
458 	ct->ct_addr.len = 0;
459 	state = t_getstate(fd);
460 	if (state == -1) {
461 		rpc_createerr.cf_stat = RPC_TLIERROR;
462 		rpc_createerr.cf_error.re_errno = 0;
463 		rpc_createerr.cf_error.re_terrno = t_errno;
464 		return (FALSE);
465 	}
466 
467 #ifdef DEBUG
468 	fprintf(stderr, "set_up_connection: state = %d\n", state);
469 #endif
470 	switch (state) {
471 	case T_IDLE:
472 		if (svcaddr == NULL) {
473 			rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
474 			return (FALSE);
475 		}
476 		/*
477 		 * Connect only if state is IDLE and svcaddr known
478 		 */
479 /* LINTED pointer alignment */
480 		rcvcall = (struct t_call *)t_alloc(fd, T_CALL, T_OPT|T_ADDR);
481 		if (rcvcall == NULL) {
482 			rpc_createerr.cf_stat = RPC_TLIERROR;
483 			rpc_createerr.cf_error.re_terrno = t_errno;
484 			rpc_createerr.cf_error.re_errno = errno;
485 			return (FALSE);
486 		}
487 		rcvcall->udata.maxlen = 0;
488 		sndcallstr.addr = *svcaddr;
489 		sndcallstr.opt.len = 0;
490 		sndcallstr.udata.len = 0;
491 		/*
492 		 * Even NULL could have sufficed for rcvcall, because
493 		 * the address returned is same for all cases except
494 		 * for the gateway case, and hence required.
495 		 */
496 		connected = FALSE;
497 		do_rcv_connect = FALSE;
498 
499 		/*
500 		 * If there is a timeout value specified, we will try to
501 		 * reset the tcp connection timeout. If the transport does
502 		 * not support the TCP_CONN_ABORT_THRESHOLD option or fails
503 		 * for other reason, default timeout will be used.
504 		 */
505 		if (tp != NULL) {
506 		    int ms;
507 
508 		    /* TCP_CONN_ABORT_THRESHOLD takes int value in millisecs */
509 		    ms = tp->tv_sec * SECS_TO_MS + tp->tv_usec * USECS_TO_MS;
510 		    if (((curr_time = _get_tcp_conntime(fd)) != -1) &&
511 			(_set_tcp_conntime(fd, ms) == 0)) {
512 			/* EMPTY */
513 #ifdef DEBUG
514 			fprintf(stderr, "set_up_connection: set tcp ");
515 			fprintf(stderr, "connection timeout to %d ms\n", ms);
516 #endif
517 		    }
518 		}
519 
520 		for (nconnect = 0; nconnect < 3; nconnect++) {
521 			if (t_connect(fd, &sndcallstr, rcvcall) != -1) {
522 				connected = TRUE;
523 				break;
524 			}
525 			if (!(t_errno == TSYSERR && errno == EINTR)) {
526 				break;
527 			}
528 			if ((state = t_getstate(fd)) == T_OUTCON) {
529 				do_rcv_connect = TRUE;
530 				break;
531 			}
532 			if (state != T_IDLE) {
533 				break;
534 			}
535 		}
536 		if (do_rcv_connect) {
537 			do {
538 				if (t_rcvconnect(fd, rcvcall) != -1) {
539 					connected = TRUE;
540 					break;
541 				}
542 			} while (t_errno == TSYSERR && errno == EINTR);
543 		}
544 
545 		/*
546 		 * Set the connection timeout back to its old value.
547 		 */
548 		if (curr_time) {
549 			(void) _set_tcp_conntime(fd, curr_time);
550 		}
551 
552 		if (!connected) {
553 			rpc_createerr.cf_stat = RPC_TLIERROR;
554 			rpc_createerr.cf_error.re_terrno = t_errno;
555 			rpc_createerr.cf_error.re_errno = errno;
556 			(void) t_free((char *)rcvcall, T_CALL);
557 #ifdef DEBUG
558 			fprintf(stderr, "clnt_vc: t_connect error %d\n",
559 				rpc_createerr.cf_error.re_terrno);
560 #endif
561 			return (FALSE);
562 		}
563 
564 		/* Free old area if allocated */
565 		if (ct->ct_addr.buf)
566 			free(ct->ct_addr.buf);
567 		ct->ct_addr = rcvcall->addr;	/* To get the new address */
568 		/* So that address buf does not get freed */
569 		rcvcall->addr.buf = NULL;
570 		(void) t_free((char *)rcvcall, T_CALL);
571 		break;
572 	case T_DATAXFER:
573 	case T_OUTCON:
574 		if (svcaddr == NULL) {
575 			/*
576 			 * svcaddr could also be NULL in cases where the
577 			 * client is already bound and connected.
578 			 */
579 			ct->ct_addr.len = 0;
580 		} else {
581 			ct->ct_addr.buf = malloc(svcaddr->len);
582 			if (ct->ct_addr.buf == NULL) {
583 				(void) syslog(LOG_ERR, clnt_vc_errstr,
584 					clnt_vc_str, __no_mem_str);
585 				rpc_createerr.cf_stat = RPC_SYSTEMERROR;
586 				rpc_createerr.cf_error.re_errno = errno;
587 				rpc_createerr.cf_error.re_terrno = 0;
588 				return (FALSE);
589 			}
590 			(void) memcpy(ct->ct_addr.buf, svcaddr->buf,
591 					(size_t)svcaddr->len);
592 			ct->ct_addr.len = ct->ct_addr.maxlen = svcaddr->len;
593 		}
594 		break;
595 	default:
596 		rpc_createerr.cf_stat = RPC_UNKNOWNADDR;
597 		return (FALSE);
598 	}
599 	return (TRUE);
600 }
601 
602 static enum clnt_stat
603 clnt_vc_call(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr,
604 	xdrproc_t xdr_results, caddr_t results_ptr, struct timeval timeout)
605 {
606 /* LINTED pointer alignment */
607 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
608 	XDR *xdrs = &(ct->ct_xdrs);
609 	struct rpc_msg reply_msg;
610 	uint32_t x_id;
611 /* LINTED pointer alignment */
612 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
613 	bool_t shipnow;
614 	int refreshes = 2;
615 
616 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
617 		rpc_callerr.re_status = RPC_FAILED;
618 		rpc_callerr.re_errno = errno;
619 		rpc_fd_unlock(vctbl, ct->ct_fd);
620 		return (RPC_FAILED);
621 	}
622 
623 	ct->ct_is_oneway = FALSE;
624 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
625 		if (do_flush(ct, RPC_CL_BLOCKING_FLUSH) != 0) {
626 			rpc_fd_unlock(vctbl, ct->ct_fd);
627 			return (RPC_FAILED);  /* XXX */
628 		}
629 	}
630 
631 	if (!ct->ct_waitset) {
632 		/* If time is not within limits, we ignore it. */
633 		if (time_not_ok(&timeout) == FALSE)
634 			ct->ct_wait = __rpc_timeval_to_msec(&timeout);
635 	} else {
636 		timeout.tv_sec = (ct->ct_wait / 1000);
637 		timeout.tv_usec = (ct->ct_wait % 1000) * 1000;
638 	}
639 
640 	shipnow = ((xdr_results == (xdrproc_t)0) && (timeout.tv_sec == 0) &&
641 	    (timeout.tv_usec == 0)) ? FALSE : TRUE;
642 call_again:
643 	xdrs->x_op = XDR_ENCODE;
644 	rpc_callerr.re_status = RPC_SUCCESS;
645 	/*
646 	 * Due to little endian byte order, it is necessary to convert to host
647 	 * format before decrementing xid.
648 	 */
649 	x_id = ntohl(*msg_x_id) - 1;
650 	*msg_x_id = htonl(x_id);
651 
652 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
653 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
654 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
655 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
656 		    (!xdr_args(xdrs, args_ptr))) {
657 			if (rpc_callerr.re_status == RPC_SUCCESS)
658 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
659 			(void) xdrrec_endofrecord(xdrs, TRUE);
660 			rpc_fd_unlock(vctbl, ct->ct_fd);
661 			return (rpc_callerr.re_status);
662 		}
663 	} else {
664 /* LINTED pointer alignment */
665 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
666 		IXDR_PUT_U_INT32(u, proc);
667 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
668 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
669 			if (rpc_callerr.re_status == RPC_SUCCESS)
670 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
671 			(void) xdrrec_endofrecord(xdrs, TRUE);
672 			rpc_fd_unlock(vctbl, ct->ct_fd);
673 			return (rpc_callerr.re_status);
674 		}
675 	}
676 	if (!xdrrec_endofrecord(xdrs, shipnow)) {
677 		rpc_fd_unlock(vctbl, ct->ct_fd);
678 		return (rpc_callerr.re_status = RPC_CANTSEND);
679 	}
680 	if (!shipnow) {
681 		rpc_fd_unlock(vctbl, ct->ct_fd);
682 		return (RPC_SUCCESS);
683 	}
684 	/*
685 	 * Hack to provide rpc-based message passing
686 	 */
687 	if (timeout.tv_sec == 0 && timeout.tv_usec == 0) {
688 		rpc_fd_unlock(vctbl, ct->ct_fd);
689 		return (rpc_callerr.re_status = RPC_TIMEDOUT);
690 	}
691 
692 
693 	/*
694 	 * Keep receiving until we get a valid transaction id
695 	 */
696 	xdrs->x_op = XDR_DECODE;
697 	for (;;) {
698 		reply_msg.acpted_rply.ar_verf = _null_auth;
699 		reply_msg.acpted_rply.ar_results.where = NULL;
700 		reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
701 		if (!xdrrec_skiprecord(xdrs)) {
702 			rpc_fd_unlock(vctbl, ct->ct_fd);
703 			return (rpc_callerr.re_status);
704 		}
705 		/* now decode and validate the response header */
706 		if (!xdr_replymsg(xdrs, &reply_msg)) {
707 			if (rpc_callerr.re_status == RPC_SUCCESS)
708 				continue;
709 			rpc_fd_unlock(vctbl, ct->ct_fd);
710 			return (rpc_callerr.re_status);
711 		}
712 		if (reply_msg.rm_xid == x_id)
713 			break;
714 	}
715 
716 	/*
717 	 * process header
718 	 */
719 	if ((reply_msg.rm_reply.rp_stat == MSG_ACCEPTED) &&
720 	    (reply_msg.acpted_rply.ar_stat == SUCCESS))
721 		rpc_callerr.re_status = RPC_SUCCESS;
722 	else
723 		__seterr_reply(&reply_msg, &(rpc_callerr));
724 
725 	if (rpc_callerr.re_status == RPC_SUCCESS) {
726 		if (!AUTH_VALIDATE(cl->cl_auth,
727 				&reply_msg.acpted_rply.ar_verf)) {
728 			rpc_callerr.re_status = RPC_AUTHERROR;
729 			rpc_callerr.re_why = AUTH_INVALIDRESP;
730 		} else if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
731 			if (!(*xdr_results)(xdrs, results_ptr)) {
732 				if (rpc_callerr.re_status == RPC_SUCCESS)
733 				    rpc_callerr.re_status = RPC_CANTDECODERES;
734 			}
735 		} else if (!__rpc_gss_unwrap(cl->cl_auth, xdrs, xdr_results,
736 							results_ptr)) {
737 			if (rpc_callerr.re_status == RPC_SUCCESS)
738 				rpc_callerr.re_status = RPC_CANTDECODERES;
739 		}
740 	}	/* end successful completion */
741 	/*
742 	 * If unsuccesful AND error is an authentication error
743 	 * then refresh credentials and try again, else break
744 	 */
745 	else if (rpc_callerr.re_status == RPC_AUTHERROR) {
746 		/* maybe our credentials need to be refreshed ... */
747 		if (refreshes-- && AUTH_REFRESH(cl->cl_auth, &reply_msg))
748 			goto call_again;
749 		else
750 			/*
751 			 * We are setting rpc_callerr here given that libnsl
752 			 * is not reentrant thereby reinitializing the TSD.
753 			 * If not set here then success could be returned even
754 			 * though refresh failed.
755 			 */
756 			rpc_callerr.re_status = RPC_AUTHERROR;
757 	} /* end of unsuccessful completion */
758 	/* free verifier ... */
759 	if (reply_msg.rm_reply.rp_stat == MSG_ACCEPTED &&
760 			reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
761 		xdrs->x_op = XDR_FREE;
762 		(void) xdr_opaque_auth(xdrs, &(reply_msg.acpted_rply.ar_verf));
763 	}
764 	rpc_fd_unlock(vctbl, ct->ct_fd);
765 	return (rpc_callerr.re_status);
766 }
767 
768 static enum clnt_stat
769 clnt_vc_send(CLIENT *cl, rpcproc_t proc, xdrproc_t xdr_args, caddr_t args_ptr)
770 {
771 /* LINTED pointer alignment */
772 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
773 	XDR *xdrs = &(ct->ct_xdrs);
774 	uint32_t x_id;
775 /* LINTED pointer alignment */
776 	uint32_t *msg_x_id = (uint32_t *)(ct->ct_mcall);	/* yuk */
777 
778 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
779 		rpc_callerr.re_status = RPC_FAILED;
780 		rpc_callerr.re_errno = errno;
781 		rpc_fd_unlock(vctbl, ct->ct_fd);
782 		return (RPC_FAILED);
783 	}
784 
785 	ct->ct_is_oneway = TRUE;
786 
787 	xdrs->x_op = XDR_ENCODE;
788 	rpc_callerr.re_status = RPC_SUCCESS;
789 	/*
790 	 * Due to little endian byte order, it is necessary to convert to host
791 	 * format before decrementing xid.
792 	 */
793 	x_id = ntohl(*msg_x_id) - 1;
794 	*msg_x_id = htonl(x_id);
795 
796 	if (cl->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
797 		if ((!XDR_PUTBYTES(xdrs, ct->ct_mcall, ct->ct_mpos)) ||
798 		    (!XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
799 		    (!AUTH_MARSHALL(cl->cl_auth, xdrs)) ||
800 		    (!xdr_args(xdrs, args_ptr))) {
801 			if (rpc_callerr.re_status == RPC_SUCCESS)
802 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
803 			(void) xdrrec_endofrecord(xdrs, TRUE);
804 			rpc_fd_unlock(vctbl, ct->ct_fd);
805 			return (rpc_callerr.re_status);
806 		}
807 	} else {
808 /* LINTED pointer alignment */
809 		uint32_t *u = (uint32_t *)&ct->ct_mcall[ct->ct_mpos];
810 		IXDR_PUT_U_INT32(u, proc);
811 		if (!__rpc_gss_wrap(cl->cl_auth, ct->ct_mcall,
812 		    ((char *)u) - ct->ct_mcall, xdrs, xdr_args, args_ptr)) {
813 			if (rpc_callerr.re_status == RPC_SUCCESS)
814 				rpc_callerr.re_status = RPC_CANTENCODEARGS;
815 			(void) xdrrec_endofrecord(xdrs, TRUE);
816 			rpc_fd_unlock(vctbl, ct->ct_fd);
817 			return (rpc_callerr.re_status);
818 		}
819 	}
820 
821 	/*
822 	 * Do not need to check errors, as the following code does
823 	 * not depend on the successful completion of the call.
824 	 * An error, if any occurs, is reported through
825 	 * rpc_callerr.re_status.
826 	 */
827 	(void) xdrrec_endofrecord(xdrs, TRUE);
828 
829 	rpc_fd_unlock(vctbl, ct->ct_fd);
830 	return (rpc_callerr.re_status);
831 }
832 
833 /* ARGSUSED */
834 static void
835 clnt_vc_geterr(CLIENT *cl, struct rpc_err *errp)
836 {
837 	*errp = rpc_callerr;
838 }
839 
840 static bool_t
841 clnt_vc_freeres(CLIENT *cl, xdrproc_t xdr_res, caddr_t res_ptr)
842 {
843 /* LINTED pointer alignment */
844 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
845 	XDR *xdrs = &(ct->ct_xdrs);
846 	bool_t stat;
847 
848 	(void) rpc_fd_lock(vctbl, ct->ct_fd);
849 	xdrs->x_op = XDR_FREE;
850 	stat = (*xdr_res)(xdrs, res_ptr);
851 	rpc_fd_unlock(vctbl, ct->ct_fd);
852 	return (stat);
853 }
854 
855 static void
856 clnt_vc_abort(void)
857 {
858 }
859 
860 /*ARGSUSED*/
861 static bool_t
862 clnt_vc_control(CLIENT *cl, int request, char *info)
863 {
864 	bool_t ret;
865 /* LINTED pointer alignment */
866 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
867 
868 	if (rpc_fd_lock(vctbl, ct->ct_fd)) {
869 		rpc_fd_unlock(vctbl, ct->ct_fd);
870 		return (RPC_FAILED);
871 	}
872 
873 	switch (request) {
874 	case CLSET_FD_CLOSE:
875 		ct->ct_closeit = TRUE;
876 		rpc_fd_unlock(vctbl, ct->ct_fd);
877 		return (TRUE);
878 	case CLSET_FD_NCLOSE:
879 		ct->ct_closeit = FALSE;
880 		rpc_fd_unlock(vctbl, ct->ct_fd);
881 		return (TRUE);
882 	case CLFLUSH:
883 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
884 			int res;
885 			res = do_flush(ct, (info == NULL ||
886 			    /* LINTED pointer cast */
887 			    *(int *)info == RPC_CL_DEFAULT_FLUSH)?
888 			    /* LINTED pointer cast */
889 			    ct->ct_blocking_mode: *(int *)info);
890 			ret = (0 == res);
891 		}
892 		rpc_fd_unlock(vctbl, ct->ct_fd);
893 		return (ret);
894 	}
895 
896 	/* for other requests which use info */
897 	if (info == NULL) {
898 		rpc_fd_unlock(vctbl, ct->ct_fd);
899 		return (FALSE);
900 	}
901 	switch (request) {
902 	case CLSET_TIMEOUT:
903 /* LINTED pointer alignment */
904 		if (time_not_ok((struct timeval *)info)) {
905 			rpc_fd_unlock(vctbl, ct->ct_fd);
906 			return (FALSE);
907 		}
908 /* LINTED pointer alignment */
909 		ct->ct_wait = __rpc_timeval_to_msec((struct timeval *)info);
910 		ct->ct_waitset = TRUE;
911 		break;
912 	case CLGET_TIMEOUT:
913 /* LINTED pointer alignment */
914 		((struct timeval *)info)->tv_sec = ct->ct_wait / 1000;
915 /* LINTED pointer alignment */
916 		((struct timeval *)info)->tv_usec =
917 			(ct->ct_wait % 1000) * 1000;
918 		break;
919 	case CLGET_SERVER_ADDR:	/* For compatibility only */
920 		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
921 		break;
922 	case CLGET_FD:
923 /* LINTED pointer alignment */
924 		*(int *)info = ct->ct_fd;
925 		break;
926 	case CLGET_SVC_ADDR:
927 		/* The caller should not free this memory area */
928 /* LINTED pointer alignment */
929 		*(struct netbuf *)info = ct->ct_addr;
930 		break;
931 	case CLSET_SVC_ADDR:		/* set to new address */
932 #ifdef undef
933 		/*
934 		 * XXX: once the t_snddis(), followed by t_connect() starts to
935 		 * work, this ifdef should be removed.  CLIENT handle reuse
936 		 * would then be possible for COTS as well.
937 		 */
938 		if (t_snddis(ct->ct_fd, NULL) == -1) {
939 			rpc_createerr.cf_stat = RPC_TLIERROR;
940 			rpc_createerr.cf_error.re_terrno = t_errno;
941 			rpc_createerr.cf_error.re_errno = errno;
942 			rpc_fd_unlock(vctbl, ct->ct_fd);
943 			return (FALSE);
944 		}
945 		ret = set_up_connection(ct->ct_fd, (struct netbuf *)info,
946 			ct, NULL));
947 		rpc_fd_unlock(vctbl, ct->ct_fd);
948 		return (ret);
949 #else
950 		rpc_fd_unlock(vctbl, ct->ct_fd);
951 		return (FALSE);
952 #endif
953 	case CLGET_XID:
954 		/*
955 		 * use the knowledge that xid is the
956 		 * first element in the call structure
957 		 * This will get the xid of the PREVIOUS call
958 		 */
959 /* LINTED pointer alignment */
960 		*(uint32_t *)info = ntohl(*(uint32_t *)ct->ct_mcall);
961 		break;
962 	case CLSET_XID:
963 		/* This will set the xid of the NEXT call */
964 /* LINTED pointer alignment */
965 		*(uint32_t *)ct->ct_mcall =  htonl(*(uint32_t *)info + 1);
966 		/* increment by 1 as clnt_vc_call() decrements once */
967 		break;
968 	case CLGET_VERS:
969 		/*
970 		 * This RELIES on the information that, in the call body,
971 		 * the version number field is the fifth field from the
972 		 * begining of the RPC header. MUST be changed if the
973 		 * call_struct is changed
974 		 */
975 /* LINTED pointer alignment */
976 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
977 						4 * BYTES_PER_XDR_UNIT));
978 		break;
979 
980 	case CLSET_VERS:
981 /* LINTED pointer alignment */
982 		*(uint32_t *)(ct->ct_mcall + 4 * BYTES_PER_XDR_UNIT) =
983 /* LINTED pointer alignment */
984 			htonl(*(uint32_t *)info);
985 		break;
986 
987 	case CLGET_PROG:
988 		/*
989 		 * This RELIES on the information that, in the call body,
990 		 * the program number field is the fourth field from the
991 		 * begining of the RPC header. MUST be changed if the
992 		 * call_struct is changed
993 		 */
994 /* LINTED pointer alignment */
995 		*(uint32_t *)info = ntohl(*(uint32_t *)(ct->ct_mcall +
996 						3 * BYTES_PER_XDR_UNIT));
997 		break;
998 
999 	case CLSET_PROG:
1000 /* LINTED pointer alignment */
1001 		*(uint32_t *)(ct->ct_mcall + 3 * BYTES_PER_XDR_UNIT) =
1002 /* LINTED pointer alignment */
1003 			htonl(*(uint32_t *)info);
1004 		break;
1005 
1006 	case CLSET_IO_MODE:
1007 		/* LINTED pointer cast */
1008 		if (!set_io_mode(ct, *(int *)info)) {
1009 		    rpc_fd_unlock(vctbl, ct->ct_fd);
1010 		    return (FALSE);
1011 		}
1012 		break;
1013 	case CLSET_FLUSH_MODE:
1014 		/* Set a specific FLUSH_MODE */
1015 		/* LINTED pointer cast */
1016 		if (!set_flush_mode(ct, *(int *)info)) {
1017 		    rpc_fd_unlock(vctbl, ct->ct_fd);
1018 		    return (FALSE);
1019 		}
1020 		break;
1021 	case CLGET_FLUSH_MODE:
1022 		/* LINTED pointer cast */
1023 		*(rpcflushmode_t *)info = ct->ct_blocking_mode;
1024 		break;
1025 
1026 	case CLGET_IO_MODE:
1027 		/* LINTED pointer cast */
1028 		*(rpciomode_t *)info = ct->ct_io_mode;
1029 		break;
1030 
1031 	case CLGET_CURRENT_REC_SIZE:
1032 		/*
1033 		 * Returns the current amount of memory allocated
1034 		 * to pending requests
1035 		 */
1036 		/* LINTED pointer cast */
1037 		*(int *)info = ct->ct_bufferPendingSize;
1038 		break;
1039 
1040 	case CLSET_CONNMAXREC_SIZE:
1041 		/* Cannot resize the buffer if it is used. */
1042 		if (ct->ct_bufferPendingSize != 0) {
1043 			rpc_fd_unlock(vctbl, ct->ct_fd);
1044 			return (FALSE);
1045 		}
1046 		/*
1047 		 * If the new size is equal to the current size,
1048 		 * there is nothing to do.
1049 		 */
1050 		/* LINTED pointer cast */
1051 		if (ct->ct_bufferSize == *(uint_t *)info)
1052 			break;
1053 
1054 		/* LINTED pointer cast */
1055 		ct->ct_bufferSize = *(uint_t *)info;
1056 		if (ct->ct_buffer) {
1057 			free(ct->ct_buffer);
1058 			ct->ct_buffer = NULL;
1059 			ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = NULL;
1060 		}
1061 		break;
1062 
1063 	case CLGET_CONNMAXREC_SIZE:
1064 		/*
1065 		 * Returns the size of buffer allocated
1066 		 * to pending requests
1067 		 */
1068 		/* LINTED pointer cast */
1069 		*(uint_t *)info = ct->ct_bufferSize;
1070 		break;
1071 
1072 	default:
1073 		rpc_fd_unlock(vctbl, ct->ct_fd);
1074 		return (FALSE);
1075 	}
1076 	rpc_fd_unlock(vctbl, ct->ct_fd);
1077 	return (TRUE);
1078 }
1079 
1080 static void
1081 clnt_vc_destroy(CLIENT *cl)
1082 {
1083 /* LINTED pointer alignment */
1084 	struct ct_data *ct = (struct ct_data *)cl->cl_private;
1085 	int ct_fd = ct->ct_fd;
1086 
1087 	(void) rpc_fd_lock(vctbl, ct_fd);
1088 
1089 	if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1090 		(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1091 		(void) unregister_nb(ct);
1092 	}
1093 
1094 	if (ct->ct_closeit)
1095 		(void) t_close(ct_fd);
1096 	XDR_DESTROY(&(ct->ct_xdrs));
1097 	if (ct->ct_addr.buf)
1098 		free(ct->ct_addr.buf);
1099 	free(ct);
1100 	if (cl->cl_netid && cl->cl_netid[0])
1101 		free(cl->cl_netid);
1102 	if (cl->cl_tp && cl->cl_tp[0])
1103 		free(cl->cl_tp);
1104 	free(cl);
1105 	rpc_fd_unlock(vctbl, ct_fd);
1106 }
1107 
1108 /*
1109  * Interface between xdr serializer and vc connection.
1110  * Behaves like the system calls, read & write, but keeps some error state
1111  * around for the rpc level.
1112  */
1113 static int
1114 read_vc(void *ct_tmp, caddr_t buf, int len)
1115 {
1116 	static pthread_key_t pfdp_key;
1117 	struct pollfd *pfdp;
1118 	int npfd;		/* total number of pfdp allocated */
1119 	struct ct_data *ct = ct_tmp;
1120 	struct timeval starttime;
1121 	struct timeval curtime;
1122 	int poll_time;
1123 	int delta;
1124 
1125 	if (len == 0)
1126 		return (0);
1127 
1128 	/*
1129 	 * Allocate just one the first time.  thr_get_storage() may
1130 	 * return a larger buffer, left over from the last time we were
1131 	 * here, but that's OK.  realloc() will deal with it properly.
1132 	 */
1133 	npfd = 1;
1134 	pfdp = thr_get_storage(&pfdp_key, sizeof (struct pollfd), free);
1135 	if (pfdp == NULL) {
1136 		(void) syslog(LOG_ERR, clnt_vc_errstr,
1137 			clnt_read_vc_str, __no_mem_str);
1138 		rpc_callerr.re_status = RPC_SYSTEMERROR;
1139 		rpc_callerr.re_errno = errno;
1140 		rpc_callerr.re_terrno = 0;
1141 		return (-1);
1142 	}
1143 
1144 	/*
1145 	 *	N.B.:  slot 0 in the pollfd array is reserved for the file
1146 	 *	descriptor we're really interested in (as opposed to the
1147 	 *	callback descriptors).
1148 	 */
1149 	pfdp[0].fd = ct->ct_fd;
1150 	pfdp[0].events = MASKVAL;
1151 	pfdp[0].revents = 0;
1152 	poll_time = ct->ct_wait;
1153 	if (gettimeofday(&starttime, NULL) == -1) {
1154 		syslog(LOG_ERR, "Unable to get time of day: %m");
1155 		return (-1);
1156 	}
1157 
1158 	for (;;) {
1159 		extern void (*_svc_getreqset_proc)();
1160 		extern pollfd_t *svc_pollfd;
1161 		extern int svc_max_pollfd;
1162 		int fds;
1163 
1164 		/* VARIABLES PROTECTED BY svc_fd_lock: svc_pollfd */
1165 
1166 		if (_svc_getreqset_proc) {
1167 			sig_rw_rdlock(&svc_fd_lock);
1168 
1169 			/* reallocate pfdp to svc_max_pollfd +1 */
1170 			if (npfd != (svc_max_pollfd + 1)) {
1171 				struct pollfd *tmp_pfdp = realloc(pfdp,
1172 						sizeof (struct pollfd) *
1173 						(svc_max_pollfd + 1));
1174 				if (tmp_pfdp == NULL) {
1175 					sig_rw_unlock(&svc_fd_lock);
1176 					(void) syslog(LOG_ERR, clnt_vc_errstr,
1177 						clnt_read_vc_str, __no_mem_str);
1178 					rpc_callerr.re_status = RPC_SYSTEMERROR;
1179 					rpc_callerr.re_errno = errno;
1180 					rpc_callerr.re_terrno = 0;
1181 					return (-1);
1182 				}
1183 
1184 				pfdp = tmp_pfdp;
1185 				npfd = svc_max_pollfd + 1;
1186 				(void) pthread_setspecific(pfdp_key, pfdp);
1187 			}
1188 			if (npfd > 1)
1189 				(void) memcpy(&pfdp[1], svc_pollfd,
1190 				    sizeof (struct pollfd) * (npfd - 1));
1191 
1192 			sig_rw_unlock(&svc_fd_lock);
1193 		} else {
1194 			npfd = 1;	/* don't forget about pfdp[0] */
1195 		}
1196 
1197 		switch (fds = poll(pfdp, npfd, poll_time)) {
1198 		case 0:
1199 			rpc_callerr.re_status = RPC_TIMEDOUT;
1200 			return (-1);
1201 
1202 		case -1:
1203 			if (errno != EINTR)
1204 				continue;
1205 			else {
1206 				/*
1207 				 * interrupted by another signal,
1208 				 * update time_waited
1209 				 */
1210 
1211 				if (gettimeofday(&curtime, NULL) == -1) {
1212 					syslog(LOG_ERR,
1213 					    "Unable to get time of day:  %m");
1214 					errno = 0;
1215 					continue;
1216 				};
1217 				delta = (curtime.tv_sec -
1218 						starttime.tv_sec) * 1000 +
1219 					(curtime.tv_usec -
1220 						starttime.tv_usec) / 1000;
1221 				poll_time -= delta;
1222 				if (poll_time < 0) {
1223 					rpc_callerr.re_status =
1224 						RPC_TIMEDOUT;
1225 					errno = 0;
1226 					return (-1);
1227 				} else {
1228 					errno = 0; /* reset it */
1229 					continue;
1230 				}
1231 			}
1232 		}
1233 
1234 		if (pfdp[0].revents == 0) {
1235 			/* must be for server side of the house */
1236 			(*_svc_getreqset_proc)(&pfdp[1], fds);
1237 			continue;	/* do poll again */
1238 		}
1239 
1240 		if (pfdp[0].revents & POLLNVAL) {
1241 			rpc_callerr.re_status = RPC_CANTRECV;
1242 			/*
1243 			 *	Note:  we're faking errno here because we
1244 			 *	previously would have expected select() to
1245 			 *	return -1 with errno EBADF.  Poll(BA_OS)
1246 			 *	returns 0 and sets the POLLNVAL revents flag
1247 			 *	instead.
1248 			 */
1249 			rpc_callerr.re_errno = errno = EBADF;
1250 			return (-1);
1251 		}
1252 
1253 		if (pfdp[0].revents & (POLLERR | POLLHUP)) {
1254 			rpc_callerr.re_status = RPC_CANTRECV;
1255 			rpc_callerr.re_errno = errno = EPIPE;
1256 			return (-1);
1257 		}
1258 		break;
1259 	}
1260 
1261 	switch (len = t_rcvall(ct->ct_fd, buf, len)) {
1262 	case 0:
1263 		/* premature eof */
1264 		rpc_callerr.re_errno = ENOLINK;
1265 		rpc_callerr.re_terrno = 0;
1266 		rpc_callerr.re_status = RPC_CANTRECV;
1267 		len = -1;	/* it's really an error */
1268 		break;
1269 
1270 	case -1:
1271 		rpc_callerr.re_terrno = t_errno;
1272 		rpc_callerr.re_errno = 0;
1273 		rpc_callerr.re_status = RPC_CANTRECV;
1274 		break;
1275 	}
1276 	return (len);
1277 }
1278 
1279 static int
1280 write_vc(void *ct_tmp, caddr_t buf, int len)
1281 {
1282 	int i, cnt;
1283 	struct ct_data *ct = ct_tmp;
1284 	int flag;
1285 	int maxsz;
1286 
1287 	maxsz = ct->ct_tsdu;
1288 
1289 	/* Handle the non-blocking mode */
1290 	if (ct->ct_is_oneway && ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1291 		/*
1292 		 * Test a special case here. If the length of the current
1293 		 * write is greater than the transport data unit, and the
1294 		 * mode is non blocking, we return RPC_CANTSEND.
1295 		 * XXX  this is not very clean.
1296 		 */
1297 		if (maxsz > 0 && len > maxsz) {
1298 			rpc_callerr.re_terrno = errno;
1299 			rpc_callerr.re_errno = 0;
1300 			rpc_callerr.re_status = RPC_CANTSEND;
1301 			return (-1);
1302 		}
1303 
1304 		len = nb_send(ct, buf, (unsigned)len);
1305 		if (len == -1) {
1306 			rpc_callerr.re_terrno = errno;
1307 			rpc_callerr.re_errno = 0;
1308 			rpc_callerr.re_status = RPC_CANTSEND;
1309 		} else if (len == -2) {
1310 			rpc_callerr.re_terrno = 0;
1311 			rpc_callerr.re_errno = 0;
1312 			rpc_callerr.re_status = RPC_CANTSTORE;
1313 		}
1314 		return (len);
1315 	}
1316 
1317 	if ((maxsz == 0) || (maxsz == -1)) {
1318 		/*
1319 		 * T_snd may return -1 for error on connection (connection
1320 		 * needs to be repaired/closed, and -2 for flow-control
1321 		 * handling error (no operation to do, just wait and call
1322 		 * T_Flush()).
1323 		 */
1324 		if ((len = t_snd(ct->ct_fd, buf, (unsigned)len, 0)) == -1) {
1325 			rpc_callerr.re_terrno = t_errno;
1326 			rpc_callerr.re_errno = 0;
1327 			rpc_callerr.re_status = RPC_CANTSEND;
1328 		}
1329 		return (len);
1330 	}
1331 
1332 	/*
1333 	 * This for those transports which have a max size for data.
1334 	 */
1335 	for (cnt = len, i = 0; cnt > 0; cnt -= i, buf += i) {
1336 		flag = cnt > maxsz ? T_MORE : 0;
1337 		if ((i = t_snd(ct->ct_fd, buf, (unsigned)MIN(cnt, maxsz),
1338 				flag)) == -1) {
1339 			rpc_callerr.re_terrno = t_errno;
1340 			rpc_callerr.re_errno = 0;
1341 			rpc_callerr.re_status = RPC_CANTSEND;
1342 			return (-1);
1343 		}
1344 	}
1345 	return (len);
1346 }
1347 
1348 /*
1349  * Receive the required bytes of data, even if it is fragmented.
1350  */
1351 static int
1352 t_rcvall(int fd, char *buf, int len)
1353 {
1354 	int moreflag;
1355 	int final = 0;
1356 	int res;
1357 
1358 	do {
1359 		moreflag = 0;
1360 		res = t_rcv(fd, buf, (unsigned)len, &moreflag);
1361 		if (res == -1) {
1362 			if (t_errno == TLOOK)
1363 				switch (t_look(fd)) {
1364 				case T_DISCONNECT:
1365 					(void) t_rcvdis(fd, NULL);
1366 					(void) t_snddis(fd, NULL);
1367 					return (-1);
1368 				case T_ORDREL:
1369 				/* Received orderly release indication */
1370 					(void) t_rcvrel(fd);
1371 				/* Send orderly release indicator */
1372 					(void) t_sndrel(fd);
1373 					return (-1);
1374 				default:
1375 					return (-1);
1376 				}
1377 		} else if (res == 0) {
1378 			return (0);
1379 		}
1380 		final += res;
1381 		buf += res;
1382 		len -= res;
1383 	} while ((len > 0) && (moreflag & T_MORE));
1384 	return (final);
1385 }
1386 
1387 static struct clnt_ops *
1388 clnt_vc_ops(void)
1389 {
1390 	static struct clnt_ops ops;
1391 	extern mutex_t	ops_lock;
1392 
1393 	/* VARIABLES PROTECTED BY ops_lock: ops */
1394 
1395 	sig_mutex_lock(&ops_lock);
1396 	if (ops.cl_call == NULL) {
1397 		ops.cl_call = clnt_vc_call;
1398 		ops.cl_send = clnt_vc_send;
1399 		ops.cl_abort = clnt_vc_abort;
1400 		ops.cl_geterr = clnt_vc_geterr;
1401 		ops.cl_freeres = clnt_vc_freeres;
1402 		ops.cl_destroy = clnt_vc_destroy;
1403 		ops.cl_control = clnt_vc_control;
1404 	}
1405 	sig_mutex_unlock(&ops_lock);
1406 	return (&ops);
1407 }
1408 
1409 /*
1410  * Make sure that the time is not garbage.   -1 value is disallowed.
1411  * Note this is different from time_not_ok in clnt_dg.c
1412  */
1413 static bool_t
1414 time_not_ok(struct timeval *t)
1415 {
1416 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1417 		t->tv_usec <= -1 || t->tv_usec > 1000000);
1418 }
1419 
1420 
1421 /* Compute the # of bytes that remains until the end of the buffer */
1422 #define	REMAIN_BYTES(p) (ct->ct_bufferSize-(ct->ct_##p - ct->ct_buffer))
1423 
1424 static int
1425 addInBuffer(struct ct_data *ct, char *dataToAdd, unsigned int nBytes)
1426 {
1427 	if (NULL == ct->ct_buffer) {
1428 		/* Buffer not allocated yet. */
1429 		char *buffer;
1430 
1431 		buffer = malloc(ct->ct_bufferSize);
1432 		if (NULL == buffer) {
1433 			errno = ENOMEM;
1434 			return (-1);
1435 		}
1436 		(void) memcpy(buffer, dataToAdd, nBytes);
1437 
1438 		ct->ct_buffer = buffer;
1439 		ct->ct_bufferReadPtr = buffer;
1440 		ct->ct_bufferWritePtr = buffer + nBytes;
1441 		ct->ct_bufferPendingSize = nBytes;
1442 	} else {
1443 		/*
1444 		 * For an already allocated buffer, two mem copies
1445 		 * might be needed, depending on the current
1446 		 * writing position.
1447 		 */
1448 
1449 		/* Compute the length of the first copy. */
1450 		int len = MIN(nBytes, REMAIN_BYTES(bufferWritePtr));
1451 
1452 		ct->ct_bufferPendingSize += nBytes;
1453 
1454 		(void) memcpy(ct->ct_bufferWritePtr, dataToAdd, len);
1455 		ct->ct_bufferWritePtr += len;
1456 		nBytes -= len;
1457 		if (0 == nBytes) {
1458 			/* One memcopy needed. */
1459 
1460 			/*
1461 			 * If the write pointer is at the end of the buffer,
1462 			 * wrap it now.
1463 			 */
1464 			if (ct->ct_bufferWritePtr ==
1465 			    (ct->ct_buffer + ct->ct_bufferSize)) {
1466 				ct->ct_bufferWritePtr = ct->ct_buffer;
1467 			}
1468 		} else {
1469 			/* Two memcopy needed. */
1470 			dataToAdd += len;
1471 
1472 			/*
1473 			 * Copy the remaining data to the beginning of the
1474 			 * buffer
1475 			 */
1476 			(void) memcpy(ct->ct_buffer, dataToAdd, nBytes);
1477 			ct->ct_bufferWritePtr = ct->ct_buffer + nBytes;
1478 		}
1479 	}
1480 	return (0);
1481 }
1482 
1483 static void
1484 consumeFromBuffer(struct ct_data *ct, unsigned int nBytes)
1485 {
1486 	ct->ct_bufferPendingSize -= nBytes;
1487 	if (ct->ct_bufferPendingSize == 0) {
1488 		/*
1489 		 * If the buffer contains no data, we set the two pointers at
1490 		 * the beginning of the buffer (to miminize buffer wraps).
1491 		 */
1492 		ct->ct_bufferReadPtr = ct->ct_bufferWritePtr = ct->ct_buffer;
1493 	} else {
1494 		ct->ct_bufferReadPtr += nBytes;
1495 		if (ct->ct_bufferReadPtr >
1496 		    ct->ct_buffer + ct->ct_bufferSize) {
1497 			ct->ct_bufferReadPtr -= ct->ct_bufferSize;
1498 		}
1499 	}
1500 }
1501 
1502 static int
1503 iovFromBuffer(struct ct_data *ct, struct iovec *iov)
1504 {
1505 	int l;
1506 
1507 	if (ct->ct_bufferPendingSize == 0)
1508 		return (0);
1509 
1510 	l = REMAIN_BYTES(bufferReadPtr);
1511 	if (l < ct->ct_bufferPendingSize) {
1512 		/* Buffer in two fragments. */
1513 		iov[0].iov_base = ct->ct_bufferReadPtr;
1514 		iov[0].iov_len  = l;
1515 
1516 		iov[1].iov_base = ct->ct_buffer;
1517 		iov[1].iov_len  = ct->ct_bufferPendingSize - l;
1518 		return (2);
1519 	} else {
1520 		/* Buffer in one fragment. */
1521 		iov[0].iov_base = ct->ct_bufferReadPtr;
1522 		iov[0].iov_len  = ct->ct_bufferPendingSize;
1523 		return (1);
1524 	}
1525 }
1526 
1527 static bool_t
1528 set_flush_mode(struct ct_data *ct, int mode)
1529 {
1530 	switch (mode) {
1531 	case RPC_CL_BLOCKING_FLUSH:
1532 		/* flush as most as possible without blocking */
1533 	case RPC_CL_BESTEFFORT_FLUSH:
1534 		/* flush the buffer completely (possibly blocking) */
1535 	case RPC_CL_DEFAULT_FLUSH:
1536 		/* flush according to the currently defined policy */
1537 		ct->ct_blocking_mode = mode;
1538 		return (TRUE);
1539 	default:
1540 		return (FALSE);
1541 	}
1542 }
1543 
1544 static bool_t
1545 set_io_mode(struct ct_data *ct, int ioMode)
1546 {
1547 	switch (ioMode) {
1548 	case RPC_CL_BLOCKING:
1549 		if (ct->ct_io_mode == RPC_CL_NONBLOCKING) {
1550 			if (NULL != ct->ct_buffer) {
1551 				/*
1552 				 * If a buffer was allocated for this
1553 				 * connection, flush it now, and free it.
1554 				 */
1555 				(void) do_flush(ct, RPC_CL_BLOCKING_FLUSH);
1556 				free(ct->ct_buffer);
1557 				ct->ct_buffer = NULL;
1558 			}
1559 			(void) unregister_nb(ct);
1560 			ct->ct_io_mode = ioMode;
1561 		}
1562 		break;
1563 	case RPC_CL_NONBLOCKING:
1564 		if (ct->ct_io_mode == RPC_CL_BLOCKING) {
1565 			if (-1 == register_nb(ct)) {
1566 				return (FALSE);
1567 			}
1568 			ct->ct_io_mode = ioMode;
1569 		}
1570 		break;
1571 	default:
1572 		return (FALSE);
1573 	}
1574 	return (TRUE);
1575 }
1576 
1577 static int
1578 do_flush(struct ct_data *ct, uint_t flush_mode)
1579 {
1580 	int result;
1581 	if (ct->ct_bufferPendingSize == 0) {
1582 		return (0);
1583 	}
1584 
1585 	switch (flush_mode) {
1586 	case RPC_CL_BLOCKING_FLUSH:
1587 		if (!set_blocking_connection(ct, TRUE)) {
1588 			return (-1);
1589 		}
1590 		while (ct->ct_bufferPendingSize > 0) {
1591 			if (REMAIN_BYTES(bufferReadPtr) <
1592 			    ct->ct_bufferPendingSize) {
1593 				struct iovec iov[2];
1594 				(void) iovFromBuffer(ct, iov);
1595 				result = writev(ct->ct_fd, iov, 2);
1596 			} else {
1597 				result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1598 				    ct->ct_bufferPendingSize, 0);
1599 			}
1600 			if (result < 0) {
1601 				return (-1);
1602 			}
1603 			consumeFromBuffer(ct, result);
1604 		}
1605 
1606 		break;
1607 
1608 	case RPC_CL_BESTEFFORT_FLUSH:
1609 		(void) set_blocking_connection(ct, FALSE);
1610 		if (REMAIN_BYTES(bufferReadPtr) < ct->ct_bufferPendingSize) {
1611 			struct iovec iov[2];
1612 			(void) iovFromBuffer(ct, iov);
1613 			result = writev(ct->ct_fd, iov, 2);
1614 		} else {
1615 			result = t_snd(ct->ct_fd, ct->ct_bufferReadPtr,
1616 			    ct->ct_bufferPendingSize, 0);
1617 		}
1618 		if (result < 0) {
1619 			if (errno != EWOULDBLOCK) {
1620 				perror("flush");
1621 				return (-1);
1622 			}
1623 			return (0);
1624 		}
1625 		if (result > 0)
1626 			consumeFromBuffer(ct, result);
1627 		break;
1628 	}
1629 	return (0);
1630 }
1631 
1632 /*
1633  * Non blocking send.
1634  */
1635 
1636 static int
1637 nb_send(struct ct_data *ct, void *buff, unsigned int nBytes)
1638 {
1639 	int result;
1640 
1641 	if (!(ntohl(*(uint32_t *)buff) & 2^31)) {
1642 		return (-1);
1643 	}
1644 
1645 	/*
1646 	 * Check to see if the current message can be stored fully in the
1647 	 * buffer. We have to check this now because it may be impossible
1648 	 * to send any data, so the message must be stored in the buffer.
1649 	 */
1650 	if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize)) {
1651 		/* Try to flush  (to free some space). */
1652 		(void) do_flush(ct, RPC_CL_BESTEFFORT_FLUSH);
1653 
1654 		/* Can we store the message now ? */
1655 		if (nBytes > (ct->ct_bufferSize - ct->ct_bufferPendingSize))
1656 			return (-2);
1657 	}
1658 
1659 	(void) set_blocking_connection(ct, FALSE);
1660 
1661 	/*
1662 	 * If there is no data pending, we can simply try
1663 	 * to send our data.
1664 	 */
1665 	if (ct->ct_bufferPendingSize == 0) {
1666 		result = t_snd(ct->ct_fd, buff, nBytes, 0);
1667 		if (result == -1) {
1668 			if (errno == EWOULDBLOCK) {
1669 				result = 0;
1670 			} else {
1671 				perror("send");
1672 				return (-1);
1673 			}
1674 		}
1675 		/*
1676 		 * If we have not sent all data, we must store them
1677 		 * in the buffer.
1678 		 */
1679 		if (result != nBytes) {
1680 			if (addInBuffer(ct, (char *)buff + result,
1681 			    nBytes - result) == -1) {
1682 				return (-1);
1683 			}
1684 		}
1685 	} else {
1686 		/*
1687 		 * Some data pending in the buffer.  We try to send
1688 		 * both buffer data and current message in one shot.
1689 		 */
1690 		struct iovec iov[3];
1691 		int i = iovFromBuffer(ct, &iov[0]);
1692 
1693 		iov[i].iov_base = buff;
1694 		iov[i].iov_len  = nBytes;
1695 
1696 		result = writev(ct->ct_fd, iov, i+1);
1697 		if (result == -1) {
1698 			if (errno == EWOULDBLOCK) {
1699 				/* No bytes sent */
1700 				result = 0;
1701 			} else {
1702 				return (-1);
1703 			}
1704 		}
1705 
1706 		/*
1707 		 * Add the bytes from the message
1708 		 * that we have not sent.
1709 		 */
1710 		if (result <= ct->ct_bufferPendingSize) {
1711 			/* No bytes from the message sent */
1712 			consumeFromBuffer(ct, result);
1713 			if (addInBuffer(ct, buff, nBytes) == -1) {
1714 				return (-1);
1715 			}
1716 		} else {
1717 			/*
1718 			 * Some bytes of the message are sent.
1719 			 * Compute the length of the message that has
1720 			 * been sent.
1721 			 */
1722 			int len = result - ct->ct_bufferPendingSize;
1723 
1724 			/* So, empty the buffer. */
1725 			ct->ct_bufferReadPtr = ct->ct_buffer;
1726 			ct->ct_bufferWritePtr = ct->ct_buffer;
1727 			ct->ct_bufferPendingSize = 0;
1728 
1729 			/* And add the remaining part of the message. */
1730 			if (len != nBytes) {
1731 				if (addInBuffer(ct, (char *)buff + len,
1732 					nBytes-len) == -1) {
1733 					return (-1);
1734 				}
1735 			}
1736 		}
1737 	}
1738 	return (nBytes);
1739 }
1740 
1741 static void
1742 flush_registered_clients(void)
1743 {
1744 	struct nb_reg_node *node;
1745 
1746 	if (LIST_ISEMPTY(nb_first)) {
1747 		return;
1748 	}
1749 
1750 	LIST_FOR_EACH(nb_first, node) {
1751 		(void) do_flush(node->ct, RPC_CL_BLOCKING_FLUSH);
1752 	}
1753 }
1754 
1755 static int
1756 allocate_chunk(void)
1757 {
1758 #define	CHUNK_SIZE 16
1759 	struct nb_reg_node *chk =
1760 	    malloc(sizeof (struct nb_reg_node) * CHUNK_SIZE);
1761 	struct nb_reg_node *n;
1762 	int i;
1763 
1764 	if (NULL == chk) {
1765 		return (-1);
1766 	}
1767 
1768 	n = chk;
1769 	for (i = 0; i < CHUNK_SIZE-1; ++i) {
1770 		n[i].next = &(n[i+1]);
1771 	}
1772 	n[CHUNK_SIZE-1].next = (struct nb_reg_node *)&nb_free;
1773 	nb_free = chk;
1774 	return (0);
1775 }
1776 
1777 static int
1778 register_nb(struct ct_data *ct)
1779 {
1780 	struct nb_reg_node *node;
1781 
1782 	(void) mutex_lock(&nb_list_mutex);
1783 
1784 	if (LIST_ISEMPTY(nb_free) && (allocate_chunk() == -1)) {
1785 		(void) mutex_unlock(&nb_list_mutex);
1786 		errno = ENOMEM;
1787 		return (-1);
1788 	}
1789 
1790 	if (!exit_handler_set) {
1791 		(void) atexit(flush_registered_clients);
1792 		exit_handler_set = TRUE;
1793 	}
1794 	/* Get the first free node */
1795 	LIST_EXTRACT(nb_free, node);
1796 
1797 	node->ct = ct;
1798 
1799 	LIST_ADD(nb_first, node);
1800 	(void) mutex_unlock(&nb_list_mutex);
1801 
1802 	return (0);
1803 }
1804 
1805 static int
1806 unregister_nb(struct ct_data *ct)
1807 {
1808 	struct nb_reg_node *node;
1809 
1810 	(void) mutex_lock(&nb_list_mutex);
1811 	assert(!LIST_ISEMPTY(nb_first));
1812 
1813 	node = nb_first;
1814 	LIST_FOR_EACH(nb_first, node) {
1815 		if (node->next->ct == ct) {
1816 			/* Get the node to unregister. */
1817 			struct nb_reg_node *n = node->next;
1818 			node->next = n->next;
1819 
1820 			n->ct = NULL;
1821 			LIST_ADD(nb_free, n);
1822 			break;
1823 		}
1824 	}
1825 	(void) mutex_unlock(&nb_list_mutex);
1826 	return (0);
1827 }
1828