1 /*
2  * Copyright (C) Huawei Technologies Co., Ltd. 2019.  ALL RIGHTS RESERVED.
3  * See file LICENSE for terms.
4  */
5 
6 #include "ucg_group.h"
7 
8 #include <ucs/datastruct/queue.h>
9 #include <ucs/datastruct/list.h>
10 #include <ucs/profile/profile.h>
11 #include <ucs/debug/memtrack.h>
12 #include <ucp/core/ucp_ep.inl>
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/core/ucp_ep.inl>
15 #include <ucp/core/ucp_proxy_ep.h> /* for @ref ucp_proxy_ep_test */
16 
17 #if ENABLE_STATS
18 /**
19  * UCG group statistics counters
20  */
21 enum {
22     UCG_GROUP_STAT_PLANS_CREATED,
23     UCG_GROUP_STAT_PLANS_USED,
24 
25     UCG_GROUP_STAT_OPS_CREATED,
26     UCG_GROUP_STAT_OPS_USED,
27     UCG_GROUP_STAT_OPS_IMMEDIATE,
28 
29     UCG_GROUP_STAT_LAST
30 };
31 
32 static ucs_stats_class_t ucg_group_stats_class = {
33     .name           = "ucg_group",
34     .num_counters   = UCG_GROUP_STAT_LAST,
35     .counter_names  = {
36         [UCG_GROUP_STAT_PLANS_CREATED] = "plans_created",
37         [UCG_GROUP_STAT_PLANS_USED]    = "plans_reused",
38         [UCG_GROUP_STAT_OPS_CREATED]   = "ops_created",
39         [UCG_GROUP_STAT_OPS_USED]      = "ops_started",
40         [UCG_GROUP_STAT_OPS_IMMEDIATE] = "ops_immediate"
41     }
42 };
43 #endif
44 
/*
 * Register @a iface in the context's progress array, unless it is already
 * there. Must be expanded inside a function returning ucs_status_t: fails
 * with UCS_ERR_EXCEEDS_LIMIT when the array is full.
 *
 * BUGFIX: the capacity check used to run right after "idx = 0", so it could
 * never trigger (unless the limit is 0) and a full array would be overrun
 * on append. It now guards the append itself.
 */
#define UCG_GROUP_PROGRESS_ADD(iface, ctx) {             \
    unsigned idx = 0;                                    \
                                                         \
    /* Look for this interface among those registered */ \
    while (idx < (ctx)->iface_cnt) {                     \
        if ((ctx)->ifaces[idx] == (iface)) {             \
            break;                                       \
        }                                                \
        idx++;                                           \
    }                                                    \
                                                         \
    if (idx == (ctx)->iface_cnt) {                       \
        /* Not found - append, unless already full */    \
        if (ucs_unlikely(idx == UCG_GROUP_MAX_IFACES)) { \
            return UCS_ERR_EXCEEDS_LIMIT;                \
        }                                                \
        (ctx)->ifaces[(ctx)->iface_cnt++] = (iface);     \
    }                                                    \
}
62 
/* Hash table mapping a group member index to the UCP endpoint connected to
 * that member; populated lazily (loopback at creation, remote peers on
 * demand). 64-bit integer keys, endpoint-handle values. */
__KHASH_IMPL(ucg_group_ep, static UCS_F_MAYBE_UNUSED inline,
             ucg_group_member_index_t, ucp_ep_h, 1, kh_int64_hash_func,
             kh_int64_hash_equal);
66 
67 #ifndef HAVE_UCP_EXTENSIONS
68 /**
69  * This code is intended to reside in UCP, but is "hosted" in UCG for now...
70  */
71 
/* Per-worker initialization callback of a context extension: invoked during
 * worker creation with the extension's storage area inside the worker. */
typedef ucs_status_t (*ucp_ext_init_f)(ucp_worker_h worker,
                                       unsigned *next_am_id,
                                       void *ext_ctx);

/* Per-worker cleanup callback of a context extension, invoked on teardown. */
typedef void (*ucp_ext_cleanup_f)(void *ext_ctx);

/* Registration record of one UCP context extension.
 * NOTE(review): the struct tag is misspelled ("extenstion"); only the
 * (correctly spelled) typedef name appears to be used elsewhere. */
typedef struct ucp_context_extenstion {
    ucs_list_link_t list;          /* Membership in ucg_context::extensions */
    size_t worker_offset;          /* Offset of this extension's per-worker storage */
    ucp_ext_init_f init;           /* Called when a worker is created */
    ucp_ext_cleanup_f cleanup;     /* Called when a worker is destroyed */
} ucp_context_extension_t;
84 
/* A UCG context extends a UCP context with extension bookkeeping. Without
 * native UCP extension support the UCP context is held by pointer (wrapper
 * object); with it, the UCP context is embedded and extended in place. */
struct ucg_context {
#ifndef HAVE_UCP_EXTENSIONS
    ucp_context_t                *super;          /* Extending the UCP context */
#else
    ucp_context_t                 super;          /* Extending the UCP context */
#endif
    size_t                        worker_size;    /* Worker allocation size (grows per registered extension) */
    ucs_list_link_t               extensions;     /* List of registered extensions */
    unsigned                      last_am_id;     /* Last used AM ID */
};
95 
96 #ifndef HAVE_UCP_EXTENSIONS
97 /*
 * Per the UCX community's request to make minimal changes to accommodate UCG,
99  * this code was added - duplicating UCP worker creation in a way UCG can use.
100  */
101 
102 #include <ucp/core/ucp_worker.c> /* Needed to provide worker creation logic */
103 
/**
 * Create a UCP worker with a caller-specified allocation size.
 *
 * Near-verbatim duplicate of UCP's worker creation (see the #include of
 * ucp_worker.c above), except the worker object is allocated with
 * @a worker_size bytes so that context extensions can keep per-worker state
 * past the end of ucp_worker_t.
 *
 * @param [in]  context      UCP context to create the worker on.
 * @param [in]  params       User-supplied worker parameters.
 * @param [in]  worker_size  Total allocation size (>= sizeof(ucp_worker_t)
 *                           plus the endpoint-config array and extensions).
 * @param [out] worker_p     Filled with the new worker on success.
 *
 * Cleanup on failure unwinds in reverse order of initialization via the
 * goto chain at the bottom.
 */
ucs_status_t ucp_worker_create_by_size(ucp_context_h context,
                                       const ucp_worker_params_t *params,
                                       size_t worker_size,
                                       ucp_worker_h *worker_p)
{
    ucs_thread_mode_t uct_thread_mode;
    unsigned config_count;
    unsigned name_length;
    ucp_worker_h worker;
    ucs_status_t status;

    /* Upper bound on the number of endpoint configurations, capped at 255 */
    config_count = ucs_min((context->num_tls + 1) * (context->num_tls + 1) * context->num_tls,
                           UINT8_MAX);

    worker = ucs_calloc(1, worker_size, "ucp worker");
    if (worker == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    uct_thread_mode = UCS_THREAD_MODE_SINGLE;
    worker->flags   = 0;

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) {
#if ENABLE_MT
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            /* UCT is serialized by UCP lock or by UCP user */
            uct_thread_mode = UCS_THREAD_MODE_SERIALIZED;
        }

        if (params->thread_mode == UCS_THREAD_MODE_MULTI) {
            worker->flags |= UCP_WORKER_FLAG_MT;
        }
#else
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            ucs_debug("forced single thread mode on worker create");
        }
#endif
    }

    worker->context           = context;
    worker->uuid              = ucs_generate_uuid((uintptr_t)worker);
    worker->flush_ops_count   = 0;
    worker->inprogress        = 0;
    worker->ep_config_max     = config_count;
    worker->ep_config_count   = 0;
    worker->num_active_ifaces = 0;
    worker->num_ifaces        = 0;
    worker->am_message_id     = ucs_generate_uuid(0);
    ucs_list_head_init(&worker->arm_ifaces);
    ucs_list_head_init(&worker->stream_ready_eps);
    ucs_list_head_init(&worker->all_eps);
    ucp_ep_match_init(&worker->ep_match_ctx);

    /* Endpoint extensions are allocated as extra strides of the ep allocator:
     * 3 strides when stream/AM protocol state is needed, 2 otherwise */
    UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t));
    if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) {
        UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_proto_t) <= sizeof(ucp_ep_t));
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 3);
    } else {
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 2);
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) {
        worker->user_data = params->user_data;
    } else {
        worker->user_data = NULL;
    }

    if (context->config.features & UCP_FEATURE_AM){
        worker->am_cbs            = NULL;
        worker->am_cb_array_len   = 0;
    }
    name_length = ucs_min(UCP_WORKER_NAME_MAX,
                          context->config.ext.max_worker_name + 1);
    ucs_snprintf_zero(worker->name, name_length, "%s:%d", ucs_get_host_name(),
                      getpid());

    /* Create statistics */
    status = UCS_STATS_NODE_ALLOC(&worker->stats, &ucp_worker_stats_class,
                                  ucs_stats_get_root(), "-%p", worker);
    if (status != UCS_OK) {
        goto err_free;
    }

    status = UCS_STATS_NODE_ALLOC(&worker->tm_offload_stats,
                                  &ucp_worker_tm_offload_stats_class,
                                  worker->stats);
    if (status != UCS_OK) {
        goto err_free_stats;
    }

    status = ucs_async_context_init(&worker->async,
                                    context->config.ext.use_mt_mutex ?
                                    UCS_ASYNC_MODE_THREAD_MUTEX :
                                    UCS_ASYNC_THREAD_LOCK_TYPE);
    if (status != UCS_OK) {
        goto err_free_tm_offload_stats;
    }

    /* Create the underlying UCT worker */
    status = uct_worker_create(&worker->async, uct_thread_mode, &worker->uct);
    if (status != UCS_OK) {
        goto err_destroy_async;
    }

    /* Create memory pool for requests */
    status = ucs_mpool_init(&worker->req_mp, 0,
                            sizeof(ucp_request_t) + context->config.request.size,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_request_mpool_ops, "ucp_requests");
    if (status != UCS_OK) {
        goto err_destroy_uct_worker;
    }

    /* create memory pool for small rkeys */
    status = ucs_mpool_init(&worker->rkey_mp, 0,
                            sizeof(ucp_rkey_t) +
                            sizeof(ucp_tl_rkey_t) * UCP_RKEY_MPOOL_MAX_MD,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_rkey_mpool_ops, "ucp_rkeys");
    if (status != UCS_OK) {
        goto err_req_mp_cleanup;
    }

    /* Create UCS event set which combines events from all transports */
    status = ucp_worker_wakeup_init(worker, params);
    if (status != UCS_OK) {
        goto err_rkey_mp_cleanup;
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_CPU_MASK) {
        worker->cpu_mask = params->cpu_mask;
    } else {
        UCS_CPU_ZERO(&worker->cpu_mask);
    }

    /* Initialize tag matching */
    status = ucp_tag_match_init(&worker->tm);
    if (status != UCS_OK) {
        goto err_wakeup_cleanup;
    }

    /* Open all resources as interfaces on this worker */
    status = ucp_worker_add_resource_ifaces(worker);
    if (status != UCS_OK) {
        goto err_close_ifaces;
    }

    /* Open all resources as connection managers on this worker */
    status = ucp_worker_add_resource_cms(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* create mem type endpoints */
    status = ucp_worker_create_mem_type_endpoints(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Init AM and registered memory pools */
    status = ucp_worker_init_mpools(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Select atomic resources */
    ucp_worker_init_atomic_tls(worker);

    /* At this point all UCT memory domains and interfaces are already created
     * so warn about unused environment variables.
     */
    ucs_config_parser_warn_unused_env_vars_once();

    *worker_p = worker;
    return UCS_OK;

    /* Error unwinding: each label undoes the step above it. Note that
     * err_close_ifaces falls through into tag-matching cleanup, since
     * tag-match init precedes iface creation above. */
err_close_cms:
    ucp_worker_close_cms(worker);
err_close_ifaces:
    ucp_worker_close_ifaces(worker);
    ucp_tag_match_cleanup(&worker->tm);
err_wakeup_cleanup:
    ucp_worker_wakeup_cleanup(worker);
err_rkey_mp_cleanup:
    ucs_mpool_cleanup(&worker->rkey_mp, 1);
err_req_mp_cleanup:
    ucs_mpool_cleanup(&worker->req_mp, 1);
err_destroy_uct_worker:
    uct_worker_destroy(worker->uct);
err_destroy_async:
    ucs_async_context_cleanup(&worker->async);
err_free_tm_offload_stats:
    UCS_STATS_NODE_FREE(worker->tm_offload_stats);
err_free_stats:
    UCS_STATS_NODE_FREE(worker->stats);
err_free:
    ucs_strided_alloc_cleanup(&worker->ep_alloc);
    ucs_free(worker);
    return status;
}
304 
ucg_worker_create(ucg_context_h context,const ucp_worker_params_t * params,ucg_worker_h * worker_p)305 ucs_status_t ucg_worker_create(ucg_context_h context,
306                                const ucp_worker_params_t *params,
307                                ucg_worker_h *worker_p)
308 {
309     return ucp_worker_create_by_size(context->super, params,
310             context->worker_size, worker_p);
311 }
312 
/**
 * Destroy a UCG context: tear down the wrapped UCP context, then release
 * the wrapper object itself.
 */
void ucg_cleanup(ucg_context_h context)
{
    ucp_cleanup(context->super);
    ucs_free(context);
}
317 #endif
318 
ucp_extend(ucg_context_h context,size_t extension_ctx_length,ucp_ext_init_f init,ucp_ext_cleanup_f cleanup,size_t * extension_ctx_offset_in_worker)319 ucs_status_t ucp_extend(ucg_context_h context, size_t extension_ctx_length,
320                         ucp_ext_init_f init, ucp_ext_cleanup_f cleanup,
321                         size_t *extension_ctx_offset_in_worker)
322 {
323     ucp_context_extension_t *ext = ucs_malloc(sizeof(*ext), "context extension");
324     if (!ext) {
325         return UCS_ERR_NO_MEMORY;
326     }
327 
328     ext->init                       = init;
329     ext->cleanup                    = cleanup;
330     ext->worker_offset              = context->worker_size;
331     context->worker_size           += extension_ctx_length;
332     *extension_ctx_offset_in_worker = ext->worker_offset;
333 
334     ucs_list_add_tail(&context->extensions, &ext->list);
335     return UCS_OK;
336 }
337 
/**
 * Unregister and free every context extension registered on @a context.
 */
void ucp_extension_cleanup(ucg_context_h context)
{
    ucp_context_extension_t *elem, *next;

    /* A safe iteration is used (rather than while(!is_empty())) because
     * CLANG emits a false positive on the latter */
    ucs_list_for_each_safe(elem, next, &context->extensions, list) {
        ucs_list_del(&elem->list);
        ucs_free(elem);
    }
}
346 #endif
347 
/**
 * Progress a worker: first poll the interfaces used for collectives, then
 * fall back to generic UCP worker progress for all other transports.
 *
 * @return Total number of events/operations progressed.
 *
 * BUGFIX: the count gathered from the collective interfaces used to be
 * discarded, and only the UCP progress count was returned; the two are now
 * combined so no progress is reported as zero.
 */
unsigned ucg_worker_progress(ucg_worker_h worker)
{
    unsigned idx, ret = 0;
    ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(worker);

    /* First, try the interfaces used for collectives */
    for (idx = 0; idx < gctx->iface_cnt; idx++) {
        ret += uct_iface_progress(gctx->ifaces[idx]);
    }

    /* As a fallback (and for correctness) - try all other transports */
    return ret + ucp_worker_progress(worker);
}
361 
/**
 * Progress a single group, from most to least specific source of work:
 * planner-provided progress functions, then the group's own interfaces,
 * and finally worker-wide progress. Returns as soon as any level reports
 * progress.
 */
unsigned ucg_group_progress(ucg_group_h group)
{
    unsigned i, progressed = 0;
    ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);

    /* Planner-specific progress functions get the first shot */
    for (i = 0; i < gctx->num_planners; i++) {
        progressed += gctx->planners[i].plan_component->progress(group);
    }
    if (progressed != 0) {
        return progressed;
    }

    /* Then the interfaces registered on this particular group */
    for (i = 0; i < group->iface_cnt; i++) {
        progressed += uct_iface_progress(group->ifaces[i]);
    }
    if (progressed != 0) {
        return progressed;
    }

    /* Finally, fall back to "global" (worker-wide) progress */
    return ucg_worker_progress(group->worker);
}
387 
388 size_t ucg_ctx_worker_offset;
ucg_group_create(ucg_worker_h worker,const ucg_group_params_t * params,ucg_group_h * group_p)389 ucs_status_t ucg_group_create(ucg_worker_h worker,
390                               const ucg_group_params_t *params,
391                               ucg_group_h *group_p)
392 {
393     ucs_status_t status;
394     ucg_groups_t *ctx = UCG_WORKER_TO_GROUPS_CTX(worker);
395     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); // TODO: check where needed
396 
397     /* allocate a new group */
398     size_t distance_size              = sizeof(*params->distance) * params->member_count;
399     struct ucg_group *new_group       = ucs_malloc(sizeof(struct ucg_group) +
400             ctx->total_planner_sizes + distance_size, "communicator group");
401     if (new_group == NULL) {
402         status = UCS_ERR_NO_MEMORY;
403         goto cleanup_none;
404     }
405 
406     /* fill in the group fields */
407     new_group->is_barrier_outstanding = 0;
408     new_group->group_id               = ctx->next_id++;
409     new_group->worker                 = worker;
410     new_group->next_id                = 1; /* Some Transports don't like == 0... // TODO: fix wrap-around ! */
411     new_group->iface_cnt              = 0;
412 
413     ucs_queue_head_init(&new_group->pending);
414     kh_init_inplace(ucg_group_ep, &new_group->eps);
415     memcpy((ucg_group_params_t*)&new_group->params, params, sizeof(*params));
416     new_group->params.distance = (typeof(params->distance))((char*)(new_group
417             + 1) + ctx->total_planner_sizes);
418     memcpy(new_group->params.distance, params->distance, distance_size);
419     memset(new_group + 1, 0, ctx->total_planner_sizes);
420 
421     unsigned idx;
422     for (idx = 0; idx < UCG_GROUP_COLLECTIVE_MODIFIER_MASK; idx++) {
423         new_group->cache[idx] = NULL;
424     }
425 
426     /* Create a loopback connection, since resolve_cb may fail loopback */
427     ucp_ep_params_t ep_params = { .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS };
428     status = ucp_worker_get_address(worker, (ucp_address_t**)&ep_params.address,
429             &distance_size);
430     if (status != UCS_OK) {
431         return status;
432     }
433     ucp_ep_h loopback_ep;
434     status = ucp_ep_create(worker, &ep_params, &loopback_ep);
435     ucp_worker_release_address(worker, (ucp_address_t*)ep_params.address);
436     if (status != UCS_OK) {
437         return status;
438     }
439 
440     /* Store this loopback endpoint, for future reference */
441     ucg_group_member_index_t my_index = new_group->params.member_index;
442     ucs_assert(kh_get(ucg_group_ep, &new_group->eps, my_index) == kh_end(&new_group->eps));
443     khiter_t iter = kh_put(ucg_group_ep, &new_group->eps, my_index, (int*)&idx);
444     kh_value(&new_group->eps, iter) = loopback_ep;
445 
446     /* Initialize the planners (modules) */
447     for (idx = 0; idx < ctx->num_planners; idx++) {
448         /* Create the per-planner per-group context */
449         ucg_plan_component_t *planner = ctx->planners[idx].plan_component;
450         status = planner->create(planner, worker, new_group,
451                 new_group->group_id, &new_group->params);
452         if (status != UCS_OK) {
453             goto cleanup_planners;
454         }
455     }
456 
457     status = UCS_STATS_NODE_ALLOC(&new_group->stats,
458             &ucg_group_stats_class, worker->stats, "-%p", new_group);
459     if (status != UCS_OK) {
460         goto cleanup_planners;
461     }
462 
463     ucs_list_add_head(&ctx->groups_head, &new_group->list);
464     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
465     *group_p = new_group;
466     return UCS_OK;
467 
468 cleanup_planners:
469     while (idx) {
470         ucg_plan_component_t *planner = ctx->planners[idx--].plan_component;
471         planner->destroy((void*)new_group);
472     }
473     ucs_free(new_group);
474 
475 cleanup_none:
476     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
477     return status;
478 }
479 
ucg_group_get_params(ucg_group_h group)480 const ucg_group_params_t* ucg_group_get_params(ucg_group_h group)
481 {
482     return &group->params;
483 }
484 
/**
 * Destroy a group: drain all outstanding collectives, tear down every
 * planner's per-group context, then release the endpoint table, statistics
 * and the group allocation itself.
 */
void ucg_group_destroy(ucg_group_h group)
{
    unsigned planner_idx;
    ucg_groups_t *gctx;

    /* First - make sure all the collectives are completed */
    while (!ucs_queue_is_empty(&group->pending)) {
        ucg_group_progress(group);
    }

#if ENABLE_MT
    ucg_worker_h worker = group->worker;
#endif
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
    for (planner_idx = 0; planner_idx < gctx->num_planners; planner_idx++) {
        gctx->planners[planner_idx].plan_component->destroy(group);
    }

    kh_destroy_inplace(ucg_group_ep, &group->eps);
    UCS_STATS_NODE_FREE(group->stats);
    ucs_list_del(&group->list);
    ucs_free(group);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}
511 
ucg_request_check_status(void * request)512 ucs_status_t ucg_request_check_status(void *request)
513 {
514     ucg_request_t *req = (ucg_request_t*)request - 1;
515 
516     if (req->flags & UCG_REQUEST_COMMON_FLAG_COMPLETED) {
517         ucs_assert(req->status != UCS_INPROGRESS);
518         return req->status;
519     }
520     return UCS_INPROGRESS;
521 }
522 
/* Cancelling a collective operation is not supported - intentionally a no-op. */
void ucg_request_cancel(ucg_worker_h worker, void *request) { }
524 
/* Collective requests require no explicit release - intentionally a no-op. */
void ucg_request_free(void *request) { }
526 
ucg_plan_select(ucg_group_h group,const char * planner_name,const ucg_collective_params_t * params,ucg_plan_component_t ** planc_p)527 ucs_status_t ucg_plan_select(ucg_group_h group, const char* planner_name,
528                              const ucg_collective_params_t *params,
529                              ucg_plan_component_t **planc_p)
530 {
531     ucg_groups_t *ctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
532     return ucg_plan_select_component(ctx->planners, ctx->num_planners,
533             planner_name, &group->params, params, planc_p);
534 }
535 
/**
 * Create (or fetch from cache) a collective operation handle on a group.
 *
 * Two-level lookup: (1) a per-group plan cache keyed by the collective's
 * modifier mask; (2) within a cached plan, a list of previously prepared
 * operations matched byte-for-byte against @a params. On a full hit the
 * existing operation is returned; otherwise a plan and/or operation is
 * created and inserted into the caches.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create,
        (group, params, coll), ucg_group_h group,
        const ucg_collective_params_t *params, ucg_coll_h *coll)
{
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(group->worker);

    /* check the recycling/cache for this collective */
    ucg_op_t *op;
    ucs_status_t status;
    unsigned coll_mask = UCG_FLAG_MASK(params);
    ucg_plan_t *plan = group->cache[coll_mask];
    if (ucs_likely(plan != NULL)) {
        /* plan cache hit - look for an identical, previously prepared op */
        ucs_list_for_each(op, &plan->op_head, list) {
            if (!memcmp(&op->params, params, sizeof(*params))) {
                status = UCS_OK;
                goto op_found;
            }
        }

        UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_USED, 1);
        goto plan_found;
    }

    /* select which plan to use for this collective operation */
    ucg_plan_component_t *planc; // TODO: replace NULL with config value
    status = ucg_plan_select(group, NULL, params, &planc);
    if (status != UCS_OK) {
        goto out;
    }

    /* create the actual plan for the collective operation */
    UCS_PROFILE_CODE("ucg_plan") {
        ucs_trace_req("ucg_collective_create PLAN: planc=%s type=%x root=%lu",
                &planc->name[0], params->type.modifiers, (uint64_t)params->type.root);
        status = ucg_plan(planc, &params->type, group, &plan);
    }
    if (status != UCS_OK) {
        goto out;
    }

    /* fill in the plan and insert it into the per-group cache */
    plan->planner           = planc;
    plan->group             = group;
    plan->type              = params->type;
    plan->group_id          = group->group_id;
    plan->group_size        = group->params.member_count;
#ifdef UCT_COLLECTIVES
    plan->group_host_size   = group->worker->context->config.num_local_peers;
#endif
    group->cache[coll_mask] = plan;
    ucs_list_head_init(&plan->op_head);
    UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_CREATED, 1);

plan_found:
    /* prepare a new operation on the (cached or fresh) plan */
    UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_CREATED, 1);
    UCS_PROFILE_CODE("ucg_prepare") {
        status = ucg_prepare(plan, params, &op);
    }
    if (status != UCS_OK) {
        goto out;
    }

    ucs_trace_req("ucg_collective_create OP: planc=%s "
            "params={type=%u, root=%lu, send=[%p,%i,%lu,%p,%p], "
            "recv=[%p,%i,%lu,%p,%p], cb=%p, op=%p}", &planc->name[0],
            (unsigned)params->type.modifiers, (uint64_t)params->type.root,
            params->send.buf, params->send.count, params->send.dt_len,
            params->send.dt_ext, params->send.displs,
            params->recv.buf, params->recv.count, params->recv.dt_len,
            params->recv.dt_ext, params->recv.displs,
            params->comp_cb, params->recv.op_ext);

    /* cache the new op so identical future requests hit op_found above */
    ucs_list_add_head(&plan->op_head, &op->list);
    memcpy(&op->params, params, sizeof(*params));
    op->plan = plan;

op_found:
    *coll = op;

out:
    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(group->worker);
    return status;
}
618 
619 ucs_status_t static UCS_F_ALWAYS_INLINE
ucg_collective_trigger(ucg_group_h group,ucg_op_t * op,ucg_request_t ** req)620 ucg_collective_trigger(ucg_group_h group, ucg_op_t *op, ucg_request_t **req)
621 {
622     /* Barrier effect - all new collectives are pending */
623     if (ucs_unlikely(op->params.type.modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER)) {
624         ucs_assert(group->is_barrier_outstanding == 0);
625         group->is_barrier_outstanding = 1;
626     }
627 
628     /* Start the first step of the collective operation */
629     ucs_status_t ret;
630     UCS_PROFILE_CODE("ucg_trigger") {
631         ret = ucg_trigger(op, group->next_id++, req);
632     }
633 
634     if (ret != UCS_INPROGRESS) {
635         UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_IMMEDIATE, 1);
636     }
637 
638     return ret;
639 }
640 
/**
 * Release an outstanding barrier on a group and drain the pending queue.
 *
 * Operations that were queued while the barrier was outstanding are moved
 * back to their plan's op list and re-triggered, until the queue empties, a
 * re-triggered operation starts a new barrier, or a trigger returns
 * something other than UCS_OK (including UCS_INPROGRESS).
 *
 * @return Status of the last triggered operation, or UCS_OK if the queue
 *         was already empty.
 */
ucs_status_t ucg_collective_release_barrier(ucg_group_h group)
{
    ucs_assert(group->is_barrier_outstanding);
    group->is_barrier_outstanding = 0;
    if (ucs_queue_is_empty(&group->pending)) {
        return UCS_OK;
    }

    ucs_status_t ret;
    do {
        /* Move the operation from the pending queue back to the original one */
        ucg_op_t *op = (ucg_op_t*)ucs_queue_pull_non_empty(&group->pending);
        ucg_request_t **req = op->pending_req;
        ucs_list_add_head(&op->plan->op_head, &op->list);

        /* Start this next pending operation */
        ret = ucg_collective_trigger(group, op, req);
    } while ((!ucs_queue_is_empty(&group->pending)) &&
             (!group->is_barrier_outstanding) && /* a new barrier stops the drain */
             (ret == UCS_OK));                   /* so does non-immediate completion */

    return ret;
}
664 
665 ucs_status_t static UCS_F_ALWAYS_INLINE
ucg_collective_start(ucg_coll_h coll,ucg_request_t ** req)666 ucg_collective_start(ucg_coll_h coll, ucg_request_t **req)
667 {
668     ucs_status_t ret;
669     ucg_op_t *op = (ucg_op_t*)coll;
670     ucg_group_h group = op->plan->group;
671 
672     /* Since group was created - don't need UCP_CONTEXT_CHECK_FEATURE_FLAGS */
673     UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(group->worker);
674 
675     ucs_trace_req("ucg_collective_start: op=%p req=%p", coll, *req);
676 
677     if (ucs_unlikely(group->is_barrier_outstanding)) {
678         ucs_list_del(&op->list);
679         ucs_queue_push(&group->pending, &op->queue);
680         op->pending_req = req;
681         ret = UCS_INPROGRESS;
682     } else {
683         ret = ucg_collective_trigger(group, op, req);
684     }
685 
686     UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_USED, 1);
687     UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(group->worker);
688     return ret;
689 }
690 
691 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucg_collective_start_nb,
692                  (coll), ucg_coll_h coll)
693 {
694     ucg_request_t *req = NULL;
695     ucs_status_ptr_t ret = UCS_STATUS_PTR(ucg_collective_start(coll, &req));
696     return UCS_PTR_IS_ERR(ret) ? ret : req;
697 }
698 
/**
 * Non-blocking collective start using a caller-supplied request object.
 *
 * NOTE(review): this passes the address of the local parameter copy of
 * @a request; if ucg_collective_start() stores that pointer for later use
 * (e.g. op->pending_req when the op is queued behind a barrier), it dangles
 * once this function returns - verify against the intended nbr semantics.
 */
UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_start_nbr,
                 (coll, request), ucg_coll_h coll, void *request)
{
    return ucg_collective_start(coll, (ucg_request_t**)&request);
}
704 
/**
 * Release a collective operation handle, handing it back to its planner
 * for recycling or disposal.
 */
void ucg_collective_destroy(ucg_coll_h coll)
{
    ucg_discard((ucg_op_t*)coll);
}
709 
ucg_worker_groups_init(ucp_worker_h worker,unsigned * next_am_id,void * groups_ctx)710 static ucs_status_t ucg_worker_groups_init(ucp_worker_h worker,
711         unsigned *next_am_id, void *groups_ctx)
712 {
713     ucg_groups_t *gctx  = (ucg_groups_t*)groups_ctx;
714     ucs_status_t status = ucg_plan_query(next_am_id, &gctx->planners, &gctx->num_planners);
715     if (status != UCS_OK) {
716         return status;
717     }
718 
719     unsigned planner_idx;
720     size_t group_ctx_offset  = sizeof(struct ucg_group);
721     size_t global_ctx_offset = ucg_ctx_worker_offset + sizeof(ucg_groups_t);
722     for (planner_idx = 0; planner_idx < gctx->num_planners; planner_idx++) {
723         ucg_plan_desc_t* planner    = &gctx->planners[planner_idx];
724         ucg_plan_component_t* planc = planner->plan_component;
725         planc->global_ctx_offset    = global_ctx_offset;
726         global_ctx_offset          += planc->global_context_size;
727         planc->group_ctx_offset     = group_ctx_offset;
728         group_ctx_offset           += planc->group_context_size;
729     }
730 
731     gctx->next_id             = 0;
732     gctx->iface_cnt           = 0;
733     gctx->total_planner_sizes = group_ctx_offset;
734 #ifdef UCT_COLLECTIVES
735     gctx->num_local_peers     = worker->context->config.num_local_peers;
736     gctx->my_local_peer_idx   = worker->context->config.my_local_peer_idx;
737 #endif
738     ucs_list_head_init(&gctx->groups_head);
739     return UCS_OK;
740 }
741 
ucg_worker_groups_cleanup(void * groups_ctx)742 static void ucg_worker_groups_cleanup(void *groups_ctx)
743 {
744     ucg_groups_t *gctx = (ucg_groups_t*)groups_ctx;
745 
746     ucg_group_h group, tmp;
747     if (!ucs_list_is_empty(&gctx->groups_head)) {
748         ucs_list_for_each_safe(group, tmp, &gctx->groups_head, list) {
749             ucg_group_destroy(group);
750         }
751     }
752 
753     ucg_plan_release_list(gctx->planners, gctx->num_planners);
754 }
755 
ucg_extend_ucp(const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)756 static ucs_status_t ucg_extend_ucp(const ucg_params_t *params,
757                                    const ucg_config_t *config,
758                                    ucg_context_h *context_p)
759 {
760 #ifndef HAVE_UCP_EXTENSIONS
761         /* Since UCP doesn't support extensions (yet?) - we need to allocate... */
762         ucg_context_h ucg_context = ucs_calloc(1, sizeof(*ucg_context), "ucg context");
763         if (!ucg_context) {
764             return UCS_ERR_NO_MEMORY;
765         }
766 
767         ucg_context->last_am_id = 0;
768         ucg_context->super = (ucp_context_h)context_p;
769         ucg_context->worker_size = sizeof(ucp_worker_t) + sizeof(ucp_ep_config_t) *
770                 ucs_min((ucg_context->super->num_tls + 1) *
771                         (ucg_context->super->num_tls + 1) *
772                         (ucg_context->super->num_tls),
773                         UINT8_MAX);
774         *context_p = ucg_context;
775 #endif
776 
777         size_t ctx_size = sizeof(ucg_groups_t) +
778                 ucs_list_length(&ucg_plan_components_list) * sizeof(void*);
779         ucs_list_head_init(&(*context_p)->extensions);
780         return ucp_extend(*context_p, ctx_size, ucg_worker_groups_init,
781                 ucg_worker_groups_cleanup, &ucg_ctx_worker_offset);
782 }
783 
ucg_init_version(unsigned api_major_version,unsigned api_minor_version,const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)784 ucs_status_t ucg_init_version(unsigned api_major_version,
785                               unsigned api_minor_version,
786                               const ucg_params_t *params,
787                               const ucg_config_t *config,
788                               ucg_context_h *context_p)
789 {
790     ucs_status_t status = ucp_init_version(api_major_version, api_minor_version,
791                                            params, config, (ucp_context_h*)context_p);
792     if (status == UCS_OK) {
793         status = ucg_extend_ucp(params, config, context_p);
794     }
795     return status;
796 }
797 
ucg_init(const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)798 ucs_status_t ucg_init(const ucg_params_t *params,
799                       const ucg_config_t *config,
800                       ucg_context_h *context_p)
801 {
802     ucs_status_t status = ucp_init(params, config, (ucp_context_h*)context_p);
803     if (status == UCS_OK) {
804         status = ucg_extend_ucp(params, config, context_p);
805     }
806     return status;
807 }
808 
ucg_plan_connect(ucg_group_h group,ucg_group_member_index_t idx,enum ucg_plan_connect_flags flags,uct_ep_h * ep_p,const uct_iface_attr_t ** ep_attr_p,uct_md_h * md_p,const uct_md_attr_t ** md_attr_p)809 ucs_status_t ucg_plan_connect(ucg_group_h group,
810                               ucg_group_member_index_t idx,
811                               enum ucg_plan_connect_flags flags,
812                               uct_ep_h *ep_p, const uct_iface_attr_t **ep_attr_p,
813                               uct_md_h *md_p, const uct_md_attr_t    **md_attr_p)
814 {
815     int ret;
816     ucs_status_t status;
817     size_t remote_addr_len;
818     ucp_address_t *remote_addr = NULL;
819 
820     /* Look-up the UCP endpoint based on the index */
821     ucp_ep_h ucp_ep;
822     khiter_t iter = kh_get(ucg_group_ep, &group->eps, idx);
823     if (iter != kh_end(&group->eps)) {
824         /* Use the cached connection */
825         ucp_ep = kh_value(&group->eps, iter);
826     } else {
827         /* fill-in UCP connection parameters */
828         status = group->params.resolve_address_f(group->params.cb_group_obj,
829                 idx, &remote_addr, &remote_addr_len);
830         if (status != UCS_OK) {
831             ucs_error("failed to obtain a UCP endpoint from the external callback");
832             return status;
833         }
834 
835         /* special case: connecting to a zero-length address means it's "debugging" */
836         if (ucs_unlikely(remote_addr_len == 0)) {
837             *ep_p = NULL;
838             return UCS_OK;
839         }
840 
841         /* create an endpoint for communication with the remote member */
842         ucp_ep_params_t ep_params = {
843                 .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS,
844                 .address = remote_addr
845         };
846         status = ucp_ep_create(group->worker, &ep_params, &ucp_ep);
847         group->params.release_address_f(remote_addr);
848         if (status != UCS_OK) {
849             return status;
850         }
851 
852         /* Store this endpoint, for future reference */
853         iter = kh_put(ucg_group_ep, &group->eps, idx, &ret);
854         kh_value(&group->eps, iter) = ucp_ep;
855     }
856 
857     /* Connect for point-to-point communication */
858     ucp_lane_index_t lane;
859 am_retry:
860 #ifdef UCT_COLLECTIVES
861     if (flags & UCG_PLAN_CONNECT_FLAG_WANT_INCAST) {
862         lane = ucp_ep_get_incast_lane(ucp_ep);
863         if (ucs_unlikely(lane == UCP_NULL_LANE)) {
864             ucs_warn("No transports with native incast support were found,"
865                      " falling back to P2P transports (slower)");
866             return UCS_ERR_UNREACHABLE;
867         }
868         *ep_p = ucp_ep_get_incast_uct_ep(ucp_ep);
869     } else if (flags & UCG_PLAN_CONNECT_FLAG_WANT_BCAST) {
870         lane = ucp_ep_get_bcast_lane(ucp_ep);
871         if (ucs_unlikely(lane == UCP_NULL_LANE)) {
872             ucs_warn("No transports with native broadcast support were found,"
873                      " falling back to P2P transports (slower)");
874             return UCS_ERR_UNREACHABLE;
875         }
876         *ep_p = ucp_ep_get_bcast_uct_ep(ucp_ep);
877     } else
878 #endif /* UCT_COLLECTIVES */
879     {
880         lane  = ucp_ep_get_am_lane(ucp_ep);
881         *ep_p = ucp_ep_get_am_uct_ep(ucp_ep);
882     }
883 
884     if (*ep_p == NULL) {
885         status = ucp_wireup_connect_remote(ucp_ep, lane);
886         if (status != UCS_OK) {
887             return status;
888         }
889         goto am_retry; /* Just to obtain the right lane */
890     }
891 
892     if (ucp_proxy_ep_test(*ep_p)) {
893         ucp_proxy_ep_t *proxy_ep = ucs_derived_of(*ep_p, ucp_proxy_ep_t);
894         *ep_p = proxy_ep->uct_ep;
895         ucs_assert(*ep_p != NULL);
896     }
897 
898     ucs_assert((*ep_p)->iface != NULL);
899     if ((*ep_p)->iface->ops.ep_am_short ==
900             (typeof((*ep_p)->iface->ops.ep_am_short))
901             ucs_empty_function_return_no_resource) {
902         ucp_worker_progress(group->worker);
903         goto am_retry;
904     }
905 
906     /* Register interfaces to be progressed in future calls */
907     ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
908     UCG_GROUP_PROGRESS_ADD((*ep_p)->iface, gctx);
909     UCG_GROUP_PROGRESS_ADD((*ep_p)->iface, group);
910 
911     *md_p      = ucp_ep_md(ucp_ep, lane);
912     *md_attr_p = ucp_ep_md_attr(ucp_ep, lane);
913     *ep_attr_p = ucp_ep_get_iface_attr(ucp_ep, lane);
914 
915     return UCS_OK;
916 }
917 
ucg_plan_query_resources(ucg_group_h group,ucg_plan_resources_t ** resources)918 ucs_status_t ucg_plan_query_resources(ucg_group_h group,
919                                       ucg_plan_resources_t **resources)
920 {
921     return UCS_OK;
922 }
923