/*
 * Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#include "rxr.h"
#include "rxr_msg.h"
#include "rxr_pkt_cmd.h"
#include "efa_cuda.h"

/*
 * This file contains data packet related functions.
 * Data packets are used by the long message protocol.
 */

/*
 * Functions to send data packets.
 */

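/*
 * rxr_pkt_send_data() sends one data packet by copying the next segment
 * of the source buffer into the packet's own buffer. The payload size is
 * bounded by the bytes remaining to send, the maximum per-packet payload,
 * and the sender's flow-control window.
 */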
ssize_t rxr_pkt_send_data(struct rxr_ep *ep,
			  struct rxr_tx_entry *tx_entry,
			  struct rxr_pkt_entry *pkt_entry)
{
	uint64_t payload_size, copied_size;
	struct rxr_data_pkt *data_pkt;

	pkt_entry->x_entry = (void *)tx_entry;
	pkt_entry->addr = tx_entry->addr;

	payload_size = MIN(tx_entry->total_len - tx_entry->bytes_sent,
			   ep->max_data_payload_size);
	payload_size = MIN(payload_size, tx_entry->window);

	data_pkt = (struct rxr_data_pkt *)pkt_entry->pkt;
	data_pkt->hdr.seg_size = payload_size;

	copied_size = rxr_copy_from_tx(data_pkt->data, payload_size, tx_entry, tx_entry->bytes_sent);
	assert(copied_size == payload_size);

	pkt_entry->pkt_size = copied_size + sizeof(struct rxr_data_hdr);

	return rxr_pkt_entry_send_with_flags(ep, pkt_entry, pkt_entry->addr,
					     tx_entry->send_flags);
}

/*
 * Copies consecutive small iovs (those without a registered memory region)
 * into one buffer. If the function reaches an iov that has a registered
 * MR, it stops, having copied only up to that iov.
 */
static size_t rxr_copy_from_iov(void *buf, uint64_t remaining_len,
				struct rxr_tx_entry *tx_entry)
{
	struct iovec *tx_iov = tx_entry->iov;
	uint64_t done = 0, len;

	while (tx_entry->iov_index < tx_entry->iov_count &&
	       done < remaining_len) {
		len = tx_iov[tx_entry->iov_index].iov_len;
		if (tx_entry->mr[tx_entry->iov_index])
			break;

		len -= tx_entry->iov_offset;

		/*
		 * If the amount to be written surpasses the remaining length,
		 * copy up to the remaining length and return, else copy the
		 * entire iov and continue.
		 */
		if (done + len > remaining_len) {
			len = remaining_len - done;
			memcpy((char *)buf + done,
			       (char *)tx_iov[tx_entry->iov_index].iov_base +
			       tx_entry->iov_offset, len);
			tx_entry->iov_offset += len;
			done += len;
			break;
		}
		memcpy((char *)buf + done,
		       (char *)tx_iov[tx_entry->iov_index].iov_base +
		       tx_entry->iov_offset, len);
		tx_entry->iov_index++;
		tx_entry->iov_offset = 0;
		done += len;
	}
	return done;
}

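/*
 * rxr_pkt_send_data_desc() sends a data packet with zero-copy of the user
 * buffer where possible: iovs that already have a memory descriptor are
 * passed directly to the lower-level sendv(), while consecutive
 * unregistered iovs are gathered into the packet buffer via
 * rxr_copy_from_iov().
 */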
ssize_t rxr_pkt_send_data_desc(struct rxr_ep *ep,
			       struct rxr_tx_entry *tx_entry,
			       struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_data_pkt *data_pkt;
	/* The user's iov */
	struct iovec *tx_iov = tx_entry->iov;
	/* The constructed iov to be passed to sendv
	 * and corresponding fid_mrs
	 */
	struct iovec iov[ep->core_iov_limit];
	void *desc[ep->core_iov_limit];
	/* Constructed iov's total size */
	uint64_t payload_size = 0;
	/* pkt_entry offset to write data into */
	uint64_t pkt_used = 0;
	/* Remaining size that can fit in the constructed iov */
	uint64_t remaining_len = MIN(tx_entry->window,
				     ep->max_data_payload_size);
	/* The constructed iov's index */
	size_t i = 0;
	size_t len = 0;

	ssize_t ret;

	data_pkt = (struct rxr_data_pkt *)pkt_entry->pkt;
	/* Assign packet header in constructed iov */
	iov[i].iov_base = rxr_pkt_start(pkt_entry);
	iov[i].iov_len = sizeof(struct rxr_data_hdr);
	desc[i] = rxr_ep_mr_local(ep) ? fi_mr_desc(pkt_entry->mr) : NULL;
	i++;

	/*
	 * Loops until the payload size reaches its maximum, all user iovs
	 * are consumed, the constructed iov count reaches the core iov
	 * limit, or the tx entry window is exhausted.  Each iteration fills
	 * one entry of the iov to be sent.
	 */
	while (tx_entry->iov_index < tx_entry->iov_count &&
	       remaining_len > 0 && i < ep->core_iov_limit) {
		if (!rxr_ep_mr_local(ep) || tx_entry->desc[tx_entry->iov_index]) {
			iov[i].iov_base =
				(char *)tx_iov[tx_entry->iov_index].iov_base +
				tx_entry->iov_offset;
			if (rxr_ep_mr_local(ep))
				desc[i] = tx_entry->desc[tx_entry->iov_index];

			len = tx_iov[tx_entry->iov_index].iov_len
			      - tx_entry->iov_offset;
			if (len > remaining_len) {
				len = remaining_len;
				tx_entry->iov_offset += len;
			} else {
				tx_entry->iov_index++;
				tx_entry->iov_offset = 0;
			}
			iov[i].iov_len = len;
		} else {
			/* Note that for a CUDA buffer the caller always
			 * provides desc, so this branch is never entered.
			 *
			 * Copy any consecutive small iovs, returning the
			 * size written while updating the iov index and
			 * offset.
			 */

			len = rxr_copy_from_iov((char *)data_pkt->data +
						 pkt_used,
						 remaining_len,
						 tx_entry);

			iov[i].iov_base = (char *)data_pkt->data + pkt_used;
			iov[i].iov_len = len;
			desc[i] = fi_mr_desc(pkt_entry->mr);
			pkt_used += len;
		}
		payload_size += len;
		remaining_len -= len;
		i++;
	}
	data_pkt->hdr.seg_size = (uint16_t)payload_size;
	pkt_entry->pkt_size = payload_size + RXR_DATA_HDR_SIZE;
	pkt_entry->x_entry = tx_entry;
	pkt_entry->addr = tx_entry->addr;

	FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
	       "Sending iov count: %zu, payload size: %lu.\n",
	       i, payload_size);
	ret = rxr_pkt_entry_sendv(ep, pkt_entry, tx_entry->addr,
				  (const struct iovec *)iov,
				  desc, i, tx_entry->send_flags);
	return ret;
}

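/*
 * rxr_pkt_handle_data_send_completion() accumulates the bytes acknowledged
 * for the tx entry and reports a TX completion once every byte of the
 * message has been acked.
 */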
void rxr_pkt_handle_data_send_completion(struct rxr_ep *ep,
					 struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_tx_entry *tx_entry;

	tx_entry = (struct rxr_tx_entry *)pkt_entry->x_entry;
	tx_entry->bytes_acked +=
		rxr_get_data_pkt(pkt_entry->pkt)->hdr.seg_size;

	if (tx_entry->total_len == tx_entry->bytes_acked)
		rxr_cq_handle_tx_completion(ep, tx_entry);
}

/*
 *  rxr_pkt_handle_data_recv() and related functions
 */
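/*
 * rxr_pkt_proc_data() copies a received segment into the matched receive
 * buffer, updates progress counters and flow control, and finalizes the
 * receive once the last segment has arrived. It is shared by the DATA and
 * READRSP packet handlers.
 */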
int rxr_pkt_proc_data(struct rxr_ep *ep,
		      struct rxr_rx_entry *rx_entry,
		      struct rxr_pkt_entry *pkt_entry,
		      char *data, size_t seg_offset,
		      size_t seg_size)
{
	struct rxr_peer *peer;
	int64_t bytes_left, bytes_copied;
	ssize_t ret = 0;

#if ENABLE_DEBUG
	int pkt_type = rxr_get_base_hdr(pkt_entry->pkt)->type;

	assert(pkt_type == RXR_DATA_PKT || pkt_type == RXR_READRSP_PKT);
#endif
	/* the message is sunk for a CANCELed/DISCARDed entry */
	if (OFI_LIKELY(!(rx_entry->rxr_flags & RXR_RECV_CANCEL)) &&
	    rx_entry->cq_entry.len > seg_offset) {
		bytes_copied = rxr_copy_to_rx(data, seg_size, rx_entry, seg_offset);

		if (bytes_copied != MIN(seg_size, rx_entry->cq_entry.len - seg_offset)) {
			FI_WARN(&rxr_prov, FI_LOG_CQ, "wrong size! bytes_copied: %ld\n",
				bytes_copied);
			if (rxr_cq_handle_rx_error(ep, rx_entry, -FI_EINVAL))
				assert(0 && "error writing error cq entry for data packet\n");
		}
	}

	rx_entry->bytes_done += seg_size;

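	/*
	 * Return the receive credits consumed by this segment (one credit
	 * per data packet) and reclaim a data buffer slot, capped at the
	 * rx pool's chunk count.
	 */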
	peer = rxr_ep_get_peer(ep, rx_entry->addr);
	peer->rx_credits += ofi_div_ceil(seg_size, ep->max_data_payload_size);

	rx_entry->window -= seg_size;
	if (ep->available_data_bufs < rxr_get_rx_pool_chunk_cnt(ep))
		ep->available_data_bufs++;

	/* bytes_done is the total bytes sent/received, which can be larger
	 * than the bytes copied to the receive buffer (for truncated
	 * messages). rx_entry->total_len comes from the RTM header and is
	 * the size of the send buffer, thus we always have:
	 *             rx_entry->total_len >= rx_entry->bytes_done
	 */
	bytes_left = rx_entry->total_len - rx_entry->bytes_done;
	assert(bytes_left >= 0);
	if (!bytes_left) {
#if ENABLE_DEBUG
		dlist_remove(&rx_entry->rx_pending_entry);
		ep->rx_pending--;
#endif
		rxr_cq_handle_rx_completion(ep, pkt_entry, rx_entry);

		rxr_msg_multi_recv_free_posted_entry(ep, rx_entry);
		rxr_release_rx_entry(ep, rx_entry);
		return 0;
	}

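	/*
	 * The receive window is exhausted but the message is incomplete:
	 * grant the sender a new window by posting (or queueing) a CTS
	 * packet.
	 */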
	if (!rx_entry->window) {
		assert(rx_entry->state == RXR_RX_RECV);
		ret = rxr_pkt_post_ctrl_or_queue(ep, RXR_RX_ENTRY, rx_entry, RXR_CTS_PKT, 0);
	}

	rxr_pkt_entry_release_rx(ep, pkt_entry);
	return ret;
}

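/*
 * rxr_pkt_handle_data_recv() looks up the rx entry addressed by the data
 * packet header and hands the segment to rxr_pkt_proc_data().
 */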
void rxr_pkt_handle_data_recv(struct rxr_ep *ep,
			      struct rxr_pkt_entry *pkt_entry)
{
	struct rxr_data_pkt *data_pkt;
	struct rxr_rx_entry *rx_entry;

	data_pkt = (struct rxr_data_pkt *)pkt_entry->pkt;

	rx_entry = ofi_bufpool_get_ibuf(ep->rx_entry_pool,
					data_pkt->hdr.rx_id);

	rxr_pkt_proc_data(ep, rx_entry,
			  pkt_entry,
			  data_pkt->data,
			  data_pkt->hdr.seg_offset,
			  data_pkt->hdr.seg_size);
}