1 /*
2  * Copyright (c) 2015-2017 Intel Corporation, Inc.  All rights reserved.
3  * Copyright (c) 2016 Cisco Systems, Inc. All rights reserved.
4  * Copyright (c) 2019 Amazon.com, Inc. or its affiliates. All rights reserved.
5  *
6  * This software is available to you under a choice of one of two
7  * licenses.  You may choose to be licensed under the terms of the GNU
8  * General Public License (GPL) Version 2, available from the file
9  * COPYING in the main directory of this source tree, or the
10  * BSD license below:
11  *
12  *     Redistribution and use in source and binary forms, with or
13  *     without modification, are permitted provided that the following
14  *     conditions are met:
15  *
16  *      - Redistributions of source code must retain the above
17  *        copyright notice, this list of conditions and the following
18  *        disclaimer.
19  *
20  *      - Redistributions in binary form must reproduce the above
21  *        copyright notice, this list of conditions and the following
22  *        disclaimer in the documentation and/or other materials
23  *        provided with the distribution.
24  *
25  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
26  * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
27  * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
28  * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
29  * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
30  * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
31  * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
32  * SOFTWARE.
33  */
34 
35 #ifndef _OFI_UTIL_H_
36 #define _OFI_UTIL_H_
37 
38 #if HAVE_CONFIG_H
39 #  include <config.h>
40 #endif /* HAVE_CONFIG_H */
41 
42 #include <pthread.h>
43 #include <stdio.h>
44 
45 #include <rdma/fabric.h>
46 #include <rdma/fi_atomic.h>
47 #include <rdma/fi_cm.h>
48 #include <rdma/fi_domain.h>
49 #include <rdma/fi_endpoint.h>
50 #include <rdma/fi_eq.h>
51 #include <rdma/fi_errno.h>
52 #include <rdma/fi_rma.h>
53 #include <rdma/fi_tagged.h>
54 #include <rdma/fi_trigger.h>
55 
56 #include <ofi.h>
57 #include <ofi_mr.h>
58 #include <ofi_list.h>
59 #include <ofi_mem.h>
60 #include <ofi_rbuf.h>
61 #include <ofi_signal.h>
62 #include <ofi_enosys.h>
63 #include <ofi_osd.h>
64 #include <ofi_indexer.h>
65 #include <ofi_epoll.h>
66 #include <ofi_proto.h>
67 #include <ofi_bitmask.h>
68 
69 #include "rbtree.h"
70 #include "uthash.h"
71 
72 #ifdef __cplusplus
73 extern "C" {
74 #endif
75 
/* EQ / CQ flags
 * ERROR: The added entry was the result of an error completion
 * OVERFLOW: The CQ has overflowed, and events have been lost
 *
 * NOTE(review): these use high bits (60, 61) of a 64-bit flags word,
 * presumably to stay clear of the public FI_* flag bits -- confirm.
 */
#define UTIL_FLAG_ERROR		(1ULL << 60)
#define UTIL_FLAG_OVERFLOW	(1ULL << 61)

/* Indicates that an EP has been bound to a counter.
 * Same bit (61) as UTIL_FLAG_OVERFLOW: the two must never be tested in the
 * same flags word (presumably this one is only used in EP flags -- confirm).
 */
#define OFI_CNTR_ENABLED	(1ULL << 61)

/* Memory registration should not be cached.
 * Assuming BIT_ULL(n) is 1ULL << n, this shares bit 60 with UTIL_FLAG_ERROR;
 * presumably only used in MR flag words -- confirm.
 */
#define OFI_MR_NOCACHE		BIT_ULL(60)
88 
/*
 * Log an error entry read from a queue ("fi_<q_str>_readerr: ...").
 *
 * q_str is pasted next to other string literals, so it must itself be a
 * string literal.  q_strerror is called as q_strerror(q, prov_errno,
 * err_data, NULL, 0).  `entry` is evaluated several times, so it must not
 * have side effects.
 */
#define OFI_Q_STRERROR(prov, level, subsys, q, q_str, entry, q_strerror)	\
	FI_LOG(prov, level, subsys, "fi_" q_str "_readerr: err: %s (%d), "	\
	       "prov_err: %s (%d)\n", strerror((entry)->err), (entry)->err,	\
	       q_strerror((q), (entry)->prov_errno,				\
			  (entry)->err_data, NULL, 0),				\
	       (entry)->prov_errno)

/* OFI_Q_STRERROR specialised for completion queues (fi_cq_strerror). */
#define OFI_CQ_STRERROR(prov, level, subsys, cq, entry) \
	OFI_Q_STRERROR(prov, level, subsys, cq, "cq", entry, fi_cq_strerror)

/* OFI_Q_STRERROR specialised for event queues (fi_eq_strerror). */
#define OFI_EQ_STRERROR(prov, level, subsys, eq, entry) \
	OFI_Q_STRERROR(prov, level, subsys, eq, "eq", entry, fi_eq_strerror)
101 
/*
 * Logging helpers for reporting provider-supported vs. user-requested
 * fi_info attributes.  Every message is emitted with FI_INFO under
 * FI_LOG_CORE.
 *
 * prov_str/user_str are pasted next to a format string literal, so they
 * must be string literals themselves (and are deliberately NOT
 * parenthesized).  All other arguments used as expressions are
 * parenthesized in the expansion so that compound expressions are safe.
 */

/* Log both attributes as rendered by fi_tostr() for the given FI_TYPE_*. */
#define FI_INFO_FIELD(provider, prov_attr, user_attr, prov_str, user_str, type)	\
	do {										\
		FI_INFO(provider, FI_LOG_CORE, prov_str ": %s\n",			\
				fi_tostr(&(prov_attr), (type)));			\
		FI_INFO(provider, FI_LOG_CORE, user_str ": %s\n",			\
				fi_tostr(&(user_attr), (type)));			\
	} while (0)

/* Log two string-valued attributes verbatim. */
#define FI_INFO_STRING(provider, prov_attr, user_attr, prov_str, user_str)	\
	do {									\
		FI_INFO(provider, FI_LOG_CORE, prov_str ": %s\n", (prov_attr));	\
		FI_INFO(provider, FI_LOG_CORE, user_str ": %s\n", (user_attr));	\
	} while (0)

/* Log prov->field vs. user->field as "Supported" / "Requested". */
#define FI_INFO_CHECK(provider, prov, user, field, type)		\
	FI_INFO_FIELD(provider, (prov)->field, (user)->field, "Supported",	\
		      "Requested", type)

/*
 * Numeric variant of FI_INFO_CHECK.  The fields are assumed to be size_t
 * (confirm at call sites); %zu is the conversion that matches size_t,
 * whereas %zd is for the signed ssize_t.
 */
#define FI_INFO_CHECK_VAL(provider, prov, user, field)					\
	do {										\
		FI_INFO(provider, FI_LOG_CORE, "Supported: %zu\n", (prov)->field);	\
		FI_INFO(provider, FI_LOG_CORE, "Requested: %zu\n", (user)->field);	\
	} while (0)

/* Log the expected (provider) vs. given (user) FI_TYPE_MODE bits. */
#define FI_INFO_MODE(provider, prov_mode, user_mode)				\
	FI_INFO_FIELD(provider, prov_mode, user_mode, "Expected", "Given",	\
		      FI_TYPE_MODE)

/* Log the expected (provider) vs. given (user) FI_TYPE_MR_MODE bits. */
#define FI_INFO_MR_MODE(provider, prov_mode, user_mode)			\
	FI_INFO_FIELD(provider, prov_mode, user_mode, "Expected", "Given",	\
		      FI_TYPE_MR_MODE)

/* Log prov->name vs. user->name as "Supported" / "Requested". */
#define FI_INFO_NAME(provider, prov, user)				\
	FI_INFO_STRING(provider, (prov)->name, (user)->name, "Supported",	\
		       "Requested")
137 
/*
 * Wrap-around-aware sequence comparisons: the difference a - b is
 * reinterpreted as a signed long.
 *   ofi_after_eq(a, b): a is at or after b
 *   ofi_before(a, b):   a is strictly before b
 * NOTE(review): this only gives wrap-around semantics when the operands'
 * arithmetic type has the same width as `long`.  On a 32-bit-long target
 * (e.g. LLP64) a 64-bit difference is truncated.  With a 32-bit unsigned
 * difference on an LP64 target, the value is zero-extended and never
 * negative.  Confirm callers use matching types.
 */
#define ofi_after_eq(a,b)	((long)((a) - (b)) >= 0)
#define ofi_before(a,b)		((long)((a) - (b)) < 0)
140 
/* Shared TX / RX context bit flags (presumably for util_prov.flags --
 * confirm at the users). */
enum {
	UTIL_TX_SHARED_CTX = 1 << 0,
	UTIL_RX_SHARED_CTX = 1 << 1,
};

/* Mutexes grouped for common util code.  By name, ini_lock presumably
 * guards provider initialization and util_fabric_lock the list of util
 * fabrics -- confirm where they are defined and used. */
struct ofi_common_locks {
	pthread_mutex_t ini_lock;
	pthread_mutex_t util_fabric_lock;
};
150 
151 /*
152  * Provider details
153  */
struct util_prov {
	/* Provider this descriptor belongs to. */
	const struct fi_provider	*prov;
	/* Presumably the provider's supported fi_info list (head) -- confirm. */
	const struct fi_info		*info;
	/* Presumably UTIL_*_SHARED_CTX bits -- confirm. */
	const int			flags;
};
159 
160 
161 /*
162  * Fabric
163  */
/* Name/provider pair describing a fabric (presumably used as a lookup key
 * when searching existing util fabrics -- confirm). */
struct util_fabric_info {
	const char 			*name;
	const struct fi_provider 	*prov;
};

/* Generic fabric state shared by util-based providers. */
struct util_fabric {
	/* User-visible fabric handle. */
	struct fid_fabric	fabric_fid;
	/* Link presumably used by ofi_fabric_insert()/ofi_fabric_remove(). */
	struct dlist_entry	list_entry;
	fastlock_t		lock;
	/* Reference count (presumably of dependent objects -- confirm). */
	ofi_atomic32_t		ref;
	const char		*name;
	const struct fi_provider *prov;

	/* Domains opened on this fabric (see util_domain.list_entry). */
	struct dlist_entry	domain_list;
};
179 
/* Initialize a util_fabric for prov.  prov_attr/user_attr are presumably the
 * provider-supported and user-requested fabric attributes -- confirm. */
int ofi_fabric_init(const struct fi_provider *prov,
		    const struct fi_fabric_attr *prov_attr,
		    const struct fi_fabric_attr *user_attr,
		    struct util_fabric *fabric, void *context);
/* Tear down a util_fabric set up by ofi_fabric_init(). */
int ofi_fabric_close(struct util_fabric *fabric);
/* Presumably the util implementation of fi_trywait() over `count` fids. */
int ofi_trywait(struct fid_fabric *fabric, struct fid **fids, int count);
186 
187 /*
188  * Domain
189  */
/* Generic domain state shared by util-based providers. */
struct util_domain {
	/* User-visible domain handle. */
	struct fid_domain	domain_fid;
	/* Presumably the link into util_fabric.domain_list -- confirm. */
	struct dlist_entry	list_entry;
	struct util_fabric	*fabric;
	/* EQ bound via ofi_domain_bind_eq(), if any. */
	struct util_eq		*eq;
	fastlock_t		lock;
	ofi_atomic32_t		ref;
	const struct fi_provider *prov;

	/* The fields below presumably cache values from the fi_info passed
	 * to ofi_domain_init() -- confirm. */
	char			*name;
	uint64_t		info_domain_caps;
	uint64_t		info_domain_mode;
	int			mr_mode;
	uint32_t		addr_format;
	enum fi_av_type		av_type;
	struct ofi_mr_map	mr_map;
	enum fi_threading	threading;
	enum fi_progress	data_progress;
};
209 
/* Initialize a util_domain on fabric_fid from info; bind an EQ to it; and
 * close it again. */
int ofi_domain_init(struct fid_fabric *fabric_fid, const struct fi_info *info,
		     struct util_domain *domain, void *context);
int ofi_domain_bind_eq(struct util_domain *domain, struct util_eq *eq);
int ofi_domain_close(struct util_domain *domain);
214 
/*
 * MR access flags associated with each incoming (rx) ofi_op_* operation;
 * looked up by ofi_rx_mr_reg_flags().  Ops without an initializer below
 * map to 0.  The array length is one past the largest designated index
 * (ofi_op_atomic_compare), not ofi_op_max.  Being `static const` in a
 * header, every including translation unit gets its own copy.
 */
static const uint64_t ofi_rx_mr_flags[] = {
	[ofi_op_msg] = FI_RECV,
	[ofi_op_tagged] = FI_RECV,
	[ofi_op_read_req] = FI_REMOTE_READ,
	[ofi_op_write] = FI_REMOTE_WRITE,
	[ofi_op_atomic] = FI_REMOTE_WRITE,
	[ofi_op_atomic_fetch] = FI_REMOTE_WRITE | FI_REMOTE_READ,
	[ofi_op_atomic_compare] = FI_REMOTE_WRITE | FI_REMOTE_READ,
};
224 
ofi_rx_mr_reg_flags(uint32_t op,uint16_t atomic_op)225 static inline uint64_t ofi_rx_mr_reg_flags(uint32_t op, uint16_t atomic_op)
226 {
227 	if (atomic_op == FI_ATOMIC_READ)
228 		return FI_REMOTE_READ;
229 
230 	return ofi_rx_mr_flags[op];
231 }
232 
233 /*
234  * Passive Endpoint
235  */
236 
/* Generic passive-endpoint state shared by util-based providers. */
struct util_pep {
	struct fid_pep		pep_fid;
	struct util_fabric 	*fabric;
	/* EQ bound via ofi_pep_bind_eq(), if any. */
	struct util_eq 		*eq;
};

/* Initialize a util_pep on fabric; bind an EQ to it; and close it again. */
int ofi_pep_init(struct fid_fabric *fabric,  struct fi_info *info,
		 struct util_pep *pep, void *context);
int ofi_pep_bind_eq(struct util_pep *pep, struct util_eq *eq, uint64_t flags);
int ofi_pep_close(struct util_pep *pep);
247 
248 /*
249  * Endpoint
250  */
251 
struct util_cntr;
struct util_ep;
/* Endpoint progress callback (passed to ofi_endpoint_init()). */
typedef void (*ofi_ep_progress_func)(struct util_ep *util_ep);
/* Counter-increment callback stored per counter slot in util_ep. */
typedef void (*ofi_cntr_inc_func)(struct util_cntr *util_cntr);

/*
 * Generic endpoint state shared by util-based providers.  ofi_ep_fid_bind()
 * recovers this struct from ep_fid.fid with container_of().
 */
struct util_ep {
	struct fid_ep		ep_fid;
	struct util_domain	*domain;

	struct util_av		*av;
	/* Presumably the link into util_av.ep_list -- confirm. */
	struct dlist_entry	av_entry;
	struct util_eq		*eq;
	/* CQ entries */
	struct util_cq		*rx_cq;
	uint64_t		rx_op_flags;
	struct util_cq		*tx_cq;
	uint64_t		tx_op_flags;
	uint64_t		inject_op_flags;

	/* flags to be ORed in to flags for *msg API calls
	 * to properly handle FI_SELECTIVE_COMPLETION bind */
	uint64_t		tx_msg_flags;
	uint64_t		rx_msg_flags;

	/* CNTR entries */
	struct util_cntr	*tx_cntr;     /* transmit/send */
	struct util_cntr	*rx_cntr;     /* receive       */
	struct util_cntr	*rd_cntr;     /* read          */
	struct util_cntr	*wr_cntr;     /* write         */
	struct util_cntr	*rem_rd_cntr; /* remote read   */
	struct util_cntr	*rem_wr_cntr; /* remote write  */

	/* Increment callbacks.  The ofi_ep_*_cntr_inc() wrappers call each one
	 * with the matching counter above, e.g. tx_cntr_inc(tx_cntr).
	 * Presumably ofi_cntr_inc_noop when no counter is bound -- confirm
	 * in ofi_ep_bind_cntr(). */
	ofi_cntr_inc_func	tx_cntr_inc;
	ofi_cntr_inc_func	rx_cntr_inc;
	ofi_cntr_inc_func	rd_cntr_inc;
	ofi_cntr_inc_func	wr_cntr_inc;
	ofi_cntr_inc_func	rem_rd_cntr_inc;
	ofi_cntr_inc_func	rem_wr_cntr_inc;

	enum fi_ep_type		type;
	uint64_t		caps;
	uint64_t		flags;
	ofi_ep_progress_func	progress;
	/* ofi_ep_lock_acquire()/ofi_ep_lock_release() take `lock` through
	 * these function pointers rather than directly. */
	fastlock_t		lock;
	ofi_fastlock_acquire_t	lock_acquire;
	ofi_fastlock_release_t	lock_release;

	/* Collective-operation state (presumably context-id allocation and
	 * pending work) -- TODO confirm. */
	struct bitmask		*coll_cid_mask;
	struct slist		coll_ready_queue;
};
302 
/* Bind AV / EQ / CQ / counter objects to a util_ep.  ofi_ep_bind() takes a
 * generic fid (presumably dispatching on its class -- confirm). */
int ofi_ep_bind_av(struct util_ep *util_ep, struct util_av *av);
int ofi_ep_bind_eq(struct util_ep *ep, struct util_eq *eq);
int ofi_ep_bind_cq(struct util_ep *ep, struct util_cq *cq, uint64_t flags);
int ofi_ep_bind_cntr(struct util_ep *ep, struct util_cntr *cntr, uint64_t flags);
int ofi_ep_bind(struct util_ep *util_ep, struct fid *fid, uint64_t flags);
/* Initialize a util_ep on domain; `progress` is stored as the EP's
 * progress callback (presumably -- confirm). */
int ofi_endpoint_init(struct fid_domain *domain, const struct util_prov *util_prov,
		      struct fi_info *info, struct util_ep *ep, void *context,
		      ofi_ep_progress_func progress);

/* Tear down a util_ep set up by ofi_endpoint_init(). */
int ofi_endpoint_close(struct util_ep *util_ep);
313 
314 static inline int
ofi_ep_fid_bind(struct fid * ep_fid,struct fid * bfid,uint64_t flags)315 ofi_ep_fid_bind(struct fid *ep_fid, struct fid *bfid, uint64_t flags)
316 {
317 	return ofi_ep_bind(container_of(ep_fid, struct util_ep, ep_fid.fid),
318 			   bfid, flags);
319 }
320 
ofi_ep_lock_acquire(struct util_ep * ep)321 static inline void ofi_ep_lock_acquire(struct util_ep *ep)
322 {
323 	ep->lock_acquire(&ep->lock);
324 }
325 
ofi_ep_lock_release(struct util_ep * ep)326 static inline void ofi_ep_lock_release(struct util_ep *ep)
327 {
328 	ep->lock_release(&ep->lock);
329 }
330 
ofi_ep_tx_cntr_inc(struct util_ep * ep)331 static inline void ofi_ep_tx_cntr_inc(struct util_ep *ep)
332 {
333 	ep->tx_cntr_inc(ep->tx_cntr);
334 }
335 
ofi_ep_rx_cntr_inc(struct util_ep * ep)336 static inline void ofi_ep_rx_cntr_inc(struct util_ep *ep)
337 {
338 	ep->rx_cntr_inc(ep->rx_cntr);
339 }
340 
ofi_ep_rd_cntr_inc(struct util_ep * ep)341 static inline void ofi_ep_rd_cntr_inc(struct util_ep *ep)
342 {
343 	ep->rd_cntr_inc(ep->rd_cntr);
344 }
345 
ofi_ep_wr_cntr_inc(struct util_ep * ep)346 static inline void ofi_ep_wr_cntr_inc(struct util_ep *ep)
347 {
348 	ep->wr_cntr_inc(ep->wr_cntr);
349 }
350 
ofi_ep_rem_rd_cntr_inc(struct util_ep * ep)351 static inline void ofi_ep_rem_rd_cntr_inc(struct util_ep *ep)
352 {
353 	ep->rem_rd_cntr_inc(ep->rem_rd_cntr);
354 }
355 
ofi_ep_rem_wr_cntr_inc(struct util_ep * ep)356 static inline void ofi_ep_rem_wr_cntr_inc(struct util_ep *ep)
357 {
358 	ep->rem_wr_cntr_inc(ep->rem_wr_cntr);
359 }
360 
/* Per-opcode dispatch tables of endpoint counter-increment helpers, indexed
 * by ofi_op_* and defined in another translation unit. */
typedef void (*ofi_ep_cntr_inc_func)(struct util_ep *);
extern ofi_ep_cntr_inc_func ofi_ep_tx_cntr_inc_funcs[ofi_op_max];
extern ofi_ep_cntr_inc_func ofi_ep_rx_cntr_inc_funcs[ofi_op_max];
364 
ofi_ep_tx_cntr_inc_func(struct util_ep * ep,uint8_t op)365 static inline void ofi_ep_tx_cntr_inc_func(struct util_ep *ep, uint8_t op)
366 {
367 	assert(op < ofi_op_max);
368 	ofi_ep_tx_cntr_inc_funcs[op](ep);
369 }
370 
ofi_ep_rx_cntr_inc_func(struct util_ep * ep,uint8_t op)371 static inline void ofi_ep_rx_cntr_inc_func(struct util_ep *ep, uint8_t op)
372 {
373 	assert(op < ofi_op_max);
374 	ofi_ep_rx_cntr_inc_funcs[op](ep);
375 }
376 
377 /*
378  * Tag and address match
379  */
380 
ofi_match_addr(fi_addr_t recv_addr,fi_addr_t addr)381 static inline int ofi_match_addr(fi_addr_t recv_addr, fi_addr_t addr)
382 {
383 	return (recv_addr == FI_ADDR_UNSPEC) || (recv_addr == addr);
384 }
385 
ofi_match_tag(uint64_t recv_tag,uint64_t recv_ignore,uint64_t tag)386 static inline int ofi_match_tag(uint64_t recv_tag, uint64_t recv_ignore,
387 				uint64_t tag)
388 {
389 	return ((recv_tag | recv_ignore) == (tag | recv_ignore));
390 }
391 
392 /*
393  * Wait set
394  */
struct util_wait;
/* Wake waiters; invoked by util_cq_signal() / util_cntr_signal(). */
typedef void (*fi_wait_signal_func)(struct util_wait *wait);
typedef int (*fi_wait_try_func)(struct util_wait *wait);

/* Generic wait-set state shared by util-based providers. */
struct util_wait {
	struct fid_wait		wait_fid;
	struct util_fabric	*fabric;
	struct util_poll	*pollset;
	ofi_atomic32_t		ref;
	const struct fi_provider *prov;

	enum fi_wait_obj	wait_obj;
	fi_wait_signal_func	signal;
	fi_wait_try_func	wait_try;

	/* Presumably the list of fids attached to this wait set -- confirm. */
	struct dlist_entry	fid_list;
	fastlock_t		lock;
};

/* Initialize / clean up the util_wait base of a wait set. */
int ofi_wait_init(struct util_fabric *fabric, struct fi_wait_attr *attr,
		  struct util_wait *wait);
int fi_wait_cleanup(struct util_wait *wait);
417 
/* File-descriptor based wait set. */
struct util_wait_fd {
	struct util_wait	util_wait;
	struct fd_signal	signal;
	struct dlist_entry	fd_list;

	/* Anonymous union (C11): either an epoll set or a pollfds array.
	 * Which member is active is decided elsewhere (presumably by the wait
	 * object type or platform -- confirm). */
	union {
		ofi_epoll_t		epoll_fd;
		struct ofi_pollfds	*pollfds;
	};
	uint64_t		change_index;
};
429 
/* Per-entry "try" callback for wait-set members; receives an opaque arg. */
typedef int (*ofi_wait_try_func)(void *arg);

/* A raw fd registered with a wait set (see ofi_wait_add_fd()). */
struct ofi_wait_fd_entry {
	struct dlist_entry	entry;
	int 			fd;
	ofi_wait_try_func	wait_try;
	void			*arg;
	/* Presumably counts repeated adds of the same fd -- confirm. */
	ofi_atomic32_t		ref;
};

/* A fid registered with a wait set (see ofi_wait_add_fid()). */
struct ofi_wait_fid_entry {
	struct dlist_entry	entry;
	ofi_wait_try_func	wait_try;
	fid_t			fid;
	enum fi_wait_obj	wait_obj;
	uint32_t		events;
	ofi_atomic32_t		ref;
	struct fi_wait_pollfd	pollfds;
};
449 
450 int ofi_wait_fd_open(struct fid_fabric *fabric, struct fi_wait_attr *attr,
451 		struct fid_wait **waitset);
452 int ofi_wait_add_fd(struct util_wait *wait, int fd, uint32_t events,
453 		    ofi_wait_try_func wait_try, void *arg, void *context);
454 int ofi_wait_del_fd(struct util_wait *wait, int fd);
455 int ofi_wait_add_fid(struct util_wait *wat, fid_t fid, uint32_t events,
456 		     ofi_wait_try_func wait_try);
457 int ofi_wait_del_fid(struct util_wait *wait, fid_t fid);
458 
/* Wait set without an OS wait object (presumably spins/yields on `signal`,
 * guarded by signal_lock -- confirm). */
struct util_wait_yield {
	struct util_wait	util_wait;
	int			signal;
	fastlock_t		signal_lock;
};

/* Open a yield-based wait set on fabric. */
int ofi_wait_yield_open(struct fid_fabric *fabric, struct fi_wait_attr *attr,
			struct fid_wait **waitset);
467 
468 /*
469  * Completion queue
470  *
471  * Utility provider derived CQs that require manual progress must
472  * progress the CQ when fi_cq_read is called with a count = 0.
473  * In such cases, fi_cq_read will return 0 if there are available
474  * entries on the CQ.  This allows poll sets to drive progress
475  * without introducing private interfaces to the CQ.
476  */
477 
/* Copies one completion from src into *dst (presumably formatting per the
 * CQ's fi_cq_format and advancing *dst -- confirm). */
typedef void (*fi_cq_read_func)(void **dst, void *src);

/* Completion parked outside the circular queue (see util_cq.oflow_err_list):
 * the full entry, its source address, and a pointer to a related cirq
 * entry (parent_comp). */
struct util_cq_oflow_err_entry {
	struct fi_cq_tagged_entry	*parent_comp;
	struct fi_cq_err_entry		comp;
	fi_addr_t			src;
	struct slist_entry		list_entry;
};

/* Declares struct util_comp_cirq: a circular queue of fi_cq_tagged_entry. */
OFI_DECLARE_CIRQUE(struct fi_cq_tagged_entry, util_comp_cirq);

/* CQ progress callback (passed to ofi_cq_init()). */
typedef void (*ofi_cq_progress_func)(struct util_cq *cq);
490 
/* Generic completion-queue state shared by util-based providers. */
struct util_cq {
	struct fid_cq		cq_fid;
	struct util_domain	*domain;
	/* Wait object signalled by util_cq_signal(). */
	struct util_wait	*wait;
	ofi_atomic32_t		ref;
	struct dlist_entry	ep_list;
	fastlock_t		ep_list_lock;
	/* ofi_cq_write()/ofi_cq_write_src() take cq_lock through these
	 * callbacks around the *_thread_unsafe writers. */
	fastlock_t		cq_lock;
	ofi_fastlock_acquire_t	cq_fastlock_acquire;
	ofi_fastlock_release_t	cq_fastlock_release;

	/* Completion ring. */
	struct util_comp_cirq	*cirq;
	/* Source addresses, indexed in parallel with cirq by its write index
	 * (see ofi_cq_write_src_thread_unsafe()). */
	fi_addr_t		*src;

	/* Entries that did not fit in cirq (util_cq_oflow_err_entry) --
	 * presumably; confirm in ofi_cq_write_overflow(). */
	struct slist		oflow_err_list;
	fi_cq_read_func		read_entry;
	int			internal_wait;
	ofi_atomic32_t		signaled;
	ofi_cq_progress_func	progress;
};
511 
/* Lifecycle and fi_cq_* operation implementations for util_cq. */
int ofi_cq_init(const struct fi_provider *prov, struct fid_domain *domain,
		 struct fi_cq_attr *attr, struct util_cq *cq,
		 ofi_cq_progress_func progress, void *context);
int ofi_check_bind_cq_flags(struct util_ep *ep, struct util_cq *cq,
			    uint64_t flags);
void ofi_cq_progress(struct util_cq *cq);
int ofi_cq_cleanup(struct util_cq *cq);
int ofi_cq_control(struct fid *fid, int command, void *arg);
ssize_t ofi_cq_read(struct fid_cq *cq_fid, void *buf, size_t count);
ssize_t ofi_cq_readfrom(struct fid_cq *cq_fid, void *buf, size_t count,
		fi_addr_t *src_addr);
ssize_t ofi_cq_readerr(struct fid_cq *cq_fid, struct fi_cq_err_entry *buf,
		uint64_t flags);
ssize_t ofi_cq_sread(struct fid_cq *cq_fid, void *buf, size_t count,
		const void *cond, int timeout);
ssize_t ofi_cq_sreadfrom(struct fid_cq *cq_fid, void *buf, size_t count,
		fi_addr_t *src_addr, const void *cond, int timeout);
int ofi_cq_signal(struct fid_cq *cq_fid);

/* Slow path used by the ofi_cq_write*_thread_unsafe() helpers when the
 * completion ring is full. */
int ofi_cq_write_overflow(struct util_cq *cq, void *context, uint64_t flags, size_t len,
			  void *buf, uint64_t data, uint64_t tag, fi_addr_t src);
533 
util_cq_signal(struct util_cq * cq)534 static inline void util_cq_signal(struct util_cq *cq)
535 {
536 	assert(cq->wait);
537 	cq->wait->signal(cq->wait);
538 }
539 
540 static inline void
ofi_cq_write_comp_entry(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)541 ofi_cq_write_comp_entry(struct util_cq *cq, void *context, uint64_t flags,
542 			size_t len, void *buf, uint64_t data, uint64_t tag)
543 {
544 	struct fi_cq_tagged_entry *comp = ofi_cirque_tail(cq->cirq);
545 	comp->op_context = context;
546 	comp->flags = flags;
547 	comp->len = len;
548 	comp->buf = buf;
549 	comp->data = data;
550 	comp->tag = tag;
551 	ofi_cirque_commit(cq->cirq);
552 }
553 
554 static inline int
ofi_cq_write_thread_unsafe(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)555 ofi_cq_write_thread_unsafe(struct util_cq *cq, void *context, uint64_t flags,
556 			   size_t len, void *buf, uint64_t data, uint64_t tag)
557 {
558 	if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
559 		FI_DBG(cq->domain->prov, FI_LOG_CQ,
560 		       "util_cq cirq is full!\n");
561 		return ofi_cq_write_overflow(cq, context, flags, len,
562 					     buf, data, tag, 0);
563 	}
564 	ofi_cq_write_comp_entry(cq, context, flags, len, buf, data, tag);
565 	return 0;
566 }
567 
568 static inline int
ofi_cq_write(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag)569 ofi_cq_write(struct util_cq *cq, void *context, uint64_t flags, size_t len,
570 	     void *buf, uint64_t data, uint64_t tag)
571 {
572 	int ret;
573 	cq->cq_fastlock_acquire(&cq->cq_lock);
574 	ret = ofi_cq_write_thread_unsafe(cq, context, flags, len, buf, data, tag);
575 	cq->cq_fastlock_release(&cq->cq_lock);
576 	return ret;
577 }
578 
579 static inline int
ofi_cq_write_src_thread_unsafe(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag,fi_addr_t src)580 ofi_cq_write_src_thread_unsafe(struct util_cq *cq, void *context, uint64_t flags, size_t len,
581 			       void *buf, uint64_t data, uint64_t tag, fi_addr_t src)
582 {
583 	if (OFI_UNLIKELY(ofi_cirque_isfull(cq->cirq))) {
584 		FI_DBG(cq->domain->prov, FI_LOG_CQ,
585 		       "util_cq cirq is full!\n");
586 		return ofi_cq_write_overflow(cq, context, flags, len,
587 					     buf, data, tag, src);
588 	}
589 	cq->src[ofi_cirque_windex(cq->cirq)] = src;
590 	ofi_cq_write_comp_entry(cq, context, flags, len, buf, data, tag);
591 	return 0;
592 }
593 
594 static inline int
ofi_cq_write_src(struct util_cq * cq,void * context,uint64_t flags,size_t len,void * buf,uint64_t data,uint64_t tag,fi_addr_t src)595 ofi_cq_write_src(struct util_cq *cq, void *context, uint64_t flags, size_t len,
596 		 void *buf, uint64_t data, uint64_t tag, fi_addr_t src)
597 {
598 	int ret;
599 	cq->cq_fastlock_acquire(&cq->cq_lock);
600 	ret = ofi_cq_write_src_thread_unsafe(cq, context, flags, len,
601 					     buf, data, tag, src);
602 	cq->cq_fastlock_release(&cq->cq_lock);
603 	return ret;
604 }
605 
/* Error-completion writers: a caller-built error entry, and two helpers
 * whose names indicate a peek failure and a truncated receive (olen
 * presumably being the overflow length -- confirm). */
int ofi_cq_write_error(struct util_cq *cq,
		       const struct fi_cq_err_entry *err_entry);
int ofi_cq_write_error_peek(struct util_cq *cq, uint64_t tag, void *context);
int ofi_cq_write_error_trunc(struct util_cq *cq, void *context, uint64_t flags,
			     size_t len, void *buf, uint64_t data, uint64_t tag,
			     size_t olen);
612 
ofi_need_completion(uint64_t cq_flags,uint64_t op_flags)613 static inline int ofi_need_completion(uint64_t cq_flags, uint64_t op_flags)
614 {
615 	return (!(cq_flags & FI_SELECTIVE_COMPLETION) ||
616 		(op_flags & (FI_COMPLETION | FI_INJECT_COMPLETE |
617 			     FI_TRANSMIT_COMPLETE | FI_DELIVERY_COMPLETE)));
618 }
619 
/* Completion flags per ofi_op_* opcode, for the receive and transmit side;
 * defined in another translation unit. */
extern uint64_t ofi_rx_flags[ofi_op_max];
extern uint64_t ofi_tx_flags[ofi_op_max];
622 
ofi_rx_cq_flags(uint32_t op)623 static inline uint64_t ofi_rx_cq_flags(uint32_t op)
624 {
625 	return ofi_rx_flags[op];
626 }
627 
ofi_tx_cq_flags(uint32_t op)628 static inline uint64_t ofi_tx_cq_flags(uint32_t op)
629 {
630 	return ofi_tx_flags[op];
631 }
632 
633 /*
634  * Counter
635  */
/* Counter progress callback (passed to ofi_cntr_init()). */
typedef void (*ofi_cntr_progress_func)(struct util_cntr *cntr);

/* Generic counter state shared by util-based providers. */
struct util_cntr {
	struct fid_cntr		cntr_fid;
	struct util_domain	*domain;
	/* Wait object signalled by util_cntr_signal(). */
	struct util_wait	*wait;
	ofi_atomic32_t		ref;

	/* Success and error counts. */
	ofi_atomic64_t		cnt;
	ofi_atomic64_t		err;

	/* Presumably snapshot values used by the counter's wait logic --
	 * confirm. */
	uint64_t		checkpoint_cnt;
	uint64_t		checkpoint_err;

	struct dlist_entry	ep_list;
	fastlock_t		ep_list_lock;

	int			internal_wait;
	ofi_cntr_progress_func	progress;
};
656 
/* Lifecycle and progress helpers for util_cntr. */
void ofi_cntr_progress(struct util_cntr *cntr);
int ofi_cntr_init(const struct fi_provider *prov, struct fid_domain *domain,
		  struct fi_cntr_attr *attr, struct util_cntr *cntr,
		  ofi_cntr_progress_func progress, void *context);
int ofi_cntr_cleanup(struct util_cntr *cntr);
util_cntr_signal(struct util_cntr * cntr)662 static inline void util_cntr_signal(struct util_cntr *cntr)
663 {
664 	assert(cntr->wait);
665 	cntr->wait->signal(cntr->wait);
666 }
667 
ofi_cntr_inc_noop(struct util_cntr * cntr)668 static inline void ofi_cntr_inc_noop(struct util_cntr *cntr)
669 {
670 	OFI_UNUSED(cntr);
671 }
672 
ofi_cntr_inc(struct util_cntr * cntr)673 static inline void ofi_cntr_inc(struct util_cntr *cntr)
674 {
675 	cntr->cntr_fid.ops->add(&cntr->cntr_fid, 1);
676 }
677 
678 /*
679  * AV / addressing
680  */
681 
/* One address stored in a util_av. */
struct util_av_entry {
	ofi_atomic32_t	use_cnt;
	/* uthash link; util_av.hash is the table head. */
	UT_hash_handle	hh;
	/* Zero-length trailing array (GNU extension).  The address bytes
	 * presumably follow, util_av.addrlen long -- confirm. */
	char		addr[0];
};

/* Generic address-vector state shared by util-based providers. */
struct util_av {
	struct fid_av		av_fid;
	struct util_domain	*domain;
	struct util_eq		*eq;
	ofi_atomic32_t		ref;
	fastlock_t		lock;
	const struct fi_provider *prov;

	struct util_av_entry	*hash;
	struct ofi_bufpool	*av_entry_pool;

	struct util_coll_mc	*coll_mc;
	void			*context;
	uint64_t		flags;
	size_t			count;
	size_t			addrlen;
	/* Endpoints bound to this AV (see util_ep.av_entry, presumably). */
	struct dlist_entry	ep_list;
	fastlock_t		ep_list_lock;
};

/* Provider-specific parameters for ofi_av_init(). */
struct util_av_attr {
	size_t	addrlen;
	int	flags;
};
712 
/* Callback applied to each address by ofi_av_elements_iter(). */
typedef int (*ofi_av_apply_func)(struct util_av *av, void *addr,
				 fi_addr_t fi_addr, void *arg);

/* AV lifecycle: full and "lightweight" variants (the latter takes no
 * util_av_attr). */
int ofi_av_init(struct util_domain *domain,
	       const struct fi_av_attr *attr, const struct util_av_attr *util_attr,
	       struct util_av *av, void *context);
int ofi_av_init_lightweight(struct util_domain *domain, const struct fi_av_attr *attr,
			    struct util_av *av, void *context);
int ofi_av_close(struct util_av *av);
int ofi_av_close_lightweight(struct util_av *av);

/* Address insertion, removal, and lookup.  By name, the _unsafe lookup
 * variant presumably expects the caller to hold av->lock -- confirm. */
int ofi_av_insert_addr(struct util_av *av, const void *addr, fi_addr_t *fi_addr);
int ofi_av_remove_addr(struct util_av *av, fi_addr_t fi_addr);
fi_addr_t ofi_av_lookup_fi_addr_unsafe(struct util_av *av, const void *addr);
fi_addr_t ofi_av_lookup_fi_addr(struct util_av *av, const void *addr);
int ofi_av_elements_iter(struct util_av *av, ofi_av_apply_func apply, void *arg);
int ofi_av_bind(struct fid *av_fid, struct fid *eq_fid, uint64_t flags);
void ofi_av_write_event(struct util_av *av, uint64_t data,
			int err, void *context);
732 
/* Create an IP-address based AV (optionally with extra util flags). */
int ofi_ip_av_create(struct fid_domain *domain_fid, struct fi_av_attr *attr,
		     struct fid_av **av, void *context);
int ofi_ip_av_create_flags(struct fid_domain *domain_fid, struct fi_av_attr *attr,
			   struct fid_av **av, void *context, int flags);

/* Map fi_addr -> stored address; the IP variant is an alias. */
void *ofi_av_get_addr(struct util_av *av, fi_addr_t fi_addr);
#define ofi_ip_av_get_addr ofi_av_get_addr
fi_addr_t ofi_ip_av_get_fi_addr(struct util_av *av, const void *addr);

/* Address resolution helpers.  Buffers returned through void ** outputs
 * are presumably heap-allocated and owned by the caller -- confirm. */
int ofi_get_addr(uint32_t *addr_format, uint64_t flags,
		 const char *node, const char *service,
		 void **addr, size_t *addrlen);
int ofi_get_src_addr(uint32_t addr_format,
		     const void *dest_addr, size_t dest_addrlen,
		     void **src_addr, size_t *src_addrlen);
void ofi_getnodename(uint16_t sa_family, char *buf, int buflen);
int ofi_av_get_index(struct util_av *av, const void *addr);

/* IP AV operations (insert/remove/lookup/straddr implementations). */
int ofi_verify_av_insert(struct util_av *av, uint64_t flags);
int ofi_ip_av_insertv(struct util_av *av, const void *addr, size_t addrlen,
		      size_t count, fi_addr_t *fi_addr, void *context);
/* Caller should free *addr */
int ofi_ip_av_sym_getaddr(struct util_av *av, const char *node,
			  size_t nodecnt, const char *service,
			  size_t svccnt, void **addr, size_t *addrlen);
int ofi_ip_av_insert(struct fid_av *av_fid, const void *addr, size_t count,
		     fi_addr_t *fi_addr, uint64_t flags, void *context);
int ofi_ip_av_remove(struct fid_av *av_fid, fi_addr_t *fi_addr,
		     size_t count, uint64_t flags);
int ofi_ip_av_lookup(struct fid_av *av_fid, fi_addr_t fi_addr,
		     void *addr, size_t *addrlen);
const char *
ofi_ip_av_straddr(struct fid_av *av, const void *addr, char *buf, size_t *len);
766 
767 /*
768  * Poll set
769  */
/* Generic poll-set state shared by util-based providers. */
struct util_poll {
	struct fid_poll		poll_fid;
	struct util_domain	*domain;
	/* Presumably the fids added to this poll set -- confirm. */
	struct dlist_entry	fid_list;
	fastlock_t		lock;
	ofi_atomic32_t		ref;
	const struct fi_provider *prov;
};

/* Create a poll set; fi_poll_create_ takes the owning provider explicitly. */
int fi_poll_create_(const struct fi_provider *prov, struct fid_domain *domain,
		    struct fi_poll_attr *attr, struct fid_poll **pollset);
int fi_poll_create(struct fid_domain *domain, struct fi_poll_attr *attr,
		   struct fid_poll **pollset);
783 
784 /*
785  * EQ
786  */
/* Generic event-queue state shared by util-based providers. */
struct util_eq {
	struct fid_eq		eq_fid;
	struct util_fabric	*fabric;
	struct util_wait	*wait;
	fastlock_t		lock;
	ofi_atomic32_t		ref;
	const struct fi_provider *prov;

	/* Queued events (presumably struct util_event -- confirm). */
	struct slist		list;
	/* This contains error data that are read by user and need to
	 * be freed in subsequent fi_eq_readerr call against the EQ */
	void			*saved_err_data;
	int			internal_wait;
};

/* One queued EQ event.  data[0] is a zero-length trailing array (GNU
 * extension); the payload presumably follows, `size` bytes long. */
struct util_event {
	struct slist_entry	entry;
	int			size;
	int			event;
	int			err;
	uint8_t			data[0];
};
809 
/* util_eq lifecycle and fi_eq_* operation implementations. */
int ofi_eq_create(struct fid_fabric *fabric, struct fi_eq_attr *attr,
		 struct fid_eq **eq_fid, void *context);
int ofi_eq_init(struct fid_fabric *fabric_fid, struct fi_eq_attr *attr,
		struct fid_eq *eq_fid, void *context);
int ofi_eq_control(struct fid *fid, int command, void *arg);
int ofi_eq_cleanup(struct fid *fid);
/* Drop queued events that belong to `fid` (presumably when it closes). */
void ofi_eq_remove_fid_events(struct util_eq *eq, fid_t fid);
/* Copy err_entry into user_err_entry, taking api_version into account. */
void ofi_eq_handle_err_entry(uint32_t api_version, uint64_t flags,
			     struct fi_eq_err_entry *err_entry,
			     struct fi_eq_err_entry *user_err_entry);
ssize_t ofi_eq_read(struct fid_eq *eq_fid, uint32_t *event,
		    void *buf, size_t len, uint64_t flags);
ssize_t ofi_eq_sread(struct fid_eq *eq_fid, uint32_t *event, void *buf,
		     size_t len, int timeout, uint64_t flags);
ssize_t ofi_eq_readerr(struct fid_eq *eq_fid, struct fi_eq_err_entry *buf,
		       uint64_t flags);
ssize_t ofi_eq_write(struct fid_eq *eq_fid, uint32_t event,
		     const void *buf, size_t len, uint64_t flags);
const char *ofi_eq_strerror(struct fid_eq *eq_fid, int prov_errno,
			    const void *err_data, char *buf, size_t len);
830 
831 /*
832 
833 #define OFI_MR_MODE_RMA_TARGET (FI_MR_RAW | FI_MR_VIRT_ADDR |\
834 				 FI_MR_PROV_KEY | FI_MR_RMA_EVENT)
835  * Attributes and capabilities
836  */
/* Capability bits classed as primary capabilities. */
#define FI_PRIMARY_CAPS	(FI_MSG | FI_RMA | FI_TAGGED | FI_ATOMICS | FI_MULTICAST | \
			 FI_NAMED_RX_CTX | FI_DIRECTED_RECV | \
			 FI_READ | FI_WRITE | FI_RECV | FI_SEND | \
			 FI_REMOTE_READ | FI_REMOTE_WRITE | FI_COLLECTIVE | \
			 FI_HMEM)

/* Capability bits classed as secondary capabilities. */
#define FI_SECONDARY_CAPS (FI_MULTI_RECV | FI_SOURCE | FI_RMA_EVENT | \
			   FI_SHARED_AV | FI_TRIGGER | FI_FENCE | \
			   FI_LOCAL_COMM | FI_REMOTE_COMM)

/* Capability sets for the TX / RX side of message and RMA operations. */
#define OFI_TX_MSG_CAPS (FI_MSG | FI_SEND)
#define OFI_RX_MSG_CAPS (FI_MSG | FI_RECV)
#define OFI_TX_RMA_CAPS (FI_RMA | FI_READ | FI_WRITE)
#define OFI_RX_RMA_CAPS (FI_RMA | FI_REMOTE_READ | FI_REMOTE_WRITE)
851 
/*
 * Attribute validation helpers.  By name, each compares a user-supplied
 * attribute (or fi_info) against what the provider supports.  Return
 * convention presumably 0 on success / negative fi_errno -- confirm.
 */
int ofi_check_ep_type(const struct fi_provider *prov,
		      const struct fi_ep_attr *prov_attr,
		      const struct fi_ep_attr *user_attr);
int ofi_check_mr_mode(const struct fi_provider *prov, uint32_t api_version,
		      int prov_mode, const struct fi_info *user_info);
int ofi_check_fabric_attr(const struct fi_provider *prov,
			  const struct fi_fabric_attr *prov_attr,
			  const struct fi_fabric_attr *user_attr);
int ofi_check_wait_attr(const struct fi_provider *prov,
		        const struct fi_wait_attr *attr);
int ofi_check_domain_attr(const struct fi_provider *prov, uint32_t api_version,
			  const struct fi_domain_attr *prov_attr,
			  const struct fi_info *user_info);
int ofi_check_ep_attr(const struct util_prov *util_prov, uint32_t api_version,
		      const struct fi_info *prov_info,
		      const struct fi_info *user_info);
int ofi_check_cq_attr(const struct fi_provider *prov,
		      const struct fi_cq_attr *attr);
int ofi_check_rx_attr(const struct fi_provider *prov,
		      const struct fi_info *prov_info,
		      const struct fi_rx_attr *user_attr, uint64_t info_mode);
int ofi_check_tx_attr(const struct fi_provider *prov,
		      const struct fi_tx_attr *prov_attr,
		      const struct fi_tx_attr *user_attr, uint64_t info_mode);
int ofi_check_attr_subset(const struct fi_provider *prov,
			uint64_t base_caps, uint64_t requested_caps);
int ofi_prov_check_info(const struct util_prov *util_prov,
			uint32_t api_version,
			const struct fi_info *user_info);
int ofi_prov_check_dup_info(const struct util_prov *util_prov,
			    uint32_t api_version,
			    const struct fi_info *user_info,
			    struct fi_info **info);
/*
 * Bitwise select between two flag words: for every bit set in
 * use_core_flags the result takes the bit from all_core_flags, otherwise
 * from all_util_flags.
 */
static inline uint64_t
ofi_pick_core_flags(uint64_t all_util_flags, uint64_t all_core_flags,
		    uint64_t use_core_flags)
{
	/* Flip exactly the selected bits where the two words disagree. */
	uint64_t differ = all_util_flags ^ all_core_flags;

	return all_util_flags ^ (differ & use_core_flags);
}
892 
/* Compare user_info with one provider info (prov_info) for api_version.
 * NOTE(review): presumably built from the per-attribute ofi_check_* helpers
 * declared above -- confirm in the definition. */
int ofi_check_info(const struct util_prov *util_prov,
		   const struct fi_info *prov_info, uint32_t api_version,
		   const struct fi_info *user_info);
/* Adjusts 'info' in place (the only non-const pointer here) using 'hints'
 * and api_version. */
void ofi_alter_info(struct fi_info *info, const struct fi_info *hints,
		    uint32_t api_version);

/* Allocates an fi_info.  NOTE(review): presumably the provider-internal
 * counterpart of fi_allocinfo(); confirm how the result must be freed. */
struct fi_info *ofi_allocinfo_internal(void);
/* getinfo implementations for utility-based providers.  The version, node,
 * service, flags, hints and info parameters follow the order of
 * fi_getinfo(); the result list is returned through 'info'.
 * ofi_ip_getinfo presumably specialises util_getinfo for IP-addressed
 * providers -- confirm. */
int util_getinfo(const struct util_prov *util_prov, uint32_t version,
		 const char *node, const char *service, uint64_t flags,
		 const struct fi_info *hints, struct fi_info **info);
int ofi_ip_getinfo(const struct util_prov *prov, uint32_t version,
		   const char *node, const char *service, uint64_t flags,
		   const struct fi_info *hints, struct fi_info **info);
906 
907 
/* List node that tracks one fid: 'entry' links the node into a dlist and
 * 'fid' points at the tracked object. */
struct fid_list_entry {
	struct dlist_entry	entry;
	struct fid		*fid;
};

/* Add or remove 'fid' in the dlist headed by fid_list.  'lock' presumably
 * serialises access to that list -- confirm in the definitions.
 * fid_list_insert returns an int status (presumably because it allocates
 * the fid_list_entry); fid_list_remove returns nothing. */
int fid_list_insert(struct dlist_entry *fid_list, fastlock_t *lock,
		    struct fid *fid);
void fid_list_remove(struct dlist_entry *fid_list, fastlock_t *lock,
		     struct fid *fid);
917 
/* Register / unregister a util_fabric with a collection kept by the
 * implementation.  NOTE(review): presumably a process-wide fabric list;
 * confirm its locking in the definitions. */
void ofi_fabric_insert(struct util_fabric *fabric);
void ofi_fabric_remove(struct util_fabric *fabric);
920 
921 /*
922  * Utility Providers
923  */
924 
/* Separator character ';'.  NOTE(review): presumably separates the
 * components of layered provider names (e.g. "ofi_rxm;verbs"); confirm. */
#define OFI_NAME_DELIM	';'
/* Name prefix shared by utility providers. */
#define OFI_UTIL_PREFIX "ofi_"

/*
 * Report whether 'str' begins with OFI_UTIL_PREFIX ("ofi_").  The
 * comparison ignores case.
 *
 * @param str  NUL-terminated name; must not be NULL
 * @return 1 if str starts with the prefix, 0 otherwise
 */
static inline int ofi_has_util_prefix(const char *str)
{
	/* sizeof on a string literal counts the trailing NUL, hence the - 1. */
	const size_t prefix_len = sizeof(OFI_UTIL_PREFIX) - 1;

	return strncasecmp(str, OFI_UTIL_PREFIX, prefix_len) == 0;
}
932 
/* Conversion callback between fi_info representations: reads the const
 * src_info and writes dest_info for the given API version, returning an int
 * status.  Used by the layering helpers below to translate util <-> core. */
typedef int (*ofi_alter_info_t)(uint32_t version, const struct fi_info *src_info,
				struct fi_info *dest_info);

/* Query the underlying ("core") provider on behalf of a utility provider.
 * util_hints are presumably converted with info_to_core before the query;
 * the core info list is returned through core_info -- confirm. */
int ofi_get_core_info(uint32_t version, const char *node, const char *service,
		      uint64_t flags, const struct util_prov *util_prov,
		      const struct fi_info *util_hints, ofi_alter_info_t info_to_core,
		      struct fi_info **core_info);
/* getinfo for layered utility providers.  Takes a conversion in each
 * direction: info_to_core (hints going down) and info_to_util (results
 * coming back up, presumably).  Result is returned through 'info'. */
int ofix_getinfo(uint32_t version, const char *node, const char *service,
		 uint64_t flags, const struct util_prov *util_prov,
		 const struct fi_info *hints, ofi_alter_info_t info_to_core,
		 ofi_alter_info_t info_to_util, struct fi_info **info);
/* Fetch core provider info matching a util fabric attribute, returned
 * through core_info. */
int ofi_get_core_info_fabric(const struct fi_provider *prov,
			     const struct fi_fabric_attr *util_attr,
			     struct fi_info **core_info);
947 
948 
/* Build a new string from 'head' and 'tail'.  NOTE(review): presumably
 * heap-allocated (caller frees) and joined with OFI_NAME_DELIM; confirm in
 * the definition. */
char *ofi_strdup_append(const char *head, const char *tail);
/* prov_name is passed by reference so the caller's string can be replaced.
 * NOTE(review): presumably removes util_prov_name from a delimited
 * provider name; confirm behavior and ownership in the definition. */
int ofi_exclude_prov_name(char **prov_name, const char *util_prov_name);
953 
954 
/* Map a named shared-memory region of 'size' bytes; the mapped address is
 * returned through 'mapped' and 'readonly' presumably requests a read-only
 * mapping.  'shm' keeps the state that ofi_shm_unmap later needs (it is the
 * only argument unmap receives). */
int ofi_shm_map(struct util_shm *shm, const char *name, size_t size,
		int readonly, void **mapped);
int ofi_shm_unmap(struct util_shm *shm);
958 
959 /*
960  * Name Server TODO: add support for Windows OS
961  * (osd/windows/pthread.h should be extended)
962  */
963 
/* Compare two opaque service keys.  NOTE(review): presumably returns 0 when
 * equal (memcmp-style); confirm against provider implementations. */
typedef int(*ofi_ns_service_cmp_func_t)(void *svc1, void *svc2);
/* Presumably returns nonzero when 'svc' is a wildcard service value. */
typedef int(*ofi_ns_is_service_wildcard_func_t)(void *svc);

/* State of one name-server instance. */
struct util_ns {
	/* Server side: listening socket, server thread and the registry
	 * (an RbtHandle tree; presumably keyed by service -- confirm). */
	SOCKET		listen_sock;
	pthread_t	thread;
	RbtHandle	map;

	/* Name-server host and port, presumably where clients connect. */
	char		*hostname;
	int		port;

	/* Presumably the byte sizes of the opaque name and service values,
	 * which the API below passes around as void *. */
	size_t		name_len;
	size_t		service_len;

	/* run: presumably keeps the server thread looping while set.
	 * is_initialized: presumably set by ofi_ns_init.
	 * ref: atomic reference count, presumably of users of this server. */
	int		run;
	int		is_initialized;
	ofi_atomic32_t	ref;

	/* Provider-supplied hooks describing its service key format. */
	ofi_ns_service_cmp_func_t	service_cmp;
	ofi_ns_is_service_wildcard_func_t is_service_wildcard;
};
985 
/* Lifecycle: initialise the struct, then start / stop the server.  Only
 * starting the server reports an int status. */
void ofi_ns_init(struct util_ns *ns);
int ofi_ns_start_server(struct util_ns *ns);
void ofi_ns_stop_server(struct util_ns *ns);

/* Register / unregister a local (service, name) pair.  Both are opaque
 * pointers, presumably of ns->service_len and ns->name_len bytes. */
int ofi_ns_add_local_name(struct util_ns *ns, void *service, void *name);
int ofi_ns_del_local_name(struct util_ns *ns, void *service, void *name);
/* Ask the name server on host 'server' for the name bound to 'service'.
 * NOTE(review): presumably returns a heap-allocated copy, or NULL when not
 * found; confirm ownership in the definition. */
void *ofi_ns_resolve_name(struct util_ns *ns, const char *server,
			  void *service);
994 
995 #ifdef __cplusplus
996 }
997 #endif
998 
999 #endif /* _OFI_UTIL_H_ */
1000