xref: /illumos-gate/usr/src/uts/common/rpc/svc_rdma.c (revision 4703203d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License, Version 1.0 only
6  * (the "License").  You may not use this file except in compliance
7  * with the License.
8  *
9  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10  * or http://www.opensolaris.org/os/licensing.
11  * See the License for the specific language governing permissions
12  * and limitations under the License.
13  *
14  * When distributing Covered Code, include this CDDL HEADER in each
15  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16  * If applicable, add the following below this CDDL HEADER, with the
17  * fields enclosed by brackets "[]" replaced with your own identifying
18  * information: Portions Copyright [yyyy] [name of copyright owner]
19  *
20  * CDDL HEADER END
21  */
22 /*
23  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24  * Use is subject to license terms.
25  */
26 /* Copyright (c) 1983, 1984, 1985, 1986, 1987, 1988, 1989 AT&T */
27 /* All Rights Reserved */
28 /*
29  * Portions of this source code were derived from Berkeley
30  * 4.3 BSD under license from the Regents of the University of
31  * California.
32  */
33 
34 #pragma ident	"%Z%%M%	%I%	%E% SMI"
35 
36 /*
37  * Server side of RPC over RDMA in the kernel.
38  */
39 
40 #include <sys/param.h>
41 #include <sys/types.h>
42 #include <sys/user.h>
43 #include <sys/sysmacros.h>
44 #include <sys/proc.h>
45 #include <sys/file.h>
46 #include <sys/errno.h>
47 #include <sys/kmem.h>
48 #include <sys/debug.h>
49 #include <sys/systm.h>
50 #include <sys/cmn_err.h>
51 #include <sys/kstat.h>
52 #include <sys/vtrace.h>
53 #include <sys/debug.h>
54 
55 #include <rpc/types.h>
56 #include <rpc/xdr.h>
57 #include <rpc/auth.h>
58 #include <rpc/clnt.h>
59 #include <rpc/rpc_msg.h>
60 #include <rpc/svc.h>
61 #include <rpc/rpc_rdma.h>
62 #include <sys/ddi.h>
63 #include <sys/sunddi.h>
64 
65 #include <inet/common.h>
66 #include <inet/ip.h>
67 #include <inet/ip6.h>
68 
69 /*
70  * RDMA transport specific data associated with SVCMASTERXPRT
71  */
72 struct rdma_data {
73 	SVCMASTERXPRT 	*rd_xprt;	/* back ptr to SVCMASTERXPRT */
74 	struct rdma_svc_data rd_data;	/* rdma data */
75 	rdma_mod_t	*r_mod;		/* RDMA module containing ops ptr */
76 };
77 
78 /*
79  * Plugin connection specific data stashed away in clone SVCXPRT
80  */
81 struct clone_rdma_data {
82 	CONN		*conn;		/* RDMA connection */
83 	rdma_buf_t	rpcbuf;		/* RPC req/resp buffer */
84 };
85 
86 #ifdef DEBUG
87 int rdma_svc_debug = 0;
88 #endif
89 
90 #define	MAXADDRLEN	128	/* max length for address mask */
91 
92 /*
93  * Routines exported through ops vector.
94  */
95 static bool_t		svc_rdma_krecv(SVCXPRT *, mblk_t *, struct rpc_msg *);
96 static bool_t		svc_rdma_ksend(SVCXPRT *, struct rpc_msg *);
97 static bool_t		svc_rdma_kgetargs(SVCXPRT *, xdrproc_t, caddr_t);
98 static bool_t		svc_rdma_kfreeargs(SVCXPRT *, xdrproc_t, caddr_t);
99 void			svc_rdma_kdestroy(SVCMASTERXPRT *);
100 static int		svc_rdma_kdup(struct svc_req *, caddr_t, int,
101 				struct dupreq **, bool_t *);
102 static void		svc_rdma_kdupdone(struct dupreq *, caddr_t,
103 				void (*)(), int, int);
104 static int32_t		*svc_rdma_kgetres(SVCXPRT *, int);
105 static void		svc_rdma_kfreeres(SVCXPRT *);
106 static void		svc_rdma_kclone_destroy(SVCXPRT *);
107 static void		svc_rdma_kstart(SVCMASTERXPRT *);
108 void			svc_rdma_kstop(SVCMASTERXPRT *);
109 
110 /*
111  * Server transport operations vector.
112  */
113 struct svc_ops rdma_svc_ops = {
114 	svc_rdma_krecv,		/* Get requests */
115 	svc_rdma_kgetargs,	/* Deserialize arguments */
116 	svc_rdma_ksend,		/* Send reply */
117 	svc_rdma_kfreeargs,	/* Free argument data space */
118 	svc_rdma_kdestroy,	/* Destroy transport handle */
119 	svc_rdma_kdup,		/* Check entry in dup req cache */
120 	svc_rdma_kdupdone,	/* Mark entry in dup req cache as done */
121 	svc_rdma_kgetres,	/* Get pointer to response buffer */
122 	svc_rdma_kfreeres,	/* Destroy pre-serialized response header */
123 	svc_rdma_kclone_destroy,	/* Destroy a clone xprt */
124 	svc_rdma_kstart		/* Tell `ready-to-receive' to rpcmod */
125 };
126 
127 /*
128  * Server statistics
129  * NOTE: This structure type is duplicated in the NFS fast path.
130  */
131 struct {
132 	kstat_named_t	rscalls;
133 	kstat_named_t	rsbadcalls;
134 	kstat_named_t	rsnullrecv;
135 	kstat_named_t	rsbadlen;
136 	kstat_named_t	rsxdrcall;
137 	kstat_named_t	rsdupchecks;
138 	kstat_named_t	rsdupreqs;
139 	kstat_named_t	rslongrpcs;
140 } rdmarsstat = {
141 	{ "calls",	KSTAT_DATA_UINT64 },
142 	{ "badcalls",	KSTAT_DATA_UINT64 },
143 	{ "nullrecv",	KSTAT_DATA_UINT64 },
144 	{ "badlen",	KSTAT_DATA_UINT64 },
145 	{ "xdrcall",	KSTAT_DATA_UINT64 },
146 	{ "dupchecks",	KSTAT_DATA_UINT64 },
147 	{ "dupreqs",	KSTAT_DATA_UINT64 },
148 	{ "longrpcs",	KSTAT_DATA_UINT64 }
149 };
150 
151 kstat_named_t *rdmarsstat_ptr = (kstat_named_t *)&rdmarsstat;
152 uint_t rdmarsstat_ndata = sizeof (rdmarsstat) / sizeof (kstat_named_t);
153 
154 #define	RSSTAT_INCR(x)	rdmarsstat.x.value.ui64++
155 
156 /*
157  * Create a transport record.
158  * The transport record, output buffer, and private data structure
159  * are allocated.  The output buffer is serialized into using xdrmem.
160  * There is one transport record per user process which implements a
161  * set of services.
162  */
163 /* ARGSUSED */
164 int
165 svc_rdma_kcreate(char *netid, SVC_CALLOUT_TABLE *sct, int id,
166 	rdma_xprt_group_t *started_xprts)
167 {
168 	int error;
169 	SVCMASTERXPRT *xprt;
170 	struct rdma_data *rd;
171 	rdma_registry_t *rmod;
172 	rdma_xprt_record_t *xprt_rec;
173 	queue_t	*q;
174 
175 	/*
176 	 * modload the RDMA plugins is not already done.
177 	 */
178 	if (!rdma_modloaded) {
179 		mutex_enter(&rdma_modload_lock);
180 		if (!rdma_modloaded) {
181 			error = rdma_modload();
182 		}
183 		mutex_exit(&rdma_modload_lock);
184 
185 		if (error)
186 			return (error);
187 	}
188 
189 	/*
190 	 * master_xprt_count is the count of master transport handles
191 	 * that were successfully created and are ready to recieve for
192 	 * RDMA based access.
193 	 */
194 	error = 0;
195 	xprt_rec = NULL;
196 	rw_enter(&rdma_lock, RW_READER);
197 	if (rdma_mod_head == NULL) {
198 		started_xprts->rtg_count = 0;
199 		rw_exit(&rdma_lock);
200 		if (rdma_dev_available)
201 			return (EPROTONOSUPPORT);
202 		else
203 			return (ENODEV);
204 	}
205 
206 	/*
207 	 * If we have reached here, then atleast one RDMA plugin has loaded.
208 	 * Create a master_xprt, make it start listenining on the device,
209 	 * if an error is generated, record it, we might need to shut
210 	 * the master_xprt.
211 	 * SVC_START() calls svc_rdma_kstart which calls plugin binding
212 	 * routines.
213 	 */
214 	for (rmod = rdma_mod_head; rmod != NULL; rmod = rmod->r_next) {
215 
216 		/*
217 		 * One SVCMASTERXPRT per RDMA plugin.
218 		 */
219 		xprt = kmem_zalloc(sizeof (*xprt), KM_SLEEP);
220 		xprt->xp_ops = &rdma_svc_ops;
221 		xprt->xp_sct = sct;
222 		xprt->xp_type = T_RDMA;
223 		mutex_init(&xprt->xp_req_lock, NULL, MUTEX_DEFAULT, NULL);
224 		mutex_init(&xprt->xp_thread_lock, NULL, MUTEX_DEFAULT, NULL);
225 		xprt->xp_req_head = (mblk_t *)0;
226 		xprt->xp_req_tail = (mblk_t *)0;
227 		xprt->xp_threads = 0;
228 		xprt->xp_detached_threads = 0;
229 
230 		rd = kmem_zalloc(sizeof (*rd), KM_SLEEP);
231 		xprt->xp_p2 = (caddr_t)rd;
232 		rd->rd_xprt = xprt;
233 		rd->r_mod = rmod->r_mod;
234 
235 		q = &rd->rd_data.q;
236 		xprt->xp_wq = q;
237 		q->q_ptr = &rd->rd_xprt;
238 		xprt->xp_netid = NULL;
239 
240 		if (netid != NULL) {
241 			xprt->xp_netid = kmem_alloc(strlen(netid) + 1,
242 						KM_SLEEP);
243 			(void) strcpy(xprt->xp_netid, netid);
244 		}
245 
246 		xprt->xp_addrmask.maxlen =
247 		    xprt->xp_addrmask.len = sizeof (struct sockaddr_in);
248 		xprt->xp_addrmask.buf =
249 		    kmem_zalloc(xprt->xp_addrmask.len, KM_SLEEP);
250 		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_addr.s_addr =
251 		    (uint32_t)~0;
252 		((struct sockaddr_in *)xprt->xp_addrmask.buf)->sin_family =
253 		    (ushort_t)~0;
254 
255 		/*
256 		 * Each of the plugins will have their own Service ID
257 		 * to listener specific mapping, like port number for VI
258 		 * and service name for IB.
259 		 */
260 		rd->rd_data.svcid = id;
261 		error = svc_xprt_register(xprt, id);
262 		if (error) {
263 			cmn_err(CE_WARN, "svc_rdma_kcreate: svc_xprt_register"
264 				"failed");
265 			goto cleanup;
266 		}
267 
268 		SVC_START(xprt);
269 		if (!rd->rd_data.active) {
270 			svc_xprt_unregister(xprt);
271 			error = rd->rd_data.err_code;
272 			goto cleanup;
273 		}
274 
275 		/*
276 		 * This is set only when there is atleast one or more
277 		 * transports successfully created. We insert the pointer
278 		 * to the created RDMA master xprt into a separately maintained
279 		 * list. This way we can easily reference it later to cleanup,
280 		 * when NFS kRPC service pool is going away/unregistered.
281 		 */
282 		started_xprts->rtg_count ++;
283 		xprt_rec = kmem_alloc(sizeof (*xprt_rec), KM_SLEEP);
284 		xprt_rec->rtr_xprt_ptr = xprt;
285 		xprt_rec->rtr_next = started_xprts->rtg_listhead;
286 		started_xprts->rtg_listhead = xprt_rec;
287 		continue;
288 cleanup:
289 		SVC_DESTROY(xprt);
290 		if (error == RDMA_FAILED)
291 			error = EPROTONOSUPPORT;
292 	}
293 
294 	rw_exit(&rdma_lock);
295 
296 	/*
297 	 * Don't return any error even if a single plugin was started
298 	 * successfully.
299 	 */
300 	if (started_xprts->rtg_count == 0)
301 		return (error);
302 	return (0);
303 }
304 
305 /*
306  * Cleanup routine for freeing up memory allocated by
307  * svc_rdma_kcreate()
308  */
309 void
310 svc_rdma_kdestroy(SVCMASTERXPRT *xprt)
311 {
312 	struct rdma_data *rd = (struct rdma_data *)xprt->xp_p2;
313 
314 
315 	mutex_destroy(&xprt->xp_req_lock);
316 	mutex_destroy(&xprt->xp_thread_lock);
317 	kmem_free(xprt->xp_netid, strlen(xprt->xp_netid) + 1);
318 	kmem_free(rd, sizeof (*rd));
319 	kmem_free(xprt->xp_addrmask.buf, xprt->xp_addrmask.maxlen);
320 	kmem_free(xprt, sizeof (*xprt));
321 }
322 
323 
324 static void
325 svc_rdma_kstart(SVCMASTERXPRT *xprt)
326 {
327 	struct rdma_svc_data *svcdata;
328 	rdma_mod_t *rmod;
329 
330 	svcdata = &((struct rdma_data *)xprt->xp_p2)->rd_data;
331 	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
332 
333 	/*
334 	 * Create a listener for  module at this port
335 	 */
336 
337 	(*rmod->rdma_ops->rdma_svc_listen)(svcdata);
338 }
339 
340 void
341 svc_rdma_kstop(SVCMASTERXPRT *xprt)
342 {
343 	struct rdma_svc_data *svcdata;
344 	rdma_mod_t *rmod;
345 
346 	svcdata	= &((struct rdma_data *)xprt->xp_p2)->rd_data;
347 	rmod = ((struct rdma_data *)xprt->xp_p2)->r_mod;
348 
349 	/*
350 	 * Call the stop listener routine for each plugin.
351 	 */
352 	(*rmod->rdma_ops->rdma_svc_stop)(svcdata);
353 	if (svcdata->active)
354 		cmn_err(CE_WARN, "rdma_stop: Failed to shutdown RDMA based kRPC"
355 			"  listener");
356 }
357 
358 /* ARGSUSED */
359 static void
360 svc_rdma_kclone_destroy(SVCXPRT *clone_xprt)
361 {
362 }
363 
364 static bool_t
365 svc_rdma_krecv(SVCXPRT *clone_xprt, mblk_t *mp, struct rpc_msg *msg)
366 {
367 	XDR *xdrs;
368 	rdma_stat status;
369 	struct recv_data *rdp = (struct recv_data *)mp->b_rptr;
370 	CONN *conn;
371 	struct clone_rdma_data *vd;
372 	struct clist *cl;
373 	uint_t vers, op, pos;
374 	uint32_t xid;
375 
376 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
377 	RSSTAT_INCR(rscalls);
378 	conn = rdp->conn;
379 
380 	/*
381 	 * Post a receive descriptor on this
382 	 * endpoint to ensure all packets are received.
383 	 */
384 	status = rdma_svc_postrecv(conn);
385 	if (status != RDMA_SUCCESS) {
386 		cmn_err(CE_NOTE,
387 		    "svc_rdma_krecv: rdma_svc_postrecv failed %d", status);
388 	}
389 
390 	if (rdp->status != 0) {
391 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
392 		RDMA_REL_CONN(conn);
393 		RSSTAT_INCR(rsbadcalls);
394 		freeb(mp);
395 		return (FALSE);
396 	}
397 
398 	/*
399 	 * Decode rpc message
400 	 */
401 	xdrs = &clone_xprt->xp_xdrin;
402 	xdrmem_create(xdrs, rdp->rpcmsg.addr, rdp->rpcmsg.len, XDR_DECODE);
403 
404 	/*
405 	 * Get the XID
406 	 */
407 	/*
408 	 * Treat xid as opaque (xid is the first entity
409 	 * in the rpc rdma message).
410 	 */
411 	xid = *(uint32_t *)rdp->rpcmsg.addr;
412 	/* Skip xid and set the xdr position accordingly. */
413 	XDR_SETPOS(xdrs, sizeof (uint32_t));
414 	if (! xdr_u_int(xdrs, &vers) ||
415 	    ! xdr_u_int(xdrs, &op)) {
416 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_u_int failed");
417 		XDR_DESTROY(xdrs);
418 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
419 		RDMA_REL_CONN(conn);
420 		freeb(mp);
421 		RSSTAT_INCR(rsbadcalls);
422 		return (FALSE);
423 	}
424 	if (op == RDMA_DONE) {
425 		/*
426 		 * Should not get RDMA_DONE
427 		 */
428 		freeb(mp);
429 		XDR_DESTROY(xdrs);
430 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
431 		RDMA_REL_CONN(conn);
432 		RSSTAT_INCR(rsbadcalls);
433 		return (FALSE); /* no response */
434 	}
435 
436 #ifdef DEBUG
437 	if (rdma_svc_debug)
438 		printf("svc_rdma_krecv: recv'd call xid %u\n", xid);
439 #endif
440 	/*
441 	 * Now decode the chunk list
442 	 */
443 	cl = NULL;
444 	if (! xdr_do_clist(xdrs, &cl)) {
445 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_do_clist failed");
446 	}
447 
448 	/*
449 	 * A chunk at 0 offset indicates that the RPC call message
450 	 * is in a chunk. Get the RPC call message chunk.
451 	 */
452 	if (cl != NULL && op == RDMA_NOMSG) {
453 		struct clist *cllong;	/* Long RPC chunk */
454 
455 		/* Remove RPC call message chunk from chunklist */
456 		cllong = cl;
457 		cl = cl->c_next;
458 		cllong->c_next = NULL;
459 
460 		/* Allocate and register memory for the RPC call msg chunk */
461 		cllong->c_daddr = (uint64)(uintptr_t)
462 		    kmem_alloc(cllong->c_len, KM_SLEEP);
463 		if (cllong->c_daddr == NULL) {
464 			cmn_err(CE_WARN,
465 				"svc_rdma_krecv: no memory for rpc call");
466 			XDR_DESTROY(xdrs);
467 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
468 			RDMA_REL_CONN(conn);
469 			freeb(mp);
470 			RSSTAT_INCR(rsbadcalls);
471 			clist_free(cl);
472 			clist_free(cllong);
473 			return (FALSE);
474 		}
475 		status = clist_register(conn, cllong, 0);
476 		if (status) {
477 			cmn_err(CE_WARN,
478 				"svc_rdma_krecv: clist_register failed");
479 			kmem_free((void *)(uintptr_t)cllong->c_daddr,
480 			    cllong->c_len);
481 			XDR_DESTROY(xdrs);
482 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
483 			RDMA_REL_CONN(conn);
484 			freeb(mp);
485 			RSSTAT_INCR(rsbadcalls);
486 			clist_free(cl);
487 			clist_free(cllong);
488 			return (FALSE);
489 		}
490 
491 		/*
492 		 * Now read the RPC call message in
493 		 */
494 		status = RDMA_READ(conn, cllong, WAIT);
495 		if (status) {
496 			cmn_err(CE_WARN,
497 			    "svc_rdma_krecv: rdma_read failed %d", status);
498 			(void) clist_deregister(conn, cllong, 0);
499 			kmem_free((void *)(uintptr_t)cllong->c_daddr,
500 			    cllong->c_len);
501 			XDR_DESTROY(xdrs);
502 			RDMA_BUF_FREE(conn, &rdp->rpcmsg);
503 			RDMA_REL_CONN(conn);
504 			freeb(mp);
505 			RSSTAT_INCR(rsbadcalls);
506 			clist_free(cl);
507 			clist_free(cllong);
508 			return (FALSE);
509 		}
510 		/*
511 		 * Sync memory for CPU after DMA
512 		 */
513 		status = clist_syncmem(conn, cllong, 0);
514 
515 		/*
516 		 * Deregister the chunk
517 		 */
518 		(void) clist_deregister(conn, cllong, 0);
519 
520 		/*
521 		 * Setup the XDR for the RPC call message
522 		 */
523 		xdrrdma_create(xdrs, (caddr_t)(uintptr_t)cllong->c_daddr,
524 		    cllong->c_len, 0, cl, XDR_DECODE, conn);
525 		vd->rpcbuf.type = CHUNK_BUFFER;
526 		vd->rpcbuf.addr = (caddr_t)(uintptr_t)cllong->c_daddr;
527 		vd->rpcbuf.len = cllong->c_len;
528 		vd->rpcbuf.handle.mrc_rmr = 0;
529 
530 		/*
531 		 * Free the chunk element with the Long RPC details and
532 		 * the message received.
533 		 */
534 		clist_free(cllong);
535 		RDMA_BUF_FREE(conn, &rdp->rpcmsg);
536 	} else {
537 		pos = XDR_GETPOS(xdrs);
538 
539 		/*
540 		 * Now the RPC call message header
541 		 */
542 		xdrrdma_create(xdrs, rdp->rpcmsg.addr + pos,
543 			rdp->rpcmsg.len - pos, 0, cl, XDR_DECODE, conn);
544 		vd->rpcbuf = rdp->rpcmsg;
545 	}
546 	if (! xdr_callmsg(xdrs, msg)) {
547 		cmn_err(CE_WARN, "svc_rdma_krecv: xdr_callmsg failed");
548 		if (cl != NULL)
549 			clist_free(cl);
550 		XDR_DESTROY(xdrs);
551 		rdma_buf_free(conn, &vd->rpcbuf);
552 		RDMA_REL_CONN(conn);
553 		freeb(mp);
554 		RSSTAT_INCR(rsxdrcall);
555 		RSSTAT_INCR(rsbadcalls);
556 		return (FALSE);
557 	}
558 
559 	/*
560 	 * Point the remote transport address in the service_transport
561 	 * handle at the address in the request.
562 	 */
563 	clone_xprt->xp_rtaddr.buf = conn->c_raddr.buf;
564 	clone_xprt->xp_rtaddr.len = conn->c_raddr.len;
565 	clone_xprt->xp_rtaddr.maxlen = conn->c_raddr.len;
566 
567 #ifdef DEBUG
568 	if (rdma_svc_debug) {
569 		struct sockaddr_in *sin4;
570 		char print_addr[INET_ADDRSTRLEN];
571 
572 		sin4 = (struct sockaddr_in *)clone_xprt->xp_rtaddr.buf;
573 		bzero(print_addr, INET_ADDRSTRLEN);
574 		(void) inet_ntop(AF_INET,
575 		    &sin4->sin_addr, print_addr, INET_ADDRSTRLEN);
576 		cmn_err(CE_NOTE,
577 		    "svc_rdma_krecv: remote clnt_addr: %s", print_addr);
578 	}
579 #endif
580 
581 	clone_xprt->xp_xid = xid;
582 	vd->conn = conn;
583 	freeb(mp);
584 	return (TRUE);
585 }
586 
587 /*
588  * Send rpc reply.
589  */
590 static bool_t
591 svc_rdma_ksend(SVCXPRT *clone_xprt, struct rpc_msg *msg)
592 {
593 	struct clone_rdma_data *vd;
594 	XDR *xdrs = &(clone_xprt->xp_xdrout), rxdrs;
595 	int retval = FALSE;
596 	xdrproc_t xdr_results;
597 	caddr_t xdr_location;
598 	bool_t has_args, reg = FALSE;
599 	uint_t len, op;
600 	uint_t vers;
601 	struct clist *cl = NULL, *cle = NULL;
602 	struct clist *sendlist = NULL;
603 	int status;
604 	int msglen;
605 	rdma_buf_t clmsg, longreply, rpcreply;
606 
607 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
608 
609 	/*
610 	 * If there is a result procedure specified in the reply message,
611 	 * it will be processed in the xdr_replymsg and SVCAUTH_WRAP.
612 	 * We need to make sure it won't be processed twice, so we null
613 	 * it for xdr_replymsg here.
614 	 */
615 	has_args = FALSE;
616 	if (msg->rm_reply.rp_stat == MSG_ACCEPTED &&
617 	    msg->rm_reply.rp_acpt.ar_stat == SUCCESS) {
618 		if ((xdr_results = msg->acpted_rply.ar_results.proc) != NULL) {
619 			has_args = TRUE;
620 			xdr_location = msg->acpted_rply.ar_results.where;
621 			msg->acpted_rply.ar_results.proc = xdr_void;
622 			msg->acpted_rply.ar_results.where = NULL;
623 		}
624 	}
625 
626 	/*
627 	 * Get the size of the rpc reply message. Need this
628 	 * to determine if the rpc reply message will fit in
629 	 * the pre-allocated RDMA buffers. If the rpc reply
630 	 * message length is greater that the pre-allocated
631 	 * buffers then, a one time use buffer is allocated
632 	 * and registered for this rpc reply.
633 	 */
634 	msglen = xdr_sizeof(xdr_replymsg, msg);
635 	if (has_args && msg->rm_reply.rp_acpt.ar_verf.oa_flavor != RPCSEC_GSS) {
636 		msglen += xdrrdma_sizeof(xdr_results, xdr_location,
637 				rdma_minchunk);
638 		if (msglen > RPC_MSG_SZ) {
639 
640 			/*
641 			 * Allocate chunk buffer for rpc reply
642 			 */
643 			rpcreply.type = CHUNK_BUFFER;
644 			rpcreply.addr = kmem_zalloc(msglen, KM_SLEEP);
645 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
646 			cle->c_xdroff = 0;
647 			cle->c_len  = rpcreply.len = msglen;
648 			cle->c_saddr = (uint64)(uintptr_t)rpcreply.addr;
649 			cle->c_next = NULL;
650 			xdrrdma_create(xdrs, rpcreply.addr, msglen,
651 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
652 			op = RDMA_NOMSG;
653 		} else {
654 			/*
655 			 * Get a pre-allocated buffer for rpc reply
656 			 */
657 			rpcreply.type = SEND_BUFFER;
658 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
659 				cmn_err(CE_WARN,
660 				    "svc_rdma_ksend: no free buffers!");
661 				return (retval);
662 			}
663 			xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len,
664 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
665 			op = RDMA_MSG;
666 		}
667 
668 		/*
669 		 * Initialize the XDR encode stream.
670 		 */
671 		msg->rm_xid = clone_xprt->xp_xid;
672 
673 		if (!(xdr_replymsg(xdrs, msg) &&
674 		    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, xdrs,
675 		    xdr_results, xdr_location)))) {
676 			rdma_buf_free(vd->conn, &rpcreply);
677 			if (cle)
678 				clist_free(cle);
679 			cmn_err(CE_WARN,
680 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
681 			    "failed");
682 			goto out;
683 		}
684 		len = XDR_GETPOS(xdrs);
685 	}
686 	if (has_args && msg->rm_reply.rp_acpt.ar_verf.oa_flavor == RPCSEC_GSS) {
687 
688 		/*
689 		 * For RPCSEC_GSS since we cannot accurately presize the
690 		 * buffer required for encoding, we assume that its going
691 		 * to be a Long RPC to start with. We also create the
692 		 * the XDR stream with min_chunk set to 0 which instructs
693 		 * the XDR layer to not chunk the incoming byte stream.
694 		 */
695 		msglen += 2 * MAX_AUTH_BYTES + 2 * sizeof (struct opaque_auth);
696 		msglen += xdr_sizeof(xdr_results, xdr_location);
697 
698 		/*
699 		 * Long RPC. Allocate one time use custom buffer.
700 		 */
701 		longreply.type = CHUNK_BUFFER;
702 		longreply.addr = kmem_zalloc(msglen, KM_SLEEP);
703 		cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
704 		cle->c_xdroff = 0;
705 		cle->c_len  = longreply.len = msglen;
706 		cle->c_saddr = (uint64)(uintptr_t)longreply.addr;
707 		cle->c_next = NULL;
708 		xdrrdma_create(xdrs, longreply.addr, msglen, 0, cle,
709 		    XDR_ENCODE, NULL);
710 		op = RDMA_NOMSG;
711 		/*
712 		 * Initialize the XDR encode stream.
713 		 */
714 		msg->rm_xid = clone_xprt->xp_xid;
715 
716 		if (!(xdr_replymsg(xdrs, msg) &&
717 		    (!has_args || SVCAUTH_WRAP(&clone_xprt->xp_auth, xdrs,
718 		    xdr_results, xdr_location)))) {
719 			if (longreply.addr != xdrs->x_base) {
720 				longreply.addr = xdrs->x_base;
721 				longreply.len = xdr_getbufsize(xdrs);
722 			}
723 			rdma_buf_free(vd->conn, &longreply);
724 			if (cle)
725 				clist_free(cle);
726 			cmn_err(CE_WARN,
727 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
728 			    "failed");
729 			goto out;
730 		}
731 
732 		/*
733 		 * If we had to allocate a new buffer while encoding
734 		 * then update the addr and len.
735 		 */
736 		if (longreply.addr != xdrs->x_base) {
737 			longreply.addr = xdrs->x_base;
738 			longreply.len = xdr_getbufsize(xdrs);
739 		}
740 
741 		len = XDR_GETPOS(xdrs);
742 
743 		/*
744 		 * If it so happens that the encoded message is after all
745 		 * not long enough to be a Long RPC then allocate a
746 		 * SEND_BUFFER and copy the encoded message into it.
747 		 */
748 		if (len > RPC_MSG_SZ) {
749 			rpcreply.type = CHUNK_BUFFER;
750 			rpcreply.addr = longreply.addr;
751 			rpcreply.len = longreply.len;
752 		} else {
753 			clist_free(cle);
754 			XDR_DESTROY(xdrs);
755 			/*
756 			 * Get a pre-allocated buffer for rpc reply
757 			 */
758 			rpcreply.type = SEND_BUFFER;
759 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
760 				cmn_err(CE_WARN,
761 				    "svc_rdma_ksend: no free buffers!");
762 				rdma_buf_free(vd->conn, &longreply);
763 				return (retval);
764 			}
765 			bcopy(longreply.addr, rpcreply.addr, len);
766 			xdrrdma_create(xdrs, rpcreply.addr, len, 0, NULL,
767 			    XDR_ENCODE, NULL);
768 			rdma_buf_free(vd->conn, &longreply);
769 			op = RDMA_MSG;
770 		}
771 	}
772 
773 	if (has_args == FALSE) {
774 
775 		if (msglen > RPC_MSG_SZ) {
776 
777 			/*
778 			 * Allocate chunk buffer for rpc reply
779 			 */
780 			rpcreply.type = CHUNK_BUFFER;
781 			rpcreply.addr = kmem_zalloc(msglen, KM_SLEEP);
782 			cle = kmem_zalloc(sizeof (*cle), KM_SLEEP);
783 			cle->c_xdroff = 0;
784 			cle->c_len  = rpcreply.len = msglen;
785 			cle->c_saddr = (uint64)(uintptr_t)rpcreply.addr;
786 			cle->c_next = NULL;
787 			xdrrdma_create(xdrs, rpcreply.addr, msglen,
788 			    rdma_minchunk, cle, XDR_ENCODE, NULL);
789 			op = RDMA_NOMSG;
790 		} else {
791 			/*
792 			 * Get a pre-allocated buffer for rpc reply
793 			 */
794 			rpcreply.type = SEND_BUFFER;
795 			if (RDMA_BUF_ALLOC(vd->conn, &rpcreply)) {
796 				cmn_err(CE_WARN,
797 				    "svc_rdma_ksend: no free buffers!");
798 				return (retval);
799 			}
800 			xdrrdma_create(xdrs, rpcreply.addr, rpcreply.len,
801 			    rdma_minchunk, NULL, XDR_ENCODE, NULL);
802 			op = RDMA_MSG;
803 		}
804 
805 		/*
806 		 * Initialize the XDR encode stream.
807 		 */
808 		msg->rm_xid = clone_xprt->xp_xid;
809 
810 		if (!xdr_replymsg(xdrs, msg)) {
811 			rdma_buf_free(vd->conn, &rpcreply);
812 			if (cle)
813 				clist_free(cle);
814 			cmn_err(CE_WARN,
815 			    "svc_rdma_ksend: xdr_replymsg/SVCAUTH_WRAP "
816 			    "failed");
817 			goto out;
818 		}
819 		len = XDR_GETPOS(xdrs);
820 	}
821 
822 	/*
823 	 * Get clist and a buffer for sending it across
824 	 */
825 	cl = xdrrdma_clist(xdrs);
826 	clmsg.type = SEND_BUFFER;
827 	if (RDMA_BUF_ALLOC(vd->conn, &clmsg)) {
828 		rdma_buf_free(vd->conn, &rpcreply);
829 		cmn_err(CE_WARN, "svc_rdma_ksend: no free buffers!!");
830 		goto out;
831 	}
832 
833 	/*
834 	 * Now register the chunks in the list
835 	 */
836 	if (cl != NULL) {
837 		status = clist_register(vd->conn, cl, 1);
838 		if (status != RDMA_SUCCESS) {
839 			rdma_buf_free(vd->conn, &clmsg);
840 			cmn_err(CE_WARN,
841 				"svc_rdma_ksend: clist register failed");
842 			goto out;
843 		}
844 		reg = TRUE;
845 	}
846 
847 	/*
848 	 * XDR the XID, vers, and op
849 	 */
850 	/*
851 	 * Treat xid as opaque (xid is the first entity
852 	 * in the rpc rdma message).
853 	 */
854 	vers = RPCRDMA_VERS;
855 	xdrs = &rxdrs;
856 	xdrmem_create(xdrs, clmsg.addr, clmsg.len, XDR_ENCODE);
857 	(*(uint32_t *)clmsg.addr) = msg->rm_xid;
858 	/* Skip xid and set the xdr position accordingly. */
859 	XDR_SETPOS(xdrs, sizeof (uint32_t));
860 	if (! xdr_u_int(xdrs, &vers) ||
861 	    ! xdr_u_int(xdrs, &op)) {
862 		rdma_buf_free(vd->conn, &rpcreply);
863 		rdma_buf_free(vd->conn, &clmsg);
864 		cmn_err(CE_WARN, "svc_rdma_ksend: xdr_u_int failed");
865 		goto out;
866 	}
867 
868 	/*
869 	 * Now XDR the chunk list
870 	 */
871 	(void) xdr_do_clist(xdrs, &cl);
872 
873 	clist_add(&sendlist, 0, XDR_GETPOS(xdrs), &clmsg.handle, clmsg.addr,
874 		NULL, NULL);
875 
876 	if (op == RDMA_MSG) {
877 		clist_add(&sendlist, 0, len, &rpcreply.handle, rpcreply.addr,
878 			NULL, NULL);
879 	} else {
880 		cl->c_len = len;
881 		RSSTAT_INCR(rslongrpcs);
882 	}
883 
884 	/*
885 	 * Send the reply message to the client
886 	 */
887 	if (cl != NULL) {
888 		status = clist_syncmem(vd->conn, cl, 1);
889 		if (status != RDMA_SUCCESS) {
890 			rdma_buf_free(vd->conn, &rpcreply);
891 			rdma_buf_free(vd->conn, &clmsg);
892 			goto out;
893 		}
894 #ifdef DEBUG
895 	if (rdma_svc_debug)
896 		printf("svc_rdma_ksend: chunk response len %d xid %u\n",
897 			cl->c_len, msg->rm_xid);
898 #endif
899 		/*
900 		 * Post a receive buffer because we expect a RDMA_DONE
901 		 * message.
902 		 */
903 		status = rdma_svc_postrecv(vd->conn);
904 
905 		/*
906 		 * Send the RPC reply message and wait for RDMA_DONE
907 		 */
908 		status = RDMA_SEND_RESP(vd->conn, sendlist, msg->rm_xid);
909 		if (status != RDMA_SUCCESS) {
910 #ifdef DEBUG
911 			if (rdma_svc_debug)
912 				cmn_err(CE_NOTE, "svc_rdma_ksend: "
913 					"rdma_send_resp failed %d", status);
914 #endif
915 			goto out;
916 		}
917 #ifdef DEBUG
918 	if (rdma_svc_debug)
919 		printf("svc_rdma_ksend: got RDMA_DONE xid %u\n", msg->rm_xid);
920 #endif
921 	} else {
922 #ifdef DEBUG
923 	if (rdma_svc_debug)
924 		printf("svc_rdma_ksend: msg response xid %u\n", msg->rm_xid);
925 #endif
926 		status = RDMA_SEND(vd->conn, sendlist, msg->rm_xid);
927 		if (status != RDMA_SUCCESS) {
928 #ifdef DEBUG
929 			if (rdma_svc_debug)
930 				cmn_err(CE_NOTE, "svc_rdma_ksend: "
931 					"rdma_send failed %d", status);
932 #endif
933 			goto out;
934 		}
935 	}
936 
937 	retval = TRUE;
938 out:
939 	/*
940 	 * Deregister the chunks
941 	 */
942 	if (cl != NULL) {
943 		if (reg)
944 			(void) clist_deregister(vd->conn, cl, 1);
945 		if (op == RDMA_NOMSG) {
946 			/*
947 			 * Long RPC reply in chunk. Free it up.
948 			 */
949 			rdma_buf_free(vd->conn, &rpcreply);
950 		}
951 		clist_free(cl);
952 	}
953 
954 	/*
955 	 * Free up sendlist chunks
956 	 */
957 	if (sendlist != NULL)
958 		clist_free(sendlist);
959 
960 	/*
961 	 * Destroy private data for xdr rdma
962 	 */
963 	XDR_DESTROY(&(clone_xprt->xp_xdrout));
964 
965 	/*
966 	 * This is completely disgusting.  If public is set it is
967 	 * a pointer to a structure whose first field is the address
968 	 * of the function to free that structure and any related
969 	 * stuff.  (see rrokfree in nfs_xdr.c).
970 	 */
971 	if (xdrs->x_public) {
972 		/* LINTED pointer alignment */
973 		(**((int (**)())xdrs->x_public))(xdrs->x_public);
974 	}
975 
976 	return (retval);
977 }
978 
979 /*
980  * Deserialize arguments.
981  */
982 static bool_t
983 svc_rdma_kgetargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args, caddr_t args_ptr)
984 {
985 	if ((SVCAUTH_UNWRAP(&clone_xprt->xp_auth, &clone_xprt->xp_xdrin,
986 	    xdr_args, args_ptr)) != TRUE)
987 		return (FALSE);
988 	return (TRUE);
989 }
990 
991 static bool_t
992 svc_rdma_kfreeargs(SVCXPRT *clone_xprt, xdrproc_t xdr_args,
993     caddr_t args_ptr)
994 {
995 	struct clone_rdma_data *vd;
996 	bool_t retval;
997 
998 	vd = (struct clone_rdma_data *)clone_xprt->xp_p2buf;
999 	if (args_ptr) {
1000 		XDR	*xdrs = &clone_xprt->xp_xdrin;
1001 		struct clist *cl;
1002 
1003 		cl = xdrrdma_clist(xdrs);
1004 		if (cl != NULL)
1005 			clist_free(cl);
1006 
1007 		xdrs->x_op = XDR_FREE;
1008 		retval = (*xdr_args)(xdrs, args_ptr);
1009 	}
1010 	XDR_DESTROY(&(clone_xprt->xp_xdrin));
1011 	rdma_buf_free(vd->conn, &vd->rpcbuf);
1012 	RDMA_REL_CONN(vd->conn);
1013 	return (retval);
1014 }
1015 
1016 /* ARGSUSED */
1017 static int32_t *
1018 svc_rdma_kgetres(SVCXPRT *clone_xprt, int size)
1019 {
1020 	return (NULL);
1021 }
1022 
1023 /* ARGSUSED */
1024 static void
1025 svc_rdma_kfreeres(SVCXPRT *clone_xprt)
1026 {
1027 }
1028 
1029 /*
1030  * the dup cacheing routines below provide a cache of non-failure
1031  * transaction id's.  rpc service routines can use this to detect
1032  * retransmissions and re-send a non-failure response.
1033  */
1034 
1035 /*
1036  * MAXDUPREQS is the number of cached items.  It should be adjusted
1037  * to the service load so that there is likely to be a response entry
1038  * when the first retransmission comes in.
1039  */
1040 #define	MAXDUPREQS	1024
1041 
1042 /*
1043  * This should be appropriately scaled to MAXDUPREQS.
1044  */
1045 #define	DRHASHSZ	257
1046 
1047 #if ((DRHASHSZ & (DRHASHSZ - 1)) == 0)
1048 #define	XIDHASH(xid)	((xid) & (DRHASHSZ - 1))
1049 #else
1050 #define	XIDHASH(xid)	((xid) % DRHASHSZ)
1051 #endif
1052 #define	DRHASH(dr)	XIDHASH((dr)->dr_xid)
1053 #define	REQTOXID(req)	((req)->rq_xprt->xp_xid)
1054 
1055 static int	rdmandupreqs = 0;
1056 static int	rdmamaxdupreqs = MAXDUPREQS;
1057 static kmutex_t rdmadupreq_lock;
1058 static struct dupreq *rdmadrhashtbl[DRHASHSZ];
1059 static int	rdmadrhashstat[DRHASHSZ];
1060 
1061 static void unhash(struct dupreq *);
1062 
1063 /*
1064  * rdmadrmru points to the head of a circular linked list in lru order.
1065  * rdmadrmru->dr_next == drlru
1066  */
1067 struct dupreq *rdmadrmru;
1068 
1069 /*
1070  * svc_rdma_kdup searches the request cache and returns 0 if the
1071  * request is not found in the cache.  If it is found, then it
1072  * returns the state of the request (in progress or done) and
1073  * the status or attributes that were part of the original reply.
1074  */
1075 static int
1076 svc_rdma_kdup(struct svc_req *req, caddr_t res, int size, struct dupreq **drpp,
1077 	bool_t *dupcachedp)
1078 {
1079 	struct dupreq *dr;
1080 	uint32_t xid;
1081 	uint32_t drhash;
1082 	int status;
1083 
1084 	xid = REQTOXID(req);
1085 	mutex_enter(&rdmadupreq_lock);
1086 	RSSTAT_INCR(rsdupchecks);
1087 	/*
1088 	 * Check to see whether an entry already exists in the cache.
1089 	 */
1090 	dr = rdmadrhashtbl[XIDHASH(xid)];
1091 	while (dr != NULL) {
1092 		if (dr->dr_xid == xid &&
1093 		    dr->dr_proc == req->rq_proc &&
1094 		    dr->dr_prog == req->rq_prog &&
1095 		    dr->dr_vers == req->rq_vers &&
1096 		    dr->dr_addr.len == req->rq_xprt->xp_rtaddr.len &&
1097 		    bcmp((caddr_t)dr->dr_addr.buf,
1098 		    (caddr_t)req->rq_xprt->xp_rtaddr.buf,
1099 		    dr->dr_addr.len) == 0) {
1100 			status = dr->dr_status;
1101 			if (status == DUP_DONE) {
1102 				bcopy(dr->dr_resp.buf, res, size);
1103 				if (dupcachedp != NULL)
1104 					*dupcachedp = (dr->dr_resfree != NULL);
1105 			} else {
1106 				dr->dr_status = DUP_INPROGRESS;
1107 				*drpp = dr;
1108 			}
1109 			RSSTAT_INCR(rsdupreqs);
1110 			mutex_exit(&rdmadupreq_lock);
1111 			return (status);
1112 		}
1113 		dr = dr->dr_chain;
1114 	}
1115 
1116 	/*
1117 	 * There wasn't an entry, either allocate a new one or recycle
1118 	 * an old one.
1119 	 */
1120 	if (rdmandupreqs < rdmamaxdupreqs) {
1121 		dr = kmem_alloc(sizeof (*dr), KM_NOSLEEP);
1122 		if (dr == NULL) {
1123 			mutex_exit(&rdmadupreq_lock);
1124 			return (DUP_ERROR);
1125 		}
1126 		dr->dr_resp.buf = NULL;
1127 		dr->dr_resp.maxlen = 0;
1128 		dr->dr_addr.buf = NULL;
1129 		dr->dr_addr.maxlen = 0;
1130 		if (rdmadrmru) {
1131 			dr->dr_next = rdmadrmru->dr_next;
1132 			rdmadrmru->dr_next = dr;
1133 		} else {
1134 			dr->dr_next = dr;
1135 		}
1136 		rdmandupreqs++;
1137 	} else {
1138 		dr = rdmadrmru->dr_next;
1139 		while (dr->dr_status == DUP_INPROGRESS) {
1140 			dr = dr->dr_next;
1141 			if (dr == rdmadrmru->dr_next) {
1142 				cmn_err(CE_WARN, "svc_rdma_kdup no slots free");
1143 				mutex_exit(&rdmadupreq_lock);
1144 				return (DUP_ERROR);
1145 			}
1146 		}
1147 		unhash(dr);
1148 		if (dr->dr_resfree) {
1149 			(*dr->dr_resfree)(dr->dr_resp.buf);
1150 		}
1151 	}
1152 	dr->dr_resfree = NULL;
1153 	rdmadrmru = dr;
1154 
1155 	dr->dr_xid = REQTOXID(req);
1156 	dr->dr_prog = req->rq_prog;
1157 	dr->dr_vers = req->rq_vers;
1158 	dr->dr_proc = req->rq_proc;
1159 	if (dr->dr_addr.maxlen < req->rq_xprt->xp_rtaddr.len) {
1160 		if (dr->dr_addr.buf != NULL)
1161 			kmem_free(dr->dr_addr.buf, dr->dr_addr.maxlen);
1162 		dr->dr_addr.maxlen = req->rq_xprt->xp_rtaddr.len;
1163 		dr->dr_addr.buf = kmem_alloc(dr->dr_addr.maxlen, KM_NOSLEEP);
1164 		if (dr->dr_addr.buf == NULL) {
1165 			dr->dr_addr.maxlen = 0;
1166 			dr->dr_status = DUP_DROP;
1167 			mutex_exit(&rdmadupreq_lock);
1168 			return (DUP_ERROR);
1169 		}
1170 	}
1171 	dr->dr_addr.len = req->rq_xprt->xp_rtaddr.len;
1172 	bcopy(req->rq_xprt->xp_rtaddr.buf, dr->dr_addr.buf, dr->dr_addr.len);
1173 	if (dr->dr_resp.maxlen < size) {
1174 		if (dr->dr_resp.buf != NULL)
1175 			kmem_free(dr->dr_resp.buf, dr->dr_resp.maxlen);
1176 		dr->dr_resp.maxlen = (unsigned int)size;
1177 		dr->dr_resp.buf = kmem_alloc(size, KM_NOSLEEP);
1178 		if (dr->dr_resp.buf == NULL) {
1179 			dr->dr_resp.maxlen = 0;
1180 			dr->dr_status = DUP_DROP;
1181 			mutex_exit(&rdmadupreq_lock);
1182 			return (DUP_ERROR);
1183 		}
1184 	}
1185 	dr->dr_status = DUP_INPROGRESS;
1186 
1187 	drhash = (uint32_t)DRHASH(dr);
1188 	dr->dr_chain = rdmadrhashtbl[drhash];
1189 	rdmadrhashtbl[drhash] = dr;
1190 	rdmadrhashstat[drhash]++;
1191 	mutex_exit(&rdmadupreq_lock);
1192 	*drpp = dr;
1193 	return (DUP_NEW);
1194 }
1195 
1196 /*
1197  * svc_rdma_kdupdone marks the request done (DUP_DONE or DUP_DROP)
1198  * and stores the response.
1199  */
1200 static void
1201 svc_rdma_kdupdone(struct dupreq *dr, caddr_t res, void (*dis_resfree)(),
1202 	int size, int status)
1203 {
1204 	ASSERT(dr->dr_resfree == NULL);
1205 	if (status == DUP_DONE) {
1206 		bcopy(res, dr->dr_resp.buf, size);
1207 		dr->dr_resfree = dis_resfree;
1208 	}
1209 	dr->dr_status = status;
1210 }
1211 
1212 /*
1213  * This routine expects that the mutex, rdmadupreq_lock, is already held.
1214  */
1215 static void
1216 unhash(struct dupreq *dr)
1217 {
1218 	struct dupreq *drt;
1219 	struct dupreq *drtprev = NULL;
1220 	uint32_t drhash;
1221 
1222 	ASSERT(MUTEX_HELD(&rdmadupreq_lock));
1223 
1224 	drhash = (uint32_t)DRHASH(dr);
1225 	drt = rdmadrhashtbl[drhash];
1226 	while (drt != NULL) {
1227 		if (drt == dr) {
1228 			rdmadrhashstat[drhash]--;
1229 			if (drtprev == NULL) {
1230 				rdmadrhashtbl[drhash] = drt->dr_chain;
1231 			} else {
1232 				drtprev->dr_chain = drt->dr_chain;
1233 			}
1234 			return;
1235 		}
1236 		drtprev = drt;
1237 		drt = drt->dr_chain;
1238 	}
1239 }
1240