1 /*
2 * Copyright (c) 2015-2017 Intel Corporation, Inc. All rights reserved.
3 * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
4 * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
5 *
6 * This software is available to you under a choice of one of two
7 * licenses. You may choose to be licensed under the terms of the GNU
8 * General Public License (GPL) Version 2, available from the file
9 * COPYING in the main directory of this source tree, or the
10 * BSD license below:
11 *
12 * Redistribution and use in source and binary forms, with or
13 * without modification, are permitted provided that the following
14 * conditions are met:
15 *
16 * - Redistributions of source code must retain the above
17 * copyright notice, this list of conditions and the following
18 * disclaimer.
19 *
20 * - Redistributions in binary form must reproduce the above
21 * copyright notice, this list of conditions and the following
22 * disclaimer in the documentation and/or other materials
23 * provided with the distribution.
24 *
25 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32 * SOFTWARE.
33 */
34
35 #ifndef _OFI_UTIL_H_
36 #define _OFI_UTIL_H_
37
38 #if HAVE_CONFIG_H
39 # include <config.h>
40 #endif /* HAVE_CONFIG_H */
41
42 #include <pthread.h>
43 #include <stdio.h>
44
45 #include <rdma/fabric.h>
46 #include <rdma/fi_atomic.h>
47 #include <rdma/fi_cm.h>
48 #include <rdma/fi_domain.h>
49 #include <rdma/fi_endpoint.h>
50 #include <rdma/fi_eq.h>
51 #include <rdma/fi_errno.h>
52 #include <rdma/fi_rma.h>
53 #include <rdma/fi_tagged.h>
54 #include <rdma/fi_trigger.h>
55
56 #include <ofi.h>
57 #include <ofi_mr.h>
58 #include <ofi_list.h>
59 #include <ofi_mem.h>
60 #include <ofi_rbuf.h>
61 #include <ofi_signal.h>
62 #include <ofi_enosys.h>
63 #include <ofi_osd.h>
64 #include <ofi_indexer.h>
65 #include <ofi_epoll.h>
66 #include <ofi_proto.h>
67 #include <ofi_bitmask.h>
68
69 #include "rbtree.h"
70 #include "uthash.h"
71
72 #ifdef __cplusplus
73 extern "C" {
74 #endif
75
/* EQ / CQ flags
 * ERROR: The added entry was the result of an error completion
 * OVERFLOW: The CQ has overflowed, and events have been lost
 * Both occupy high bits (60, 61) of a 64-bit flags word.
 */
#define UTIL_FLAG_ERROR (1ULL << 60)
#define UTIL_FLAG_OVERFLOW (1ULL << 61)

/* Indicates that an EP has been bound to a counter */
/* NOTE(review): this is the same bit as UTIL_FLAG_OVERFLOW; presumably safe
 * only because the two live in different flag words — confirm before
 * combining them in one mask. */
#define OFI_CNTR_ENABLED (1ULL << 61)

/* Memory registration should not be cached */
/* NOTE(review): assuming BIT_ULL(n) is (1ULL << n), this shares bit 60 with
 * UTIL_FLAG_ERROR; presumably used only in MR flag words — confirm. */
#define OFI_MR_NOCACHE BIT_ULL(60)
88
/*
 * Log a CQ/EQ error entry through FI_LOG.  The message contains
 * strerror(entry->err) and entry->err, followed by the string returned by
 * q_strerror(q, entry->prov_errno, entry->err_data, NULL, 0) and
 * entry->prov_errno.  q_str must be a string literal; it is concatenated
 * into "fi_<q_str>_readerr".  'entry' is expanded several times, so pass an
 * expression without side effects.
 */
#define OFI_Q_STRERROR(prov, level, subsys, q, q_str, entry, q_strerror) \
	FI_LOG(prov, level, subsys, "fi_" q_str "_readerr: err: %s (%d), " \
	       "prov_err: %s (%d)\n", strerror((entry)->err), (entry)->err, \
	       q_strerror((q), (entry)->prov_errno, \
			  (entry)->err_data, NULL, 0), \
	       (entry)->prov_errno)

/* OFI_Q_STRERROR bound to completion queues (uses fi_cq_strerror). */
#define OFI_CQ_STRERROR(prov, level, subsys, cq, entry) \
	OFI_Q_STRERROR(prov, level, subsys, cq, "cq", entry, fi_cq_strerror)

/* OFI_Q_STRERROR bound to event queues (uses fi_eq_strerror). */
#define OFI_EQ_STRERROR(prov, level, subsys, eq, entry) \
	OFI_Q_STRERROR(prov, level, subsys, eq, "eq", entry, fi_eq_strerror)
101
/*
 * Logging helpers used while comparing provider attributes against the
 * user's hints.  Each helper emits two FI_INFO lines under FI_LOG_CORE:
 * one for the provider value and one for the user value.  Macro arguments
 * are parenthesized so that arbitrary pointer expressions can be passed.
 */

/* Log two attributes rendered by fi_tostr() using the FI_TYPE_* 'type'. */
#define FI_INFO_FIELD(provider, prov_attr, user_attr, prov_str, user_str, type) \
	do {								\
		FI_INFO(provider, FI_LOG_CORE, prov_str ": %s\n",	\
			fi_tostr(&(prov_attr), type));			\
		FI_INFO(provider, FI_LOG_CORE, user_str ": %s\n",	\
			fi_tostr(&(user_attr), type));			\
	} while (0)

/* Log two C strings, each prefixed with its label. */
#define FI_INFO_STRING(provider, prov_attr, user_attr, prov_str, user_str) \
	do {								\
		FI_INFO(provider, FI_LOG_CORE, prov_str ": %s\n",	\
			(prov_attr));					\
		FI_INFO(provider, FI_LOG_CORE, user_str ": %s\n",	\
			(user_attr));					\
	} while (0)

/* Log prov->field vs user->field as "Supported" / "Requested". */
#define FI_INFO_CHECK(provider, prov, user, field, type)		\
	FI_INFO_FIELD(provider, (prov)->field, (user)->field, "Supported", \
		      "Requested", type)

/*
 * Log a size_t field of prov and user.  Uses %zu (the size_t conversion)
 * rather than %zd so that large values such as SIZE_MAX, commonly used
 * for "unlimited", are not printed as negative numbers.
 */
#define FI_INFO_CHECK_VAL(provider, prov, user, field)			\
	do {								\
		FI_INFO(provider, FI_LOG_CORE, "Supported: %zu\n",	\
			(prov)->field);					\
		FI_INFO(provider, FI_LOG_CORE, "Requested: %zu\n",	\
			(user)->field);					\
	} while (0)

/* Log mode bits as "Expected" (provider) / "Given" (user). */
#define FI_INFO_MODE(provider, prov_mode, user_mode)			\
	FI_INFO_FIELD(provider, prov_mode, user_mode, "Expected", "Given", \
		      FI_TYPE_MODE)

/* Log MR mode bits as "Expected" (provider) / "Given" (user). */
#define FI_INFO_MR_MODE(provider, prov_mode, user_mode)			\
	FI_INFO_FIELD(provider, prov_mode, user_mode, "Expected", "Given", \
		      FI_TYPE_MR_MODE)

/* Log the 'name' string member of prov and user. */
#define FI_INFO_NAME(provider, prov, user)				\
	FI_INFO_STRING(provider, (prov)->name, (user)->name, "Supported", \
		       "Requested")
137
/*
 * Wrap-around-aware ordering of sequence numbers: the difference (a) - (b)
 * is converted to long and compared with zero.
 *   ofi_after_eq(a, b): a is equal to or after b
 *   ofi_before(a, b):   a is strictly before b
 * NOTE(review): this only works when (a) - (b) has the width of long.  For
 * unsigned int operands (e.g. uint32_t) on LP64 the wrapped difference
 * converts to a non-negative long, and on LLP64 (32-bit long) 64-bit
 * differences do not fit — confirm operand types at call sites.
 */
#define ofi_after_eq(a,b) ((long)((a) - (b)) >= 0)
#define ofi_before(a,b) ((long)((a) - (b)) < 0)
140
/*
 * Shared-context bits.  NOTE(review): presumably ORed into util_prov.flags
 * below — confirm against users of these constants.
 */
enum {
	UTIL_TX_SHARED_CTX = 1 << 0,
	UTIL_RX_SHARED_CTX = 1 << 1,
};

/*
 * Pair of library-wide mutexes.  NOTE(review): by name, ini_lock guards
 * initialization and util_fabric_lock guards the set of util fabrics
 * (cf. ofi_fabric_insert/ofi_fabric_remove) — verify in the .c files.
 */
struct ofi_common_locks {
	pthread_mutex_t ini_lock;
	pthread_mutex_t util_fabric_lock;
};

/*
 * Provider details
 *
 * Static description of a utility-based provider: the fi_provider it
 * implements, its fi_info description and provider-specific flags.  This is
 * the object taken by ofi_check_ep_attr(), ofi_prov_check_info(),
 * util_getinfo() and ofi_endpoint_init() declared in this header.
 */
struct util_prov {
	const struct fi_provider *prov;
	const struct fi_info *info;
	const int flags;
};
159
160
/*
 * Fabric
 */

/* Name / provider pair.  NOTE(review): presumably used to find an existing
 * util_fabric by name — confirm. */
struct util_fabric_info {
	const char *name;
	const struct fi_provider *prov;
};

/*
 * Common fabric state embedded by providers.  fabric_fid is the handle
 * returned to the application.  'ref' is an atomic reference count and
 * 'lock' a fastlock.  NOTE(review): list_entry presumably links the fabric
 * into a global list (ofi_fabric_insert/remove), and domain_list the domains
 * opened on it — confirm.
 */
struct util_fabric {
	struct fid_fabric fabric_fid;
	struct dlist_entry list_entry;
	fastlock_t lock;
	ofi_atomic32_t ref;
	const char *name;
	const struct fi_provider *prov;

	struct dlist_entry domain_list;
};

/* Initialize 'fabric' for 'prov'; attributes are checked against user_attr. */
int ofi_fabric_init(const struct fi_provider *prov,
		    const struct fi_fabric_attr *prov_attr,
		    const struct fi_fabric_attr *user_attr,
		    struct util_fabric *fabric, void *context);
int ofi_fabric_close(struct util_fabric *fabric);
/* fi_trywait implementation over 'count' fids. */
int ofi_trywait(struct fid_fabric *fabric, struct fid **fids, int count);
186
/*
 * Domain
 */

/*
 * Common domain state embedded by providers.  Holds back-pointers to the
 * owning fabric and an optional bound EQ, a copy of selected fi_info
 * domain attributes (caps, mode, mr_mode, addr_format, av_type,
 * threading, data_progress), and the MR map used for registrations.
 */
struct util_domain {
	struct fid_domain domain_fid;
	struct dlist_entry list_entry;
	struct util_fabric *fabric;
	struct util_eq *eq;
	fastlock_t lock;
	ofi_atomic32_t ref;
	const struct fi_provider *prov;

	char *name;
	uint64_t info_domain_caps;
	uint64_t info_domain_mode;
	int mr_mode;
	uint32_t addr_format;
	enum fi_av_type av_type;
	struct ofi_mr_map mr_map;
	enum fi_threading threading;
	enum fi_progress data_progress;
};

/* Initialize 'domain' on fabric_fid from 'info'. */
int ofi_domain_init(struct fid_fabric *fabric_fid, const struct fi_info *info,
		    struct util_domain *domain, void *context);
int ofi_domain_bind_eq(struct util_domain *domain, struct util_eq *eq);
int ofi_domain_close(struct util_domain *domain);
214
215 static const uint64_t ofi_rx_mr_flags[] = {
216 [ofi_op_msg] = FI_RECV,
217 [ofi_op_tagged] = FI_RECV,
218 [ofi_op_read_req] = FI_REMOTE_READ,
219 [ofi_op_write] = FI_REMOTE_WRITE,
220 [ofi_op_atomic] = FI_REMOTE_WRITE,
221 [ofi_op_atomic_fetch] = FI_REMOTE_WRITE | FI_REMOTE_READ,
222 [ofi_op_atomic_compare] = FI_REMOTE_WRITE | FI_REMOTE_READ,
223 };
224
ofi_rx_mr_reg_flags(uint32_t op,uint16_t atomic_op)225 static inline uint64_t ofi_rx_mr_reg_flags(uint32_t op, uint16_t atomic_op)
226 {
227 if (atomic_op == FI_ATOMIC_READ)
228 return FI_REMOTE_READ;
229
230 return ofi_rx_mr_flags[op];
231 }
232
/*
 * Passive Endpoint
 */

/* Common passive-endpoint state: the owning fabric and an optional bound EQ. */
struct util_pep {
	struct fid_pep pep_fid;
	struct util_fabric *fabric;
	struct util_eq *eq;
};

int ofi_pep_init(struct fid_fabric *fabric, struct fi_info *info,
		 struct util_pep *pep, void *context);
int ofi_pep_bind_eq(struct util_pep *pep, struct util_eq *eq, uint64_t flags);
int ofi_pep_close(struct util_pep *pep);
247
/*
 * Endpoint
 */

struct util_cntr;
struct util_ep;
/* Per-endpoint progress hook supplied to ofi_endpoint_init(). */
typedef void (*ofi_ep_progress_func)(struct util_ep *util_ep);
/* Counter increment hook; one is stored per counter slot in util_ep. */
typedef void (*ofi_cntr_inc_func)(struct util_cntr *util_cntr);

/*
 * Common endpoint state embedded by providers: bound AV/EQ/CQs/counters,
 * per-direction op flags, and function pointers for locking and counter
 * increments (invoked by the ofi_ep_*_inc / ofi_ep_lock_* helpers below).
 */
struct util_ep {
	struct fid_ep ep_fid;
	struct util_domain *domain;

	struct util_av *av;
	struct dlist_entry av_entry;
	struct util_eq *eq;
	/* CQ entries */
	struct util_cq *rx_cq;
	uint64_t rx_op_flags;
	struct util_cq *tx_cq;
	uint64_t tx_op_flags;
	uint64_t inject_op_flags;

	/* flags to be ORed in to flags for *msg API calls
	 * to properly handle FI_SELECTIVE_COMPLETION bind */
	uint64_t tx_msg_flags;
	uint64_t rx_msg_flags;

	/* CNTR entries */
	struct util_cntr *tx_cntr;     /* transmit/send */
	struct util_cntr *rx_cntr;     /* receive */
	struct util_cntr *rd_cntr;     /* read */
	struct util_cntr *wr_cntr;     /* write */
	struct util_cntr *rem_rd_cntr; /* remote read */
	struct util_cntr *rem_wr_cntr; /* remote write */

	/* Increment hooks, called with the matching counter above. */
	ofi_cntr_inc_func tx_cntr_inc;
	ofi_cntr_inc_func rx_cntr_inc;
	ofi_cntr_inc_func rd_cntr_inc;
	ofi_cntr_inc_func wr_cntr_inc;
	ofi_cntr_inc_func rem_rd_cntr_inc;
	ofi_cntr_inc_func rem_wr_cntr_inc;

	enum fi_ep_type type;
	uint64_t caps;
	uint64_t flags;
	ofi_ep_progress_func progress;
	/* 'lock' is taken through lock_acquire/lock_release rather than
	 * directly (see ofi_ep_lock_acquire/ofi_ep_lock_release). */
	fastlock_t lock;
	ofi_fastlock_acquire_t lock_acquire;
	ofi_fastlock_release_t lock_release;

	/* Collective-operation state. */
	struct bitmask *coll_cid_mask;
	struct slist coll_ready_queue;
};

int ofi_ep_bind_av(struct util_ep *util_ep, struct util_av *av);
int ofi_ep_bind_eq(struct util_ep *ep, struct util_eq *eq);
int ofi_ep_bind_cq(struct util_ep *ep, struct util_cq *cq, uint64_t flags);
int ofi_ep_bind_cntr(struct util_ep *ep, struct util_cntr *cntr, uint64_t flags);
/* Generic bind: dispatches on the kind of object behind 'fid'. */
int ofi_ep_bind(struct util_ep *util_ep, struct fid *fid, uint64_t flags);
int ofi_endpoint_init(struct fid_domain *domain, const struct util_prov *util_prov,
		      struct fi_info *info, struct util_ep *ep, void *context,
		      ofi_ep_progress_func progress);

int ofi_endpoint_close(struct util_ep *util_ep);
313
314 static inline int
ofi_ep_fid_bind(struct fid * ep_fid,struct fid * bfid,uint64_t flags)315 ofi_ep_fid_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
316 {
317 return ofi_ep_bind(container_of(ep_fid, struct util_ep, ep_fid.fid),
318 bfid, flags);
319 }
320
ofi_ep_lock_acquire(struct util_ep * ep)321 static inline void ofi_ep_lock_acquire(struct util_ep *ep)
322 {
323 ep->lock_acquire(&ep->lock);
324 }
325
ofi_ep_lock_release(struct util_ep * ep)326 static inline void ofi_ep_lock_release(struct util_ep *ep)
327 {
328 ep->lock_release(&ep->lock);
329 }
330
ofi_ep_tx_cntr_inc(struct util_ep * ep)331 static inline void ofi_ep_tx_cntr_inc(struct util_ep *ep)
332 {
333 ep->tx_cntr_inc(ep->tx_cntr);
334 }
335
ofi_ep_rx_cntr_inc(struct util_ep * ep)336 static inline void ofi_ep_rx_cntr_inc(struct util_ep *ep)
337 {
338 ep->rx_cntr_inc(ep->rx_cntr);
339 }
340
ofi_ep_rd_cntr_inc(struct util_ep * ep)341 static inline void ofi_ep_rd_cntr_inc(struct util_ep *ep)
342 {
343 ep->rd_cntr_inc(ep->rd_cntr);
344 }
345
ofi_ep_wr_cntr_inc(struct util_ep * ep)346 static inline void ofi_ep_wr_cntr_inc(struct util_ep *ep)
347 {
348 ep->wr_cntr_inc(ep->wr_cntr);
349 }
350
ofi_ep_rem_rd_cntr_inc(struct util_ep * ep)351 static inline void ofi_ep_rem_rd_cntr_inc(struct util_ep *ep)
352 {
353 ep->rem_rd_cntr_inc(ep->rem_rd_cntr);
354 }
355
ofi_ep_rem_wr_cntr_inc(struct util_ep * ep)356 static inline void ofi_ep_rem_wr_cntr_inc(struct util_ep *ep)
357 {
358 ep->rem_wr_cntr_inc(ep->rem_wr_cntr);
359 }
360
361 typedef void (*ofi_ep_cntr_inc_func)(struct util_ep *);
362 extern ofi_ep_cntr_inc_func ofi_ep_tx_cntr_inc_funcs[ofi_op_max];
363 extern ofi_ep_cntr_inc_func ofi_ep_rx_cntr_inc_funcs[ofi_op_max];
364
ofi_ep_tx_cntr_inc_func(struct util_ep * ep,uint8_t op)365 static inline void ofi_ep_tx_cntr_inc_func(struct util_ep *ep, uint8_t op)
366 {
367 assert(op < ofi_op_max);
368 ofi_ep_tx_cntr_inc_funcs[op](ep);
369 }
370
ofi_ep_rx_cntr_inc_func(struct util_ep * ep,uint8_t op)371 static inline void ofi_ep_rx_cntr_inc_func(struct util_ep *ep, uint8_t op)
372 {
373 assert(op < ofi_op_max);
374 ofi_ep_rx_cntr_inc_funcs[op](ep);
375 }
376
377 /*
378 * Tag and address match
379 */
380
/*
 * Return 1 if a receive posted for recv_addr accepts a message from addr,
 * 0 otherwise.  FI_ADDR_UNSPEC as recv_addr matches any source.
 */
static inline int ofi_match_addr(fi_addr_t recv_addr, fi_addr_t addr)
{
	if (recv_addr == FI_ADDR_UNSPEC)
		return 1;

	return recv_addr == addr;
}
385
/*
 * Return 1 if 'tag' matches recv_tag on every bit that is clear in
 * recv_ignore, 0 otherwise.  Bits set in recv_ignore are don't-cares.
 */
static inline int ofi_match_tag(uint64_t recv_tag, uint64_t recv_ignore,
				uint64_t tag)
{
	uint64_t differing = recv_tag ^ tag;

	return !(differing & ~recv_ignore);
}
391
/*
 * Wait set
 */
struct util_wait;
/* Implementation hooks stored in util_wait. */
typedef void (*fi_wait_signal_func)(struct util_wait *wait);
typedef int (*fi_wait_try_func)(struct util_wait *wait);

/*
 * Common wait-set state.  'signal' and 'wait_try' are set by the concrete
 * wait-set implementation; wait_obj records the wait object type.
 * NOTE(review): fid_list presumably holds the objects attached to this set,
 * protected by 'lock' — confirm.
 */
struct util_wait {
	struct fid_wait wait_fid;
	struct util_fabric *fabric;
	struct util_poll *pollset;
	ofi_atomic32_t ref;
	const struct fi_provider *prov;

	enum fi_wait_obj wait_obj;
	fi_wait_signal_func signal;
	fi_wait_try_func wait_try;

	struct dlist_entry fid_list;
	fastlock_t lock;
};

int ofi_wait_init(struct util_fabric *fabric, struct fi_wait_attr *attr,
		  struct util_wait *wait);
int fi_wait_cleanup(struct util_wait *wait);

/*
 * fd-based wait set.  epoll_fd and pollfds share storage (a union), so only
 * one backend is active at a time.  NOTE(review): fd_list presumably holds
 * ofi_wait_fd_entry records and change_index tracks fd-set modifications —
 * confirm.
 */
struct util_wait_fd {
	struct util_wait util_wait;
	struct fd_signal signal;
	struct dlist_entry fd_list;

	union {
		ofi_epoll_t epoll_fd;
		struct ofi_pollfds *pollfds;
	};
	uint64_t change_index;
};

/* Callback stored in fd/fid entries, invoked with an opaque argument. */
typedef int (*ofi_wait_try_func)(void *arg);

/* One file descriptor registered with a wait set, with its callback. */
struct ofi_wait_fd_entry {
	struct dlist_entry entry;
	int fd;
	ofi_wait_try_func wait_try;
	void *arg;
	ofi_atomic32_t ref;
};

/* One fabric object registered with a wait set; 'pollfds' holds poll fds
 * associated with it. */
struct ofi_wait_fid_entry {
	struct dlist_entry entry;
	ofi_wait_try_func wait_try;
	fid_t fid;
	enum fi_wait_obj wait_obj;
	uint32_t events;
	ofi_atomic32_t ref;
	struct fi_wait_pollfd pollfds;
};
450 int ofi_wait_fd_open(struct fid_fabric *fabric, struct fi_wait_attr *attr,
451 struct fid_wait **waitset);
452 int ofi_wait_add_fd(struct util_wait *wait, int fd, uint32_t events,
453 ofi_wait_try_func wait_try, void *arg, void *context);
454 int ofi_wait_del_fd(struct util_wait *wait, int fd);
455 int ofi_wait_add_fid(struct util_wait *wat, fid_t fid, uint32_t events,
456 ofi_wait_try_func wait_try);
457 int ofi_wait_del_fid(struct util_wait *wait, fid_t fid);
458
/*
 * Wait set that waits by yielding rather than on an OS object (by name).
 * 'signal' is a flag guarded by signal_lock.
 */
struct util_wait_yield {
	struct util_wait util_wait;
	int signal;
	fastlock_t signal_lock;
};

int ofi_wait_yield_open(struct fid_fabric *fabric, struct fi_wait_attr *attr,
			struct fid_wait **waitset);
467
/*
 * Completion queue
 *
 * Utility provider derived CQs that require manual progress must
 * progress the CQ when fi_cq_read is called with a count = 0.
 * In such cases, fi_cq_read will return 0 if there are available
 * entries on the CQ. This allows poll sets to drive progress
 * without introducing private interfaces to the CQ.
 */

/* Hook that converts one stored entry into the user's CQ format. */
typedef void (*fi_cq_read_func)(void **dst, void *src);

/*
 * Entry kept on util_cq.oflow_err_list.  NOTE(review): presumably records a
 * completion or error that did not fit in the circular queue, together with
 * its source address — confirm against ofi_cq_write_overflow().
 */
struct util_cq_oflow_err_entry {
	struct fi_cq_tagged_entry *parent_comp;
	struct fi_cq_err_entry comp;
	fi_addr_t src;
	struct slist_entry list_entry;
};

/* Declares struct util_comp_cirq, a circular queue of fi_cq_tagged_entry. */
OFI_DECLARE_CIRQUE(struct fi_cq_tagged_entry, util_comp_cirq);

typedef void (*ofi_cq_progress_func)(struct util_cq *cq);

/*
 * Common CQ state.  Completions live in 'cirq'; src[] holds a source
 * address per cirq slot (written at ofi_cirque_windex() in
 * ofi_cq_write_src_thread_unsafe).  cq_lock is taken through the
 * cq_fastlock_acquire/cq_fastlock_release hooks.
 */
struct util_cq {
	struct fid_cq cq_fid;
	struct util_domain *domain;
	struct util_wait *wait;
	ofi_atomic32_t ref;
	struct dlist_entry ep_list;
	fastlock_t ep_list_lock;
	fastlock_t cq_lock;
	ofi_fastlock_acquire_t cq_fastlock_acquire;
	ofi_fastlock_release_t cq_fastlock_release;

	struct util_comp_cirq *cirq;
	fi_addr_t *src;

	struct slist oflow_err_list;
	fi_cq_read_func read_entry;
	int internal_wait;
	ofi_atomic32_t signaled;
	ofi_cq_progress_func progress;
};
511
/* Initialize 'cq' on 'domain' with the given progress hook. */
int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
		struct fi_cq_attr *attr, struct util_cq *cq,
		ofi_cq_progress_func progress, void *context);
int ofi_check_bind_cq_flags(struct util_ep *ep, struct util_cq *cq,
			    uint64_t flags);
void ofi_cq_progress(struct util_cq *cq);
int ofi_cq_cleanup(struct util_cq *cq);
/* Generic fid_cq operation implementations usable in provider ops tables. */
int ofi_cq_control(struct fid *fid, int command, void *arg);
ssize_t ofi_cq_read(struct fid_cq *cq_fid, void *buf, size_t count);
ssize_t ofi_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			fi_addr_t *src_addr);
ssize_t ofi_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *buf,
		       uint64_t flags);
ssize_t ofi_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
		     const void *cond, int timeout);
ssize_t ofi_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
			 fi_addr_t *src_addr, const void *cond, int timeout);
int ofi_cq_signal(struct fid_cq *cq_fid);

/* Called by the write helpers below when cq->cirq is full. */
int ofi_cq_write_overflow(struct util_cq *cq, void *context, uint64_t flags, size_t len,
			  void *buf, uint64_t data, uint64_t tag, fi_addr_t src);
533
util_cq_signal(struct util_cq * cq)534 static inline void util_cq_signal(struct util_cq *cq)
535 {
536 assert(cq->wait);
537 cq->wait->signal(cq->wait);
538 }
539
540 static inline void
ofi_cq_write_comp_entry(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)541 ofi_cq_write_comp_entry(struct util_cq *cq, void *context, uint64_t flags,
542 size_t len, void *buf, uint64_t data, uint64_t tag)
543 {
544 struct fi_cq_tagged_entry *comp = ofi_cirque_tail(cq->cirq);
545 comp->op_context = context;
546 comp->flags = flags;
547 comp->len = len;
548 comp->buf = buf;
549 comp->data = data;
550 comp->tag = tag;
551 ofi_cirque_commit(cq->cirq);
552 }
553
554 static inline int
ofi_cq_write_thread_unsafe(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)555 ofi_cq_write_thread_unsafe(struct util_cq *cq, void *context, uint64_t flags,
556 size_t len, void *buf, uint64_t data, uint64_t tag)
557 {
558 if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
559 FI_DBG(cq->domain->prov, FI_LOG_CQ,
560 "util_cq cirq is full!\n");
561 return ofi_cq_write_overflow(cq, context, flags, len,
562 buf, data, tag, 0);
563 }
564 ofi_cq_write_comp_entry(cq, context, flags, len, buf, data, tag);
565 return 0;
566 }
567
568 static inline int
ofi_cq_write(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)569 ofi_cq_write(struct util_cq *cq, void *context, uint64_t flags, size_t len,
570 void *buf, uint64_t data, uint64_t tag)
571 {
572 int ret;
573 cq->cq_fastlock_acquire(&cq->cq_lock);
574 ret = ofi_cq_write_thread_unsafe(cq, context, flags, len, buf, data, tag);
575 cq->cq_fastlock_release(&cq->cq_lock);
576 return ret;
577 }
578
579 static inline int
ofi_cq_write_src_thread_unsafe(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag,fi_addr_t src)580 ofi_cq_write_src_thread_unsafe(struct util_cq *cq, void *context, uint64_t flags, size_t len,
581 void *buf, uint64_t data, uint64_t tag, fi_addr_t src)
582 {
583 if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
584 FI_DBG(cq->domain->prov, FI_LOG_CQ,
585 "util_cq cirq is full!\n");
586 return ofi_cq_write_overflow(cq, context, flags, len,
587 buf, data, tag, src);
588 }
589 cq->src[ofi_cirque_windex(cq->cirq)] = src;
590 ofi_cq_write_comp_entry(cq, context, flags, len, buf, data, tag);
591 return 0;
592 }
593
594 static inline int
ofi_cq_write_src(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag,fi_addr_t src)595 ofi_cq_write_src(struct util_cq *cq, void *context, uint64_t flags, size_t len,
596 void *buf, uint64_t data, uint64_t tag, fi_addr_t src)
597 {
598 int ret;
599 cq->cq_fastlock_acquire(&cq->cq_lock);
600 ret = ofi_cq_write_src_thread_unsafe(cq, context, flags, len,
601 buf, data, tag, src);
602 cq->cq_fastlock_release(&cq->cq_lock);
603 return ret;
604 }
605
606 int ofi_cq_write_error(struct util_cq *cq,
607 const struct fi_cq_err_entry *err_entry);
608 int ofi_cq_write_error_peek(struct util_cq *cq, uint64_t tag, void *context);
609 int ofi_cq_write_error_trunc(struct util_cq *cq, void *context, uint64_t flags,
610 size_t len, void *buf, uint64_t data, uint64_t tag,
611 size_t olen);
612
ofi_need_completion(uint64_t cq_flags,uint64_t op_flags)613 static inline int ofi_need_completion(uint64_t cq_flags, uint64_t op_flags)
614 {
615 return (!(cq_flags & FI_SELECTIVE_COMPLETION) ||
616 (op_flags & (FI_COMPLETION | FI_INJECT_COMPLETE |
617 FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)));
618 }
619
620 extern uint64_t ofi_rx_flags[ofi_op_max];
621 extern uint64_t ofi_tx_flags[ofi_op_max];
622
ofi_rx_cq_flags(uint32_t op)623 static inline uint64_t ofi_rx_cq_flags(uint32_t op)
624 {
625 return ofi_rx_flags[op];
626 }
627
ofi_tx_cq_flags(uint32_t op)628 static inline uint64_t ofi_tx_cq_flags(uint32_t op)
629 {
630 return ofi_tx_flags[op];
631 }
632
/*
 * Counter
 */
typedef void (*ofi_cntr_progress_func)(struct util_cntr *cntr);

/*
 * Common counter state.  'cnt' and 'err' are atomic 64-bit values.
 * NOTE(review): checkpoint_cnt/checkpoint_err presumably snapshot those
 * values for wait logic, and ep_list holds bound endpoints under
 * ep_list_lock — confirm.
 */
struct util_cntr {
	struct fid_cntr cntr_fid;
	struct util_domain *domain;
	struct util_wait *wait;
	ofi_atomic32_t ref;

	ofi_atomic64_t cnt;
	ofi_atomic64_t err;

	uint64_t checkpoint_cnt;
	uint64_t checkpoint_err;

	struct dlist_entry ep_list;
	fastlock_t ep_list_lock;

	int internal_wait;
	ofi_cntr_progress_func progress;
};

void ofi_cntr_progress(struct util_cntr *cntr);
/* Initialize 'cntr' on 'domain' with the given progress hook. */
int ofi_cntr_init(const struct fi_provider *prov, struct fid_domain *domain,
		  struct fi_cntr_attr *attr, struct util_cntr *cntr,
		  ofi_cntr_progress_func progress, void *context);
int ofi_cntr_cleanup(struct util_cntr *cntr);
util_cntr_signal(struct util_cntr * cntr)662 static inline void util_cntr_signal(struct util_cntr *cntr)
663 {
664 assert(cntr->wait);
665 cntr->wait->signal(cntr->wait);
666 }
667
ofi_cntr_inc_noop(struct util_cntr * cntr)668 static inline void ofi_cntr_inc_noop(struct util_cntr *cntr)
669 {
670 OFI_UNUSED(cntr);
671 }
672
ofi_cntr_inc(struct util_cntr * cntr)673 static inline void ofi_cntr_inc(struct util_cntr *cntr)
674 {
675 cntr->cntr_fid.ops->add(&cntr->cntr_fid, 1);
676 }
677
/*
 * AV / addressing
 */

/*
 * One AV entry: a use count, a uthash handle (for util_av.hash) and the
 * address bytes stored inline after the header.
 * NOTE(review): addr[0] is a GNU zero-length array; the entry's allocation
 * presumably reserves util_av.addrlen bytes for it — confirm.
 */
struct util_av_entry {
	ofi_atomic32_t use_cnt;
	UT_hash_handle hh;
	char addr[0];
};

/*
 * Common AV state.  Entries come from av_entry_pool and are indexed by
 * address in 'hash'.  ep_list holds endpoints bound to this AV, guarded by
 * ep_list_lock.
 */
struct util_av {
	struct fid_av av_fid;
	struct util_domain *domain;
	struct util_eq *eq;
	ofi_atomic32_t ref;
	fastlock_t lock;
	const struct fi_provider *prov;

	struct util_av_entry *hash;
	struct ofi_bufpool *av_entry_pool;

	struct util_coll_mc *coll_mc;
	void *context;
	uint64_t flags;
	size_t count;
	size_t addrlen;
	struct dlist_entry ep_list;
	fastlock_t ep_list_lock;
};

/* Provider-supplied AV parameters passed to ofi_av_init(). */
struct util_av_attr {
	size_t addrlen;
	int flags;
};
712
/* Callback applied to each AV element by ofi_av_elements_iter(). */
typedef int (*ofi_av_apply_func)(struct util_av *av, void *addr,
				 fi_addr_t fi_addr, void *arg);

int ofi_av_init(struct util_domain *domain,
		const struct fi_av_attr *attr, const struct util_av_attr *util_attr,
		struct util_av *av, void *context);
int ofi_av_init_lightweight(struct util_domain *domain, const struct fi_av_attr *attr,
			    struct util_av *av, void *context);
int ofi_av_close(struct util_av *av);
int ofi_av_close_lightweight(struct util_av *av);

int ofi_av_insert_addr(struct util_av *av, const void *addr, fi_addr_t *fi_addr);
int ofi_av_remove_addr(struct util_av *av, fi_addr_t fi_addr);
/* The _unsafe variant presumably expects the caller to hold av->lock. */
fi_addr_t ofi_av_lookup_fi_addr_unsafe(struct util_av *av, const void *addr);
fi_addr_t ofi_av_lookup_fi_addr(struct util_av *av, const void *addr);
int ofi_av_elements_iter(struct util_av *av, ofi_av_apply_func apply, void *arg);
int ofi_av_bind(struct fid *av_fid, struct fid *eq_fid, uint64_t flags);
void ofi_av_write_event(struct util_av *av, uint64_t data,
			int err, void *context);

/* IP-address AV helpers. */
int ofi_ip_av_create(struct fid_domain *domain_fid, struct fi_av_attr *attr,
		     struct fid_av **av, void *context);
int ofi_ip_av_create_flags(struct fid_domain *domain_fid, struct fi_av_attr *attr,
			   struct fid_av **av, void *context, int flags);

void *ofi_av_get_addr(struct util_av *av, fi_addr_t fi_addr);
#define ofi_ip_av_get_addr ofi_av_get_addr
fi_addr_t ofi_ip_av_get_fi_addr(struct util_av *av, const void *addr);

int ofi_get_addr(uint32_t *addr_format, uint64_t flags,
		 const char *node, const char *service,
		 void **addr, size_t *addrlen);
int ofi_get_src_addr(uint32_t addr_format,
		     const void *dest_addr, size_t dest_addrlen,
		     void **src_addr, size_t *src_addrlen);
void ofi_getnodename(uint16_t sa_family, char *buf, int buflen);
int ofi_av_get_index(struct util_av *av, const void *addr);

int ofi_verify_av_insert(struct util_av *av, uint64_t flags);
int ofi_ip_av_insertv(struct util_av *av, const void *addr, size_t addrlen,
		      size_t count, fi_addr_t *fi_addr, void *context);
/* Caller should free *addr */
int ofi_ip_av_sym_getaddr(struct util_av *av, const char *node,
			  size_t nodecnt, const char *service,
			  size_t svccnt, void **addr, size_t *addrlen);
int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
		     fi_addr_t *fi_addr, uint64_t flags, void *context);
int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
		     size_t count, uint64_t flags);
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
		     void *addr, size_t *addrlen);
const char *
ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len);
766
/*
 * Poll set
 */

/* Common poll-set state; fid_list holds member objects, guarded by 'lock'
 * (presumably maintained via fid_list_insert/remove — confirm). */
struct util_poll {
	struct fid_poll poll_fid;
	struct util_domain *domain;
	struct dlist_entry fid_list;
	fastlock_t lock;
	ofi_atomic32_t ref;
	const struct fi_provider *prov;
};

int fi_poll_create_(const struct fi_provider *prov, struct fid_domain *domain,
		    struct fi_poll_attr *attr, struct fid_poll **pollset);
int fi_poll_create(struct fid_domain *domain, struct fi_poll_attr *attr,
		   struct fid_poll **pollset);
783
/*
 * EQ
 */

/* Common EQ state; queued events are kept on 'list' (as util_event). */
struct util_eq {
	struct fid_eq eq_fid;
	struct util_fabric *fabric;
	struct util_wait *wait;
	fastlock_t lock;
	ofi_atomic32_t ref;
	const struct fi_provider *prov;

	struct slist list;
	/* This contains error data that are read by user and need to
	 * be freed in subsequent fi_eq_readerr call against the EQ */
	void *saved_err_data;
	int internal_wait;
};

/*
 * One queued EQ event with its payload stored inline.
 * NOTE(review): 'size' is presumably the payload length in data[] and 'err'
 * marks error events — confirm against ofi_eq_write/ofi_eq_read.
 */
struct util_event {
	struct slist_entry entry;
	int size;
	int event;
	int err;
	uint8_t data[0];
};

int ofi_eq_create(struct fid_fabric *fabric, struct fi_eq_attr *attr,
		  struct fid_eq **eq_fid, void *context);
int ofi_eq_init(struct fid_fabric *fabric_fid, struct fi_eq_attr *attr,
		struct fid_eq *eq_fid, void *context);
int ofi_eq_control(struct fid *fid, int command, void *arg);
int ofi_eq_cleanup(struct fid *fid);
/* Drop queued events belonging to 'fid'. */
void ofi_eq_remove_fid_events(struct util_eq *eq, fid_t fid);
void ofi_eq_handle_err_entry(uint32_t api_version, uint64_t flags,
			     struct fi_eq_err_entry *err_entry,
			     struct fi_eq_err_entry *user_err_entry);
/* Generic fid_eq operation implementations. */
ssize_t ofi_eq_read(struct fid_eq *eq_fid, uint32_t *event,
		    void *buf, size_t len, uint64_t flags);
ssize_t ofi_eq_sread(struct fid_eq *eq_fid, uint32_t *event, void *buf,
		     size_t len, int timeout, uint64_t flags);
ssize_t ofi_eq_readerr(struct fid_eq *eq_fid, struct fi_eq_err_entry *buf,
		       uint64_t flags);
ssize_t ofi_eq_write(struct fid_eq *eq_fid, uint32_t event,
		     const void *buf, size_t len, uint64_t flags);
const char *ofi_eq_strerror(struct fid_eq *eq_fid, int prov_errno,
			    const void *err_data, char *buf, size_t len);
830
/*
 * MR mode bits grouped as OFI_MR_MODE_RMA_TARGET (by name, the modes that
 * matter when a region is the target of RMA).  Defined as real code, not
 * inside the section banner, so the macro actually exists.  It is guarded
 * so that a definition supplied by another header takes precedence.
 */
#ifndef OFI_MR_MODE_RMA_TARGET
#define OFI_MR_MODE_RMA_TARGET (FI_MR_RAW | FI_MR_VIRT_ADDR | \
				FI_MR_PROV_KEY | FI_MR_RMA_EVENT)
#endif

/*
 * Attributes and capabilities
 */

/* Capability bits grouped as "primary" by the util layer. */
#define FI_PRIMARY_CAPS	(FI_MSG | FI_RMA | FI_TAGGED | FI_ATOMICS | FI_MULTICAST | \
			 FI_NAMED_RX_CTX | FI_DIRECTED_RECV | \
			 FI_READ | FI_WRITE | FI_RECV | FI_SEND | \
			 FI_REMOTE_READ | FI_REMOTE_WRITE | FI_COLLECTIVE | \
			 FI_HMEM)

/* Capability bits grouped as "secondary" by the util layer. */
#define FI_SECONDARY_CAPS (FI_MULTI_RECV | FI_SOURCE | FI_RMA_EVENT | \
			   FI_SHARED_AV | FI_TRIGGER | FI_FENCE | \
			   FI_LOCAL_COMM | FI_REMOTE_COMM)

/* Per-direction message and RMA capability sets. */
#define OFI_TX_MSG_CAPS (FI_MSG | FI_SEND)
#define OFI_RX_MSG_CAPS (FI_MSG | FI_RECV)
#define OFI_TX_RMA_CAPS (FI_RMA | FI_READ | FI_WRITE)
#define OFI_RX_RMA_CAPS (FI_RMA | FI_REMOTE_READ | FI_REMOTE_WRITE)
851
/*
 * Attribute validation helpers: compare provider-supported attributes
 * (prov_*) against what the user asked for (user_*).
 */
int ofi_check_ep_type(const struct fi_provider *prov,
		      const struct fi_ep_attr *prov_attr,
		      const struct fi_ep_attr *user_attr);
int ofi_check_mr_mode(const struct fi_provider *prov, uint32_t api_version,
		      int prov_mode, const struct fi_info *user_info);
int ofi_check_fabric_attr(const struct fi_provider *prov,
			  const struct fi_fabric_attr *prov_attr,
			  const struct fi_fabric_attr *user_attr);
int ofi_check_wait_attr(const struct fi_provider *prov,
			const struct fi_wait_attr *attr);
int ofi_check_domain_attr(const struct fi_provider *prov, uint32_t api_version,
			  const struct fi_domain_attr *prov_attr,
			  const struct fi_info *user_info);
int ofi_check_ep_attr(const struct util_prov *util_prov, uint32_t api_version,
		      const struct fi_info *prov_info,
		      const struct fi_info *user_info);
int ofi_check_cq_attr(const struct fi_provider *prov,
		      const struct fi_cq_attr *attr);
int ofi_check_rx_attr(const struct fi_provider *prov,
		      const struct fi_info *prov_info,
		      const struct fi_rx_attr *user_attr, uint64_t info_mode);
int ofi_check_tx_attr(const struct fi_provider *prov,
		      const struct fi_tx_attr *prov_attr,
		      const struct fi_tx_attr *user_attr, uint64_t info_mode);
/* Presumably checks that requested_caps is a subset of base_caps. */
int ofi_check_attr_subset(const struct fi_provider *prov,
			  uint64_t base_caps, uint64_t requested_caps);
int ofi_prov_check_info(const struct util_prov *util_prov,
			uint32_t api_version,
			const struct fi_info *user_info);
int ofi_prov_check_dup_info(const struct util_prov *util_prov,
			    uint32_t api_version,
			    const struct fi_info *user_info,
			    struct fi_info **info);
/*
 * Bitwise merge of two flag words: every bit set in use_core_flags is
 * taken from all_core_flags, every other bit from all_util_flags.
 */
static inline uint64_t
ofi_pick_core_flags(uint64_t all_util_flags, uint64_t all_core_flags,
		    uint64_t use_core_flags)
{
	/* Masked merge: flip util bits to core bits only where mask is set. */
	uint64_t delta = all_util_flags ^ all_core_flags;

	return all_util_flags ^ (delta & use_core_flags);
}
892
int ofi_check_info(const struct util_prov *util_prov,
		   const struct fi_info *prov_info, uint32_t api_version,
		   const struct fi_info *user_info);
void ofi_alter_info(struct fi_info *info, const struct fi_info *hints,
		    uint32_t api_version);

struct fi_info *ofi_allocinfo_internal(void);
/* fi_getinfo implementations for utility-based providers. */
int util_getinfo(const struct util_prov *util_prov, uint32_t version,
		 const char *node, const char *service, uint64_t flags,
		 const struct fi_info *hints, struct fi_info **info);
int ofi_ip_getinfo(const struct util_prov *prov, uint32_t version,
		   const char *node, const char *service, uint64_t flags,
		   const struct fi_info *hints, struct fi_info **info);


/* List node holding one fid; used by fid_list_insert/fid_list_remove. */
struct fid_list_entry {
	struct dlist_entry entry;
	struct fid *fid;
};

/* Add / remove 'fid' on fid_list; 'lock' is passed in by the caller. */
int fid_list_insert(struct dlist_entry *fid_list, fastlock_t *lock,
		    struct fid *fid);
void fid_list_remove(struct dlist_entry *fid_list, fastlock_t *lock,
		     struct fid *fid);

void ofi_fabric_insert(struct util_fabric *fabric);
void ofi_fabric_remove(struct util_fabric *fabric);
920
/*
 * Utility Providers
 */

#define OFI_NAME_DELIM ';'
#define OFI_UTIL_PREFIX "ofi_"

/*
 * Return 1 if 'str' starts with OFI_UTIL_PREFIX (compared without regard to
 * case), 0 otherwise.  'str' must be a valid NUL-terminated string.
 */
static inline int ofi_has_util_prefix(const char *str)
{
	/* Prefix is a string literal, so its length is known at compile time. */
	const size_t prefix_len = sizeof(OFI_UTIL_PREFIX) - 1;

	return strncasecmp(str, OFI_UTIL_PREFIX, prefix_len) == 0;
}
932
/* Hook that converts an fi_info between util and core representations
 * (see the info_to_core / info_to_util parameters below). */
typedef int (*ofi_alter_info_t)(uint32_t version, const struct fi_info *src_info,
				struct fi_info *dest_info);

int ofi_get_core_info(uint32_t version, const char *node, const char *service,
		      uint64_t flags, const struct util_prov *util_prov,
		      const struct fi_info *util_hints, ofi_alter_info_t info_to_core,
		      struct fi_info **core_info);
int ofix_getinfo(uint32_t version, const char *node, const char *service,
		 uint64_t flags, const struct util_prov *util_prov,
		 const struct fi_info *hints, ofi_alter_info_t info_to_core,
		 ofi_alter_info_t info_to_util, struct fi_info **info);
int ofi_get_core_info_fabric(const struct fi_provider *prov,
			     const struct fi_fabric_attr *util_attr,
			     struct fi_info **core_info);


/* By name, returns a newly allocated concatenation of head and tail. */
char *ofi_strdup_append(const char *head, const char *tail);
// NOTE(review): disabled prototypes kept from an earlier API; delete them
// if the helpers no longer exist.
// char *ofi_strdup_head(const char *str);
// char *ofi_strdup_tail(const char *str);
int ofi_exclude_prov_name(char **prov_name, const char *util_prov_name);


/* Shared-memory mapping helpers. */
int ofi_shm_map(struct util_shm *shm, const char *name, size_t size,
		int readonly, void **mapped);
int ofi_shm_unmap(struct util_shm *shm);
958
/*
 * Name Server TODO: add support for Windows OS
 * (osd/windows/pthread.h should be extended)
 */

/* Comparison and wildcard-test hooks for service keys. */
typedef int(*ofi_ns_service_cmp_func_t)(void *svc1, void *svc2);
typedef int(*ofi_ns_is_service_wildcard_func_t)(void *svc);

/*
 * Name-server state: a listening socket served by 'thread', and a
 * red-black tree 'map'.  name_len/service_len give the sizes of the opaque
 * name and service values.  NOTE(review): 'run' presumably stops the server
 * thread and 'ref' counts start/stop users — confirm.
 */
struct util_ns {
	SOCKET listen_sock;
	pthread_t thread;
	RbtHandle map;

	char *hostname;
	int port;

	size_t name_len;
	size_t service_len;

	int run;
	int is_initialized;
	ofi_atomic32_t ref;

	ofi_ns_service_cmp_func_t service_cmp;
	ofi_ns_is_service_wildcard_func_t is_service_wildcard;
};

void ofi_ns_init(struct util_ns *ns);
int ofi_ns_start_server(struct util_ns *ns);
void ofi_ns_stop_server(struct util_ns *ns);

int ofi_ns_add_local_name(struct util_ns *ns, void *service, void *name);
int ofi_ns_del_local_name(struct util_ns *ns, void *service, void *name);
/* Query 'server' for the name registered under 'service'. */
void *ofi_ns_resolve_name(struct util_ns *ns, const char *server,
			  void *service);
994
995 #ifdef __cplusplus
996 }
997 #endif
998
999 #endif /* _OFI_UTIL_H_ */
1000