1
2 /*
3 * Copyright (c) 2016 Intel Corporation, Inc. All rights reserved.
4 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
5 *
6 * This software is available to you under a choice of one of two
7 * licenses. You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * BSD license below:
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and the following
18 * disclaimer.
19 *
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials
23 * provided with the distribution.
24 *
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32 * SOFTWARE.
33 */
34
35 #if HAVE_CONFIG_H
36 # include <config.h>
37 #endif /* HAVE_CONFIG_H */
38
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <string.h>
42
43 #include <rdma/fabric.h>
44 #include <rdma/fi_errno.h>
45 #include <rdma/fi_cm.h>
46 #include <rdma/fi_domain.h>
47 #include <rdma/fi_endpoint.h>
48 #include <rdma/fi_eq.h>
49
50 #include <ofi.h>
51 #include <ofi_enosys.h>
52 #include <ofi_util.h>
53 #include <ofi_list.h>
54 #include <ofi_proto.h>
55 #include <ofi_iov.h>
56
57 #ifndef _RXM_H_
58 #define _RXM_H_
59
60
61 #define RXM_CM_DATA_VERSION 1
62 #define RXM_OP_VERSION 3
63 #define RXM_CTRL_VERSION 4
64
65 #define RXM_BUF_SIZE 16384
66 extern size_t rxm_eager_limit;
67
68 #define RXM_SAR_LIMIT 131072
69 #define RXM_SAR_TX_ERROR UINT64_MAX
70 #define RXM_SAR_RX_INIT UINT64_MAX
71
72 #define RXM_IOV_LIMIT 4
73
74 #define RXM_MR_MODES (OFI_MR_BASIC_MAP | FI_MR_LOCAL)
75
76 #define RXM_PASSTHRU_TX_OP_FLAGS (FI_TRANSMIT_COMPLETE)
77
78 #define RXM_PASSTHRU_RX_OP_FLAGS 0ULL
79
80 #define RXM_TX_OP_FLAGS (FI_INJECT | FI_INJECT_COMPLETE | \
81 FI_DELIVERY_COMPLETE | FI_COMPLETION)
82 #define RXM_RX_OP_FLAGS (FI_MULTI_RECV | FI_COMPLETION)
83
/* A core provider running in FI_MR_BASIC compatibility mode reports
 * mr_mode == FI_MR_BASIC instead of the individual FI_MR_VIRT_ADDR /
 * FI_MR_PROV_KEY bits, so both forms must be checked.  The macro argument
 * is parenthesized so callers may pass arbitrary expressions. */
#define RXM_MR_VIRT_ADDR(info) (((info)->domain_attr->mr_mode == FI_MR_BASIC) ||\
				((info)->domain_attr->mr_mode & FI_MR_VIRT_ADDR))

#define RXM_MR_PROV_KEY(info) (((info)->domain_attr->mr_mode == FI_MR_BASIC) ||\
			       ((info)->domain_attr->mr_mode & FI_MR_PROV_KEY))
89
90 #define RXM_UPDATE_STATE(subsystem, buf, new_state) \
91 do { \
92 FI_DBG(&rxm_prov, subsystem, "[PROTO] msg_id: 0x%" \
93 PRIx64 " %s -> %s\n", (buf)->pkt.ctrl_hdr.msg_id,\
94 rxm_proto_state_str[(buf)->hdr.state], \
95 rxm_proto_state_str[new_state]); \
96 (buf)->hdr.state = new_state; \
97 } while (0)
98
99 #define RXM_DBG_ADDR_TAG(subsystem, log_str, addr, tag) \
100 FI_DBG(&rxm_prov, subsystem, log_str \
101 " (fi_addr: 0x%" PRIx64 " tag: 0x%" PRIx64 ")\n",\
102 addr, tag)
103
104 #define RXM_GET_PROTO_STATE(context) \
105 (*(enum rxm_proto_state *) \
106 ((unsigned char *)context + offsetof(struct rxm_buf, state)))
107
108 #define RXM_SET_PROTO_STATE(comp, new_state) \
109 do { \
110 (*(enum rxm_proto_state *) \
111 ((unsigned char *)(comp)->op_context + \
112 offsetof(struct rxm_buf, state))) = (new_state); \
113 } while (0)
114
115 #define rxm_tx_buf_2_msg_id(rxm_ep, pool_type, tx_buf) \
116 ((uint64_t) rxm_get_buf_index(&(rxm_ep)->buf_pools[pool_type], \
117 (void *) tx_buf))
118 #define rxm_msg_id_2_tx_buf(rxm_ep, pool_type, msg_id) \
119 ((void *) rxm_buf_get_by_index(&(rxm_ep)->buf_pools[pool_type], \
120 (uint64_t) msg_id))
121
122 extern struct fi_provider rxm_prov;
123 extern struct util_prov rxm_util_prov;
124 extern struct fi_ops_rma rxm_ops_rma;
125 extern struct fi_ops_atomic rxm_ops_atomic;
126
127 extern size_t rxm_msg_tx_size;
128 extern size_t rxm_msg_rx_size;
129 extern size_t rxm_def_univ_size;
130 extern size_t rxm_cm_progress_interval;
131 extern size_t rxm_cq_eq_fairness;
132 extern int force_auto_progress;
133 extern enum fi_wait_obj def_wait_obj;
134
135 struct rxm_ep;
136
137
138 /*
139 * Connection Map
140 */
141
142 #define RXM_CMAP_IDX_BITS OFI_IDX_INDEX_BITS
143
144 enum rxm_cmap_signal {
145 RXM_CMAP_UNSPEC,
146 RXM_CMAP_FREE,
147 RXM_CMAP_EXIT,
148 };
149
150 #define RXM_CM_STATES(FUNC) \
151 FUNC(RXM_CMAP_IDLE), \
152 FUNC(RXM_CMAP_CONNREQ_SENT), \
153 FUNC(RXM_CMAP_CONNREQ_RECV), \
154 FUNC(RXM_CMAP_CONNECTED), \
155 FUNC(RXM_CMAP_SHUTDOWN), \
156
157 enum rxm_cmap_state {
158 RXM_CM_STATES(OFI_ENUM_VAL)
159 };
160
161 extern char *rxm_cm_state_str[];
162
163 #define RXM_CM_UPDATE_STATE(handle, new_state) \
164 do { \
165 FI_DBG(&rxm_prov, FI_LOG_EP_CTRL, "[CM] handle: " \
166 "%p %s -> %s\n", handle, \
167 rxm_cm_state_str[handle->state], \
168 rxm_cm_state_str[new_state]); \
169 handle->state = new_state; \
170 } while (0)
171
172 struct rxm_cmap_handle {
173 struct rxm_cmap *cmap;
174 enum rxm_cmap_state state;
175 /* Unique identifier for a connection. Can be exchanged with a peer
176 * during connection setup and can later be used in a message header
177 * to identify the source of the message (Used for FI_SOURCE, RNDV
178 * protocol, etc.) */
179 uint64_t key;
180 uint64_t remote_key;
181 fi_addr_t fi_addr;
182 struct rxm_cmap_peer *peer;
183 };
184
185 struct rxm_cmap_peer {
186 struct rxm_cmap_handle *handle;
187 struct dlist_entry entry;
188 uint8_t addr[];
189 };
190
191 struct rxm_cmap_attr {
192 void *name;
193 };
194
195 struct rxm_cmap {
196 struct rxm_ep *ep;
197 struct util_av *av;
198
199 /* cmap handles that correspond to addresses in AV */
200 struct rxm_cmap_handle **handles_av;
201 size_t num_allocated;
202
203 /* Store all cmap handles (inclusive of handles_av) in an indexer.
204 * This allows reverse lookup of the handle using the index. */
205 struct indexer handles_idx;
206
207 struct ofi_key_idx key_idx;
208
209 struct dlist_entry peer_list;
210 struct rxm_cmap_attr attr;
211 pthread_t cm_thread;
212 ofi_fastlock_acquire_t acquire;
213 ofi_fastlock_release_t release;
214 fastlock_t lock;
215 };
216
217 enum rxm_cmap_reject_reason {
218 RXM_CMAP_REJECT_UNSPEC,
219 RXM_CMAP_REJECT_GENUINE,
220 RXM_CMAP_REJECT_SIMULT_CONN,
221 };
222
/* Private data carried in CM events during connection setup.  The client
 * sends `connect`, the server replies with `accept`, and a refused request
 * carries `reject` (reason is an enum rxm_cmap_reject_reason value). */
union rxm_cm_data {
	struct _connect {
		uint8_t version;	/* RXM_CM_DATA_VERSION */
		uint8_t endianness;	/* lets peers detect byte-order mismatch */
		uint8_t ctrl_version;	/* RXM_CTRL_VERSION */
		uint8_t op_version;	/* RXM_OP_VERSION */
		uint16_t port;
		uint8_t padding[2];
		uint32_t eager_size;
		uint32_t rx_size;
		uint64_t client_conn_id;
	} connect;

	struct _accept {
		uint64_t server_conn_id;
		uint32_t rx_size;
	} accept;

	struct _reject {
		uint8_t version;
		uint8_t reason;
	} reject;
};
246
247 struct rxm_cmap_handle *rxm_cmap_key2handle(struct rxm_cmap *cmap, uint64_t key);
248 int rxm_cmap_update(struct rxm_cmap *cmap, const void *addr, fi_addr_t fi_addr);
249
250 void rxm_cmap_process_reject(struct rxm_cmap *cmap,
251 struct rxm_cmap_handle *handle,
252 enum rxm_cmap_reject_reason cm_reject_reason);
253 void rxm_cmap_process_shutdown(struct rxm_cmap *cmap,
254 struct rxm_cmap_handle *handle);
255 int rxm_cmap_connect(struct rxm_ep *rxm_ep, fi_addr_t fi_addr,
256 struct rxm_cmap_handle *handle);
257 void rxm_cmap_del_handle_ts(struct rxm_cmap_handle *handle);
258 void rxm_cmap_free(struct rxm_cmap *cmap);
259 int rxm_cmap_alloc(struct rxm_ep *rxm_ep, struct rxm_cmap_attr *attr);
260 int rxm_cmap_remove(struct rxm_cmap *cmap, int index);
261 int rxm_msg_eq_progress(struct rxm_ep *rxm_ep);
262
263 static inline struct rxm_cmap_handle *
rxm_cmap_acquire_handle(struct rxm_cmap * cmap,fi_addr_t fi_addr)264 rxm_cmap_acquire_handle(struct rxm_cmap *cmap, fi_addr_t fi_addr)
265 {
266 assert(fi_addr < cmap->num_allocated);
267 return cmap->handles_av[fi_addr];
268 }
269
270 struct rxm_fabric {
271 struct util_fabric util_fabric;
272 struct fid_fabric *msg_fabric;
273 };
274
275 struct rxm_domain {
276 struct util_domain util_domain;
277 struct fid_domain *msg_domain;
278 size_t max_atomic_size;
279 uint64_t mr_key;
280 uint8_t mr_local;
281 };
282
283 int rxm_av_open(struct fid_domain *domain_fid, struct fi_av_attr *attr,
284 struct fid_av **av, void *context);
285
286 struct rxm_mr {
287 struct fid_mr mr_fid;
288 struct fid_mr *msg_mr;
289 struct rxm_domain *domain;
290 };
291
292 struct rxm_rndv_hdr {
293 struct ofi_rma_iov iov[RXM_IOV_LIMIT];
294 uint8_t count;
295 };
296
297 #define rxm_pkt_rndv_data(rxm_pkt) \
298 ((rxm_pkt)->data + sizeof(struct rxm_rndv_hdr))
299
300 struct rxm_atomic_hdr {
301 struct fi_rma_ioc rma_ioc[RXM_IOV_LIMIT];
302 char data[];
303 };
304
305 struct rxm_atomic_resp_hdr {
306 int32_t status;
307 uint32_t result_len;
308 char data[];
309 };
310
311 /*
312 * Macros to generate enums and associated string values
313 * e.g.
314 * #define RXM_PROTO_STATES(FUNC) \
315 * FUNC(STATE1), \
316 * FUNC(STATE2), \
317 * ... \
318 * FUNC(STATEn)
319 *
320 * enum rxm_state {
321 * RXM_PROTO_STATES(OFI_ENUM_VAL)
322 * };
323 *
324 * char *rxm_state_str[] = {
325 * RXM_PROTO_STATES(OFI_STR)
326 * };
327 */
328
329 /* RXM protocol states / tx/rx context */
330 #define RXM_PROTO_STATES(FUNC) \
331 FUNC(RXM_TX), \
332 FUNC(RXM_INJECT_TX), \
333 FUNC(RXM_RMA), \
334 FUNC(RXM_RX), \
335 FUNC(RXM_SAR_TX), \
336 FUNC(RXM_RNDV_TX), \
337 FUNC(RXM_RNDV_ACK_WAIT), \
338 FUNC(RXM_RNDV_READ), \
339 FUNC(RXM_RNDV_ACK_SENT), \
340 FUNC(RXM_RNDV_ACK_RECVD), \
341 FUNC(RXM_RNDV_FINISH), \
342 FUNC(RXM_ATOMIC_RESP_WAIT), \
343 FUNC(RXM_ATOMIC_RESP_SENT)
344
345 enum rxm_proto_state {
346 RXM_PROTO_STATES(OFI_ENUM_VAL)
347 };
348
349 extern char *rxm_proto_state_str[];
350
351 enum {
352 rxm_ctrl_eager,
353 rxm_ctrl_seg,
354 rxm_ctrl_rndv,
355 rxm_ctrl_rndv_ack,
356 rxm_ctrl_atomic,
357 rxm_ctrl_atomic_resp,
358 };
359
360 struct rxm_pkt {
361 struct ofi_ctrl_hdr ctrl_hdr;
362 struct ofi_op_hdr hdr;
363 char data[];
364 };
365
/* SAR segment bookkeeping overlaid onto the 64-bit ctrl_data field of
 * struct ofi_ctrl_hdr; see rxm_sar_get_seg_type()/rxm_sar_set_seg_type(). */
union rxm_sar_ctrl_data {
	struct {
		enum rxm_sar_seg_type {
			RXM_SAR_SEG_FIRST = 1,
			RXM_SAR_SEG_MIDDLE = 2,
			RXM_SAR_SEG_LAST = 3,
		} seg_type : 2;	/* position of this segment in the message */
		uint32_t offset;
	};
	uint64_t align;	/* forces the union to occupy a full 64 bits */
};
377
378 static inline enum rxm_sar_seg_type
rxm_sar_get_seg_type(struct ofi_ctrl_hdr * ctrl_hdr)379 rxm_sar_get_seg_type(struct ofi_ctrl_hdr *ctrl_hdr)
380 {
381 return ((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type;
382 }
383
384 static inline void
rxm_sar_set_seg_type(struct ofi_ctrl_hdr * ctrl_hdr,enum rxm_sar_seg_type seg_type)385 rxm_sar_set_seg_type(struct ofi_ctrl_hdr *ctrl_hdr, enum rxm_sar_seg_type seg_type)
386 {
387 ((union rxm_sar_ctrl_data *)&(ctrl_hdr->ctrl_data))->seg_type = seg_type;
388 }
389
390 struct rxm_recv_match_attr {
391 fi_addr_t addr;
392 uint64_t tag;
393 uint64_t ignore;
394 };
395
396 struct rxm_unexp_msg {
397 struct dlist_entry entry;
398 fi_addr_t addr;
399 uint64_t tag;
400 };
401
402 struct rxm_iov {
403 struct iovec iov[RXM_IOV_LIMIT];
404 void *desc[RXM_IOV_LIMIT];
405 uint8_t count;
406 };
407
/* Buffer pool identifiers.  Transmit pools span RXM_BUF_POOL_TX_START
 * through RXM_BUF_POOL_TX_END (see the assert in rxm_tx_buf_alloc());
 * the RX and RMA pools sit outside that range. */
enum rxm_buf_pool_type {
	RXM_BUF_POOL_RX = 0,
	RXM_BUF_POOL_START = RXM_BUF_POOL_RX,
	RXM_BUF_POOL_TX,
	RXM_BUF_POOL_TX_START = RXM_BUF_POOL_TX,
	RXM_BUF_POOL_TX_INJECT,
	RXM_BUF_POOL_TX_ACK,
	RXM_BUF_POOL_TX_RNDV,
	RXM_BUF_POOL_TX_ATOMIC,
	RXM_BUF_POOL_TX_SAR,
	RXM_BUF_POOL_TX_END = RXM_BUF_POOL_TX_SAR,
	RXM_BUF_POOL_RMA,
	RXM_BUF_POOL_MAX,
};
422
/* Common header embedded at the top of every rxm buffer type. */
struct rxm_buf {
	/* Must stay at top */
	struct fi_context fi_context;

	enum rxm_proto_state state;	/* current protocol state (see RXM_PROTO_STATES) */

	void *desc;	/* memory descriptor passed to the core MSG provider */
};
431
/* Receive buffer posted to the MSG provider; shared by the eager, SAR and
 * rendezvous receive paths. */
struct rxm_rx_buf {
	/* Must stay at top */
	struct rxm_buf hdr;

	struct rxm_ep *ep;
	/* MSG EP / shared context to which bufs would be posted to */
	struct fid_ep *msg_ep;
	struct dlist_entry repost_entry;	/* link on ep->repost_ready_list */
	struct rxm_conn *conn;
	struct rxm_recv_entry *recv_entry;	/* matched application receive entry */
	struct rxm_unexp_msg unexp_msg;
	uint64_t comp_flags;
	struct fi_recv_context recv_context;
	// TODO remove this and modify unexp msg handling path to not repost
	// rx_buf
	uint8_t repost;	/* non-zero: recycle via repost_ready_list instead of freeing */

	/* Used for large messages */
	struct rxm_rndv_hdr *rndv_hdr;
	size_t rndv_rma_index;
	struct fid_mr *mr[RXM_IOV_LIMIT];

	/* Must stay at bottom */
	struct rxm_pkt pkt;
};
457
458 struct rxm_tx_base_buf {
459 /* Must stay at top */
460 struct rxm_buf hdr;
461
462 /* Must stay at bottom */
463 struct rxm_pkt pkt;
464 };
465
466 struct rxm_tx_eager_buf {
467 /* Must stay at top */
468 struct rxm_buf hdr;
469
470 void *app_context;
471 uint64_t flags;
472
473 /* Must stay at bottom */
474 struct rxm_pkt pkt;
475 };
476
477 struct rxm_tx_sar_buf {
478 /* Must stay at top */
479 struct rxm_buf hdr;
480
481 void *app_context;
482 uint64_t flags;
483
484 /* Must stay at bottom */
485 struct rxm_pkt pkt;
486 };
487
488 struct rxm_tx_rndv_buf {
489 /* Must stay at top */
490 struct rxm_buf hdr;
491
492 void *app_context;
493 uint64_t flags;
494 struct fid_mr *mr[RXM_IOV_LIMIT];
495 uint8_t count;
496
497 /* Must stay at bottom */
498 struct rxm_pkt pkt;
499 };
500
501 struct rxm_rma_buf {
502 /* Must stay at top */
503 struct rxm_buf hdr;
504
505 void *app_context;
506 uint64_t flags;
507
508 struct {
509 struct fid_mr *mr[RXM_IOV_LIMIT];
510 uint8_t count;
511 } mr;
512 /* Must stay at bottom */
513 struct rxm_pkt pkt;
514 };
515
516 struct rxm_tx_atomic_buf {
517 /* Must stay at top */
518 struct rxm_buf hdr;
519
520 void *app_context;
521 uint64_t flags;
522 struct iovec result_iov[RXM_IOV_LIMIT];
523 uint8_t result_iov_count;
524
525 /* Must stay at bottom */
526 struct rxm_pkt pkt;
527 };
528
529 enum rxm_deferred_tx_entry_type {
530 RXM_DEFERRED_TX_RNDV_ACK,
531 RXM_DEFERRED_TX_RNDV_READ,
532 RXM_DEFERRED_TX_SAR_SEG,
533 RXM_DEFERRED_TX_ATOMIC_RESP,
534 };
535
536 struct rxm_deferred_tx_entry {
537 struct rxm_ep *rxm_ep;
538 struct rxm_conn *rxm_conn;
539 struct dlist_entry entry;
540 enum rxm_deferred_tx_entry_type type;
541
542 union {
543 struct {
544 struct rxm_rx_buf *rx_buf;
545 } rndv_ack;
546 struct {
547 struct rxm_rx_buf *rx_buf;
548 struct fi_rma_iov rma_iov;
549 struct rxm_iov rxm_iov;
550 } rndv_read;
551 struct {
552 struct rxm_tx_sar_buf *cur_seg_tx_buf;
553 struct {
554 struct iovec iov[RXM_IOV_LIMIT];
555 uint8_t count;
556 size_t cur_iov_offset;
557 uint64_t data;
558 uint64_t tag;
559 } payload;
560 size_t next_seg_no;
561 size_t segs_cnt;
562 uint8_t op;
563 size_t total_len;
564 size_t remain_len;
565 uint64_t msg_id;
566 void *app_context;
567 uint64_t flags;
568 } sar_seg;
569 struct {
570 struct rxm_tx_atomic_buf *tx_buf;
571 ssize_t len;
572 } atomic_resp;
573 };
574 };
575
576 struct rxm_recv_entry {
577 struct dlist_entry entry;
578 struct rxm_iov rxm_iov;
579 fi_addr_t addr;
580 void *context;
581 uint64_t flags;
582 uint64_t tag;
583 uint64_t ignore;
584 uint64_t comp_flags;
585 size_t total_len;
586 struct rxm_recv_queue *recv_queue;
587
588 /* Used for SAR protocol */
589 struct {
590 struct dlist_entry entry;
591 size_t total_recv_len;
592 struct rxm_conn *conn;
593 uint64_t msg_id;
594 } sar;
595 /* Used for Rendezvous protocol */
596 struct {
597 /* This is used to send RNDV ACK */
598 struct rxm_tx_base_buf *tx_buf;
599 } rndv;
600 };
601 DECLARE_FREESTACK(struct rxm_recv_entry, rxm_recv_fs);
602
603 enum rxm_recv_queue_type {
604 RXM_RECV_QUEUE_UNSPEC,
605 RXM_RECV_QUEUE_MSG,
606 RXM_RECV_QUEUE_TAGGED,
607 };
608
609 struct rxm_recv_queue {
610 struct rxm_ep *rxm_ep;
611 enum rxm_recv_queue_type type;
612 struct rxm_recv_fs *fs;
613 struct dlist_entry recv_list;
614 struct dlist_entry unexp_msg_list;
615 dlist_func_t *match_recv;
616 dlist_func_t *match_unexp;
617 };
618
619 struct rxm_buf_pool {
620 enum rxm_buf_pool_type type;
621 struct ofi_bufpool *pool;
622 struct rxm_ep *rxm_ep;
623 };
624
625 struct rxm_msg_eq_entry {
626 ssize_t rd;
627 uint32_t event;
628 /* Used for connection refusal */
629 void *context;
630 struct fi_eq_err_entry err_entry;
631 /* must stay at the bottom */
632 struct fi_eq_cm_entry cm_entry;
633 };
634
635 #define RXM_MSG_EQ_ENTRY_SZ (sizeof(struct rxm_msg_eq_entry) + \
636 sizeof(union rxm_cm_data))
637 #define RXM_CM_ENTRY_SZ (sizeof(struct fi_eq_cm_entry) + \
638 sizeof(union rxm_cm_data))
639
640 struct rxm_handle_txrx_ops {
641 int (*comp_eager_tx)(struct rxm_ep *rxm_ep,
642 struct rxm_tx_eager_buf *tx_eager_buf);
643 ssize_t (*handle_eager_rx)(struct rxm_rx_buf *rx_buf);
644 ssize_t (*handle_rndv_rx)(struct rxm_rx_buf *rx_buf);
645 ssize_t (*handle_seg_data_rx)(struct rxm_rx_buf *rx_buf);
646 };
647
/* RxM endpoint: an RDM endpoint emulated over MSG provider endpoints,
 * with per-peer connections managed through cmap. */
struct rxm_ep {
	struct util_ep util_ep;
	struct fi_info *rxm_info;	/* info exposed to the application */
	struct fi_info *msg_info;	/* info used to open the core MSG objects */
	struct rxm_cmap *cmap;
	struct fid_pep *msg_pep;
	struct fid_eq *msg_eq;
	struct fid_cq *msg_cq;
	uint64_t msg_cq_last_poll;
	struct fid_ep *srx_ctx;	/* shared RX context; NULL when per-conn RX is used */
	size_t comp_per_progress;
	ofi_atomic32_t atomic_tx_credits;
	int cq_eq_fairness;

	bool msg_mr_local;
	bool rdm_mr_local;
	bool do_progress;

	/* Protocol size thresholds */
	size_t min_multi_recv_size;
	size_t buffered_min;
	size_t buffered_limit;
	size_t inject_limit;
	size_t eager_limit;
	size_t sar_limit;

	struct rxm_buf_pool *buf_pools;	/* indexed by enum rxm_buf_pool_type */

	struct dlist_entry repost_ready_list;	/* rx bufs waiting to be reposted */
	struct dlist_entry deferred_tx_conn_queue;	/* conns with deferred transmits */

	struct rxm_recv_queue recv_queue;	/* untagged (MSG) receives */
	struct rxm_recv_queue trecv_queue;	/* tagged receives */

	struct rxm_handle_txrx_ops *txrx_ops;
};
683
/* Per-peer connection state.  The embedded cmap handle must remain the
 * first member so the two can be converted with container_of() (see
 * rxm_rx_buf_alloc()). */
struct rxm_conn {
	/* This should stay at the top */
	struct rxm_cmap_handle handle;

	struct fid_ep *msg_ep;

	/* This is used only in non-FI_THREAD_SAFE case */
	struct rxm_pkt *inject_pkt;
	struct rxm_pkt *inject_data_pkt;
	struct rxm_pkt *tinject_pkt;
	struct rxm_pkt *tinject_data_pkt;

	struct dlist_entry deferred_conn_entry;	/* link on ep->deferred_tx_conn_queue */
	struct dlist_entry deferred_tx_queue;	/* pending deferred tx entries */
	struct dlist_entry sar_rx_msg_list;
	struct dlist_entry sar_deferred_rx_msg_list;

	uint32_t rndv_tx_credits;
};
703
704 extern struct fi_provider rxm_prov;
705 extern struct fi_info rxm_info;
706 extern struct fi_fabric_attr rxm_fabric_attr;
707 extern struct fi_domain_attr rxm_domain_attr;
708 extern struct fi_tx_attr rxm_tx_attr;
709 extern struct fi_rx_attr rxm_rx_attr;
710
711 int rxm_fabric(struct fi_fabric_attr *attr, struct fid_fabric **fabric,
712 void *context);
713 int rxm_info_to_core(uint32_t version, const struct fi_info *rxm_info,
714 struct fi_info *core_info);
715 int rxm_info_to_rxm(uint32_t version, const struct fi_info *core_info,
716 struct fi_info *info);
717 int rxm_domain_open(struct fid_fabric *fabric, struct fi_info *info,
718 struct fid_domain **dom, void *context);
719 int rxm_cq_open(struct fid_domain *domain, struct fi_cq_attr *attr,
720 struct fid_cq **cq_fid, void *context);
721 ssize_t rxm_cq_handle_rx_buf(struct rxm_rx_buf *rx_buf);
722
723 int rxm_endpoint(struct fid_domain *domain, struct fi_info *info,
724 struct fid_ep **ep, void *context);
725
726 int rxm_conn_cmap_alloc(struct rxm_ep *rxm_ep);
727 void rxm_cq_write_error(struct util_cq *cq, struct util_cntr *cntr,
728 void *op_context, int err);
729 void rxm_cq_write_error_all(struct rxm_ep *rxm_ep, int err);
730 void rxm_cq_read_write_error(struct rxm_ep *rxm_ep);
731 ssize_t rxm_cq_handle_comp(struct rxm_ep *rxm_ep, struct fi_cq_data_entry *comp);
732 void rxm_ep_progress(struct util_ep *util_ep);
733 void rxm_ep_progress_coll(struct util_ep *util_ep);
734 void rxm_ep_do_progress(struct util_ep *util_ep);
735
736 ssize_t rxm_cq_handle_eager(struct rxm_rx_buf *rx_buf);
737 ssize_t rxm_cq_handle_coll_eager(struct rxm_rx_buf *rx_buf);
738 ssize_t rxm_cq_handle_rndv(struct rxm_rx_buf *rx_buf);
739 ssize_t rxm_cq_handle_seg_data(struct rxm_rx_buf *rx_buf);
740 int rxm_finish_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_eager_buf *tx_eager_buf);
741 int rxm_finish_coll_eager_send(struct rxm_ep *rxm_ep, struct rxm_tx_eager_buf *tx_eager_buf);
742
743 int rxm_msg_ep_prepost_recv(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep);
744
745 int rxm_ep_query_atomic(struct fid_domain *domain, enum fi_datatype datatype,
746 enum fi_op op, struct fi_atomic_attr *attr,
747 uint64_t flags);
748
rxm_ep_max_atomic_size(struct fi_info * info)749 static inline size_t rxm_ep_max_atomic_size(struct fi_info *info)
750 {
751 size_t overhead = sizeof(struct rxm_atomic_hdr) +
752 sizeof(struct rxm_pkt);
753
754 /* Must be set to eager size or less */
755 return (info->tx_attr && info->tx_attr->inject_size > overhead) ?
756 info->tx_attr->inject_size - overhead : 0;
757 }
758
759 static inline ssize_t
rxm_atomic_send_respmsg(struct rxm_ep * rxm_ep,struct rxm_conn * conn,struct rxm_tx_atomic_buf * resp_buf,ssize_t len)760 rxm_atomic_send_respmsg(struct rxm_ep *rxm_ep, struct rxm_conn *conn,
761 struct rxm_tx_atomic_buf *resp_buf, ssize_t len)
762 {
763 struct iovec iov = {
764 .iov_base = (void *) &resp_buf->pkt,
765 .iov_len = len,
766 };
767 struct fi_msg msg = {
768 .msg_iov = &iov,
769 .desc = &resp_buf->hdr.desc,
770 .iov_count = 1,
771 .context = resp_buf,
772 .data = 0,
773 };
774 return fi_sendmsg(conn->msg_ep, &msg, FI_COMPLETION);
775 }
776
rxm_needs_atomic_progress(const struct fi_info * info)777 static inline int rxm_needs_atomic_progress(const struct fi_info *info)
778 {
779 return (info->caps & FI_ATOMIC) && info->domain_attr &&
780 info->domain_attr->data_progress == FI_PROGRESS_AUTO;
781 }
782
rxm_key2conn(struct rxm_ep * rxm_ep,uint64_t key)783 static inline struct rxm_conn *rxm_key2conn(struct rxm_ep *rxm_ep, uint64_t key)
784 {
785 return (struct rxm_conn *)rxm_cmap_key2handle(rxm_ep->cmap, key);
786 }
787
788 void rxm_ep_progress_deferred_queue(struct rxm_ep *rxm_ep,
789 struct rxm_conn *rxm_conn);
790
791 struct rxm_deferred_tx_entry *
792 rxm_ep_alloc_deferred_tx_entry(struct rxm_ep *rxm_ep, struct rxm_conn *rxm_conn,
793 enum rxm_deferred_tx_entry_type type);
794
/* Queue a deferred transmit on its connection.  When the connection's queue
 * transitions from empty to non-empty, the connection itself is also added
 * to the endpoint's deferred_tx_conn_queue so progress can find it. */
static inline void
rxm_ep_enqueue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry)
{
	if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue))
		dlist_insert_tail(&tx_entry->rxm_conn->deferred_conn_entry,
				  &tx_entry->rxm_ep->deferred_tx_conn_queue);
	dlist_insert_tail(&tx_entry->entry, &tx_entry->rxm_conn->deferred_tx_queue);
}
803
/* Remove a deferred transmit from its connection; once the connection's
 * queue becomes empty, drop the connection from the endpoint's
 * deferred_tx_conn_queue as well. */
static inline void
rxm_ep_dequeue_deferred_tx_queue(struct rxm_deferred_tx_entry *tx_entry)
{
	dlist_remove_init(&tx_entry->entry);
	if (dlist_empty(&tx_entry->rxm_conn->deferred_tx_queue))
		dlist_remove(&tx_entry->rxm_conn->deferred_conn_entry);
}
811
812 int rxm_conn_process_eq_events(struct rxm_ep *rxm_ep);
813
814 void rxm_msg_mr_closev(struct fid_mr **mr, size_t count);
815 int rxm_msg_mr_regv(struct rxm_ep *rxm_ep, const struct iovec *iov,
816 size_t count, size_t reg_limit, uint64_t access,
817 struct fid_mr **mr);
818 int rxm_msg_mr_reg_internal(struct rxm_domain *rxm_domain, const void *buf,
819 size_t len, uint64_t acs, uint64_t flags,
820 struct fid_mr **mr);
821
rxm_cntr_incerr(struct util_cntr * cntr)822 static inline void rxm_cntr_incerr(struct util_cntr *cntr)
823 {
824 if (cntr)
825 cntr->cntr_fid.ops->adderr(&cntr->cntr_fid, 1);
826 }
827
828
829
/* Debug builds only: log the CQ event flags about to be reported to the
 * application; compiles to nothing otherwise. */
static inline void rxm_cq_log_comp(uint64_t flags)
{
#if ENABLE_DEBUG
	FI_DBG(&rxm_prov, FI_LOG_CQ, "Reporting %s completion\n",
	       fi_tostr((void *)&flags, FI_TYPE_CQ_EVENT_FLAGS));
#else
	/* NOP */
#endif
}
839
/* Resolve dest_addr to a connection and verify it is ready for a transmit.
 * Returns 0 on success, -FI_EHOSTUNREACH when no handle exists for the
 * address, -FI_EAGAIN when deferred transmits are still queued on the
 * connection, or the error from rxm_cmap_connect(). */
static inline ssize_t
rxm_ep_prepare_tx(struct rxm_ep *rxm_ep, fi_addr_t dest_addr,
		  struct rxm_conn **rxm_conn)
{
	ssize_t ret;

	assert(rxm_ep->util_ep.tx_cq);
	*rxm_conn = (struct rxm_conn *)rxm_cmap_acquire_handle(rxm_ep->cmap,
							       dest_addr);
	if (OFI_UNLIKELY(!*rxm_conn))
		return -FI_EHOSTUNREACH;

	/* Initiate connection establishment if the handle is not connected. */
	if (OFI_UNLIKELY((*rxm_conn)->handle.state != RXM_CMAP_CONNECTED)) {
		ret = rxm_cmap_connect(rxm_ep, dest_addr, &(*rxm_conn)->handle);
		if (ret)
			return ret;
	}

	/* If deferred transmits are pending on this connection, try to drain
	 * them first; if any remain, ask the caller to retry later. */
	if (OFI_UNLIKELY(!dlist_empty(&(*rxm_conn)->deferred_tx_queue))) {
		rxm_ep_do_progress(&rxm_ep->util_ep);
		if (!dlist_empty(&(*rxm_conn)->deferred_tx_queue))
			return -FI_EAGAIN;
	}
	return 0;
}
865
866 static inline void
rxm_ep_format_tx_buf_pkt(struct rxm_conn * rxm_conn,size_t len,uint8_t op,uint64_t data,uint64_t tag,uint64_t flags,struct rxm_pkt * pkt)867 rxm_ep_format_tx_buf_pkt(struct rxm_conn *rxm_conn, size_t len, uint8_t op,
868 uint64_t data, uint64_t tag, uint64_t flags,
869 struct rxm_pkt *pkt)
870 {
871 pkt->ctrl_hdr.conn_id = rxm_conn->handle.remote_key;
872 pkt->hdr.size = len;
873 pkt->hdr.op = op;
874 pkt->hdr.tag = tag;
875 pkt->hdr.flags = (flags & FI_REMOTE_CQ_DATA);
876 pkt->hdr.data = data;
877 }
878
879
880 static inline struct rxm_buf *
rxm_tx_buf_alloc(struct rxm_ep * rxm_ep,enum rxm_buf_pool_type type)881 rxm_tx_buf_alloc(struct rxm_ep *rxm_ep, enum rxm_buf_pool_type type)
882 {
883 assert((type == RXM_BUF_POOL_TX) ||
884 (type == RXM_BUF_POOL_TX_INJECT) ||
885 (type == RXM_BUF_POOL_TX_ACK) ||
886 (type == RXM_BUF_POOL_TX_RNDV) ||
887 (type == RXM_BUF_POOL_TX_ATOMIC) ||
888 (type == RXM_BUF_POOL_TX_SAR));
889 return ofi_buf_alloc(rxm_ep->buf_pools[type].pool);
890 }
891
892
893 static inline struct rxm_rx_buf *
rxm_rx_buf_alloc(struct rxm_ep * rxm_ep,struct fid_ep * msg_ep,uint8_t repost)894 rxm_rx_buf_alloc(struct rxm_ep *rxm_ep, struct fid_ep *msg_ep, uint8_t repost)
895 {
896 struct rxm_rx_buf *rx_buf =
897 ofi_buf_alloc(rxm_ep->buf_pools[RXM_BUF_POOL_RX].pool);
898 if (OFI_LIKELY((long int)rx_buf)) {
899 assert(rx_buf->ep == rxm_ep);
900 rx_buf->hdr.state = RXM_RX;
901 rx_buf->msg_ep = msg_ep;
902 rx_buf->repost = repost;
903
904 if (!rxm_ep->srx_ctx)
905 rx_buf->conn = container_of(msg_ep->fid.context,
906 struct rxm_conn, handle);
907 }
908 return rx_buf;
909 }
910
911 static inline void
rxm_rx_buf_free(struct rxm_rx_buf * rx_buf)912 rxm_rx_buf_free(struct rxm_rx_buf *rx_buf)
913 {
914 if (rx_buf->repost) {
915 dlist_insert_tail(&rx_buf->repost_entry,
916 &rx_buf->ep->repost_ready_list);
917 } else {
918 ofi_buf_free(rx_buf);
919 }
920 }
921
rxm_rma_buf_alloc(struct rxm_ep * rxm_ep)922 static inline struct rxm_rma_buf *rxm_rma_buf_alloc(struct rxm_ep *rxm_ep)
923 {
924 return (struct rxm_rma_buf *)
925 ofi_buf_alloc(rxm_ep->buf_pools[RXM_BUF_POOL_RMA].pool);
926 }
927
928 static inline
rxm_tx_atomic_buf_alloc(struct rxm_ep * rxm_ep)929 struct rxm_tx_atomic_buf *rxm_tx_atomic_buf_alloc(struct rxm_ep *rxm_ep)
930 {
931 return (struct rxm_tx_atomic_buf *)
932 rxm_tx_buf_alloc(rxm_ep, RXM_BUF_POOL_TX_ATOMIC);
933 }
934
/* Return a receive entry to its queue's freestack, clearing the
 * accumulated length so the entry is clean for reuse. */
static inline void
rxm_recv_entry_release(struct rxm_recv_queue *queue, struct rxm_recv_entry *entry)
{
	entry->total_len = 0;
	freestack_push(queue->fs, entry);
}
941
/* Write a receive completion to the application's RX CQ, including the
 * source fi_addr when the endpoint was opened with FI_SOURCE.  Returns the
 * result of the underlying CQ write. */
static inline int rxm_cq_write_recv_comp(struct rxm_rx_buf *rx_buf,
					 void *context, uint64_t flags,
					 size_t len, char *buf)
{
	if (rx_buf->ep->rxm_info->caps & FI_SOURCE)
		return ofi_cq_write_src(rx_buf->ep->util_ep.rx_cq, context,
					flags, len, buf, rx_buf->pkt.hdr.data,
					rx_buf->pkt.hdr.tag,
					rx_buf->conn->handle.fi_addr);
	else
		return ofi_cq_write(rx_buf->ep->util_ep.rx_cq, context,
				    flags, len, buf, rx_buf->pkt.hdr.data,
				    rx_buf->pkt.hdr.tag);
}
956
957 #endif
958