/*
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses. You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#if HAVE_CONFIG_H
#include <config.h>
#endif /* HAVE_CONFIG_H */

#ifndef _RXR_H_
#define _RXR_H_

#include <pthread.h>
#include <rdma/fabric.h>
#include <rdma/fi_atomic.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_tagged.h>
#include <rdma/fi_trigger.h>

#include <ofi.h>
#include <ofi_iov.h>
#include <ofi_proto.h>
#include <ofi_enosys.h>
#include <ofi_rbuf.h>
#include <ofi_list.h>
#include <ofi_util.h>
#include <ofi_tree.h>
#include <uthash.h>
#include <ofi_recvwin.h>
#include <ofi_perf.h>

#include "rxr_pkt_entry.h"
#include "rxr_pkt_type.h"

#define RXR_MAJOR_VERSION (2)
#define RXR_MINOR_VERSION (0)
/*
 * EFA supports interoperability between protocol version 4 and above;
 * version 4 is considered the base version.
 */
#define RXR_BASE_PROTOCOL_VERSION (4)
#define RXR_CUR_PROTOCOL_VERSION (4)
#define RXR_NUM_PROTOCOL_VERSION (RXR_CUR_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)
#define RXR_MAX_PROTOCOL_VERSION (100)
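
/*
 * Worked example (illustrative, not part of the protocol): with a base
 * version of 4 and a current version of 4, RXR_NUM_PROTOCOL_VERSION is
 * 4 - 4 + 1 = 1, i.e. one per-version feature slot. A version number
 * maps to a slot as:
 *
 *     features[version - RXR_BASE_PROTOCOL_VERSION]
 */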

#define RXR_FI_VERSION OFI_VERSION_LATEST

#define RXR_IOV_LIMIT (4)

#ifdef ENABLE_EFA_POISONING
extern const uint32_t rxr_poison_value;
static inline void rxr_poison_mem_region(uint32_t *ptr, size_t size)
{
        size_t i;

        for (i = 0; i < size / sizeof(rxr_poison_value); i++)
                memcpy(ptr + i, &rxr_poison_value, sizeof(rxr_poison_value));
}
#endif
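
/*
 * Usage sketch (only meaningful when ENABLE_EFA_POISONING is defined):
 * freed entries are overwritten with rxr_poison_value so that
 * use-after-free reads stand out in a debugger, e.g.:
 *
 *     rxr_poison_mem_region((uint32_t *)tx_entry,
 *                           sizeof(struct rxr_tx_entry));
 */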

/*
 * Set alignment to x86 cache line size.
 */
#define RXR_BUF_POOL_ALIGNMENT (64)

/*
 * The following parameters can be tuned via environment variables.
 */
#define RXR_RECVWIN_SIZE (16384)
#define RXR_DEF_CQ_SIZE (8192)
#define RXR_REMOTE_CQ_DATA_LEN (8)

/* maximum timeout for RNR backoff (microseconds) */
#define RXR_DEF_RNR_MAX_TIMEOUT (1000000)
/* bounds for random RNR backoff timeout */
#define RXR_RAND_MIN_TIMEOUT (40)
#define RXR_RAND_MAX_TIMEOUT (120)

/* bounds for flow control */
#define RXR_DEF_MAX_RX_WINDOW (128)
#define RXR_DEF_MAX_TX_CREDITS (64)
#define RXR_DEF_MIN_TX_CREDITS (32)

/*
 * maximum time (microseconds) that we allow available_bufs for large
 * msgs to remain exhausted
 */
#define RXR_AVAILABLE_DATA_BUFS_TIMEOUT (5000000)

#if ENABLE_DEBUG
#define RXR_TX_PKT_DBG_SIZE (16384)
#define RXR_RX_PKT_DBG_SIZE (16384)
#endif

/*
 * Based on the size of tx_id and rx_id in the packet headers; can be
 * made arbitrary once those fields are removed.
 */
#define RXR_MAX_RX_QUEUE_SIZE (UINT32_MAX)
#define RXR_MAX_TX_QUEUE_SIZE (UINT32_MAX)

/*
 * The maximum supported source address length in bytes
 */
#define RXR_MAX_NAME_LENGTH (32)

/*
 * RxR specific flags that are sent over the wire.
 */
#define RXR_TAGGED BIT_ULL(0)
#define RXR_REMOTE_CQ_DATA BIT_ULL(1)
#define RXR_REMOTE_SRC_ADDR BIT_ULL(2)

/*
 * TODO: In the future we will send a RECV_CANCEL signal to the sender to
 * stop transmitting a large message. This flag is also used for
 * fi_discard, which has similar behavior.
 */
#define RXR_RECV_CANCEL BIT_ULL(3)

/*
 * Flags to tell if the rx_entry is tracking FI_MULTI_RECV buffers
 */
#define RXR_MULTI_RECV_POSTED BIT_ULL(4)
#define RXR_MULTI_RECV_CONSUMER BIT_ULL(5)
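
/*
 * Example (sketch): an rx_entry created when the application posts an
 * FI_MULTI_RECV buffer carries RXR_MULTI_RECV_POSTED, while the entries
 * carved out of that buffer per incoming message carry
 * RXR_MULTI_RECV_CONSUMER:
 *
 *     if (rx_entry->rxr_flags & RXR_MULTI_RECV_POSTED)
 *             ... this entry owns the posted buffer ...
 */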

/*
 * OFI flags
 * The 64-bit flag field is used as follows:
 * 1-grow up     common (usable with multiple operations)
 * 59-grow down  operation specific (used for single call/class)
 * 60 - 63       provider specific
 */
#define RXR_NO_COMPLETION BIT_ULL(60)

/*
 * RM flags
 */
#define RXR_RM_TX_CQ_FULL BIT_ULL(0)
#define RXR_RM_RX_CQ_FULL BIT_ULL(1)

#define RXR_MTU_MAX_LIMIT BIT_ULL(15)

extern struct fi_info *shm_info;

extern struct fi_provider *lower_efa_prov;
extern struct fi_provider rxr_prov;
extern struct fi_info rxr_info;
extern struct rxr_env rxr_env;
extern struct fi_fabric_attr rxr_fabric_attr;
extern struct util_prov rxr_util_prov;
extern struct efa_ep_addr *local_efa_addr;

struct rxr_env {
        int rx_window_size;
        int tx_min_credits;
        int tx_max_credits;
        int tx_queue_size;
        int use_device_rdma;
        int enable_shm_transfer;
        int shm_av_size;
        int shm_max_medium_size;
        int recvwin_size;
        int cq_size;
        size_t max_memcpy_size;
        size_t mtu_size;
        size_t tx_size;
        size_t rx_size;
        size_t tx_iov_limit;
        size_t rx_iov_limit;
        int rx_copy_unexp;
        int rx_copy_ooo;
        int max_timeout;
        int timeout_interval;
        size_t efa_cq_read_size;
        size_t shm_cq_read_size;
        size_t efa_max_medium_msg_size;
        size_t efa_min_read_write_size;
        size_t efa_read_segment_size;
};
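
/*
 * Tuning sketch: these knobs are populated from provider runtime
 * parameters at initialization (the exact environment variable names are
 * registered in the provider's init code, not in this header). A
 * parameter registered as "rx_window_size" would be read roughly like:
 *
 *     fi_param_get_int(&rxr_prov, "rx_window_size",
 *                      &rxr_env.rx_window_size);
 */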

enum rxr_lower_ep_type {
        EFA_EP = 1,
        SHM_EP,
};

enum rxr_x_entry_type {
        RXR_TX_ENTRY = 1,
        RXR_RX_ENTRY,
};

enum rxr_tx_comm_type {
        RXR_TX_FREE = 0,         /* tx_entry free state */
        RXR_TX_REQ,              /* tx_entry sending REQ packet */
        RXR_TX_SEND,             /* tx_entry sending data in progress */
        RXR_TX_QUEUED_SHM_RMA,   /* tx_entry was unable to send RMA operations over shm provider */
        RXR_TX_QUEUED_CTRL,      /* tx_entry was unable to send ctrl packet */
        RXR_TX_QUEUED_REQ_RNR,   /* tx_entry RNR sending REQ packet */
        RXR_TX_QUEUED_DATA_RNR,  /* tx_entry RNR sending data packets */
        RXR_TX_SENT_READRSP,     /* tx_entry (on remote EP) sent
                                  * read response (FI_READ only)
                                  */
        RXR_TX_QUEUED_READRSP,   /* tx_entry (on remote EP) was
                                  * unable to send read response
                                  * (FI_READ only)
                                  */
        RXR_TX_WAIT_READ_FINISH, /* tx_entry (on initiating EP) wait
                                  * for rx_entry to finish receiving
                                  * (FI_READ only)
                                  */
};
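
/*
 * Typical tx lifecycle (sketch; the QUEUED_* states are detours taken
 * when a packet cannot be posted because the core TX queue is full or
 * the peer is in RNR backoff, and are drained by the progress engine):
 *
 *     RXR_TX_FREE -> RXR_TX_REQ -> RXR_TX_SEND -> RXR_TX_FREE
 */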

enum rxr_rx_comm_type {
        RXR_RX_FREE = 0,          /* rx_entry free state */
        RXR_RX_INIT,              /* rx_entry ready to recv RTM */
        RXR_RX_UNEXP,             /* rx_entry unexp msg waiting for post recv */
        RXR_RX_MATCHED,           /* rx_entry matched with RTM */
        RXR_RX_RECV,              /* rx_entry large msg recv data pkts */
        RXR_RX_QUEUED_CTRL,       /* rx_entry was unable to send ctrl packet */
        RXR_RX_QUEUED_EOR,        /* rx_entry was unable to send EOR over shm */
        RXR_RX_QUEUED_CTS_RNR,    /* rx_entry RNR sending CTS */
        RXR_RX_WAIT_READ_FINISH,  /* rx_entry wait for send to finish, FI_READ */
        RXR_RX_WAIT_ATOMRSP_SENT, /* rx_entry wait for atomrsp packet sent completion */
};
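
/*
 * Typical rx lifecycle (sketch): a posted receive moves
 *
 *     RXR_RX_INIT -> RXR_RX_MATCHED -> RXR_RX_RECV -> RXR_RX_FREE
 *
 * while a message that arrives with no matching receive parks in
 * RXR_RX_UNEXP until the application posts a buffer.
 */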

#define RXR_PEER_REQ_SENT BIT_ULL(0) /* sent a REQ to the peer, peer should send a handshake back */
#define RXR_PEER_HANDSHAKE_SENT BIT_ULL(1)
#define RXR_PEER_HANDSHAKE_RECEIVED BIT_ULL(2)
#define RXR_PEER_IN_BACKOFF BIT_ULL(3) /* peer is in backoff, not allowed to send */
#define RXR_PEER_BACKED_OFF BIT_ULL(4) /* peer backoff was increased during this loop of the progress engine */

struct rxr_fabric {
        struct util_fabric util_fabric;
        struct fid_fabric *lower_fabric;
        struct fid_fabric *shm_fabric;
#ifdef RXR_PERF_ENABLED
        struct ofi_perfset perf_set;
#endif
};

#define RXR_MAX_NUM_PROTOCOLS (RXR_MAX_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)

struct rxr_peer {
        bool tx_init;                 /* tracks initialization of tx state */
        bool rx_init;                 /* tracks initialization of rx state */
        bool is_self;                 /* self flag */
        bool is_local;                /* local/remote peer flag */
        fi_addr_t shm_fiaddr;         /* fi_addr_t addr from shm provider */
        struct rxr_robuf *robuf;      /* tracks expected msg_id on rx */
        uint32_t next_msg_id;         /* sender's view of msg_id */
        uint32_t flags;
        uint32_t maxproto;            /* maximum protocol version supported by this peer */
        uint64_t features[RXR_MAX_NUM_PROTOCOLS]; /* the feature flag for each version */
        size_t tx_pending;            /* tracks pending tx ops to this peer */
        uint16_t tx_credits;          /* available send credits */
        uint16_t rx_credits;          /* available credits to allocate */
        uint64_t rnr_ts;              /* timestamp for RNR backoff tracking */
        int rnr_queued_pkt_cnt;       /* queued RNR packet count */
        int timeout_interval;         /* initial RNR timeout value */
        int rnr_timeout_exp;          /* exponent used in RNR timeout calculation */
        struct dlist_entry rnr_entry; /* linked to rxr_ep peer_backoff_list */
        struct dlist_entry entry;     /* linked to rxr_ep peer_list */
};
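
/*
 * Example (sketch): checking a per-version feature bit learned from a
 * handshake. The slot math mirrors the features[] declaration above;
 * RXR_FEATURE_FOO is a hypothetical flag name used only for
 * illustration.
 *
 *     uint64_t feats = peer->features[version - RXR_BASE_PROTOCOL_VERSION];
 *
 *     if (feats & RXR_FEATURE_FOO)
 *             ... peer supports the feature ...
 */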

struct rxr_queued_ctrl_info {
        int type;
        int inject;
};

struct rxr_atomic_hdr {
        /* atomic_op is different from tx_op */
        uint32_t atomic_op;
        uint32_t datatype;
};

/* extra information that is not included in fi_msg_atomic
 * used by fetch atomic and compare atomic.
 *     resp stands for response
 *     comp stands for compare
 */
struct rxr_atomic_ex {
        struct iovec resp_iov[RXR_IOV_LIMIT];
        int resp_iov_count;
        struct iovec comp_iov[RXR_IOV_LIMIT];
        int comp_iov_count;
};

struct rxr_rx_entry {
        /* Must remain at the top */
        enum rxr_x_entry_type type;

        fi_addr_t addr;

        /*
         * freestack ids used to lookup rx_entry during pkt recv
         */
        uint32_t tx_id;
        uint32_t rx_id;
        uint32_t op;

        /*
         * The following two variables are for emulated RMA fi_read only
         */
        uint32_t rma_loc_tx_id;
        uint32_t rma_initiator_rx_id;

        struct rxr_atomic_hdr atomic_hdr;

        uint32_t msg_id;

        uint64_t tag;
        uint64_t ignore;

        uint64_t bytes_done;
        int64_t window;
        uint16_t credit_request;
        int credit_cts;

        uint64_t total_len;

        enum rxr_rx_comm_type state;
        struct rxr_queued_ctrl_info queued_ctrl;

        uint64_t fi_flags;
        uint16_t rxr_flags;

        size_t iov_count;
        struct iovec iov[RXR_IOV_LIMIT];

        /* App-provided reg descriptor */
        void *desc[RXR_IOV_LIMIT];

        /* iov_count on sender side, used for large message READ over shm */
        size_t rma_iov_count;
        struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

        struct fi_cq_tagged_entry cq_entry;

        /* entry is linked with rx entry lists in rxr_ep */
        struct dlist_entry entry;

        /* queued_entry is linked with rx_queued_ctrl_list in rxr_ep */
        struct dlist_entry queued_entry;

        /* Queued packets due to TX queue full or RNR backoff */
        struct dlist_entry queued_pkts;

        /*
         * A list of rx_entries tracking FI_MULTI_RECV buffers. An rx_entry of
         * type RXR_MULTI_RECV_POSTED that was created when the multi-recv
         * buffer was posted is the list head, and the rx_entries of type
         * RXR_MULTI_RECV_CONSUMER get added to the list as they consume the
         * buffer.
         */
        struct dlist_entry multi_recv_consumers;
        struct dlist_entry multi_recv_entry;
        struct rxr_rx_entry *master_entry;

        struct rxr_pkt_entry *unexp_pkt;
        struct rxr_pkt_entry *atomrsp_pkt;
        char *atomrsp_buf;

#if ENABLE_DEBUG
        /* linked with rx_pending_list in rxr_ep */
        struct dlist_entry rx_pending_entry;
        /* linked with rx_entry_list in rxr_ep */
        struct dlist_entry rx_entry_entry;
#endif
};

struct rxr_tx_entry {
        /* Must remain at the top */
        enum rxr_x_entry_type type;

        uint32_t op;
        fi_addr_t addr;

        /*
         * freestack ids used to lookup tx_entry during ctrl pkt recv
         */
        uint32_t tx_id;
        uint32_t rx_id;

        uint32_t msg_id;

        uint64_t tag;

        uint64_t bytes_acked;
        uint64_t bytes_sent;
        int64_t window;
        uint16_t credit_request;
        uint16_t credit_allocated;

        uint64_t total_len;

        enum rxr_tx_comm_type state;
        struct rxr_queued_ctrl_info queued_ctrl;

        uint64_t fi_flags;
        uint64_t send_flags;
        size_t iov_count;
        size_t iov_index;
        size_t iov_offset;
        struct iovec iov[RXR_IOV_LIMIT];

        uint64_t rma_loc_rx_id;
        uint64_t rma_window;
        size_t rma_iov_count;
        struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

        /* App-provided reg descriptor */
        void *desc[RXR_IOV_LIMIT];

        /* atomic related variables */
        struct rxr_atomic_hdr atomic_hdr;
        struct rxr_atomic_ex atomic_ex;

        /* Only used with mr threshold switch from memcpy */
        size_t iov_mr_start;
        struct fid_mr *mr[RXR_IOV_LIMIT];

        struct fi_cq_tagged_entry cq_entry;

        /* entry is linked with tx_pending_list in rxr_ep */
        struct dlist_entry entry;

        /* queued_entry is linked with tx_queued_ctrl_list in rxr_ep */
        struct dlist_entry queued_entry;

        /* Queued packets due to TX queue full or RNR backoff */
        struct dlist_entry queued_pkts;

#if ENABLE_DEBUG
        /* linked with tx_entry_list in rxr_ep */
        struct dlist_entry tx_entry_entry;
#endif
};

#define RXR_GET_X_ENTRY_TYPE(pkt_entry) \
        (*((enum rxr_x_entry_type *) \
        ((unsigned char *)((pkt_entry)->x_entry))))
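
/*
 * Example (sketch): because `type` must remain the first member of both
 * rxr_tx_entry and rxr_rx_entry, a pkt_entry's x_entry pointer can be
 * dispatched without knowing which kind it references:
 *
 *     if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_TX_ENTRY)
 *             tx_entry = (struct rxr_tx_entry *)pkt_entry->x_entry;
 *     else
 *             rx_entry = (struct rxr_rx_entry *)pkt_entry->x_entry;
 */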

enum efa_domain_type {
        EFA_DOMAIN_DGRAM = 0,
        EFA_DOMAIN_RDM,
};

struct rxr_domain {
        struct util_domain util_domain;
        enum efa_domain_type type;
        struct fid_domain *rdm_domain;
        size_t mtu_size;
        size_t addrlen;
        uint8_t mr_local;
        uint64_t rdm_mode;
        int do_progress;
        size_t cq_size;
        enum fi_resource_mgmt resource_mgmt;
};

struct rxr_ep {
        struct util_ep util_ep;

        uint8_t core_addr[RXR_MAX_NAME_LENGTH];
        size_t core_addrlen;

        /* per-version feature flag */
        uint64_t features[RXR_NUM_PROTOCOL_VERSION];

        /* per-peer information */
        struct rxr_peer *peer;

        /* free stack for reorder buffer */
        struct rxr_robuf_fs *robuf_fs;

        /* core provider fid */
        struct fid_ep *rdm_ep;
        struct fid_cq *rdm_cq;

        /* shm provider fid */
        bool use_shm;
        struct fid_ep *shm_ep;
        struct fid_cq *shm_cq;

        /*
         * RxR rx/tx queue sizes. These may be different from the core
         * provider's rx/tx size and will either limit the number of possible
         * receives/sends or allow queueing.
         */
        size_t rx_size;
        size_t tx_size;
        size_t mtu_size;
        size_t rx_iov_limit;
        size_t tx_iov_limit;

        /* core's capabilities */
        uint64_t core_caps;

        /* rx/tx queue size of core provider */
        size_t core_rx_size;
        size_t max_outstanding_tx;
        size_t core_inject_size;
        size_t max_data_payload_size;

        /* Resource management flag */
        uint64_t rm_full;

        /* application's ordering requirements */
        uint64_t msg_order;
        /* core's supported tx/rx msg_order */
        uint64_t core_msg_order;

        /* tx iov limit of core provider */
        size_t core_iov_limit;

        /* threshold to release multi_recv buffer */
        size_t min_multi_recv_size;

        /* buffer pool for send & recv */
        struct ofi_bufpool *tx_pkt_efa_pool;
        struct ofi_bufpool *rx_pkt_efa_pool;

        /*
         * Buffer pools for send & recv over shm. shm has a different mtu
         * size than efa and does not require local memory registration.
         */
        struct ofi_bufpool *tx_pkt_shm_pool;
        struct ofi_bufpool *rx_pkt_shm_pool;

        /* staging area for unexpected and out-of-order packets */
        struct ofi_bufpool *rx_unexp_pkt_pool;
        struct ofi_bufpool *rx_ooo_pkt_pool;

#ifdef ENABLE_EFA_POISONING
        size_t tx_pkt_pool_entry_sz;
        size_t rx_pkt_pool_entry_sz;
#endif

        /* data structures to maintain rxr send/recv states */
        struct ofi_bufpool *tx_entry_pool;
        struct ofi_bufpool *rx_entry_pool;
        /* data structure to maintain read response */
        struct ofi_bufpool *readrsp_tx_entry_pool;
        /* data structure to maintain read */
        struct ofi_bufpool *read_entry_pool;
        /* data structure to maintain pkt rx map */
        struct ofi_bufpool *map_entry_pool;
        /* rxr medium message pkt_entry to rx_entry map */
        struct rxr_pkt_rx_map *pkt_rx_map;
        /* rx_entries with recv buf */
        struct dlist_entry rx_list;
        /* rx_entries without recv buf (unexpected message) */
        struct dlist_entry rx_unexp_list;
        /* rx_entries with tagged recv buf */
        struct dlist_entry rx_tagged_list;
        /* rx_entries without tagged recv buf (unexpected message) */
        struct dlist_entry rx_unexp_tagged_list;
        /* list of pre-posted recv buffers */
        struct dlist_entry rx_posted_buf_list;
        /* list of pre-posted recv buffers for shm */
        struct dlist_entry rx_posted_buf_shm_list;
        /* tx entries with queued messages */
        struct dlist_entry tx_entry_queued_list;
        /* rx entries with queued messages */
        struct dlist_entry rx_entry_queued_list;
        /* tx_entries with data to be sent (large messages) */
        struct dlist_entry tx_pending_list;
        /* read entries with data to be read */
        struct dlist_entry read_pending_list;
        /* rxr_peer entries that are in backoff due to RNR */
        struct dlist_entry peer_backoff_list;
        /* rxr_peer entries with an allocated robuf */
        struct dlist_entry peer_list;

#if ENABLE_DEBUG
        /* rx_entries waiting for data to arrive (large messages) */
        struct dlist_entry rx_pending_list;
        /* count of rx_pending_list */
        size_t rx_pending;

        /* rx packets being processed or waiting to be processed */
        struct dlist_entry rx_pkt_list;

        /* tx packets waiting for send completion */
        struct dlist_entry tx_pkt_list;

        /* track allocated rx_entries and tx_entries for endpoint cleanup */
        struct dlist_entry rx_entry_list;
        struct dlist_entry tx_entry_list;

        size_t sends;
        size_t send_comps;
        size_t failed_send_comps;
        size_t recv_comps;
#endif
        /* number of posted buffers for shm */
        size_t posted_bufs_shm;
        size_t rx_bufs_shm_to_post;

        /* number of posted buffers */
        size_t posted_bufs_efa;
        size_t rx_bufs_efa_to_post;
        /* number of buffers available for large messages */
        size_t available_data_bufs;
        /* Timestamp of when available_data_bufs was exhausted. */
        uint64_t available_data_bufs_ts;

        /* number of outstanding sends */
        size_t tx_pending;
};

#define rxr_rx_flags(rxr_ep) ((rxr_ep)->util_ep.rx_op_flags)
#define rxr_tx_flags(rxr_ep) ((rxr_ep)->util_ep.tx_op_flags)

/*
 * Control header with completion data. CQ data length is static.
 */
#define RXR_CQ_DATA_SIZE (8)

static inline void rxr_copy_shm_cq_entry(struct fi_cq_tagged_entry *cq_tagged_entry,
                                         struct fi_cq_data_entry *shm_cq_entry)
{
        cq_tagged_entry->op_context = shm_cq_entry->op_context;
        cq_tagged_entry->flags = shm_cq_entry->flags;
        cq_tagged_entry->len = shm_cq_entry->len;
        cq_tagged_entry->buf = shm_cq_entry->buf;
        cq_tagged_entry->data = shm_cq_entry->data;
        cq_tagged_entry->tag = 0; /* no tag for RMA */
}
static inline struct rxr_peer *rxr_ep_get_peer(struct rxr_ep *ep,
                                               fi_addr_t addr)
{
        return &ep->peer[addr];
}

static inline void rxr_ep_peer_init_rx(struct rxr_ep *ep, struct rxr_peer *peer)
{
        assert(!peer->rx_init);

        peer->robuf = freestack_pop(ep->robuf_fs);
        peer->robuf = ofi_recvwin_buf_alloc(peer->robuf,
                                            rxr_env.recvwin_size);
        assert(peer->robuf);
        dlist_insert_tail(&peer->entry, &ep->peer_list);
        peer->rx_credits = rxr_env.rx_window_size;
        peer->rx_init = 1;
}

static inline void rxr_ep_peer_init_tx(struct rxr_peer *peer)
{
        assert(!peer->tx_init);
        peer->tx_credits = rxr_env.tx_max_credits;
        peer->tx_init = 1;
}

struct rxr_rx_entry *rxr_ep_get_rx_entry(struct rxr_ep *ep,
                                         const struct fi_msg *msg,
                                         uint64_t tag,
                                         uint64_t ignore,
                                         uint32_t op,
                                         uint64_t flags);

struct rxr_rx_entry *rxr_ep_rx_entry_init(struct rxr_ep *ep,
                                          struct rxr_rx_entry *rx_entry,
                                          const struct fi_msg *msg,
                                          uint64_t tag,
                                          uint64_t ignore,
                                          uint32_t op,
                                          uint64_t flags);

void rxr_tx_entry_init(struct rxr_ep *rxr_ep, struct rxr_tx_entry *tx_entry,
                       const struct fi_msg *msg, uint32_t op, uint64_t flags);

struct rxr_tx_entry *rxr_ep_alloc_tx_entry(struct rxr_ep *rxr_ep,
                                           const struct fi_msg *msg,
                                           uint32_t op,
                                           uint64_t tag,
                                           uint64_t flags);

int rxr_tx_entry_mr_dereg(struct rxr_tx_entry *tx_entry);

static inline void rxr_release_tx_entry(struct rxr_ep *ep,
                                        struct rxr_tx_entry *tx_entry)
{
#if ENABLE_DEBUG
        dlist_remove(&tx_entry->tx_entry_entry);
#endif
        assert(dlist_empty(&tx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
        rxr_poison_mem_region((uint32_t *)tx_entry,
                              sizeof(struct rxr_tx_entry));
#endif
        tx_entry->state = RXR_TX_FREE;
        ofi_buf_free(tx_entry);
}

static inline void rxr_release_rx_entry(struct rxr_ep *ep,
                                        struct rxr_rx_entry *rx_entry)
{
#if ENABLE_DEBUG
        dlist_remove(&rx_entry->rx_entry_entry);
#endif
        assert(dlist_empty(&rx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
        rxr_poison_mem_region((uint32_t *)rx_entry,
                              sizeof(struct rxr_rx_entry));
#endif
        rx_entry->state = RXR_RX_FREE;
        ofi_buf_free(rx_entry);
}

static inline int rxr_match_addr(fi_addr_t addr, fi_addr_t match_addr)
{
        return (addr == FI_ADDR_UNSPEC || addr == match_addr);
}

static inline int rxr_match_tag(uint64_t tag, uint64_t ignore,
                                uint64_t match_tag)
{
        return ((tag | ignore) == (match_tag | ignore));
}
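
/*
 * Worked example: `ignore` masks bits out of the comparison. With
 * tag = 0x1200 and ignore = 0x00FF, an incoming match_tag of 0x1242
 * matches, since (0x1200 | 0x00FF) == (0x1242 | 0x00FF) == 0x12FF.
 */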

static inline void rxr_ep_inc_tx_pending(struct rxr_ep *ep,
                                         struct rxr_peer *peer)
{
        ep->tx_pending++;
        peer->tx_pending++;
#if ENABLE_DEBUG
        ep->sends++;
#endif
}

static inline void rxr_ep_dec_tx_pending(struct rxr_ep *ep,
                                         struct rxr_peer *peer,
                                         int failed)
{
        ep->tx_pending--;
        peer->tx_pending--;
#if ENABLE_DEBUG
        if (failed)
                ep->failed_send_comps++;
#endif
}

static inline size_t rxr_get_rx_pool_chunk_cnt(struct rxr_ep *ep)
{
        return MIN(ep->core_rx_size, ep->rx_size);
}

static inline size_t rxr_get_tx_pool_chunk_cnt(struct rxr_ep *ep)
{
        return MIN(ep->max_outstanding_tx, ep->tx_size);
}

static inline int rxr_need_sas_ordering(struct rxr_ep *ep)
{
        return ep->msg_order & FI_ORDER_SAS;
}

/* Initialization functions */
void rxr_reset_rx_tx_to_core(const struct fi_info *user_info,
                             struct fi_info *core_info);
int rxr_get_lower_rdm_info(uint32_t version, const char *node, const char *service,
                           uint64_t flags, const struct util_prov *util_prov,
                           const struct fi_info *util_hints,
                           struct fi_info **core_info);
int rxr_fabric(struct fi_fabric_attr *attr,
               struct fid_fabric **fabric, void *context);
int rxr_domain_open(struct fid_fabric *fabric, struct fi_info *info,
                    struct fid_domain **dom, void *context);
int rxr_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
                struct fid_cq **cq_fid, void *context);
int rxr_endpoint(struct fid_domain *domain, struct fi_info *info,
                 struct fid_ep **ep, void *context);

/* EP sub-functions */
void rxr_ep_progress(struct util_ep *util_ep);
void rxr_ep_progress_internal(struct rxr_ep *rxr_ep);
int rxr_ep_post_buf(struct rxr_ep *ep, uint64_t flags, enum rxr_lower_ep_type lower_ep);

int rxr_ep_set_tx_credit_request(struct rxr_ep *rxr_ep,
                                 struct rxr_tx_entry *tx_entry);

int rxr_ep_tx_init_mr_desc(struct rxr_domain *rxr_domain,
                           struct rxr_tx_entry *tx_entry,
                           int mr_iov_start, uint64_t access);

void rxr_prepare_desc_send(struct rxr_domain *rxr_domain,
                           struct rxr_tx_entry *tx_entry);

struct rxr_rx_entry *rxr_ep_lookup_mediumrtm_rx_entry(struct rxr_ep *ep,
                                                      struct rxr_pkt_entry *pkt_entry);

void rxr_ep_record_mediumrtm_rx_entry(struct rxr_ep *ep,
                                      struct rxr_pkt_entry *pkt_entry,
                                      struct rxr_rx_entry *rx_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_msgrtm(struct rxr_ep *ep,
                                                            struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_tagrtm(struct rxr_ep *ep,
                                                            struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_split_rx_entry(struct rxr_ep *ep,
                                           struct rxr_rx_entry *posted_entry,
                                           struct rxr_rx_entry *consumer_entry,
                                           struct rxr_pkt_entry *pkt_entry);
int rxr_ep_efa_addr_to_str(const void *addr, char *temp_name);

/* CQ sub-functions */
int rxr_cq_handle_rx_error(struct rxr_ep *ep, struct rxr_rx_entry *rx_entry,
                           ssize_t prov_errno);
int rxr_cq_handle_tx_error(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry,
                           ssize_t prov_errno);
int rxr_cq_handle_cq_error(struct rxr_ep *ep, ssize_t err);

void rxr_cq_write_rx_completion(struct rxr_ep *ep,
                                struct rxr_rx_entry *rx_entry);

void rxr_cq_handle_rx_completion(struct rxr_ep *ep,
                                 struct rxr_pkt_entry *pkt_entry,
                                 struct rxr_rx_entry *rx_entry);

void rxr_cq_write_tx_completion(struct rxr_ep *ep,
                                struct rxr_tx_entry *tx_entry);

void rxr_cq_handle_tx_completion(struct rxr_ep *ep,
                                 struct rxr_tx_entry *tx_entry);

void rxr_cq_handle_shm_completion(struct rxr_ep *ep,
                                  struct fi_cq_data_entry *cq_entry,
                                  fi_addr_t src_addr);

int rxr_cq_reorder_msg(struct rxr_ep *ep,
                       struct rxr_peer *peer,
                       struct rxr_pkt_entry *pkt_entry);

void rxr_cq_proc_pending_items_in_recvwin(struct rxr_ep *ep,
                                          struct rxr_peer *peer);

void rxr_cq_handle_shm_rma_write_data(struct rxr_ep *ep,
                                      struct fi_cq_data_entry *shm_comp,
                                      fi_addr_t src_addr);

/* Aborts if unable to write to the eq */
static inline void efa_eq_write_error(struct util_ep *ep, ssize_t err,
                                      ssize_t prov_errno)
{
        struct fi_eq_err_entry err_entry;
        int ret = -FI_ENOEQ;

        FI_WARN(&rxr_prov, FI_LOG_EQ, "Writing error %s to EQ.\n",
                fi_strerror(err));
        if (ep->eq) {
                memset(&err_entry, 0, sizeof(err_entry));
                err_entry.err = err;
                err_entry.prov_errno = prov_errno;
                ret = fi_eq_write(&ep->eq->eq_fid, FI_NOTIFY,
                                  &err_entry, sizeof(err_entry),
                                  UTIL_FLAG_ERROR);

                if (ret == sizeof(err_entry))
                        return;
        }

        FI_WARN(&rxr_prov, FI_LOG_EQ,
                "Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd)\n",
                fi_strerror(-ret), fi_strerror(err), err,
                fi_strerror(prov_errno), prov_errno);
        fprintf(stderr,
                "Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd) %s:%d\n",
                fi_strerror(-ret), fi_strerror(err), err,
                fi_strerror(prov_errno), prov_errno, __FILE__, __LINE__);
        abort();
}

static inline struct rxr_domain *rxr_ep_domain(struct rxr_ep *ep)
{
        return container_of(ep->util_ep.domain, struct rxr_domain, util_domain);
}

static inline uint8_t rxr_ep_mr_local(struct rxr_ep *ep)
{
        struct rxr_domain *domain = container_of(ep->util_ep.domain,
                                                 struct rxr_domain,
                                                 util_domain);
        return domain->mr_local;
}

/*
 * Today we only check CQ resources; in the future we will check ctx and
 * other resources as well.
 */
static inline uint64_t is_tx_res_full(struct rxr_ep *ep)
{
        return ep->rm_full & RXR_RM_TX_CQ_FULL;
}

static inline uint64_t is_rx_res_full(struct rxr_ep *ep)
{
        return ep->rm_full & RXR_RM_RX_CQ_FULL;
}

static inline void rxr_rm_rx_cq_check(struct rxr_ep *ep, struct util_cq *rx_cq)
{
        fastlock_acquire(&rx_cq->cq_lock);
        if (ofi_cirque_isfull(rx_cq->cirq))
                ep->rm_full |= RXR_RM_RX_CQ_FULL;
        else
                ep->rm_full &= ~RXR_RM_RX_CQ_FULL;
        fastlock_release(&rx_cq->cq_lock);
}

static inline void rxr_rm_tx_cq_check(struct rxr_ep *ep, struct util_cq *tx_cq)
{
        fastlock_acquire(&tx_cq->cq_lock);
        if (ofi_cirque_isfull(tx_cq->cirq))
                ep->rm_full |= RXR_RM_TX_CQ_FULL;
        else
                ep->rm_full &= ~RXR_RM_TX_CQ_FULL;
        fastlock_release(&tx_cq->cq_lock);
}
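
/*
 * Usage sketch: a caller refreshes rm_full and then gates new postings
 * on it, so completions are only generated while the owning CQ has
 * room, e.g.:
 *
 *     rxr_rm_rx_cq_check(ep, rx_cq);
 *     if (!is_rx_res_full(ep))
 *             ... post the receive ...
 */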

static inline bool rxr_peer_timeout_expired(struct rxr_ep *ep,
                                            struct rxr_peer *peer,
                                            uint64_t ts)
{
        return (ts >= (peer->rnr_ts + MIN(rxr_env.max_timeout,
                                          peer->timeout_interval *
                                          (1 << peer->rnr_timeout_exp))));
}
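
/*
 * Backoff arithmetic (illustrative): with timeout_interval = 40 us and
 * rnr_timeout_exp = 3, a peer stays in backoff until
 * min(rxr_env.max_timeout, 40 << 3) = 320 us have elapsed past rnr_ts.
 */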

/* Performance counter declarations */
#ifdef RXR_PERF_ENABLED
#define RXR_PERF_FOREACH(DECL) \
        DECL(perf_rxr_tx), \
        DECL(perf_rxr_recv), \
        DECL(rxr_perf_size) \

enum rxr_perf_counters {
        RXR_PERF_FOREACH(OFI_ENUM_VAL)
};

extern const char *rxr_perf_counters_str[];

static inline void rxr_perfset_start(struct rxr_ep *ep, size_t index)
{
        struct rxr_domain *domain = rxr_ep_domain(ep);
        struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
                                                 struct rxr_fabric,
                                                 util_fabric);
        ofi_perfset_start(&fabric->perf_set, index);
}

static inline void rxr_perfset_end(struct rxr_ep *ep, size_t index)
{
        struct rxr_domain *domain = rxr_ep_domain(ep);
        struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
                                                 struct rxr_fabric,
                                                 util_fabric);
        ofi_perfset_end(&fabric->perf_set, index);
}
#else
#define rxr_perfset_start(ep, index) do {} while (0)
#define rxr_perfset_end(ep, index) do {} while (0)
#endif

#endif /* _RXR_H_ */