xref: /illumos-gate/usr/src/uts/common/rpc/xdr_rdma.c (revision c3a9724d)
1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright 2008 Sun Microsystems, Inc.  All rights reserved.
23  * Use is subject to license terms.
24  */
25 
26 /*
27  * Copyright (c) 2007, The Ohio State University. All rights reserved.
28  *
29  * Portions of this source code is developed by the team members of
30  * The Ohio State University's Network-Based Computing Laboratory (NBCL),
31  * headed by Professor Dhabaleswar K. (DK) Panda.
32  *
33  * Acknowledgements to contributions from developors:
34  *   Ranjit Noronha: noronha@cse.ohio-state.edu
35  *   Lei Chai      : chail@cse.ohio-state.edu
36  *   Weikuan Yu    : yuw@cse.ohio-state.edu
37  *
38  */
39 
40 /*
41  * xdr_rdma.c, XDR implementation using RDMA to move large chunks
42  */
43 
44 #include <sys/param.h>
45 #include <sys/types.h>
46 #include <sys/systm.h>
47 #include <sys/kmem.h>
48 #include <sys/sdt.h>
49 #include <sys/debug.h>
50 
51 #include <rpc/types.h>
52 #include <rpc/xdr.h>
53 #include <sys/cmn_err.h>
54 #include <rpc/rpc_sztypes.h>
55 #include <rpc/rpc_rdma.h>
56 #include <sys/sysmacros.h>
57 
58 static bool_t   xdrrdma_getint32(XDR *, int32_t *);
59 static bool_t   xdrrdma_putint32(XDR *, int32_t *);
60 static bool_t   xdrrdma_getbytes(XDR *, caddr_t, int);
61 static bool_t   xdrrdma_putbytes(XDR *, caddr_t, int);
62 uint_t		xdrrdma_getpos(XDR *);
63 bool_t		xdrrdma_setpos(XDR *, uint_t);
64 static rpc_inline_t *xdrrdma_inline(XDR *, int);
65 void		xdrrdma_destroy(XDR *);
66 static bool_t   xdrrdma_control(XDR *, int, void *);
67 
68 struct xdr_ops  xdrrdmablk_ops = {
69 	xdrrdma_getbytes,
70 	xdrrdma_putbytes,
71 	xdrrdma_getpos,
72 	xdrrdma_setpos,
73 	xdrrdma_inline,
74 	xdrrdma_destroy,
75 	xdrrdma_control,
76 	xdrrdma_getint32,
77 	xdrrdma_putint32
78 };
79 
80 struct xdr_ops  xdrrdma_ops = {
81 	xdrrdma_getbytes,
82 	xdrrdma_putbytes,
83 	xdrrdma_getpos,
84 	xdrrdma_setpos,
85 	xdrrdma_inline,
86 	xdrrdma_destroy,
87 	xdrrdma_control,
88 	xdrrdma_getint32,
89 	xdrrdma_putint32
90 };
91 
92 /*
93  * A chunk list entry identifies a chunk of opaque data to be moved
94  * separately from the rest of the RPC message. xp_min_chunk = 0, is a
95  * special case for ENCODING, which means do not chunk the incoming stream of
96  * data.
97  */
98 
99 typedef struct {
100 	caddr_t		xp_offp;
101 	int		xp_min_chunk;
102 	uint_t		xp_flags;	/* Controls setting for rdma xdr */
103 	int		xp_buf_size;	/* size of xdr buffer */
104 	struct clist	*xp_rcl;		/* head of chunk list */
105 	struct clist	**xp_rcl_next;	/* location to place/find next chunk */
106 	struct clist	*xp_wcl;	/* head of write chunk list */
107 	CONN		*xp_conn;	/* connection for chunk data xfer */
108 	uint_t		xp_reply_chunk_len;
109 	/* used to track length for security modes: integrity/privacy */
110 	uint_t		xp_reply_chunk_len_alt;
111 } xrdma_private_t;
112 
113 extern kmem_cache_t *clist_cache;
114 
115 bool_t
116 xdrrdma_getrdmablk(XDR *xdrs, struct clist **rlist, uint_t *sizep,
117     CONN **conn, const uint_t maxsize)
118 {
119 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
120 	struct clist	*cle = *(xdrp->xp_rcl_next);
121 	struct clist	*cls = *(xdrp->xp_rcl_next);
122 	struct clist	*rdclist = NULL, *prev = NULL;
123 	bool_t		retval = TRUE;
124 	uint32_t	cur_offset = 0;
125 	uint32_t	total_segments = 0;
126 	uint32_t	actual_segments = 0;
127 	uint32_t	alen;
128 	uint_t		total_len;
129 
130 	ASSERT(xdrs->x_op != XDR_FREE);
131 
132 	/*
133 	 * first deal with the length since xdr bytes are counted
134 	 */
135 	if (!xdr_u_int(xdrs, sizep)) {
136 		DTRACE_PROBE(xdr__e__getrdmablk_sizep_fail);
137 		return (FALSE);
138 	}
139 	total_len = *sizep;
140 	if (total_len > maxsize) {
141 		DTRACE_PROBE2(xdr__e__getrdmablk_bad_size,
142 		    int, total_len, int, maxsize);
143 		return (FALSE);
144 	}
145 	(*conn) = xdrp->xp_conn;
146 
147 	/*
148 	 * if no data we are done
149 	 */
150 	if (total_len == 0)
151 		return (TRUE);
152 
153 	while (cle) {
154 		total_segments++;
155 		cle = cle->c_next;
156 	}
157 
158 	cle = *(xdrp->xp_rcl_next);
159 
160 	/*
161 	 * If there was a chunk at the current offset, then setup a read
162 	 * chunk list which records the destination address and length
163 	 * and will RDMA READ the data in later.
164 	 */
165 	if (cle == NULL)
166 		return (FALSE);
167 
168 	if (cle->c_xdroff != (xdrp->xp_offp - xdrs->x_base))
169 		return (FALSE);
170 
171 	/*
172 	 * Setup the chunk list with appropriate
173 	 * address (offset) and length
174 	 */
175 	for (actual_segments = 0;
176 	    actual_segments < total_segments; actual_segments++) {
177 		if (total_len <= 0)
178 			break;
179 		cle->u.c_daddr = (uint64) cur_offset;
180 		alen = 0;
181 		if (cle->c_len > total_len) {
182 			alen = cle->c_len;
183 			cle->c_len = total_len;
184 		}
185 		if (!alen)
186 			xdrp->xp_rcl_next = &cle->c_next;
187 
188 		cur_offset += cle->c_len;
189 		total_len -= cle->c_len;
190 
191 		if ((total_segments - actual_segments - 1) == 0 &&
192 		    total_len > 0) {
193 			DTRACE_PROBE(krpc__e__xdrrdma_getblk_chunktooshort);
194 			retval = FALSE;
195 		}
196 
197 		if ((total_segments - actual_segments - 1) > 0 &&
198 		    total_len == 0) {
199 			DTRACE_PROBE2(krpc__e__xdrrdma_getblk_toobig,
200 			    int, total_segments, int, actual_segments);
201 		}
202 
203 		rdclist = clist_alloc();
204 		(*rdclist) = (*cle);
205 		if ((*rlist) == NULL)
206 			(*rlist) = rdclist;
207 		if (prev == NULL)
208 			prev = rdclist;
209 		else {
210 			prev->c_next = rdclist;
211 			prev = rdclist;
212 		}
213 
214 		cle = cle->c_next;
215 	}
216 
217 out:
218 	if (prev != NULL)
219 		prev->c_next = NULL;
220 
221 	cle = cls;
222 	if (alen) {
223 		cle->w.c_saddr =
224 		    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
225 		cle->c_len = alen - cle->c_len;
226 	}
227 
228 	return (retval);
229 }
230 
231 /*
232  * The procedure xdrrdma_create initializes a stream descriptor for a memory
233  * buffer.
234  */
235 void
236 xdrrdma_create(XDR *xdrs, caddr_t addr, uint_t size,
237     int min_chunk, struct clist *cl, enum xdr_op op, CONN *conn)
238 {
239 	xrdma_private_t *xdrp;
240 	struct clist   *cle;
241 
242 	xdrs->x_op = op;
243 	xdrs->x_ops = &xdrrdma_ops;
244 	xdrs->x_base = addr;
245 	xdrs->x_handy = size;
246 	xdrs->x_public = NULL;
247 
248 	xdrp = (xrdma_private_t *)kmem_zalloc(sizeof (xrdma_private_t),
249 	    KM_SLEEP);
250 	xdrs->x_private = (caddr_t)xdrp;
251 	xdrp->xp_offp = addr;
252 	xdrp->xp_min_chunk = min_chunk;
253 	xdrp->xp_flags = 0;
254 	xdrp->xp_buf_size = size;
255 	xdrp->xp_rcl = cl;
256 	xdrp->xp_reply_chunk_len = 0;
257 	xdrp->xp_reply_chunk_len_alt = 0;
258 
259 	if (op == XDR_ENCODE && cl != NULL) {
260 		/* Find last element in chunk list and set xp_rcl_next */
261 		for (cle = cl; cle->c_next != NULL; cle = cle->c_next)
262 			continue;
263 
264 		xdrp->xp_rcl_next = &(cle->c_next);
265 	} else {
266 		xdrp->xp_rcl_next = &(xdrp->xp_rcl);
267 	}
268 
269 	xdrp->xp_wcl = NULL;
270 
271 	xdrp->xp_conn = conn;
272 	if (xdrp->xp_min_chunk != 0)
273 		xdrp->xp_flags |= XDR_RDMA_CHUNK;
274 }
275 
276 /* ARGSUSED */
277 void
278 xdrrdma_destroy(XDR * xdrs)
279 {
280 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
281 
282 	if (xdrp == NULL)
283 		return;
284 
285 	if (xdrp->xp_wcl) {
286 		if (xdrp->xp_flags & XDR_RDMA_WLIST_REG) {
287 			(void) clist_deregister(xdrp->xp_conn,
288 			    xdrp->xp_wcl, CLIST_REG_DST);
289 			rdma_buf_free(xdrp->xp_conn,
290 			    &xdrp->xp_wcl->rb_longbuf);
291 		}
292 		clist_free(xdrp->xp_wcl);
293 	}
294 
295 	if (xdrp->xp_rcl) {
296 		if (xdrp->xp_flags & XDR_RDMA_RLIST_REG) {
297 			(void) clist_deregister(xdrp->xp_conn,
298 			    xdrp->xp_rcl, CLIST_REG_SOURCE);
299 			rdma_buf_free(xdrp->xp_conn,
300 			    &xdrp->xp_rcl->rb_longbuf);
301 		}
302 		clist_free(xdrp->xp_rcl);
303 	}
304 
305 	(void) kmem_free(xdrs->x_private, sizeof (xrdma_private_t));
306 	xdrs->x_private = NULL;
307 }
308 
309 static	bool_t
310 xdrrdma_getint32(XDR *xdrs, int32_t *int32p)
311 {
312 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
313 
314 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
315 		return (FALSE);
316 
317 	/* LINTED pointer alignment */
318 	*int32p = (int32_t)ntohl((uint32_t)(*((int32_t *)(xdrp->xp_offp))));
319 	xdrp->xp_offp += sizeof (int32_t);
320 
321 	return (TRUE);
322 }
323 
324 static	bool_t
325 xdrrdma_putint32(XDR *xdrs, int32_t *int32p)
326 {
327 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
328 
329 	if ((xdrs->x_handy -= (int)sizeof (int32_t)) < 0)
330 		return (FALSE);
331 
332 	/* LINTED pointer alignment */
333 	*(int32_t *)xdrp->xp_offp = (int32_t)htonl((uint32_t)(*int32p));
334 	xdrp->xp_offp += sizeof (int32_t);
335 
336 	return (TRUE);
337 }
338 
339 /*
340  * DECODE bytes from XDR stream for rdma.
341  * If the XDR stream contains a read chunk list,
342  * it will go through xdrrdma_getrdmablk instead.
343  */
344 static	bool_t
345 xdrrdma_getbytes(XDR *xdrs, caddr_t addr, int len)
346 {
347 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
348 	struct clist	*cle = *(xdrp->xp_rcl_next);
349 	struct clist	*cls = *(xdrp->xp_rcl_next);
350 	struct clist	cl;
351 	bool_t		retval = TRUE;
352 	uint32_t	total_len = len;
353 	uint32_t	cur_offset = 0;
354 	uint32_t	total_segments = 0;
355 	uint32_t	actual_segments = 0;
356 	uint32_t	status;
357 	uint32_t	alen;
358 
359 	while (cle) {
360 		total_segments++;
361 		cle = cle->c_next;
362 	}
363 
364 	cle = *(xdrp->xp_rcl_next);
365 	/*
366 	 * If there was a chunk at the current offset, then setup a read
367 	 * chunk list which records the destination address and length
368 	 * and will RDMA READ the data in later.
369 	 */
370 
371 	if (cle != NULL &&
372 	    cle->c_xdroff == (xdrp->xp_offp - xdrs->x_base)) {
373 		for (actual_segments = 0;
374 		    actual_segments < total_segments; actual_segments++) {
375 			if (total_len <= 0)
376 				break;
377 			cle->u.c_daddr = (uint64)(uintptr_t)addr + cur_offset;
378 			alen = 0;
379 			if (cle->c_len > total_len) {
380 				alen = cle->c_len;
381 				cle->c_len = total_len;
382 			}
383 			if (!alen)
384 				xdrp->xp_rcl_next = &cle->c_next;
385 
386 			cur_offset += cle->c_len;
387 			total_len -= cle->c_len;
388 
389 			if ((total_segments - actual_segments - 1) == 0 &&
390 			    total_len > 0) {
391 				DTRACE_PROBE(
392 				    krpc__e__xdrrdma_getbytes_chunktooshort);
393 				retval = FALSE;
394 			}
395 
396 			if ((total_segments - actual_segments - 1) > 0 &&
397 			    total_len == 0) {
398 				DTRACE_PROBE2(krpc__e__xdrrdma_getbytes_toobig,
399 				    int, total_segments, int, actual_segments);
400 			}
401 
402 			/*
403 			 * RDMA READ the chunk data from the remote end.
404 			 * First prep the destination buffer by registering
405 			 * it, then RDMA READ the chunk data. Since we are
406 			 * doing streaming memory, sync the destination
407 			 * buffer to CPU and deregister the buffer.
408 			 */
409 			if (xdrp->xp_conn == NULL) {
410 				return (FALSE);
411 			}
412 			cl = *cle;
413 			cl.c_next = NULL;
414 			if (clist_register(xdrp->xp_conn, &cl, CLIST_REG_DST)
415 			    != RDMA_SUCCESS) {
416 				return (FALSE);
417 			}
418 			cle->c_dmemhandle = cl.c_dmemhandle;
419 			cle->c_dsynchandle = cl.c_dsynchandle;
420 
421 			/*
422 			 * Now read the chunk in
423 			 */
424 			if ((total_segments - actual_segments - 1) == 0 ||
425 			    total_len == 0) {
426 				status = RDMA_READ(xdrp->xp_conn, &cl, WAIT);
427 			} else {
428 				status = RDMA_READ(xdrp->xp_conn, &cl, NOWAIT);
429 			}
430 			if (status != RDMA_SUCCESS) {
431 				DTRACE_PROBE1(
432 				    krpc__i__xdrrdma_getblk_readfailed,
433 				    int, status);
434 				retval = FALSE;
435 				goto out;
436 			}
437 			cle = cle->c_next;
438 		}
439 
440 		/*
441 		 * sync the memory for cpu
442 		 */
443 		cl = *cls;
444 		cl.c_next = NULL;
445 		cl.c_len = cur_offset;
446 		if (clist_syncmem(xdrp->xp_conn, &cl, 0) != RDMA_SUCCESS) {
447 			retval = FALSE;
448 		}
449 out:
450 		/*
451 		 * Deregister the chunks
452 		 */
453 		cle = cls;
454 		cl = *cle;
455 		cl.c_next = NULL;
456 		cl.c_len = cur_offset;
457 		(void) clist_deregister(xdrp->xp_conn, &cl, CLIST_REG_DST);
458 		if (alen) {
459 			cle->w.c_saddr =
460 			    (uint64)(uintptr_t)cle->w.c_saddr + cle->c_len;
461 			cle->c_len = alen - cle->c_len;
462 		}
463 		return (retval);
464 	}
465 	if ((xdrs->x_handy -= len) < 0)
466 		return (FALSE);
467 
468 	bcopy(xdrp->xp_offp, addr, len);
469 	xdrp->xp_offp += len;
470 
471 	return (TRUE);
472 }
473 
474 /*
475  * ENCODE some bytes into an XDR stream xp_min_chunk = 0, means the stream of
476  * bytes contain no chunks to seperate out, and if the bytes do not fit in
477  * the supplied buffer, grow the buffer and free the old buffer.
478  */
479 static	bool_t
480 xdrrdma_putbytes(XDR *xdrs, caddr_t addr, int len)
481 {
482 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
483 	/*
484 	 * Is this stream accepting chunks?
485 	 * If so, does the either of the two following conditions exist?
486 	 * - length of bytes to encode is greater than the min chunk size?
487 	 * - remaining space in this stream is shorter than length of
488 	 *   bytes to encode?
489 	 *
490 	 * If the above exists, then create a chunk for this encoding
491 	 * and save the addresses, etc.
492 	 */
493 	if (xdrp->xp_flags & XDR_RDMA_CHUNK &&
494 	    ((xdrp->xp_min_chunk != 0 &&
495 	    len >= xdrp->xp_min_chunk) ||
496 	    (xdrs->x_handy - len  < 0))) {
497 		struct clist	*cle;
498 		int		offset = xdrp->xp_offp - xdrs->x_base;
499 
500 		cle = clist_alloc();
501 		cle->c_xdroff = offset;
502 		cle->c_len = len;
503 		cle->w.c_saddr = (uint64)(uintptr_t)addr;
504 		cle->c_next = NULL;
505 
506 		*(xdrp->xp_rcl_next) = cle;
507 		xdrp->xp_rcl_next = &(cle->c_next);
508 
509 		return (TRUE);
510 	}
511 	/* Is there enough space to encode what is left? */
512 	if ((xdrs->x_handy -= len) < 0) {
513 		return (FALSE);
514 	}
515 	bcopy(addr, xdrp->xp_offp, len);
516 	xdrp->xp_offp += len;
517 
518 	return (TRUE);
519 }
520 
521 uint_t
522 xdrrdma_getpos(XDR *xdrs)
523 {
524 	xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
525 
526 	return ((uint_t)((uintptr_t)xdrp->xp_offp - (uintptr_t)xdrs->x_base));
527 }
528 
529 bool_t
530 xdrrdma_setpos(XDR *xdrs, uint_t pos)
531 {
532 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
533 
534 	caddr_t		newaddr = xdrs->x_base + pos;
535 	caddr_t		lastaddr = xdrp->xp_offp + xdrs->x_handy;
536 	ptrdiff_t	diff;
537 
538 	if (newaddr > lastaddr)
539 		return (FALSE);
540 
541 	xdrp->xp_offp = newaddr;
542 	diff = lastaddr - newaddr;
543 	xdrs->x_handy = (int)diff;
544 
545 	return (TRUE);
546 }
547 
548 /* ARGSUSED */
549 static rpc_inline_t *
550 xdrrdma_inline(XDR *xdrs, int len)
551 {
552 	rpc_inline_t	*buf = NULL;
553 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
554 	struct clist	*cle = *(xdrp->xp_rcl_next);
555 
556 	if (xdrs->x_op == XDR_DECODE) {
557 		/*
558 		 * Since chunks aren't in-line, check to see whether there is
559 		 * a chunk in the inline range.
560 		 */
561 		if (cle != NULL &&
562 		    cle->c_xdroff <= (xdrp->xp_offp - xdrs->x_base + len))
563 			return (NULL);
564 	}
565 
566 	/* LINTED pointer alignment */
567 	buf = (rpc_inline_t *)xdrp->xp_offp;
568 	if (!IS_P2ALIGNED(buf, sizeof (int32_t)))
569 		return (NULL);
570 
571 	if ((xdrs->x_handy < len) || (xdrp->xp_min_chunk != 0 &&
572 	    len >= xdrp->xp_min_chunk)) {
573 		return (NULL);
574 	} else {
575 		xdrs->x_handy -= len;
576 		xdrp->xp_offp += len;
577 		return (buf);
578 	}
579 }
580 
581 static	bool_t
582 xdrrdma_control(XDR *xdrs, int request, void *info)
583 {
584 	int32_t		*int32p;
585 	int		len, i;
586 	uint_t		in_flags;
587 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
588 	rdma_chunkinfo_t *rcip = NULL;
589 	rdma_wlist_conn_info_t *rwcip = NULL;
590 	rdma_chunkinfo_lengths_t *rcilp = NULL;
591 	struct uio *uiop;
592 	struct clist	*rwl = NULL;
593 	struct clist	*prev = NULL;
594 
595 	switch (request) {
596 	case XDR_PEEK:
597 		/*
598 		 * Return the next 4 byte unit in the XDR stream.
599 		 */
600 		if (xdrs->x_handy < sizeof (int32_t))
601 			return (FALSE);
602 
603 		int32p = (int32_t *)info;
604 		*int32p = (int32_t)ntohl((uint32_t)
605 		    (*((int32_t *)(xdrp->xp_offp))));
606 
607 		return (TRUE);
608 
609 	case XDR_SKIPBYTES:
610 		/*
611 		 * Skip the next N bytes in the XDR stream.
612 		 */
613 		int32p = (int32_t *)info;
614 		len = RNDUP((int)(*int32p));
615 		if ((xdrs->x_handy -= len) < 0)
616 			return (FALSE);
617 		xdrp->xp_offp += len;
618 
619 		return (TRUE);
620 
621 	case XDR_RDMA_SET_FLAGS:
622 		/*
623 		 * Set the flags provided in the *info in xp_flags for rdma
624 		 * xdr stream control.
625 		 */
626 		int32p = (int32_t *)info;
627 		in_flags = (uint_t)(*int32p);
628 
629 		xdrp->xp_flags |= in_flags;
630 		return (TRUE);
631 
632 	case XDR_RDMA_GET_FLAGS:
633 		/*
634 		 * Get the flags provided in xp_flags return through *info
635 		 */
636 		int32p = (int32_t *)info;
637 
638 		*int32p = (int32_t)xdrp->xp_flags;
639 		return (TRUE);
640 
641 	case XDR_RDMA_GET_CHUNK_LEN:
642 		rcilp = (rdma_chunkinfo_lengths_t *)info;
643 		rcilp->rcil_len = xdrp->xp_reply_chunk_len;
644 		rcilp->rcil_len_alt = xdrp->xp_reply_chunk_len_alt;
645 
646 		return (TRUE);
647 
648 	case XDR_RDMA_ADD_CHUNK:
649 		/*
650 		 * Store wlist information
651 		 */
652 
653 		rcip = (rdma_chunkinfo_t *)info;
654 
655 		switch (rcip->rci_type) {
656 		case RCI_WRITE_UIO_CHUNK:
657 			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;
658 
659 			if (rcip->rci_len < xdrp->xp_min_chunk) {
660 				xdrp->xp_wcl = NULL;
661 				*(rcip->rci_clpp) = NULL;
662 				return (TRUE);
663 			}
664 			uiop = rcip->rci_a.rci_uiop;
665 
666 			for (i = 0; i < uiop->uio_iovcnt; i++) {
667 				rwl = clist_alloc();
668 				rwl->c_len = uiop->uio_iov[i].iov_len;
669 				rwl->u.c_daddr =
670 				    (uint64)(uintptr_t)
671 				    (uiop->uio_iov[i].iov_base);
672 				/*
673 				 * if userspace address, put adspace ptr in
674 				 * clist. If not, then do nothing since it's
675 				 * already set to NULL (from kmem_zalloc)
676 				 */
677 				if (uiop->uio_segflg == UIO_USERSPACE) {
678 					rwl->c_adspc = ttoproc(curthread)->p_as;
679 				}
680 
681 				if (prev == NULL)
682 					prev = rwl;
683 				else {
684 					prev->c_next = rwl;
685 					prev = rwl;
686 				}
687 			}
688 
689 			rwl->c_next = NULL;
690 			xdrp->xp_wcl = rwl;
691 			*(rcip->rci_clpp) = rwl;
692 
693 			break;
694 
695 		case RCI_WRITE_ADDR_CHUNK:
696 			rwl = clist_alloc();
697 
698 			rwl->c_len = rcip->rci_len;
699 			rwl->u.c_daddr3 = rcip->rci_a.rci_addr;
700 			rwl->c_next = NULL;
701 			xdrp->xp_reply_chunk_len_alt += rcip->rci_len;
702 
703 			xdrp->xp_wcl = rwl;
704 			*(rcip->rci_clpp) = rwl;
705 
706 			break;
707 
708 		case RCI_REPLY_CHUNK:
709 			xdrp->xp_reply_chunk_len += rcip->rci_len;
710 			break;
711 		}
712 		return (TRUE);
713 
714 	case XDR_RDMA_GET_WLIST:
715 		*((struct clist **)info) = xdrp->xp_wcl;
716 		return (TRUE);
717 
718 	case XDR_RDMA_SET_WLIST:
719 		xdrp->xp_wcl = (struct clist *)info;
720 		return (TRUE);
721 
722 	case XDR_RDMA_GET_RLIST:
723 		*((struct clist **)info) = xdrp->xp_rcl;
724 		return (TRUE);
725 
726 	case XDR_RDMA_GET_WCINFO:
727 		rwcip = (rdma_wlist_conn_info_t *)info;
728 
729 		rwcip->rwci_wlist = xdrp->xp_wcl;
730 		rwcip->rwci_conn = xdrp->xp_conn;
731 
732 		return (TRUE);
733 
734 	default:
735 		return (FALSE);
736 	}
737 }
738 
739 bool_t xdr_do_clist(XDR *, clist **);
740 
741 /*
742  * Not all fields in struct clist are interesting to the RPC over RDMA
743  * protocol. Only XDR the interesting fields.
744  */
745 bool_t
746 xdr_clist(XDR *xdrs, clist *objp)
747 {
748 	if (!xdr_uint32(xdrs, &objp->c_xdroff))
749 		return (FALSE);
750 	if (!xdr_uint32(xdrs, &objp->c_smemhandle.mrc_rmr))
751 		return (FALSE);
752 	if (!xdr_uint32(xdrs, &objp->c_len))
753 		return (FALSE);
754 	if (!xdr_uint64(xdrs, &objp->w.c_saddr))
755 		return (FALSE);
756 	if (!xdr_do_clist(xdrs, &objp->c_next))
757 		return (FALSE);
758 	return (TRUE);
759 }
760 
761 /*
762  * The following two functions are forms of xdr_pointer()
763  * and xdr_reference(). Since the generic versions just
764  * kmem_alloc() a new clist, we actually want to use the
765  * rdma_clist kmem_cache.
766  */
767 
768 /*
769  * Generate or free a clist structure from the
770  * kmem_cache "rdma_clist"
771  */
772 bool_t
773 xdr_ref_clist(XDR *xdrs, caddr_t *pp)
774 {
775 	caddr_t loc = *pp;
776 	bool_t stat;
777 
778 	if (loc == NULL) {
779 		switch (xdrs->x_op) {
780 		case XDR_FREE:
781 			return (TRUE);
782 
783 		case XDR_DECODE:
784 			*pp = loc = (caddr_t)clist_alloc();
785 			break;
786 
787 		case XDR_ENCODE:
788 			ASSERT(loc);
789 			break;
790 		}
791 	}
792 
793 	stat = xdr_clist(xdrs, (struct clist *)loc);
794 
795 	if (xdrs->x_op == XDR_FREE) {
796 		kmem_cache_free(clist_cache, loc);
797 		*pp = NULL;
798 	}
799 	return (stat);
800 }
801 
802 /*
803  * XDR a pointer to a possibly recursive clist. This differs
804  * with xdr_reference in that it can serialize/deserialiaze
805  * trees correctly.
806  *
807  *  What is sent is actually a union:
808  *
809  *  union object_pointer switch (boolean b) {
810  *  case TRUE: object_data data;
811  *  case FALSE: void nothing;
812  *  }
813  *
814  * > objpp: Pointer to the pointer to the object.
815  *
816  */
817 
818 bool_t
819 xdr_do_clist(XDR *xdrs, clist **objpp)
820 {
821 	bool_t more_data;
822 
823 	more_data = (*objpp != NULL);
824 	if (!xdr_bool(xdrs, &more_data))
825 		return (FALSE);
826 	if (!more_data) {
827 		*objpp = NULL;
828 		return (TRUE);
829 	}
830 	return (xdr_ref_clist(xdrs, (caddr_t *)objpp));
831 }
832 
833 uint_t
834 xdr_getbufsize(XDR *xdrs)
835 {
836 	xrdma_private_t *xdrp = (xrdma_private_t *)(xdrs->x_private);
837 
838 	return ((uint_t)xdrp->xp_buf_size);
839 }
840 
841 /* ARGSUSED */
842 bool_t
843 xdr_encode_rlist_svc(XDR *xdrs, clist *rlist)
844 {
845 	bool_t	vfalse = FALSE;
846 
847 	ASSERT(rlist == NULL);
848 	return (xdr_bool(xdrs, &vfalse));
849 }
850 
851 bool_t
852 xdr_encode_wlist(XDR *xdrs, clist *w)
853 {
854 	bool_t		vfalse = FALSE, vtrue = TRUE;
855 	int		i;
856 	uint_t		num_segment = 0;
857 	struct clist	*cl;
858 
859 	/* does a wlist exist? */
860 	if (w == NULL) {
861 		return (xdr_bool(xdrs, &vfalse));
862 	}
863 	/* Encode N consecutive segments, 1, N, HLOO, ..., HLOO, 0 */
864 	if (!xdr_bool(xdrs, &vtrue))
865 		return (FALSE);
866 
867 	for (cl = w; cl != NULL; cl = cl->c_next) {
868 		num_segment++;
869 	}
870 
871 	if (!xdr_uint32(xdrs, &num_segment))
872 		return (FALSE);
873 	for (i = 0; i < num_segment; i++) {
874 		if (!xdr_uint32(xdrs, &w->c_dmemhandle.mrc_rmr))
875 			return (FALSE);
876 
877 		if (!xdr_uint32(xdrs, &w->c_len))
878 			return (FALSE);
879 
880 		if (!xdr_uint64(xdrs, &w->u.c_daddr))
881 			return (FALSE);
882 
883 		w = w->c_next;
884 	}
885 
886 	if (!xdr_bool(xdrs, &vfalse))
887 		return (FALSE);
888 
889 	return (TRUE);
890 }
891 
892 
893 /*
894  * Conditionally decode a RDMA WRITE chunk list from XDR stream.
895  *
896  * If the next boolean in the XDR stream is false there is no
897  * RDMA WRITE chunk list present. Otherwise iterate over the
898  * array and for each entry: allocate a struct clist and decode.
899  * Pass back an indication via wlist_exists if we have seen a
900  * RDMA WRITE chunk list.
901  */
902 bool_t
903 xdr_decode_wlist(XDR *xdrs, struct clist **w, bool_t *wlist_exists)
904 {
905 	struct clist	*tmp;
906 	bool_t		more = FALSE;
907 	uint32_t	seg_array_len;
908 	uint32_t	i;
909 
910 	if (!xdr_bool(xdrs, &more))
911 		return (FALSE);
912 
913 	/* is there a wlist? */
914 	if (more == FALSE) {
915 		*wlist_exists = FALSE;
916 		return (TRUE);
917 	}
918 	*wlist_exists = TRUE;
919 
920 	if (!xdr_uint32(xdrs, &seg_array_len))
921 		return (FALSE);
922 
923 	tmp = *w = clist_alloc();
924 	for (i = 0; i < seg_array_len; i++) {
925 		if (!xdr_uint32(xdrs, &tmp->c_dmemhandle.mrc_rmr))
926 			return (FALSE);
927 		if (!xdr_uint32(xdrs, &tmp->c_len))
928 			return (FALSE);
929 		if (!xdr_uint64(xdrs, &tmp->u.c_daddr))
930 			return (FALSE);
931 		if (i < seg_array_len - 1) {
932 			tmp->c_next = clist_alloc();
933 			tmp = tmp->c_next;
934 		} else {
935 			tmp->c_next = NULL;
936 		}
937 	}
938 
939 	more = FALSE;
940 	if (!xdr_bool(xdrs, &more))
941 		return (FALSE);
942 
943 	return (TRUE);
944 }
945 
946 /*
947  * Server side RDMA WRITE list decode.
948  * XDR context is memory ops
949  */
950 bool_t
951 xdr_decode_wlist_svc(XDR *xdrs, struct clist **wclp, bool_t *wwl,
952     uint32_t *total_length, CONN *conn)
953 {
954 	struct clist	*first, *ncl;
955 	char		*memp;
956 	uint32_t	num_wclist;
957 	uint32_t	wcl_length = 0;
958 	uint32_t	i;
959 	bool_t		more = FALSE;
960 
961 	*wclp = NULL;
962 	*wwl = FALSE;
963 	*total_length = 0;
964 
965 	if (!xdr_bool(xdrs, &more)) {
966 		return (FALSE);
967 	}
968 
969 	if (more == FALSE) {
970 		return (TRUE);
971 	}
972 
973 	*wwl = TRUE;
974 
975 	if (!xdr_uint32(xdrs, &num_wclist)) {
976 		DTRACE_PROBE(krpc__e__xdrrdma__wlistsvc__listlength);
977 		return (FALSE);
978 	}
979 
980 	first = ncl = clist_alloc();
981 
982 	for (i = 0; i < num_wclist; i++) {
983 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
984 			goto err_out;
985 		if (!xdr_uint32(xdrs, &ncl->c_len))
986 			goto err_out;
987 		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
988 			goto err_out;
989 
990 		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
991 			DTRACE_PROBE(
992 			    krpc__e__xdrrdma__wlistsvc__chunklist_toobig);
993 			ncl->c_len = MAX_SVC_XFER_SIZE;
994 		}
995 
996 		wcl_length += ncl->c_len;
997 
998 		if (i < num_wclist - 1) {
999 			ncl->c_next = clist_alloc();
1000 			ncl = ncl->c_next;
1001 		}
1002 	}
1003 
1004 	if (!xdr_bool(xdrs, &more))
1005 		goto err_out;
1006 
1007 	first->rb_longbuf.type = RDMA_LONG_BUFFER;
1008 	first->rb_longbuf.len =
1009 	    wcl_length > WCL_BUF_LEN ? wcl_length : WCL_BUF_LEN;
1010 
1011 	if (rdma_buf_alloc(conn, &first->rb_longbuf)) {
1012 		clist_free(first);
1013 		return (FALSE);
1014 	}
1015 
1016 	memp = first->rb_longbuf.addr;
1017 
1018 	ncl = first;
1019 	for (i = 0; i < num_wclist; i++) {
1020 		ncl->w.c_saddr3 = (caddr_t)memp;
1021 		memp += ncl->c_len;
1022 		ncl = ncl->c_next;
1023 	}
1024 
1025 	*wclp = first;
1026 	*total_length = wcl_length;
1027 	return (TRUE);
1028 
1029 err_out:
1030 	clist_free(first);
1031 	return (FALSE);
1032 }
1033 
1034 /*
1035  * XDR decode the long reply write chunk.
1036  */
1037 bool_t
1038 xdr_decode_reply_wchunk(XDR *xdrs, struct clist **clist)
1039 {
1040 	bool_t		have_rchunk = FALSE;
1041 	struct clist	*first = NULL, *ncl = NULL;
1042 	uint32_t	num_wclist;
1043 	uint32_t	i;
1044 
1045 	if (!xdr_bool(xdrs, &have_rchunk))
1046 		return (FALSE);
1047 
1048 	if (have_rchunk == FALSE)
1049 		return (TRUE);
1050 
1051 	if (!xdr_uint32(xdrs, &num_wclist)) {
1052 		DTRACE_PROBE(krpc__e__xdrrdma__replywchunk__listlength);
1053 		return (FALSE);
1054 	}
1055 
1056 	if (num_wclist == 0) {
1057 		return (FALSE);
1058 	}
1059 
1060 	first = ncl = clist_alloc();
1061 
1062 	for (i = 0; i < num_wclist; i++) {
1063 		if (!xdr_uint32(xdrs, &ncl->c_dmemhandle.mrc_rmr))
1064 			goto err_out;
1065 		if (!xdr_uint32(xdrs, &ncl->c_len))
1066 			goto err_out;
1067 		if (!xdr_uint64(xdrs, &ncl->u.c_daddr))
1068 			goto err_out;
1069 
1070 		if (ncl->c_len > MAX_SVC_XFER_SIZE) {
1071 			DTRACE_PROBE(
1072 			    krpc__e__xdrrdma__replywchunk__chunklist_toobig);
1073 			ncl->c_len = MAX_SVC_XFER_SIZE;
1074 		}
1075 		if (!(ncl->c_dmemhandle.mrc_rmr &&
1076 		    (ncl->c_len > 0) && ncl->u.c_daddr))
1077 			DTRACE_PROBE(
1078 			    krpc__e__xdrrdma__replywchunk__invalid_segaddr);
1079 
1080 		if (i > 0) {
1081 			ncl->c_next = clist_alloc();
1082 			ncl = ncl->c_next;
1083 		}
1084 	}
1085 	*clist = first;
1086 	return (TRUE);
1087 
1088 err_out:
1089 	clist_free(first);
1090 	return (FALSE);
1091 }
1092 
1093 
1094 bool_t
1095 xdr_encode_reply_wchunk(XDR *xdrs,
1096     struct clist *cl_longreply, uint32_t seg_array_len)
1097 {
1098 	int		i;
1099 	bool_t		long_reply_exists = TRUE;
1100 	uint32_t	length;
1101 	uint64		offset;
1102 
1103 	if (seg_array_len > 0) {
1104 		if (!xdr_bool(xdrs, &long_reply_exists))
1105 			return (FALSE);
1106 		if (!xdr_uint32(xdrs, &seg_array_len))
1107 			return (FALSE);
1108 
1109 		for (i = 0; i < seg_array_len; i++) {
1110 			if (!cl_longreply)
1111 				return (FALSE);
1112 			length = cl_longreply->c_len;
1113 			offset = (uint64) cl_longreply->u.c_daddr;
1114 
1115 			if (!xdr_uint32(xdrs,
1116 			    &cl_longreply->c_dmemhandle.mrc_rmr))
1117 				return (FALSE);
1118 			if (!xdr_uint32(xdrs, &length))
1119 				return (FALSE);
1120 			if (!xdr_uint64(xdrs, &offset))
1121 				return (FALSE);
1122 			cl_longreply = cl_longreply->c_next;
1123 		}
1124 	} else {
1125 		long_reply_exists = FALSE;
1126 		if (!xdr_bool(xdrs, &long_reply_exists))
1127 			return (FALSE);
1128 	}
1129 	return (TRUE);
1130 }
1131 bool_t
1132 xdrrdma_read_from_client(struct clist **rlist, CONN **conn, uint_t count)
1133 {
1134 	struct clist	*rdclist;
1135 	struct clist	cl;
1136 	uint_t		total_len = 0;
1137 	uint32_t	status;
1138 	bool_t		retval = TRUE;
1139 
1140 	(*rlist)->rb_longbuf.type = RDMA_LONG_BUFFER;
1141 	(*rlist)->rb_longbuf.len =
1142 	    count > RCL_BUF_LEN ? count : RCL_BUF_LEN;
1143 
1144 	if (rdma_buf_alloc(*conn, &(*rlist)->rb_longbuf)) {
1145 		return (FALSE);
1146 	}
1147 
1148 	for (rdclist = *rlist;
1149 	    rdclist != NULL; rdclist = rdclist->c_next) {
1150 		total_len += rdclist->c_len;
1151 #if (defined(OBJ32)||defined(DEBUG32))
1152 		rdclist->u.c_daddr3 =
1153 		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
1154 		    (uint32) rdclist->u.c_daddr3);
1155 #else
1156 		rdclist->u.c_daddr3 =
1157 		    (caddr_t)((char *)(*rlist)->rb_longbuf.addr +
1158 		    (uint64) rdclist->u.c_daddr);
1159 
1160 #endif
1161 		cl = (*rdclist);
1162 		cl.c_next = NULL;
1163 
1164 		if (clist_register(*conn, &cl, CLIST_REG_DST) != RDMA_SUCCESS) {
1165 			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
1166 			DTRACE_PROBE(
1167 			    krpc__e__xdrrdma__readfromclient__clist__reg);
1168 			return (FALSE);
1169 		}
1170 
1171 		DTRACE_PROBE1(krpc__i__xdrrdma__readfromclient__buflen,
1172 		    int, rdclist->c_len);
1173 
1174 		/*
1175 		 * Now read the chunk in
1176 		 */
1177 		if (rdclist->c_next == NULL) {
1178 			status = RDMA_READ(*conn, &cl, WAIT);
1179 		} else {
1180 			status = RDMA_READ(*conn, &cl, NOWAIT);
1181 		}
1182 		if (status != RDMA_SUCCESS) {
1183 			DTRACE_PROBE(
1184 			    krpc__e__xdrrdma__readfromclient__readfailed);
1185 			rdma_buf_free(*conn, &(*rlist)->rb_longbuf);
1186 			return (FALSE);
1187 		}
1188 	}
1189 
1190 	cl = (*(*rlist));
1191 	cl.c_next = NULL;
1192 	cl.c_len = total_len;
1193 	if (clist_syncmem(*conn, &cl, 0) != RDMA_SUCCESS) {
1194 		retval = FALSE;
1195 	}
1196 	return (retval);
1197 }
1198 
1199 bool_t
1200 xdrrdma_free_clist(CONN *conn, struct clist *clp)
1201 {
1202 	rdma_buf_free(conn, &clp->rb_longbuf);
1203 	clist_free(clp);
1204 	return (TRUE);
1205 }
1206 
1207 bool_t
1208 xdrrdma_send_read_data(XDR *xdrs, struct clist *wcl)
1209 {
1210 	int status;
1211 	xrdma_private_t	*xdrp = (xrdma_private_t *)(xdrs->x_private);
1212 	struct xdr_ops *xops = xdrrdma_xops();
1213 
1214 	/* caller is doing a sizeof */
1215 	if (xdrs->x_ops != &xdrrdma_ops || xdrs->x_ops == xops)
1216 		return (TRUE);
1217 
1218 	status = clist_register(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
1219 	if (status != RDMA_SUCCESS) {
1220 		return (FALSE);
1221 	}
1222 
1223 	status = clist_syncmem(xdrp->xp_conn, wcl, CLIST_REG_SOURCE);
1224 	if (status != RDMA_SUCCESS) {
1225 		return (FALSE);
1226 	}
1227 
1228 	status = RDMA_WRITE(xdrp->xp_conn, wcl, WAIT);
1229 	if (status != RDMA_SUCCESS) {
1230 		return (FALSE);
1231 	}
1232 
1233 	return (TRUE);
1234 }
1235