xref: /reactos/dll/3rdparty/libtirpc/src/clnt_vc.c (revision 40462c92)
1 /*
2  * Copyright (c) 2009, Sun Microsystems, Inc.
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions are met:
7  * - Redistributions of source code must retain the above copyright notice,
8  *   this list of conditions and the following disclaimer.
9  * - Redistributions in binary form must reproduce the above copyright notice,
10  *   this list of conditions and the following disclaimer in the documentation
11  *   and/or other materials provided with the distribution.
12  * - Neither the name of Sun Microsystems, Inc. nor the names of its
13  *   contributors may be used to endorse or promote products derived
14  *   from this software without specific prior written permission.
15  *
16  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
20  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26  * POSSIBILITY OF SUCH DAMAGE.
27  */
28 
29 /*
30  * clnt_tcp.c, Implements a TCP/IP based, client side RPC.
31  *
32  * Copyright (C) 1984, Sun Microsystems, Inc.
33  *
34  * TCP based RPC supports 'batched calls'.
35  * A sequence of calls may be batched-up in a send buffer.  The rpc call
36  * return immediately to the client even though the call was not necessarily
37  * sent.  The batching occurs if the results' xdr routine is NULL (0) AND
38  * the rpc timeout value is zero (see clnt.h, rpc).
39  *
40  * Clients should NOT casually batch calls that in fact return results; that is,
41  * the server side should be aware that a call is batched and not produce any
42  * return message.  Batched calls that produce many result messages can
43  * deadlock (netlock) the client and the server....
44  *
45  * Now go hang yourself.
46  */
47 
48 /* NFSv4.1 client for Windows
49  * Copyright � 2012 The Regents of the University of Michigan
50  *
51  * Olga Kornievskaia <aglo@umich.edu>
52  * Casey Bodley <cbodley@umich.edu>
53  *
54  * This library is free software; you can redistribute it and/or modify it
55  * under the terms of the GNU Lesser General Public License as published by
56  * the Free Software Foundation; either version 2.1 of the License, or (at
57  * your option) any later version.
58  *
59  * This library is distributed in the hope that it will be useful, but
60  * without any warranty; without even the implied warranty of merchantability
61  * or fitness for a particular purpose.  See the GNU Lesser General Public
62  * License for more details.
63  *
64  * You should have received a copy of the GNU Lesser General Public License
65  * along with this library; if not, write to the Free Software Foundation,
66  * Inc., 51 Franklin Street, Fifth Floor, Boston, MA
67  */
68 
69 #include <wintirpc.h>
70 //#include <pthread.h>
71 
72 #include <reentrant.h>
73 #include <sys/types.h>
74 //#include <sys/poll.h>
75 //#include <sys/syslog.h>
76 //#include <sys/un.h>
77 //#include <sys/uio.h>
78 //#include <sys/socket.h>
79 //#include <arpa/inet.h>
80 #include <assert.h>
81 //#include <err.h>
82 #include <errno.h>
83 //#include <netdb.h>
84 #include <stdio.h>
85 #include <stdlib.h>
86 #include <string.h>
87 //#include <unistd.h>
88 //#include <signal.h>
89 #include <time.h>
90 
91 #include <rpc/rpc.h>
92 #include "rpc_com.h"
93 
94 #define MCALL_MSG_SIZE 24
95 
96 #define CMGROUP_MAX    16
97 #define SCM_CREDS      0x03            /* process creds (struct cmsgcred) */
98 
99 /*
100  * Credentials structure, used to verify the identity of a peer
101  * process that has sent us a message. This is allocated by the
102  * peer process but filled in by the kernel. This prevents the
103  * peer from lying about its identity. (Note that cmcred_groups[0]
104  * is the effective GID.)
105  */
106 struct cmsgcred {
107         pid_t   cmcred_pid;             /* PID of sending process */
108         uid_t   cmcred_uid;             /* real UID of sending process */
109         uid_t   cmcred_euid;            /* effective UID of sending process */
110         gid_t   cmcred_gid;             /* real GID of sending process */
111         short   cmcred_ngroups;         /* number or groups */
112         gid_t   cmcred_groups[CMGROUP_MAX];     /* groups */
113 };
114 
115 struct cmessage {
116         struct cmsghdr cmsg;
117         struct cmsgcred cmcred;
118 };
119 
120 static enum clnt_stat clnt_vc_call(CLIENT *, rpcproc_t, xdrproc_t, void *,
121     xdrproc_t, void *, struct timeval);
122 static void clnt_vc_geterr(CLIENT *, struct rpc_err *);
123 static bool_t clnt_vc_freeres(CLIENT *, xdrproc_t, void *);
124 static void clnt_vc_abort(CLIENT *);
125 static bool_t clnt_vc_control(CLIENT *, u_int, void *);
126 static void clnt_vc_destroy(CLIENT *);
127 static struct clnt_ops *clnt_vc_ops(void);
128 static bool_t time_not_ok(struct timeval *);
129 static int read_vc(void *, void *, int);
130 static int write_vc(void *, void *, int);
131 
132 struct ct_data {
133 	int		ct_fd;		/* connection's fd */
134 	bool_t		ct_closeit;	/* close it on destroy */
135 	struct timeval	ct_wait;	/* wait interval in milliseconds */
136 	bool_t          ct_waitset;	/* wait set by clnt_control? */
137 	struct netbuf	ct_addr;	/* remote addr */
138 	struct rpc_err	ct_error;
139 	union {
140 		char	ct_mcallc[MCALL_MSG_SIZE];	/* marshalled callmsg */
141 		u_int32_t ct_mcalli;
142 	} ct_u;
143 	u_int		ct_mpos;	/* pos after marshal */
144 	XDR		ct_xdrs;	/* XDR stream */
145     struct rpc_msg reply_msg;
146     bool_t use_stored_reply_msg;
147 };
148 
149 /*
150  *      This machinery implements per-fd locks for MT-safety.  It is not
151  *      sufficient to do per-CLIENT handle locks for MT-safety because a
152  *      user may create more than one CLIENT handle with the same fd behind
153  *      it.  Therfore, we allocate an array of flags (vc_fd_locks), protected
154  *      by the clnt_fd_lock mutex, and an array (vc_cv) of condition variables
155  *      similarly protected.  Vc_fd_lock[fd] == 1 => a call is active on some
156  *      CLIENT handle created for that fd.
157  *      The current implementation holds locks across the entire RPC and reply.
158  *      Yes, this is silly, and as soon as this code is proven to work, this
159  *      should be the first thing fixed.  One step at a time.
160  */
161 static int      *vc_fd_locks;
162 extern mutex_t  clnt_fd_lock;
163 static cond_t   *vc_cv;
164 #ifndef _WIN32
165 #define release_fd_lock(fd, mask) {	\
166 	mutex_lock(&clnt_fd_lock);	\
167 	vc_fd_locks[fd] = 0;		\
168 	mutex_unlock(&clnt_fd_lock);	\
169 	thr_sigsetmask(SIG_SETMASK, &(mask), (sigset_t *) NULL);	\
170 	cond_signal(&vc_cv[fd]);	\
171 }
172 #else
173 /* XXX Need Windows signal/event stuff XXX */
174 #define release_fd_lock(fd, mask) {	\
175 	mutex_lock(&clnt_fd_lock);	\
176 	vc_fd_locks[WINSOCK_HANDLE_HASH(fd)] = 0;		\
177 	mutex_unlock(&clnt_fd_lock);	\
178 	\
179 	cond_broadcast(&vc_cv[WINSOCK_HANDLE_HASH(fd)]);	\
180 }
181 #endif
182 
183 #define acquire_fd_lock(fd) { \
184 	mutex_lock(&clnt_fd_lock); \
185 	while (vc_fd_locks[WINSOCK_HANDLE_HASH(fd)] && \
186             vc_fd_locks[WINSOCK_HANDLE_HASH(fd)] != GetCurrentThreadId()) \
187 		cond_wait(&vc_cv[WINSOCK_HANDLE_HASH(fd)], &clnt_fd_lock); \
188 	vc_fd_locks[WINSOCK_HANDLE_HASH(fd)] = GetCurrentThreadId(); \
189 	mutex_unlock(&clnt_fd_lock); \
190 }
191 
192 static const char clnt_vc_errstr[] = "%s : %s";
193 static const char clnt_vc_str[] = "clnt_vc_create";
194 static const char clnt_read_vc_str[] = "read_vc";
195 static const char __no_mem_str[] = "out of memory";
196 
197 /* callback thread */
198 #define CALLBACK_TIMEOUT 5000
199 #define	RQCRED_SIZE	400	/* this size is excessive */
200 static unsigned int WINAPI clnt_cb_thread(void *args)
201 {
202     int status = NO_ERROR;
203     CLIENT *cl = (CLIENT *)args;
204 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
205 	XDR *xdrs = &(ct->ct_xdrs);
206     long saved_timeout_sec = ct->ct_wait.tv_sec;
207     long saved_timeout_usec = ct->ct_wait.tv_usec;
208     struct rpc_msg reply_msg;
209     char cred_area[2 * MAX_AUTH_BYTES + RQCRED_SIZE];
210 
211     fprintf(stderr/*stdout*/, "%04x: Creating callback thread\n", GetCurrentThreadId());
212     while(1) {
213         cb_req header;
214         void *res = NULL;
215         mutex_lock(&clnt_fd_lock);
216 	    while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] ||
217                 !ct->use_stored_reply_msg ||
218                 (ct->use_stored_reply_msg && ct->reply_msg.rm_direction != CALL)) {
219             if (cl->shutdown)
220                 break;
221 		    if (!cond_wait_timed(&vc_cv[WINSOCK_HANDLE_HASH(ct->ct_fd)], &clnt_fd_lock,
222                 CALLBACK_TIMEOUT))
223                 if (!vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)])
224                     break;
225         }
226 	    vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = GetCurrentThreadId();
227 	    mutex_unlock(&clnt_fd_lock);
228 
229         if (cl->shutdown) {
230             fprintf(stdout, "%04x: callback received shutdown signal\n", GetCurrentThreadId());
231             release_fd_lock(ct->ct_fd, mask);
232             goto out;
233         }
234 
235         saved_timeout_sec = ct->ct_wait.tv_sec;
236         saved_timeout_usec = ct->ct_wait.tv_usec;
237         xdrs->x_op = XDR_DECODE;
238         if (ct->use_stored_reply_msg && ct->reply_msg.rm_direction == CALL) {
239             goto process_rpc_call;
240         } else if (!ct->use_stored_reply_msg) {
241             ct->ct_wait.tv_sec = ct->ct_wait.tv_usec = 0;
242             __xdrrec_setnonblock(xdrs, 0);
243 		    if (!xdrrec_skiprecord(xdrs))
244                 goto skip_process;
245             if (!xdr_getxiddir(xdrs, &ct->reply_msg)) {
246                 goto skip_process;
247             }
248             if (ct->reply_msg.rm_direction == CALL) {
249                 goto process_rpc_call;
250             } else {
251                 if (ct->reply_msg.rm_direction == REPLY)
252                     ct->use_stored_reply_msg = TRUE;
253                 goto skip_setlastfrag;
254             }
255         } else {
256             goto skip_setlastfrag;
257         }
258 process_rpc_call:
259         //call to get call headers
260         ct->use_stored_reply_msg = FALSE;
261         ct->reply_msg.rm_call.cb_cred.oa_base = cred_area;
262         ct->reply_msg.rm_call.cb_verf.oa_base = &(cred_area[MAX_AUTH_BYTES]);
263         if (!xdr_getcallbody(xdrs, &ct->reply_msg)) {
264             fprintf(stderr, "%04x: xdr_getcallbody failed\n", GetCurrentThreadId());
265             goto skip_process;
266         } else
267             fprintf(stdout, "%04x: callbody: rpcvers %d cb_prog %d cb_vers %d cb_proc %d\n",
268                 GetCurrentThreadId(),
269                 ct->reply_msg.rm_call.cb_rpcvers, ct->reply_msg.rm_call.cb_prog,
270                 ct->reply_msg.rm_call.cb_vers, ct->reply_msg.rm_call.cb_proc);
271         header.rq_prog = ct->reply_msg.rm_call.cb_prog;
272         header.rq_vers = ct->reply_msg.rm_call.cb_vers;
273         header.rq_proc = ct->reply_msg.rm_call.cb_proc;
274         header.xdr = xdrs;
275         status = (*cl->cb_fn)(cl->cb_args, &header, &res);
276         if (status) {
277             fprintf(stderr, "%04x: callback function failed with %d\n", status);
278         }
279 
280         xdrs->x_op = XDR_ENCODE;
281         __xdrrec_setblock(xdrs);
282         reply_msg.rm_xid = ct->reply_msg.rm_xid;
283         fprintf(stdout, "%04x: cb: replying to xid %d\n", GetCurrentThreadId(),
284             ct->reply_msg.rm_xid);
285         ct->reply_msg.rm_xid = 0;
286         reply_msg.rm_direction = REPLY;
287         reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
288         reply_msg.acpted_rply.ar_verf = _null_auth;
289         reply_msg.acpted_rply.ar_stat = status;
290         reply_msg.acpted_rply.ar_results.where = NULL;
291         reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
292         xdr_replymsg(xdrs, &reply_msg);
293         if (!status) {
294             (*cl->cb_xdr)(xdrs, res); /* encode the results */
295             xdrs->x_op = XDR_FREE;
296             (*cl->cb_xdr)(xdrs, res); /* free the results */
297         }
298         if (! xdrrec_endofrecord(xdrs, 1)) {
299             fprintf(stderr, "%04x: failed to send REPLY\n", GetCurrentThreadId());
300         }
301 skip_process:
302         ct->reply_msg.rm_direction = -1;
303         xdrrec_setlastfrag(xdrs);
304 skip_setlastfrag:
305         ct->ct_wait.tv_sec = saved_timeout_sec;
306         ct->ct_wait.tv_usec = saved_timeout_usec;
307         release_fd_lock(ct->ct_fd, mask);
308     }
309 out:
310     return status;
311 }
312 /*
313  * Create a client handle for a connection.
314  * Default options are set, which the user can change using clnt_control()'s.
315  * The rpc/vc package does buffering similar to stdio, so the client
316  * must pick send and receive buffer sizes, 0 => use the default.
317  * NB: fd is copied into a private area.
318  * NB: The rpch->cl_auth is set null authentication. Caller may wish to
319  * set this something more useful.
320  *
321  * fd should be an open socket
322  */
323 CLIENT *
324 clnt_vc_create(fd, raddr, prog, vers, sendsz, recvsz, cb_xdr, cb_fn, cb_args)
325 	int fd;				/* open file descriptor */
326 	const struct netbuf *raddr;	/* servers address */
327 	const rpcprog_t prog;			/* program number */
328 	const rpcvers_t vers;			/* version number */
329 	u_int sendsz;			/* buffer recv size */
330 	u_int recvsz;			/* buffer send size */
331     int (*cb_xdr)(void *, void *); /* if not NULL, point to function to xdr CB args */
332     int (*cb_fn)(void *, void *, void **);   /* if not NULL, pointer to function to handle RPC_CALLs */
333     void *cb_args;          /* if not NULL, pointer to pass into cb_fn */
334 {
335 	CLIENT *cl;			/* client handle */
336 	struct ct_data *ct = NULL;	/* client handle */
337 	struct timeval now;
338 	struct rpc_msg call_msg;
339 	static u_int32_t disrupt;
340 #ifndef _WIN32
341 	sigset_t mask;
342 	sigset_t newmask;
343 #else
344 	/* XXX Need Windows signal/event stuff XXX */
345 #endif
346 	struct sockaddr_storage ss;
347 	socklen_t slen;
348 	struct __rpc_sockinfo si;
349 
350 	if (disrupt == 0)
351 		disrupt = PtrToUlong(raddr);
352 
353 	cl = (CLIENT *)mem_alloc(sizeof (*cl));
354 	ct = (struct ct_data *)mem_alloc(sizeof (*ct));
355 	if ((cl == (CLIENT *)NULL) || (ct == (struct ct_data *)NULL)) {
356 //		(void) syslog(LOG_ERR, clnt_vc_errstr,
357 //		    clnt_vc_str, __no_mem_str);
358 		rpc_createerr.cf_stat = RPC_SYSTEMERROR;
359 		rpc_createerr.cf_error.re_errno = errno;
360 		goto err;
361 	}
362 	ct->ct_addr.buf = NULL;
363 #ifndef _WIN32
364 	sigfillset(&newmask);
365 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
366 #else
367 	/* XXX Need Windows signal/event stuff XXX */
368 #endif
369 	mutex_lock(&clnt_fd_lock);
370 	if (vc_fd_locks == (int *) NULL) {
371 		int cv_allocsz, fd_allocsz;
372 		int dtbsize = __rpc_dtbsize();
373 
374 		fd_allocsz = dtbsize * sizeof (int);
375 		vc_fd_locks = (int *) mem_alloc(fd_allocsz);
376 		if (vc_fd_locks == (int *) NULL) {
377 			mutex_unlock(&clnt_fd_lock);
378 //			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
379 			goto err;
380 		} else
381 			memset(vc_fd_locks, 0, fd_allocsz);
382 
383 		assert(vc_cv == (cond_t *) NULL);
384 		cv_allocsz = dtbsize * sizeof (cond_t);
385 		vc_cv = (cond_t *) mem_alloc(cv_allocsz);
386 		if (vc_cv == (cond_t *) NULL) {
387 			mem_free(vc_fd_locks, fd_allocsz);
388 			vc_fd_locks = (int *) NULL;
389 			mutex_unlock(&clnt_fd_lock);
390 //			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
391 			goto err;
392 		} else {
393 			int i;
394 
395 			for (i = 0; i < dtbsize; i++)
396 				cond_init(&vc_cv[i], 0, (void *) 0);
397 		}
398 	} else
399 		assert(vc_cv != (cond_t *) NULL);
400 
401 	/*
402 	 * XXX - fvdl connecting while holding a mutex?
403 	 */
404 	slen = sizeof ss;
405 	if (getpeername(fd, (struct sockaddr *)&ss, &slen) == SOCKET_ERROR) {
406 		errno = WSAGetLastError();
407 		if (errno != WSAENOTCONN) {
408 			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
409 			rpc_createerr.cf_error.re_errno = errno;
410 			mutex_unlock(&clnt_fd_lock);
411 //			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
412 			goto err;
413 		}
414 		if (connect(fd, (struct sockaddr *)raddr->buf, raddr->len) == SOCKET_ERROR){
415 			rpc_createerr.cf_stat = RPC_SYSTEMERROR;
416 			rpc_createerr.cf_error.re_errno = WSAGetLastError();
417 			mutex_unlock(&clnt_fd_lock);
418 //			thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
419 			goto err;
420 		}
421 	}
422 	mutex_unlock(&clnt_fd_lock);
423 	if (!__rpc_fd2sockinfo(fd, &si))
424 		goto err;
425 //	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
426 
427 	ct->ct_closeit = FALSE;
428 
429 	/*
430 	 * Set up private data struct
431 	 */
432 	ct->ct_fd = fd;
433 	ct->ct_wait.tv_usec = 0;
434 	ct->ct_waitset = FALSE;
435 	ct->ct_addr.buf = malloc(raddr->maxlen);
436 	if (ct->ct_addr.buf == NULL)
437 		goto err;
438 	memcpy(ct->ct_addr.buf, raddr->buf, raddr->len);
439 	ct->ct_addr.len = raddr->len;
440 	ct->ct_addr.maxlen = raddr->maxlen;
441     ct->use_stored_reply_msg = FALSE;
442 
443 	/*
444 	 * Initialize call message
445 	 */
446 	(void)gettimeofday(&now, NULL);
447 	call_msg.rm_xid = ((u_int32_t)++disrupt) ^ __RPC_GETXID(&now);
448 	call_msg.rm_direction = CALL;
449 	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
450 	call_msg.rm_call.cb_prog = (u_int32_t)prog;
451 	call_msg.rm_call.cb_vers = (u_int32_t)vers;
452 
453 	/*
454 	 * pre-serialize the static part of the call msg and stash it away
455 	 */
456 	xdrmem_create(&(ct->ct_xdrs), ct->ct_u.ct_mcallc, MCALL_MSG_SIZE,
457 	    XDR_ENCODE);
458 	if (! xdr_callhdr(&(ct->ct_xdrs), &call_msg)) {
459 		if (ct->ct_closeit) {
460 			(void)closesocket(fd);
461 		}
462 		goto err;
463 	}
464 	ct->ct_mpos = XDR_GETPOS(&(ct->ct_xdrs));
465 	XDR_DESTROY(&(ct->ct_xdrs));
466 
467 	/*
468 	 * Create a client handle which uses xdrrec for serialization
469 	 * and authnone for authentication.
470 	 */
471 	cl->cl_ops = clnt_vc_ops();
472 	cl->cl_private = ct;
473 	cl->cl_auth = authnone_create();
474 	sendsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)sendsz);
475 	recvsz = __rpc_get_t_size(si.si_af, si.si_proto, (int)recvsz);
476 	xdrrec_create(&(ct->ct_xdrs), sendsz, recvsz,
477 	    cl->cl_private, read_vc, write_vc);
478 
479     if (cb_xdr && cb_fn && cb_args) {
480         cl->cb_xdr = cb_xdr;
481         cl->cb_fn = cb_fn;
482         cl->cb_args = cb_args;
483         cl->cb_thread = (HANDLE)_beginthreadex(NULL,
484             0, clnt_cb_thread, cl, 0, NULL);
485         if (cl->cb_thread == INVALID_HANDLE_VALUE) {
486             fprintf(stderr, "_beginthreadex failed %d\n", GetLastError());
487             goto err;
488         } else
489             fprintf(stdout, "%04x: started the callback thread %04x\n",
490                 GetCurrentThreadId(), cl->cb_thread);
491     } else
492         cl->cb_thread = INVALID_HANDLE_VALUE;
493 	return (cl);
494 
495 err:
496 	if (cl) {
497 		if (ct) {
498 			if (ct->ct_addr.len)
499 				mem_free(ct->ct_addr.buf, ct->ct_addr.len);
500 			mem_free(ct, sizeof (struct ct_data));
501 		}
502 		if (cl)
503 			mem_free(cl, sizeof (CLIENT));
504 	}
505 	return ((CLIENT *)NULL);
506 }
507 
508 static enum clnt_stat
509 clnt_vc_call(cl, proc, xdr_args, args_ptr, xdr_results, results_ptr, timeout)
510 	CLIENT *cl;
511 	rpcproc_t proc;
512 	xdrproc_t xdr_args;
513 	void *args_ptr;
514 	xdrproc_t xdr_results;
515 	void *results_ptr;
516 	struct timeval timeout;
517 {
518 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
519 	XDR *xdrs = &(ct->ct_xdrs);
520 	u_int32_t x_id;
521 	u_int32_t *msg_x_id = &ct->ct_u.ct_mcalli;    /* yuk */
522 	bool_t shipnow;
523 	static int refreshes = 2;
524     u_int seq = -1;
525     time_t start_send, time_now;
526 #ifndef _WIN32
527 	sigset_t mask, newmask;
528 #else
529 	/* XXX Need Windows signal/event stuff XXX */
530 #endif
531     enum clnt_stat status;
532 
533 	assert(cl != NULL);
534 
535 #ifndef _WIN32
536 	sigfillset(&newmask);
537 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
538 #else
539 	/* XXX Need Windows signal/event stuff XXX */
540 #endif
541 
542     acquire_fd_lock(ct->ct_fd);
543 
544 	if (!ct->ct_waitset) {
545 		/* If time is not within limits, we ignore it. */
546 		if (time_not_ok(&timeout) == FALSE)
547 			ct->ct_wait = timeout;
548 	}
549 
550 	shipnow =
551 	    (xdr_results == NULL && timeout.tv_sec == 0
552 	    && timeout.tv_usec == 0) ? FALSE : TRUE;
553 
554 call_again:
555     __xdrrec_setblock(xdrs);
556 	xdrs->x_op = XDR_ENCODE;
557 	ct->ct_error.re_status = RPC_SUCCESS;
558 	x_id = ntohl(--(*msg_x_id));
559 
560 	if ((! XDR_PUTBYTES(xdrs, ct->ct_u.ct_mcallc, ct->ct_mpos)) ||
561 	    (! XDR_PUTINT32(xdrs, (int32_t *)&proc)) ||
562 	    (! AUTH_MARSHALL(cl->cl_auth, xdrs, &seq)) ||
563 	    (! AUTH_WRAP(cl->cl_auth, xdrs, xdr_args, args_ptr))) {
564 		if (ct->ct_error.re_status == RPC_SUCCESS)
565 			ct->ct_error.re_status = RPC_CANTENCODEARGS;
566 		(void)xdrrec_endofrecord(xdrs, TRUE);
567         goto out;
568 	}
569 
570 	if (! xdrrec_endofrecord(xdrs, shipnow)) {
571         ct->ct_error.re_status = RPC_CANTSEND;
572         goto out;
573 	}
574 	if (! shipnow) {
575 		release_fd_lock(ct->ct_fd, mask);
576 		return (RPC_SUCCESS);
577 	}
578 
579 #ifdef NO_CB_4_KRB5P
580     if (cl->cb_thread != INVALID_HANDLE_VALUE)
581         release_fd_lock(ct->ct_fd, mask);
582 #endif
583 	/*
584 	 * Keep receiving until we get a valid transaction id
585 	 */
586 
587     time(&start_send);
588 	while (TRUE) {
589 #ifdef NO_CB_4_KRB5P
590         if (cl->cb_thread != INVALID_HANDLE_VALUE) {
591             mutex_lock(&clnt_fd_lock);
592 	        while ((vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] &&
593                     vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] != GetCurrentThreadId()) ||
594                     (ct->reply_msg.rm_xid && ct->reply_msg.rm_xid != x_id))
595 		        cond_wait(&vc_cv[WINSOCK_HANDLE_HASH(ct->ct_fd)], &clnt_fd_lock);
596 	        vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)] = GetCurrentThreadId();
597 	        mutex_unlock(&clnt_fd_lock);
598         }
599 #endif
600         __xdrrec_setnonblock(xdrs, 0);
601         xdrs->x_op = XDR_DECODE;
602 		ct->reply_msg.acpted_rply.ar_verf = _null_auth;
603 		ct->reply_msg.acpted_rply.ar_results.where = NULL;
604 		ct->reply_msg.acpted_rply.ar_results.proc = (xdrproc_t)xdr_void;
605         if (!ct->use_stored_reply_msg) {
606 		    if (!xdrrec_skiprecord(xdrs)) {
607                 if (ct->ct_error.re_status != RPC_CANTRECV) {
608                     time(&time_now);
609                     if (time_now - start_send >= timeout.tv_sec) {
610                         ct->ct_error.re_status = RPC_TIMEDOUT;
611                         goto out;
612                     }
613 #ifdef NO_CB_4_KRB5P
614                     if (cl->cb_thread != INVALID_HANDLE_VALUE)
615 #endif
616 			            release_fd_lock(ct->ct_fd, mask);
617                     SwitchToThread();
618 			        continue;
619                 }
620                 goto out;
621 		    }
622             if (!xdr_getxiddir(xdrs, &ct->reply_msg)) {
623 			    if (ct->ct_error.re_status == RPC_SUCCESS) {
624 #ifdef NO_CB_4_KRB5P
625                     if (cl->cb_thread != INVALID_HANDLE_VALUE)
626 #endif
627                         release_fd_lock(ct->ct_fd, mask);
628                     SwitchToThread();
629                     continue;
630                 }
631                 goto out;
632             }
633 
634             if (ct->reply_msg.rm_direction != REPLY) {
635                 if (cl->cb_thread == INVALID_HANDLE_VALUE) {
636                     ct->reply_msg.rm_xid = 0;
637                 } else {
638                     ct->use_stored_reply_msg = TRUE;
639                 }
640                 release_fd_lock(ct->ct_fd, mask);
641                 SwitchToThread();
642                 continue;
643             }
644         }
645 		if (ct->reply_msg.rm_xid == x_id) {
646             ct->use_stored_reply_msg = FALSE;
647             ct->reply_msg.rm_xid = 0;
648             if (!xdr_getreplyunion(xdrs, &ct->reply_msg))
649                 goto out;
650 			break;
651         }
652         else {
653             time(&time_now);
654             if (time_now - start_send >= timeout.tv_sec) {
655                 ct->ct_error.re_status = RPC_TIMEDOUT;
656                 goto out;
657             }
658             ct->use_stored_reply_msg = TRUE;
659 #ifdef NO_CB_4_KRB5P
660             if (cl->cb_thread != INVALID_HANDLE_VALUE)
661 #endif
662                 release_fd_lock(ct->ct_fd, mask);
663             SwitchToThread();
664         }
665 	}
666 
667 	/*
668 	 * process header
669 	 */
670 	_seterr_reply(&ct->reply_msg, &(ct->ct_error));
671 	if (ct->ct_error.re_status == RPC_SUCCESS) {
672 		if (! AUTH_VALIDATE(cl->cl_auth,
673 		    &ct->reply_msg.acpted_rply.ar_verf, seq)) {
674 			ct->ct_error.re_status = RPC_AUTHERROR;
675 			ct->ct_error.re_why = AUTH_INVALIDRESP;
676         }
677         else if (! AUTH_UNWRAP(cl->cl_auth, xdrs, xdr_results, results_ptr, seq)) {
678 			if (ct->ct_error.re_status == RPC_SUCCESS)
679 				ct->ct_error.re_status = RPC_CANTDECODERES;
680 		}
681 		/* free verifier ... */
682 		if (ct->reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
683 			xdrs->x_op = XDR_FREE;
684 			(void)xdr_opaque_auth(xdrs,
685 			    &(ct->reply_msg.acpted_rply.ar_verf));
686 		}
687 	}  /* end successful completion */
688 	else {
689 		if (ct->reply_msg.acpted_rply.ar_verf.oa_base != NULL) {
690 			xdrs->x_op = XDR_FREE;
691 			(void)xdr_opaque_auth(xdrs,
692 			    &(ct->reply_msg.acpted_rply.ar_verf));
693 		}
694 		/* maybe our credentials need to be refreshed ... */
695 		if (refreshes-- > 0 && AUTH_REFRESH(cl->cl_auth, &ct->reply_msg))
696 			goto call_again;
697 	}  /* end of unsuccessful completion */
698     ct->reply_msg.rm_direction = -1;
699 out:
700     status = ct->ct_error.re_status;
701 	release_fd_lock(ct->ct_fd, mask);
702 	return status;
703 }
704 
705 static void
706 clnt_vc_geterr(cl, errp)
707 	CLIENT *cl;
708 	struct rpc_err *errp;
709 {
710 	struct ct_data *ct;
711 
712 	assert(cl != NULL);
713 	assert(errp != NULL);
714 
715 	ct = (struct ct_data *) cl->cl_private;
716 	*errp = ct->ct_error;
717 }
718 
719 static bool_t
720 clnt_vc_freeres(cl, xdr_res, res_ptr)
721 	CLIENT *cl;
722 	xdrproc_t xdr_res;
723 	void *res_ptr;
724 {
725 	struct ct_data *ct;
726 	XDR *xdrs;
727 	bool_t dummy;
728 #ifndef _WIN32
729 	sigset_t mask;
730 	sigset_t newmask;
731 #else
732 	/* XXX Need Windows signal/event stuff XXX */
733 #endif
734 
735 	assert(cl != NULL);
736 
737 	ct = (struct ct_data *)cl->cl_private;
738 	xdrs = &(ct->ct_xdrs);
739 
740 #ifndef _WIN32
741 	sigfillset(&newmask);
742 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
743 #else
744 	/* XXX Need Windows signal/event stuff XXX */
745 #endif
746 	mutex_lock(&clnt_fd_lock);
747 	while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct->ct_fd)])
748 		cond_wait(&vc_cv[WINSOCK_HANDLE_HASH(ct->ct_fd)], &clnt_fd_lock);
749 	xdrs->x_op = XDR_FREE;
750 	dummy = (*xdr_res)(xdrs, res_ptr);
751 	mutex_unlock(&clnt_fd_lock);
752 //	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
753 	cond_signal(&vc_cv[WINSOCK_HANDLE_HASH(ct->ct_fd)]);
754 
755 	return dummy;
756 }
757 
758 /*ARGSUSED*/
759 static void
760 clnt_vc_abort(cl)
761 	CLIENT *cl;
762 {
763 }
764 
765 static bool_t
766 clnt_vc_control(cl, request, info)
767 	CLIENT *cl;
768 	u_int request;
769 	void *info;
770 {
771 	struct ct_data *ct;
772 	void *infop = info;
773 #ifndef _WIN32
774 	sigset_t mask;
775 	sigset_t newmask;
776 #else
777 	/* XXX Need Windows signal/event stuff XXX */
778 #endif
779 
780 	assert(cl != NULL);
781 
782 	ct = (struct ct_data *)cl->cl_private;
783 
784 #ifndef _WIN32
785 	sigfillset(&newmask);
786 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
787 #else
788 	/* XXX Need Windows signal/event stuff XXX */
789 #endif
790     acquire_fd_lock(ct->ct_fd);
791 
792 	switch (request) {
793 	case CLSET_FD_CLOSE:
794 		ct->ct_closeit = TRUE;
795 		release_fd_lock(ct->ct_fd, mask);
796 		return (TRUE);
797 	case CLSET_FD_NCLOSE:
798 		ct->ct_closeit = FALSE;
799 		release_fd_lock(ct->ct_fd, mask);
800 		return (TRUE);
801 	default:
802 		break;
803 	}
804 
805 	/* for other requests which use info */
806 	if (info == NULL) {
807 		release_fd_lock(ct->ct_fd, mask);
808 		return (FALSE);
809 	}
810 	switch (request) {
811 	case CLSET_TIMEOUT:
812 		if (time_not_ok((struct timeval *)info)) {
813 			release_fd_lock(ct->ct_fd, mask);
814 			return (FALSE);
815 		}
816 		ct->ct_wait = *(struct timeval *)infop;
817 		ct->ct_waitset = TRUE;
818 		break;
819 	case CLGET_TIMEOUT:
820 		*(struct timeval *)infop = ct->ct_wait;
821 		break;
822 	case CLGET_SERVER_ADDR:
823 		(void) memcpy(info, ct->ct_addr.buf, (size_t)ct->ct_addr.len);
824 		break;
825 	case CLGET_FD:
826 		*(int *)info = ct->ct_fd;
827 		break;
828 	case CLGET_SVC_ADDR:
829 		/* The caller should not free this memory area */
830 		*(struct netbuf *)info = ct->ct_addr;
831 		break;
832 	case CLSET_SVC_ADDR:		/* set to new address */
833 		release_fd_lock(ct->ct_fd, mask);
834 		return (FALSE);
835 	case CLGET_XID:
836 		/*
837 		 * use the knowledge that xid is the
838 		 * first element in the call structure
839 		 * This will get the xid of the PREVIOUS call
840 		 */
841 		*(u_int32_t *)info =
842 		    ntohl(*(u_int32_t *)(void *)&ct->ct_u.ct_mcalli);
843 		break;
844 	case CLSET_XID:
845 		/* This will set the xid of the NEXT call */
846 		*(u_int32_t *)(void *)&ct->ct_u.ct_mcalli =
847 		    htonl(*((u_int32_t *)info) + 1);
848 		/* increment by 1 as clnt_vc_call() decrements once */
849 		break;
850 	case CLGET_VERS:
851 		/*
852 		 * This RELIES on the information that, in the call body,
853 		 * the version number field is the fifth field from the
854 		 * begining of the RPC header. MUST be changed if the
855 		 * call_struct is changed
856 		 */
857 		*(u_int32_t *)info =
858 		    ntohl(*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
859 		    4 * BYTES_PER_XDR_UNIT));
860 		break;
861 
862 	case CLSET_VERS:
863 		*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
864 		    4 * BYTES_PER_XDR_UNIT) =
865 		    htonl(*(u_int32_t *)info);
866 		break;
867 
868 	case CLGET_PROG:
869 		/*
870 		 * This RELIES on the information that, in the call body,
871 		 * the program number field is the fourth field from the
872 		 * begining of the RPC header. MUST be changed if the
873 		 * call_struct is changed
874 		 */
875 		*(u_int32_t *)info =
876 		    ntohl(*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
877 		    3 * BYTES_PER_XDR_UNIT));
878 		break;
879 
880 	case CLSET_PROG:
881 		*(u_int32_t *)(void *)(ct->ct_u.ct_mcallc +
882 		    3 * BYTES_PER_XDR_UNIT) =
883 		    htonl(*(u_int32_t *)info);
884 		break;
885 
886 	default:
887 		release_fd_lock(ct->ct_fd, mask);
888 		return (FALSE);
889 	}
890 	release_fd_lock(ct->ct_fd, mask);
891 	return (TRUE);
892 }
893 
894 
895 static void
896 clnt_vc_destroy(cl)
897 	CLIENT *cl;
898 {
899 	struct ct_data *ct = (struct ct_data *) cl->cl_private;
900 	int ct_fd = ct->ct_fd;
901 #ifndef _WIN32
902 	sigset_t mask;
903 	sigset_t newmask;
904 #else
905 	/* XXX Need Windows signal/event stuff XXX */
906 #endif
907 
908 	assert(cl != NULL);
909 
910 	ct = (struct ct_data *) cl->cl_private;
911 
912 #ifndef _WIN32
913 	sigfillset(&newmask);
914 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
915 #else
916 	/* XXX Need Windows signal/event stuff XXX */
917 #endif
918 	mutex_lock(&clnt_fd_lock);
919 	while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct_fd)])
920 		cond_wait(&vc_cv[WINSOCK_HANDLE_HASH(ct_fd)], &clnt_fd_lock);
921 
922     if (cl->cb_thread != INVALID_HANDLE_VALUE) {
923         int status;
924         fprintf(stdout, "%04x: sending shutdown to callback thread %04x\n",
925             GetCurrentThreadId(), cl->cb_thread);
926         cl->shutdown = 1;
927         mutex_unlock(&clnt_fd_lock);
928         cond_signal(&vc_cv[WINSOCK_HANDLE_HASH(ct_fd)]);
929         status = WaitForSingleObject(cl->cb_thread, INFINITE);
930         fprintf(stdout, "%04x: terminated callback thread\n", GetCurrentThreadId());
931         mutex_lock(&clnt_fd_lock);
932         while (vc_fd_locks[WINSOCK_HANDLE_HASH(ct_fd)])
933             cond_wait(&vc_cv[WINSOCK_HANDLE_HASH(ct_fd)], &clnt_fd_lock);
934     }
935 
936 	if (ct->ct_closeit && ct->ct_fd != -1) {
937 		(void)closesocket(ct->ct_fd);
938 	}
939 	XDR_DESTROY(&(ct->ct_xdrs));
940 	if (ct->ct_addr.buf)
941 		free(ct->ct_addr.buf);
942 	mem_free(ct, sizeof(struct ct_data));
943 	if (cl->cl_netid && cl->cl_netid[0])
944 		mem_free(cl->cl_netid, strlen(cl->cl_netid) +1);
945 	if (cl->cl_tp && cl->cl_tp[0])
946 		mem_free(cl->cl_tp, strlen(cl->cl_tp) +1);
947 	mem_free(cl, sizeof(CLIENT));
948 	mutex_unlock(&clnt_fd_lock);
949 //	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
950 	cond_signal(&vc_cv[WINSOCK_HANDLE_HASH(ct_fd)]);
951 }
952 
953 /*
954  * Interface between xdr serializer and tcp connection.
955  * Behaves like the system calls, read & write, but keeps some error state
956  * around for the rpc level.
957  */
958 static int
959 read_vc(ctp, buf, len)
960 	void *ctp;
961 	void *buf;
962 	int len;
963 {
964 	/*
965 	struct sockaddr sa;
966 	socklen_t sal;
967 	*/
968 	struct ct_data *ct = (struct ct_data *)ctp;
969 	struct pollfd fd;
970 	int milliseconds = ct->ct_wait.tv_usec;
971 
972 	if (len == 0)
973 		return (0);
974 	fd.fd = ct->ct_fd;
975 	fd.events = POLLIN;
976 	for (;;) {
977 #ifndef __REACTOS__
978 		switch (poll(&fd, 1, milliseconds)) {
979 #else
980 		/* ReactOS: use select instead of poll */
981 		fd_set infd;
982 		struct timeval timeout;
983 
984 		FD_ZERO(&infd);
985 		FD_SET(ct->ct_fd, &infd);
986 
987 		timeout.tv_sec = 0;
988 		timeout.tv_usec = milliseconds * 1000;
989 
990 		switch (select(0, &infd, NULL, NULL, &timeout)) {
991 #endif
992 
993 		case 0:
994 			ct->ct_error.re_status = RPC_TIMEDOUT;
995 			return (-1);
996 
997 		case SOCKET_ERROR:
998 			errno = WSAGetLastError();
999 			if (errno == WSAEINTR)
1000 				continue;
1001 			ct->ct_error.re_status = RPC_CANTRECV;
1002 			ct->ct_error.re_errno = errno;
1003 			return (-2);
1004 		}
1005 		break;
1006 	}
1007 
1008 	len = recv(ct->ct_fd, buf, (size_t)len, 0);
1009 	errno = WSAGetLastError();
1010 
1011 	switch (len) {
1012 	case 0:
1013 		/* premature eof */
1014 		ct->ct_error.re_errno = WSAECONNRESET;
1015 		ct->ct_error.re_status = RPC_CANTRECV;
1016 		len = -1;  /* it's really an error */
1017 		break;
1018 
1019 	case SOCKET_ERROR:
1020 		ct->ct_error.re_errno = errno;
1021 		ct->ct_error.re_status = RPC_CANTRECV;
1022 		break;
1023 	}
1024 	return (len);
1025 }
1026 
1027 static int
1028 #ifndef __REACTOS__
1029 write_vc(ctp, buf, len)
1030 #else
1031 write_vc(ctp, ptr, len)
1032 #endif
1033 	void *ctp;
1034 #ifndef __REACTOS__
1035 	char *buf;
1036 #else
1037     void *ptr;
1038 #endif
1039 	int len;
1040 {
1041 	struct ct_data *ct = (struct ct_data *)ctp;
1042 	int i = 0, cnt;
1043 #ifdef __REACTOS__
1044     char *buf = ptr;
1045 #endif
1046 
1047 	for (cnt = len; cnt > 0; cnt -= i, buf += i) {
1048 	    if ((i = send(ct->ct_fd, buf, (size_t)cnt, 0)) == SOCKET_ERROR) {
1049 		ct->ct_error.re_errno = WSAGetLastError();
1050 		ct->ct_error.re_status = RPC_CANTSEND;
1051 		return (-1);
1052 	    }
1053 	}
1054 	return (len);
1055 }
1056 
1057 static struct clnt_ops *
1058 clnt_vc_ops()
1059 {
1060 	static struct clnt_ops ops;
1061 	extern mutex_t  ops_lock;
1062 #ifndef _WIN32
1063 	sigset_t mask, newmask;
1064 
1065 	/* VARIABLES PROTECTED BY ops_lock: ops */
1066 
1067 	sigfillset(&newmask);
1068 	thr_sigsetmask(SIG_SETMASK, &newmask, &mask);
1069 #else
1070 	/* XXX Need Windows signal/event stuff XXX */
1071 #endif
1072 	mutex_lock(&ops_lock);
1073 	if (ops.cl_call == NULL) {
1074 		ops.cl_call = clnt_vc_call;
1075 		ops.cl_abort = clnt_vc_abort;
1076 		ops.cl_geterr = clnt_vc_geterr;
1077 		ops.cl_freeres = clnt_vc_freeres;
1078 		ops.cl_destroy = clnt_vc_destroy;
1079 		ops.cl_control = clnt_vc_control;
1080 	}
1081 	mutex_unlock(&ops_lock);
1082 //	thr_sigsetmask(SIG_SETMASK, &(mask), NULL);
1083 	return (&ops);
1084 }
1085 
1086 /*
1087  * Make sure that the time is not garbage.   -1 value is disallowed.
1088  * Note this is different from time_not_ok in clnt_dg.c
1089  */
1090 static bool_t
1091 time_not_ok(t)
1092 	struct timeval *t;
1093 {
1094 	return (t->tv_sec <= -1 || t->tv_sec > 100000000 ||
1095 		t->tv_usec <= -1 || t->tv_usec > 1000000);
1096 }
1097