/**
 * Copyright (C) Mellanox Technologies Ltd. 2001-2019.  ALL RIGHTS RESERVED.
 *
 * See file LICENSE for terms.
 */

#ifndef UCT_IFACE_H_
#define UCT_IFACE_H_

#include "uct_worker.h"

#include <uct/api/uct.h>
#include <uct/base/uct_component.h>
#include <ucs/config/parser.h>
#include <ucs/datastruct/arbiter.h>
#include <ucs/datastruct/mpool.h>
#include <ucs/datastruct/queue.h>
#include <ucs/debug/log.h>
#include <ucs/stats/stats.h>
#include <ucs/sys/compiler.h>
#include <ucs/sys/sys.h>
#include <ucs/type/class.h>

#include <ucs/datastruct/mpool.inl>


enum {
    UCT_EP_STAT_AM,
    UCT_EP_STAT_PUT,
    UCT_EP_STAT_GET,
    UCT_EP_STAT_ATOMIC,
#if IBV_HW_TM
    UCT_EP_STAT_TAG,
#endif
    UCT_EP_STAT_BYTES_SHORT,
    UCT_EP_STAT_BYTES_BCOPY,
    UCT_EP_STAT_BYTES_ZCOPY,
    UCT_EP_STAT_NO_RES,
    UCT_EP_STAT_FLUSH,
    UCT_EP_STAT_FLUSH_WAIT,  /* number of times flush was called while still in progress */
    UCT_EP_STAT_PENDING,
    UCT_EP_STAT_FENCE,
    UCT_EP_STAT_LAST
};

enum {
    UCT_IFACE_STAT_RX_AM,
    UCT_IFACE_STAT_RX_AM_BYTES,
    UCT_IFACE_STAT_TX_NO_DESC,
    UCT_IFACE_STAT_FLUSH,
    UCT_IFACE_STAT_FLUSH_WAIT,  /* number of times flush was called while still in progress */
    UCT_IFACE_STAT_FENCE,
    UCT_IFACE_STAT_LAST
};


/*
 * Statistics macros
 */
#define UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_##_op, 1); \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_BYTES_##_method, _size);
#define UCT_TL_EP_STAT_OP_IF_SUCCESS(_status, _ep, _op, _method, _size) \
    if (_status >= 0) { \
        UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
    }
#define UCT_TL_EP_STAT_ATOMIC(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_ATOMIC, 1);
#define UCT_TL_EP_STAT_FLUSH(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH, 1);
#define UCT_TL_EP_STAT_FLUSH_WAIT(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH_WAIT, 1);
#define UCT_TL_EP_STAT_FENCE(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FENCE, 1);
#define UCT_TL_EP_STAT_PEND(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_PENDING, 1);

#define UCT_TL_IFACE_STAT_FLUSH(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH, 1);
#define UCT_TL_IFACE_STAT_FLUSH_WAIT(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH_WAIT, 1);
#define UCT_TL_IFACE_STAT_FENCE(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FENCE, 1);
#define UCT_TL_IFACE_STAT_TX_NO_DESC(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_TX_NO_DESC, 1);


#define UCT_CB_FLAGS_CHECK(_flags) \
    do { \
        if ((_flags) & UCT_CB_FLAG_RESERVED) { \
            ucs_error("Unsupported callback flag 0x%x", UCT_CB_FLAG_RESERVED); \
            return UCS_ERR_INVALID_PARAM; \
        } \
    } while (0)


/**
 * In release mode - do nothing.
 *
 * In debug mode, if @a _condition is not true, return an error. This can be
 * less optimal because of the additional checks, and because the compiler
 * needs to generate code for the error flow as well.
 */
#define UCT_CHECK_PARAM(_condition, _err_message, ...) \
    if (ENABLE_PARAMS_CHECK && !(_condition)) { \
        ucs_error(_err_message, ## __VA_ARGS__); \
        return UCS_ERR_INVALID_PARAM; \
    }
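
/*
 * Example (illustrative sketch, not part of this header): a transport entry
 * point validating its arguments with UCT_CHECK_PARAM before doing any work.
 * The names uct_foo_ep_am_short(), uct_foo_ep_do_send() and FOO_MAX_SHORT are
 * hypothetical and used for illustration only.
 *
 * @code
 * static ucs_status_t uct_foo_ep_am_short(uct_ep_h tl_ep, uint8_t id,
 *                                         uint64_t header, const void *payload,
 *                                         unsigned length)
 * {
 *     UCT_CHECK_PARAM(length <= FOO_MAX_SHORT,
 *                     "am_short length %u exceeds maximum %u",
 *                     length, FOO_MAX_SHORT);
 *     return uct_foo_ep_do_send(tl_ep, id, header, payload, length);
 * }
 * @endcode
 */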


/**
 * In release mode - do nothing.
 *
 * In debug mode, if @a _params field mask does not have set
 * @ref UCT_EP_PARAM_FIELD_DEV_ADDR and @ref UCT_EP_PARAM_FIELD_IFACE_ADDR
 * flags, return an error.
 */
#define UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(_params) \
    UCT_CHECK_PARAM(ucs_test_all_flags((_params)->field_mask, \
                                       UCT_EP_PARAM_FIELD_DEV_ADDR | \
                                       UCT_EP_PARAM_FIELD_IFACE_ADDR), \
                    "UCT_EP_PARAM_FIELD_DEV_ADDR and UCT_EP_PARAM_FIELD_IFACE_ADDR are not defined")


#define UCT_EP_PARAMS_GET_PATH_INDEX(_params) \
    (((_params)->field_mask & UCT_EP_PARAM_FIELD_PATH_INDEX) ? \
     (_params)->path_index : 0)

/**
 * Check the condition and return an error status pointer if it is not true.
 */
#define UCT_CHECK_PARAM_PTR(_condition, _err_message, ...) \
    if (ENABLE_PARAMS_CHECK && !(_condition)) { \
        ucs_error(_err_message, ## __VA_ARGS__); \
        return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM); \
    }


/**
 * Check the size of the IOV array
 */
#define UCT_CHECK_IOV_SIZE(_iovcnt, _max_iov, _name) \
    UCT_CHECK_PARAM((_iovcnt) <= (_max_iov), \
                    "iovcnt(%lu) should be limited by %lu in %s", \
                    _iovcnt, _max_iov, _name)


/**
 * In debug mode, if @a _length is not within [_min_length, _max_length],
 * generate an 'Invalid length' error.
 */
#define UCT_CHECK_LENGTH(_length, _min_length, _max_length, _name) \
    { \
        typeof(_length) __length = _length; \
        UCT_CHECK_PARAM(__length <= (_max_length), \
                        "Invalid %s length: %zu (expected: <= %zu)", \
                        _name, (size_t)(__length), (size_t)(_max_length)); \
        UCT_CHECK_PARAM((ssize_t)__length >= (_min_length), \
                        "Invalid %s length: %zu (expected: >= %zu)", \
                        _name, (size_t)(__length), (size_t)(_min_length)); \
    }

/**
 * Skip if this is a zero-length operation.
 */
#define UCT_SKIP_ZERO_LENGTH(_length, ...) \
    if (0 == (_length)) { \
        ucs_trace_data("Zero length request: skip it"); \
        UCS_PP_FOREACH(_UCT_RELEASE_DESC, _, __VA_ARGS__)  \
        return UCS_OK; \
    }
#define _UCT_RELEASE_DESC(_, _desc) \
    ucs_mpool_put(_desc);


/**
 * In debug mode, check that the active message ID is valid.
 */
#define UCT_CHECK_AM_ID(_am_id) \
    UCT_CHECK_PARAM((_am_id) < UCT_AM_ID_MAX, \
                    "Invalid active message id (valid range: 0..%d)", \
                    (int)UCT_AM_ID_MAX - 1)


/**
 * Declare classes for structures defined in api/tl.h
 */
UCS_CLASS_DECLARE(uct_iface_h, uct_iface_ops_t, uct_md_h);
UCS_CLASS_DECLARE(uct_ep_t, uct_iface_h);


/**
 * Active message handler table entry
 */
typedef struct uct_am_handler {
    uct_am_callback_t cb;
    void              *arg;
    uint32_t          flags;
} uct_am_handler_t;


/**
 * Base structure of all interfaces.
 * Includes the AM table which we don't want to expose.
 */
typedef struct uct_base_iface {
    uct_iface_t             super;
    uct_md_h                md;               /* MD this interface is using */
    uct_priv_worker_t       *worker;          /* Worker this interface is on */
    uct_am_handler_t        am[UCT_AM_ID_MAX];/* Active message table */
    uct_am_tracer_t         am_tracer;        /* Active message tracer */
    void                    *am_tracer_arg;   /* Tracer argument */
    uct_error_handler_t     err_handler;      /* Error handler */
    void                    *err_handler_arg; /* Error handler argument */
    uint32_t                err_handler_flags; /* Error handler callback flags */
    uct_worker_progress_t   prog;             /* Will be removed once all transports
                                                 support progress control */
    unsigned                progress_flags;   /* Which progress is currently enabled */

    struct {
        unsigned            num_alloc_methods;
        uct_alloc_method_t  alloc_methods[UCT_ALLOC_METHOD_LAST];
        ucs_log_level_t     failure_level;
        size_t              max_num_eps;
    } config;

    UCS_STATS_NODE_DECLARE(stats)            /* Statistics */
} uct_base_iface_t;

UCS_CLASS_DECLARE(uct_base_iface_t, uct_iface_ops_t*, uct_md_h, uct_worker_h,
                  const uct_iface_params_t*, const uct_iface_config_t*
                  UCS_STATS_ARG(ucs_stats_node_t*) UCS_STATS_ARG(const char*));


/**
 * Stub interface used for failed endpoints
 */
typedef struct uct_failed_iface {
    uct_iface_t       super;
    ucs_queue_head_t  pend_q;
} uct_failed_iface_t;


/**
 * Base structure of all endpoints.
 */
typedef struct uct_base_ep {
    uct_ep_t          super;
    UCS_STATS_NODE_DECLARE(stats)
} uct_base_ep_t;
UCS_CLASS_DECLARE(uct_base_ep_t, uct_base_iface_t*);


/**
 * Internal resource descriptor of a transport device
 */
typedef struct uct_tl_device_resource {
    char                     name[UCT_DEVICE_NAME_MAX]; /**< Hardware device name */
    uct_device_type_t        type;       /**< The device represented by this resource
                                              (e.g. UCT_DEVICE_TYPE_NET for a network interface) */
    ucs_sys_device_t         sys_device; /**< The identifier associated with the device
                                              bus_id as captured in ucs_sys_bus_id_t struct */
} uct_tl_device_resource_t;


/**
 * UCT transport definition. This structure should not be used directly; use the
 * @ref UCT_TL_DEFINE macro to define a transport.
 */
typedef struct uct_tl {
    char                   name[UCT_TL_NAME_MAX]; /**< Transport name */

    ucs_status_t           (*query_devices)(uct_md_h md,
                                            uct_tl_device_resource_t **tl_devices_p,
                                            unsigned *num_tl_devices_p);

    ucs_status_t           (*iface_open)(uct_md_h md, uct_worker_h worker,
                                         const uct_iface_params_t *params,
                                         const uct_iface_config_t *config,
                                         uct_iface_h *iface_p);

    ucs_config_global_list_entry_t config; /**< Transport configuration entry */
    ucs_list_link_t                list;   /**< Entry in component's transports list */
} uct_tl_t;


/**
 * Define a transport
 *
 * @param _component      Component to add the transport to
 * @param _name           Name of the transport (should be a token, not a string)
 * @param _query_devices  Function to query the list of available devices
 * @param _iface_class    Struct type defining the uct_iface class
 * @param _cfg_prefix     Prefix for the transport's configuration environment variables
 * @param _cfg_table      Configuration table of the transport
 * @param _cfg_struct     Configuration structure type of the transport
 */
#define UCT_TL_DEFINE(_component, _name, _query_devices, _iface_class, \
                      _cfg_prefix, _cfg_table, _cfg_struct) \
    \
    uct_tl_t uct_##_name##_tl = { \
        .name               = #_name, \
        .query_devices      = _query_devices, \
        .iface_open         = UCS_CLASS_NEW_FUNC_NAME(_iface_class), \
        .config = { \
            .name           = #_name" transport", \
            .prefix         = _cfg_prefix, \
            .table          = _cfg_table, \
            .size           = sizeof(_cfg_struct), \
         } \
    }; \
    UCS_CONFIG_REGISTER_TABLE_ENTRY(&(uct_##_name##_tl).config); \
    UCS_STATIC_INIT { \
        ucs_list_add_tail(&(_component)->tl_list, &(uct_##_name##_tl).list); \
    }
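
/*
 * Example (illustrative sketch): how a transport might register itself with
 * its component using UCT_TL_DEFINE. All uct_foo_* names below are
 * hypothetical and shown for illustration only.
 *
 * @code
 * UCT_TL_DEFINE(&uct_foo_component,         // component to add the transport to
 *               foo,                        // transport name (token, not a string)
 *               uct_foo_query_devices,      // device query function
 *               uct_foo_iface_t,            // iface class type
 *               "FOO_",                     // prefix for configuration variables
 *               uct_foo_iface_config_table, // configuration table
 *               uct_foo_iface_config_t);    // configuration structure type
 * @endcode
 */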


/**
 * "Base" structure which defines interface configuration options.
 * Specific transports extend this structure.
 */
struct uct_iface_config {
    struct {
        uct_alloc_method_t  *methods;
        unsigned            count;
    } alloc_methods;

    int               failure;   /* Level of failure reports */
    size_t            max_num_eps;
};


/**
 * Memory pool configuration.
 */
typedef struct uct_iface_mpool_config {
    unsigned          max_bufs;  /* Upper limit to number of buffers */
    unsigned          bufs_grow; /* How many buffers (approx.) are allocated every time */
} uct_iface_mpool_config_t;


/**
 * Define configuration fields for memory pool parameters.
 */
#define UCT_IFACE_MPOOL_CONFIG_FIELDS(_prefix, _dfl_max, _dfl_grow, _mp_name, _offset, _desc) \
    {_prefix "MAX_BUFS", UCS_PP_QUOTE(_dfl_max), \
     "Maximal number of " _mp_name " buffers for the interface. -1 is infinite." \
     _desc, \
     (_offset) + ucs_offsetof(uct_iface_mpool_config_t, max_bufs), UCS_CONFIG_TYPE_INT}, \
    \
    {_prefix "BUFS_GROW", UCS_PP_QUOTE(_dfl_grow), \
     "How many buffers are added every time the " _mp_name " memory pool grows.\n" \
     "0 means the value is chosen by the transport.", \
     (_offset) + ucs_offsetof(uct_iface_mpool_config_t, bufs_grow), UCS_CONFIG_TYPE_UINT}
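
/*
 * Example (illustrative sketch): embedding the memory pool fields in a
 * transport configuration table. The uct_foo_* names and the tx_mp field are
 * hypothetical.
 *
 * @code
 * typedef struct uct_foo_iface_config {
 *     uct_iface_config_t       super;
 *     uct_iface_mpool_config_t tx_mp;
 * } uct_foo_iface_config_t;
 *
 * static ucs_config_field_t uct_foo_iface_config_table[] = {
 *     {"", "", NULL,
 *      ucs_offsetof(uct_foo_iface_config_t, super),
 *      UCS_CONFIG_TYPE_TABLE(uct_iface_config_table)},
 *
 *     UCT_IFACE_MPOOL_CONFIG_FIELDS("TX_", -1, 128, "send",
 *                                   ucs_offsetof(uct_foo_iface_config_t, tx_mp),
 *                                   "\nSend descriptors."),
 *
 *     {NULL}
 * };
 * @endcode
 */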


/**
 * Get a descriptor from the memory pool, tell valgrind it's already defined,
 * and run the failure path if the memory pool is empty.
 *
 * @param _iface    Interface on which to update the TX_NO_DESC statistic.
 * @param _mp       Memory pool to get the descriptor from.
 * @param _desc     Variable to assign the fetched descriptor to.
 * @param _failure  What to do if the memory pool is empty.
 */
#define UCT_TL_IFACE_GET_TX_DESC(_iface, _mp, _desc, _failure) \
    { \
        _desc = ucs_mpool_get_inline(_mp); \
        if (ucs_unlikely((_desc) == NULL)) { \
            UCT_TL_IFACE_STAT_TX_NO_DESC(_iface); \
            _failure; \
        } \
        \
        VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
    }
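
/*
 * Example (illustrative sketch): a bcopy send path combining the descriptor
 * and statistics macros defined above. The names uct_foo_iface_t,
 * uct_foo_desc_t, uct_foo_post_send() and iface->tx_mp are hypothetical, and
 * the descriptor layout (payload right after the descriptor) is an assumption.
 *
 * @code
 * ssize_t uct_foo_ep_am_bcopy(uct_ep_h tl_ep, uint8_t id,
 *                             uct_pack_callback_t pack_cb, void *arg,
 *                             unsigned flags)
 * {
 *     uct_foo_iface_t *iface = ucs_derived_of(tl_ep->iface, uct_foo_iface_t);
 *     uct_base_ep_t   *ep    = ucs_derived_of(tl_ep, uct_base_ep_t);
 *     uct_foo_desc_t  *desc;
 *     size_t length;
 *
 *     UCT_CHECK_AM_ID(id);
 *     UCT_TL_IFACE_GET_TX_DESC(&iface->super, &iface->tx_mp, desc,
 *                              return UCS_ERR_NO_RESOURCE);
 *
 *     length = pack_cb(desc + 1, arg);
 *     uct_foo_post_send(iface, desc, id, length);
 *     UCT_TL_EP_STAT_OP(ep, AM, BCOPY, length);
 *     return length;
 * }
 * @endcode
 */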


#define UCT_TL_IFACE_GET_RX_DESC(_iface, _mp, _desc, _failure) \
    { \
        _desc = ucs_mpool_get_inline(_mp); \
        if (ucs_unlikely((_desc) == NULL)) { \
            uct_iface_mpool_empty_warn(_iface, _mp); \
            _failure; \
        } \
        \
        VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
    }


#define UCT_TL_IFACE_PUT_DESC(_desc) \
    { \
        ucs_mpool_put_inline(_desc); \
        VALGRIND_MAKE_MEM_UNDEFINED(_desc, sizeof(*(_desc))); \
    }


/**
 * TL Memory pool object initialization callback.
 */
typedef void (*uct_iface_mpool_init_obj_cb_t)(uct_iface_h iface, void *obj,
                                              uct_mem_h memh);


/**
 * Base structure for private data held inside a pending request for TLs
 * which use ucs_arbiter_t to progress pending requests.
 */
typedef struct {
    ucs_arbiter_elem_t  arb_elem;
} uct_pending_req_priv_arb_t;


static UCS_F_ALWAYS_INLINE ucs_arbiter_elem_t *
uct_pending_req_priv_arb_elem(uct_pending_req_t *req)
{
    uct_pending_req_priv_arb_t *priv_arb_p =
        (uct_pending_req_priv_arb_t *)&req->priv;

    return &priv_arb_p->arb_elem;
}


/**
 * Add a pending request to the arbiter.
 */
#define uct_pending_req_arb_group_push(_arbiter_group, _req) \
    do { \
        ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
        ucs_arbiter_group_push_elem_always(_arbiter_group, \
                                           uct_pending_req_priv_arb_elem(_req)); \
    } while (0)


/**
 * Add a pending request to the head of group in arbiter.
 */
#define uct_pending_req_arb_group_push_head(_arbiter, _arbiter_group, _req) \
    do { \
        ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
        ucs_arbiter_group_push_head_elem_always(_arbiter_group, \
                                                uct_pending_req_priv_arb_elem(_req)); \
    } while (0)
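
/*
 * Example (illustrative sketch): an ep_pending_add() implementation for a
 * transport that progresses pending requests with an arbiter. The names
 * uct_foo_ep_t, ep->arb_group, ep->super (assumed to be a uct_base_ep_t) and
 * uct_foo_ep_has_resources() are hypothetical.
 *
 * @code
 * ucs_status_t uct_foo_ep_pending_add(uct_ep_h tl_ep, uct_pending_req_t *req,
 *                                     unsigned flags)
 * {
 *     uct_foo_ep_t *ep = ucs_derived_of(tl_ep, uct_foo_ep_t);
 *
 *     if (uct_foo_ep_has_resources(ep)) {
 *         // resources are available again - no need to queue the request
 *         return UCS_ERR_BUSY;
 *     }
 *
 *     uct_pending_req_arb_group_push(&ep->arb_group, req);
 *     UCT_TL_EP_STAT_PEND(&ep->super);
 *     return UCS_OK;
 * }
 * @endcode
 */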


/**
 * Base structure for private data held inside a pending request for TLs
 * which use ucs_queue_t to progress pending requests.
 */
typedef struct {
    ucs_queue_elem_t    queue_elem;
} uct_pending_req_priv_queue_t;


static UCS_F_ALWAYS_INLINE ucs_queue_elem_t *
uct_pending_req_priv_queue_elem(uct_pending_req_t *req)
{
    uct_pending_req_priv_queue_t *priv_queue_p =
        (uct_pending_req_priv_queue_t *)&req->priv;

    return &priv_queue_p->queue_elem;
}


/**
 * Add a pending request to the queue.
 */
#define uct_pending_req_queue_push(_queue, _req) \
    ucs_queue_push((_queue), uct_pending_req_priv_queue_elem(_req))


typedef struct {
    uct_pending_purge_callback_t cb;
    void                         *arg;
} uct_purge_cb_args_t;


/**
 * Dispatch all requests in the pending queue, as long as _cond holds true.
 * _cond is an expression which can use "_priv" variable.
 *
 * @param _priv   Variable which will hold a pointer to request private data.
 * @param _queue  The pending queue.
 * @param _cond   Condition which should be true in order to keep dispatching.
 *
 * TODO support a callback returning UCS_INPROGRESS.
 */
#define uct_pending_queue_dispatch(_priv, _queue, _cond) \
    while (!ucs_queue_is_empty(_queue)) { \
        uct_pending_req_priv_queue_t *_base_priv; \
        uct_pending_req_t *_req; \
        ucs_status_t _status; \
        \
        _base_priv = \
            ucs_queue_head_elem_non_empty((_queue), \
                                          uct_pending_req_priv_queue_t, \
                                          queue_elem); \
        \
        UCS_STATIC_ASSERT(sizeof(*(_priv)) <= UCT_PENDING_REQ_PRIV_LEN); \
        _priv = (typeof(_priv))(_base_priv); \
        \
        if (!(_cond)) { \
            break; \
        } \
        \
        _req = ucs_container_of(_priv, uct_pending_req_t, priv); \
        ucs_queue_pull_non_empty(_queue); \
        _status = _req->func(_req); \
        if (_status != UCS_OK) { \
            ucs_queue_push_head(_queue, &_base_priv->queue_elem); \
        } \
    }
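
/*
 * Example (illustrative sketch): draining a pending queue from a transport
 * progress function. uct_foo_iface_t, iface->pending_q, iface->tx_available
 * and uct_foo_pending_priv_t (assumed to embed uct_pending_req_priv_queue_t)
 * are hypothetical.
 *
 * @code
 * static void uct_foo_iface_progress_pending(uct_foo_iface_t *iface)
 * {
 *     uct_foo_pending_priv_t *priv;
 *
 *     // keep dispatching queued requests while send resources remain
 *     uct_pending_queue_dispatch(priv, &iface->pending_q,
 *                                iface->tx_available > 0);
 * }
 * @endcode
 */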


/**
 * Purge messages from the pending queue.
 *
 * @param _priv   Variable which will hold a pointer to request private data.
 * @param _queue  The pending queue.
 * @param _cond   Condition which should be true in order to remove a request.
 * @param _cb     Callback for purging the request.
 * @param _arg    Argument passed to the purge callback.
 */
#define uct_pending_queue_purge(_priv, _queue, _cond, _cb, _arg) \
    { \
        uct_pending_req_priv_queue_t *_base_priv; \
        ucs_queue_iter_t             _iter; \
        \
        ucs_queue_for_each_safe(_base_priv, _iter, _queue, queue_elem) { \
            _priv = (typeof(_priv))(_base_priv); \
            if (_cond) { \
                ucs_queue_del_iter(_queue, _iter); \
                (void)_cb(ucs_container_of(_base_priv, uct_pending_req_t, priv), _arg); \
            } \
        } \
    }


/**
 * Helper macro to trace active message send/receive.
 *
 * @param _iface    Interface.
 * @param _type     Message type (send/receive)
 * @param _am_id    Active message ID.
 * @param _payload  Active message payload.
 * @param _length   Active message length
 */
#define uct_iface_trace_am(_iface, _type, _am_id, _payload, _length, _fmt, ...) \
    if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { \
        char buf[256] = {0}; \
        uct_iface_dump_am(_iface, _type, _am_id, _payload, _length, \
                          buf, sizeof(buf) - 1); \
        ucs_trace_data(_fmt " am_id %d len %zu %s", ## __VA_ARGS__, \
                       _am_id, (size_t)(_length), buf); \
    }


extern ucs_config_field_t uct_iface_config_table[];


/**
 * Initialize a memory pool for buffers used by the TL interface.
 *
 * @param iface        Interface which owns the memory pool.
 * @param mp           Memory pool to initialize.
 * @param elem_size    Size of each element in the memory pool.
 * @param align_offset Offset in the element which should be aligned.
 * @param alignment    Data will be aligned to these units.
 * @param config       Memory pool configuration.
 * @param grow         Default number of buffers added for every chunk.
 * @param init_obj_cb  Object constructor.
 * @param name         Memory pool name.
 */
ucs_status_t uct_iface_mpool_init(uct_base_iface_t *iface, ucs_mpool_t *mp,
                                  size_t elem_size, size_t align_offset, size_t alignment,
                                  const uct_iface_mpool_config_t *config, unsigned grow,
                                  uct_iface_mpool_init_obj_cb_t init_obj_cb,
                                  const char *name);
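
/*
 * Example (illustrative sketch): initializing a receive descriptor pool from
 * a transport's iface constructor. uct_foo_rx_desc_t, uct_foo_rx_desc_init(),
 * seg_size, config->rx_mpool and iface->rx_mp are hypothetical names; error
 * handling is left to the caller.
 *
 * @code
 * ucs_status_t status;
 *
 * status = uct_iface_mpool_init(&iface->super, &iface->rx_mp,
 *                               sizeof(uct_foo_rx_desc_t) + seg_size,
 *                               sizeof(uct_foo_rx_desc_t),
 *                               UCS_SYS_CACHE_LINE_SIZE,
 *                               &config->rx_mpool, 128,
 *                               uct_foo_rx_desc_init, "foo_rx_desc");
 * @endcode
 */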


/**
 * Dump active message contents using the user-defined tracer callback.
 */
void uct_iface_dump_am(uct_base_iface_t *iface, uct_am_trace_type_t type,
                       uint8_t id, const void *data, size_t length,
                       char *buffer, size_t max);

void uct_iface_mpool_empty_warn(uct_base_iface_t *iface, ucs_mpool_t *mp);

ucs_status_t uct_set_ep_failed(ucs_class_t *cls, uct_ep_h tl_ep,
                               uct_iface_h tl_iface, ucs_status_t status);

void uct_base_iface_query(uct_base_iface_t *iface, uct_iface_attr_t *iface_attr);

ucs_status_t uct_single_device_resource(uct_md_h md, const char *dev_name,
                                        uct_device_type_t dev_type,
                                        uct_tl_device_resource_t **tl_devices_p,
                                        unsigned *num_tl_devices_p);

ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                  uct_completion_t *comp);

ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface,
                                       ucs_callback_t cb, unsigned flags);

void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags);

ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
                               uct_completion_t *comp);

ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags);

/*
 * Invoke active message handler.
 *
 * @param iface    Interface to invoke the handler for.
 * @param id       Active message ID.
 * @param data     Received data.
 * @param length   Length of received data.
 * @param flags    Mask with @ref uct_cb_param_flags
 */
static inline ucs_status_t
uct_iface_invoke_am(uct_base_iface_t *iface, uint8_t id, void *data,
                    unsigned length, unsigned flags)
{
    ucs_status_t     status;
    uct_am_handler_t *handler;

    ucs_assertv(id < UCT_AM_ID_MAX, "invalid am id: %d (max: %lu)",
                id, UCT_AM_ID_MAX - 1);

    UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM, 1);
    UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM_BYTES, length);

    handler = &iface->am[id];
    status = handler->cb(handler->arg, data, length, flags);
    ucs_assert((status == UCS_OK) ||
               ((status == UCS_INPROGRESS) && (flags & UCT_CB_PARAM_FLAG_DESC)));
    return status;
}
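
/*
 * Example (illustrative sketch): a receive path handing an incoming active
 * message to the registered handler, and releasing the descriptor unless the
 * handler keeps ownership by returning UCS_INPROGRESS. uct_foo_iface_t,
 * uct_foo_recv_desc_t, iface->super (a uct_base_iface_t) and the assumption
 * that the payload directly follows the descriptor are hypothetical.
 *
 * @code
 * static void uct_foo_iface_handle_recv(uct_foo_iface_t *iface, uint8_t am_id,
 *                                       void *data, unsigned length)
 * {
 *     uct_foo_recv_desc_t *desc = (uct_foo_recv_desc_t*)data - 1;
 *     ucs_status_t status;
 *
 *     uct_iface_trace_am(&iface->super, UCT_AM_TRACE_TYPE_RECV, am_id, data,
 *                        length, "RX: AM_FOO");
 *     status = uct_iface_invoke_am(&iface->super, am_id, data, length,
 *                                  UCT_CB_PARAM_FLAG_DESC);
 *     if (status == UCS_OK) {
 *         // handler finished with the data - return the descriptor to the pool
 *         ucs_mpool_put_inline(desc);
 *     }
 * }
 * @endcode
 */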


/**
 * Invoke send completion.
 *
 * @param comp    Completion to invoke.
 * @param status  Status of the completed operation.
 */
static UCS_F_ALWAYS_INLINE
void uct_invoke_completion(uct_completion_t *comp, ucs_status_t status)
{
    ucs_trace_func("comp=%p, count=%d, status=%d", comp, comp->count, status);
    ucs_assertv(comp->count > 0, "comp=%p count=%d", comp, comp->count);
    if (--comp->count == 0) {
        comp->func(comp, status);
    }
}
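
/*
 * Example (illustrative sketch): signaling a user completion once an
 * outstanding operation finishes. The descriptor type uct_foo_tx_desc_t and
 * the desc->comp field used to track the user completion are hypothetical.
 *
 * @code
 * static void uct_foo_handle_tx_completion(uct_foo_tx_desc_t *desc,
 *                                          ucs_status_t status)
 * {
 *     if (desc->comp != NULL) {
 *         uct_invoke_completion(desc->comp, status);
 *     }
 *     UCT_TL_IFACE_PUT_DESC(desc);
 * }
 * @endcode
 */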


/**
 * Copy data to target am_short buffer
 */
static UCS_F_ALWAYS_INLINE
void uct_am_short_fill_data(void *buffer, uint64_t header, const void *payload,
                            size_t length)
{
    /**
     * Helper structure to fill send buffer of short messages for
     * non-accelerated transports
     */
    struct uct_am_short_packet {
        uint64_t header;
        char     payload[];
    } UCS_S_PACKED *packet = (struct uct_am_short_packet*)buffer;

    packet->header = header;
    /* suppress false positive diagnostic from uct_mm_ep_am_common_send call */
    /* cppcheck-suppress ctunullpointer */
    memcpy(packet->payload, payload, length);
}
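
/*
 * Example (illustrative sketch): an am_short implementation that packs the
 * header and payload into a TX descriptor with uct_am_short_fill_data().
 * uct_foo_iface_t, uct_foo_tx_desc_t, iface->tx_mp, iface->config.max_short
 * and uct_foo_post_send() are hypothetical names.
 *
 * @code
 * ucs_status_t uct_foo_ep_am_short(uct_ep_h tl_ep, uint8_t id, uint64_t header,
 *                                  const void *payload, unsigned length)
 * {
 *     uct_foo_iface_t   *iface = ucs_derived_of(tl_ep->iface, uct_foo_iface_t);
 *     uct_base_ep_t     *ep    = ucs_derived_of(tl_ep, uct_base_ep_t);
 *     uct_foo_tx_desc_t *desc;
 *
 *     UCT_CHECK_AM_ID(id);
 *     UCT_CHECK_LENGTH(length, 0, iface->config.max_short, "am_short");
 *     UCT_TL_IFACE_GET_TX_DESC(&iface->super, &iface->tx_mp, desc,
 *                              return UCS_ERR_NO_RESOURCE);
 *
 *     uct_am_short_fill_data(desc + 1, header, payload, length);
 *     uct_foo_post_send(iface, desc, id, sizeof(header) + length);
 *     UCT_TL_EP_STAT_OP(ep, AM, SHORT, sizeof(header) + length);
 *     return UCS_OK;
 * }
 * @endcode
 */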

#endif