xref: /illumos-gate/usr/src/uts/common/rpc/clnt_rdma.c (revision bfed486a)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2009 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
26 /* All Rights Reserved */
27 /*
28  * Portions of this source code were derived from Berkeley
29  * 4.3 BSD under license from the Regents of the University of
30  * California.
31  */
32 
33 #include <sys/param.h>
34 #include <sys/types.h>
35 #include <sys/user.h>
36 #include <sys/systm.h>
37 #include <sys/sysmacros.h>
38 #include <sys/errno.h>
39 #include <sys/kmem.h>
40 #include <sys/debug.h>
41 #include <sys/systm.h>
42 #include <sys/kstat.h>
43 #include <sys/t_lock.h>
44 #include <sys/ddi.h>
45 #include <sys/cmn_err.h>
46 #include <sys/time.h>
47 #include <sys/isa_defs.h>
48 #include <sys/zone.h>
49 #include <sys/sdt.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <rpc/auth.h>
54 #include <rpc/clnt.h>
55 #include <rpc/rpc_msg.h>
56 #include <rpc/rpc_rdma.h>
57 #include <nfs/nfs.h>
58 #include <nfs/nfs4_kprot.h>
59 
/* Number of RDMA credit buffers requested from the server per call */
static uint32_t rdma_bufs_rqst = RDMA_BUFS_RQST;

/* Forward declarations for the local helpers used by clnt_rdma_kcallit() */
static int clnt_compose_rpcmsg(CLIENT *, rpcproc_t, rdma_buf_t *,
			    XDR *, xdrproc_t, caddr_t);
static int  clnt_compose_rdma_header(CONN *, CLIENT *, rdma_buf_t *,
		    XDR **, uint_t *);
static int clnt_setup_rlist(CONN *, XDR *, XDR *);
static int clnt_setup_wlist(CONN *, XDR *, XDR *);
static int clnt_setup_long_reply(CONN *, struct clist **, uint_t);
static void clnt_check_credit(CONN *);
static void clnt_return_credit(CONN *);
static void clnt_decode_long_reply(CONN *, struct clist *,
		struct clist *, XDR *, XDR **, struct clist *,
		struct clist *, uint_t, uint_t);

static void clnt_update_credit(CONN *, uint32_t);
static void check_dereg_wlist(CONN *, struct clist *);

/* CLIENT operations vector entry points */
static enum clnt_stat clnt_rdma_kcallit(CLIENT *, rpcproc_t, xdrproc_t,
    caddr_t, xdrproc_t, caddr_t, struct timeval);
static void	clnt_rdma_kabort(CLIENT *);
static void	clnt_rdma_kerror(CLIENT *, struct rpc_err *);
static bool_t	clnt_rdma_kfreeres(CLIENT *, xdrproc_t, caddr_t);
static void	clnt_rdma_kdestroy(CLIENT *);
static bool_t	clnt_rdma_kcontrol(CLIENT *, int, char *);
static int	clnt_rdma_ksettimers(CLIENT *, struct rpc_timers *,
    struct rpc_timers *, int, void(*)(int, int, caddr_t), caddr_t, uint32_t);

/*
 * Operations vector for RDMA based RPC
 */
static struct clnt_ops rdma_clnt_ops = {
	clnt_rdma_kcallit,	/* do rpc call */
	clnt_rdma_kabort,	/* abort call */
	clnt_rdma_kerror,	/* return error status */
	clnt_rdma_kfreeres,	/* free results */
	clnt_rdma_kdestroy,	/* destroy rpc handle */
	clnt_rdma_kcontrol,	/* the ioctl() of rpc */
	clnt_rdma_ksettimers,	/* set retry timers */
};

/*
 * The size of the preserialized RPC header information.
 */
#define	CKU_HDRSIZE	20
/* Internal helper return codes (distinct from rdma_stat / clnt_stat) */
#define	CLNT_RDMA_SUCCESS 0
#define	CLNT_RDMA_FAIL (-1)

/* Max number of AUTH_REFRESH retries per call */
#define	AUTH_REFRESH_COUNT 2

#define	IS_RPCSEC_GSS(authh)			\
	(authh->cl_auth->ah_cred.oa_flavor == RPCSEC_GSS)

/*
 * Per RPC RDMA endpoint details
 */
typedef struct cku_private {
	CLIENT			cku_client;	/* client handle */
	rdma_mod_t		*cku_rd_mod;	/* underlying RDMA mod */
	void			*cku_rd_handle;	/* underlying RDMA device */
	struct netbuf		cku_addr;	/* remote netbuf address */
	int			cku_addrfmly;	/* for finding addr_type */
	struct rpc_err		cku_err;	/* error status */
	struct cred		*cku_cred;	/* credentials */
	XDR			cku_outxdr;	/* xdr stream for output */
	uint32_t		cku_outsz;	/* bytes encoded into output */
	XDR			cku_inxdr;	/* xdr stream for input */
	char			cku_rpchdr[CKU_HDRSIZE+4]; /* rpc header */
	uint32_t		cku_xid;	/* current XID */
} cku_private_t;

#define	CLNT_RDMA_DELAY	10	/* secs to delay after a connection failure */
static int clnt_rdma_min_delay = CLNT_RDMA_DELAY;

/* kstat counters for the RDMA RPC client, exported via rdmarcstat_ptr */
struct {
	kstat_named_t	rccalls;
	kstat_named_t	rcbadcalls;
	kstat_named_t	rcbadxids;
	kstat_named_t	rctimeouts;
	kstat_named_t	rcnewcreds;
	kstat_named_t	rcbadverfs;
	kstat_named_t	rctimers;
	kstat_named_t	rccantconn;
	kstat_named_t	rcnomem;
	kstat_named_t	rcintrs;
	kstat_named_t	rclongrpcs;
} rdmarcstat = {
	{ "calls",	KSTAT_DATA_UINT64 },
	{ "badcalls",	KSTAT_DATA_UINT64 },
	{ "badxids",	KSTAT_DATA_UINT64 },
	{ "timeouts",	KSTAT_DATA_UINT64 },
	{ "newcreds",	KSTAT_DATA_UINT64 },
	{ "badverfs",	KSTAT_DATA_UINT64 },
	{ "timers",	KSTAT_DATA_UINT64 },
	{ "cantconn",	KSTAT_DATA_UINT64 },
	{ "nomem",	KSTAT_DATA_UINT64 },
	{ "interrupts", KSTAT_DATA_UINT64 },
	{ "longrpc", 	KSTAT_DATA_UINT64 }
};

kstat_named_t *rdmarcstat_ptr = (kstat_named_t *)&rdmarcstat;
uint_t rdmarcstat_ndata = sizeof (rdmarcstat) / sizeof (kstat_named_t);

#ifdef DEBUG
int rdma_clnt_debug = 0;
#endif

#ifdef accurate_stats
extern kmutex_t rdmarcstat_lock;    /* mutex for rcstat updates */

#define	RCSTAT_INCR(x)			\
	mutex_enter(&rdmarcstat_lock);	\
	rdmarcstat.x.value.ui64++;	\
	mutex_exit(&rdmarcstat_lock);
#else
/* default: unlocked increment; counters are statistical, not exact */
#define	RCSTAT_INCR(x)			\
	rdmarcstat.x.value.ui64++;
#endif

/* Convert between the public CLIENT handle and the private state */
#define	ptoh(p)		(&((p)->cku_client))
#define	htop(h)		((cku_private_t *)((h)->cl_private))
182 uint_t
183 calc_length(uint_t len)
184 {
185 	len = RNDUP(len);
186 
187 	if (len <= 64 * 1024) {
188 		if (len > 32 * 1024) {
189 			len = 64 * 1024;
190 		} else {
191 			if (len > 16 * 1024) {
192 				len = 32 * 1024;
193 			} else {
194 				if (len > 8 * 1024) {
195 					len = 16 * 1024;
196 				} else {
197 					len = 8 * 1024;
198 				}
199 			}
200 		}
201 	}
202 	return (len);
203 }
/*
 * Create an RPC/RDMA client handle.
 *
 * proto   - RDMATF plugin name to bind the handle to (e.g. the rdma_api
 *           string registered by the transport module)
 * handle  - opaque RDMA device handle stored in the private state
 * raddr   - remote address; copied, caller retains ownership
 * family  - address family, kept for later connection lookups
 * pgm/vers - RPC program and version, pre-serialized into the call header
 * cred    - credential pointer; stored as-is, not duplicated
 * cl      - out: new CLIENT handle on success
 *
 * Returns 0 on success, EINVAL on bad arguments, missing plugin, or
 * header serialization failure.  Must run in the global zone.
 * Note: cku_xid is left 0 here; a real XID is allocated lazily on the
 * first clnt_rdma_kcallit().
 */
int
clnt_rdma_kcreate(char *proto, void *handle, struct netbuf *raddr, int family,
    rpcprog_t pgm, rpcvers_t vers, struct cred *cred, CLIENT **cl)
{
	CLIENT *h;
	struct cku_private *p;
	struct rpc_msg call_msg;
	rdma_registry_t *rp;

	ASSERT(INGLOBALZONE(curproc));

	if (cl == NULL)
		return (EINVAL);
	*cl = NULL;

	p = kmem_zalloc(sizeof (*p), KM_SLEEP);

	/*
	 * Find underlying RDMATF plugin
	 */
	rw_enter(&rdma_lock, RW_READER);
	rp = rdma_mod_head;
	while (rp != NULL) {
		if (strcmp(rp->r_mod->rdma_api, proto))
			rp = rp->r_next;
		else {
			p->cku_rd_mod = rp->r_mod;
			p->cku_rd_handle = handle;
			break;
		}
	}
	rw_exit(&rdma_lock);

	if (p->cku_rd_mod == NULL) {
		/*
		 * Should not happen.
		 * No matching RDMATF plugin.
		 */
		kmem_free(p, sizeof (struct cku_private));
		return (EINVAL);
	}

	h = ptoh(p);
	h->cl_ops = &rdma_clnt_ops;
	h->cl_private = (caddr_t)p;
	h->cl_auth = authkern_create();

	/* call message, just used to pre-serialize below */
	call_msg.rm_xid = 0;
	call_msg.rm_direction = CALL;
	call_msg.rm_call.cb_rpcvers = RPC_MSG_VERSION;
	call_msg.rm_call.cb_prog = pgm;
	call_msg.rm_call.cb_vers = vers;

	/* pre-serialize the invariant call header into cku_rpchdr */
	xdrmem_create(&p->cku_outxdr, p->cku_rpchdr, CKU_HDRSIZE, XDR_ENCODE);
	/* pre-serialize call message header */
	if (!xdr_callhdr(&p->cku_outxdr, &call_msg)) {
		XDR_DESTROY(&p->cku_outxdr);
		auth_destroy(h->cl_auth);
		kmem_free(p, sizeof (struct cku_private));
		return (EINVAL);
	}

	/*
	 * Set up the rpc information
	 */
	p->cku_cred = cred;
	p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
	p->cku_addr.maxlen = raddr->maxlen;
	p->cku_addr.len = raddr->len;
	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
	p->cku_addrfmly = family;

	*cl = h;
	return (0);
}
280 
281 static void
282 clnt_rdma_kdestroy(CLIENT *h)
283 {
284 	struct cku_private *p = htop(h);
285 
286 	kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
287 	kmem_free(p, sizeof (*p));
288 }
289 
/*
 * Re-initialize an existing RPC/RDMA client handle for reuse against a
 * (possibly different) RDMATF plugin and remote address.  Re-binds the
 * plugin, resets the XID (re-allocated lazily on the next call), grows
 * the address buffer if needed, and copies in the new address.
 * Must run in the global zone.
 *
 * NOTE(review): unlike clnt_rdma_kcreate(), no error is reported if no
 * plugin matches `proto' -- cku_rd_mod is simply left NULL.  Callers
 * presumably guarantee the plugin is registered; confirm.
 */
void
clnt_rdma_kinit(CLIENT *h, char *proto, void *handle, struct netbuf *raddr,
    struct cred *cred)
{
	struct cku_private *p = htop(h);
	rdma_registry_t *rp;

	ASSERT(INGLOBALZONE(curproc));
	/*
	 * Find underlying RDMATF plugin
	 */
	p->cku_rd_mod = NULL;
	rw_enter(&rdma_lock, RW_READER);
	rp = rdma_mod_head;
	while (rp != NULL) {
		if (strcmp(rp->r_mod->rdma_api, proto))
			rp = rp->r_next;
		else {
			p->cku_rd_mod = rp->r_mod;
			p->cku_rd_handle = handle;
			break;
		}

	}
	rw_exit(&rdma_lock);

	/*
	 * Set up the rpc information
	 */
	p->cku_cred = cred;
	p->cku_xid = 0;

	/* grow the stored address buffer only when too small for raddr */
	if (p->cku_addr.maxlen < raddr->len) {
		if (p->cku_addr.maxlen != 0 && p->cku_addr.buf != NULL)
			kmem_free(p->cku_addr.buf, p->cku_addr.maxlen);
		p->cku_addr.buf = kmem_zalloc(raddr->maxlen, KM_SLEEP);
		p->cku_addr.maxlen = raddr->maxlen;
	}

	p->cku_addr.len = raddr->len;
	bcopy(raddr->buf, p->cku_addr.buf, raddr->len);
	h->cl_ops = &rdma_clnt_ops;
}
333 
/*
 * Serialize the RPC call message (header, credentials, procedure number
 * and arguments) into the buffer underlying `xdrs'.
 *
 * Non-RPCSEC_GSS flavors: the pre-serialized header in cku_rpchdr is
 * copied in, the XID is patched into the first 4 bytes, and the dynamic
 * parts are encoded after it.  RPCSEC_GSS: the header + procnum are
 * assembled in cku_rpchdr and AUTH_WRAP() encodes everything (it may
 * replace the underlying buffer; rpcmsg is updated to track that).
 *
 * On success cku_outsz holds the total encoded size.  Returns
 * CLNT_RDMA_SUCCESS or CLNT_RDMA_FAIL (encode failure).
 */
static int
clnt_compose_rpcmsg(CLIENT *h, rpcproc_t procnum,
    rdma_buf_t *rpcmsg, XDR *xdrs,
    xdrproc_t xdr_args, caddr_t argsp)
{
	cku_private_t *p = htop(h);

	if (h->cl_auth->ah_cred.oa_flavor != RPCSEC_GSS) {
		/*
		 * Copy in the preserialized RPC header
		 * information.
		 */
		bcopy(p->cku_rpchdr, rpcmsg->addr, CKU_HDRSIZE);

		/*
		 * transaction id is the 1st thing in the output
		 * buffer.
		 */
		/* LINTED pointer alignment */
		(*(uint32_t *)(rpcmsg->addr)) = p->cku_xid;

		/* Skip the preserialized stuff. */
		XDR_SETPOS(xdrs, CKU_HDRSIZE);

		/* Serialize dynamic stuff into the output buffer. */
		if ((!XDR_PUTINT32(xdrs, (int32_t *)&procnum)) ||
		    (!AUTH_MARSHALL(h->cl_auth, xdrs, p->cku_cred)) ||
		    (!(*xdr_args)(xdrs, argsp))) {
			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__dynargs);
			return (CLNT_RDMA_FAIL);
		}
		p->cku_outsz = XDR_GETPOS(xdrs);
	} else {
		/* procnum goes right after the CKU_HDRSIZE-byte header */
		uint32_t *uproc = (uint32_t *)&p->cku_rpchdr[CKU_HDRSIZE];
		IXDR_PUT_U_INT32(uproc, procnum);
		(*(uint32_t *)(&p->cku_rpchdr[0])) = p->cku_xid;
		XDR_SETPOS(xdrs, 0);

		/* Serialize the procedure number and the arguments. */
		if (!AUTH_WRAP(h->cl_auth, (caddr_t)p->cku_rpchdr,
		    CKU_HDRSIZE+4, xdrs, xdr_args, argsp)) {
			/*
			 * AUTH_WRAP may have grown the stream into a new
			 * buffer; keep rpcmsg in sync so the caller frees
			 * the right memory.
			 */
			if (rpcmsg->addr != xdrs->x_base) {
				rpcmsg->addr = xdrs->x_base;
				rpcmsg->len = xdr_getbufsize(xdrs);
			}
			DTRACE_PROBE(krpc__e__clntrdma__rpcmsg__procnum);
			return (CLNT_RDMA_FAIL);
		}
		/*
		 * If we had to allocate a new buffer while encoding
		 * then update the addr and len.
		 */
		if (rpcmsg->addr != xdrs->x_base) {
			rpcmsg->addr = xdrs->x_base;
			rpcmsg->len = xdr_getbufsize(xdrs);
		}

		p->cku_outsz = XDR_GETPOS(xdrs);
		DTRACE_PROBE1(krpc__i__compose__size__sec, int, p->cku_outsz)
	}

	return (CLNT_RDMA_SUCCESS);
}
397 
398 static int
399 clnt_compose_rdma_header(CONN *conn, CLIENT *h, rdma_buf_t *clmsg,
400     XDR **xdrs, uint_t *op)
401 {
402 	cku_private_t *p = htop(h);
403 	uint_t vers;
404 	uint32_t rdma_credit = rdma_bufs_rqst;
405 
406 	vers = RPCRDMA_VERS;
407 	clmsg->type = SEND_BUFFER;
408 
409 	if (rdma_buf_alloc(conn, clmsg)) {
410 		return (CLNT_RDMA_FAIL);
411 	}
412 
413 	*xdrs = &p->cku_outxdr;
414 	xdrmem_create(*xdrs, clmsg->addr, clmsg->len, XDR_ENCODE);
415 
416 	(*(uint32_t *)clmsg->addr) = p->cku_xid;
417 	XDR_SETPOS(*xdrs, sizeof (uint32_t));
418 	(void) xdr_u_int(*xdrs, &vers);
419 	(void) xdr_u_int(*xdrs, &rdma_credit);
420 	(void) xdr_u_int(*xdrs, op);
421 
422 	return (CLNT_RDMA_SUCCESS);
423 }
424 
425 /*
426  * If xp_cl is NULL value, then the RPC payload will NOT carry
427  * an RDMA READ chunk list, in this case we insert FALSE into
428  * the XDR stream. Otherwise we use the clist and RDMA register
429  * the memory and encode the clist into the outbound XDR stream.
430  */
431 static int
432 clnt_setup_rlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
433 {
434 	int status;
435 	struct clist *rclp;
436 	int32_t xdr_flag = XDR_RDMA_RLIST_REG;
437 
438 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &rclp);
439 
440 	if (rclp != NULL) {
441 		status = clist_register(conn, rclp, CLIST_REG_SOURCE);
442 		if (status != RDMA_SUCCESS) {
443 			return (CLNT_RDMA_FAIL);
444 		}
445 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
446 	}
447 	(void) xdr_do_clist(xdrs, &rclp);
448 
449 	return (CLNT_RDMA_SUCCESS);
450 }
451 
452 /*
453  * If xp_wcl is NULL value, then the RPC payload will NOT carry
454  * an RDMA WRITE chunk list, in this case we insert FALSE into
455  * the XDR stream. Otherwise we use the clist and  RDMA register
456  * the memory and encode the clist into the outbound XDR stream.
457  */
458 static int
459 clnt_setup_wlist(CONN *conn, XDR *xdrs, XDR *call_xdrp)
460 {
461 	int status;
462 	struct clist *wlist;
463 	int32_t xdr_flag = XDR_RDMA_WLIST_REG;
464 
465 	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_WLIST, &wlist);
466 
467 	if (wlist != NULL) {
468 		status = clist_register(conn, wlist, CLIST_REG_DST);
469 		if (status != RDMA_SUCCESS) {
470 			return (CLNT_RDMA_FAIL);
471 		}
472 		XDR_CONTROL(call_xdrp, XDR_RDMA_SET_FLAGS, &xdr_flag);
473 	}
474 
475 	if (!xdr_encode_wlist(xdrs, wlist))
476 		return (CLNT_RDMA_FAIL);
477 
478 	return (CLNT_RDMA_SUCCESS);
479 }
480 
481 static int
482 clnt_setup_long_reply(CONN *conn, struct clist **clpp, uint_t length)
483 {
484 	if (length == 0) {
485 		*clpp = NULL;
486 		return (CLNT_RDMA_SUCCESS);
487 	}
488 
489 	*clpp = clist_alloc();
490 
491 	(*clpp)->rb_longbuf.len = calc_length(length);
492 	(*clpp)->rb_longbuf.type = RDMA_LONG_BUFFER;
493 
494 	if (rdma_buf_alloc(conn, &((*clpp)->rb_longbuf))) {
495 		clist_free(*clpp);
496 		*clpp = NULL;
497 		return (CLNT_RDMA_FAIL);
498 	}
499 
500 	(*clpp)->u.c_daddr3 = (*clpp)->rb_longbuf.addr;
501 	(*clpp)->c_len = (*clpp)->rb_longbuf.len;
502 	(*clpp)->c_next = NULL;
503 	(*clpp)->c_dmemhandle = (*clpp)->rb_longbuf.handle;
504 
505 	if (clist_register(conn, *clpp, CLIST_REG_DST)) {
506 		DTRACE_PROBE(krpc__e__clntrdma__longrep_regbuf);
507 		rdma_buf_free(conn, &((*clpp)->rb_longbuf));
508 		clist_free(*clpp);
509 		return (CLNT_RDMA_FAIL);
510 	}
511 
512 	return (CLNT_RDMA_SUCCESS);
513 }
514 
/*
 * Perform one RPC over RDMA.
 *
 * Flow: pick up the plugin/connection, obtain a flow-control credit,
 * encode the call either inline in a SEND buffer (RDMA_MSG) or in a
 * registered long buffer (RDMA_NOMSG), build and send the transport
 * header + chunk lists, post a receive, then decode the reply header,
 * chunk lists and RPC reply message, running AUTH validation/unwrap.
 * On RPC_AUTHERROR up to AUTH_REFRESH_COUNT refreshes restart the
 * whole sequence via the call_again label.
 *
 * Returns the clnt_stat also recorded in cku_err.  The `wait' timeout
 * argument is unused (ARGSUSED); timeouts come from the RDMA layer.
 */
/* ARGSUSED */
static enum clnt_stat
clnt_rdma_kcallit(CLIENT *h, rpcproc_t procnum, xdrproc_t xdr_args,
    caddr_t argsp, xdrproc_t xdr_results, caddr_t resultsp,
    struct timeval wait)
{
	cku_private_t *p = htop(h);

	int 	try_call_again;
	int	refresh_attempt = AUTH_REFRESH_COUNT;
	int 	status;
	int 	msglen;

	XDR	*call_xdrp, callxdr; /* for xdrrdma encoding the RPC call */
	XDR	*reply_xdrp, replyxdr; /* for xdrrdma decoding the RPC reply */
	XDR 	*rdmahdr_o_xdrs, *rdmahdr_i_xdrs;

	struct rpc_msg 	reply_msg;
	rdma_registry_t	*m;

	struct clist *cl_sendlist;
	struct clist *cl_recvlist;
	struct clist *cl;
	struct clist *cl_rpcmsg;
	struct clist *cl_rdma_reply;
	struct clist *cl_rpcreply_wlist;
	struct clist *cl_long_reply;

	uint_t vers;
	uint_t op;
	uint_t off;
	uint32_t seg_array_len;
	uint_t long_reply_len;
	uint_t rpcsec_gss;
	uint_t gss_i_or_p;

	CONN *conn = NULL;
	rdma_buf_t clmsg;
	rdma_buf_t rpcmsg;
	rdma_chunkinfo_lengths_t rcil;

	clock_t	ticks;
	bool_t wlist_exists_reply;

	uint32_t rdma_credit = rdma_bufs_rqst;

	RCSTAT_INCR(rccalls);

call_again:

	/* reset all per-attempt state; we may loop back here on refresh */
	bzero(&clmsg, sizeof (clmsg));
	bzero(&rpcmsg, sizeof (rpcmsg));
	try_call_again = 0;
	cl_sendlist = NULL;
	cl_recvlist = NULL;
	cl = NULL;
	cl_rpcmsg = NULL;
	cl_rdma_reply = NULL;
	call_xdrp = NULL;
	reply_xdrp = NULL;
	wlist_exists_reply  = FALSE;
	cl_rpcreply_wlist = NULL;
	cl_long_reply = NULL;
	rcil.rcil_len = 0;
	rcil.rcil_len_alt = 0;
	long_reply_len = 0;

	rw_enter(&rdma_lock, RW_READER);
	m = (rdma_registry_t *)p->cku_rd_handle;
	if (m->r_mod_state == RDMA_MOD_INACTIVE) {
		/*
		 * If we didn't find a matching RDMA module in the registry
		 * then there is no transport.
		 */
		rw_exit(&rdma_lock);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		/* be kind to the transport: delay before bouncing back */
		ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
		if (h->cl_nosignal == TRUE) {
			delay(ticks);
		} else {
			if (delay_sig(ticks) == EINTR) {
				p->cku_err.re_status = RPC_INTR;
				p->cku_err.re_errno = EINTR;
			}
		}
		return (RPC_CANTSEND);
	}
	/*
	 * Get unique xid
	 */
	if (p->cku_xid == 0)
		p->cku_xid = alloc_xid();

	status = RDMA_GET_CONN(p->cku_rd_mod->rdma_ops, &p->cku_addr,
	    p->cku_addrfmly, p->cku_rd_handle, &conn);
	rw_exit(&rdma_lock);

	/*
	 * If there is a problem with the connection reflect the issue
	 * back to the higher level to address, we MAY delay for a short
	 * period so that we are kind to the transport.
	 */
	if (conn == NULL) {
		/*
		 * Connect failed to server. Could be because of one
		 * of several things. In some cases we don't want
		 * the caller to retry immediately - delay before
		 * returning to caller.
		 */
		switch (status) {
		case RDMA_TIMEDOUT:
			/*
			 * Already timed out. No need to delay
			 * some more.
			 */
			p->cku_err.re_status = RPC_TIMEDOUT;
			p->cku_err.re_errno = ETIMEDOUT;
			break;
		case RDMA_INTR:
			/*
			 * Failed because of an signal. Very likely
			 * the caller will not retry.
			 */
			p->cku_err.re_status = RPC_INTR;
			p->cku_err.re_errno = EINTR;
			break;
		default:
			/*
			 * All other failures - server down or service
			 * down or temporary resource failure. Delay before
			 * returning to caller.
			 */
			ticks = clnt_rdma_min_delay * drv_usectohz(1000000);
			p->cku_err.re_status = RPC_CANTCONNECT;
			p->cku_err.re_errno = EIO;

			if (h->cl_nosignal == TRUE) {
				delay(ticks);
			} else {
				if (delay_sig(ticks) == EINTR) {
					p->cku_err.re_status = RPC_INTR;
					p->cku_err.re_errno = EINTR;
				}
			}
			break;
		}

		return (p->cku_err.re_status);
	}

	/* flow control: may block until the server grants a credit */
	clnt_check_credit(conn);

	status = CLNT_RDMA_FAIL;

	rpcsec_gss = gss_i_or_p = FALSE;

	if (IS_RPCSEC_GSS(h)) {
		rpcsec_gss = TRUE;
		if (rpc_gss_get_service_type(h->cl_auth) ==
		    rpc_gss_svc_integrity ||
		    rpc_gss_get_service_type(h->cl_auth) ==
		    rpc_gss_svc_privacy)
			gss_i_or_p = TRUE;
	}

	/*
	 * Try a regular RDMA message if RPCSEC_GSS is not being used
	 * or if RPCSEC_GSS is being used for authentication only.
	 */
	if (rpcsec_gss == FALSE ||
	    (rpcsec_gss == TRUE && gss_i_or_p == FALSE)) {
		/*
		 * Grab a send buffer for the request.  Try to
		 * encode it to see if it fits. If not, then it
		 * needs to be sent in a chunk.
		 */
		rpcmsg.type = SEND_BUFFER;
		if (rdma_buf_alloc(conn, &rpcmsg)) {
			DTRACE_PROBE(krpc__e__clntrdma__callit_nobufs);
			goto done;
		}

		/* First try to encode into regular send buffer */
		op = RDMA_MSG;

		call_xdrp = &callxdr;

		xdrrdma_create(call_xdrp, rpcmsg.addr, rpcmsg.len,
		    rdma_minchunk, NULL, XDR_ENCODE, conn);

		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
		    xdr_args, argsp);

		if (status != CLNT_RDMA_SUCCESS) {
			/* Clean up from previous encode attempt */
			rdma_buf_free(conn, &rpcmsg);
			XDR_DESTROY(call_xdrp);
		} else {
			XDR_CONTROL(call_xdrp, XDR_RDMA_GET_CHUNK_LEN, &rcil);
		}
	}

	/* If the encode didn't work, then try a NOMSG */
	if (status != CLNT_RDMA_SUCCESS) {

		msglen = CKU_HDRSIZE + BYTES_PER_XDR_UNIT + MAX_AUTH_BYTES +
		    xdr_sizeof(xdr_args, argsp);

		msglen = calc_length(msglen);

		/* pick up the lengths for the reply buffer needed */
		(void) xdrrdma_sizeof(xdr_args, argsp, 0,
		    &rcil.rcil_len, &rcil.rcil_len_alt);

		/*
		 * Construct a clist to describe the CHUNK_BUFFER
		 * for the rpcmsg.
		 */
		cl_rpcmsg = clist_alloc();
		cl_rpcmsg->c_len = msglen;
		cl_rpcmsg->rb_longbuf.type = RDMA_LONG_BUFFER;
		cl_rpcmsg->rb_longbuf.len = msglen;
		if (rdma_buf_alloc(conn, &cl_rpcmsg->rb_longbuf)) {
			clist_free(cl_rpcmsg);
			goto done;
		}
		cl_rpcmsg->w.c_saddr3 = cl_rpcmsg->rb_longbuf.addr;

		op = RDMA_NOMSG;
		call_xdrp = &callxdr;

		xdrrdma_create(call_xdrp, cl_rpcmsg->rb_longbuf.addr,
		    cl_rpcmsg->rb_longbuf.len, 0,
		    cl_rpcmsg, XDR_ENCODE, conn);

		status = clnt_compose_rpcmsg(h, procnum, &rpcmsg, call_xdrp,
		    xdr_args, argsp);

		if (status != CLNT_RDMA_SUCCESS) {
			p->cku_err.re_status = RPC_CANTENCODEARGS;
			p->cku_err.re_errno = EIO;
			DTRACE_PROBE(krpc__e__clntrdma__callit__composemsg);
			goto done;
		}
	}

	/*
	 * During the XDR_ENCODE we may have "allocated" an RDMA READ or
	 * RDMA WRITE clist.
	 *
	 * First pull the RDMA READ chunk list from the XDR private
	 * area to keep it handy.
	 */
	XDR_CONTROL(call_xdrp, XDR_RDMA_GET_RLIST, &cl);

	if (gss_i_or_p) {
		long_reply_len = rcil.rcil_len + rcil.rcil_len_alt;
		long_reply_len += MAX_AUTH_BYTES;
	} else {
		long_reply_len = rcil.rcil_len;
	}

	/*
	 * Update the chunk size information for the Long RPC msg.
	 */
	if (cl && op == RDMA_NOMSG)
		cl->c_len = p->cku_outsz;

	/*
	 * Prepare the RDMA header. On success xdrs will hold the result
	 * of xdrmem_create() for a SEND_BUFFER.
	 */
	status = clnt_compose_rdma_header(conn, h, &clmsg,
	    &rdmahdr_o_xdrs, &op);

	if (status != CLNT_RDMA_SUCCESS) {
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		RCSTAT_INCR(rcnomem);
		DTRACE_PROBE(krpc__e__clntrdma__callit__nobufs2);
		goto done;
	}

	/*
	 * Now insert the RDMA READ list iff present
	 */
	status = clnt_setup_rlist(conn, rdmahdr_o_xdrs, call_xdrp);
	if (status != CLNT_RDMA_SUCCESS) {
		DTRACE_PROBE(krpc__e__clntrdma__callit__clistreg);
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * Setup RDMA WRITE chunk list for nfs read operation
	 * other operations will have a NULL which will result
	 * as a NULL list in the XDR stream.
	 */
	status = clnt_setup_wlist(conn, rdmahdr_o_xdrs, call_xdrp);
	if (status != CLNT_RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * If NULL call and RPCSEC_GSS, provide a chunk such that
	 * large responses can flow back to the client.
	 * If RPCSEC_GSS with integrity or privacy is in use, get chunk.
	 */
	if ((procnum == 0 && rpcsec_gss == TRUE) ||
	    (rpcsec_gss == TRUE && gss_i_or_p == TRUE))
		long_reply_len += 1024;

	status = clnt_setup_long_reply(conn, &cl_long_reply, long_reply_len);

	if (status != CLNT_RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * XDR encode the RDMA_REPLY write chunk
	 */
	seg_array_len = (cl_long_reply ? 1 : 0);
	(void) xdr_encode_reply_wchunk(rdmahdr_o_xdrs, cl_long_reply,
	    seg_array_len);

	/*
	 * Construct a clist in "sendlist" that represents what we
	 * will push over the wire.
	 *
	 * Start with the RDMA header and clist (if any)
	 */
	clist_add(&cl_sendlist, 0, XDR_GETPOS(rdmahdr_o_xdrs), &clmsg.handle,
	    clmsg.addr, NULL, NULL);

	/*
	 * Put the RPC call message in  sendlist if small RPC
	 */
	if (op == RDMA_MSG) {
		clist_add(&cl_sendlist, 0, p->cku_outsz, &rpcmsg.handle,
		    rpcmsg.addr, NULL, NULL);
	} else {
		/* Long RPC already in chunk list */
		RCSTAT_INCR(rclongrpcs);
	}

	/*
	 * Set up a reply buffer ready for the reply
	 */
	status = rdma_clnt_postrecv(conn, p->cku_xid);
	if (status != RDMA_SUCCESS) {
		rdma_buf_free(conn, &clmsg);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * sync the memory for dma
	 */
	if (cl != NULL) {
		status = clist_syncmem(conn, cl, CLIST_REG_SOURCE);
		if (status != RDMA_SUCCESS) {
			(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
			rdma_buf_free(conn, &clmsg);
			p->cku_err.re_status = RPC_CANTSEND;
			p->cku_err.re_errno = EIO;
			goto done;
		}
	}

	/*
	 * Send the RDMA Header and RPC call message to the server
	 */
	status = RDMA_SEND(conn, cl_sendlist, p->cku_xid);
	if (status != RDMA_SUCCESS) {
		(void) rdma_clnt_postrecv_remove(conn, p->cku_xid);
		p->cku_err.re_status = RPC_CANTSEND;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * RDMA plugin now owns the send msg buffers.
	 * Clear them out and don't free them.
	 */
	clmsg.addr = NULL;
	if (rpcmsg.type == SEND_BUFFER)
		rpcmsg.addr = NULL;

	/*
	 * Recv rpc reply
	 */
	status = RDMA_RECV(conn, &cl_recvlist, p->cku_xid);

	/*
	 * Now check recv status
	 */
	if (status != 0) {
		if (status == RDMA_INTR) {
			p->cku_err.re_status = RPC_INTR;
			p->cku_err.re_errno = EINTR;
			RCSTAT_INCR(rcintrs);
		} else if (status == RPC_TIMEDOUT) {
			/*
			 * NOTE(review): `status' is an RDMA_RECV return
			 * (rdma_stat); comparing it against RPC_TIMEDOUT
			 * (enum clnt_stat) rather than RDMA_TIMEDOUT looks
			 * suspicious -- confirm the intended enum.
			 */
			p->cku_err.re_status = RPC_TIMEDOUT;
			p->cku_err.re_errno = ETIMEDOUT;
			RCSTAT_INCR(rctimeouts);
		} else {
			p->cku_err.re_status = RPC_CANTRECV;
			p->cku_err.re_errno = EIO;
		}
		goto done;
	}

	/*
	 * Process the reply message.
	 *
	 * First the chunk list (if any)
	 */
	rdmahdr_i_xdrs = &(p->cku_inxdr);
	xdrmem_create(rdmahdr_i_xdrs,
	    (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3,
	    cl_recvlist->c_len, XDR_DECODE);

	/*
	 * Treat xid as opaque (xid is the first entity
	 * in the rpc rdma message).
	 * Skip xid and set the xdr position accordingly.
	 */
	XDR_SETPOS(rdmahdr_i_xdrs, sizeof (uint32_t));
	(void) xdr_u_int(rdmahdr_i_xdrs, &vers);
	(void) xdr_u_int(rdmahdr_i_xdrs, &rdma_credit);
	(void) xdr_u_int(rdmahdr_i_xdrs, &op);
	(void) xdr_do_clist(rdmahdr_i_xdrs, &cl);

	/* record the credit grant the server just advertised */
	clnt_update_credit(conn, rdma_credit);

	wlist_exists_reply = FALSE;
	if (! xdr_decode_wlist(rdmahdr_i_xdrs, &cl_rpcreply_wlist,
	    &wlist_exists_reply)) {
		DTRACE_PROBE(krpc__e__clntrdma__callit__wlist_decode);
		p->cku_err.re_status = RPC_CANTDECODERES;
		p->cku_err.re_errno = EIO;
		goto done;
	}

	/*
	 * The server shouldn't have sent a RDMA_SEND that
	 * the client needs to RDMA_WRITE a reply back to
	 * the server.  So silently ignoring what the
	 * server returns in the rdma_reply section of the
	 * header.
	 */
	(void) xdr_decode_reply_wchunk(rdmahdr_i_xdrs, &cl_rdma_reply);
	off = xdr_getpos(rdmahdr_i_xdrs);

	clnt_decode_long_reply(conn, cl_long_reply,
	    cl_rdma_reply, &replyxdr, &reply_xdrp,
	    cl, cl_recvlist, op, off);

	if (reply_xdrp == NULL)
		goto done;

	if (wlist_exists_reply) {
		XDR_CONTROL(reply_xdrp, XDR_RDMA_SET_WLIST, cl_rpcreply_wlist);
	}

	reply_msg.rm_direction = REPLY;
	reply_msg.rm_reply.rp_stat = MSG_ACCEPTED;
	reply_msg.acpted_rply.ar_stat = SUCCESS;
	reply_msg.acpted_rply.ar_verf = _null_auth;

	/*
	 *  xdr_results will be done in AUTH_UNWRAP.
	 */
	reply_msg.acpted_rply.ar_results.where = NULL;
	reply_msg.acpted_rply.ar_results.proc = xdr_void;

	/*
	 * Decode and validate the response.
	 */
	if (xdr_replymsg(reply_xdrp, &reply_msg)) {
		enum clnt_stat re_status;

		_seterr_reply(&reply_msg, &(p->cku_err));

		re_status = p->cku_err.re_status;
		if (re_status == RPC_SUCCESS) {
			/*
			 * Reply is good, check auth.
			 */
			if (!AUTH_VALIDATE(h->cl_auth,
			    &reply_msg.acpted_rply.ar_verf)) {
				p->cku_err.re_status = RPC_AUTHERROR;
				p->cku_err.re_why = AUTH_INVALIDRESP;
				RCSTAT_INCR(rcbadverfs);
				DTRACE_PROBE(
				    krpc__e__clntrdma__callit__authvalidate);
			} else if (!AUTH_UNWRAP(h->cl_auth, reply_xdrp,
			    xdr_results, resultsp)) {
				p->cku_err.re_status = RPC_CANTDECODERES;
				p->cku_err.re_errno = EIO;
				DTRACE_PROBE(
				    krpc__e__clntrdma__callit__authunwrap);
			}
		} else {
			/* set errno in case we can't recover */
			if (re_status != RPC_VERSMISMATCH &&
			    re_status != RPC_AUTHERROR &&
			    re_status != RPC_PROGVERSMISMATCH)
				p->cku_err.re_errno = EIO;

			if (re_status == RPC_AUTHERROR) {
				if ((refresh_attempt > 0) &&
				    AUTH_REFRESH(h->cl_auth, &reply_msg,
				    p->cku_cred)) {
					refresh_attempt--;
					try_call_again = 1;
					goto done;
				}

				try_call_again = 0;

				/*
				 * We have used the client handle to
				 * do an AUTH_REFRESH and the RPC status may
				 * be set to RPC_SUCCESS; Let's make sure to
				 * set it to RPC_AUTHERROR.
				 */
				p->cku_err.re_status = RPC_AUTHERROR;

				/*
				 * Map recoverable and unrecoverable
				 * authentication errors to appropriate
				 * errno
				 */
				switch (p->cku_err.re_why) {
				case AUTH_BADCRED:
				case AUTH_BADVERF:
				case AUTH_INVALIDRESP:
				case AUTH_TOOWEAK:
				case AUTH_FAILED:
				case RPCSEC_GSS_NOCRED:
				case RPCSEC_GSS_FAILED:
					p->cku_err.re_errno = EACCES;
					break;
				case AUTH_REJECTEDCRED:
				case AUTH_REJECTEDVERF:
				default:
					p->cku_err.re_errno = EIO;
					break;
				}
			}
			DTRACE_PROBE1(krpc__e__clntrdma__callit__rpcfailed,
			    int, p->cku_err.re_why);
		}
	} else {
		p->cku_err.re_status = RPC_CANTDECODERES;
		p->cku_err.re_errno = EIO;
		DTRACE_PROBE(krpc__e__clntrdma__callit__replymsg);
	}

done:
	/* common cleanup for every exit path, including auth-refresh retry */
	clnt_return_credit(conn);

	if (cl_sendlist != NULL)
		clist_free(cl_sendlist);

	/*
	 * If rpc reply is in a chunk, free it now.
	 */
	if (cl_long_reply) {
		(void) clist_deregister(conn, cl_long_reply, CLIST_REG_DST);
		rdma_buf_free(conn, &cl_long_reply->rb_longbuf);
		clist_free(cl_long_reply);
	}

	if (call_xdrp)
		XDR_DESTROY(call_xdrp);

	if (reply_xdrp) {
		(void) xdr_rpc_free_verifier(reply_xdrp, &reply_msg);
		XDR_DESTROY(reply_xdrp);
	}

	if (cl_rdma_reply) {
		clist_free(cl_rdma_reply);
	}

	if (cl_recvlist) {
		rdma_buf_t	recvmsg = {0};
		recvmsg.addr = (caddr_t)(uintptr_t)cl_recvlist->w.c_saddr3;
		recvmsg.type = RECV_BUFFER;
		RDMA_BUF_FREE(conn, &recvmsg);
		clist_free(cl_recvlist);
	}

	RDMA_REL_CONN(conn);

	if (try_call_again)
		goto call_again;

	if (p->cku_err.re_status != RPC_SUCCESS) {
		RCSTAT_INCR(rcbadcalls);
	}
	return (p->cku_err.re_status);
}
1130 
1131 
1132 static void
1133 clnt_decode_long_reply(CONN *conn,
1134     struct clist *cl_long_reply,
1135     struct clist *cl_rdma_reply, XDR *xdrs,
1136     XDR **rxdrp, struct clist *cl,
1137     struct clist *cl_recvlist,
1138     uint_t  op, uint_t off)
1139 {
1140 	if (op != RDMA_NOMSG) {
1141 		DTRACE_PROBE1(krpc__i__longrepl__rdmamsg__len,
1142 		    int, cl_recvlist->c_len - off);
1143 		xdrrdma_create(xdrs,
1144 		    (caddr_t)(uintptr_t)(cl_recvlist->w.c_saddr3 + off),
1145 		    cl_recvlist->c_len - off, 0, cl, XDR_DECODE, conn);
1146 		*rxdrp = xdrs;
1147 		return;
1148 	}
1149 
1150 	/* op must be RDMA_NOMSG */
1151 	if (cl) {
1152 		DTRACE_PROBE(krpc__e__clntrdma__declongreply__serverreadlist);
1153 		return;
1154 	}
1155 
1156 	if (cl_long_reply->u.c_daddr) {
1157 		DTRACE_PROBE1(krpc__i__longrepl__rdmanomsg__len,
1158 		    int, cl_rdma_reply->c_len);
1159 
1160 		xdrrdma_create(xdrs, (caddr_t)cl_long_reply->u.c_daddr3,
1161 		    cl_rdma_reply->c_len, 0, NULL, XDR_DECODE, conn);
1162 
1163 		*rxdrp = xdrs;
1164 	}
1165 }
1166 
1167 static void
1168 clnt_return_credit(CONN *conn)
1169 {
1170 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1171 
1172 	mutex_enter(&conn->c_lock);
1173 	cc_info->clnt_cc_in_flight_ops--;
1174 	cv_signal(&cc_info->clnt_cc_cv);
1175 	mutex_exit(&conn->c_lock);
1176 }
1177 
1178 static void
1179 clnt_update_credit(CONN *conn, uint32_t rdma_credit)
1180 {
1181 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1182 
1183 	/*
1184 	 * If the granted has not altered, avoid taking the
1185 	 * mutex, to essentially do nothing..
1186 	 */
1187 	if (cc_info->clnt_cc_granted_ops == rdma_credit)
1188 		return;
1189 	/*
1190 	 * Get the granted number of buffers for credit control.
1191 	 */
1192 	mutex_enter(&conn->c_lock);
1193 	cc_info->clnt_cc_granted_ops = rdma_credit;
1194 	mutex_exit(&conn->c_lock);
1195 }
1196 
1197 static void
1198 clnt_check_credit(CONN *conn)
1199 {
1200 	rdma_clnt_cred_ctrl_t *cc_info = &conn->rdma_conn_cred_ctrl_u.c_clnt_cc;
1201 
1202 	/*
1203 	 * Make sure we are not going over our allowed buffer use
1204 	 * (and make sure we have gotten a granted value before).
1205 	 */
1206 	mutex_enter(&conn->c_lock);
1207 	while (cc_info->clnt_cc_in_flight_ops >= cc_info->clnt_cc_granted_ops &&
1208 	    cc_info->clnt_cc_granted_ops != 0) {
1209 		/*
1210 		 * Client has maxed out its granted buffers due to
1211 		 * credit control.  Current handling is to block and wait.
1212 		 */
1213 		cv_wait(&cc_info->clnt_cc_cv, &conn->c_lock);
1214 	}
1215 	cc_info->clnt_cc_in_flight_ops++;
1216 	mutex_exit(&conn->c_lock);
1217 }
1218 
/* ARGSUSED */
/*
 * Abort an outstanding call on this client handle.  The RDMA client
 * transport has no mechanism to cancel an in-flight request, so this
 * CLNT_ABORT operation is deliberately a no-op.
 */
static void
clnt_rdma_kabort(CLIENT *h)
{
}
1224 
1225 static void
1226 clnt_rdma_kerror(CLIENT *h, struct rpc_err *err)
1227 {
1228 	struct cku_private *p = htop(h);
1229 	*err = p->cku_err;
1230 }
1231 
1232 static bool_t
1233 clnt_rdma_kfreeres(CLIENT *h, xdrproc_t xdr_res, caddr_t res_ptr)
1234 {
1235 	struct cku_private *p = htop(h);
1236 	XDR *xdrs;
1237 
1238 	xdrs = &(p->cku_outxdr);
1239 	xdrs->x_op = XDR_FREE;
1240 	return ((*xdr_res)(xdrs, res_ptr));
1241 }
1242 
/* ARGSUSED */
/*
 * CLNT_CONTROL: handle a CLSET/CLGET control request on the client
 * handle.  No control operations are implemented for the RDMA
 * transport; every request is accepted and ignored.
 */
static bool_t
clnt_rdma_kcontrol(CLIENT *h, int cmd, char *arg)
{
	return (TRUE);
}
1249 
/* ARGSUSED */
/*
 * CLNT_SETTIMERS: retransmit-timer management is not applicable to the
 * RDMA transport; just bump the client RPC timer kstat and report
 * success.
 */
static int
clnt_rdma_ksettimers(CLIENT *h, struct rpc_timers *t, struct rpc_timers *all,
	int minimum, void(*feedback)(int, int, caddr_t), caddr_t arg,
	uint32_t xid)
{
	RCSTAT_INCR(rctimers);
	return (0);
}
1259 
1260 int
1261 rdma_reachable(int addr_type, struct netbuf *addr, struct knetconfig **knconf)
1262 {
1263 	rdma_registry_t	*rp;
1264 	void *handle = NULL;
1265 	struct knetconfig *knc;
1266 	char *pf, *p;
1267 	rdma_stat status;
1268 	int error = 0;
1269 
1270 	if (!INGLOBALZONE(curproc))
1271 		return (-1);
1272 
1273 	/*
1274 	 * modload the RDMA plugins if not already done.
1275 	 */
1276 	if (!rdma_modloaded) {
1277 		mutex_enter(&rdma_modload_lock);
1278 		if (!rdma_modloaded) {
1279 			error = rdma_modload();
1280 		}
1281 		mutex_exit(&rdma_modload_lock);
1282 		if (error)
1283 			return (-1);
1284 	}
1285 
1286 	if (!rdma_dev_available)
1287 		return (-1);
1288 
1289 	rw_enter(&rdma_lock, RW_READER);
1290 	rp = rdma_mod_head;
1291 	while (rp != NULL) {
1292 		if (rp->r_mod_state == RDMA_MOD_INACTIVE) {
1293 			rp = rp->r_next;
1294 			continue;
1295 		}
1296 		status = RDMA_REACHABLE(rp->r_mod->rdma_ops, addr_type, addr,
1297 		    &handle);
1298 		if (status == RDMA_SUCCESS) {
1299 			knc = kmem_zalloc(sizeof (struct knetconfig),
1300 			    KM_SLEEP);
1301 			knc->knc_semantics = NC_TPI_RDMA;
1302 			pf = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1303 			p = kmem_alloc(KNC_STRSIZE, KM_SLEEP);
1304 			if (addr_type == AF_INET)
1305 				(void) strncpy(pf, NC_INET, KNC_STRSIZE);
1306 			else if (addr_type == AF_INET6)
1307 				(void) strncpy(pf, NC_INET6, KNC_STRSIZE);
1308 			pf[KNC_STRSIZE - 1] = '\0';
1309 
1310 			(void) strncpy(p, rp->r_mod->rdma_api, KNC_STRSIZE);
1311 			p[KNC_STRSIZE - 1] = '\0';
1312 
1313 			knc->knc_protofmly = pf;
1314 			knc->knc_proto = p;
1315 			knc->knc_rdev = (dev_t)rp;
1316 			*knconf = knc;
1317 			rw_exit(&rdma_lock);
1318 			return (0);
1319 		}
1320 		rp = rp->r_next;
1321 	}
1322 	rw_exit(&rdma_lock);
1323 	return (-1);
1324 }
1325 
1326 static void
1327 check_dereg_wlist(CONN *conn, clist *rwc)
1328 {
1329 	int status;
1330 
1331 	if (rwc == NULL)
1332 		return;
1333 
1334 	if (rwc->c_dmemhandle.mrc_rmr && rwc->c_len) {
1335 
1336 		status = clist_deregister(conn, rwc, CLIST_REG_DST);
1337 
1338 		if (status != RDMA_SUCCESS) {
1339 			DTRACE_PROBE1(krpc__e__clntrdma__dereg_wlist,
1340 			    int, status);
1341 		}
1342 	}
1343 }
1344