1 /*
2  * CDDL HEADER START
3  *
4  * The contents of this file are subject to the terms of the
5  * Common Development and Distribution License (the "License").
6  * You may not use this file except in compliance with the License.
7  *
8  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
9  * or http://www.opensolaris.org/os/licensing.
10  * See the License for the specific language governing permissions
11  * and limitations under the License.
12  *
13  * When distributing Covered Code, include this CDDL HEADER in each
14  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
15  * If applicable, add the following below this CDDL HEADER, with the
16  * fields enclosed by brackets "[]" replaced with your own identifying
17  * information: Portions Copyright [yyyy] [name of copyright owner]
18  *
19  * CDDL HEADER END
20  */
21 /*
22  * Copyright (c) 2010, Oracle and/or its affiliates. All rights reserved.
23  */
24 
25 /*
26  * Copyright (c) 2006 Oracle.  All rights reserved.
27  *
28  * This software is available to you under a choice of one of two
29  * licenses.  You may choose to be licensed under the terms of the GNU
30  * General Public License (GPL) Version 2, available from the file
31  * COPYING in the main directory of this source tree, or the
32  * OpenIB.org BSD license below:
33  *
34  *     Redistribution and use in source and binary forms, with or
35  *     without modification, are permitted provided that the following
36  *     conditions are met:
37  *
38  *      - Redistributions of source code must retain the above
39  *        copyright notice, this list of conditions and the following
40  *        disclaimer.
41  *
42  *      - Redistributions in binary form must reproduce the above
43  *        copyright notice, this list of conditions and the following
44  *        disclaimer in the documentation and/or other materials
45  *        provided with the distribution.
46  *
47  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
48  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
49  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
50  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
51  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
52  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
53  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
54  * SOFTWARE.
55  *
56  */
57 #include <sys/types.h>
58 #include <sys/kmem.h>
59 #include <sys/cpuvar.h>
60 #include <sys/rds.h>
61 
62 #include <sys/ib/clients/rdsv3/rdsv3.h>
63 #include <sys/ib/clients/rdsv3/ib.h>
64 #include <sys/ib/clients/rdsv3/rdsv3_debug.h>
65 
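/*
 * rdsv3_ib_allocation counts outstanding rdsv3_ib_incoming allocations;
 * rdsv3_ib_recv_refill_one() refuses to grow it past
 * rdsv3_ib_sysctl_max_recv_allocation and rdsv3_ib_inc_free() drops the
 * count when an incoming message is freed.
 */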
66 static struct kmem_cache *rdsv3_ib_incoming_slab;
67 static struct kmem_cache *rdsv3_ib_frag_slab;
68 static atomic_t	rdsv3_ib_allocation = ATOMIC_INIT(0);
69 
70 static void
71 rdsv3_ib_frag_drop_page(struct rdsv3_page_frag *frag)
72 {
73 	RDSV3_DPRINTF5("rdsv3_ib_frag_drop_page",
74 	    "frag %p page %p offset %d", frag, frag->f_page, frag->f_offset);
75 	kmem_free(frag->f_page, PAGE_SIZE);
76 	frag->f_page = NULL;
77 }
78 
79 static void
80 rdsv3_ib_frag_free(struct rdsv3_page_frag *frag)
81 {
82 	RDSV3_DPRINTF5("rdsv3_ib_frag_free", "frag %p page %p",
83 	    frag, frag->f_page);
84 	ASSERT(frag->f_page == NULL);
85 	kmem_cache_free(rdsv3_ib_frag_slab, frag);
86 }
87 
88 /*
 * We map a page one fragment at a time and post the fragments in order.
 * This is called in fragment order as the fragments get receive completion
 * events.  Each fragment carries its own ibt mapping handle (f_mapped),
 * which is released here.
92  *
93  * It's OK for ring cleanup to call this in whatever order it likes because
94  * DMA is not in flight and so we can unmap while other ring entries still
95  * hold page references in their frags.
96  */
97 static void
98 rdsv3_ib_recv_unmap_page(struct rdsv3_ib_connection *ic,
99     struct rdsv3_ib_recv_work *recv)
100 {
101 	struct rdsv3_page_frag *frag = recv->r_frag;
102 
103 #if 0
104 	RDSV3_DPRINTF5("rdsv3_ib_recv_unmap_page",
105 	    "recv %p frag %p page %p\n", recv, frag, frag->f_page);
106 #endif
107 	if (frag->f_mapped) {
108 		(void) ibt_unmap_mem_iov(
109 		    ib_get_ibt_hca_hdl(ic->i_cm_id->device), frag->f_mapped);
110 		frag->f_mapped = 0;
111 	}
112 }
113 
114 void
115 rdsv3_ib_recv_init_ring(struct rdsv3_ib_connection *ic)
116 {
117 	struct rdsv3_ib_recv_work *recv;
118 	struct rdsv3_header *hdrp;
119 	uint32_t i;
120 
121 	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ring", "ic: %p", ic);
122 
123 	hdrp = ic->i_recv_hdrs;
124 	for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
125 		recv->r_ibinc = NULL;
126 		recv->r_frag = NULL;
127 
128 		recv->r_wr.recv.wr_id = i;
129 
		/*
		 * Initialize the header sgl permanently; the data sgl
		 * (r_sge[1]) is filled in at post time by
		 * rdsv3_ib_recv_refill_one() once the fragment is mapped.
		 */
131 		recv->r_sge[0].ds_va = (ib_vaddr_t)(uintptr_t)hdrp++;
132 		recv->r_sge[0].ds_len = sizeof (struct rdsv3_header);
133 		recv->r_sge[0].ds_key = ic->i_mr->lkey;
134 	}
135 }
136 
137 static void
138 rdsv3_ib_recv_clear_one(struct rdsv3_ib_connection *ic,
139     struct rdsv3_ib_recv_work *recv)
140 {
141 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "ic: %p, recv: %p",
142 	    ic, recv);
143 
144 	if (recv->r_ibinc) {
145 		rdsv3_inc_put(&recv->r_ibinc->ii_inc);
146 		recv->r_ibinc = NULL;
147 	}
148 	if (recv->r_frag) {
149 		rdsv3_ib_recv_unmap_page(ic, recv);
150 		if (recv->r_frag->f_page)
151 			rdsv3_ib_frag_drop_page(recv->r_frag);
152 		rdsv3_ib_frag_free(recv->r_frag);
153 		recv->r_frag = NULL;
154 	}
155 
156 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_one", "Return: ic: %p, recv: %p",
157 	    ic, recv);
158 }
159 
160 void
161 rdsv3_ib_recv_clear_ring(struct rdsv3_ib_connection *ic)
162 {
163 	uint32_t i;
164 
165 	RDSV3_DPRINTF4("rdsv3_ib_recv_clear_ring", "ic: %p", ic);
166 
167 	for (i = 0; i < ic->i_recv_ring.w_nr; i++)
168 		rdsv3_ib_recv_clear_one(ic, &ic->i_recvs[i]);
169 
170 	if (ic->i_frag.f_page)
171 		rdsv3_ib_frag_drop_page(&ic->i_frag);
172 }
173 
174 static int
175 rdsv3_ib_recv_refill_one(struct rdsv3_connection *conn,
176     struct rdsv3_ib_recv_work *recv,
177     int kptr_gfp, int page_gfp)
178 {
179 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
180 	ibt_mi_hdl_t mi_hdl;
181 	ibt_iov_attr_t iov_attr;
182 	ibt_iov_t iov_arr[1];
183 	int ret = -ENOMEM;
184 
185 	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "conn: %p, recv: %p",
186 	    conn, recv);
187 
188 	if (recv->r_ibinc == NULL) {
189 		if (atomic_add_32_nv(&rdsv3_ib_allocation, 1) >
190 		    rdsv3_ib_sysctl_max_recv_allocation) {
191 			atomic_add_32(&rdsv3_ib_allocation, -1);
192 			rdsv3_ib_stats_inc(s_ib_rx_alloc_limit);
193 			goto out;
194 		}
195 		recv->r_ibinc = kmem_cache_alloc(rdsv3_ib_incoming_slab,
196 		    kptr_gfp);
197 		if (recv->r_ibinc == NULL) {
198 			atomic_add_32(&rdsv3_ib_allocation, -1);
199 			goto out;
200 		}
201 		list_create(&recv->r_ibinc->ii_frags,
202 		    sizeof (struct rdsv3_page_frag),
203 		    offsetof(struct rdsv3_page_frag, f_item));
204 		rdsv3_inc_init(&recv->r_ibinc->ii_inc, conn, conn->c_faddr);
205 	}
206 
207 	if (recv->r_frag == NULL) {
208 		recv->r_frag = kmem_cache_alloc(rdsv3_ib_frag_slab, kptr_gfp);
209 		if (recv->r_frag == NULL)
210 			goto out;
211 		list_link_init(&recv->r_frag->f_item);
212 		recv->r_frag->f_page = NULL;
213 	}
214 
215 	if (ic->i_frag.f_page == NULL) {
216 		ic->i_frag.f_page = kmem_alloc(PAGE_SIZE, page_gfp);
217 		if (ic->i_frag.f_page == NULL)
218 			goto out;
219 		ic->i_frag.f_offset = 0;
220 	}
221 
222 	iov_attr.iov_as = NULL;
223 	iov_attr.iov = &iov_arr[0];
224 	iov_attr.iov_buf = NULL;
225 	iov_attr.iov_list_len = 1;
226 	iov_attr.iov_wr_nds = 1;
227 	iov_attr.iov_lso_hdr_sz = 0;
228 	iov_attr.iov_flags = IBT_IOV_SLEEP | IBT_IOV_RECV;
229 
230 	/* Data */
231 	iov_arr[0].iov_addr = ic->i_frag.f_page + ic->i_frag.f_offset;
232 	iov_arr[0].iov_len = RDSV3_FRAG_SIZE;
233 
234 	/*
	 * The header comes from the pre-registered header buffer, so don't
	 * map it.  Map only the data here and splice the header sge back
	 * into the work request after the call.
238 	 */
239 	recv->r_wr.recv.wr_sgl = &recv->r_sge[1];
240 	recv->r_wr.recv.wr_nds = 1;
241 
242 	ret = ibt_map_mem_iov(ib_get_ibt_hca_hdl(ic->i_cm_id->device),
243 	    &iov_attr, &recv->r_wr, &mi_hdl);
244 	if (ret != IBT_SUCCESS) {
245 		RDSV3_DPRINTF2("rdsv3_ib_recv_refill_one",
246 		    "ibt_map_mem_iov failed: %d", ret);
247 		goto out;
248 	}
249 
250 	/* stick in the header */
251 	recv->r_wr.recv.wr_sgl = &recv->r_sge[0];
252 	recv->r_wr.recv.wr_nds = RDSV3_IB_RECV_SGE;
253 
254 	/*
	 * Once we hand out the RDSV3_PAGE_LAST_OFF frag,
	 * rdsv3_ib_recv_unmap_page() must be called on this recv.  This
	 * happens as completions hit in order or on connection shutdown.
258 	 */
259 	recv->r_frag->f_page = ic->i_frag.f_page;
260 	recv->r_frag->f_offset = ic->i_frag.f_offset;
261 	recv->r_frag->f_mapped = mi_hdl;
262 
263 	if (ic->i_frag.f_offset < RDSV3_PAGE_LAST_OFF) {
264 		ic->i_frag.f_offset += RDSV3_FRAG_SIZE;
265 	} else {
266 		ic->i_frag.f_page = NULL;
267 		ic->i_frag.f_offset = 0;
268 	}
269 
270 	ret = 0;
271 
272 	RDSV3_DPRINTF5("rdsv3_ib_recv_refill_one", "Return: conn: %p, recv: %p",
273 	    conn, recv);
274 out:
275 	return (ret);
276 }
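/*
 * Illustrative note on the fragment accounting above: ic->i_frag walks the
 * current receive page one RDSV3_FRAG_SIZE slice per refill, so f_offset
 * takes the values 0, RDSV3_FRAG_SIZE, 2 * RDSV3_FRAG_SIZE, ... up to
 * RDSV3_PAGE_LAST_OFF.  Only after the last slice is handed out is
 * i_frag.f_page cleared, forcing the next refill to allocate a fresh page.
 */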
277 
278 /*
279  * This tries to allocate and post unused work requests after making sure that
280  * they have all the allocations they need to queue received fragments into
281  * sockets.  The i_recv_mutex is held here so that ring_alloc and _unalloc
282  * pairs don't go unmatched.
283  *
284  * -1 is returned if posting fails due to temporary resource exhaustion.
285  */
286 int
287 rdsv3_ib_recv_refill(struct rdsv3_connection *conn, int kptr_gfp,
288     int page_gfp, int prefill)
289 {
290 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
291 	struct rdsv3_ib_recv_work *recv;
292 	unsigned int succ_wr;
293 	unsigned int posted = 0;
294 	int ret = 0;
295 	uint32_t pos;
296 
297 	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "conn: %p, prefill: %d",
298 	    conn, prefill);
299 
300 	while ((prefill || rdsv3_conn_up(conn)) &&
301 	    rdsv3_ib_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
302 		if (pos >= ic->i_recv_ring.w_nr) {
303 			RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
304 			    "Argh - ring alloc returned pos=%u",
305 			    pos);
306 			ret = -EINVAL;
307 			break;
308 		}
309 
310 		recv = &ic->i_recvs[pos];
311 		ret = rdsv3_ib_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
312 		if (ret) {
313 			ret = -1;
314 			break;
315 		}
316 
317 		/* XXX when can this fail? */
318 		ret = ibt_post_recv(ib_get_ibt_channel_hdl(ic->i_cm_id),
319 		    &recv->r_wr.recv, 1, &succ_wr);
320 		RDSV3_DPRINTF5("rdsv3_ib_recv_refill",
321 		    "recv %p ibinc %p frag %p ret %d\n", recv,
322 		    recv->r_ibinc, recv->r_frag, ret);
323 		if (ret) {
324 			RDSV3_DPRINTF2("rdsv3_ib_recv_refill",
325 			    "Return: conn: %p, posted: %d", conn, ret);
326 			rdsv3_conn_drop(conn);
327 			ret = -1;
328 			break;
329 		}
330 
331 		posted++;
332 	}
333 
334 	/* We're doing flow control - update the window. */
335 	if (ic->i_flowctl && posted)
336 		rdsv3_ib_advertise_credits(conn, posted);
337 
338 	if (ret)
339 		rdsv3_ib_ring_unalloc(&ic->i_recv_ring, 1);
340 
341 	RDSV3_DPRINTF4("rdsv3_ib_recv_refill", "Return: conn: %p, posted: %d",
342 	    conn, posted);
343 	return (ret);
344 }
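/*
 * Typical caller pattern (a sketch; see rdsv3_ib_recv() below for the real
 * thread-context path): take ic->i_recv_mutex, call rdsv3_ib_recv_refill()
 * with KM_NOSLEEP, and treat a non-zero return as "back off and retry later"
 * rather than as a hard failure.
 */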
345 
346 void
347 rdsv3_ib_inc_purge(struct rdsv3_incoming *inc)
348 {
349 	struct rdsv3_ib_incoming *ibinc;
350 	struct rdsv3_page_frag *frag;
351 	struct rdsv3_page_frag *pos;
352 
353 	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "inc: %p", inc);
354 
355 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
356 	RDSV3_DPRINTF5("rdsv3_ib_inc_purge",
357 	    "purging ibinc %p inc %p\n", ibinc, inc);
358 
359 	RDSV3_FOR_EACH_LIST_NODE_SAFE(frag, pos, &ibinc->ii_frags, f_item) {
360 		list_remove_node(&frag->f_item);
361 		rdsv3_ib_frag_drop_page(frag);
362 		rdsv3_ib_frag_free(frag);
363 	}
364 
365 	RDSV3_DPRINTF4("rdsv3_ib_inc_purge", "Return: inc: %p", inc);
366 }
367 
368 void
369 rdsv3_ib_inc_free(struct rdsv3_incoming *inc)
370 {
371 	struct rdsv3_ib_incoming *ibinc;
372 
373 	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "inc: %p", inc);
374 
375 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
376 
377 	rdsv3_ib_inc_purge(inc);
378 	RDSV3_DPRINTF5("rdsv3_ib_inc_free", "freeing ibinc %p inc %p",
379 	    ibinc, inc);
380 	ASSERT(list_is_empty(&ibinc->ii_frags));
381 	kmem_cache_free(rdsv3_ib_incoming_slab, ibinc);
382 	atomic_dec_uint(&rdsv3_ib_allocation);
383 
384 	RDSV3_DPRINTF4("rdsv3_ib_inc_free", "Return: inc: %p", inc);
385 }
386 
387 int
388 rdsv3_ib_inc_copy_to_user(struct rdsv3_incoming *inc, uio_t *uiop,
389     size_t size)
390 {
391 	struct rdsv3_ib_incoming *ibinc;
392 	struct rdsv3_page_frag *frag;
393 	unsigned long to_copy;
394 	unsigned long frag_off = 0;
395 	int copied = 0;
396 	int ret;
397 	uint32_t len;
398 
399 	ibinc = container_of(inc, struct rdsv3_ib_incoming, ii_inc);
400 	frag = list_head(&ibinc->ii_frags);
401 	len = ntohl(inc->i_hdr.h_len);
402 
403 	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user", "inc: %p, size: %d len: %d",
404 	    inc, size, len);
405 
406 	while (copied < size && copied < len) {
407 		if (frag_off == RDSV3_FRAG_SIZE) {
408 			frag = list_next(&ibinc->ii_frags, frag);
409 			frag_off = 0;
410 		}
411 
412 		to_copy = min(len - copied, RDSV3_FRAG_SIZE - frag_off);
413 		to_copy = min(size - copied, to_copy);
414 
415 		RDSV3_DPRINTF5("rdsv3_ib_inc_copy_to_user",
416 		    "%lu bytes to user %p from frag [%p, %u] + %lu",
417 		    to_copy, uiop,
418 		    frag->f_page, frag->f_offset, frag_off);
419 
420 		ret = uiomove((caddr_t)(frag->f_page +
421 		    frag->f_offset + frag_off),
422 		    to_copy, UIO_READ, uiop);
423 		if (ret) {
424 			RDSV3_DPRINTF2("rdsv3_ib_inc_copy_to_user",
425 			    "uiomove (%d) returned: %d", to_copy, ret);
426 			break;
427 		}
428 
429 		frag_off += to_copy;
430 		copied += to_copy;
431 	}
432 
433 	RDSV3_DPRINTF4("rdsv3_ib_inc_copy_to_user",
434 	    "Return: inc: %p, copied: %d", inc, copied);
435 
436 	return (copied);
437 }
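/*
 * Note that rdsv3_ib_inc_copy_to_user() returns the number of bytes copied,
 * not an errno; a uiomove() failure simply ends the copy early and the
 * partial count is returned to the caller.
 */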
438 
439 /* ic starts out kmem_zalloc()ed */
440 void
441 rdsv3_ib_recv_init_ack(struct rdsv3_ib_connection *ic)
442 {
443 	ibt_send_wr_t *wr = &ic->i_ack_wr;
444 	ibt_wr_ds_t *sge = &ic->i_ack_sge;
445 
446 	RDSV3_DPRINTF4("rdsv3_ib_recv_init_ack", "ic: %p", ic);
447 
448 	sge->ds_va = ic->i_ack_dma;
449 	sge->ds_len = sizeof (struct rdsv3_header);
450 	sge->ds_key = ic->i_mr->lkey;
451 
452 	wr->wr_sgl = sge;
453 	wr->wr_nds = 1;
454 	wr->wr_opcode = IBT_WRC_SEND;
455 	wr->wr_id = RDSV3_IB_ACK_WR_ID;
456 	wr->wr_flags = IBT_WR_SEND_SIGNAL | IBT_WR_SEND_SOLICIT;
457 }
458 
459 /*
460  * You'd think that with reliable IB connections you wouldn't need to ack
461  * messages that have been received.  The problem is that IB hardware generates
462  * an ack message before it has DMAed the message into memory.  This creates a
463  * potential message loss if the HCA is disabled for any reason between when it
464  * sends the ack and before the message is DMAed and processed.  This is only a
465  * potential issue if another HCA is available for fail-over.
466  *
467  * When the remote host receives our ack they'll free the sent message from
468  * their send queue.  To decrease the latency of this we always send an ack
469  * immediately after we've received messages.
470  *
471  * For simplicity, we only have one ack in flight at a time.  This puts
472  * pressure on senders to have deep enough send queues to absorb the latency of
473  * a single ack frame being in flight.  This might not be good enough.
474  *
 * This is implemented by having a long-lived send_wr and sge which point to
 * a statically allocated ack frame.  This ack wr does not fall under the ring
477  * accounting that the tx and rx wrs do.  The QP attribute specifically makes
478  * room for it beyond the ring size.  Send completion notices its special
479  * wr_id and avoids working with the ring in that case.
480  */
481 static void
482 rdsv3_ib_set_ack(struct rdsv3_ib_connection *ic, uint64_t seq,
483     int ack_required)
484 {
485 	RDSV3_DPRINTF4("rdsv3_ib_set_ack", "ic: %p, seq: %lld ack: %d",
486 	    ic, seq, ack_required);
487 
488 	mutex_enter(&ic->i_ack_lock);
489 	ic->i_ack_next = seq;
490 	if (ack_required)
491 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
492 	mutex_exit(&ic->i_ack_lock);
493 }
494 
495 static uint64_t
496 rdsv3_ib_get_ack(struct rdsv3_ib_connection *ic)
497 {
498 	uint64_t seq;
499 
500 	RDSV3_DPRINTF4("rdsv3_ib_get_ack", "ic: %p", ic);
501 
502 	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
503 
504 	mutex_enter(&ic->i_ack_lock);
505 	seq = ic->i_ack_next;
506 	mutex_exit(&ic->i_ack_lock);
507 
508 	return (seq);
509 }
510 
511 static void
512 rdsv3_ib_send_ack(struct rdsv3_ib_connection *ic, unsigned int adv_credits)
513 {
514 	struct rdsv3_header *hdr = ic->i_ack;
515 	uint64_t seq;
516 	int ret;
517 
518 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "ic: %p adv_credits: %d",
519 	    ic, adv_credits);
520 
521 	seq = rdsv3_ib_get_ack(ic);
522 
523 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "send_ack: ic %p ack %llu",
524 	    ic, (unsigned long long) seq);
525 	rdsv3_message_populate_header(hdr, 0, 0, 0);
526 	hdr->h_ack = htonll(seq);
527 	hdr->h_credit = adv_credits;
528 	rdsv3_message_make_checksum(hdr);
529 	ic->i_ack_queued = jiffies;
530 
531 	ret = ibt_post_send(RDSV3_QP2CHANHDL(ic->i_cm_id->qp), &ic->i_ack_wr, 1,
532 	    NULL);
533 	if (ret) {
534 		/*
535 		 * Failed to send. Release the WR, and
536 		 * force another ACK.
537 		 */
538 		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
539 		set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
540 		rdsv3_ib_stats_inc(s_ib_ack_send_failure);
541 #if 1
542 		RDSV3_DPRINTF2("rdsv3_ib_send_ack", "ibt_post_send FAIL");
543 #else
544 		/* Need to finesse this later. */
545 		RDSV3_PANIC();
546 #endif
547 	} else {
548 		rdsv3_ib_stats_inc(s_ib_ack_sent);
549 	}
550 	RDSV3_DPRINTF4("rdsv3_ib_send_ack", "Return: ic: %p adv_credits: %d",
551 	    ic, adv_credits);
552 }
553 
554 /*
555  * There are 3 ways of getting acknowledgements to the peer:
556  *  1.	We call rdsv3_ib_attempt_ack from the recv completion handler
557  *	to send an ACK-only frame.
558  *	However, there can be only one such frame in the send queue
559  *	at any time, so we may have to postpone it.
560  *  2.	When another (data) packet is transmitted while there's
561  *	an ACK in the queue, we piggyback the ACK sequence number
562  *	on the data packet.
563  *  3.	If the ACK WR is done sending, we get called from the
564  *	send queue completion handler, and check whether there's
565  *	another ACK pending (postponed because the WR was on the
566  *	queue). If so, we transmit it.
567  *
568  * We maintain 2 variables:
569  *  -	i_ack_flags, which keeps track of whether the ACK WR
570  *	is currently in the send queue or not (IB_ACK_IN_FLIGHT)
571  *  -	i_ack_next, which is the last sequence number we received
572  *
573  * Potentially, send queue and receive queue handlers can run concurrently.
 * It would be nice to not have to use a lock to synchronize things,
575  * but the one problem that rules this out is that 64bit updates are
576  * not atomic on all platforms. Things would be a lot simpler if
577  * we had atomic64 or maybe cmpxchg64 everywhere.
578  *
579  * Reconnecting complicates this picture just slightly. When we
580  * reconnect, we may be seeing duplicate packets. The peer
581  * is retransmitting them, because it hasn't seen an ACK for
582  * them. It is important that we ACK these.
583  *
584  * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
585  * this flag set *MUST* be acknowledged immediately.
586  */
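/*
 * Rough sketch of the flag transitions described above (illustrative only):
 *
 *   recv path:        rdsv3_ib_set_ack() records i_ack_next and may set
 *                     IB_ACK_REQUESTED
 *   attempt_ack:      if IB_ACK_REQUESTED is set and IB_ACK_IN_FLIGHT was
 *                     clear, clear IB_ACK_REQUESTED and post the ack WR
 *                     (credits permitting)
 *   send completion:  rdsv3_ib_ack_send_complete() clears IB_ACK_IN_FLIGHT
 *                     and re-runs rdsv3_ib_attempt_ack()
 *   piggyback xmit:   rdsv3_ib_piggyb_ack() test-and-clears IB_ACK_REQUESTED
 *                     and returns the latest ack sequence
 */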
587 
588 /*
589  * When we get here, we're called from the recv queue handler.
590  * Check whether we ought to transmit an ACK.
591  */
592 void
593 rdsv3_ib_attempt_ack(struct rdsv3_ib_connection *ic)
594 {
595 	unsigned int adv_credits;
596 
597 	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "ic: %p", ic);
598 
599 	if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
600 		return;
601 
602 	if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
603 		rdsv3_ib_stats_inc(s_ib_ack_send_delayed);
604 		return;
605 	}
606 
607 	/* Can we get a send credit? */
608 	if (!rdsv3_ib_send_grab_credits(ic, 1, &adv_credits, 0,
609 	    RDSV3_MAX_ADV_CREDIT)) {
610 		rdsv3_ib_stats_inc(s_ib_tx_throttle);
611 		clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
612 		return;
613 	}
614 
615 	clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
616 	rdsv3_ib_send_ack(ic, adv_credits);
617 
618 	RDSV3_DPRINTF4("rdsv3_ib_attempt_ack", "Return: ic: %p", ic);
619 }
620 
621 /*
622  * We get here from the send completion handler, when the
623  * adapter tells us the ACK frame was sent.
624  */
625 void
626 rdsv3_ib_ack_send_complete(struct rdsv3_ib_connection *ic)
627 {
628 	RDSV3_DPRINTF4("rdsv3_ib_ack_send_complete", "ic: %p", ic);
629 	clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
630 	rdsv3_ib_attempt_ack(ic);
631 }
632 
633 /*
634  * This is called by the regular xmit code when it wants to piggyback
635  * an ACK on an outgoing frame.
636  */
637 uint64_t
638 rdsv3_ib_piggyb_ack(struct rdsv3_ib_connection *ic)
639 {
640 	RDSV3_DPRINTF4("rdsv3_ib_piggyb_ack", "ic: %p", ic);
641 	if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags)) {
642 		rdsv3_ib_stats_inc(s_ib_ack_send_piggybacked);
643 	}
644 	return (rdsv3_ib_get_ack(ic));
645 }
646 
647 static struct rdsv3_header *
648 rdsv3_ib_get_header(struct rdsv3_connection *conn,
649     struct rdsv3_ib_recv_work *recv,
650     uint32_t data_len)
651 {
652 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
653 	void *hdr_buff = &ic->i_recv_hdrs[recv - ic->i_recvs];
654 
655 	RDSV3_DPRINTF4("rdsv3_ib_get_header", "conn: %p, recv: %p len: %d",
656 	    conn, recv, data_len);
657 
658 	/*
659 	 * Support header at the front (RDS 3.1+) as well as header-at-end.
660 	 *
661 	 * Cases:
662 	 * 1) header all in header buff (great!)
663 	 * 2) header all in data page (copy all to header buff)
664 	 * 3) header split across hdr buf + data page
665 	 *    (move bit in hdr buff to end before copying other bit from
666 	 *    data page)
667 	 */
668 	if (conn->c_version > RDS_PROTOCOL_3_0 || data_len == RDSV3_FRAG_SIZE)
669 		return (hdr_buff);
670 	/*
671 	 * XXX - Need to discuss the support for version < RDS_PROTOCOL_3_1.
672 	 */
673 	if (conn->c_version == RDS_PROTOCOL_3_0)
674 		return (hdr_buff);
675 
676 	/* version < RDS_PROTOCOL_3_0 */
677 	RDSV3_DPRINTF2("rdsv3_ib_get_header",
678 	    "NULL header (version: 0x%x, data_len: %d)", conn->c_version,
679 	    data_len);
680 	return (NULL);
681 }
682 
683 /*
684  * It's kind of lame that we're copying from the posted receive pages into
685  * long-lived bitmaps.  We could have posted the bitmaps and rdma written into
686  * them.  But receiving new congestion bitmaps should be a *rare* event, so
687  * hopefully we won't need to invest that complexity in making it more
688  * efficient.  By copying we can share a simpler core with TCP which has to
689  * copy.
690  */
691 static void
692 rdsv3_ib_cong_recv(struct rdsv3_connection *conn,
693     struct rdsv3_ib_incoming *ibinc)
694 {
695 	struct rdsv3_cong_map *map;
696 	unsigned int map_off;
697 	unsigned int map_page;
698 	struct rdsv3_page_frag *frag;
699 	unsigned long frag_off;
700 	unsigned long to_copy;
701 	unsigned long copied;
702 	uint64_t uncongested = 0;
703 	caddr_t addr;
704 
705 	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "conn: %p, ibinc: %p",
706 	    conn, ibinc);
707 
708 	/* catch completely corrupt packets */
709 	if (ntohl(ibinc->ii_inc.i_hdr.h_len) != RDSV3_CONG_MAP_BYTES)
710 		return;
711 
712 	map = conn->c_fcong;
713 	map_page = 0;
714 	map_off = 0;
715 
716 	frag = list_head(&ibinc->ii_frags);
717 	frag_off = 0;
718 
719 	copied = 0;
720 
721 	while (copied < RDSV3_CONG_MAP_BYTES) {
722 		uint64_t *src, *dst;
723 		unsigned int k;
724 
725 		to_copy = min(RDSV3_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
726 		ASSERT(!(to_copy & 7)); /* Must be 64bit aligned. */
727 
728 		addr = frag->f_page + frag->f_offset;
729 
730 		src = (uint64_t *)(addr + frag_off);
731 		dst = (uint64_t *)(map->m_page_addrs[map_page] + map_off);
732 		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
733 		    "src: %p dst: %p copied: %d", src, dst, copied);
734 		for (k = 0; k < to_copy; k += 8) {
735 			/*
			 * Record ports that became uncongested, i.e. bits
			 * that are set in the old map word but clear in the
			 * incoming word.
738 			 */
739 			uncongested |= ~(*src) & *dst;
740 			*dst++ = *src++;
741 		}
742 
743 		copied += to_copy;
744 		RDSV3_DPRINTF4("rdsv3_ib_cong_recv",
745 		    "src: %p dst: %p copied: %d", src, dst, copied);
746 
747 		map_off += to_copy;
748 		if (map_off == PAGE_SIZE) {
749 			map_off = 0;
750 			map_page++;
751 		}
752 
753 		frag_off += to_copy;
754 		if (frag_off == RDSV3_FRAG_SIZE) {
755 			frag = list_next(&ibinc->ii_frags, frag);
756 			frag_off = 0;
757 		}
758 	}
759 
760 #if 0
761 XXX
762 	/* the congestion map is in little endian order */
763 	uncongested = le64_to_cpu(uncongested);
764 #endif
765 
766 	rdsv3_cong_map_updated(map, uncongested);
767 
768 	RDSV3_DPRINTF4("rdsv3_ib_cong_recv", "Return: conn: %p, ibinc: %p",
769 	    conn, ibinc);
770 }
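/*
 * Worked example for the copy loop in rdsv3_ib_cong_recv() above
 * (illustrative): if the existing map word is 0x4 and the incoming word is
 * 0x0, then ~(*src) & *dst == 0x4, so that bit is accumulated into
 * 'uncongested' and reported via rdsv3_cong_map_updated().
 */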
771 
772 /*
773  * Rings are posted with all the allocations they'll need to queue the
774  * incoming message to the receiving socket so this can't fail.
775  * All fragments start with a header, so we can make sure we're not receiving
776  * garbage, and we can tell a small 8 byte fragment from an ACK frame.
777  */
778 struct rdsv3_ib_ack_state {
779 	uint64_t		ack_next;
780 	uint64_t		ack_recv;
781 	unsigned int	ack_required:1;
782 	unsigned int	ack_next_valid:1;
783 	unsigned int	ack_recv_valid:1;
784 };
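/*
 * ack_next and ack_recv are only meaningful when the corresponding *_valid
 * bit is set; rdsv3_ib_recv_tasklet_fn() checks those bits before applying
 * the values it accumulated while draining the completion queue.
 */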
785 
786 static void
787 rdsv3_ib_process_recv(struct rdsv3_connection *conn,
788     struct rdsv3_ib_recv_work *recv, uint32_t data_len,
789     struct rdsv3_ib_ack_state *state)
790 {
791 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
792 	struct rdsv3_ib_incoming *ibinc = ic->i_ibinc;
793 	struct rdsv3_header *ihdr, *hdr;
794 
795 	/* XXX shut down the connection if port 0,0 are seen? */
796 
797 	RDSV3_DPRINTF5("rdsv3_ib_process_recv",
798 	    "ic %p ibinc %p recv %p byte len %u", ic, ibinc, recv, data_len);
799 
800 	if (data_len < sizeof (struct rdsv3_header)) {
801 		RDSV3_DPRINTF2("rdsv3_ib_process_recv",
802 		    "incoming message from %u.%u.%u.%u didn't include a "
803 		    "header, disconnecting and reconnecting",
804 		    NIPQUAD(conn->c_faddr));
805 		rdsv3_conn_drop(conn);
806 		return;
807 	}
808 	data_len -= sizeof (struct rdsv3_header);
809 
810 	if ((ihdr = rdsv3_ib_get_header(conn, recv, data_len)) == NULL) {
811 		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
		    "from %u.%u.%u.%u didn't have a proper version (0x%x) or "
813 		    "data_len (0x%x), disconnecting and "
814 		    "reconnecting",
815 		    NIPQUAD(conn->c_faddr), conn->c_version, data_len);
816 		rdsv3_conn_drop(conn);
817 		return;
818 	}
819 
820 	/* Validate the checksum. */
821 	if (!rdsv3_message_verify_checksum(ihdr)) {
822 		RDSV3_DPRINTF2("rdsv3_ib_process_recv", "incoming message "
823 		    "from %u.%u.%u.%u has corrupted header - "
824 		    "forcing a reconnect",
825 		    NIPQUAD(conn->c_faddr));
826 		rdsv3_conn_drop(conn);
827 		rdsv3_stats_inc(s_recv_drop_bad_checksum);
828 		return;
829 	}
830 
831 	/* Process the ACK sequence which comes with every packet */
832 	state->ack_recv = ntohll(ihdr->h_ack);
833 	state->ack_recv_valid = 1;
834 
835 	/* Process the credits update if there was one */
836 	if (ihdr->h_credit)
837 		rdsv3_ib_send_add_credits(conn, ihdr->h_credit);
838 
839 	if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && data_len == 0) {
840 		/*
841 		 * This is an ACK-only packet. The fact that it gets
842 		 * special treatment here is that historically, ACKs
843 		 * were rather special beasts.
844 		 */
845 		rdsv3_ib_stats_inc(s_ib_ack_received);
846 
847 		/*
		 * Usually the frags make their way on to incs and are then
		 * freed as the inc is freed.  We don't go that route, so we
		 * have to drop the page ref ourselves.  We can't just leave
		 * the page on the recv because that confuses the dma mapping
		 * of pages and each recv's use of a partial page.  We can
		 * leave the frag, though, it will be reused.
857 		 *
858 		 * FIXME: Fold this into the code path below.
859 		 */
860 		rdsv3_ib_frag_drop_page(recv->r_frag);
861 		return;
862 	}
863 
864 	/*
865 	 * If we don't already have an inc on the connection then this
	 * fragment has a header and starts a message; copy its header
867 	 * into the inc and save the inc so we can hang upcoming fragments
868 	 * off its list.
869 	 */
870 	if (ibinc == NULL) {
871 		ibinc = recv->r_ibinc;
872 		recv->r_ibinc = NULL;
873 		ic->i_ibinc = ibinc;
874 
875 		hdr = &ibinc->ii_inc.i_hdr;
876 		(void) memcpy(hdr, ihdr, sizeof (*hdr));
877 		ic->i_recv_data_rem = ntohl(hdr->h_len);
878 
879 		RDSV3_DPRINTF5("rdsv3_ib_process_recv",
880 		    "ic %p ibinc %p rem %u flag 0x%x", ic, ibinc,
881 		    ic->i_recv_data_rem, hdr->h_flags);
882 	} else {
883 		hdr = &ibinc->ii_inc.i_hdr;
884 		/*
885 		 * We can't just use memcmp here; fragments of a
886 		 * single message may carry different ACKs
887 		 */
888 		if (hdr->h_sequence != ihdr->h_sequence ||
889 		    hdr->h_len != ihdr->h_len ||
890 		    hdr->h_sport != ihdr->h_sport ||
891 		    hdr->h_dport != ihdr->h_dport) {
892 			RDSV3_DPRINTF2("rdsv3_ib_process_recv",
893 			    "fragment header mismatch; forcing reconnect");
894 			rdsv3_conn_drop(conn);
895 			return;
896 		}
897 	}
898 
899 	list_insert_tail(&ibinc->ii_frags, recv->r_frag);
900 	recv->r_frag = NULL;
901 
902 	if (ic->i_recv_data_rem > RDSV3_FRAG_SIZE)
903 		ic->i_recv_data_rem -= RDSV3_FRAG_SIZE;
904 	else {
905 		ic->i_recv_data_rem = 0;
906 		ic->i_ibinc = NULL;
907 
908 		if (ibinc->ii_inc.i_hdr.h_flags == RDSV3_FLAG_CONG_BITMAP)
909 			rdsv3_ib_cong_recv(conn, ibinc);
910 		else {
911 			rdsv3_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
912 			    &ibinc->ii_inc, KM_NOSLEEP);
913 			state->ack_next = ntohll(hdr->h_sequence);
914 			state->ack_next_valid = 1;
915 		}
916 
917 		/*
918 		 * Evaluate the ACK_REQUIRED flag *after* we received
919 		 * the complete frame, and after bumping the next_rx
920 		 * sequence.
921 		 */
922 		if (hdr->h_flags & RDSV3_FLAG_ACK_REQUIRED) {
923 			rdsv3_stats_inc(s_recv_ack_required);
924 			state->ack_required = 1;
925 		}
926 
927 		rdsv3_inc_put(&ibinc->ii_inc);
928 	}
929 
930 	RDSV3_DPRINTF4("rdsv3_ib_process_recv",
931 	    "Return: conn: %p recv: %p len: %d state: %p",
932 	    conn, recv, data_len, state);
933 }
934 
935 /*
936  * Plucking the oldest entry from the ring can be done concurrently with
937  * the thread refilling the ring.  Each ring operation is protected by
938  * spinlocks and the transient state of refilling doesn't change the
939  * recording of which entry is oldest.
940  *
941  * This relies on IB only calling one cq comp_handler for each cq so that
942  * there will only be one caller of rdsv3_recv_incoming() per RDS connection.
943  */
944 
945 void
946 rdsv3_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
947 {
948 	struct rdsv3_connection *conn = context;
949 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
950 
951 	RDSV3_DPRINTF4("rdsv3_ib_recv_cq_comp_handler",
952 	    "Enter(conn: %p cq: %p)", conn, cq);
953 
954 	rdsv3_ib_stats_inc(s_ib_rx_cq_call);
955 
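	/*
	 * Defer the actual CQ drain to taskq context; the completion handler
	 * itself only bumps the stat above and dispatches
	 * rdsv3_ib_recv_tasklet_fn().
	 */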
956 	(void) ddi_taskq_dispatch(ic->i_recv_tasklet, rdsv3_ib_recv_tasklet_fn,
957 	    (void *)ic, DDI_SLEEP);
958 }
959 
960 static inline void
961 rdsv3_poll_cq(struct rdsv3_ib_connection *ic, struct rdsv3_ib_ack_state *state)
962 {
963 	struct rdsv3_connection *conn = ic->conn;
964 	ibt_wc_t wc;
965 	struct rdsv3_ib_recv_work *recv;
966 	uint_t polled;
967 
968 	while (ibt_poll_cq(RDSV3_CQ2CQHDL(ic->i_recv_cq), &wc, 1, &polled) ==
969 	    IBT_SUCCESS) {
970 		RDSV3_DPRINTF5("rdsv3_ib_recv_cq_comp_handler",
971 		    "rwc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
972 		    (unsigned long long)wc.wc_id, wc.wc_status,
973 		    wc.wc_bytes_xfer, ntohl(wc.wc_immed_data));
974 		rdsv3_ib_stats_inc(s_ib_rx_cq_event);
975 
976 		recv = &ic->i_recvs[rdsv3_ib_ring_oldest(&ic->i_recv_ring)];
977 
978 		rdsv3_ib_recv_unmap_page(ic, recv);
979 
980 		/*
981 		 * Also process recvs in connecting state because it is possible
982 		 * to get a recv completion _before_ the rdmacm ESTABLISHED
983 		 * event is processed.
984 		 */
985 		if (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)) {
986 			/*
987 			 * We expect errors as the qp is drained during
			 * shutdown.
989 			 */
990 			if (wc.wc_status == IBT_WC_SUCCESS) {
991 				rdsv3_ib_process_recv(conn, recv,
992 				    wc.wc_bytes_xfer, state);
993 			} else {
994 				RDSV3_DPRINTF2("rdsv3_ib_recv_cq_comp_handler",
995 				    "recv completion on "
996 				    "%u.%u.%u.%u had status %u, "
997 				    "disconnecting and reconnecting\n",
998 				    NIPQUAD(conn->c_faddr),
999 				    wc.wc_status);
1000 				rdsv3_conn_drop(conn);
1001 			}
1002 		}
1003 
1004 		rdsv3_ib_ring_free(&ic->i_recv_ring, 1);
1005 	}
1006 }
1007 
1008 static processorid_t rdsv3_taskq_bind_cpuid = 0;
1009 void
1010 rdsv3_ib_recv_tasklet_fn(void *data)
1011 {
1012 	struct rdsv3_ib_connection *ic = (struct rdsv3_ib_connection *)data;
1013 	struct rdsv3_connection *conn = ic->conn;
1014 	struct rdsv3_ib_ack_state state = { 0, };
1015 	cpu_t   *cp;
1016 
1017 	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Enter: ic: %p", ic);
1018 
	/*
	 * If not already bound to rdsv3_taskq_bind_cpuid, bind this
	 * thread to that CPU.
	 */
1020 	if (ic->i_recv_tasklet_cpuid != rdsv3_taskq_bind_cpuid) {
1021 		cp = cpu[rdsv3_taskq_bind_cpuid];
1022 		mutex_enter(&cpu_lock);
1023 		if (cpu_is_online(cp)) {
1024 			if (ic->i_recv_tasklet_cpuid >= 0)
1025 				thread_affinity_clear(curthread);
1026 			thread_affinity_set(curthread, rdsv3_taskq_bind_cpuid);
1027 			ic->i_recv_tasklet_cpuid = rdsv3_taskq_bind_cpuid;
1028 		}
1029 		mutex_exit(&cpu_lock);
1030 	}
1031 
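	/*
	 * Drain the CQ, re-arm the completion notification, then drain again
	 * to catch any completions that raced with re-arming (poll/arm/poll).
	 */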
1032 	rdsv3_poll_cq(ic, &state);
1033 	(void) ibt_enable_cq_notify(RDSV3_CQ2CQHDL(ic->i_recv_cq),
1034 	    IBT_NEXT_SOLICITED);
1035 	rdsv3_poll_cq(ic, &state);
1036 
1037 	if (state.ack_next_valid)
1038 		rdsv3_ib_set_ack(ic, state.ack_next, state.ack_required);
1039 	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
1040 		rdsv3_send_drop_acked(conn, state.ack_recv, NULL);
1041 		ic->i_ack_recv = state.ack_recv;
1042 	}
1043 	if (rdsv3_conn_up(conn))
1044 		rdsv3_ib_attempt_ack(ic);
1045 
1046 	/*
1047 	 * If we ever end up with a really empty receive ring, we're
1048 	 * in deep trouble, as the sender will definitely see RNR
1049 	 * timeouts.
1050 	 */
1051 	if (rdsv3_ib_ring_empty(&ic->i_recv_ring))
1052 		rdsv3_ib_stats_inc(s_ib_rx_ring_empty);
1053 
1054 	/*
1055 	 * If the ring is running low, then schedule the thread to refill.
1056 	 */
1057 	if (rdsv3_ib_ring_low(&ic->i_recv_ring) &&
1058 	    (rdsv3_conn_up(conn) || rdsv3_conn_connecting(conn)))
1059 		rdsv3_queue_delayed_work(rdsv3_wq, &conn->c_recv_w, 0);
1060 
1061 	RDSV3_DPRINTF4("rdsv3_ib_recv_tasklet_fn", "Return: ic: %p", ic);
1062 }
1063 
1064 int
1065 rdsv3_ib_recv(struct rdsv3_connection *conn)
1066 {
1067 	struct rdsv3_ib_connection *ic = conn->c_transport_data;
1068 	int ret = 0;
1069 
1070 	RDSV3_DPRINTF4("rdsv3_ib_recv", "conn %p\n", conn);
1071 
1072 	/*
1073 	 * If we get a temporary posting failure in this context then
1074 	 * we're really low and we want the caller to back off for a bit.
1075 	 */
1076 	mutex_enter(&ic->i_recv_mutex);
1077 	if (rdsv3_ib_recv_refill(conn, KM_NOSLEEP, 0, 0))
1078 		ret = -ENOMEM;
1079 	else
1080 		rdsv3_ib_stats_inc(s_ib_rx_refill_from_thread);
1081 	mutex_exit(&ic->i_recv_mutex);
1082 
1083 	if (rdsv3_conn_up(conn))
1084 		rdsv3_ib_attempt_ack(ic);
1085 
1086 	RDSV3_DPRINTF4("rdsv3_ib_recv", "Return: conn: %p", conn);
1087 
1088 	return (ret);
1089 }
1090 
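/*
 * Cap, in bytes, on the memory consumed by posted receive buffers;
 * rdsv3_ib_recv_init() divides it by RDSV3_FRAG_SIZE to derive
 * rdsv3_ib_sysctl_max_recv_allocation, the limit checked in
 * rdsv3_ib_recv_refill_one().
 */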
1091 uint_t	MaxRecvMemory = 128 * 1024 * 1024;
1092 
1093 int
1094 rdsv3_ib_recv_init(void)
1095 {
1096 	int ret = -ENOMEM;
1097 
1098 	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Enter");
1099 
1100 	/* XXX - hard code it to 128 MB */
1101 	rdsv3_ib_sysctl_max_recv_allocation = MaxRecvMemory / RDSV3_FRAG_SIZE;
1102 
1103 	rdsv3_ib_incoming_slab = kmem_cache_create("rdsv3_ib_incoming",
1104 	    sizeof (struct rdsv3_ib_incoming), 0, NULL, NULL, NULL,
1105 	    NULL, NULL, 0);
1106 	if (rdsv3_ib_incoming_slab == NULL)
1107 		goto out;
1108 
1109 	rdsv3_ib_frag_slab = kmem_cache_create("rdsv3_ib_frag",
1110 	    sizeof (struct rdsv3_page_frag),
1111 	    0, NULL, NULL, NULL, NULL, NULL, 0);
1112 	if (rdsv3_ib_frag_slab == NULL)
1113 		kmem_cache_destroy(rdsv3_ib_incoming_slab);
1114 	else
1115 		ret = 0;
1116 
1117 	RDSV3_DPRINTF4("rdsv3_ib_recv_init", "Return");
1118 out:
1119 	return (ret);
1120 }
1121 
1122 void
1123 rdsv3_ib_recv_exit(void)
1124 {
1125 	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Enter");
1126 	kmem_cache_destroy(rdsv3_ib_incoming_slab);
1127 	kmem_cache_destroy(rdsv3_ib_frag_slab);
1128 	RDSV3_DPRINTF4("rdsv3_ib_recv_exit", "Return");
1129 }
1130