/*
 * Copyright (c) 2016 Intel Corporation, Inc.  All rights reserved.
 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
 *
 * This software is available to you under a choice of one of two
 * licenses.  You may choose to be licensed under the terms of the GNU
 * General Public License (GPL) Version 2, available from the file
 * COPYING in the main directory of this source tree, or the
 * BSD license below:
 *
 *     Redistribution and use in source and binary forms, with or
 *     without modification, are permitted provided that the following
 *     conditions are met:
 *
 *      - Redistributions of source code must retain the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer.
 *
 *      - Redistributions in binary form must reproduce the above
 *        copyright notice, this list of conditions and the following
 *        disclaimer in the documentation and/or other materials
 *        provided with the distribution.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
 * SOFTWARE.
 */

#if HAVE_CONFIG_H
#  include <config.h>
#endif /* HAVE_CONFIG_H */

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <rdma/fabric.h>
#include <rdma/fi_errno.h>
#include <rdma/fi_cm.h>
#include <rdma/fi_domain.h>
#include <rdma/fi_endpoint.h>
#include <rdma/fi_eq.h>

#include <ofi.h>
#include <ofi_enosys.h>
#include <ofi_util.h>
#include <ofi_list.h>
#include <ofi_proto.h>
#include <ofi_iov.h>

#ifndef _RXM_H_
#define _RXM_H_


#define RXM_CM_DATA_VERSION	1
#define RXM_OP_VERSION		3
#define RXM_CTRL_VERSION	4

#define RXM_BUF_SIZE	16384
extern size_t rxm_eager_limit;

#define RXM_SAR_LIMIT	131072
#define RXM_SAR_TX_ERROR	UINT64_MAX
#define RXM_SAR_RX_INIT		UINT64_MAX

#define RXM_IOV_LIMIT 4

#define RXM_MR_MODES	(OFI_MR_BASIC_MAP | FI_MR_LOCAL)

#define RXM_PASSTHRU_TX_OP_FLAGS (FI_TRANSMIT_COMPLETE)

#define RXM_PASSTHRU_RX_OP_FLAGS 0ULL

#define RXM_TX_OP_FLAGS (FI_INJECT | FI_INJECT_COMPLETE | \
			 FI_DELIVERY_COMPLETE | FI_COMPLETION)
#define RXM_RX_OP_FLAGS (FI_MULTI_RECV | FI_COMPLETION)

#define RXM_MR_VIRT_ADDR(info) ((info->domain_attr->mr_mode == FI_MR_BASIC) ||\
				(info->domain_attr->mr_mode & FI_MR_VIRT_ADDR))

#define RXM_MR_PROV_KEY(info) ((info->domain_attr->mr_mode == FI_MR_BASIC) ||\
			       (info->domain_attr->mr_mode & FI_MR_PROV_KEY))

#define RXM_UPDATE_STATE(subsystem, buf, new_state)			\
	do {								\
		FI_DBG(&rxm_prov, subsystem, "[PROTO] msg_id: 0x%"	\
		       PRIx64 " %s -> %s\n", (buf)->pkt.ctrl_hdr.msg_id,\
		       rxm_proto_state_str[(buf)->hdr.state],		\
		       rxm_proto_state_str[new_state]);			\
		(buf)->hdr.state = new_state;				\
	} while (0)

#define RXM_DBG_ADDR_TAG(subsystem, log_str, addr, tag)		\
	FI_DBG(&rxm_prov, subsystem, log_str			\
	       " (fi_addr: 0x%" PRIx64 " tag: 0x%" PRIx64 ")\n",\
	       addr, tag)

#define RXM_GET_PROTO_STATE(context)					\
	(*(enum rxm_proto_state *)					\
	  ((unsigned char *)context + offsetof(struct rxm_buf, state)))

#define RXM_SET_PROTO_STATE(comp, new_state)				\
do {									\
	(*(enum rxm_proto_state *)					\
	  ((unsigned char *)(comp)->op_context +			\
		offsetof(struct rxm_buf, state))) = (new_state);	\
} while (0)
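
/*
 * Illustrative sketch: completion handlers can recover the protocol state
 * from a completion's op_context, which points at a struct rxm_buf, without
 * knowing the concrete buffer type, e.g.:
 *
 *	switch (RXM_GET_PROTO_STATE(comp->op_context)) {
 *	case RXM_RX:
 *		...
 *	case RXM_SAR_TX:
 *		...
 *	}
 */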

#define rxm_tx_buf_2_msg_id(rxm_ep, pool_type, tx_buf)				\
	((uint64_t) rxm_get_buf_index(&(rxm_ep)->buf_pools[pool_type],		\
				       (void *) tx_buf))
#define rxm_msg_id_2_tx_buf(rxm_ep, pool_type, msg_id)				\
	((void *) rxm_buf_get_by_index(&(rxm_ep)->buf_pools[pool_type],		\
				       (uint64_t) msg_id))
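
/*
 * Illustrative sketch: the two macros above are inverses, so the tx buffer
 * behind a msg_id carried in a packet header can be recovered on completion,
 * e.g. for a SAR buffer:
 *
 *	uint64_t msg_id = rxm_tx_buf_2_msg_id(rxm_ep, RXM_BUF_POOL_TX_SAR,
 *					      tx_buf);
 *	struct rxm_tx_sar_buf *same_buf =
 *		rxm_msg_id_2_tx_buf(rxm_ep, RXM_BUF_POOL_TX_SAR, msg_id);
 *	assert(same_buf == tx_buf);
 */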

extern struct fi_provider rxm_prov;
extern struct util_prov rxm_util_prov;
extern struct fi_ops_rma rxm_ops_rma;
extern struct fi_ops_atomic rxm_ops_atomic;

extern size_t rxm_msg_tx_size;
extern size_t rxm_msg_rx_size;
extern size_t rxm_def_univ_size;
extern size_t rxm_cm_progress_interval;
extern size_t rxm_cq_eq_fairness;
extern int force_auto_progress;
extern enum fi_wait_obj def_wait_obj;

struct rxm_ep;


/*
 * Connection Map
 */

#define RXM_CMAP_IDX_BITS OFI_IDX_INDEX_BITS

enum rxm_cmap_signal {
	RXM_CMAP_UNSPEC,
	RXM_CMAP_FREE,
	RXM_CMAP_EXIT,
};

#define RXM_CM_STATES(FUNC)		\
	FUNC(RXM_CMAP_IDLE),		\
	FUNC(RXM_CMAP_CONNREQ_SENT),	\
	FUNC(RXM_CMAP_CONNREQ_RECV),	\
	FUNC(RXM_CMAP_CONNECTED),	\
	FUNC(RXM_CMAP_SHUTDOWN)

enum rxm_cmap_state {
	RXM_CM_STATES(OFI_ENUM_VAL)
};

extern char *rxm_cm_state_str[];

#define RXM_CM_UPDATE_STATE(handle, new_state)				\
	do {								\
		FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "[CM] handle: "	\
		       "%p %s -> %s\n",	handle,				\
		       rxm_cm_state_str[handle->state],			\
		       rxm_cm_state_str[new_state]);			\
		handle->state = new_state;				\
	} while (0)

struct rxm_cmap_handle {
	struct rxm_cmap *cmap;
	enum rxm_cmap_state state;
	/* Unique identifier for a connection. Can be exchanged with a peer
	 * during connection setup and can later be used in a message header
	 * to identify the source of the message (Used for FI_SOURCE, RNDV
	 * protocol, etc.) */
	uint64_t key;
	uint64_t remote_key;
	fi_addr_t fi_addr;
	struct rxm_cmap_peer *peer;
};

struct rxm_cmap_peer {
	struct rxm_cmap_handle *handle;
	struct dlist_entry entry;
	uint8_t addr[];
};

struct rxm_cmap_attr {
	void			*name;
};

struct rxm_cmap {
	struct rxm_ep		*ep;
	struct util_av		*av;

	/* cmap handles that correspond to addresses in AV */
	struct rxm_cmap_handle **handles_av;
	size_t			num_allocated;

	/* Store all cmap handles (inclusive of handles_av) in an indexer.
	 * This allows reverse lookup of the handle using the index. */
	struct indexer		handles_idx;

	struct ofi_key_idx	key_idx;

	struct dlist_entry	peer_list;
	struct rxm_cmap_attr	attr;
	pthread_t		cm_thread;
	ofi_fastlock_acquire_t	acquire;
	ofi_fastlock_release_t	release;
	fastlock_t		lock;
};

enum rxm_cmap_reject_reason {
	RXM_CMAP_REJECT_UNSPEC,
	RXM_CMAP_REJECT_GENUINE,
	RXM_CMAP_REJECT_SIMULT_CONN,
};

union rxm_cm_data {
	struct _connect {
		uint8_t version;
		uint8_t endianness;
		uint8_t ctrl_version;
		uint8_t op_version;
		uint16_t port;
		uint8_t padding[2];
		uint32_t eager_size;
		uint32_t rx_size;
		uint64_t client_conn_id;
	} connect;

	struct _accept {
		uint64_t server_conn_id;
		uint32_t rx_size;
	} accept;

	struct _reject {
		uint8_t version;
		uint8_t reason;
	} reject;
};
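
/*
 * Illustrative sketch (hypothetical helper, not part of this header): a
 * passive side could validate the CM data carried by a connection request
 * against its own protocol versions before accepting:
 *
 *	static int rxm_cm_data_compatible(const union rxm_cm_data *cm_data)
 *	{
 *		return cm_data->connect.version == RXM_CM_DATA_VERSION &&
 *		       cm_data->connect.ctrl_version == RXM_CTRL_VERSION &&
 *		       cm_data->connect.op_version == RXM_OP_VERSION;
 *	}
 */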

struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key);
int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr);

void rxm_cmap_process_reject(struct rxm_cmap *cmap,
			     struct rxm_cmap_handle *handle,
			     enum rxm_cmap_reject_reason cm_reject_reason);
void rxm_cmap_process_shutdown(struct rxm_cmap *cmap,
			       struct rxm_cmap_handle *handle);
int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr,
		     struct rxm_cmap_handle *handle);
void rxm_cmap_del_handle_ts(struct rxm_cmap_handle *handle);
void rxm_cmap_free(struct rxm_cmap *cmap);
int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr);
int rxm_cmap_remove(struct rxm_cmap *cmap, int index);
int rxm_msg_eq_progress(struct rxm_ep *rxm_ep);

static inline struct rxm_cmap_handle *
rxm_cmap_acquire_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr)
{
	assert(fi_addr < cmap->num_allocated);
	return cmap->handles_av[fi_addr];
}

struct rxm_fabric {
	struct util_fabric util_fabric;
	struct fid_fabric *msg_fabric;
};

struct rxm_domain {
	struct util_domain util_domain;
	struct fid_domain *msg_domain;
	size_t max_atomic_size;
	uint64_t mr_key;
	uint8_t mr_local;
};

int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
		struct fid_av **av, void *context);

struct rxm_mr {
	struct fid_mr mr_fid;
	struct fid_mr *msg_mr;
	struct rxm_domain *domain;
};

struct rxm_rndv_hdr {
	struct ofi_rma_iov iov[RXM_IOV_LIMIT];
	uint8_t count;
};

#define rxm_pkt_rndv_data(rxm_pkt) \
	((rxm_pkt)->data + sizeof(struct rxm_rndv_hdr))
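
/*
 * Illustrative sketch: in a rendezvous packet the data area begins with the
 * rndv header, and any buffered payload follows it:
 *
 *	struct rxm_rndv_hdr *rndv_hdr =
 *		(struct rxm_rndv_hdr *) rx_buf->pkt.data;
 *	char *buffered_data = rxm_pkt_rndv_data(&rx_buf->pkt);
 */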

struct rxm_atomic_hdr {
	struct fi_rma_ioc rma_ioc[RXM_IOV_LIMIT];
	char data[];
};

struct rxm_atomic_resp_hdr {
	int32_t status;
	uint32_t result_len;
	char data[];
};

/*
 * Macros to generate enums and associated string values
 * e.g.
 * #define RXM_PROTO_STATES(FUNC)	\
 * 	FUNC(STATE1),			\
 * 	FUNC(STATE2),			\
 * 	...				\
 * 	FUNC(STATEn)
 *
 * enum rxm_state {
 * 	RXM_PROTO_STATES(OFI_ENUM_VAL)
 * };
 *
 * char *rxm_state_str[] = {
 * 	RXM_PROTO_STATES(OFI_STR)
 * };
 */

/* RXM protocol states / tx/rx context */
#define RXM_PROTO_STATES(FUNC)		\
	FUNC(RXM_TX),			\
	FUNC(RXM_INJECT_TX),		\
	FUNC(RXM_RMA),			\
	FUNC(RXM_RX),			\
	FUNC(RXM_SAR_TX),		\
	FUNC(RXM_RNDV_TX),		\
	FUNC(RXM_RNDV_ACK_WAIT),	\
	FUNC(RXM_RNDV_READ),		\
	FUNC(RXM_RNDV_ACK_SENT),	\
	FUNC(RXM_RNDV_ACK_RECVD),	\
	FUNC(RXM_RNDV_FINISH),		\
	FUNC(RXM_ATOMIC_RESP_WAIT),	\
	FUNC(RXM_ATOMIC_RESP_SENT)

enum rxm_proto_state {
	RXM_PROTO_STATES(OFI_ENUM_VAL)
};

extern char *rxm_proto_state_str[];

enum {
	rxm_ctrl_eager,
	rxm_ctrl_seg,
	rxm_ctrl_rndv,
	rxm_ctrl_rndv_ack,
	rxm_ctrl_atomic,
	rxm_ctrl_atomic_resp,
};

struct rxm_pkt {
	struct ofi_ctrl_hdr ctrl_hdr;
	struct ofi_op_hdr hdr;
	char data[];
};

union rxm_sar_ctrl_data {
	struct {
		enum rxm_sar_seg_type {
			RXM_SAR_SEG_FIRST	= 1,
			RXM_SAR_SEG_MIDDLE	= 2,
			RXM_SAR_SEG_LAST	= 3,
		} seg_type : 2;
		uint32_t offset;
	};
	uint64_t align;
};

static inline enum rxm_sar_seg_type
rxm_sar_get_seg_type(struct ofi_ctrl_hdr *ctrl_hdr)
{
	return ((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type;
}

static inline void
rxm_sar_set_seg_type(struct ofi_ctrl_hdr *ctrl_hdr, enum rxm_sar_seg_type seg_type)
{
	((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type = seg_type;
}
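
/*
 * Illustrative sketch: the sender marks each SAR segment before posting it,
 * and the receiver reads the mark back out of the same control header:
 *
 *	rxm_sar_set_seg_type(&tx_buf->pkt.ctrl_hdr, RXM_SAR_SEG_FIRST);
 *	...
 *	if (rxm_sar_get_seg_type(&rx_buf->pkt.ctrl_hdr) == RXM_SAR_SEG_LAST)
 *		... (the entire message has been received)
 */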

struct rxm_recv_match_attr {
	fi_addr_t addr;
	uint64_t tag;
	uint64_t ignore;
};

struct rxm_unexp_msg {
	struct dlist_entry entry;
	fi_addr_t addr;
	uint64_t tag;
};

struct rxm_iov {
	struct iovec iov[RXM_IOV_LIMIT];
	void *desc[RXM_IOV_LIMIT];
	uint8_t count;
};

enum rxm_buf_pool_type {
	RXM_BUF_POOL_RX		= 0,
	RXM_BUF_POOL_START	= RXM_BUF_POOL_RX,
	RXM_BUF_POOL_TX,
	RXM_BUF_POOL_TX_START	= RXM_BUF_POOL_TX,
	RXM_BUF_POOL_TX_INJECT,
	RXM_BUF_POOL_TX_ACK,
	RXM_BUF_POOL_TX_RNDV,
	RXM_BUF_POOL_TX_ATOMIC,
	RXM_BUF_POOL_TX_SAR,
	RXM_BUF_POOL_TX_END	= RXM_BUF_POOL_TX_SAR,
	RXM_BUF_POOL_RMA,
	RXM_BUF_POOL_MAX,
};
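
/*
 * Illustrative sketch: the START/END aliases allow iterating over related
 * pools without naming each one, e.g. over all transmit pools:
 *
 *	enum rxm_buf_pool_type type;
 *
 *	for (type = RXM_BUF_POOL_TX_START; type <= RXM_BUF_POOL_TX_END; type++)
 *		... (operate on rxm_ep->buf_pools[type])
 */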

struct rxm_buf {
	/* Must stay at top */
	struct fi_context fi_context;

	enum rxm_proto_state state;

	void *desc;
};

struct rxm_rx_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	struct rxm_ep *ep;
	/* MSG EP / shared context to which bufs are posted */
	struct fid_ep *msg_ep;
	struct dlist_entry repost_entry;
	struct rxm_conn *conn;
	struct rxm_recv_entry *recv_entry;
	struct rxm_unexp_msg unexp_msg;
	uint64_t comp_flags;
	struct fi_recv_context recv_context;
	/* TODO: remove this and modify the unexpected message handling
	 * path so that it does not repost the rx_buf */
	uint8_t repost;

	/* Used for large messages */
	struct rxm_rndv_hdr *rndv_hdr;
	size_t rndv_rma_index;
	struct fid_mr *mr[RXM_IOV_LIMIT];

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_tx_base_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_tx_eager_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	void *app_context;
	uint64_t flags;

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_tx_sar_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	void *app_context;
	uint64_t flags;

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_tx_rndv_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	void *app_context;
	uint64_t flags;
	struct fid_mr *mr[RXM_IOV_LIMIT];
	uint8_t count;

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_rma_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	void *app_context;
	uint64_t flags;

	struct {
		struct fid_mr *mr[RXM_IOV_LIMIT];
		uint8_t count;
	} mr;
	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

struct rxm_tx_atomic_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	void *app_context;
	uint64_t flags;
	struct iovec result_iov[RXM_IOV_LIMIT];
	uint8_t result_iov_count;

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};

enum rxm_deferred_tx_entry_type {
	RXM_DEFERRED_TX_RNDV_ACK,
	RXM_DEFERRED_TX_RNDV_READ,
	RXM_DEFERRED_TX_SAR_SEG,
	RXM_DEFERRED_TX_ATOMIC_RESP,
};

struct rxm_deferred_tx_entry {
	struct rxm_ep *rxm_ep;
	struct rxm_conn *rxm_conn;
	struct dlist_entry entry;
	enum rxm_deferred_tx_entry_type type;

	union {
		struct {
			struct rxm_rx_buf *rx_buf;
		} rndv_ack;
		struct {
			struct rxm_rx_buf *rx_buf;
			struct fi_rma_iov rma_iov;
			struct rxm_iov rxm_iov;
		} rndv_read;
		struct {
			struct rxm_tx_sar_buf *cur_seg_tx_buf;
			struct {
				struct iovec iov[RXM_IOV_LIMIT];
				uint8_t count;
				size_t cur_iov_offset;
				uint64_t data;
				uint64_t tag;
			} payload;
			size_t next_seg_no;
			size_t segs_cnt;
			uint8_t op;
			size_t total_len;
			size_t remain_len;
			uint64_t msg_id;
			void *app_context;
			uint64_t flags;
		} sar_seg;
		struct {
			struct rxm_tx_atomic_buf *tx_buf;
			ssize_t len;
		} atomic_resp;
	};
};

struct rxm_recv_entry {
	struct dlist_entry entry;
	struct rxm_iov rxm_iov;
	fi_addr_t addr;
	void *context;
	uint64_t flags;
	uint64_t tag;
	uint64_t ignore;
	uint64_t comp_flags;
	size_t total_len;
	struct rxm_recv_queue *recv_queue;

	/* Used for SAR protocol */
	struct {
		struct dlist_entry entry;
		size_t total_recv_len;
		struct rxm_conn *conn;
		uint64_t msg_id;
	} sar;
	/* Used for Rendezvous protocol */
	struct {
		/* This is used to send RNDV ACK */
		struct rxm_tx_base_buf *tx_buf;
	} rndv;
};
DECLARE_FREESTACK(struct rxm_recv_entry, rxm_recv_fs);

enum rxm_recv_queue_type {
	RXM_RECV_QUEUE_UNSPEC,
	RXM_RECV_QUEUE_MSG,
	RXM_RECV_QUEUE_TAGGED,
};

struct rxm_recv_queue {
	struct rxm_ep *rxm_ep;
	enum rxm_recv_queue_type type;
	struct rxm_recv_fs *fs;
	struct dlist_entry recv_list;
	struct dlist_entry unexp_msg_list;
	dlist_func_t *match_recv;
	dlist_func_t *match_unexp;
};

struct rxm_buf_pool {
	enum rxm_buf_pool_type type;
	struct ofi_bufpool *pool;
	struct rxm_ep *rxm_ep;
};

struct rxm_msg_eq_entry {
	ssize_t			rd;
	uint32_t		event;
	/* Used for connection refusal */
	void			*context;
	struct fi_eq_err_entry	err_entry;
	/* must stay at the bottom */
	struct fi_eq_cm_entry	cm_entry;
};

#define RXM_MSG_EQ_ENTRY_SZ (sizeof(struct rxm_msg_eq_entry) + \
			     sizeof(union rxm_cm_data))
#define RXM_CM_ENTRY_SZ (sizeof(struct fi_eq_cm_entry) + \
			 sizeof(union rxm_cm_data))
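
/*
 * Illustrative sketch: EQ reads use the padded sizes above so that any CM
 * data appended to an event still fits within the entry:
 *
 *	struct rxm_msg_eq_entry *entry = alloca(RXM_MSG_EQ_ENTRY_SZ);
 *
 *	entry->rd = fi_eq_sread(rxm_ep->msg_eq, &entry->event,
 *				&entry->cm_entry, RXM_CM_ENTRY_SZ, -1, 0);
 */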

struct rxm_handle_txrx_ops {
	int (*comp_eager_tx)(struct rxm_ep *rxm_ep,
			     struct rxm_tx_eager_buf *tx_eager_buf);
	ssize_t (*handle_eager_rx)(struct rxm_rx_buf *rx_buf);
	ssize_t (*handle_rndv_rx)(struct rxm_rx_buf *rx_buf);
	ssize_t (*handle_seg_data_rx)(struct rxm_rx_buf *rx_buf);
};

struct rxm_ep {
	struct util_ep		util_ep;
	struct fi_info		*rxm_info;
	struct fi_info		*msg_info;
	struct rxm_cmap		*cmap;
	struct fid_pep		*msg_pep;
	struct fid_eq		*msg_eq;
	struct fid_cq		*msg_cq;
	uint64_t		msg_cq_last_poll;
	struct fid_ep		*srx_ctx;
	size_t			comp_per_progress;
	ofi_atomic32_t		atomic_tx_credits;
	int			cq_eq_fairness;

	bool			msg_mr_local;
	bool			rdm_mr_local;
	bool			do_progress;

	size_t			min_multi_recv_size;
	size_t			buffered_min;
	size_t			buffered_limit;
	size_t			inject_limit;
	size_t			eager_limit;
	size_t			sar_limit;

	struct rxm_buf_pool	*buf_pools;

	struct dlist_entry	repost_ready_list;
	struct dlist_entry	deferred_tx_conn_queue;

	struct rxm_recv_queue	recv_queue;
	struct rxm_recv_queue	trecv_queue;

	struct rxm_handle_txrx_ops *txrx_ops;
};

struct rxm_conn {
	/* This should stay at the top */
	struct rxm_cmap_handle handle;

	struct fid_ep *msg_ep;

	/* This is used only in non-FI_THREAD_SAFE case */
	struct rxm_pkt *inject_pkt;
	struct rxm_pkt *inject_data_pkt;
	struct rxm_pkt *tinject_pkt;
	struct rxm_pkt *tinject_data_pkt;

	struct dlist_entry deferred_conn_entry;
	struct dlist_entry deferred_tx_queue;
	struct dlist_entry sar_rx_msg_list;
	struct dlist_entry sar_deferred_rx_msg_list;

	uint32_t rndv_tx_credits;
};

extern struct fi_provider rxm_prov;
extern struct fi_info rxm_info;
extern struct fi_fabric_attr rxm_fabric_attr;
extern struct fi_domain_attr rxm_domain_attr;
extern struct fi_tx_attr rxm_tx_attr;
extern struct fi_rx_attr rxm_rx_attr;

int rxm_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric,
	       void *context);
int rxm_info_to_core(uint32_t version, const struct fi_info *rxm_info,
		     struct fi_info *core_info);
int rxm_info_to_rxm(uint32_t version, const struct fi_info *core_info,
		    struct fi_info *info);
int rxm_domain_open(struct fid_fabric *fabric, struct fi_info *info,
		    struct fid_domain **dom, void *context);
int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
		struct fid_cq **cq_fid, void *context);
ssize_t rxm_cq_handle_rx_buf(struct rxm_rx_buf *rx_buf);

int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
		 struct fid_ep **ep, void *context);

int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep);
void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
			void *op_context, int err);
void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err);
void rxm_cq_read_write_error(struct rxm_ep *rxm_ep);
ssize_t rxm_cq_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
void rxm_ep_progress(struct util_ep *util_ep);
void rxm_ep_progress_coll(struct util_ep *util_ep);
void rxm_ep_do_progress(struct util_ep *util_ep);

ssize_t rxm_cq_handle_eager(struct rxm_rx_buf *rx_buf);
ssize_t rxm_cq_handle_coll_eager(struct rxm_rx_buf *rx_buf);
ssize_t rxm_cq_handle_rndv(struct rxm_rx_buf *rx_buf);
ssize_t rxm_cq_handle_seg_data(struct rxm_rx_buf *rx_buf);
int rxm_finish_eager_send(struct rxm_ep *rxm_ep,
			  struct rxm_tx_eager_buf *tx_eager_buf);
int rxm_finish_coll_eager_send(struct rxm_ep *rxm_ep,
			       struct rxm_tx_eager_buf *tx_eager_buf);

int rxm_msg_ep_prepost_recv(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep);

int rxm_ep_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
			enum fi_op op, struct fi_atomic_attr *attr,
			uint64_t flags);

static inline size_t rxm_ep_max_atomic_size(struct fi_info *info)
{
	size_t overhead = sizeof(struct rxm_atomic_hdr) +
			  sizeof(struct rxm_pkt);

	/* Must be set to eager size or less */
	return (info->tx_attr && info->tx_attr->inject_size > overhead) ?
		info->tx_attr->inject_size - overhead : 0;
}
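
/*
 * For example, if the core provider reported an inject_size of 4096, the
 * maximum atomic payload would be 4096 minus the rxm packet and atomic
 * headers; an inject_size at or below that overhead disables atomics by
 * returning 0.
 */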

static inline ssize_t
rxm_atomic_send_respmsg(struct rxm_ep *rxm_ep, struct rxm_conn *conn,
			struct rxm_tx_atomic_buf *resp_buf, ssize_t len)
{
	struct iovec iov = {
		.iov_base = (void *) &resp_buf->pkt,
		.iov_len = len,
	};
	struct fi_msg msg = {
		.msg_iov = &iov,
		.desc = &resp_buf->hdr.desc,
		.iov_count = 1,
		.context = resp_buf,
		.data = 0,
	};
	return fi_sendmsg(conn->msg_ep, &msg, FI_COMPLETION);
}

static inline int rxm_needs_atomic_progress(const struct fi_info *info)
{
	return (info->caps & FI_ATOMIC) && info->domain_attr &&
			info->domain_attr->data_progress == FI_PROGRESS_AUTO;
}

static inline struct rxm_conn *rxm_key2conn(struct rxm_ep *rxm_ep, uint64_t key)
{
	return (struct rxm_conn *) rxm_cmap_key2handle(rxm_ep->cmap, key);
}

void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
				    struct rxm_conn *rxm_conn);

struct rxm_deferred_tx_entry *
rxm_ep_alloc_deferred_tx_entry(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn,
			       enum rxm_deferred_tx_entry_type type);

static inline void
rxm_ep_enqueue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry)
{
	if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue))
		dlist_insert_tail(&tx_entry->rxm_conn->deferred_conn_entry,
				  &tx_entry->rxm_ep->deferred_tx_conn_queue);
	dlist_insert_tail(&tx_entry->entry,
			  &tx_entry->rxm_conn->deferred_tx_queue);
}

static inline void
rxm_ep_dequeue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry)
{
	dlist_remove_init(&tx_entry->entry);
	if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue))
		dlist_remove(&tx_entry->rxm_conn->deferred_conn_entry);
}
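
/*
 * Illustrative sketch of the deferred-tx lifecycle: when a transmit cannot
 * be issued immediately (e.g. the msg provider returns -FI_EAGAIN), the
 * operation is parked on its connection and retried from the progress path:
 *
 *	def_entry = rxm_ep_alloc_deferred_tx_entry(rxm_ep, rxm_conn,
 *						   RXM_DEFERRED_TX_RNDV_ACK);
 *	if (def_entry) {
 *		def_entry->rndv_ack.rx_buf = rx_buf;
 *		rxm_ep_enqueue_deferred_tx_queue(def_entry);
 *	}
 */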

int rxm_conn_process_eq_events(struct rxm_ep *rxm_ep);

void rxm_msg_mr_closev(struct fid_mr **mr, size_t count);
int rxm_msg_mr_regv(struct rxm_ep *rxm_ep, const struct iovec *iov,
		    size_t count, size_t reg_limit, uint64_t access,
		    struct fid_mr **mr);
int rxm_msg_mr_reg_internal(struct rxm_domain *rxm_domain, const void *buf,
			    size_t len, uint64_t acs, uint64_t flags,
			    struct fid_mr **mr);

static inline void rxm_cntr_incerr(struct util_cntr *cntr)
{
	if (cntr)
		cntr->cntr_fid.ops->adderr(&cntr->cntr_fid, 1);
}

static inline void rxm_cq_log_comp(uint64_t flags)
{
#if ENABLE_DEBUG
	FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
	       fi_tostr((void *) &flags, FI_TYPE_CQ_EVENT_FLAGS));
#else
	/* NOP */
#endif
}

static inline ssize_t
rxm_ep_prepare_tx(struct rxm_ep *rxm_ep, fi_addr_t dest_addr,
		  struct rxm_conn **rxm_conn)
{
	ssize_t ret;

	assert(rxm_ep->util_ep.tx_cq);
	*rxm_conn = (struct rxm_conn *) rxm_cmap_acquire_handle(rxm_ep->cmap,
								dest_addr);
	if (OFI_UNLIKELY(!*rxm_conn))
		return -FI_EHOSTUNREACH;

	if (OFI_UNLIKELY((*rxm_conn)->handle.state != RXM_CMAP_CONNECTED)) {
		ret = rxm_cmap_connect(rxm_ep, dest_addr, &(*rxm_conn)->handle);
		if (ret)
			return ret;
	}

	if (OFI_UNLIKELY(!dlist_empty(&(*rxm_conn)->deferred_tx_queue))) {
		rxm_ep_do_progress(&rxm_ep->util_ep);
		if (!dlist_empty(&(*rxm_conn)->deferred_tx_queue))
			return -FI_EAGAIN;
	}
	return 0;
}

static inline void
rxm_ep_format_tx_buf_pkt(struct rxm_conn *rxm_conn, size_t len, uint8_t op,
			 uint64_t data, uint64_t tag, uint64_t flags,
			 struct rxm_pkt *pkt)
{
	pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
	pkt->hdr.size = len;
	pkt->hdr.op = op;
	pkt->hdr.tag = tag;
	pkt->hdr.flags = (flags & FI_REMOTE_CQ_DATA);
	pkt->hdr.data = data;
}
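
/*
 * Illustrative sketch: a typical transmit path resolves the connection,
 * allocates a buffer, and stamps the packet header before posting:
 *
 *	ret = rxm_ep_prepare_tx(rxm_ep, dest_addr, &rxm_conn);
 *	if (OFI_UNLIKELY(ret))
 *		return ret;
 *
 *	tx_buf = (struct rxm_tx_eager_buf *)
 *		 rxm_tx_buf_alloc(rxm_ep, RXM_BUF_POOL_TX);
 *	rxm_ep_format_tx_buf_pkt(rxm_conn, len, ofi_op_msg, 0, 0, flags,
 *				 &tx_buf->pkt);
 */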

static inline struct rxm_buf *
rxm_tx_buf_alloc(struct rxm_ep *rxm_ep, enum rxm_buf_pool_type type)
{
	assert((type == RXM_BUF_POOL_TX) ||
	       (type == RXM_BUF_POOL_TX_INJECT) ||
	       (type == RXM_BUF_POOL_TX_ACK) ||
	       (type == RXM_BUF_POOL_TX_RNDV) ||
	       (type == RXM_BUF_POOL_TX_ATOMIC) ||
	       (type == RXM_BUF_POOL_TX_SAR));
	return ofi_buf_alloc(rxm_ep->buf_pools[type].pool);
}

static inline struct rxm_rx_buf *
rxm_rx_buf_alloc(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, uint8_t repost)
{
	struct rxm_rx_buf *rx_buf =
		ofi_buf_alloc(rxm_ep->buf_pools[RXM_BUF_POOL_RX].pool);

	if (OFI_LIKELY(rx_buf != NULL)) {
		assert(rx_buf->ep == rxm_ep);
		rx_buf->hdr.state = RXM_RX;
		rx_buf->msg_ep = msg_ep;
		rx_buf->repost = repost;

		if (!rxm_ep->srx_ctx)
			rx_buf->conn = container_of(msg_ep->fid.context,
						    struct rxm_conn, handle);
	}
	return rx_buf;
}

static inline void
rxm_rx_buf_free(struct rxm_rx_buf *rx_buf)
{
	if (rx_buf->repost) {
		dlist_insert_tail(&rx_buf->repost_entry,
				  &rx_buf->ep->repost_ready_list);
	} else {
		ofi_buf_free(rx_buf);
	}
}

static inline struct rxm_rma_buf *rxm_rma_buf_alloc(struct rxm_ep *rxm_ep)
{
	return (struct rxm_rma_buf *)
		ofi_buf_alloc(rxm_ep->buf_pools[RXM_BUF_POOL_RMA].pool);
}

static inline
struct rxm_tx_atomic_buf *rxm_tx_atomic_buf_alloc(struct rxm_ep *rxm_ep)
{
	return (struct rxm_tx_atomic_buf *)
		rxm_tx_buf_alloc(rxm_ep, RXM_BUF_POOL_TX_ATOMIC);
}

static inline void
rxm_recv_entry_release(struct rxm_recv_queue *queue, struct rxm_recv_entry *entry)
{
	entry->total_len = 0;
	freestack_push(queue->fs, entry);
}

static inline int rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf,
					 void *context, uint64_t flags,
					 size_t len, char *buf)
{
	if (rx_buf->ep->rxm_info->caps & FI_SOURCE)
		return ofi_cq_write_src(rx_buf->ep->util_ep.rx_cq, context,
					flags, len, buf, rx_buf->pkt.hdr.data,
					rx_buf->pkt.hdr.tag,
					rx_buf->conn->handle.fi_addr);
	else
		return ofi_cq_write(rx_buf->ep->util_ep.rx_cq, context,
				    flags, len, buf, rx_buf->pkt.hdr.data,
				    rx_buf->pkt.hdr.tag);
}

#endif /* _RXM_H_ */