1 /*
2  * Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates.
3  * All rights reserved.
4  *
5  * This software is available to you under a choice of one of two
6  * licenses.  You may choose to be licensed under the terms of the GNU
7  * General Public License (GPL) Version 2, available from the file
8  * COPYING in the main directory of this source tree, or the
9  * BSD license below:
10  *
11  *     Redistribution and use in source and binary forms, with or
12  *     without modification, are permitted provided that the following
13  *     conditions are met:
14  *
15  *      - Redistributions of source code must retain the above
16  *        copyright notice, this list of conditions and the following
17  *        disclaimer.
18  *
19  *      - Redistributions in binary form must reproduce the above
20  *        copyright notice, this list of conditions and the following
21  *        disclaimer in the documentation and/or other materials
22  *        provided with the distribution.
23  *
24  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31  * SOFTWARE.
32  */
33 
34 #include <inttypes.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include "ofi.h"
38 #include <ofi_util.h>
39 #include <ofi_iov.h>
40 
41 #include "rxr.h"
42 #include "efa.h"
43 #include "rxr_msg.h"
44 #include "rxr_rma.h"
45 
46 /*
47  *   General purpose utility functions
48  */
rxr_pkt_entry_alloc(struct rxr_ep * ep,struct ofi_bufpool * pkt_pool)49 struct rxr_pkt_entry *rxr_pkt_entry_alloc(struct rxr_ep *ep,
50 					  struct ofi_bufpool *pkt_pool)
51 {
52 	struct rxr_pkt_entry *pkt_entry;
53 	void *mr = NULL;
54 
55 	pkt_entry = ofi_buf_alloc_ex(pkt_pool, &mr);
56 	if (!pkt_entry)
57 		return NULL;
58 #ifdef ENABLE_EFA_POISONING
59 	memset(pkt_entry, 0, sizeof(*pkt_entry));
60 #endif
61 	dlist_init(&pkt_entry->entry);
62 #if ENABLE_DEBUG
63 	dlist_init(&pkt_entry->dbg_entry);
64 #endif
65 	pkt_entry->mr = (struct fid_mr *)mr;
66 	pkt_entry->pkt = (struct rxr_pkt *)((char *)pkt_entry +
67 			  sizeof(*pkt_entry));
68 #ifdef ENABLE_EFA_POISONING
69 	memset(pkt_entry->pkt, 0, ep->mtu_size);
70 #endif
71 	pkt_entry->state = RXR_PKT_ENTRY_IN_USE;
72 	pkt_entry->iov_count = 0;
73 	pkt_entry->next = NULL;
74 	return pkt_entry;
75 }
76 
77 static
rxr_pkt_entry_release_single_tx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt)78 void rxr_pkt_entry_release_single_tx(struct rxr_ep *ep,
79 				     struct rxr_pkt_entry *pkt)
80 {
81 	struct rxr_peer *peer;
82 
83 #if ENABLE_DEBUG
84 	dlist_remove(&pkt->dbg_entry);
85 #endif
86 	/*
87 	 * Decrement rnr_queued_pkts counter and reset backoff for this peer if
88 	 * we get a send completion for a retransmitted packet.
89 	 */
90 	if (OFI_UNLIKELY(pkt->state == RXR_PKT_ENTRY_RNR_RETRANSMIT)) {
91 		peer = rxr_ep_get_peer(ep, pkt->addr);
92 		peer->rnr_queued_pkt_cnt--;
93 		peer->timeout_interval = 0;
94 		peer->rnr_timeout_exp = 0;
95 		if (peer->flags & RXR_PEER_IN_BACKOFF)
96 			dlist_remove(&peer->rnr_entry);
97 		peer->flags &= ~RXR_PEER_IN_BACKOFF;
98 		FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
99 		       "reset backoff timer for peer: %" PRIu64 "\n",
100 		       pkt->addr);
101 	}
102 #ifdef ENABLE_EFA_POISONING
103 	rxr_poison_mem_region((uint32_t *)pkt, ep->tx_pkt_pool_entry_sz);
104 #endif
105 	pkt->state = RXR_PKT_ENTRY_FREE;
106 	ofi_buf_free(pkt);
107 }
108 
rxr_pkt_entry_release_tx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)109 void rxr_pkt_entry_release_tx(struct rxr_ep *ep,
110 			      struct rxr_pkt_entry *pkt_entry)
111 {
112 	struct rxr_pkt_entry *next;
113 
114 	while (pkt_entry) {
115 		next = pkt_entry->next;
116 		rxr_pkt_entry_release_single_tx(ep, pkt_entry);
117 		pkt_entry = next;
118 	}
119 }
120 
121 static
rxr_pkt_entry_release_single_rx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)122 void rxr_pkt_entry_release_single_rx(struct rxr_ep *ep,
123 				     struct rxr_pkt_entry *pkt_entry)
124 {
125 	if (pkt_entry->type == RXR_PKT_ENTRY_POSTED) {
126 		struct rxr_peer *peer;
127 
128 		peer = rxr_ep_get_peer(ep, pkt_entry->addr);
129 		assert(peer);
130 		if (peer->is_local)
131 			ep->rx_bufs_shm_to_post++;
132 		else
133 			ep->rx_bufs_efa_to_post++;
134 	}
135 #if ENABLE_DEBUG
136 	dlist_remove(&pkt_entry->dbg_entry);
137 #endif
138 #ifdef ENABLE_EFA_POISONING
139 	/* the same pool size is used for all types of rx pkt_entries */
140 	rxr_poison_mem_region((uint32_t *)pkt_entry, ep->rx_pkt_pool_entry_sz);
141 #endif
142 	pkt_entry->state = RXR_PKT_ENTRY_FREE;
143 	ofi_buf_free(pkt_entry);
144 }
145 
rxr_pkt_entry_release_rx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)146 void rxr_pkt_entry_release_rx(struct rxr_ep *ep,
147 			      struct rxr_pkt_entry *pkt_entry)
148 {
149 	struct rxr_pkt_entry *next;
150 
151 	while (pkt_entry) {
152 		next = pkt_entry->next;
153 		rxr_pkt_entry_release_single_rx(ep, pkt_entry);
154 		pkt_entry = next;
155 	}
156 }
157 
158 static
rxr_pkt_entry_copy(struct rxr_ep * ep,struct rxr_pkt_entry * dest,struct rxr_pkt_entry * src,int new_entry_type)159 void rxr_pkt_entry_copy(struct rxr_ep *ep,
160 			struct rxr_pkt_entry *dest,
161 			struct rxr_pkt_entry *src,
162 			int new_entry_type)
163 {
164 	FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
165 	       "Copying packet out of posted buffer\n");
166 	assert(src->type == RXR_PKT_ENTRY_POSTED);
167 	memcpy(dest, src, sizeof(struct rxr_pkt_entry));
168 	dest->pkt = (struct rxr_pkt *)((char *)dest + sizeof(*dest));
169 	memcpy(dest->pkt, src->pkt, ep->mtu_size);
170 	dlist_init(&dest->entry);
171 #if ENABLE_DEBUG
172 	dlist_init(&dest->dbg_entry);
173 #endif
174 	dest->state = RXR_PKT_ENTRY_IN_USE;
175 	dest->type = new_entry_type;
176 }
177 
178 /*
179  * Create a new rx_entry for an unexpected message. Store the packet for later
180  * processing and put the rx_entry on the appropriate unexpected list.
181  */
rxr_pkt_get_unexp(struct rxr_ep * ep,struct rxr_pkt_entry ** pkt_entry_ptr)182 struct rxr_pkt_entry *rxr_pkt_get_unexp(struct rxr_ep *ep,
183 					struct rxr_pkt_entry **pkt_entry_ptr)
184 {
185 	struct rxr_pkt_entry *unexp_pkt_entry;
186 
187 	if (rxr_env.rx_copy_unexp && (*pkt_entry_ptr)->type == RXR_PKT_ENTRY_POSTED) {
188 		unexp_pkt_entry = rxr_pkt_entry_clone(ep, ep->rx_unexp_pkt_pool, *pkt_entry_ptr, RXR_PKT_ENTRY_UNEXP);
189 		if (OFI_UNLIKELY(!unexp_pkt_entry)) {
190 			FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
191 				"Unable to allocate rx_pkt_entry for unexp msg\n");
192 			return NULL;
193 		}
194 		rxr_pkt_entry_release_rx(ep, *pkt_entry_ptr);
195 		*pkt_entry_ptr = unexp_pkt_entry;
196 	} else {
197 		unexp_pkt_entry = *pkt_entry_ptr;
198 	}
199 
200 	return unexp_pkt_entry;
201 }
202 
rxr_pkt_entry_release_cloned(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)203 void rxr_pkt_entry_release_cloned(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry)
204 {
205 	struct rxr_pkt_entry *next;
206 
207 	while (pkt_entry) {
208 		assert(pkt_entry->type == RXR_PKT_ENTRY_OOO  ||
209 		       pkt_entry->type == RXR_PKT_ENTRY_UNEXP);
210 #ifdef ENABLE_EFA_POISONING
211 		rxr_poison_mem_region((uint32_t *)pkt_entry, ep->tx_pkt_pool_entry_sz);
212 #endif
213 		pkt_entry->state = RXR_PKT_ENTRY_FREE;
214 		ofi_buf_free(pkt_entry);
215 		next = pkt_entry->next;
216 		pkt_entry = next;
217 	}
218 }
219 
rxr_pkt_entry_clone(struct rxr_ep * ep,struct ofi_bufpool * pkt_pool,struct rxr_pkt_entry * src,int new_entry_type)220 struct rxr_pkt_entry *rxr_pkt_entry_clone(struct rxr_ep *ep,
221 					  struct ofi_bufpool *pkt_pool,
222 					  struct rxr_pkt_entry *src,
223 					  int new_entry_type)
224 {
225 	struct rxr_pkt_entry *root = NULL;
226 	struct rxr_pkt_entry *dst;
227 
228 	assert(src);
229 	assert(new_entry_type == RXR_PKT_ENTRY_OOO ||
230 	       new_entry_type == RXR_PKT_ENTRY_UNEXP);
231 
232 	dst = rxr_pkt_entry_alloc(ep, pkt_pool);
233 	if (!dst)
234 		return NULL;
235 
236 	rxr_pkt_entry_copy(ep, dst, src, new_entry_type);
237 	root = dst;
238 	while (src->next) {
239 		dst->next = rxr_pkt_entry_alloc(ep, pkt_pool);
240 		if (!dst->next) {
241 			rxr_pkt_entry_release_cloned(ep, root);
242 			return NULL;
243 		}
244 
245 		rxr_pkt_entry_copy(ep, dst->next, src->next, new_entry_type);
246 		src = src->next;
247 		dst = dst->next;
248 	}
249 
250 	assert(dst && !dst->next);
251 	return root;
252 }
253 
rxr_pkt_entry_append(struct rxr_pkt_entry * dst,struct rxr_pkt_entry * src)254 void rxr_pkt_entry_append(struct rxr_pkt_entry *dst,
255 			  struct rxr_pkt_entry *src)
256 {
257 	assert(dst);
258 
259 	while (dst->next)
260 		dst = dst->next;
261 	assert(dst && !dst->next);
262 	dst->next = src;
263 }
264 
265 static inline
rxr_pkt_entry_sendmsg(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,const struct fi_msg * msg,uint64_t flags)266 ssize_t rxr_pkt_entry_sendmsg(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
267 			      const struct fi_msg *msg, uint64_t flags)
268 {
269 	struct rxr_peer *peer;
270 	size_t ret;
271 
272 	peer = rxr_ep_get_peer(ep, pkt_entry->addr);
273 	assert(ep->tx_pending <= ep->max_outstanding_tx);
274 
275 	if (ep->tx_pending == ep->max_outstanding_tx)
276 		return -FI_EAGAIN;
277 
278 	if (peer->flags & RXR_PEER_IN_BACKOFF)
279 		return -FI_EAGAIN;
280 
281 #if ENABLE_DEBUG
282 	dlist_insert_tail(&pkt_entry->dbg_entry, &ep->tx_pkt_list);
283 #ifdef ENABLE_RXR_PKT_DUMP
284 	rxr_pkt_print("Sent", ep, (struct rxr_base_hdr *)pkt_entry->pkt);
285 #endif
286 #endif
287 	if (peer->is_local) {
288 		assert(ep->use_shm);
289 		ret = fi_sendmsg(ep->shm_ep, msg, flags);
290 	} else {
291 		ret = fi_sendmsg(ep->rdm_ep, msg, flags);
292 		if (OFI_LIKELY(!ret))
293 			rxr_ep_inc_tx_pending(ep, peer);
294 	}
295 
296 	return ret;
297 }
298 
/*
 * Build an fi_msg for @iov and send it for @pkt_entry. Local peers are
 * addressed through their shm fi_addr; the pkt_entry itself is used as
 * the operation context.
 */
ssize_t rxr_pkt_entry_sendv(struct rxr_ep *ep,
			    struct rxr_pkt_entry *pkt_entry,
			    fi_addr_t addr, const struct iovec *iov,
			    void **desc, size_t count, uint64_t flags)
{
	struct rxr_peer *peer = rxr_ep_get_peer(ep, addr);
	struct fi_msg msg = {
		.msg_iov = iov,
		.desc = desc,
		.iov_count = count,
		.addr = peer->is_local ? peer->shm_fiaddr : addr,
		.context = pkt_entry,
		.data = 0,
	};

	return rxr_pkt_entry_sendmsg(ep, pkt_entry, &msg, flags);
}
317 
318 /* rxr_pkt_start currently expects data pkt right after pkt hdr */
rxr_pkt_entry_send_with_flags(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,fi_addr_t addr,uint64_t flags)319 ssize_t rxr_pkt_entry_send_with_flags(struct rxr_ep *ep,
320 				      struct rxr_pkt_entry *pkt_entry,
321 				      fi_addr_t addr, uint64_t flags)
322 {
323 	struct iovec iov;
324 	void *desc;
325 
326 	iov.iov_base = rxr_pkt_start(pkt_entry);
327 	iov.iov_len = pkt_entry->pkt_size;
328 
329 	if (rxr_ep_get_peer(ep, addr)->is_local) {
330 		assert(ep->use_shm);
331 		desc = NULL;
332 	} else {
333 		desc = rxr_ep_mr_local(ep) ? fi_mr_desc(pkt_entry->mr) : NULL;
334 	}
335 
336 	return rxr_pkt_entry_sendv(ep, pkt_entry, addr, &iov, &desc, 1, flags);
337 }
338 
rxr_pkt_entry_send(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,fi_addr_t addr)339 ssize_t rxr_pkt_entry_send(struct rxr_ep *ep,
340 			   struct rxr_pkt_entry *pkt_entry,
341 			   fi_addr_t addr)
342 {
343 	return rxr_pkt_entry_send_with_flags(ep, pkt_entry, addr, 0);
344 }
345 
rxr_pkt_entry_inject(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,fi_addr_t addr)346 ssize_t rxr_pkt_entry_inject(struct rxr_ep *ep,
347 			     struct rxr_pkt_entry *pkt_entry,
348 			     fi_addr_t addr)
349 {
350 	struct rxr_peer *peer;
351 
352 	/* currently only EOR packet is injected using shm ep */
353 	peer = rxr_ep_get_peer(ep, addr);
354 	assert(peer);
355 	assert(ep->use_shm && peer->is_local);
356 	return fi_inject(ep->shm_ep, rxr_pkt_start(pkt_entry), pkt_entry->pkt_size,
357 			 peer->shm_fiaddr);
358 }
359 
360 /*
361  * Functions for pkt_rx_map
362  */
rxr_pkt_rx_map_lookup(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)363 struct rxr_rx_entry *rxr_pkt_rx_map_lookup(struct rxr_ep *ep,
364 					   struct rxr_pkt_entry *pkt_entry)
365 {
366 	struct rxr_pkt_rx_map *entry = NULL;
367 	struct rxr_pkt_rx_key key;
368 
369 	key.msg_id = rxr_pkt_msg_id(pkt_entry);
370 	key.addr = pkt_entry->addr;
371 	HASH_FIND(hh, ep->pkt_rx_map, &key, sizeof(struct rxr_pkt_rx_key), entry);
372 	return entry ? entry->rx_entry : NULL;
373 }
374 
rxr_pkt_rx_map_insert(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,struct rxr_rx_entry * rx_entry)375 void rxr_pkt_rx_map_insert(struct rxr_ep *ep,
376 			   struct rxr_pkt_entry *pkt_entry,
377 			   struct rxr_rx_entry *rx_entry)
378 {
379 	struct rxr_pkt_rx_map *entry;
380 
381 	entry = ofi_buf_alloc(ep->map_entry_pool);
382 	if (OFI_UNLIKELY(!entry)) {
383 		FI_WARN(&rxr_prov, FI_LOG_CQ,
384 			"Map entries for medium size message exhausted.\n");
385 			efa_eq_write_error(&ep->util_ep, FI_ENOBUFS, -FI_ENOBUFS);
386 		return;
387 	}
388 
389 	entry->key.msg_id = rxr_pkt_msg_id(pkt_entry);
390 	entry->key.addr = pkt_entry->addr;
391 
392 #if ENABLE_DEBUG
393 	{
394 		struct rxr_pkt_rx_map *existing_entry = NULL;
395 
396 		HASH_FIND(hh, ep->pkt_rx_map, &entry->key, sizeof(struct rxr_pkt_rx_key), existing_entry);
397 		assert(!existing_entry);
398 	}
399 #endif
400 
401 	entry->rx_entry = rx_entry;
402 	HASH_ADD(hh, ep->pkt_rx_map, key, sizeof(struct rxr_pkt_rx_key), entry);
403 }
404 
rxr_pkt_rx_map_remove(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,struct rxr_rx_entry * rx_entry)405 void rxr_pkt_rx_map_remove(struct rxr_ep *ep,
406 			   struct rxr_pkt_entry *pkt_entry,
407 			   struct rxr_rx_entry *rx_entry)
408 {
409 	struct rxr_pkt_rx_map *entry;
410 	struct rxr_pkt_rx_key key;
411 
412 	key.msg_id = rxr_pkt_msg_id(pkt_entry);
413 	key.addr = pkt_entry->addr;
414 
415 	HASH_FIND(hh, ep->pkt_rx_map, &key, sizeof(key), entry);
416 	assert(entry && entry->rx_entry == rx_entry);
417 	HASH_DEL(ep->pkt_rx_map, entry);
418 	ofi_buf_free(entry);
419 }
420 
421