/*
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates.
 * All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#if HAVE_CONFIG_H
#include <config.h>
#endif /* HAVE_CONFIG_H */

#ifndef _RXR_H_
#define _RXR_H_

#include <pthread.h>
#include <rdma/fabric.h>
#include <rdma/fi_atomic.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_rma.h>
#include <rdma/fi_tagged.h>
#include <rdma/fi_trigger.h>

#include <ofi.h>
#include <ofi_iov.h>
#include <ofi_proto.h>
#include <ofi_enosys.h>
#include <ofi_rbuf.h>
#include <ofi_list.h>
#include <ofi_util.h>
#include <ofi_tree.h>
#include <uthash.h>
#include <ofi_recvwin.h>
#include <ofi_perf.h>

#include "rxr_pkt_entry.h"
#include "rxr_pkt_type.h"

#define RXR_MAJOR_VERSION	(2)
#define RXR_MINOR_VERSION	(0)

/*
 * EFA supports interoperability between protocol version 4 and above;
 * version 4 is considered the base version.
 */
#define RXR_BASE_PROTOCOL_VERSION	(4)
#define RXR_CUR_PROTOCOL_VERSION	(4)
#define RXR_NUM_PROTOCOL_VERSION	(RXR_CUR_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)
#define RXR_MAX_PROTOCOL_VERSION	(100)

#define RXR_FI_VERSION		OFI_VERSION_LATEST

#define RXR_IOV_LIMIT		(4)

#ifdef ENABLE_EFA_POISONING
extern const uint32_t rxr_poison_value;
static inline void rxr_poison_mem_region(uint32_t *ptr, size_t size)
{
	size_t i;

	for (i = 0; i < size / sizeof(rxr_poison_value); i++)
		memcpy(ptr + i, &rxr_poison_value, sizeof(rxr_poison_value));
}
#endif
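
/*
 * Note: rxr_poison_mem_region() above fills an object that is about to be
 * freed with a known 32-bit pattern (rxr_poison_value) so that a later
 * use-after-free of a tx/rx entry is easier to spot in a debugger.
 * Illustrative usage sketch (hypothetical entry pointer, only compiled
 * when ENABLE_EFA_POISONING is defined):
 *
 *	struct rxr_tx_entry *tx_entry = ...;
 *	rxr_poison_mem_region((uint32_t *)tx_entry,
 *			      sizeof(struct rxr_tx_entry));
 *	ofi_buf_free(tx_entry);
 */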

/*
 * Set alignment to x86 cache line size.
 */
#define RXR_BUF_POOL_ALIGNMENT	(64)

/*
 * The following parameters will be added as environment variables for tuning.
 */
#define RXR_RECVWIN_SIZE		(16384)
#define RXR_DEF_CQ_SIZE			(8192)
#define RXR_REMOTE_CQ_DATA_LEN		(8)

/* maximum timeout for RNR backoff (microseconds) */
#define RXR_DEF_RNR_MAX_TIMEOUT		(1000000)
/* bounds for random RNR backoff timeout */
#define RXR_RAND_MIN_TIMEOUT		(40)
#define RXR_RAND_MAX_TIMEOUT		(120)

/* bounds for flow control */
#define RXR_DEF_MAX_RX_WINDOW		(128)
#define RXR_DEF_MAX_TX_CREDITS		(64)
#define RXR_DEF_MIN_TX_CREDITS		(32)

/*
 * maximum time (microseconds) we allow available_bufs for large messages
 * to remain exhausted
 */
#define RXR_AVAILABLE_DATA_BUFS_TIMEOUT	(5000000)

#if ENABLE_DEBUG
#define RXR_TX_PKT_DBG_SIZE	(16384)
#define RXR_RX_PKT_DBG_SIZE	(16384)
#endif

/*
 * Based on the size of tx_id and rx_id in the packet headers; can be
 * arbitrary once those are removed.
 */
#define RXR_MAX_RX_QUEUE_SIZE (UINT32_MAX)
#define RXR_MAX_TX_QUEUE_SIZE (UINT32_MAX)

/*
 * The maximum supported source address length in bytes
 */
#define RXR_MAX_NAME_LENGTH	(32)

/*
 * RxR specific flags that are sent over the wire.
 */
#define RXR_TAGGED		BIT_ULL(0)
#define RXR_REMOTE_CQ_DATA	BIT_ULL(1)
#define RXR_REMOTE_SRC_ADDR	BIT_ULL(2)

/*
 * TODO: In the future we will send a RECV_CANCEL signal to the sender to
 * stop the transmission of a large message. This flag is also used for
 * fi_discard, which has similar behavior.
 */
#define RXR_RECV_CANCEL		BIT_ULL(3)

/*
 * Flags that tell whether the rx_entry is tracking FI_MULTI_RECV buffers
 */
#define RXR_MULTI_RECV_POSTED	BIT_ULL(4)
#define RXR_MULTI_RECV_CONSUMER	BIT_ULL(5)

/*
 * OFI flags
 * The 64-bit flag field is used as follows:
 * bit 1, growing up:     common flags (usable with multiple operations)
 * bit 59, growing down:  operation-specific flags (used for a single call/class)
 * bits 60 - 63:          provider-specific flags
 */
#define RXR_NO_COMPLETION	BIT_ULL(60)

/*
 * RM flags
 */
#define RXR_RM_TX_CQ_FULL	BIT_ULL(0)
#define RXR_RM_RX_CQ_FULL	BIT_ULL(1)

#define RXR_MTU_MAX_LIMIT	BIT_ULL(15)

extern struct fi_info *shm_info;

extern struct fi_provider *lower_efa_prov;
extern struct fi_provider rxr_prov;
extern struct fi_info rxr_info;
extern struct rxr_env rxr_env;
extern struct fi_fabric_attr rxr_fabric_attr;
extern struct util_prov rxr_util_prov;
extern struct efa_ep_addr *local_efa_addr;

struct rxr_env {
	int rx_window_size;
	int tx_min_credits;
	int tx_max_credits;
	int tx_queue_size;
	int use_device_rdma;
	int enable_shm_transfer;
	int shm_av_size;
	int shm_max_medium_size;
	int recvwin_size;
	int cq_size;
	size_t max_memcpy_size;
	size_t mtu_size;
	size_t tx_size;
	size_t rx_size;
	size_t tx_iov_limit;
	size_t rx_iov_limit;
	int rx_copy_unexp;
	int rx_copy_ooo;
	int max_timeout;
	int timeout_interval;
	size_t efa_cq_read_size;
	size_t shm_cq_read_size;
	size_t efa_max_medium_msg_size;
	size_t efa_min_read_write_size;
	size_t efa_read_segment_size;
};
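
/*
 * Note: the fields above are runtime tunables.  They are expected to be
 * filled in from provider environment variables (FI_EFA_*) during provider
 * initialization, falling back to the RXR_* default constants defined
 * earlier in this file; e.g. rx_window_size would presumably come from
 * FI_EFA_RX_WINDOW_SIZE and default to RXR_DEF_MAX_RX_WINDOW.
 */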

enum rxr_lower_ep_type {
	EFA_EP = 1,
	SHM_EP,
};

enum rxr_x_entry_type {
	RXR_TX_ENTRY = 1,
	RXR_RX_ENTRY,
};

enum rxr_tx_comm_type {
	RXR_TX_FREE = 0,	/* tx_entry free state */
	RXR_TX_REQ,		/* tx_entry sending REQ packet */
	RXR_TX_SEND,		/* tx_entry sending data in progress */
	RXR_TX_QUEUED_SHM_RMA,	/* tx_entry was unable to send RMA operations over shm provider */
	RXR_TX_QUEUED_CTRL,	/* tx_entry was unable to send ctrl packet */
	RXR_TX_QUEUED_REQ_RNR,  /* tx_entry RNR sending REQ packet */
	RXR_TX_QUEUED_DATA_RNR,	/* tx_entry RNR sending data packets */
	RXR_TX_SENT_READRSP,	/* tx_entry (on remote EP) sent
				 * read response (FI_READ only)
				 */
	RXR_TX_QUEUED_READRSP, /* tx_entry (on remote EP) was
				* unable to send read response
				* (FI_READ only)
				*/
	RXR_TX_WAIT_READ_FINISH, /* tx_entry (on initiating EP) wait
				  * for rx_entry to finish receiving
				  * (FI_READ only)
				  */
};

enum rxr_rx_comm_type {
	RXR_RX_FREE = 0,	/* rx_entry free state */
	RXR_RX_INIT,		/* rx_entry ready to recv RTM */
	RXR_RX_UNEXP,		/* rx_entry unexp msg waiting for post recv */
	RXR_RX_MATCHED,		/* rx_entry matched with RTM */
	RXR_RX_RECV,		/* rx_entry large msg recv data pkts */
	RXR_RX_QUEUED_CTRL,	/* rx_entry was unable to send ctrl packet */
	RXR_RX_QUEUED_EOR,	/* rx_entry was unable to send EOR over shm */
	RXR_RX_QUEUED_CTS_RNR,	/* rx_entry RNR sending CTS */
	RXR_RX_WAIT_READ_FINISH, /* rx_entry wait for send to finish, FI_READ */
	RXR_RX_WAIT_ATOMRSP_SENT, /* rx_entry wait for atomrsp packet sent completion */
};

#define RXR_PEER_REQ_SENT BIT_ULL(0) /* sent a REQ to the peer, peer should send a handshake back */
#define RXR_PEER_HANDSHAKE_SENT BIT_ULL(1)
#define RXR_PEER_HANDSHAKE_RECEIVED BIT_ULL(2)
#define RXR_PEER_IN_BACKOFF BIT_ULL(3) /* peer is in backoff, not allowed to send */
#define RXR_PEER_BACKED_OFF BIT_ULL(4) /* peer backoff was increased during this loop of the progress engine */

struct rxr_fabric {
	struct util_fabric util_fabric;
	struct fid_fabric *lower_fabric;
	struct fid_fabric *shm_fabric;
#ifdef RXR_PERF_ENABLED
	struct ofi_perfset perf_set;
#endif
};

#define RXR_MAX_NUM_PROTOCOLS (RXR_MAX_PROTOCOL_VERSION - RXR_BASE_PROTOCOL_VERSION + 1)

struct rxr_peer {
	bool tx_init;			/* tracks initialization of tx state */
	bool rx_init;			/* tracks initialization of rx state */
	bool is_self;			/* self flag */
	bool is_local;			/* local/remote peer flag */
	fi_addr_t shm_fiaddr;		/* fi_addr_t addr from shm provider */
	struct rxr_robuf *robuf;	/* tracks expected msg_id on rx */
	uint32_t next_msg_id;		/* sender's view of msg_id */
	uint32_t flags;
	uint32_t maxproto;		/* maximum protocol version supported by this peer */
	uint64_t features[RXR_MAX_NUM_PROTOCOLS]; /* the feature flag for each version */
	size_t tx_pending;		/* tracks pending tx ops to this peer */
	uint16_t tx_credits;		/* available send credits */
	uint16_t rx_credits;		/* available credits to allocate */
	uint64_t rnr_ts;		/* timestamp for RNR backoff tracking */
	int rnr_queued_pkt_cnt;		/* queued RNR packet count */
	int timeout_interval;		/* initial RNR timeout value */
	int rnr_timeout_exp;		/* exponent used in RNR timeout backoff calculation */
	struct dlist_entry rnr_entry;	/* linked to rxr_ep peer_backoff_list */
	struct dlist_entry entry;	/* linked to rxr_ep peer_list */
};

struct rxr_queued_ctrl_info {
	int type;
	int inject;
};

struct rxr_atomic_hdr {
	/* atomic_op is different from tx_op */
	uint32_t atomic_op;
	uint32_t datatype;
};

/* extra information that is not included in fi_msg_atomic,
 * used by fetch atomic and compare atomic.
 *     resp stands for response
 *     comp stands for compare
 */
struct rxr_atomic_ex {
	struct iovec resp_iov[RXR_IOV_LIMIT];
	int resp_iov_count;
	struct iovec comp_iov[RXR_IOV_LIMIT];
	int comp_iov_count;
};

struct rxr_rx_entry {
	/* Must remain at the top */
	enum rxr_x_entry_type type;

	fi_addr_t addr;

	/*
	 * freestack ids used to look up the rx_entry during pkt recv
	 */
	uint32_t tx_id;
	uint32_t rx_id;
	uint32_t op;

	/*
	 * The following two variables are used only for emulated RMA fi_read
	 */
	uint32_t rma_loc_tx_id;
	uint32_t rma_initiator_rx_id;

	struct rxr_atomic_hdr atomic_hdr;

	uint32_t msg_id;

	uint64_t tag;
	uint64_t ignore;

	uint64_t bytes_done;
	int64_t window;
	uint16_t credit_request;
	int credit_cts;

	uint64_t total_len;

	enum rxr_rx_comm_type state;
	struct rxr_queued_ctrl_info queued_ctrl;

	uint64_t fi_flags;
	uint16_t rxr_flags;

	size_t iov_count;
	struct iovec iov[RXR_IOV_LIMIT];

	/* App-provided reg descriptor */
	void *desc[RXR_IOV_LIMIT];

	/* iov_count on sender side, used for large message READ over shm */
	size_t rma_iov_count;
	struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

	struct fi_cq_tagged_entry cq_entry;

	/* entry is linked with rx entry lists in rxr_ep */
	struct dlist_entry entry;

	/* queued_entry is linked with rx_queued_ctrl_list in rxr_ep */
	struct dlist_entry queued_entry;

	/* Queued packets due to TX queue full or RNR backoff */
	struct dlist_entry queued_pkts;

	/*
	 * A list of rx_entries tracking FI_MULTI_RECV buffers. An rx_entry of
	 * type RXR_MULTI_RECV_POSTED that was created when the multi-recv
	 * buffer was posted is the list head, and the rx_entries of type
	 * RXR_MULTI_RECV_CONSUMER get added to the list as they consume the
	 * buffer.
	 */
	struct dlist_entry multi_recv_consumers;
	struct dlist_entry multi_recv_entry;
	struct rxr_rx_entry *master_entry;

	struct rxr_pkt_entry *unexp_pkt;
	struct rxr_pkt_entry *atomrsp_pkt;
	char *atomrsp_buf;

#if ENABLE_DEBUG
	/* linked with rx_pending_list in rxr_ep */
	struct dlist_entry rx_pending_entry;
	/* linked with rx_entry_list in rxr_ep */
	struct dlist_entry rx_entry_entry;
#endif
};

struct rxr_tx_entry {
	/* Must remain at the top */
	enum rxr_x_entry_type type;

	uint32_t op;
	fi_addr_t addr;

	/*
	 * freestack ids used to look up the tx_entry during ctrl pkt recv
	 */
	uint32_t tx_id;
	uint32_t rx_id;

	uint32_t msg_id;

	uint64_t tag;

	uint64_t bytes_acked;
	uint64_t bytes_sent;
	int64_t window;
	uint16_t credit_request;
	uint16_t credit_allocated;

	uint64_t total_len;

	enum rxr_tx_comm_type state;
	struct rxr_queued_ctrl_info queued_ctrl;

	uint64_t fi_flags;
	uint64_t send_flags;
	size_t iov_count;
	size_t iov_index;
	size_t iov_offset;
	struct iovec iov[RXR_IOV_LIMIT];

	uint64_t rma_loc_rx_id;
	uint64_t rma_window;
	size_t rma_iov_count;
	struct fi_rma_iov rma_iov[RXR_IOV_LIMIT];

	/* App-provided reg descriptor */
	void *desc[RXR_IOV_LIMIT];

	/* atomic related variables */
	struct rxr_atomic_hdr atomic_hdr;
	struct rxr_atomic_ex atomic_ex;

	/* Only used with mr threshold switch from memcpy */
	size_t iov_mr_start;
	struct fid_mr *mr[RXR_IOV_LIMIT];

	struct fi_cq_tagged_entry cq_entry;

	/* entry is linked with tx_pending_list in rxr_ep */
	struct dlist_entry entry;

	/* queued_entry is linked with tx_queued_ctrl_list in rxr_ep */
	struct dlist_entry queued_entry;

	/* Queued packets due to TX queue full or RNR backoff */
	struct dlist_entry queued_pkts;

#if ENABLE_DEBUG
	/* linked with tx_entry_list in rxr_ep */
	struct dlist_entry tx_entry_entry;
#endif
};

#define RXR_GET_X_ENTRY_TYPE(pkt_entry)	\
	(*((enum rxr_x_entry_type *)	\
	 ((unsigned char *)((pkt_entry)->x_entry))))
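
/*
 * Note: RXR_GET_X_ENTRY_TYPE() relies on the "Must remain at the top"
 * invariant above: both rxr_tx_entry and rxr_rx_entry begin with an
 * enum rxr_x_entry_type, so a pkt_entry's x_entry pointer can be
 * inspected without knowing which of the two structs it points to.
 * Illustrative sketch (hypothetical pkt_entry variable):
 *
 *	if (RXR_GET_X_ENTRY_TYPE(pkt_entry) == RXR_TX_ENTRY)
 *		tx_entry = (struct rxr_tx_entry *)pkt_entry->x_entry;
 *	else
 *		rx_entry = (struct rxr_rx_entry *)pkt_entry->x_entry;
 */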

enum efa_domain_type {
	EFA_DOMAIN_DGRAM = 0,
	EFA_DOMAIN_RDM,
};

struct rxr_domain {
	struct util_domain util_domain;
	enum efa_domain_type type;
	struct fid_domain *rdm_domain;
	size_t mtu_size;
	size_t addrlen;
	uint8_t mr_local;
	uint64_t rdm_mode;
	int do_progress;
	size_t cq_size;
	enum fi_resource_mgmt resource_mgmt;
};

struct rxr_ep {
	struct util_ep util_ep;

	uint8_t core_addr[RXR_MAX_NAME_LENGTH];
	size_t core_addrlen;

	/* per-version feature flag */
	uint64_t features[RXR_NUM_PROTOCOL_VERSION];

	/* per-peer information */
	struct rxr_peer *peer;

	/* free stack for reorder buffer */
	struct rxr_robuf_fs *robuf_fs;

	/* core provider fid */
	struct fid_ep *rdm_ep;
	struct fid_cq *rdm_cq;

	/* shm provider fid */
	bool use_shm;
	struct fid_ep *shm_ep;
	struct fid_cq *shm_cq;

	/*
	 * RxR rx/tx queue sizes. These may be different from the core
	 * provider's rx/tx size and will either limit the number of possible
	 * receives/sends or allow queueing.
	 */
	size_t rx_size;
	size_t tx_size;
	size_t mtu_size;
	size_t rx_iov_limit;
	size_t tx_iov_limit;

	/* core's capabilities */
	uint64_t core_caps;

	/* rx/tx queue size of core provider */
	size_t core_rx_size;
	size_t max_outstanding_tx;
	size_t core_inject_size;
	size_t max_data_payload_size;

	/* Resource management flag */
	uint64_t rm_full;

	/* application's ordering requirements */
	uint64_t msg_order;
	/* core's supported tx/rx msg_order */
	uint64_t core_msg_order;

	/* tx iov limit of core provider */
	size_t core_iov_limit;

	/* threshold to release multi_recv buffer */
	size_t min_multi_recv_size;

	/* buffer pool for send & recv */
	struct ofi_bufpool *tx_pkt_efa_pool;
	struct ofi_bufpool *rx_pkt_efa_pool;

	/*
	 * buffer pool for send & recv over shm; shm's mtu size differs from
	 * efa's, and shm does not require local memory registration
	 */
	struct ofi_bufpool *tx_pkt_shm_pool;
	struct ofi_bufpool *rx_pkt_shm_pool;

	/* staging area for unexpected and out-of-order packets */
	struct ofi_bufpool *rx_unexp_pkt_pool;
	struct ofi_bufpool *rx_ooo_pkt_pool;

#ifdef ENABLE_EFA_POISONING
	size_t tx_pkt_pool_entry_sz;
	size_t rx_pkt_pool_entry_sz;
#endif

	/* data structures to maintain rxr send/recv states */
	struct ofi_bufpool *tx_entry_pool;
	struct ofi_bufpool *rx_entry_pool;
	/* data structure to maintain read responses */
	struct ofi_bufpool *readrsp_tx_entry_pool;
	/* data structure to maintain read entries */
	struct ofi_bufpool *read_entry_pool;
	/* data structure to maintain pkt rx map */
	struct ofi_bufpool *map_entry_pool;
	/* rxr medium message pkt_entry to rx_entry map */
	struct rxr_pkt_rx_map *pkt_rx_map;
	/* rx_entries with recv buf */
	struct dlist_entry rx_list;
	/* rx_entries without recv buf (unexpected message) */
	struct dlist_entry rx_unexp_list;
	/* rx_entries with tagged recv buf */
	struct dlist_entry rx_tagged_list;
	/* rx_entries without tagged recv buf (unexpected message) */
	struct dlist_entry rx_unexp_tagged_list;
	/* list of pre-posted recv buffers */
	struct dlist_entry rx_posted_buf_list;
	/* list of pre-posted recv buffers for shm */
	struct dlist_entry rx_posted_buf_shm_list;
	/* tx entries with queued messages */
	struct dlist_entry tx_entry_queued_list;
	/* rx entries with queued messages */
	struct dlist_entry rx_entry_queued_list;
	/* tx_entries with data to be sent (large messages) */
	struct dlist_entry tx_pending_list;
	/* read entries with data to be read */
	struct dlist_entry read_pending_list;
	/* rxr_peer entries that are in backoff due to RNR */
	struct dlist_entry peer_backoff_list;
	/* rxr_peer entries with an allocated robuf */
	struct dlist_entry peer_list;

#if ENABLE_DEBUG
	/* rx_entries waiting for data to arrive (large messages) */
	struct dlist_entry rx_pending_list;
	/* count of rx_pending_list */
	size_t rx_pending;

	/* rx packets being processed or waiting to be processed */
	struct dlist_entry rx_pkt_list;

	/* tx packets waiting for send completion */
	struct dlist_entry tx_pkt_list;

	/* track allocated rx_entries and tx_entries for endpoint cleanup */
	struct dlist_entry rx_entry_list;
	struct dlist_entry tx_entry_list;

	size_t sends;
	size_t send_comps;
	size_t failed_send_comps;
	size_t recv_comps;
#endif
	/* number of posted buffers for shm */
	size_t posted_bufs_shm;
	size_t rx_bufs_shm_to_post;

	/* number of posted buffers */
	size_t posted_bufs_efa;
	size_t rx_bufs_efa_to_post;
	/* number of buffers available for large messages */
	size_t available_data_bufs;
	/* Timestamp of when available_data_bufs was exhausted. */
	uint64_t available_data_bufs_ts;

	/* number of outstanding sends */
	size_t tx_pending;
};

#define rxr_rx_flags(rxr_ep) ((rxr_ep)->util_ep.rx_op_flags)
#define rxr_tx_flags(rxr_ep) ((rxr_ep)->util_ep.tx_op_flags)

/*
 * Control header with completion data. CQ data length is static.
 */
#define RXR_CQ_DATA_SIZE (8)

static inline void rxr_copy_shm_cq_entry(struct fi_cq_tagged_entry *cq_tagged_entry,
					 struct fi_cq_data_entry *shm_cq_entry)
{
	cq_tagged_entry->op_context = shm_cq_entry->op_context;
	cq_tagged_entry->flags = shm_cq_entry->flags;
	cq_tagged_entry->len = shm_cq_entry->len;
	cq_tagged_entry->buf = shm_cq_entry->buf;
	cq_tagged_entry->data = shm_cq_entry->data;
	cq_tagged_entry->tag = 0; /* no tag for RMA */
}

static inline struct rxr_peer *rxr_ep_get_peer(struct rxr_ep *ep,
					       fi_addr_t addr)
{
	return &ep->peer[addr];
}
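
/*
 * Note: the peer table is a flat array allocated per endpoint and indexed
 * directly by the libfabric address handle, which relies on the AV handing
 * out dense fi_addr_t values starting at 0 (FI_AV_TABLE-style addressing).
 * Illustrative sketch (hypothetical addr value):
 *
 *	struct rxr_peer *peer = rxr_ep_get_peer(ep, addr);
 *	if (!peer->rx_init)
 *		rxr_ep_peer_init_rx(ep, peer);
 */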

static inline void rxr_ep_peer_init_rx(struct rxr_ep *ep, struct rxr_peer *peer)
{
	assert(!peer->rx_init);

	peer->robuf = freestack_pop(ep->robuf_fs);
	peer->robuf = ofi_recvwin_buf_alloc(peer->robuf,
					    rxr_env.recvwin_size);
	assert(peer->robuf);
	dlist_insert_tail(&peer->entry, &ep->peer_list);
	peer->rx_credits = rxr_env.rx_window_size;
	peer->rx_init = 1;
}

static inline void rxr_ep_peer_init_tx(struct rxr_peer *peer)
{
	assert(!peer->tx_init);
	peer->tx_credits = rxr_env.tx_max_credits;
	peer->tx_init = 1;
}

struct rxr_rx_entry *rxr_ep_get_rx_entry(struct rxr_ep *ep,
					 const struct fi_msg *msg,
					 uint64_t tag,
					 uint64_t ignore,
					 uint32_t op,
					 uint64_t flags);

struct rxr_rx_entry *rxr_ep_rx_entry_init(struct rxr_ep *ep,
					  struct rxr_rx_entry *rx_entry,
					  const struct fi_msg *msg,
					  uint64_t tag,
					  uint64_t ignore,
					  uint32_t op,
					  uint64_t flags);

void rxr_tx_entry_init(struct rxr_ep *rxr_ep, struct rxr_tx_entry *tx_entry,
		       const struct fi_msg *msg, uint32_t op, uint64_t flags);

struct rxr_tx_entry *rxr_ep_alloc_tx_entry(struct rxr_ep *rxr_ep,
					   const struct fi_msg *msg,
					   uint32_t op,
					   uint64_t tag,
					   uint64_t flags);

int rxr_tx_entry_mr_dereg(struct rxr_tx_entry *tx_entry);

static inline void rxr_release_tx_entry(struct rxr_ep *ep,
					struct rxr_tx_entry *tx_entry)
{
#if ENABLE_DEBUG
	dlist_remove(&tx_entry->tx_entry_entry);
#endif
	assert(dlist_empty(&tx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
	rxr_poison_mem_region((uint32_t *)tx_entry,
			      sizeof(struct rxr_tx_entry));
#endif
	tx_entry->state = RXR_TX_FREE;
	ofi_buf_free(tx_entry);
}

static inline void rxr_release_rx_entry(struct rxr_ep *ep,
					struct rxr_rx_entry *rx_entry)
{
#if ENABLE_DEBUG
	dlist_remove(&rx_entry->rx_entry_entry);
#endif
	assert(dlist_empty(&rx_entry->queued_pkts));
#ifdef ENABLE_EFA_POISONING
	rxr_poison_mem_region((uint32_t *)rx_entry,
			      sizeof(struct rxr_rx_entry));
#endif
	rx_entry->state = RXR_RX_FREE;
	ofi_buf_free(rx_entry);
}

static inline int rxr_match_addr(fi_addr_t addr, fi_addr_t match_addr)
{
	return (addr == FI_ADDR_UNSPEC || addr == match_addr);
}

static inline int rxr_match_tag(uint64_t tag, uint64_t ignore,
				uint64_t match_tag)
{
	return ((tag | ignore) == (match_tag | ignore));
}
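
/*
 * Note on rxr_match_tag(): bits set in 'ignore' act as wildcards.  ORing
 * 'ignore' into both sides forces the ignored bit positions to compare
 * equal, so only the remaining tag bits must match.  For example (values
 * are illustrative only):
 *
 *	tag = 0x1234, ignore = 0x00ff
 *	rxr_match_tag(0x1234, 0x00ff, 0x12ab) -> 1  (low byte ignored)
 *	rxr_match_tag(0x1234, 0x00ff, 0x13ab) -> 0  (high byte differs)
 */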

static inline void rxr_ep_inc_tx_pending(struct rxr_ep *ep,
					 struct rxr_peer *peer)
{
	ep->tx_pending++;
	peer->tx_pending++;
#if ENABLE_DEBUG
	ep->sends++;
#endif
}

static inline void rxr_ep_dec_tx_pending(struct rxr_ep *ep,
					 struct rxr_peer *peer,
					 int failed)
{
	ep->tx_pending--;
	peer->tx_pending--;
#if ENABLE_DEBUG
	if (failed)
		ep->failed_send_comps++;
#endif
}

static inline size_t rxr_get_rx_pool_chunk_cnt(struct rxr_ep *ep)
{
	return MIN(ep->core_rx_size, ep->rx_size);
}

static inline size_t rxr_get_tx_pool_chunk_cnt(struct rxr_ep *ep)
{
	return MIN(ep->max_outstanding_tx, ep->tx_size);
}
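
/*
 * Note: the chunk-count helpers above return the smaller of the RxR queue
 * size and the core provider's corresponding limit; the intent, presumably,
 * is to size the EFA packet pools so that RxR does not allocate more packet
 * buffers than the core endpoint could have outstanding anyway.
 */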

static inline int rxr_need_sas_ordering(struct rxr_ep *ep)
{
	return ep->msg_order & FI_ORDER_SAS;
}

/* Initialization functions */
void rxr_reset_rx_tx_to_core(const struct fi_info *user_info,
			     struct fi_info *core_info);
int rxr_get_lower_rdm_info(uint32_t version, const char *node, const char *service,
			   uint64_t flags, const struct util_prov *util_prov,
			   const struct fi_info *util_hints,
			   struct fi_info **core_info);
int rxr_fabric(struct fi_fabric_attr *attr,
	       struct fid_fabric **fabric, void *context);
int rxr_domain_open(struct fid_fabric *fabric, struct fi_info *info,
		    struct fid_domain **dom, void *context);
int rxr_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context);
int rxr_endpoint(struct fid_domain *domain, struct fi_info *info,
		 struct fid_ep **ep, void *context);

/* EP sub-functions */
void rxr_ep_progress(struct util_ep *util_ep);
void rxr_ep_progress_internal(struct rxr_ep *rxr_ep);
int rxr_ep_post_buf(struct rxr_ep *ep, uint64_t flags, enum rxr_lower_ep_type lower_ep);

int rxr_ep_set_tx_credit_request(struct rxr_ep *rxr_ep,
				 struct rxr_tx_entry *tx_entry);

int rxr_ep_tx_init_mr_desc(struct rxr_domain *rxr_domain,
			   struct rxr_tx_entry *tx_entry,
			   int mr_iov_start, uint64_t access);

void rxr_prepare_desc_send(struct rxr_domain *rxr_domain,
			   struct rxr_tx_entry *tx_entry);

struct rxr_rx_entry *rxr_ep_lookup_mediumrtm_rx_entry(struct rxr_ep *ep,
						      struct rxr_pkt_entry *pkt_entry);

void rxr_ep_record_mediumrtm_rx_entry(struct rxr_ep *ep,
				      struct rxr_pkt_entry *pkt_entry,
				      struct rxr_rx_entry *rx_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_msgrtm(struct rxr_ep *ep,
							    struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_alloc_unexp_rx_entry_for_tagrtm(struct rxr_ep *ep,
							    struct rxr_pkt_entry **pkt_entry);

struct rxr_rx_entry *rxr_ep_split_rx_entry(struct rxr_ep *ep,
					   struct rxr_rx_entry *posted_entry,
					   struct rxr_rx_entry *consumer_entry,
					   struct rxr_pkt_entry *pkt_entry);
int rxr_ep_efa_addr_to_str(const void *addr, char *temp_name);

/* CQ sub-functions */
int rxr_cq_handle_rx_error(struct rxr_ep *ep, struct rxr_rx_entry *rx_entry,
			   ssize_t prov_errno);
int rxr_cq_handle_tx_error(struct rxr_ep *ep, struct rxr_tx_entry *tx_entry,
			   ssize_t prov_errno);
int rxr_cq_handle_cq_error(struct rxr_ep *ep, ssize_t err);

void rxr_cq_write_rx_completion(struct rxr_ep *ep,
				struct rxr_rx_entry *rx_entry);

void rxr_cq_handle_rx_completion(struct rxr_ep *ep,
				 struct rxr_pkt_entry *pkt_entry,
				 struct rxr_rx_entry *rx_entry);

void rxr_cq_write_tx_completion(struct rxr_ep *ep,
				struct rxr_tx_entry *tx_entry);

void rxr_cq_handle_tx_completion(struct rxr_ep *ep,
				 struct rxr_tx_entry *tx_entry);

void rxr_cq_handle_shm_completion(struct rxr_ep *ep,
				  struct fi_cq_data_entry *cq_entry,
				  fi_addr_t src_addr);

int rxr_cq_reorder_msg(struct rxr_ep *ep,
		       struct rxr_peer *peer,
		       struct rxr_pkt_entry *pkt_entry);

void rxr_cq_proc_pending_items_in_recvwin(struct rxr_ep *ep,
					  struct rxr_peer *peer);

void rxr_cq_handle_shm_rma_write_data(struct rxr_ep *ep,
				      struct fi_cq_data_entry *shm_comp,
				      fi_addr_t src_addr);

/* Aborts if unable to write to the eq */
static inline void efa_eq_write_error(struct util_ep *ep, ssize_t err,
				      ssize_t prov_errno)
{
	struct fi_eq_err_entry err_entry;
	int ret = -FI_ENOEQ;

	FI_WARN(&rxr_prov, FI_LOG_EQ, "Writing error %s to EQ.\n",
		fi_strerror(err));
	if (ep->eq) {
		memset(&err_entry, 0, sizeof(err_entry));
		err_entry.err = err;
		err_entry.prov_errno = prov_errno;
		ret = fi_eq_write(&ep->eq->eq_fid, FI_NOTIFY,
				  &err_entry, sizeof(err_entry),
				  UTIL_FLAG_ERROR);

		if (ret == sizeof(err_entry))
			return;
	}

	FI_WARN(&rxr_prov, FI_LOG_EQ,
		"Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd)\n",
		fi_strerror(-ret), fi_strerror(err), err,
		fi_strerror(prov_errno), prov_errno);
	fprintf(stderr,
		"Unable to write to EQ: %s. err: %s (%zd) prov_errno: %s (%zd) %s:%d\n",
		fi_strerror(-ret), fi_strerror(err), err,
		fi_strerror(prov_errno), prov_errno, __FILE__, __LINE__);
	abort();
}

static inline struct rxr_domain *rxr_ep_domain(struct rxr_ep *ep)
{
	return container_of(ep->util_ep.domain, struct rxr_domain, util_domain);
}

static inline uint8_t rxr_ep_mr_local(struct rxr_ep *ep)
{
	struct rxr_domain *domain = container_of(ep->util_ep.domain,
						 struct rxr_domain,
						 util_domain);
	return domain->mr_local;
}

/*
 * Today we only check CQ resources; in the future we will check ctx and
 * other resources as well.
 */
static inline uint64_t is_tx_res_full(struct rxr_ep *ep)
{
	return ep->rm_full & RXR_RM_TX_CQ_FULL;
}

static inline uint64_t is_rx_res_full(struct rxr_ep *ep)
{
	return ep->rm_full & RXR_RM_RX_CQ_FULL;
}

static inline void rxr_rm_rx_cq_check(struct rxr_ep *ep, struct util_cq *rx_cq)
{
	fastlock_acquire(&rx_cq->cq_lock);
	if (ofi_cirque_isfull(rx_cq->cirq))
		ep->rm_full |= RXR_RM_RX_CQ_FULL;
	else
		ep->rm_full &= ~RXR_RM_RX_CQ_FULL;
	fastlock_release(&rx_cq->cq_lock);
}

static inline void rxr_rm_tx_cq_check(struct rxr_ep *ep, struct util_cq *tx_cq)
{
	fastlock_acquire(&tx_cq->cq_lock);
	if (ofi_cirque_isfull(tx_cq->cirq))
		ep->rm_full |= RXR_RM_TX_CQ_FULL;
	else
		ep->rm_full &= ~RXR_RM_TX_CQ_FULL;
	fastlock_release(&tx_cq->cq_lock);
}

static inline bool rxr_peer_timeout_expired(struct rxr_ep *ep,
					    struct rxr_peer *peer,
					    uint64_t ts)
{
	return (ts >= (peer->rnr_ts + MIN(rxr_env.max_timeout,
					  peer->timeout_interval *
					  (1 << peer->rnr_timeout_exp))));
}
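
/*
 * Note on the backoff check above: the RNR wait doubles with each backoff
 * event, i.e. the peer may not be sent to again until
 *
 *	peer->rnr_ts + MIN(rxr_env.max_timeout,
 *			   peer->timeout_interval << peer->rnr_timeout_exp)
 *
 * has elapsed (microseconds).  For example, with timeout_interval = 80 and
 * rnr_timeout_exp = 3 the wait is 80 * 2^3 = 640 microseconds, and the wait
 * is always capped at rxr_env.max_timeout (see RXR_DEF_RNR_MAX_TIMEOUT
 * above).
 */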

/* Performance counter declarations */
#ifdef RXR_PERF_ENABLED
#define RXR_PERF_FOREACH(DECL)	\
	DECL(perf_rxr_tx),	\
	DECL(perf_rxr_recv),	\
	DECL(rxr_perf_size)	\

enum rxr_perf_counters {
	RXR_PERF_FOREACH(OFI_ENUM_VAL)
};

extern const char *rxr_perf_counters_str[];

static inline void rxr_perfset_start(struct rxr_ep *ep, size_t index)
{
	struct rxr_domain *domain = rxr_ep_domain(ep);
	struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
						 struct rxr_fabric,
						 util_fabric);
	ofi_perfset_start(&fabric->perf_set, index);
}

static inline void rxr_perfset_end(struct rxr_ep *ep, size_t index)
{
	struct rxr_domain *domain = rxr_ep_domain(ep);
	struct rxr_fabric *fabric = container_of(domain->util_domain.fabric,
						 struct rxr_fabric,
						 util_fabric);
	ofi_perfset_end(&fabric->perf_set, index);
}
#else
#define rxr_perfset_start(ep, index) do {} while (0)
#define rxr_perfset_end(ep, index) do {} while (0)
#endif /* RXR_PERF_ENABLED */

#endif /* _RXR_H_ */