1 /*
2 * Copyright (c) 2019-2020 Amazon.com, Inc. or its affiliates.
3 * All rights reserved.
4 *
5 * This software is available to you under a choice of one of two
6 * licenses. You may choose to be licensed under the terms of the GNU
7 * General Public License (GPL) Version 2, available from the file
8 * COPYING in the main directory of this source tree, or the
9 * BSD license below:
10 *
11 * Redistribution and use in source and binary forms, with or
12 * without modification, are permitted provided that the following
13 * conditions are met:
14 *
15 * - Redistributions of source code must retain the above
16 * copyright notice, this list of conditions and the following
17 * disclaimer.
18 *
19 * - Redistributions in binary form must reproduce the above
20 * copyright notice, this list of conditions and the following
21 * disclaimer in the documentation and/or other materials
22 * provided with the distribution.
23 *
24 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
25 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
26 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
27 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
28 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
29 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
30 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
31 * SOFTWARE.
32 */
33
34 #include <inttypes.h>
35 #include <stdlib.h>
36 #include <string.h>
37 #include "ofi.h"
38 #include <ofi_util.h>
39 #include <ofi_iov.h>
40
41 #include "rxr.h"
42 #include "efa.h"
43 #include "rxr_msg.h"
44 #include "rxr_rma.h"
45
46 /*
47 * General purpose utility functions
48 */
rxr_pkt_entry_alloc(struct rxr_ep * ep,struct ofi_bufpool * pkt_pool)49 struct rxr_pkt_entry *rxr_pkt_entry_alloc(struct rxr_ep *ep,
50 struct ofi_bufpool *pkt_pool)
51 {
52 struct rxr_pkt_entry *pkt_entry;
53 void *mr = NULL;
54
55 pkt_entry = ofi_buf_alloc_ex(pkt_pool, &mr);
56 if (!pkt_entry)
57 return NULL;
58 #ifdef ENABLE_EFA_POISONING
59 memset(pkt_entry, 0, sizeof(*pkt_entry));
60 #endif
61 dlist_init(&pkt_entry->entry);
62 #if ENABLE_DEBUG
63 dlist_init(&pkt_entry->dbg_entry);
64 #endif
65 pkt_entry->mr = (struct fid_mr *)mr;
66 pkt_entry->pkt = (struct rxr_pkt *)((char *)pkt_entry +
67 sizeof(*pkt_entry));
68 #ifdef ENABLE_EFA_POISONING
69 memset(pkt_entry->pkt, 0, ep->mtu_size);
70 #endif
71 pkt_entry->state = RXR_PKT_ENTRY_IN_USE;
72 pkt_entry->iov_count = 0;
73 pkt_entry->next = NULL;
74 return pkt_entry;
75 }
76
77 static
rxr_pkt_entry_release_single_tx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt)78 void rxr_pkt_entry_release_single_tx(struct rxr_ep *ep,
79 struct rxr_pkt_entry *pkt)
80 {
81 struct rxr_peer *peer;
82
83 #if ENABLE_DEBUG
84 dlist_remove(&pkt->dbg_entry);
85 #endif
86 /*
87 * Decrement rnr_queued_pkts counter and reset backoff for this peer if
88 * we get a send completion for a retransmitted packet.
89 */
90 if (OFI_UNLIKELY(pkt->state == RXR_PKT_ENTRY_RNR_RETRANSMIT)) {
91 peer = rxr_ep_get_peer(ep, pkt->addr);
92 peer->rnr_queued_pkt_cnt--;
93 peer->timeout_interval = 0;
94 peer->rnr_timeout_exp = 0;
95 if (peer->flags & RXR_PEER_IN_BACKOFF)
96 dlist_remove(&peer->rnr_entry);
97 peer->flags &= ~RXR_PEER_IN_BACKOFF;
98 FI_DBG(&rxr_prov, FI_LOG_EP_DATA,
99 "reset backoff timer for peer: %" PRIu64 "\n",
100 pkt->addr);
101 }
102 #ifdef ENABLE_EFA_POISONING
103 rxr_poison_mem_region((uint32_t *)pkt, ep->tx_pkt_pool_entry_sz);
104 #endif
105 pkt->state = RXR_PKT_ENTRY_FREE;
106 ofi_buf_free(pkt);
107 }
108
rxr_pkt_entry_release_tx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)109 void rxr_pkt_entry_release_tx(struct rxr_ep *ep,
110 struct rxr_pkt_entry *pkt_entry)
111 {
112 struct rxr_pkt_entry *next;
113
114 while (pkt_entry) {
115 next = pkt_entry->next;
116 rxr_pkt_entry_release_single_tx(ep, pkt_entry);
117 pkt_entry = next;
118 }
119 }
120
121 static
rxr_pkt_entry_release_single_rx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)122 void rxr_pkt_entry_release_single_rx(struct rxr_ep *ep,
123 struct rxr_pkt_entry *pkt_entry)
124 {
125 if (pkt_entry->type == RXR_PKT_ENTRY_POSTED) {
126 struct rxr_peer *peer;
127
128 peer = rxr_ep_get_peer(ep, pkt_entry->addr);
129 assert(peer);
130 if (peer->is_local)
131 ep->rx_bufs_shm_to_post++;
132 else
133 ep->rx_bufs_efa_to_post++;
134 }
135 #if ENABLE_DEBUG
136 dlist_remove(&pkt_entry->dbg_entry);
137 #endif
138 #ifdef ENABLE_EFA_POISONING
139 /* the same pool size is used for all types of rx pkt_entries */
140 rxr_poison_mem_region((uint32_t *)pkt_entry, ep->rx_pkt_pool_entry_sz);
141 #endif
142 pkt_entry->state = RXR_PKT_ENTRY_FREE;
143 ofi_buf_free(pkt_entry);
144 }
145
rxr_pkt_entry_release_rx(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)146 void rxr_pkt_entry_release_rx(struct rxr_ep *ep,
147 struct rxr_pkt_entry *pkt_entry)
148 {
149 struct rxr_pkt_entry *next;
150
151 while (pkt_entry) {
152 next = pkt_entry->next;
153 rxr_pkt_entry_release_single_rx(ep, pkt_entry);
154 pkt_entry = next;
155 }
156 }
157
158 static
rxr_pkt_entry_copy(struct rxr_ep * ep,struct rxr_pkt_entry * dest,struct rxr_pkt_entry * src,int new_entry_type)159 void rxr_pkt_entry_copy(struct rxr_ep *ep,
160 struct rxr_pkt_entry *dest,
161 struct rxr_pkt_entry *src,
162 int new_entry_type)
163 {
164 FI_DBG(&rxr_prov, FI_LOG_EP_CTRL,
165 "Copying packet out of posted buffer\n");
166 assert(src->type == RXR_PKT_ENTRY_POSTED);
167 memcpy(dest, src, sizeof(struct rxr_pkt_entry));
168 dest->pkt = (struct rxr_pkt *)((char *)dest + sizeof(*dest));
169 memcpy(dest->pkt, src->pkt, ep->mtu_size);
170 dlist_init(&dest->entry);
171 #if ENABLE_DEBUG
172 dlist_init(&dest->dbg_entry);
173 #endif
174 dest->state = RXR_PKT_ENTRY_IN_USE;
175 dest->type = new_entry_type;
176 }
177
178 /*
179 * Create a new rx_entry for an unexpected message. Store the packet for later
180 * processing and put the rx_entry on the appropriate unexpected list.
181 */
rxr_pkt_get_unexp(struct rxr_ep * ep,struct rxr_pkt_entry ** pkt_entry_ptr)182 struct rxr_pkt_entry *rxr_pkt_get_unexp(struct rxr_ep *ep,
183 struct rxr_pkt_entry **pkt_entry_ptr)
184 {
185 struct rxr_pkt_entry *unexp_pkt_entry;
186
187 if (rxr_env.rx_copy_unexp && (*pkt_entry_ptr)->type == RXR_PKT_ENTRY_POSTED) {
188 unexp_pkt_entry = rxr_pkt_entry_clone(ep, ep->rx_unexp_pkt_pool, *pkt_entry_ptr, RXR_PKT_ENTRY_UNEXP);
189 if (OFI_UNLIKELY(!unexp_pkt_entry)) {
190 FI_WARN(&rxr_prov, FI_LOG_EP_CTRL,
191 "Unable to allocate rx_pkt_entry for unexp msg\n");
192 return NULL;
193 }
194 rxr_pkt_entry_release_rx(ep, *pkt_entry_ptr);
195 *pkt_entry_ptr = unexp_pkt_entry;
196 } else {
197 unexp_pkt_entry = *pkt_entry_ptr;
198 }
199
200 return unexp_pkt_entry;
201 }
202
rxr_pkt_entry_release_cloned(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)203 void rxr_pkt_entry_release_cloned(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry)
204 {
205 struct rxr_pkt_entry *next;
206
207 while (pkt_entry) {
208 assert(pkt_entry->type == RXR_PKT_ENTRY_OOO ||
209 pkt_entry->type == RXR_PKT_ENTRY_UNEXP);
210 #ifdef ENABLE_EFA_POISONING
211 rxr_poison_mem_region((uint32_t *)pkt_entry, ep->tx_pkt_pool_entry_sz);
212 #endif
213 pkt_entry->state = RXR_PKT_ENTRY_FREE;
214 ofi_buf_free(pkt_entry);
215 next = pkt_entry->next;
216 pkt_entry = next;
217 }
218 }
219
rxr_pkt_entry_clone(struct rxr_ep * ep,struct ofi_bufpool * pkt_pool,struct rxr_pkt_entry * src,int new_entry_type)220 struct rxr_pkt_entry *rxr_pkt_entry_clone(struct rxr_ep *ep,
221 struct ofi_bufpool *pkt_pool,
222 struct rxr_pkt_entry *src,
223 int new_entry_type)
224 {
225 struct rxr_pkt_entry *root = NULL;
226 struct rxr_pkt_entry *dst;
227
228 assert(src);
229 assert(new_entry_type == RXR_PKT_ENTRY_OOO ||
230 new_entry_type == RXR_PKT_ENTRY_UNEXP);
231
232 dst = rxr_pkt_entry_alloc(ep, pkt_pool);
233 if (!dst)
234 return NULL;
235
236 rxr_pkt_entry_copy(ep, dst, src, new_entry_type);
237 root = dst;
238 while (src->next) {
239 dst->next = rxr_pkt_entry_alloc(ep, pkt_pool);
240 if (!dst->next) {
241 rxr_pkt_entry_release_cloned(ep, root);
242 return NULL;
243 }
244
245 rxr_pkt_entry_copy(ep, dst->next, src->next, new_entry_type);
246 src = src->next;
247 dst = dst->next;
248 }
249
250 assert(dst && !dst->next);
251 return root;
252 }
253
rxr_pkt_entry_append(struct rxr_pkt_entry * dst,struct rxr_pkt_entry * src)254 void rxr_pkt_entry_append(struct rxr_pkt_entry *dst,
255 struct rxr_pkt_entry *src)
256 {
257 assert(dst);
258
259 while (dst->next)
260 dst = dst->next;
261 assert(dst && !dst->next);
262 dst->next = src;
263 }
264
265 static inline
rxr_pkt_entry_sendmsg(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,const struct fi_msg * msg,uint64_t flags)266 ssize_t rxr_pkt_entry_sendmsg(struct rxr_ep *ep, struct rxr_pkt_entry *pkt_entry,
267 const struct fi_msg *msg, uint64_t flags)
268 {
269 struct rxr_peer *peer;
270 size_t ret;
271
272 peer = rxr_ep_get_peer(ep, pkt_entry->addr);
273 assert(ep->tx_pending <= ep->max_outstanding_tx);
274
275 if (ep->tx_pending == ep->max_outstanding_tx)
276 return -FI_EAGAIN;
277
278 if (peer->flags & RXR_PEER_IN_BACKOFF)
279 return -FI_EAGAIN;
280
281 #if ENABLE_DEBUG
282 dlist_insert_tail(&pkt_entry->dbg_entry, &ep->tx_pkt_list);
283 #ifdef ENABLE_RXR_PKT_DUMP
284 rxr_pkt_print("Sent", ep, (struct rxr_base_hdr *)pkt_entry->pkt);
285 #endif
286 #endif
287 if (peer->is_local) {
288 assert(ep->use_shm);
289 ret = fi_sendmsg(ep->shm_ep, msg, flags);
290 } else {
291 ret = fi_sendmsg(ep->rdm_ep, msg, flags);
292 if (OFI_LIKELY(!ret))
293 rxr_ep_inc_tx_pending(ep, peer);
294 }
295
296 return ret;
297 }
298
/*
 * Build a fi_msg from the given iovec/descriptor arrays and send it for
 * pkt_entry. For a local peer the message is addressed to the peer's
 * shm_fiaddr instead of addr. pkt_entry is used as the operation context.
 *
 * Returns whatever rxr_pkt_entry_sendmsg() returns.
 */
ssize_t rxr_pkt_entry_sendv(struct rxr_ep *ep,
			    struct rxr_pkt_entry *pkt_entry,
			    fi_addr_t addr, const struct iovec *iov,
			    void **desc, size_t count, uint64_t flags)
{
	struct rxr_peer *dest_peer = rxr_ep_get_peer(ep, addr);
	struct fi_msg out_msg;

	out_msg.msg_iov = iov;
	out_msg.desc = desc;
	out_msg.iov_count = count;
	if (dest_peer->is_local)
		out_msg.addr = dest_peer->shm_fiaddr;
	else
		out_msg.addr = addr;
	out_msg.context = pkt_entry;
	out_msg.data = 0;

	return rxr_pkt_entry_sendmsg(ep, pkt_entry, &out_msg, flags);
}
317
318 /* rxr_pkt_start currently expects data pkt right after pkt hdr */
rxr_pkt_entry_send_with_flags(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,fi_addr_t addr,uint64_t flags)319 ssize_t rxr_pkt_entry_send_with_flags(struct rxr_ep *ep,
320 struct rxr_pkt_entry *pkt_entry,
321 fi_addr_t addr, uint64_t flags)
322 {
323 struct iovec iov;
324 void *desc;
325
326 iov.iov_base = rxr_pkt_start(pkt_entry);
327 iov.iov_len = pkt_entry->pkt_size;
328
329 if (rxr_ep_get_peer(ep, addr)->is_local) {
330 assert(ep->use_shm);
331 desc = NULL;
332 } else {
333 desc = rxr_ep_mr_local(ep) ? fi_mr_desc(pkt_entry->mr) : NULL;
334 }
335
336 return rxr_pkt_entry_sendv(ep, pkt_entry, addr, &iov, &desc, 1, flags);
337 }
338
/*
 * Send pkt_entry to addr with no operation flags.
 * Convenience wrapper around rxr_pkt_entry_send_with_flags().
 */
ssize_t rxr_pkt_entry_send(struct rxr_ep *ep,
			   struct rxr_pkt_entry *pkt_entry,
			   fi_addr_t addr)
{
	const uint64_t no_flags = 0;

	return rxr_pkt_entry_send_with_flags(ep, pkt_entry, addr, no_flags);
}
345
/*
 * Inject the packet held in pkt_entry through the shm endpoint.
 *
 * The peer for addr must exist and be local, and the endpoint must have
 * shm enabled (all asserted). The packet is sent to the peer's shm_fiaddr.
 *
 * Returns the result of fi_inject().
 */
ssize_t rxr_pkt_entry_inject(struct rxr_ep *ep,
			     struct rxr_pkt_entry *pkt_entry,
			     fi_addr_t addr)
{
	/* currently only EOR packet is injected using shm ep */
	struct rxr_peer *shm_peer = rxr_ep_get_peer(ep, addr);

	assert(shm_peer);
	assert(ep->use_shm && shm_peer->is_local);

	return fi_inject(ep->shm_ep,
			 rxr_pkt_start(pkt_entry),
			 pkt_entry->pkt_size,
			 shm_peer->shm_fiaddr);
}
359
360 /*
361 * Functions for pkt_rx_map
362 */
rxr_pkt_rx_map_lookup(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry)363 struct rxr_rx_entry *rxr_pkt_rx_map_lookup(struct rxr_ep *ep,
364 struct rxr_pkt_entry *pkt_entry)
365 {
366 struct rxr_pkt_rx_map *entry = NULL;
367 struct rxr_pkt_rx_key key;
368
369 key.msg_id = rxr_pkt_msg_id(pkt_entry);
370 key.addr = pkt_entry->addr;
371 HASH_FIND(hh, ep->pkt_rx_map, &key, sizeof(struct rxr_pkt_rx_key), entry);
372 return entry ? entry->rx_entry : NULL;
373 }
374
rxr_pkt_rx_map_insert(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,struct rxr_rx_entry * rx_entry)375 void rxr_pkt_rx_map_insert(struct rxr_ep *ep,
376 struct rxr_pkt_entry *pkt_entry,
377 struct rxr_rx_entry *rx_entry)
378 {
379 struct rxr_pkt_rx_map *entry;
380
381 entry = ofi_buf_alloc(ep->map_entry_pool);
382 if (OFI_UNLIKELY(!entry)) {
383 FI_WARN(&rxr_prov, FI_LOG_CQ,
384 "Map entries for medium size message exhausted.\n");
385 efa_eq_write_error(&ep->util_ep, FI_ENOBUFS, -FI_ENOBUFS);
386 return;
387 }
388
389 entry->key.msg_id = rxr_pkt_msg_id(pkt_entry);
390 entry->key.addr = pkt_entry->addr;
391
392 #if ENABLE_DEBUG
393 {
394 struct rxr_pkt_rx_map *existing_entry = NULL;
395
396 HASH_FIND(hh, ep->pkt_rx_map, &entry->key, sizeof(struct rxr_pkt_rx_key), existing_entry);
397 assert(!existing_entry);
398 }
399 #endif
400
401 entry->rx_entry = rx_entry;
402 HASH_ADD(hh, ep->pkt_rx_map, key, sizeof(struct rxr_pkt_rx_key), entry);
403 }
404
rxr_pkt_rx_map_remove(struct rxr_ep * ep,struct rxr_pkt_entry * pkt_entry,struct rxr_rx_entry * rx_entry)405 void rxr_pkt_rx_map_remove(struct rxr_ep *ep,
406 struct rxr_pkt_entry *pkt_entry,
407 struct rxr_rx_entry *rx_entry)
408 {
409 struct rxr_pkt_rx_map *entry;
410 struct rxr_pkt_rx_key key;
411
412 key.msg_id = rxr_pkt_msg_id(pkt_entry);
413 key.addr = pkt_entry->addr;
414
415 HASH_FIND(hh, ep->pkt_rx_map, &key, sizeof(key), entry);
416 assert(entry && entry->rx_entry == rx_entry);
417 HASH_DEL(ep->pkt_rx_map, entry);
418 ofi_buf_free(entry);
419 }
420
421