1 /*
2  * Copyright (c) 2006 Mellanox Technologies Ltd.  All rights reserved.
3  *
4  * This software is available to you under a choice of one of two
5  * licenses.  You may choose to be licensed under the terms of the GNU
6  * General Public License (GPL) Version 2, available from the file
7  * COPYING in the main directory of this source tree, or the
8  * OpenIB.org BSD license below:
9  *
10  *     Redistribution and use in source and binary forms, with or
11  *     without modification, are permitted provided that the following
12  *     conditions are met:
13  *
14  *      - Redistributions of source code must retain the above
15  *        copyright notice, this list of conditions and the following
16  *        disclaimer.
17  *
18  *      - Redistributions in binary form must reproduce the above
19  *        copyright notice, this list of conditions and the following
20  *        disclaimer in the documentation and/or other materials
21  *        provided with the distribution.
22  *
23  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30  * SOFTWARE.
31  */
32 #include <linux/tcp.h>
33 #include <asm/ioctls.h>
34 #include <linux/workqueue.h>
35 #include <linux/net.h>
36 #include <linux/socket.h>
37 #include <net/protocol.h>
38 #include <net/inet_common.h>
39 #include <rdma/rdma_cm.h>
40 #include <rdma/ib_verbs.h>
41 #include <rdma/ib_fmr_pool.h>
42 #include <rdma/ib_umem.h>
43 #include <net/tcp.h> /* for memcpy_toiovec */
44 #include <asm/io.h>
45 #include <asm/uaccess.h>
46 #include <linux/delay.h>
47 #include "sdp.h"
48 
/* Build and queue a SrcAvail message advertising tx_sa's pinned user
 * buffer (umem + FMR) so the peer can RDMA-read it.  In combined mode
 * the first chunk of payload is inlined into the SrcAvail packet.
 * Returns 0 on success or -ENOMEM if the mbuf allocation fails.
 * On success ssk->tx_sa is set to tx_sa (only one may be outstanding). */
static int sdp_post_srcavail(struct socket *sk, struct tx_srcavail_state *tx_sa)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct mbuf *mb;
	int payload_len;
	struct page *payload_pg;
	int off, len;
	struct ib_umem_chunk *chunk;

	WARN_ON(ssk->tx_sa);	/* only one SrcAvail may be in flight */

	BUG_ON(!tx_sa);
	BUG_ON(!tx_sa->fmr || !tx_sa->fmr->fmr->lkey);
	BUG_ON(!tx_sa->umem);
	BUG_ON(!tx_sa->umem->chunk_list.next);

	/* the first chunk of the pinned region supplies the inlined payload page */
	chunk = list_entry(tx_sa->umem->chunk_list.next, struct ib_umem_chunk, list);
	BUG_ON(!chunk->nmap);

	off = tx_sa->umem->offset;
	len = tx_sa->umem->length;

	tx_sa->bytes_sent = tx_sa->bytes_acked = 0;

	mb = sdp_alloc_mb_srcavail(sk, len, tx_sa->fmr->fmr->lkey, off, 0);
	if (!mb) {
		return -ENOMEM;
	}
	sdp_dbg_data(sk, "sending SrcAvail\n");

	TX_SRCAVAIL_STATE(mb) = tx_sa; /* tx_sa is hanged on the mb
					 * but continue to live after mb is freed */
	ssk->tx_sa = tx_sa;

	/* must have payload inlined in SrcAvail packet in combined mode */
	/* inlined payload is capped at the first page and at one send's worth */
	payload_len = MIN(tx_sa->umem->page_size - off, len);
	payload_len = MIN(payload_len, ssk->xmit_size_goal - sizeof(struct sdp_srcah));
	payload_pg  = sg_page(&chunk->page_list[0]);
	get_page(payload_pg);	/* the mb frag holds its own page reference */

	sdp_dbg_data(sk, "payload: off: 0x%x, pg: %p, len: 0x%x\n",
		off, payload_pg, payload_len);

	mb_fill_page_desc(mb, mb_shinfo(mb)->nr_frags,
			payload_pg, off, payload_len);

	mb->len             += payload_len;
	mb->data_len         = payload_len;
	mb->truesize        += payload_len;
//	sk->sk_wmem_queued   += payload_len;
//	sk->sk_forward_alloc -= payload_len;

	mb_entail(sk, ssk, mb);

	/* account the inlined bytes in the send sequence space */
	ssk->write_seq += payload_len;
	SDP_SKB_CB(mb)->end_seq += payload_len;

	/* the whole region is advertised; only the inlined part is acked so far */
	tx_sa->bytes_sent = tx_sa->umem->length;
	tx_sa->bytes_acked = payload_len;

	/* TODO: pushing the mb into the tx_queue should be enough */

	return 0;
}
113 
114 static int sdp_post_srcavail_cancel(struct socket *sk)
115 {
116 	struct sdp_sock *ssk = sdp_sk(sk);
117 	struct mbuf *mb;
118 
119 	sdp_dbg_data(ssk->socket, "Posting srcavail cancel\n");
120 
121 	mb = sdp_alloc_mb_srcavail_cancel(sk, 0);
122 	mb_entail(sk, ssk, mb);
123 
124 	sdp_post_sends(ssk, 0);
125 
126 	schedule_delayed_work(&ssk->srcavail_cancel_work,
127 			SDP_SRCAVAIL_CANCEL_TIMEOUT);
128 
129 	return 0;
130 }
131 
132 void srcavail_cancel_timeout(struct work_struct *work)
133 {
134 	struct sdp_sock *ssk =
135 		container_of(work, struct sdp_sock, srcavail_cancel_work.work);
136 	struct socket *sk = ssk->socket;
137 
138 	lock_sock(sk);
139 
140 	sdp_dbg_data(sk, "both SrcAvail and SrcAvailCancel timedout."
141 			" closing connection\n");
142 	sdp_set_error(sk, -ECONNRESET);
143 	wake_up(&ssk->wq);
144 
145 	release_sock(sk);
146 }
147 
/* Sleep until the peer acknowledges the outstanding SrcAvail with
 * RdmaRdCompl (bytes_acked catches up to bytes_sent), or until timeout,
 * signal, SendSM abort, or a crossing SrcAvail from the peer.
 * @timeo_p is updated with the remaining timeout.  With @ignore_signals
 * set (the cancel path), signals and crossing SrcAvails are not treated
 * as abort reasons.  Returns 0 on completion or a negative errno; the
 * specific abort reason is also recorded in tx_sa->abort_flags. */
static int sdp_wait_rdmardcompl(struct sdp_sock *ssk, long *timeo_p,
		int ignore_signals)
{
	struct socket *sk = ssk->socket;
	int err = 0;
	long vm_wait = 0;
	long current_timeo = *timeo_p;
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "sleep till RdmaRdCompl. timeo = %ld.\n", *timeo_p);
	sdp_prf1(sk, NULL, "Going to sleep");
	/* loop ends early if the QP is torn down underneath us */
	while (ssk->qp_active) {
		prepare_to_wait(sk->sk_sleep, &wait, TASK_INTERRUPTIBLE);

		if (unlikely(!*timeo_p)) {
			err = -ETIME;
			tx_sa->abort_flags |= TX_SA_TIMEDOUT;
			sdp_prf1(sk, NULL, "timeout");
			SDPSTATS_COUNTER_INC(zcopy_tx_timeout);
			break;
		}

		/* protocol violation: peer acked more than was advertised */
		else if (tx_sa->bytes_acked > tx_sa->bytes_sent) {
			err = -EINVAL;
			sdp_dbg_data(sk, "acked bytes > sent bytes\n");
			tx_sa->abort_flags |= TX_SA_ERROR;
			break;
		}

		/* peer refused the SrcAvail - caller falls back to SEND */
		if (tx_sa->abort_flags & TX_SA_SENDSM) {
			sdp_prf1(sk, NULL, "Aborting SrcAvail sending");
			SDPSTATS_COUNTER_INC(zcopy_tx_aborted);
			err = -EAGAIN;
			break ;
		}

		if (!ignore_signals) {
			if (signal_pending(current)) {
				err = -EINTR;
				sdp_prf1(sk, NULL, "signalled");
				tx_sa->abort_flags |= TX_SA_INTRRUPTED;
				break;
			}

			/* peer sent us a SrcAvail while ours is pending:
			 * abort ours to avoid a deadlock */
			if (ssk->rx_sa && (tx_sa->bytes_acked < tx_sa->bytes_sent)) {
				sdp_dbg_data(sk, "Crossing SrcAvail - aborting this\n");
				tx_sa->abort_flags |= TX_SA_CROSS_SEND;
				SDPSTATS_COUNTER_INC(zcopy_cross_send);
				err = -ETIME;
				break ;
			}
		}

		posts_handler_put(ssk);

		/* NOTE(review): this wake condition AND-chains abort_flags,
		 * rx_sa, the acked<sent test and vm_wait; since vm_wait is
		 * always 0 here the expression is constantly false, so the
		 * task sleeps until the timeout or an explicit wake_up().
		 * Looks like the terms were meant to be ||-ed - confirm. */
		sk_wait_event(sk, &current_timeo,
				tx_sa->abort_flags &&
				ssk->rx_sa &&
				(tx_sa->bytes_acked < tx_sa->bytes_sent) &&
				vm_wait);
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);

		/* all advertised bytes acknowledged - done */
		if (tx_sa->bytes_acked == tx_sa->bytes_sent)
			break;

		/* vm_wait bookkeeping (vm_wait is never set in this
		 * function, so this branch is effectively dormant) */
		if (vm_wait) {
			vm_wait -= current_timeo;
			current_timeo = *timeo_p;
			if (current_timeo != MAX_SCHEDULE_TIMEOUT &&
			    (current_timeo -= vm_wait) < 0)
				current_timeo = 0;
			vm_wait = 0;
		}
		*timeo_p = current_timeo;
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting - RdmaRdCompl: %d/%d bytes, flags: 0x%x\n",
			tx_sa->bytes_acked, tx_sa->bytes_sent, tx_sa->abort_flags);

	if (!ssk->qp_active) {
		sdp_dbg(sk, "QP destroyed while waiting\n");
		return -EINVAL;
	}
	return err;
}
238 
/* Block (uninterruptibly) until the posted RDMA read work request
 * completes, i.e. the TX CQ handler clears rdma_inflight->busy.
 * Also bails out if the QP is destroyed, and warns loudly after a
 * 5 second timeout instead of hanging forever. */
static void sdp_wait_rdma_wr_finished(struct sdp_sock *ssk)
{
	struct socket *sk = ssk->socket;
	long timeo = HZ * 5; /* Timeout for for RDMA read */
	DEFINE_WAIT(wait);

	sdp_dbg_data(sk, "Sleep till RDMA wr finished.\n");
	while (1) {
		/* uninterruptible: the RDMA read must run to completion
		 * (or QP teardown) before the buffer can be reused */
		prepare_to_wait(sk->sk_sleep, &wait, TASK_UNINTERRUPTIBLE);

		if (!ssk->tx_ring.rdma_inflight->busy) {
			sdp_dbg_data(sk, "got rdma cqe\n");
			break;
		}

		if (!ssk->qp_active) {
			sdp_dbg_data(sk, "QP destroyed\n");
			break;
		}

		if (!timeo) {
			sdp_warn(sk, "Panic: Timed out waiting for RDMA read\n");
			WARN_ON(1);
			break;
		}

		/* drop the posts-handler reference while sleeping so the
		 * RX/TX completion path can make progress */
		posts_handler_put(ssk);

		sdp_prf1(sk, NULL, "Going to sleep");
		sk_wait_event(sk, &timeo,
			!ssk->tx_ring.rdma_inflight->busy);
		sdp_prf1(sk, NULL, "Woke up");
		sdp_dbg_data(ssk->socket, "woke up sleepers\n");

		posts_handler_get(ssk);
	}

	finish_wait(sk->sk_sleep, &wait);

	sdp_dbg_data(sk, "Finished waiting\n");
}
280 
281 int sdp_post_rdma_rd_compl(struct sdp_sock *ssk,
282 		struct rx_srcavail_state *rx_sa)
283 {
284 	struct mbuf *mb;
285 	int copied = rx_sa->used - rx_sa->reported;
286 
287 	if (rx_sa->used <= rx_sa->reported)
288 		return 0;
289 
290 	mb = sdp_alloc_mb_rdmardcompl(ssk->socket, copied, 0);
291 
292 	rx_sa->reported += copied;
293 
294 	/* TODO: What if no tx_credits available? */
295 	sdp_post_send(ssk, mb);
296 
297 	return 0;
298 }
299 
300 int sdp_post_sendsm(struct socket *sk)
301 {
302 	struct mbuf *mb = sdp_alloc_mb_sendsm(sk, 0);
303 
304 	sdp_post_send(sdp_sk(sk), mb);
305 
306 	return 0;
307 }
308 
309 static int sdp_update_iov_used(struct socket *sk, struct iovec *iov, int len)
310 {
311 	sdp_dbg_data(sk, "updating consumed 0x%x bytes from iov\n", len);
312 	while (len > 0) {
313 		if (iov->iov_len) {
314 			int copy = min_t(unsigned int, iov->iov_len, len);
315 			len -= copy;
316 			iov->iov_len -= copy;
317 			iov->iov_base += copy;
318 		}
319 		iov++;
320 	}
321 
322 	return 0;
323 }
324 
325 static inline int sge_bytes(struct ib_sge *sge, int sge_cnt)
326 {
327 	int bytes = 0;
328 
329 	while (sge_cnt > 0) {
330 		bytes += sge->length;
331 		sge++;
332 		sge_cnt--;
333 	}
334 
335 	return bytes;
336 }
/* Handle an incoming SendSM: the peer refuses the SrcAvail identified
 * by @mseq_ack, so mark the outstanding transaction aborted and wake
 * the sender sleeping in sdp_wait_rdmardcompl().  Runs under
 * tx_sa_lock to synchronize with the send path tearing down tx_sa. */
void sdp_handle_sendsm(struct sdp_sock *ssk, u32 mseq_ack)
{
	struct socket *sk = ssk->socket;
	unsigned long flags;

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	if (!ssk->tx_sa) {
		/* transaction already completed or was cancelled - stale SendSM */
		sdp_prf1(sk, NULL, "SendSM for cancelled/finished SrcAvail");
		goto out;
	}

	if (ssk->tx_sa->mseq > mseq_ack) {
		/* refers to an earlier SrcAvail, not the current one - ignore */
		sdp_dbg_data(sk, "SendSM arrived for old SrcAvail. "
			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
			mseq_ack, ssk->tx_sa->mseq);
		goto out;
	}

	sdp_dbg_data(sk, "Got SendSM - aborting SrcAvail\n");

	ssk->tx_sa->abort_flags |= TX_SA_SENDSM;
	/* transaction resolved - the cancel fallback timer is no longer needed */
	cancel_delayed_work(&ssk->srcavail_cancel_work);

	wake_up(sk->sk_sleep);
	sdp_dbg_data(sk, "woke up sleepers\n");

out:
	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}
367 
368 void sdp_handle_rdma_read_compl(struct sdp_sock *ssk, u32 mseq_ack,
369 		u32 bytes_completed)
370 {
371 	struct socket *sk = ssk->socket;
372 	unsigned long flags;
373 
374 	sdp_prf1(sk, NULL, "RdmaRdCompl ssk=%p tx_sa=%p", ssk, ssk->tx_sa);
375 	sdp_dbg_data(sk, "RdmaRdCompl ssk=%p tx_sa=%p\n", ssk, ssk->tx_sa);
376 
377 	spin_lock_irqsave(&ssk->tx_sa_lock, flags);
378 
379 	BUG_ON(!ssk);
380 
381 	if (!ssk->tx_sa) {
382 		sdp_dbg_data(sk, "Got RdmaRdCompl for aborted SrcAvail\n");
383 		goto out;
384 	}
385 
386 	if (ssk->tx_sa->mseq > mseq_ack) {
387 		sdp_dbg_data(sk, "RdmaRdCompl arrived for old SrcAvail. "
388 			"SendSM mseq_ack: 0x%x, SrcAvail mseq: 0x%x\n",
389 			mseq_ack, ssk->tx_sa->mseq);
390 		goto out;
391 	}
392 
393 	ssk->tx_sa->bytes_acked += bytes_completed;
394 
395 	wake_up(sk->sk_sleep);
396 	sdp_dbg_data(sk, "woke up sleepers\n");
397 
398 out:
399 	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
400 	return;
401 }
402 
403 static unsigned long sdp_get_max_memlockable_bytes(unsigned long offset)
404 {
405 	unsigned long avail;
406 	unsigned long lock_limit;
407 
408 	if (capable(CAP_IPC_LOCK))
409 		return ULONG_MAX;
410 
411 	lock_limit = current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur;
412 	avail = lock_limit - (current->mm->locked_vm << PAGE_SHIFT);
413 
414 	return avail - offset;
415 }
416 
417 static int sdp_alloc_fmr(struct socket *sk, void *uaddr, size_t len,
418 	struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
419 {
420 	struct ib_pool_fmr *fmr;
421 	struct ib_umem *umem;
422 	struct ib_device *dev;
423 	u64 *pages;
424 	struct ib_umem_chunk *chunk;
425 	int n, j, k;
426 	int rc = 0;
427 	unsigned long max_lockable_bytes;
428 
429 	if (unlikely(len > SDP_MAX_RDMA_READ_LEN)) {
430 		sdp_dbg_data(sk, "len:0x%lx > FMR_SIZE: 0x%lx\n",
431 			len, SDP_MAX_RDMA_READ_LEN);
432 		len = SDP_MAX_RDMA_READ_LEN;
433 	}
434 
435 	max_lockable_bytes = sdp_get_max_memlockable_bytes((unsigned long)uaddr & ~PAGE_MASK);
436 	if (unlikely(len > max_lockable_bytes)) {
437 		sdp_dbg_data(sk, "len:0x%lx > RLIMIT_MEMLOCK available: 0x%lx\n",
438 			len, max_lockable_bytes);
439 		len = max_lockable_bytes;
440 	}
441 
442 	sdp_dbg_data(sk, "user buf: %p, len:0x%lx max_lockable_bytes: 0x%lx\n",
443 			uaddr, len, max_lockable_bytes);
444 
445 	umem = ib_umem_get(&sdp_sk(sk)->context, (unsigned long)uaddr, len,
446 		IB_ACCESS_REMOTE_WRITE, 0);
447 
448 	if (IS_ERR(umem)) {
449 		rc = PTR_ERR(umem);
450 		sdp_warn(sk, "Error doing umem_get 0x%lx bytes: %d\n", len, rc);
451 		sdp_warn(sk, "RLIMIT_MEMLOCK: 0x%lx[cur] 0x%lx[max] CAP_IPC_LOCK: %d\n",
452 				current->signal->rlim[RLIMIT_MEMLOCK].rlim_cur,
453 				current->signal->rlim[RLIMIT_MEMLOCK].rlim_max,
454 				capable(CAP_IPC_LOCK));
455 		goto err_umem_get;
456 	}
457 
458 	sdp_dbg_data(sk, "umem->offset = 0x%x, length = 0x%lx\n",
459 		umem->offset, umem->length);
460 
461 	pages = (u64 *) __get_free_page(GFP_KERNEL);
462 	if (!pages)
463 		goto err_pages_alloc;
464 
465 	n = 0;
466 
467 	dev = sdp_sk(sk)->ib_device;
468 	list_for_each_entry(chunk, &umem->chunk_list, list) {
469 		for (j = 0; j < chunk->nmap; ++j) {
470 			len = ib_sg_dma_len(dev,
471 					&chunk->page_list[j]) >> PAGE_SHIFT;
472 
473 			for (k = 0; k < len; ++k) {
474 				pages[n++] = ib_sg_dma_address(dev,
475 						&chunk->page_list[j]) +
476 					umem->page_size * k;
477 
478 			}
479 		}
480 	}
481 
482 	fmr = ib_fmr_pool_map_phys(sdp_sk(sk)->sdp_dev->fmr_pool, pages, n, 0);
483 	if (IS_ERR(fmr)) {
484 		sdp_warn(sk, "Error allocating fmr: %ld\n", PTR_ERR(fmr));
485 		goto err_fmr_alloc;
486 	}
487 
488 	free_page((unsigned long) pages);
489 
490 	*_umem = umem;
491 	*_fmr = fmr;
492 
493 	return 0;
494 
495 err_fmr_alloc:
496 	free_page((unsigned long) pages);
497 
498 err_pages_alloc:
499 	ib_umem_release(umem);
500 
501 err_umem_get:
502 
503 	return rc;
504 }
505 
/* Unmap the FMR and release the pinned user memory, clearing both
 * pointers so the caller cannot double-free.
 * NOTE(review): when qp_active is false this returns without unmapping
 * or releasing anything - presumably full teardown reclaims these
 * elsewhere, but as written it looks like a possible umem/FMR leak on
 * the QP-destroyed paths; confirm. */
void sdp_free_fmr(struct socket *sk, struct ib_pool_fmr **_fmr, struct ib_umem **_umem)
{
	if (!sdp_sk(sk)->qp_active)
		return;

	ib_fmr_pool_unmap(*_fmr);
	*_fmr = NULL;

	ib_umem_release(*_umem);
	*_umem = NULL;
}
517 
518 static int sdp_post_rdma_read(struct socket *sk, struct rx_srcavail_state *rx_sa)
519 {
520 	struct sdp_sock *ssk = sdp_sk(sk);
521 	struct ib_send_wr *bad_wr;
522 	struct ib_send_wr wr = { NULL };
523 	struct ib_sge sge;
524 
525 	wr.opcode = IB_WR_RDMA_READ;
526 	wr.next = NULL;
527 	wr.wr_id = SDP_OP_RDMA;
528 	wr.wr.rdma.rkey = rx_sa->rkey;
529 	wr.send_flags = 0;
530 
531 	ssk->tx_ring.rdma_inflight = rx_sa;
532 
533 	sge.addr = rx_sa->umem->offset;
534 	sge.length = rx_sa->umem->length;
535 	sge.lkey = rx_sa->fmr->fmr->lkey;
536 
537 	wr.wr.rdma.remote_addr = rx_sa->vaddr + rx_sa->used;
538 	wr.num_sge = 1;
539 	wr.sg_list = &sge;
540 	rx_sa->busy++;
541 
542 	wr.send_flags = IB_SEND_SIGNALED;
543 
544 	return ib_post_send(ssk->qp, &wr, &bad_wr);
545 }
546 
/* Receive-side zero copy: pin the user iovec, RDMA-read the peer's
 * SrcAvail buffer (described by mb's rx_sa) directly into it, and wait
 * for the transfer to finish.  On success *used is set to the number of
 * bytes actually read and the iovec/rcv_nxt are advanced.  Returns 0 or
 * a negative errno; on failure with an active QP the SrcAvail is marked
 * aborted so the caller can recover via SendSM/bcopy. */
int sdp_rdma_to_iovec(struct socket *sk, struct iovec *iov, struct mbuf *mb,
		unsigned long *used)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct rx_srcavail_state *rx_sa = RX_SRCAVAIL_STATE(mb);
	int got_srcavail_cancel;
	int rc = 0;
	int len = *used;
	int copied;

	sdp_dbg_data(ssk->socket, "preparing RDMA read."
		" len: 0x%x. buffer len: 0x%lx\n", len, iov->iov_len);

	/* keep the socket alive for the duration of the RDMA read */
	sock_hold(sk, SOCK_REF_RDMA_RD);

	/* never read more than the peer advertised */
	if (len > rx_sa->len) {
		sdp_warn(sk, "len:0x%x > rx_sa->len: 0x%x\n", len, rx_sa->len);
		WARN_ON(1);
		len = rx_sa->len;
	}

	rc = sdp_alloc_fmr(sk, iov->iov_base, len, &rx_sa->fmr, &rx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	rc = sdp_post_rdma_read(sk, rx_sa);
	if (unlikely(rc)) {
		/* a failed post is unrecoverable - reset the connection */
		sdp_warn(sk, "ib_post_send failed with status %d.\n", rc);
		sdp_set_error(ssk->socket, -ECONNRESET);
		wake_up(&ssk->wq);
		goto err_post_send;
	}

	sdp_prf(sk, mb, "Finished posting(rc=%d), now to wait", rc);

	/* NOTE(review): computed but never used below - dead code; confirm
	 * intent before removing */
	got_srcavail_cancel = ssk->srcavail_cancel_mseq > rx_sa->mseq;

	sdp_arm_tx_cq(sk);

	sdp_wait_rdma_wr_finished(ssk);

	sdp_prf(sk, mb, "Finished waiting(rc=%d)", rc);
	if (!ssk->qp_active) {
		sdp_dbg_data(sk, "QP destroyed during RDMA read\n");
		rc = -EPIPE;
		goto err_post_send;
	}

	/* everything that was pinned got transferred */
	copied = rx_sa->umem->length;

	sdp_update_iov_used(sk, iov, copied);
	rx_sa->used += copied;
	atomic_add(copied, &ssk->rcv_nxt);
	*used = copied;

	ssk->tx_ring.rdma_inflight = NULL;

err_post_send:
	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);

err_alloc_fmr:
	if (rc && ssk->qp_active) {
		/* caller notices RX_SA_ABORTED and issues the SendSM */
		sdp_warn(sk, "Couldn't do RDMA - post sendsm\n");
		rx_sa->flags |= RX_SA_ABORTED;
	}

	sock_put(sk, SOCK_REF_RDMA_RD);

	return rc;
}
619 
620 static inline int wait_for_sndbuf(struct socket *sk, long *timeo_p)
621 {
622 	struct sdp_sock *ssk = sdp_sk(sk);
623 	int ret = 0;
624 	int credits_needed = 1;
625 
626 	sdp_dbg_data(sk, "Wait for mem\n");
627 
628 	set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
629 
630 	SDPSTATS_COUNTER_INC(send_wait_for_mem);
631 
632 	sdp_do_posts(ssk);
633 
634 	sdp_xmit_poll(ssk, 1);
635 
636 	ret = sdp_tx_wait_memory(ssk, timeo_p, &credits_needed);
637 
638 	return ret;
639 }
640 
/* One zero-copy send transaction: pin the iovec, advertise it with a
 * SrcAvail, and wait for the peer's RdmaRdCompl.  On abnormal
 * termination (timeout/signal/crossing SrcAvail) a SrcAvailCancel is
 * issued and we wait again for the peer to settle the transaction.
 * The iovec is advanced by whatever the peer actually acknowledged.
 * Returns 0 on success or a negative errno. */
static int do_sdp_sendmsg_zcopy(struct socket *sk, struct tx_srcavail_state *tx_sa,
		struct iovec *iov, long *timeo)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	unsigned long lock_flags;

	rc = sdp_alloc_fmr(sk, iov->iov_base, iov->iov_len,
			&tx_sa->fmr, &tx_sa->umem);
	if (rc) {
		sdp_warn(sk, "Error allocating fmr: %d\n", rc);
		goto err_alloc_fmr;
	}

	/* the SrcAvail itself needs a TX slot */
	if (tx_slots_free(ssk) == 0) {
		rc = wait_for_sndbuf(sk, timeo);
		if (rc) {
			sdp_warn(sk, "Couldn't get send buffer\n");
			goto err_no_tx_slots;
		}
	}

	rc = sdp_post_srcavail(sk, tx_sa);
	if (rc) {
		sdp_dbg(sk, "Error posting SrcAvail\n");
		goto err_abort_send;
	}

	rc = sdp_wait_rdmardcompl(ssk, timeo, 0);
	if (unlikely(rc)) {
		enum tx_sa_flag f = tx_sa->abort_flags;

		if (f & TX_SA_SENDSM) {
			/* peer refused zcopy - caller falls back to bcopy */
			sdp_dbg_data(sk, "Got SendSM. use SEND verb.\n");
		} else if (f & TX_SA_ERROR) {
			/* protocol violation - reset the connection */
			sdp_dbg_data(sk, "SrcAvail error completion\n");
			sdp_reset(sk);
			SDPSTATS_COUNTER_INC(zcopy_tx_error);
		} else if (ssk->qp_active) {
			/* timeout/signal/cross-send: revoke the SrcAvail */
			sdp_post_srcavail_cancel(sk);

			/* Wait for RdmaRdCompl/SendSM to
			 * finish the transaction */
			*timeo = 2 * HZ;
			sdp_dbg_data(sk, "Waiting for SendSM\n");
			sdp_wait_rdmardcompl(ssk, timeo, 1);
			sdp_dbg_data(sk, "finished waiting\n");

			cancel_delayed_work(&ssk->srcavail_cancel_work);
		} else {
			sdp_dbg_data(sk, "QP was destroyed while waiting\n");
		}
	} else {
		sdp_dbg_data(sk, "got RdmaRdCompl\n");
	}

	/* detach tx_sa under the lock so the RdmaRdCompl/SendSM handlers
	 * can no longer touch it */
	spin_lock_irqsave(&ssk->tx_sa_lock, lock_flags);
	ssk->tx_sa = NULL;
	spin_unlock_irqrestore(&ssk->tx_sa_lock, lock_flags);

err_abort_send:
	/* consume from the iovec only what the peer actually read */
	sdp_update_iov_used(sk, iov, tx_sa->bytes_acked);

err_no_tx_slots:
	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

err_alloc_fmr:
	return rc;
}
710 
/* Zero-copy sendmsg entry point: loop do_sdp_sendmsg_zcopy() over the
 * iovec until it is drained, an error/abort occurs, or the remainder
 * drops below sdp_zcopy_thresh (the caller then finishes with bcopy).
 * Returns the number of bytes consumed from iov, or a negative errno
 * (-EAGAIN and -ETIME are swallowed and reported as a short count). */
int sdp_sendmsg_zcopy(struct kiocb *iocb, struct socket *sk, struct iovec *iov)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	int rc = 0;
	long timeo;
	struct tx_srcavail_state *tx_sa;
	int offset;
	size_t bytes_to_copy = 0;
	int copied = 0;

	sdp_dbg_data(sk, "Sending iov: %p, iov_len: 0x%lx\n",
			iov->iov_base, iov->iov_len);
	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy start");
	/* a SrcAvail from the peer is pending - sending our own SrcAvail
	 * now could deadlock, so force the caller to use bcopy */
	if (ssk->rx_sa) {
		sdp_dbg_data(sk, "Deadlock prevent: crossing SrcAvail\n");
		return 0;
	}

	sock_hold(ssk->socket, SOCK_REF_ZCOPY);

	SDPSTATS_COUNTER_INC(sendmsg_zcopy_segment);

	timeo = SDP_SRCAVAIL_ADV_TIMEOUT ;

	/* Ok commence sending. */
	/* NOTE(review): offset is computed but not used below - confirm */
	offset = (unsigned long)iov->iov_base & (PAGE_SIZE - 1);

	tx_sa = kmalloc(sizeof(struct tx_srcavail_state), GFP_KERNEL);
	if (!tx_sa) {
		sdp_warn(sk, "Error allocating zcopy context\n");
		rc = -EAGAIN; /* Buffer too big - fallback to bcopy */
		goto err_alloc_tx_sa;
	}

	bytes_to_copy = iov->iov_len;
	do {
		tx_sa_reset(tx_sa);

		rc = do_sdp_sendmsg_zcopy(sk, tx_sa, iov, &timeo);

		/* small remainder: bcopy is cheaper than another pin/advertise */
		if (iov->iov_len && iov->iov_len < sdp_zcopy_thresh) {
			sdp_dbg_data(sk, "0x%lx bytes left, switching to bcopy\n",
				iov->iov_len);
			break;
		}
	} while (!rc && iov->iov_len > 0 && !tx_sa->abort_flags);

	kfree(tx_sa);
err_alloc_tx_sa:
	/* iov_len has been shrunk by each successful round */
	copied = bytes_to_copy - iov->iov_len;

	sdp_prf1(sk, NULL, "sdp_sendmsg_zcopy end rc: %d copied: %d", rc, copied);

	sock_put(ssk->socket, SOCK_REF_ZCOPY);

	if (rc < 0 && rc != -EAGAIN && rc != -ETIME)
		return rc;

	return copied;
}
771 
/* Tear down the outstanding TX SrcAvail state on socket shutdown:
 * stop the cancel timer, drain its work, and release the FMR/umem
 * under tx_sa_lock so concurrent RdmaRdCompl/SendSM handlers cannot
 * race with the teardown.
 * NOTE(review): sdp_free_fmr() (ib_fmr_pool_unmap/ib_umem_release) is
 * called while holding a spinlock with IRQs off - confirm those calls
 * cannot sleep in this configuration. */
void sdp_abort_srcavail(struct socket *sk)
{
	struct sdp_sock *ssk = sdp_sk(sk);
	struct tx_srcavail_state *tx_sa = ssk->tx_sa;
	unsigned long flags;

	if (!tx_sa)
		return;

	/* make sure srcavail_cancel_timeout() is not running and won't run */
	cancel_delayed_work(&ssk->srcavail_cancel_work);
	flush_scheduled_work();

	spin_lock_irqsave(&ssk->tx_sa_lock, flags);

	sdp_free_fmr(sk, &tx_sa->fmr, &tx_sa->umem);

	ssk->tx_sa = NULL;

	spin_unlock_irqrestore(&ssk->tx_sa_lock, flags);
}
792 
793 void sdp_abort_rdma_read(struct socket *sk)
794 {
795 	struct sdp_sock *ssk = sdp_sk(sk);
796 	struct rx_srcavail_state *rx_sa = ssk->rx_sa;
797 
798 	if (!rx_sa)
799 		return;
800 
801 	sdp_free_fmr(sk, &rx_sa->fmr, &rx_sa->umem);
802 
803 	ssk->rx_sa = NULL;
804 }
805