1 /**
2 * Copyright (C) Mellanox Technologies Ltd. 2001-2019. ALL RIGHTS RESERVED.
3 *
4 * See file LICENSE for terms.
5 */
6
7 #ifndef UCT_IFACE_H_
8 #define UCT_IFACE_H_
9
10 #include "uct_worker.h"
11
12 #include <uct/api/uct.h>
13 #include <uct/base/uct_component.h>
14 #include <ucs/config/parser.h>
15 #include <ucs/datastruct/arbiter.h>
16 #include <ucs/datastruct/mpool.h>
17 #include <ucs/datastruct/queue.h>
18 #include <ucs/debug/log.h>
19 #include <ucs/stats/stats.h>
20 #include <ucs/sys/compiler.h>
21 #include <ucs/sys/sys.h>
22 #include <ucs/type/class.h>
23
24 #include <ucs/datastruct/mpool.inl>
25
26
/* Per-endpoint statistics counters (active only when stats are enabled) */
enum {
    UCT_EP_STAT_AM,            /* active message operations */
    UCT_EP_STAT_PUT,           /* PUT operations */
    UCT_EP_STAT_GET,           /* GET operations */
    UCT_EP_STAT_ATOMIC,        /* atomic operations */
#if IBV_HW_TM
    UCT_EP_STAT_TAG,           /* tag-matching operations (HW tag matching only) */
#endif
    UCT_EP_STAT_BYTES_SHORT,   /* bytes sent by the "short" protocol */
    UCT_EP_STAT_BYTES_BCOPY,   /* bytes sent by the buffered-copy protocol */
    UCT_EP_STAT_BYTES_ZCOPY,   /* bytes sent by the zero-copy protocol */
    UCT_EP_STAT_NO_RES,        /* operations deferred for lack of resources */
    UCT_EP_STAT_FLUSH,         /* flush operations */
    UCT_EP_STAT_FLUSH_WAIT,    /* number of times flush called while in progress */
    UCT_EP_STAT_PENDING,       /* pending requests added */
    UCT_EP_STAT_FENCE,         /* fence operations */
    UCT_EP_STAT_LAST           /* number of counters; not a counter itself */
};

/* Per-interface statistics counters (active only when stats are enabled) */
enum {
    UCT_IFACE_STAT_RX_AM,       /* active messages received */
    UCT_IFACE_STAT_RX_AM_BYTES, /* active message bytes received */
    UCT_IFACE_STAT_TX_NO_DESC,  /* times a TX descriptor could not be obtained */
    UCT_IFACE_STAT_FLUSH,       /* flush operations */
    UCT_IFACE_STAT_FLUSH_WAIT,  /* number of times flush called while in progress */
    UCT_IFACE_STAT_FENCE,       /* fence operations */
    UCT_IFACE_STAT_LAST         /* number of counters; not a counter itself */
};
55
56
/*
 * Statistics macros. Each one bumps a counter on the endpoint's or interface's
 * statistics node; they compile away when statistics support is disabled.
 * NOTE: these expand to plain statements (no do-while wrapper), so use them
 * only in statement context.
 */
#define UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_##_op, 1); \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_BYTES_##_method, _size);
#define UCT_TL_EP_STAT_OP_IF_SUCCESS(_status, _ep, _op, _method, _size) \
    if (_status >= 0) { \
        UCT_TL_EP_STAT_OP(_ep, _op, _method, _size) \
    }
#define UCT_TL_EP_STAT_ATOMIC(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_ATOMIC, 1);
#define UCT_TL_EP_STAT_FLUSH(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH, 1);
#define UCT_TL_EP_STAT_FLUSH_WAIT(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FLUSH_WAIT, 1);
#define UCT_TL_EP_STAT_FENCE(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_FENCE, 1);
#define UCT_TL_EP_STAT_PEND(_ep) \
    UCS_STATS_UPDATE_COUNTER((_ep)->stats, UCT_EP_STAT_PENDING, 1);

#define UCT_TL_IFACE_STAT_FLUSH(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH, 1);
#define UCT_TL_IFACE_STAT_FLUSH_WAIT(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FLUSH_WAIT, 1);
#define UCT_TL_IFACE_STAT_FENCE(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_FENCE, 1);
#define UCT_TL_IFACE_STAT_TX_NO_DESC(_iface) \
    UCS_STATS_UPDATE_COUNTER((_iface)->stats, UCT_IFACE_STAT_TX_NO_DESC, 1);


/* Reject callback flags that are not supported; causes the enclosing function
 * to return UCS_ERR_INVALID_PARAM when a reserved flag bit is set. */
#define UCT_CB_FLAGS_CHECK(_flags) \
    do { \
        if ((_flags) & UCT_CB_FLAG_RESERVED) { \
            ucs_error("Unsupported callback flag 0x%x", UCT_CB_FLAG_RESERVED); \
            return UCS_ERR_INVALID_PARAM; \
        } \
    } while (0)
95
96
/**
 * In release mode - do nothing.
 *
 * In debug mode, if _condition is not true, return an error. This could be less
 * optimal because of additional checks, and that compiler needs to generate code
 * for error flow as well.
 *
 * NOTE: expands to a bare 'if' statement (no do-while wrapper), so it must be
 * used in statement context, and causes the *enclosing function* to return.
 */
#define UCT_CHECK_PARAM(_condition, _err_message, ...) \
    if (ENABLE_PARAMS_CHECK && !(_condition)) { \
        ucs_error(_err_message, ## __VA_ARGS__); \
        return UCS_ERR_INVALID_PARAM; \
    }


/**
 * In release mode - do nothing.
 *
 * In debug mode, if @a _params field mask does not have set
 * @ref UCT_EP_PARAM_FIELD_DEV_ADDR and @ref UCT_EP_PARAM_FIELD_IFACE_ADDR
 * flags, return an error.
 */
#define UCT_EP_PARAMS_CHECK_DEV_IFACE_ADDRS(_params) \
    UCT_CHECK_PARAM(ucs_test_all_flags((_params)->field_mask, \
                                       UCT_EP_PARAM_FIELD_DEV_ADDR | \
                                       UCT_EP_PARAM_FIELD_IFACE_ADDR), \
                    "UCT_EP_PARAM_FIELD_DEV_ADDR and UCT_EP_PARAM_FIELD_IFACE_ADDR are not defined")


/**
 * Extract the path index from endpoint parameters; defaults to 0 when the
 * caller did not set UCT_EP_PARAM_FIELD_PATH_INDEX in the field mask.
 */
#define UCT_EP_PARAMS_GET_PATH_INDEX(_params) \
    (((_params)->field_mask & UCT_EP_PARAM_FIELD_PATH_INDEX) ? \
     (_params)->path_index : 0)

/**
 * Check the condition and return status as a pointer if not true.
 * Same contract as UCT_CHECK_PARAM, for functions returning a pointer.
 */
#define UCT_CHECK_PARAM_PTR(_condition, _err_message, ...) \
    if (ENABLE_PARAMS_CHECK && !(_condition)) { \
        ucs_error(_err_message, ## __VA_ARGS__); \
        return UCS_STATUS_PTR(UCS_ERR_INVALID_PARAM); \
    }


/**
 * Check the size of the IOV array
 */
#define UCT_CHECK_IOV_SIZE(_iovcnt, _max_iov, _name) \
    UCT_CHECK_PARAM((_iovcnt) <= (_max_iov), \
                    "iovcnt(%lu) should be limited by %lu in %s", \
                    _iovcnt, _max_iov, _name)
146
147
/**
 * In debug mode, if _length is not within [_min_length, _max_length], generate
 * an 'Invalid length' error and return from the enclosing function.
 *
 * The length argument is captured into __length once, so expressions with
 * side effects are evaluated exactly one time (previously _length was
 * re-evaluated in every check despite the capture).
 */
#define UCT_CHECK_LENGTH(_length, _min_length, _max_length, _name) \
    { \
        typeof(_length) __length = _length; \
        UCT_CHECK_PARAM((__length) <= (_max_length), \
                        "Invalid %s length: %zu (expected: <= %zu)", \
                        _name, (size_t)(__length), (size_t)(_max_length)); \
        UCT_CHECK_PARAM((ssize_t)(__length) >= (_min_length), \
                        "Invalid %s length: %zu (expected: >= %zu)", \
                        _name, (size_t)(__length), (size_t)(_min_length)); \
    }
161
/**
 * Skip if this is a zero-length operation: release each descriptor passed as a
 * variadic argument back to its memory pool, then return UCS_OK from the
 * enclosing function.
 */
#define UCT_SKIP_ZERO_LENGTH(_length, ...) \
    if (0 == (_length)) { \
        ucs_trace_data("Zero length request: skip it"); \
        UCS_PP_FOREACH(_UCT_RELEASE_DESC, _, __VA_ARGS__) \
        return UCS_OK; \
    }
/* Helper for UCT_SKIP_ZERO_LENGTH: return one descriptor to its mpool */
#define _UCT_RELEASE_DESC(_, _desc) \
    ucs_mpool_put(_desc);


/**
 * In debug mode, check that active message ID is valid.
 */
#define UCT_CHECK_AM_ID(_am_id) \
    UCT_CHECK_PARAM((_am_id) < UCT_AM_ID_MAX, \
                    "Invalid active message id (valid range: 0..%d)", \
                    (int)UCT_AM_ID_MAX - 1)
182
183
/**
 * Declare classes for structures defined in api/tl.h
 */
UCS_CLASS_DECLARE(uct_iface_h, uct_iface_ops_t, uct_md_h);
UCS_CLASS_DECLARE(uct_ep_t, uct_iface_h);


/**
 * Active message handler table entry
 */
typedef struct uct_am_handler {
    uct_am_callback_t cb;     /* User callback invoked for this AM id */
    void              *arg;   /* User argument passed to the callback */
    uint32_t          flags;  /* Callback flags */
} uct_am_handler_t;
199
200
/**
 * Base structure of all interfaces.
 * Includes the AM table which we don't want to expose.
 */
typedef struct uct_base_iface {
    uct_iface_t             super;
    uct_md_h                md;               /* MD this interface is using */
    uct_priv_worker_t       *worker;          /* Worker this interface is on */
    uct_am_handler_t        am[UCT_AM_ID_MAX];/* Active message table */
    uct_am_tracer_t         am_tracer;        /* Active message tracer */
    void                    *am_tracer_arg;   /* Tracer argument */
    uct_error_handler_t     err_handler;      /* Error handler */
    void                    *err_handler_arg; /* Error handler argument */
    uint32_t                err_handler_flags; /* Error handler callback flags */
    uct_worker_progress_t   prog;             /* Will be removed once all transports
                                                 support progress control */
    unsigned                progress_flags;   /* Which progress is currently enabled */

    /* Values parsed from uct_iface_config at interface creation */
    struct {
        unsigned            num_alloc_methods; /* Number of valid alloc_methods */
        uct_alloc_method_t  alloc_methods[UCT_ALLOC_METHOD_LAST]; /* Allocation methods */
        ucs_log_level_t     failure_level;    /* Level of failure reports */
        size_t              max_num_eps;      /* Maximal number of endpoints */
    } config;

    UCS_STATS_NODE_DECLARE(stats)             /* Statistics */
} uct_base_iface_t;

/* Constructor: ops table, MD, worker, interface parameters, configuration
 * (plus stats parent node and name when statistics support is compiled in) */
UCS_CLASS_DECLARE(uct_base_iface_t, uct_iface_ops_t*,  uct_md_h, uct_worker_h,
                  const uct_iface_params_t*, const uct_iface_config_t*
                  UCS_STATS_ARG(ucs_stats_node_t*) UCS_STATS_ARG(const char*));
232
233
/**
 * Stub interface used for failed endpoints
 */
typedef struct uct_failed_iface {
    uct_iface_t       super;
    ucs_queue_head_t  pend_q;   /* Pending requests of the failed endpoint */
} uct_failed_iface_t;


/**
 * Base structure of all endpoints.
 */
typedef struct uct_base_ep {
    uct_ep_t          super;
    UCS_STATS_NODE_DECLARE(stats)   /* Statistics */
} uct_base_ep_t;
/* Constructor: the interface the endpoint belongs to */
UCS_CLASS_DECLARE(uct_base_ep_t, uct_base_iface_t*);
251
252
/**
 * Internal resource descriptor of a transport device
 */
typedef struct uct_tl_device_resource {
    char                     name[UCT_DEVICE_NAME_MAX]; /**< Hardware device name */
    uct_device_type_t        type;       /**< The device represented by this resource
                                              (e.g. UCT_DEVICE_TYPE_NET for a network interface) */
    ucs_sys_device_t         sys_device; /**< The identifier associated with the device
                                              bus_id as captured in ucs_sys_bus_id_t struct */
} uct_tl_device_resource_t;


/**
 * UCT transport definition. This structure should not be used directly; use
 * @ref UCT_TL_DEFINE macro to define a transport.
 */
typedef struct uct_tl {
    char                   name[UCT_TL_NAME_MAX];    /**< Transport name */

    /* Query the devices on which this transport can run, for the given MD */
    ucs_status_t           (*query_devices)(uct_md_h md,
                                            uct_tl_device_resource_t **tl_devices_p,
                                            unsigned *num_tl_devices_p);

    /* Open a communication interface of this transport */
    ucs_status_t           (*iface_open)(uct_md_h md, uct_worker_h worker,
                                         const uct_iface_params_t *params,
                                         const uct_iface_config_t *config,
                                         uct_iface_h *iface_p);

    ucs_config_global_list_entry_t config; /**< Transport configuration entry */
    ucs_list_link_t                list;   /**< Entry in component's transports list */
} uct_tl_t;
284
285
/**
 * Define a transport: instantiates a uct_tl_t, registers its configuration
 * table, and appends it to the component's transport list at init time.
 *
 * @param _component     Component to add the transport to
 * @param _name          Name of the transport (should be a token, not a string)
 * @param _query_devices Function to query the list of available devices
 * @param _iface_class   Struct type defining the uct_iface class
 * @param _cfg_prefix    Prefix for the transport's configuration variables
 * @param _cfg_table     Configuration parsing table for the transport
 * @param _cfg_struct    Struct type holding the parsed configuration
 */
#define UCT_TL_DEFINE(_component, _name, _query_devices, _iface_class, \
                      _cfg_prefix, _cfg_table, _cfg_struct) \
    \
    uct_tl_t uct_##_name##_tl = { \
        .name = #_name, \
        .query_devices = _query_devices, \
        .iface_open = UCS_CLASS_NEW_FUNC_NAME(_iface_class), \
        .config = { \
            .name = #_name" transport", \
            .prefix = _cfg_prefix, \
            .table = _cfg_table, \
            .size = sizeof(_cfg_struct), \
        } \
    }; \
    UCS_CONFIG_REGISTER_TABLE_ENTRY(&(uct_##_name##_tl).config); \
    UCS_STATIC_INIT { \
        ucs_list_add_tail(&(_component)->tl_list, &(uct_##_name##_tl).list); \
    }
312
313
/**
 * "Base" structure which defines interface configuration options.
 * Specific transport extend this structure.
 */
struct uct_iface_config {
    struct {
        uct_alloc_method_t  *methods; /* Allocation methods to try */
        unsigned            count;    /* Number of entries in 'methods' */
    } alloc_methods;

    int               failure;      /* Level of failure reports */
    size_t            max_num_eps;  /* Maximal number of endpoints */
};


/**
 * Memory pool configuration.
 */
typedef struct uct_iface_mpool_config {
    unsigned          max_bufs;  /* Upper limit to number of buffers */
    unsigned          bufs_grow; /* How many buffers (approx.) are allocated every time */
} uct_iface_mpool_config_t;


/**
 * Define configuration fields for memory pool parameters. Expands to two
 * ucs_config_field_t initializers (<prefix>MAX_BUFS and <prefix>BUFS_GROW)
 * which store into a uct_iface_mpool_config_t located at _offset.
 */
#define UCT_IFACE_MPOOL_CONFIG_FIELDS(_prefix, _dfl_max, _dfl_grow, _mp_name, _offset, _desc) \
    {_prefix "MAX_BUFS", UCS_PP_QUOTE(_dfl_max), \
     "Maximal number of " _mp_name " buffers for the interface. -1 is infinite." \
     _desc, \
     (_offset) + ucs_offsetof(uct_iface_mpool_config_t, max_bufs), UCS_CONFIG_TYPE_INT}, \
    \
    {_prefix "BUFS_GROW", UCS_PP_QUOTE(_dfl_grow), \
     "How much buffers are added every time the " _mp_name " memory pool grows.\n" \
     "0 means the value is chosen by the transport.", \
     (_offset) + ucs_offsetof(uct_iface_mpool_config_t, bufs_grow), UCS_CONFIG_TYPE_UINT}
351
352
/**
 * Get a descriptor from memory pool, tell valgrind it's already defined, return
 * error if the memory pool is empty.
 *
 * @param _iface   Interface whose TX_NO_DESC statistic is updated on failure.
 * @param _mp      Memory pool to get descriptor from.
 * @param _desc    Variable to assign descriptor to.
 * @param _failure What to do if memory pool is empty (e.g. a return statement).
 *
 * @return TX descriptor fetched from memory pool.
 */
#define UCT_TL_IFACE_GET_TX_DESC(_iface, _mp, _desc, _failure) \
    { \
        _desc = ucs_mpool_get_inline(_mp); \
        if (ucs_unlikely((_desc) == NULL)) { \
            UCT_TL_IFACE_STAT_TX_NO_DESC(_iface); \
            _failure; \
        } \
        \
        VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
    }


/* Same as UCT_TL_IFACE_GET_TX_DESC, but for receive descriptors: on an empty
 * pool it emits a warning instead of updating the TX statistic. */
#define UCT_TL_IFACE_GET_RX_DESC(_iface, _mp, _desc, _failure) \
    { \
        _desc = ucs_mpool_get_inline(_mp); \
        if (ucs_unlikely((_desc) == NULL)) { \
            uct_iface_mpool_empty_warn(_iface, _mp); \
            _failure; \
        } \
        \
        VALGRIND_MAKE_MEM_DEFINED(_desc, sizeof(*(_desc))); \
    }


/* Return a descriptor to its memory pool and mark it undefined for valgrind */
#define UCT_TL_IFACE_PUT_DESC(_desc) \
    { \
        ucs_mpool_put_inline(_desc); \
        VALGRIND_MAKE_MEM_UNDEFINED(_desc, sizeof(*(_desc))); \
    }
392
393
/**
 * TL Memory pool object initialization callback.
 */
typedef void (*uct_iface_mpool_init_obj_cb_t)(uct_iface_h iface, void *obj,
                                              uct_mem_h memh);


/**
 * Base structure for private data held inside a pending request for TLs
 * which use ucs_arbiter_t to progress pending requests.
 */
typedef struct {
    ucs_arbiter_elem_t arb_elem;  /* Arbiter element stored in req->priv */
} uct_pending_req_priv_arb_t;
408
409
410 static UCS_F_ALWAYS_INLINE ucs_arbiter_elem_t *
uct_pending_req_priv_arb_elem(uct_pending_req_t * req)411 uct_pending_req_priv_arb_elem(uct_pending_req_t *req)
412 {
413 uct_pending_req_priv_arb_t *priv_arb_p =
414 (uct_pending_req_priv_arb_t *)&req->priv;
415
416 return &priv_arb_p->arb_elem;
417 }
418
419
/**
 * Add a pending request to the arbiter (pushed to the tail of the group).
 */
#define uct_pending_req_arb_group_push(_arbiter_group, _req) \
    do { \
        ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
        ucs_arbiter_group_push_elem_always(_arbiter_group, \
                                           uct_pending_req_priv_arb_elem(_req)); \
    } while (0)


/**
 * Add a pending request to the head of group in arbiter.
 */
#define uct_pending_req_arb_group_push_head(_arbiter, _arbiter_group, _req) \
    do { \
        ucs_arbiter_elem_init(uct_pending_req_priv_arb_elem(_req)); \
        ucs_arbiter_group_push_head_elem_always(_arbiter_group, \
                                                uct_pending_req_priv_arb_elem(_req)); \
    } while (0)


/**
 * Base structure for private data held inside a pending request for TLs
 * which use ucs_queue_t to progress pending requests.
 */
typedef struct {
    ucs_queue_elem_t queue_elem;  /* Queue element stored in req->priv */
} uct_pending_req_priv_queue_t;
449
450
451 static UCS_F_ALWAYS_INLINE ucs_queue_elem_t *
uct_pending_req_priv_queue_elem(uct_pending_req_t * req)452 uct_pending_req_priv_queue_elem(uct_pending_req_t* req)
453 {
454 uct_pending_req_priv_queue_t *priv_queue_p =
455 (uct_pending_req_priv_queue_t *)&req->priv;
456
457 return &priv_queue_p->queue_elem;
458 }
459
460
/**
 * Add a pending request to the queue.
 */
#define uct_pending_req_queue_push(_queue, _req) \
    ucs_queue_push((_queue), uct_pending_req_priv_queue_elem(_req))


/* Bundle of a purge callback and its user argument, for passing both through
 * a single pointer */
typedef struct {
    uct_pending_purge_callback_t cb;    /* Purge callback */
    void                         *arg;  /* User argument for the callback */
} uct_purge_cb_args_t;
472
473
/**
 * Dispatch all requests in the pending queue, as long as _cond holds true.
 * _cond is an expression which can use "_priv" variable.
 *
 * A request whose callback returns anything other than UCS_OK is pushed back
 * to the head of the queue.
 *
 * @param _priv  Variable which will hold a pointer to request private data.
 * @param _queue The pending queue.
 * @param _cond  Condition which should be true in order to keep dispatching.
 *
 * TODO support a callback returning UCS_INPROGRESS.
 */
#define uct_pending_queue_dispatch(_priv, _queue, _cond) \
    while (!ucs_queue_is_empty(_queue)) { \
        uct_pending_req_priv_queue_t *_base_priv; \
        uct_pending_req_t *_req; \
        ucs_status_t _status; \
        \
        _base_priv = \
            ucs_queue_head_elem_non_empty((_queue), \
                                          uct_pending_req_priv_queue_t, \
                                          queue_elem); \
        \
        UCS_STATIC_ASSERT(sizeof(*(_priv)) <= UCT_PENDING_REQ_PRIV_LEN); \
        _priv = (typeof(_priv))(_base_priv); \
        \
        if (!(_cond)) { \
            break; \
        } \
        \
        _req = ucs_container_of(_priv, uct_pending_req_t, priv); \
        ucs_queue_pull_non_empty(_queue); \
        _status = _req->func(_req); \
        if (_status != UCS_OK) { \
            ucs_queue_push_head(_queue, &_base_priv->queue_elem); \
        } \
    }
509
510
/**
 * Purge messages from the pending queue: remove each request for which _cond
 * is true and hand it to the callback. The callback's return value is ignored.
 *
 * @param _priv  Variable which will hold a pointer to request private data.
 * @param _queue The pending queue.
 * @param _cond  Condition which should be true in order to remove a request.
 * @param _cb    Callback for purging the request.
 * @param _arg   User argument passed to the callback.
 */
#define uct_pending_queue_purge(_priv, _queue, _cond, _cb, _arg) \
    { \
        uct_pending_req_priv_queue_t *_base_priv; \
        ucs_queue_iter_t             _iter; \
        \
        ucs_queue_for_each_safe(_base_priv, _iter, _queue, queue_elem) { \
            _priv = (typeof(_priv))(_base_priv); \
            if (_cond) { \
                ucs_queue_del_iter(_queue, _iter); \
                (void)_cb(ucs_container_of(_base_priv, uct_pending_req_t, priv), _arg); \
            } \
        } \
    }
533
534
/**
 * Helper macro to trace active message send/receive. Only active when the
 * TRACE_DATA log level is enabled; the dump is truncated to 255 characters.
 *
 * @param _iface   Interface.
 * @param _type    Message type (send/receive)
 * @param _am_id   Active message ID.
 * @param _payload Active message payload.
 * @param _length  Active message length
 * @param _fmt     printf-style prefix for the trace line.
 */
#define uct_iface_trace_am(_iface, _type, _am_id, _payload, _length, _fmt, ...) \
    if (ucs_log_is_enabled(UCS_LOG_LEVEL_TRACE_DATA)) { \
        char buf[256] = {0}; \
        uct_iface_dump_am(_iface, _type, _am_id, _payload, _length, \
                          buf, sizeof(buf) - 1); \
        ucs_trace_data(_fmt " am_id %d len %zu %s", ## __VA_ARGS__, \
                       _am_id, (size_t)(_length), buf); \
    }
552
553
/* Configuration parsing table for the base interface options
 * (see struct uct_iface_config) */
extern ucs_config_field_t uct_iface_config_table[];


/**
 * Initialize a memory pool for buffers used by TL interface.
 *
 * @param iface        Interface the memory pool belongs to.
 * @param mp           Memory pool to initialize.
 * @param elem_size    Size of a single element.
 * @param align_offset Offset within the element to which alignment is applied.
 * @param alignment    Data will be aligned to these units.
 * @param config       Memory pool configuration.
 * @param grow         Default number of buffers added for every chunk.
 * @param init_obj_cb  Object constructor.
 * @param name         Memory pool name.
 */
ucs_status_t uct_iface_mpool_init(uct_base_iface_t *iface, ucs_mpool_t *mp,
                                  size_t elem_size, size_t align_offset, size_t alignment,
                                  const uct_iface_mpool_config_t *config, unsigned grow,
                                  uct_iface_mpool_init_obj_cb_t init_obj_cb,
                                  const char *name);


/**
 * Dump active message contents using the user-defined tracer callback.
 */
void uct_iface_dump_am(uct_base_iface_t *iface, uct_am_trace_type_t type,
                       uint8_t id, const void *data, size_t length,
                       char *buffer, size_t max);

/* Warn that a receive memory pool is exhausted
 * (used by UCT_TL_IFACE_GET_RX_DESC) */
void uct_iface_mpool_empty_warn(uct_base_iface_t *iface, ucs_mpool_t *mp);

/* Move an endpoint to the failed state (see uct_failed_iface_t stub) */
ucs_status_t uct_set_ep_failed(ucs_class_t* cls, uct_ep_h tl_ep, uct_iface_h
                               tl_iface, ucs_status_t status);

/* Fill iface_attr with the attributes common to all interfaces */
void uct_base_iface_query(uct_base_iface_t *iface, uct_iface_attr_t *iface_attr);

/* Return a single-entry device list for transports exposing one device */
ucs_status_t uct_single_device_resource(uct_md_h md, const char *dev_name,
                                        uct_device_type_t dev_type,
                                        uct_tl_device_resource_t **tl_devices_p,
                                        unsigned *num_tl_devices_p);

/* Default flush/fence/progress-control implementations for interfaces */
ucs_status_t uct_base_iface_flush(uct_iface_h tl_iface, unsigned flags,
                                  uct_completion_t *comp);

ucs_status_t uct_base_iface_fence(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable(uct_iface_h tl_iface, unsigned flags);

void uct_base_iface_progress_enable_cb(uct_base_iface_t *iface,
                                       ucs_callback_t cb, unsigned flags);

void uct_base_iface_progress_disable(uct_iface_h tl_iface, unsigned flags);

/* Default flush/fence implementations for endpoints */
ucs_status_t uct_base_ep_flush(uct_ep_h tl_ep, unsigned flags,
                               uct_completion_t *comp);

ucs_status_t uct_base_ep_fence(uct_ep_h tl_ep, unsigned flags);
611
612 /*
613 * Invoke active message handler.
614 *
615 * @param iface Interface to invoke the handler for.
616 * @param id Active message ID.
617 * @param data Received data.
618 * @param length Length of received data.
619 * @param flags Mask with @ref uct_cb_param_flags
620 */
621 static inline ucs_status_t
uct_iface_invoke_am(uct_base_iface_t * iface,uint8_t id,void * data,unsigned length,unsigned flags)622 uct_iface_invoke_am(uct_base_iface_t *iface, uint8_t id, void *data,
623 unsigned length, unsigned flags)
624 {
625 ucs_status_t status;
626 uct_am_handler_t *handler;
627
628 ucs_assertv(id < UCT_AM_ID_MAX, "invalid am id: %d (max: %lu)",
629 id, UCT_AM_ID_MAX - 1);
630
631 UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM, 1);
632 UCS_STATS_UPDATE_COUNTER(iface->stats, UCT_IFACE_STAT_RX_AM_BYTES, length);
633
634 handler = &iface->am[id];
635 status = handler->cb(handler->arg, data, length, flags);
636 ucs_assert((status == UCS_OK) ||
637 ((status == UCS_INPROGRESS) && (flags & UCT_CB_PARAM_FLAG_DESC)));
638 return status;
639 }
640
641
/**
 * Invoke send completion.
 *
 * Decrements the completion counter; when it reaches zero, invokes the
 * user completion callback with the given status.
 *
 * @param comp   Completion to invoke (count must be > 0).
 * @param status Status to pass to the completion callback.
 */
static UCS_F_ALWAYS_INLINE
void uct_invoke_completion(uct_completion_t *comp, ucs_status_t status)
{
    ucs_trace_func("comp=%p, count=%d, status=%d", comp, comp->count, status);
    ucs_assertv(comp->count > 0, "comp=%p count=%d", comp, comp->count);
    if (--comp->count == 0) {
        comp->func(comp, status);
    }
}
657
658
659 /**
660 * Copy data to target am_short buffer
661 */
662 static UCS_F_ALWAYS_INLINE
uct_am_short_fill_data(void * buffer,uint64_t header,const void * payload,size_t length)663 void uct_am_short_fill_data(void *buffer, uint64_t header, const void *payload,
664 size_t length)
665 {
666 /**
667 * Helper structure to fill send buffer of short messages for
668 * non-accelerated transports
669 */
670 struct uct_am_short_packet {
671 uint64_t header;
672 char payload[];
673 } UCS_S_PACKED *packet = (struct uct_am_short_packet*)buffer;
674
675 packet->header = header;
676 /* suppress false positive diagnostic from uct_mm_ep_am_common_send call */
677 /* cppcheck-suppress ctunullpointer */
678 memcpy(packet->payload, payload, length);
679 }
680
681 #endif
682