/*
 * Copyright (c) 2009 Mellanox Technologies Ltd.  All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * OpenIB.org BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */
#include "sdp.h"

SDP_MODPARAM_INT(rcvbuf_initial_size, 32 * 1024,
		"Receive buffer initial size in bytes.");
SDP_MODPARAM_SINT(rcvbuf_scale, 0x8,
		"Receive buffer size scale factor.");

/* Like tcp_fin - called when SDP_MID_DISCONNECT is received */
static void
sdp_handle_disconn(struct sdp_sock *ssk)
{

	sdp_dbg(ssk->socket, "%s\n", __func__);

	SDP_WLOCK_ASSERT(ssk);
	if (TCPS_HAVERCVDFIN(ssk->state) == 0)
		socantrcvmore(ssk->socket);

	switch (ssk->state) {
	case TCPS_SYN_RECEIVED:
	case TCPS_ESTABLISHED:
		ssk->state = TCPS_CLOSE_WAIT;
		break;

	case TCPS_FIN_WAIT_1:
		/* Received a reply FIN - start Infiniband tear down */
		sdp_dbg(ssk->socket,
		    "%s: Starting Infiniband tear down sending DREQ\n",
		    __func__);

		sdp_cancel_dreq_wait_timeout(ssk);
		ssk->qp_active = 0;
		if (ssk->id) {
			struct rdma_cm_id *id;

			id = ssk->id;
			SDP_WUNLOCK(ssk);
			rdma_disconnect(id);
			SDP_WLOCK(ssk);
		} else {
			sdp_warn(ssk->socket,
			    "%s: ssk->id is NULL\n", __func__);
			return;
		}
		break;
	case TCPS_TIME_WAIT:
		/*
		 * This is a mutual close situation and we've got the DREQ
		 * from the peer before the SDP_MID_DISCONNECT.
		 */
		break;
	case TCPS_CLOSED:
		/* FIN arrived after IB teardown started - do nothing */
		sdp_dbg(ssk->socket, "%s: fin in state %s\n",
		    __func__, sdp_state_str(ssk->state));
		return;
	default:
		sdp_warn(ssk->socket,
		    "%s: FIN in unexpected state. state=%d\n",
		    __func__, ssk->state);
		break;
	}
}

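/*
 * Allocate an mbuf chain large enough for one receive buffer, map it for
 * DMA and post it on the receive queue.  Returns 0 on success or -1 if
 * the allocation or the post failed.
 */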
static int
sdp_post_recv(struct sdp_sock *ssk)
{
	struct sdp_buf *rx_req;
	int i, rc;
	u64 addr;
	struct ib_device *dev;
	struct ib_recv_wr rx_wr = { NULL };
	struct ib_sge ibsge[SDP_MAX_RECV_SGES];
	struct ib_sge *sge = ibsge;
	struct ib_recv_wr *bad_wr;
	struct mbuf *mb, *m;
	struct sdp_bsdh *h;
	int id = ring_head(ssk->rx_ring);

	/* Now, allocate and repost recv */
	mb = m_getm2(NULL, ssk->recv_bytes, M_NOWAIT, MT_DATA, M_PKTHDR);
	if (mb == NULL) {
		/* Retry from the work queue so we don't stall on low memory. */
		if (!rx_ring_posted(ssk))
			queue_work(rx_comp_wq, &ssk->rx_comp_work);
		return -1;
	}
	sdp_prf(ssk->socket, mb, "Posting mb");
	for (m = mb; m != NULL; m = m->m_next) {
		m->m_len = (m->m_flags & M_EXT) ? m->m_ext.ext_size :
		    ((m->m_flags & M_PKTHDR) ? MHLEN : MLEN);
		mb->m_pkthdr.len += m->m_len;
	}
	h = mtod(mb, struct sdp_bsdh *);
	rx_req = ssk->rx_ring.buffer + (id & (SDP_RX_SIZE - 1));
	rx_req->mb = mb;
	dev = ssk->ib_device;
	for (i = 0; mb != NULL; i++, mb = mb->m_next, sge++) {
		addr = ib_dma_map_single(dev, mb->m_data, mb->m_len,
		    DMA_FROM_DEVICE);
		/* TODO: proper error handling */
		BUG_ON(ib_dma_mapping_error(dev, addr));
		BUG_ON(i >= SDP_MAX_RECV_SGES);
		rx_req->mapping[i] = addr;
		sge->addr = addr;
		sge->length = mb->m_len;
		sge->lkey = ssk->sdp_dev->mr->lkey;
	}

	rx_wr.next = NULL;
	rx_wr.wr_id = id | SDP_OP_RECV;
	rx_wr.sg_list = ibsge;
	rx_wr.num_sge = i;
	rc = ib_post_recv(ssk->qp, &rx_wr, &bad_wr);
	if (unlikely(rc)) {
		sdp_warn(ssk->socket, "ib_post_recv failed. status %d\n", rc);

		sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);
		/* The mapping loop advanced 'mb' to NULL; free the saved chain. */
		m_freem(rx_req->mb);

		sdp_notify(ssk, ECONNRESET);

		return -1;
	}

	atomic_inc(&ssk->rx_ring.head);
	SDPSTATS_COUNTER_INC(post_recv);

	return 0;
}

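/*
 * Decide whether another receive buffer should be posted: never exceed the
 * ring size, always keep at least SDP_MIN_TX_CREDITS posted, and otherwise
 * stop once the bytes outstanding (posted buffers plus data already queued
 * on the socket) reach the scaled receive budget.
 */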
static inline int
sdp_post_recvs_needed(struct sdp_sock *ssk)
{
	unsigned long bytes_in_process;
	unsigned long max_bytes;
	int buffer_size;
	int posted;

	if (!ssk->qp_active || !ssk->socket)
		return 0;

	posted = rx_ring_posted(ssk);
	if (posted >= SDP_RX_SIZE)
		return 0;
	if (posted < SDP_MIN_TX_CREDITS)
		return 1;

	buffer_size = ssk->recv_bytes;
	max_bytes = max(ssk->socket->so_rcv.sb_hiwat,
	    (1 + SDP_MIN_TX_CREDITS) * buffer_size);
	max_bytes *= rcvbuf_scale;
	/*
	 * Compute bytes in the receive queue and socket buffer.
	 */
	bytes_in_process = (posted - SDP_MIN_TX_CREDITS) * buffer_size;
	bytes_in_process += ssk->socket->so_rcv.sb_cc;

	return bytes_in_process < max_bytes;
}

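/* Keep posting receive buffers for as long as they are needed. */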
static inline void
sdp_post_recvs(struct sdp_sock *ssk)
{

	while (sdp_post_recvs_needed(ssk))
		if (sdp_post_recv(ssk))
			return;
}

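/*
 * Account for a received data message, strip the SDP header and append the
 * mbuf to the socket receive buffer, waking up any sleeping reader.
 */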
static inline struct mbuf *
sdp_sock_queue_rcv_mb(struct socket *sk, struct mbuf *mb)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct sdp_bsdh *h;

	h = mtod(mb, struct sdp_bsdh *);

#ifdef SDP_ZCOPY
	SDP_SKB_CB(mb)->seq = rcv_nxt(ssk);
	if (h->mid == SDP_MID_SRCAVAIL) {
		struct sdp_srcah *srcah = (struct sdp_srcah *)(h+1);
		struct rx_srcavail_state *rx_sa;
		u32 mb_len;

		ssk->srcavail_cancel_mseq = 0;

		ssk->rx_sa = rx_sa = RX_SRCAVAIL_STATE(mb) = kzalloc(
				sizeof(struct rx_srcavail_state), M_NOWAIT);

		rx_sa->mseq = ntohl(h->mseq);
		rx_sa->used = 0;
		rx_sa->len = mb_len = ntohl(srcah->len);
		rx_sa->rkey = ntohl(srcah->rkey);
		rx_sa->vaddr = be64_to_cpu(srcah->vaddr);
		rx_sa->flags = 0;

		if (ssk->tx_sa) {
			sdp_dbg_data(ssk->socket, "got RX SrcAvail while waiting "
					"for TX SrcAvail. waking up TX SrcAvail "
					"to be aborted\n");
			wake_up(sk->sk_sleep);
		}

		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
		sdp_dbg_data(sk, "queueing SrcAvail. mb_len = %d vaddr = %lld\n",
			mb_len, rx_sa->vaddr);
	} else
#endif
	{
		atomic_add(mb->m_pkthdr.len, &ssk->rcv_nxt);
	}

	m_adj(mb, SDP_HEAD_SIZE);
	SOCKBUF_LOCK(&sk->so_rcv);
	if (unlikely(h->flags & SDP_OOB_PRES))
		sdp_urg(ssk, mb);
	sbappend_locked(&sk->so_rcv, mb);
	sorwakeup_locked(sk);
	return mb;
}

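/* Clamp the requested receive buffer size to the maximum SDP packet size. */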
static int
sdp_get_recv_bytes(struct sdp_sock *ssk, u32 new_size)
{

	return MIN(new_size, SDP_MAX_PACKET);
}

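/* Set the initial receive buffer size and post the first receive buffers. */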
int
sdp_init_buffers(struct sdp_sock *ssk, u32 new_size)
{

	ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
	sdp_post_recvs(ssk);

	return 0;
}

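/*
 * Grow the receive buffer size; the size may never shrink and never exceed
 * SDP_MAX_PACKET.  Returns 0 if the size was changed and -1 otherwise.
 */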
int
sdp_resize_buffers(struct sdp_sock *ssk, u32 new_size)
{
	u32 curr_size = ssk->recv_bytes;
	u32 max_size = SDP_MAX_PACKET;

	if (new_size > curr_size && new_size <= max_size) {
		ssk->recv_bytes = sdp_get_recv_bytes(ssk, new_size);
		return 0;
	}
	return -1;
}

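/*
 * The peer asked us to change our receive buffer size (ChRcvBuf): record
 * the ring position from which the new size applies and flag that an
 * acknowledgement must be sent.
 */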
static void
sdp_handle_resize_request(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	if (sdp_resize_buffers(ssk, ntohl(buf->size)) == 0)
		ssk->recv_request_head = ring_head(ssk->rx_ring) + 1;
	else
		ssk->recv_request_head = ring_tail(ssk->rx_ring);
	ssk->recv_request = 1;
}

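/* The peer acknowledged our resize request; raise the transmit size goal. */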
static void
sdp_handle_resize_ack(struct sdp_sock *ssk, struct sdp_chrecvbuf *buf)
{
	u32 new_size = ntohl(buf->size);

	if (new_size > ssk->xmit_size_goal)
		ssk->xmit_size_goal = new_size;
}

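/*
 * Reclaim the receive buffer belonging to a completed work request: unmap
 * it, advance the ring tail and consume one remote credit.
 */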
static struct mbuf *
sdp_recv_completion(struct sdp_sock *ssk, int id)
{
	struct sdp_buf *rx_req;
	struct ib_device *dev;
	struct mbuf *mb;

	if (unlikely(id != ring_tail(ssk->rx_ring))) {
		printk(KERN_WARNING "Bogus recv completion id %d tail %d\n",
			id, ring_tail(ssk->rx_ring));
		return NULL;
	}

	dev = ssk->ib_device;
	rx_req = &ssk->rx_ring.buffer[id & (SDP_RX_SIZE - 1)];
	mb = rx_req->mb;
	sdp_cleanup_sdp_buf(ssk, rx_req, DMA_FROM_DEVICE);

	atomic_inc(&ssk->rx_ring.tail);
	atomic_dec(&ssk->remote_credits);
	return mb;
}

/* Dispatch a received control message; the socket write lock must be held. */
static int
sdp_process_rx_ctl_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct sdp_bsdh *h;
	struct socket *sk;

	SDP_WLOCK_ASSERT(ssk);
	sk = ssk->socket;
	h = mtod(mb, struct sdp_bsdh *);
	switch (h->mid) {
	case SDP_MID_DATA:
	case SDP_MID_SRCAVAIL:
		sdp_dbg(sk, "DATA after socket rcv was shutdown\n");

		/* got data in RCV_SHUTDOWN */
		if (ssk->state == TCPS_FIN_WAIT_1) {
			sdp_dbg(sk, "RX data when state = FIN_WAIT1\n");
			sdp_notify(ssk, ECONNRESET);
		}
		m_freem(mb);

		break;
#ifdef SDP_ZCOPY
	case SDP_MID_RDMARDCOMPL:
		m_freem(mb);
		break;
	case SDP_MID_SENDSM:
		sdp_handle_sendsm(ssk, ntohl(h->mseq_ack));
		m_freem(mb);
		break;
	case SDP_MID_SRCAVAIL_CANCEL:
		sdp_dbg_data(sk, "Handling SrcAvailCancel\n");
		sdp_prf(sk, NULL, "Handling SrcAvailCancel");
		if (ssk->rx_sa) {
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
			ssk->rx_sa->flags |= RX_SA_ABORTED;
			ssk->rx_sa = NULL; /* TODO: change it into SDP_MID_DATA and get
			                      the dirty logic from recvmsg */
		} else {
			sdp_dbg(sk, "Got SrcAvailCancel - "
					"but no SrcAvail in process\n");
		}
		m_freem(mb);
		break;
	case SDP_MID_SINKAVAIL:
		sdp_dbg_data(sk, "Got SinkAvail - not supported: ignored\n");
		sdp_prf(sk, NULL, "Got SinkAvail - not supported: ignored");
		/* FALLTHROUGH */
#endif
	case SDP_MID_ABORT:
		sdp_dbg_data(sk, "Handling ABORT\n");
		sdp_prf(sk, NULL, "Handling ABORT");
		sdp_notify(ssk, ECONNRESET);
		m_freem(mb);
		break;
	case SDP_MID_DISCONN:
		sdp_dbg_data(sk, "Handling DISCONN\n");
		sdp_prf(sk, NULL, "Handling DISCONN");
		sdp_handle_disconn(ssk);
		break;
	case SDP_MID_CHRCVBUF:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF\n");
		sdp_handle_resize_request(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	case SDP_MID_CHRCVBUF_ACK:
		sdp_dbg_data(sk, "Handling RX CHRCVBUF_ACK\n");
		sdp_handle_resize_ack(ssk, (struct sdp_chrecvbuf *)(h+1));
		m_freem(mb);
		break;
	default:
		/* TODO: Handle other messages */
		sdp_warn(sk, "SDP: FIXME MID %d\n", h->mid);
		m_freem(mb);
	}

	return 0;
}

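/*
 * First-level handling of a received message: refresh the transmit credits
 * from the header, queue control messages for sdp_do_posts() and hand data
 * messages straight to the socket receive buffer.
 */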
static int
sdp_process_rx_mb(struct sdp_sock *ssk, struct mbuf *mb)
{
	struct socket *sk;
	struct sdp_bsdh *h;
	unsigned long mseq_ack;
	int credits_before;

	h = mtod(mb, struct sdp_bsdh *);
	sk = ssk->socket;
	/*
	 * If another thread is in so_pcbfree this may be partially torn
	 * down but no further synchronization is required as the destroying
	 * thread will wait for receive to shutdown before discarding the
	 * socket.
	 */
	if (sk == NULL) {
		m_freem(mb);
		return 0;
	}

	SDPSTATS_HIST_LINEAR(credits_before_update, tx_credits(ssk));

	mseq_ack = ntohl(h->mseq_ack);
	credits_before = tx_credits(ssk);
	atomic_set(&ssk->tx_ring.credits, mseq_ack - ring_head(ssk->tx_ring) +
			1 + ntohs(h->bufs));
	if (mseq_ack >= ssk->nagle_last_unacked)
		ssk->nagle_last_unacked = 0;

	sdp_prf1(ssk->socket, mb, "RX %s +%d c:%d->%d mseq:%d ack:%d\n",
		mid2str(h->mid), ntohs(h->bufs), credits_before,
		tx_credits(ssk), ntohl(h->mseq), ntohl(h->mseq_ack));

	if (unlikely(h->mid == SDP_MID_DATA &&
	    mb->m_pkthdr.len == SDP_HEAD_SIZE)) {
		/* Credit update is valid even after RCV_SHUTDOWN */
		m_freem(mb);
		return 0;
	}

	if ((h->mid != SDP_MID_DATA && h->mid != SDP_MID_SRCAVAIL) ||
	    TCPS_HAVERCVDFIN(ssk->state)) {
		sdp_prf(sk, NULL, "Control mb - queuing to control queue");
#ifdef SDP_ZCOPY
		if (h->mid == SDP_MID_SRCAVAIL_CANCEL) {
			sdp_dbg_data(sk, "Got SrcAvailCancel. "
					"seq: 0x%x seq_ack: 0x%x\n",
					ntohl(h->mseq), ntohl(h->mseq_ack));
			ssk->srcavail_cancel_mseq = ntohl(h->mseq);
		}

		if (h->mid == SDP_MID_RDMARDCOMPL) {
			struct sdp_rrch *rrch = (struct sdp_rrch *)(h+1);
			sdp_dbg_data(sk, "RdmaRdCompl message arrived\n");
			sdp_handle_rdma_read_compl(ssk, ntohl(h->mseq_ack),
					ntohl(rrch->len));
		}
#endif
		mb->m_nextpkt = NULL;
		if (ssk->rx_ctl_tail)
			ssk->rx_ctl_tail->m_nextpkt = mb;
		else
			ssk->rx_ctl_q = mb;
		ssk->rx_ctl_tail = mb;

		return 0;
	}

	sdp_prf1(sk, NULL, "queueing %s mb\n", mid2str(h->mid));
	mb = sdp_sock_queue_rcv_mb(sk, mb);

	return 0;
}

/*
 * Handle a single receive completion: reclaim the mbuf, drop it on errors,
 * trim it to the number of bytes actually received and record the message
 * sequence.  Called only from the interrupt path.
 */
static struct mbuf *
sdp_process_rx_wc(struct sdp_sock *ssk, struct ib_wc *wc)
{
	struct mbuf *mb;
	struct sdp_bsdh *h;
	struct socket *sk = ssk->socket;
	int mseq;

	mb = sdp_recv_completion(ssk, wc->wr_id);
	if (unlikely(!mb))
		return NULL;

	if (unlikely(wc->status)) {
		if (ssk->qp_active && sk) {
			sdp_dbg(sk, "Recv completion with error. "
					"Status %d, vendor: %d\n",
				wc->status, wc->vendor_err);
			sdp_abort(sk);
			ssk->qp_active = 0;
		}
		m_freem(mb);
		return NULL;
	}

	sdp_dbg_data(sk, "Recv completion. ID %d Length %d\n",
			(int)wc->wr_id, wc->byte_len);
	if (unlikely(wc->byte_len < sizeof(struct sdp_bsdh))) {
		sdp_warn(sk, "SDP BUG! byte_len %d < %zd\n",
				wc->byte_len, sizeof(struct sdp_bsdh));
		m_freem(mb);
		return NULL;
	}
	/* Use m_adj to trim the tail of data we didn't use. */
	m_adj(mb, -(mb->m_pkthdr.len - wc->byte_len));
	h = mtod(mb, struct sdp_bsdh *);

	SDP_DUMP_PACKET(ssk->socket, "RX", mb, h);

	ssk->rx_packets++;
	ssk->rx_bytes += mb->m_pkthdr.len;

	mseq = ntohl(h->mseq);
	atomic_set(&ssk->mseq_ack, mseq);
	if (mseq != (int)wc->wr_id)
		sdp_warn(sk, "SDP BUG! mseq %d != wrid %d\n",
				mseq, (int)wc->wr_id);

	return mb;
}

/* Wakeup writers if we now have credits. */
static void
sdp_bzcopy_write_space(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;

	if (tx_credits(ssk) >= ssk->min_bufs && sk)
		sowwakeup(sk);
}

/* Drain the receive CQ; called only from the interrupt path. */
static int
sdp_poll_rx_cq(struct sdp_sock *ssk)
{
	struct ib_cq *cq = ssk->rx_ring.cq;
	struct ib_wc ibwc[SDP_NUM_WC];
	int n, i;
	int wc_processed = 0;
	struct mbuf *mb;

	do {
		n = ib_poll_cq(cq, SDP_NUM_WC, ibwc);
		for (i = 0; i < n; ++i) {
			struct ib_wc *wc = &ibwc[i];

			BUG_ON(!(wc->wr_id & SDP_OP_RECV));
			mb = sdp_process_rx_wc(ssk, wc);
			if (!mb)
				continue;

			sdp_process_rx_mb(ssk, mb);
			wc_processed++;
		}
	} while (n == SDP_NUM_WC);

	if (wc_processed)
		sdp_bzcopy_write_space(ssk);

	return wc_processed;
}

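/*
 * Deferred receive completion work: once the QP and CQ are known to be
 * valid, run sdp_do_posts() under the socket write lock.
 */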
static void
sdp_rx_comp_work(struct work_struct *work)
{
	struct sdp_sock *ssk = container_of(work, struct sdp_sock,
			rx_comp_work);

	sdp_prf(ssk->socket, NULL, "%s", __func__);

	SDP_WLOCK(ssk);
	if (unlikely(!ssk->qp)) {
		sdp_prf(ssk->socket, NULL, "qp was destroyed");
		goto out;
	}
	if (unlikely(!ssk->rx_ring.cq)) {
		sdp_prf(ssk->socket, NULL, "rx_ring.cq is NULL");
		goto out;
	}

	if (unlikely(!ssk->poll_cq)) {
		struct rdma_cm_id *id = ssk->id;
		if (id && id->qp)
			rdma_notify(id, RDMA_CM_EVENT_ESTABLISHED);
		goto out;
	}

	sdp_do_posts(ssk);
out:
	SDP_WUNLOCK(ssk);
}

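/*
 * Drain the control message queue, replenish the receive ring and flush
 * pending transmits and credit updates.  Called with the socket write lock
 * held.
 */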
void
sdp_do_posts(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	int xmit_poll_force;
	struct mbuf *mb;

	SDP_WLOCK_ASSERT(ssk);
	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP is deactivated\n");
		return;
	}

	while ((mb = ssk->rx_ctl_q)) {
		ssk->rx_ctl_q = mb->m_nextpkt;
		mb->m_nextpkt = NULL;
		sdp_process_rx_ctl_mb(ssk, mb);
	}

	if (ssk->state == TCPS_TIME_WAIT)
		return;

	if (!ssk->rx_ring.cq || !ssk->tx_ring.cq)
		return;

	sdp_post_recvs(ssk);

	if (tx_ring_posted(ssk))
		sdp_xmit_poll(ssk, 1);

	sdp_post_sends(ssk, M_NOWAIT);

	xmit_poll_force = tx_credits(ssk) < SDP_MIN_TX_CREDITS;

	if (credit_update_needed(ssk) || xmit_poll_force) {
		/* If sends are pending because we ran out of credits, flush them. */
		sdp_prf(sk, NULL, "Processing to free pending sends");
		sdp_xmit_poll(ssk, xmit_poll_force);
		sdp_prf(sk, NULL, "Sending credit update");
		sdp_post_sends(ssk, M_NOWAIT);
	}
}

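/*
 * Poll the receive CQ, re-arm it and schedule the completion work if
 * anything was processed.  Returns the number of completions handled.
 */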
int
sdp_process_rx(struct sdp_sock *ssk)
{
	int wc_processed = 0;
	int credits_before;

	if (!rx_ring_trylock(&ssk->rx_ring)) {
		sdp_dbg(ssk->socket, "ring destroyed. not polling it\n");
		return 0;
	}

	credits_before = tx_credits(ssk);

	wc_processed = sdp_poll_rx_cq(ssk);
	sdp_prf(ssk->socket, NULL, "processed %d", wc_processed);

	if (wc_processed) {
		sdp_prf(ssk->socket, NULL, "credits:  %d -> %d",
				credits_before, tx_credits(ssk));
		queue_work(rx_comp_wq, &ssk->rx_comp_work);
	}
	sdp_arm_rx_cq(ssk);

	rx_ring_unlock(&ssk->rx_ring);

	return (wc_processed);
}

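/*
 * Receive CQ completion callback: make sure the CQ still belongs to this
 * socket and kick the receive path.
 */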
static void
sdp_rx_irq(struct ib_cq *cq, void *cq_context)
{
	struct socket *sk = cq_context;
	struct sdp_sock *ssk = sdp_sk(sk);

	if (cq != ssk->rx_ring.cq) {
		sdp_dbg(sk, "cq = %p, ssk->cq = %p\n", cq, ssk->rx_ring.cq);
		return;
	}

	SDPSTATS_COUNTER_INC(rx_int_count);

	sdp_prf(sk, NULL, "rx irq");

	sdp_process_rx(ssk);
}

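/* Reclaim and free every receive buffer still posted on the ring. */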
static void
sdp_rx_ring_purge(struct sdp_sock *ssk)
{
	while (rx_ring_posted(ssk) > 0) {
		struct mbuf *mb;

		mb = sdp_recv_completion(ssk, ring_tail(ssk->rx_ring));
		if (!mb)
			break;
		m_freem(mb);
	}
}

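/* Initialize the receive ring bookkeeping and its destroy lock. */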
void
sdp_rx_ring_init(struct sdp_sock *ssk)
{
	ssk->rx_ring.buffer = NULL;
	ssk->rx_ring.destroyed = 0;
	rw_init(&ssk->rx_ring.destroyed_lock, "sdp rx lock");
}

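/* Asynchronous event handler for the receive CQ; nothing to do. */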
static void
sdp_rx_cq_event_handler(struct ib_event *event, void *data)
{
}

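/*
 * Allocate the receive ring buffer array, create the receive CQ and arm it
 * so that completions start generating interrupts.
 */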
int
sdp_rx_ring_create(struct sdp_sock *ssk, struct ib_device *device)
{
	struct ib_cq *rx_cq;
	int rc = 0;

	sdp_dbg(ssk->socket, "rx ring created\n");
	INIT_WORK(&ssk->rx_comp_work, sdp_rx_comp_work);
	atomic_set(&ssk->rx_ring.head, 1);
	atomic_set(&ssk->rx_ring.tail, 1);

	ssk->rx_ring.buffer = kmalloc(
			sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE, GFP_KERNEL);
	if (!ssk->rx_ring.buffer) {
		sdp_warn(ssk->socket,
			"Unable to allocate RX Ring size %zd.\n",
			 sizeof(*ssk->rx_ring.buffer) * SDP_RX_SIZE);

		return -ENOMEM;
	}

	rx_cq = ib_create_cq(device, sdp_rx_irq, sdp_rx_cq_event_handler,
	    ssk->socket, SDP_RX_SIZE, IB_CQ_VECTOR_LEAST_ATTACHED);

	if (IS_ERR(rx_cq)) {
		rc = PTR_ERR(rx_cq);
		sdp_warn(ssk->socket, "Unable to allocate RX CQ: %d.\n", rc);
		goto err_cq;
	}

	sdp_sk(ssk->socket)->rx_ring.cq = rx_cq;
	sdp_arm_rx_cq(ssk);

	return 0;

err_cq:
	kfree(ssk->rx_ring.buffer);
	ssk->rx_ring.buffer = NULL;
	return rc;
}

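/*
 * Tear down the receive side: cancel deferred work, purge any posted
 * buffers and destroy the CQ.
 */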
void
sdp_rx_ring_destroy(struct sdp_sock *ssk)
{

	cancel_work_sync(&ssk->rx_comp_work);
	rx_ring_destroy_lock(&ssk->rx_ring);

	if (ssk->rx_ring.buffer) {
		sdp_rx_ring_purge(ssk);

		kfree(ssk->rx_ring.buffer);
		ssk->rx_ring.buffer = NULL;
	}

	if (ssk->rx_ring.cq) {
		if (ib_destroy_cq(ssk->rx_ring.cq)) {
			sdp_warn(ssk->socket, "destroy cq(%p) failed\n",
				ssk->rx_ring.cq);
		} else {
			ssk->rx_ring.cq = NULL;
		}
	}

	WARN_ON(ring_head(ssk->rx_ring) != ring_tail(ssk->rx_ring));
}