1 /*
2 * Copyright (C) Huawei Technologies Co., Ltd. 2019. ALL RIGHTS RESERVED.
3 * See file LICENSE for terms.
4 */
5
6 #include "ucg_group.h"
7
8 #include <ucs/datastruct/queue.h>
9 #include <ucs/datastruct/list.h>
10 #include <ucs/profile/profile.h>
11 #include <ucs/debug/memtrack.h>
12 #include <ucp/core/ucp_ep.inl>
13 #include <ucp/core/ucp_worker.h>
14 #include <ucp/core/ucp_ep.inl>
15 #include <ucp/core/ucp_proxy_ep.h> /* for @ref ucp_proxy_ep_test */
16
#if ENABLE_STATS
/**
 * UCG group statistics counters (one stats node per group, allocated in
 * ucg_group_create()).
 */
enum {
    UCG_GROUP_STAT_PLANS_CREATED, /* a new plan was built by ucg_plan() */
    UCG_GROUP_STAT_PLANS_USED,    /* a cached plan was reused for a new op */

    UCG_GROUP_STAT_OPS_CREATED,   /* ucg_prepare() was invoked for a new op */
    UCG_GROUP_STAT_OPS_USED,      /* ucg_collective_start() was called */
    UCG_GROUP_STAT_OPS_IMMEDIATE, /* ucg_trigger() did not return UCS_INPROGRESS */

    UCG_GROUP_STAT_LAST
};

/* Counter names as reported by the UCS statistics framework */
static ucs_stats_class_t ucg_group_stats_class = {
    .name           = "ucg_group",
    .num_counters   = UCG_GROUP_STAT_LAST,
    .counter_names  = {
        [UCG_GROUP_STAT_PLANS_CREATED] = "plans_created",
        [UCG_GROUP_STAT_PLANS_USED]    = "plans_reused",
        [UCG_GROUP_STAT_OPS_CREATED]   = "ops_created",
        [UCG_GROUP_STAT_OPS_USED]      = "ops_started",
        [UCG_GROUP_STAT_OPS_IMMEDIATE] = "ops_immediate"
    }
};
#endif
44
/*
 * Register @a iface in @a ctx->ifaces (the interfaces progressed by
 * ucg_worker_progress() / ucg_group_progress()) unless it is already there.
 * @a ctx must expose "ifaces" and "iface_cnt" members; the capacity of the
 * array is taken to be UCG_GROUP_MAX_IFACES.
 *
 * NOTE: when the array is full and @a iface is new, this macro RETURNS
 * UCS_ERR_EXCEEDS_LIMIT from the enclosing function, so it may only be used
 * inside functions returning ucs_status_t.
 */
#define UCG_GROUP_PROGRESS_ADD(iface, ctx) \
    do { \
        unsigned ucg_progress_idx = 0; \
        \
        /* Interfaces are registered only once - look for an existing entry */ \
        while (ucg_progress_idx < (ctx)->iface_cnt) { \
            if ((ctx)->ifaces[ucg_progress_idx] == (iface)) { \
                break; \
            } \
            ucg_progress_idx++; \
        } \
        \
        if (ucg_progress_idx == (ctx)->iface_cnt) { \
            /* Not found: check for room before appending */ \
            if (ucs_unlikely((ctx)->iface_cnt >= UCG_GROUP_MAX_IFACES)) { \
                return UCS_ERR_EXCEEDS_LIMIT; \
            } \
            (ctx)->ifaces[(ctx)->iface_cnt++] = (iface); \
        } \
    } while (0)
62
/*
 * Hash map (is_map = 1) from a group member index to the UCP endpoint used to
 * reach that member; instantiated for the "eps" table of each group. Keys are
 * hashed with the 64-bit integer helpers (assumes ucg_group_member_index_t
 * fits in 64 bits - confirm against its typedef).
 */
__KHASH_IMPL(ucg_group_ep, static UCS_F_MAYBE_UNUSED inline,
             ucg_group_member_index_t, ucp_ep_h, 1, kh_int64_hash_func,
             kh_int64_hash_equal);
66
67 #ifndef HAVE_UCP_EXTENSIONS
68 /**
69 * This code is intended to reside in UCP, but is "hosted" in UCG for now...
70 */
71
72 typedef ucs_status_t (*ucp_ext_init_f)(ucp_worker_h worker,
73 unsigned *next_am_id,
74 void *ext_ctx);
75
76 typedef void (*ucp_ext_cleanup_f)(void *ext_ctx);
77
78 typedef struct ucp_context_extenstion {
79 ucs_list_link_t list;
80 size_t worker_offset;
81 ucp_ext_init_f init;
82 ucp_ext_cleanup_f cleanup;
83 } ucp_context_extension_t;
84
/*
 * UCG context: wraps a UCP context and records the worker-allocation layout
 * needed by registered extensions.
 *
 * NOTE(review): this definition already sits inside an outer
 * "#ifndef HAVE_UCP_EXTENSIONS", so the "#else" branch below is never compiled
 * as the file stands.
 */
struct ucg_context {
#ifndef HAVE_UCP_EXTENSIONS
    ucp_context_t *super;       /* Extending the UCP context (intended to hold
                                   the context from ucp_init()) */
#else
    ucp_context_t super;        /* Extending the UCP context */
#endif
    size_t worker_size;         /* Worker allocation size: ucp_worker_t, its
                                   ep_config array, plus every extension
                                   context appended by ucp_extend() */
    ucs_list_link_t extensions; /* List of registered extensions
                                   (ucp_context_extension_t) */
    unsigned last_am_id;        /* Last used AM ID; set to 0 in
                                   ucg_extend_ucp() */
};
95
96 #ifndef HAVE_UCP_EXTENSIONS
/*
 * Per the UCX community's request to make minimal changes to accommodate UCG,
 * this code duplicates UCP worker creation in a form UCG can use.
 */
101
102 #include <ucp/core/ucp_worker.c> /* Needed to provide worker creation logic */
103
/**
 * Create a UCP worker whose allocation is @a worker_size bytes, so that extra
 * per-worker extension contexts can live after the ucp_worker_t structure.
 *
 * This duplicates the UCP worker creation sequence (ucp_worker.c is included
 * above). It only builds the UCP worker - it does not run any extension init
 * callbacks.
 *
 * @param [in]  context      UCP context to create the worker on.
 * @param [in]  params       Worker parameters (thread mode, user data, CPU
 *                           mask, wakeup settings).
 * @param [in]  worker_size  Total bytes to allocate (zero-filled) for the
 *                           worker; presumably at least sizeof(ucp_worker_t)
 *                           plus the ep_config slots - confirm with callers.
 * @param [out] worker_p     Created worker on success.
 *
 * @return UCS_OK, UCS_ERR_NO_MEMORY, or the error of the failing init step;
 *         on failure everything created so far is released.
 */
ucs_status_t ucp_worker_create_by_size(ucp_context_h context,
                                       const ucp_worker_params_t *params,
                                       size_t worker_size,
                                       ucp_worker_h *worker_p)
{
    ucs_thread_mode_t uct_thread_mode;
    unsigned config_count;
    unsigned name_length;
    ucp_worker_h worker;
    ucs_status_t status;

    /* Max number of endpoint configurations; same formula ucg_extend_ucp()
     * uses to reserve ucp_ep_config_t slots in worker_size */
    config_count = ucs_min((context->num_tls + 1) * (context->num_tls + 1) * context->num_tls,
                           UINT8_MAX);

    worker = ucs_calloc(1, worker_size, "ucp worker");
    if (worker == NULL) {
        return UCS_ERR_NO_MEMORY;
    }

    uct_thread_mode = UCS_THREAD_MODE_SINGLE;
    worker->flags = 0;

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_THREAD_MODE) {
#if ENABLE_MT
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            /* UCT is serialized by UCP lock or by UCP user */
            uct_thread_mode = UCS_THREAD_MODE_SERIALIZED;
        }

        if (params->thread_mode == UCS_THREAD_MODE_MULTI) {
            worker->flags |= UCP_WORKER_FLAG_MT;
        }
#else
        if (params->thread_mode != UCS_THREAD_MODE_SINGLE) {
            ucs_debug("forced single thread mode on worker create");
        }
#endif
    }

    worker->context = context;
    worker->uuid = ucs_generate_uuid((uintptr_t)worker);
    worker->flush_ops_count = 0;
    worker->inprogress = 0;
    worker->ep_config_max = config_count;
    worker->ep_config_count = 0;
    worker->num_active_ifaces = 0;
    worker->num_ifaces = 0;
    worker->am_message_id = ucs_generate_uuid(0);
    ucs_list_head_init(&worker->arm_ifaces);
    ucs_list_head_init(&worker->stream_ready_eps);
    ucs_list_head_init(&worker->all_eps);
    ucp_ep_match_init(&worker->ep_match_ctx);

    /* Endpoints are allocated in strides: 3 when STREAM/AM need the protocol
     * extension, 2 otherwise */
    UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_gen_t) <= sizeof(ucp_ep_t));
    if (context->config.features & (UCP_FEATURE_STREAM | UCP_FEATURE_AM)) {
        UCS_STATIC_ASSERT(sizeof(ucp_ep_ext_proto_t) <= sizeof(ucp_ep_t));
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 3);
    } else {
        ucs_strided_alloc_init(&worker->ep_alloc, sizeof(ucp_ep_t), 2);
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_USER_DATA) {
        worker->user_data = params->user_data;
    } else {
        worker->user_data = NULL;
    }

    if (context->config.features & UCP_FEATURE_AM){
        worker->am_cbs = NULL;
        worker->am_cb_array_len = 0;
    }
    /* Worker name is "<host>:<pid>", truncated to the configured maximum */
    name_length = ucs_min(UCP_WORKER_NAME_MAX,
                          context->config.ext.max_worker_name + 1);
    ucs_snprintf_zero(worker->name, name_length, "%s:%d", ucs_get_host_name(),
                      getpid());

    /* Create statistics */
    status = UCS_STATS_NODE_ALLOC(&worker->stats, &ucp_worker_stats_class,
                                  ucs_stats_get_root(), "-%p", worker);
    if (status != UCS_OK) {
        goto err_free;
    }

    status = UCS_STATS_NODE_ALLOC(&worker->tm_offload_stats,
                                  &ucp_worker_tm_offload_stats_class,
                                  worker->stats);
    if (status != UCS_OK) {
        goto err_free_stats;
    }

    status = ucs_async_context_init(&worker->async,
                                    context->config.ext.use_mt_mutex ?
                                    UCS_ASYNC_MODE_THREAD_MUTEX :
                                    UCS_ASYNC_THREAD_LOCK_TYPE);
    if (status != UCS_OK) {
        goto err_free_tm_offload_stats;
    }

    /* Create the underlying UCT worker */
    status = uct_worker_create(&worker->async, uct_thread_mode, &worker->uct);
    if (status != UCS_OK) {
        goto err_destroy_async;
    }

    /* Create memory pool for requests */
    status = ucs_mpool_init(&worker->req_mp, 0,
                            sizeof(ucp_request_t) + context->config.request.size,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_request_mpool_ops, "ucp_requests");
    if (status != UCS_OK) {
        goto err_destroy_uct_worker;
    }

    /* create memory pool for small rkeys */
    status = ucs_mpool_init(&worker->rkey_mp, 0,
                            sizeof(ucp_rkey_t) +
                            sizeof(ucp_tl_rkey_t) * UCP_RKEY_MPOOL_MAX_MD,
                            0, UCS_SYS_CACHE_LINE_SIZE, 128, UINT_MAX,
                            &ucp_rkey_mpool_ops, "ucp_rkeys");
    if (status != UCS_OK) {
        goto err_req_mp_cleanup;
    }

    /* Create UCS event set which combines events from all transports */
    status = ucp_worker_wakeup_init(worker, params);
    if (status != UCS_OK) {
        goto err_rkey_mp_cleanup;
    }

    if (params->field_mask & UCP_WORKER_PARAM_FIELD_CPU_MASK) {
        worker->cpu_mask = params->cpu_mask;
    } else {
        UCS_CPU_ZERO(&worker->cpu_mask);
    }

    /* Initialize tag matching */
    status = ucp_tag_match_init(&worker->tm);
    if (status != UCS_OK) {
        goto err_wakeup_cleanup;
    }

    /* Open all resources as interfaces on this worker */
    status = ucp_worker_add_resource_ifaces(worker);
    if (status != UCS_OK) {
        goto err_close_ifaces;
    }

    /* Open all resources as connection managers on this worker */
    status = ucp_worker_add_resource_cms(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* create mem type endpoints */
    status = ucp_worker_create_mem_type_endpoints(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Init AM and registered memory pools */
    status = ucp_worker_init_mpools(worker);
    if (status != UCS_OK) {
        goto err_close_cms;
    }

    /* Select atomic resources */
    ucp_worker_init_atomic_tls(worker);

    /* At this point all UCT memory domains and interfaces are already created
     * so warn about unused environment variables.
     */
    ucs_config_parser_warn_unused_env_vars_once();

    *worker_p = worker;
    return UCS_OK;

    /* Error labels unwind in reverse order of creation; each one falls
     * through to release everything created before the failing step */
err_close_cms:
    ucp_worker_close_cms(worker);
err_close_ifaces:
    ucp_worker_close_ifaces(worker);
    ucp_tag_match_cleanup(&worker->tm);
err_wakeup_cleanup:
    ucp_worker_wakeup_cleanup(worker);
err_rkey_mp_cleanup:
    ucs_mpool_cleanup(&worker->rkey_mp, 1);
err_req_mp_cleanup:
    ucs_mpool_cleanup(&worker->req_mp, 1);
err_destroy_uct_worker:
    uct_worker_destroy(worker->uct);
err_destroy_async:
    ucs_async_context_cleanup(&worker->async);
err_free_tm_offload_stats:
    UCS_STATS_NODE_FREE(worker->tm_offload_stats);
err_free_stats:
    UCS_STATS_NODE_FREE(worker->stats);
err_free:
    ucs_strided_alloc_cleanup(&worker->ep_alloc);
    ucs_free(worker);
    return status;
}
304
ucg_worker_create(ucg_context_h context,const ucp_worker_params_t * params,ucg_worker_h * worker_p)305 ucs_status_t ucg_worker_create(ucg_context_h context,
306 const ucp_worker_params_t *params,
307 ucg_worker_h *worker_p)
308 {
309 return ucp_worker_create_by_size(context->super, params,
310 context->worker_size, worker_p);
311 }
312
/**
 * Release a UCG context: free the extension records registered through
 * ucp_extend() (otherwise leaked), clean up the wrapped UCP context, then
 * free the UCG wrapper itself.
 *
 * @param [in] context  Context returned by ucg_init()/ucg_init_version();
 *                      invalid after this call.
 */
void ucg_cleanup(ucg_context_h context)
{
    ucp_context_extension_t *ext, *tmp;

    /* _safe iteration: each record is unlinked and freed inside the loop */
    ucs_list_for_each_safe(ext, tmp, &context->extensions, list) {
        ucs_list_del(&ext->list);
        ucs_free(ext);
    }

    ucp_cleanup(context->super);
    ucs_free(context);
}
317 #endif
318
ucp_extend(ucg_context_h context,size_t extension_ctx_length,ucp_ext_init_f init,ucp_ext_cleanup_f cleanup,size_t * extension_ctx_offset_in_worker)319 ucs_status_t ucp_extend(ucg_context_h context, size_t extension_ctx_length,
320 ucp_ext_init_f init, ucp_ext_cleanup_f cleanup,
321 size_t *extension_ctx_offset_in_worker)
322 {
323 ucp_context_extension_t *ext = ucs_malloc(sizeof(*ext), "context extension");
324 if (!ext) {
325 return UCS_ERR_NO_MEMORY;
326 }
327
328 ext->init = init;
329 ext->cleanup = cleanup;
330 ext->worker_offset = context->worker_size;
331 context->worker_size += extension_ctx_length;
332 *extension_ctx_offset_in_worker = ext->worker_offset;
333
334 ucs_list_add_tail(&context->extensions, &ext->list);
335 return UCS_OK;
336 }
337
/**
 * Unlink and free every extension record of @a context. Only the bookkeeping
 * records are released; the per-worker cleanup callbacks are not invoked.
 *
 * @param [in] context  Context whose extension list is emptied.
 */
void ucp_extension_cleanup(ucg_context_h context)
{
    ucp_context_extension_t *cur, *next;

    /* A while(!ucs_list_is_empty()) loop would be simpler, but CLANG emits a
     * false positive for it - hence the _safe iterator */
    ucs_list_for_each_safe(cur, next, &context->extensions, list) {
        ucs_list_del(&cur->list);
        ucs_free(cur);
    }
}
346 #endif
347
/**
 * Progress the interfaces registered for collectives on this worker, then
 * the UCP worker itself.
 *
 * @param [in] worker  Worker to progress.
 *
 * @return Sum of the event counts reported by uct_iface_progress() on the
 *         collective interfaces and by ucp_worker_progress(). Previously the
 *         collective-interface count was computed and then discarded.
 */
unsigned ucg_worker_progress(ucg_worker_h worker)
{
    ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(worker);
    unsigned idx, ret = 0;

    /* First, try the interfaces used for collectives */
    for (idx = 0; idx < gctx->iface_cnt; idx++) {
        ret += uct_iface_progress(gctx->ifaces[idx]);
    }

    /* As a fallback (and for correctness) - try all other transports */
    ret += ucp_worker_progress(worker);
    return ret;
}
361
/**
 * Progress one group, stopping at the first stage that reports events:
 * the planners' progress callbacks, then the group's own interfaces, then
 * the whole worker via ucg_worker_progress().
 *
 * @param [in] group  Group to progress.
 *
 * @return Event count of the first stage that made progress, or the result
 *         of ucg_worker_progress() when neither earlier stage did.
 */
unsigned ucg_group_progress(ucg_group_h group)
{
    ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
    unsigned events = 0;
    unsigned i;

    for (i = 0; i < gctx->num_planners; i++) {
        events += gctx->planners[i].plan_component->progress(group);
    }
    if (events != 0) {
        return events;
    }

    for (i = 0; i < group->iface_cnt; i++) {
        events += uct_iface_progress(group->ifaces[i]);
    }

    return (events != 0) ? events : ucg_worker_progress(group->worker);
}
387
/* Byte offset of the UCG groups context inside each worker allocation;
 * written by ucp_extend() when ucg_extend_ucp() registers the extension. */
size_t ucg_ctx_worker_offset;
ucg_group_create(ucg_worker_h worker,const ucg_group_params_t * params,ucg_group_h * group_p)389 ucs_status_t ucg_group_create(ucg_worker_h worker,
390 const ucg_group_params_t *params,
391 ucg_group_h *group_p)
392 {
393 ucs_status_t status;
394 ucg_groups_t *ctx = UCG_WORKER_TO_GROUPS_CTX(worker);
395 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker); // TODO: check where needed
396
397 /* allocate a new group */
398 size_t distance_size = sizeof(*params->distance) * params->member_count;
399 struct ucg_group *new_group = ucs_malloc(sizeof(struct ucg_group) +
400 ctx->total_planner_sizes + distance_size, "communicator group");
401 if (new_group == NULL) {
402 status = UCS_ERR_NO_MEMORY;
403 goto cleanup_none;
404 }
405
406 /* fill in the group fields */
407 new_group->is_barrier_outstanding = 0;
408 new_group->group_id = ctx->next_id++;
409 new_group->worker = worker;
410 new_group->next_id = 1; /* Some Transports don't like == 0... // TODO: fix wrap-around ! */
411 new_group->iface_cnt = 0;
412
413 ucs_queue_head_init(&new_group->pending);
414 kh_init_inplace(ucg_group_ep, &new_group->eps);
415 memcpy((ucg_group_params_t*)&new_group->params, params, sizeof(*params));
416 new_group->params.distance = (typeof(params->distance))((char*)(new_group
417 + 1) + ctx->total_planner_sizes);
418 memcpy(new_group->params.distance, params->distance, distance_size);
419 memset(new_group + 1, 0, ctx->total_planner_sizes);
420
421 unsigned idx;
422 for (idx = 0; idx < UCG_GROUP_COLLECTIVE_MODIFIER_MASK; idx++) {
423 new_group->cache[idx] = NULL;
424 }
425
426 /* Create a loopback connection, since resolve_cb may fail loopback */
427 ucp_ep_params_t ep_params = { .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS };
428 status = ucp_worker_get_address(worker, (ucp_address_t**)&ep_params.address,
429 &distance_size);
430 if (status != UCS_OK) {
431 return status;
432 }
433 ucp_ep_h loopback_ep;
434 status = ucp_ep_create(worker, &ep_params, &loopback_ep);
435 ucp_worker_release_address(worker, (ucp_address_t*)ep_params.address);
436 if (status != UCS_OK) {
437 return status;
438 }
439
440 /* Store this loopback endpoint, for future reference */
441 ucg_group_member_index_t my_index = new_group->params.member_index;
442 ucs_assert(kh_get(ucg_group_ep, &new_group->eps, my_index) == kh_end(&new_group->eps));
443 khiter_t iter = kh_put(ucg_group_ep, &new_group->eps, my_index, (int*)&idx);
444 kh_value(&new_group->eps, iter) = loopback_ep;
445
446 /* Initialize the planners (modules) */
447 for (idx = 0; idx < ctx->num_planners; idx++) {
448 /* Create the per-planner per-group context */
449 ucg_plan_component_t *planner = ctx->planners[idx].plan_component;
450 status = planner->create(planner, worker, new_group,
451 new_group->group_id, &new_group->params);
452 if (status != UCS_OK) {
453 goto cleanup_planners;
454 }
455 }
456
457 status = UCS_STATS_NODE_ALLOC(&new_group->stats,
458 &ucg_group_stats_class, worker->stats, "-%p", new_group);
459 if (status != UCS_OK) {
460 goto cleanup_planners;
461 }
462
463 ucs_list_add_head(&ctx->groups_head, &new_group->list);
464 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
465 *group_p = new_group;
466 return UCS_OK;
467
468 cleanup_planners:
469 while (idx) {
470 ucg_plan_component_t *planner = ctx->planners[idx--].plan_component;
471 planner->destroy((void*)new_group);
472 }
473 ucs_free(new_group);
474
475 cleanup_none:
476 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
477 return status;
478 }
479
ucg_group_get_params(ucg_group_h group)480 const ucg_group_params_t* ucg_group_get_params(ucg_group_h group)
481 {
482 return &group->params;
483 }
484
/**
 * Destroy a group: progress it until its pending queue is empty, let every
 * planner release its per-group context, then free the endpoint table,
 * statistics node and group memory.
 *
 * @param [in] group  Group to destroy; invalid after this call.
 */
void ucg_group_destroy(ucg_group_h group)
{
    ucg_groups_t *gctx;
    unsigned planner_idx;
#if ENABLE_MT
    ucg_worker_h worker;
#endif

    /* Progress until no operation deferred behind a barrier remains queued */
    while (!ucs_queue_is_empty(&group->pending)) {
        ucg_group_progress(group);
    }

#if ENABLE_MT
    /* Kept locally: the group is freed before the lock is released */
    worker = group->worker;
#endif
    UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(worker);

    gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
    for (planner_idx = 0; planner_idx < gctx->num_planners; ++planner_idx) {
        gctx->planners[planner_idx].plan_component->destroy(group);
    }

    kh_destroy_inplace(ucg_group_ep, &group->eps);
    UCS_STATS_NODE_FREE(group->stats);
    ucs_list_del(&group->list);
    ucs_free(group);

    UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(worker);
}
511
ucg_request_check_status(void * request)512 ucs_status_t ucg_request_check_status(void *request)
513 {
514 ucg_request_t *req = (ucg_request_t*)request - 1;
515
516 if (req->flags & UCG_REQUEST_COMMON_FLAG_COMPLETED) {
517 ucs_assert(req->status != UCS_INPROGRESS);
518 return req->status;
519 }
520 return UCS_INPROGRESS;
521 }
522
/** Cancel a UCG request. Not implemented yet: currently a no-op. */
void ucg_request_cancel(ucg_worker_h worker, void *request)
{
    (void)worker;
    (void)request;
}
524
/** Release a UCG request. Not implemented yet: currently a no-op. */
void ucg_request_free(void *request)
{
    (void)request;
}
526
ucg_plan_select(ucg_group_h group,const char * planner_name,const ucg_collective_params_t * params,ucg_plan_component_t ** planc_p)527 ucs_status_t ucg_plan_select(ucg_group_h group, const char* planner_name,
528 const ucg_collective_params_t *params,
529 ucg_plan_component_t **planc_p)
530 {
531 ucg_groups_t *ctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
532 return ucg_plan_select_component(ctx->planners, ctx->num_planners,
533 planner_name, &group->params, params, planc_p);
534 }
535
536 UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_create,
537 (group, params, coll), ucg_group_h group,
538 const ucg_collective_params_t *params, ucg_coll_h *coll)
539 {
540 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(group->worker);
541
542 /* check the recycling/cache for this collective */
543 ucg_op_t *op;
544 ucs_status_t status;
545 unsigned coll_mask = UCG_FLAG_MASK(params);
546 ucg_plan_t *plan = group->cache[coll_mask];
547 if (ucs_likely(plan != NULL)) {
548 ucs_list_for_each(op, &plan->op_head, list) {
549 if (!memcmp(&op->params, params, sizeof(*params))) {
550 status = UCS_OK;
551 goto op_found;
552 }
553 }
554
555 UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_USED, 1);
556 goto plan_found;
557 }
558
559 /* select which plan to use for this collective operation */
560 ucg_plan_component_t *planc; // TODO: replace NULL with config value
561 status = ucg_plan_select(group, NULL, params, &planc);
562 if (status != UCS_OK) {
563 goto out;
564 }
565
566 /* create the actual plan for the collective operation */
567 UCS_PROFILE_CODE("ucg_plan") {
568 ucs_trace_req("ucg_collective_create PLAN: planc=%s type=%x root=%lu",
569 &planc->name[0], params->type.modifiers, (uint64_t)params->type.root);
570 status = ucg_plan(planc, ¶ms->type, group, &plan);
571 }
572 if (status != UCS_OK) {
573 goto out;
574 }
575
576 plan->planner = planc;
577 plan->group = group;
578 plan->type = params->type;
579 plan->group_id = group->group_id;
580 plan->group_size = group->params.member_count;
581 #ifdef UCT_COLLECTIVES
582 plan->group_host_size = group->worker->context->config.num_local_peers;
583 #endif
584 group->cache[coll_mask] = plan;
585 ucs_list_head_init(&plan->op_head);
586 UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_PLANS_CREATED, 1);
587
588 plan_found:
589 UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_CREATED, 1);
590 UCS_PROFILE_CODE("ucg_prepare") {
591 status = ucg_prepare(plan, params, &op);
592 }
593 if (status != UCS_OK) {
594 goto out;
595 }
596
597 ucs_trace_req("ucg_collective_create OP: planc=%s "
598 "params={type=%u, root=%lu, send=[%p,%i,%lu,%p,%p], "
599 "recv=[%p,%i,%lu,%p,%p], cb=%p, op=%p}", &planc->name[0],
600 (unsigned)params->type.modifiers, (uint64_t)params->type.root,
601 params->send.buf, params->send.count, params->send.dt_len,
602 params->send.dt_ext, params->send.displs,
603 params->recv.buf, params->recv.count, params->recv.dt_len,
604 params->recv.dt_ext, params->recv.displs,
605 params->comp_cb, params->recv.op_ext);
606
607 ucs_list_add_head(&plan->op_head, &op->list);
608 memcpy(&op->params, params, sizeof(*params));
609 op->plan = plan;
610
611 op_found:
612 *coll = op;
613
614 out:
615 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(group->worker);
616 return status;
617 }
618
619 ucs_status_t static UCS_F_ALWAYS_INLINE
ucg_collective_trigger(ucg_group_h group,ucg_op_t * op,ucg_request_t ** req)620 ucg_collective_trigger(ucg_group_h group, ucg_op_t *op, ucg_request_t **req)
621 {
622 /* Barrier effect - all new collectives are pending */
623 if (ucs_unlikely(op->params.type.modifiers & UCG_GROUP_COLLECTIVE_MODIFIER_BARRIER)) {
624 ucs_assert(group->is_barrier_outstanding == 0);
625 group->is_barrier_outstanding = 1;
626 }
627
628 /* Start the first step of the collective operation */
629 ucs_status_t ret;
630 UCS_PROFILE_CODE("ucg_trigger") {
631 ret = ucg_trigger(op, group->next_id++, req);
632 }
633
634 if (ret != UCS_INPROGRESS) {
635 UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_IMMEDIATE, 1);
636 }
637
638 return ret;
639 }
640
ucg_collective_release_barrier(ucg_group_h group)641 ucs_status_t ucg_collective_release_barrier(ucg_group_h group)
642 {
643 ucs_assert(group->is_barrier_outstanding);
644 group->is_barrier_outstanding = 0;
645 if (ucs_queue_is_empty(&group->pending)) {
646 return UCS_OK;
647 }
648
649 ucs_status_t ret;
650 do {
651 /* Move the operation from the pending queue back to the original one */
652 ucg_op_t *op = (ucg_op_t*)ucs_queue_pull_non_empty(&group->pending);
653 ucg_request_t **req = op->pending_req;
654 ucs_list_add_head(&op->plan->op_head, &op->list);
655
656 /* Start this next pending operation */
657 ret = ucg_collective_trigger(group, op, req);
658 } while ((!ucs_queue_is_empty(&group->pending)) &&
659 (!group->is_barrier_outstanding) &&
660 (ret == UCS_OK));
661
662 return ret;
663 }
664
665 ucs_status_t static UCS_F_ALWAYS_INLINE
ucg_collective_start(ucg_coll_h coll,ucg_request_t ** req)666 ucg_collective_start(ucg_coll_h coll, ucg_request_t **req)
667 {
668 ucs_status_t ret;
669 ucg_op_t *op = (ucg_op_t*)coll;
670 ucg_group_h group = op->plan->group;
671
672 /* Since group was created - don't need UCP_CONTEXT_CHECK_FEATURE_FLAGS */
673 UCP_WORKER_THREAD_CS_ENTER_CONDITIONAL(group->worker);
674
675 ucs_trace_req("ucg_collective_start: op=%p req=%p", coll, *req);
676
677 if (ucs_unlikely(group->is_barrier_outstanding)) {
678 ucs_list_del(&op->list);
679 ucs_queue_push(&group->pending, &op->queue);
680 op->pending_req = req;
681 ret = UCS_INPROGRESS;
682 } else {
683 ret = ucg_collective_trigger(group, op, req);
684 }
685
686 UCS_STATS_UPDATE_COUNTER(group->stats, UCG_GROUP_STAT_OPS_USED, 1);
687 UCP_WORKER_THREAD_CS_EXIT_CONDITIONAL(group->worker);
688 return ret;
689 }
690
691 UCS_PROFILE_FUNC(ucs_status_ptr_t, ucg_collective_start_nb,
692 (coll), ucg_coll_h coll)
693 {
694 ucg_request_t *req = NULL;
695 ucs_status_ptr_t ret = UCS_STATUS_PTR(ucg_collective_start(coll, &req));
696 return UCS_PTR_IS_ERR(ret) ? ret : req;
697 }
698
699 UCS_PROFILE_FUNC(ucs_status_t, ucg_collective_start_nbr,
700 (coll, request), ucg_coll_h coll, void *request)
701 {
702 return ucg_collective_start(coll, (ucg_request_t**)&request);
703 }
704
/** Release a collective operation handle by handing it to ucg_discard(). */
void ucg_collective_destroy(ucg_coll_h coll)
{
    ucg_op_t *op = (ucg_op_t*)coll;
    ucg_discard(op);
}
709
ucg_worker_groups_init(ucp_worker_h worker,unsigned * next_am_id,void * groups_ctx)710 static ucs_status_t ucg_worker_groups_init(ucp_worker_h worker,
711 unsigned *next_am_id, void *groups_ctx)
712 {
713 ucg_groups_t *gctx = (ucg_groups_t*)groups_ctx;
714 ucs_status_t status = ucg_plan_query(next_am_id, &gctx->planners, &gctx->num_planners);
715 if (status != UCS_OK) {
716 return status;
717 }
718
719 unsigned planner_idx;
720 size_t group_ctx_offset = sizeof(struct ucg_group);
721 size_t global_ctx_offset = ucg_ctx_worker_offset + sizeof(ucg_groups_t);
722 for (planner_idx = 0; planner_idx < gctx->num_planners; planner_idx++) {
723 ucg_plan_desc_t* planner = &gctx->planners[planner_idx];
724 ucg_plan_component_t* planc = planner->plan_component;
725 planc->global_ctx_offset = global_ctx_offset;
726 global_ctx_offset += planc->global_context_size;
727 planc->group_ctx_offset = group_ctx_offset;
728 group_ctx_offset += planc->group_context_size;
729 }
730
731 gctx->next_id = 0;
732 gctx->iface_cnt = 0;
733 gctx->total_planner_sizes = group_ctx_offset;
734 #ifdef UCT_COLLECTIVES
735 gctx->num_local_peers = worker->context->config.num_local_peers;
736 gctx->my_local_peer_idx = worker->context->config.my_local_peer_idx;
737 #endif
738 ucs_list_head_init(&gctx->groups_head);
739 return UCS_OK;
740 }
741
ucg_worker_groups_cleanup(void * groups_ctx)742 static void ucg_worker_groups_cleanup(void *groups_ctx)
743 {
744 ucg_groups_t *gctx = (ucg_groups_t*)groups_ctx;
745
746 ucg_group_h group, tmp;
747 if (!ucs_list_is_empty(&gctx->groups_head)) {
748 ucs_list_for_each_safe(group, tmp, &gctx->groups_head, list) {
749 ucg_group_destroy(group);
750 }
751 }
752
753 ucg_plan_release_list(gctx->planners, gctx->num_planners);
754 }
755
ucg_extend_ucp(const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)756 static ucs_status_t ucg_extend_ucp(const ucg_params_t *params,
757 const ucg_config_t *config,
758 ucg_context_h *context_p)
759 {
760 #ifndef HAVE_UCP_EXTENSIONS
761 /* Since UCP doesn't support extensions (yet?) - we need to allocate... */
762 ucg_context_h ucg_context = ucs_calloc(1, sizeof(*ucg_context), "ucg context");
763 if (!ucg_context) {
764 return UCS_ERR_NO_MEMORY;
765 }
766
767 ucg_context->last_am_id = 0;
768 ucg_context->super = (ucp_context_h)context_p;
769 ucg_context->worker_size = sizeof(ucp_worker_t) + sizeof(ucp_ep_config_t) *
770 ucs_min((ucg_context->super->num_tls + 1) *
771 (ucg_context->super->num_tls + 1) *
772 (ucg_context->super->num_tls),
773 UINT8_MAX);
774 *context_p = ucg_context;
775 #endif
776
777 size_t ctx_size = sizeof(ucg_groups_t) +
778 ucs_list_length(&ucg_plan_components_list) * sizeof(void*);
779 ucs_list_head_init(&(*context_p)->extensions);
780 return ucp_extend(*context_p, ctx_size, ucg_worker_groups_init,
781 ucg_worker_groups_cleanup, &ucg_ctx_worker_offset);
782 }
783
ucg_init_version(unsigned api_major_version,unsigned api_minor_version,const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)784 ucs_status_t ucg_init_version(unsigned api_major_version,
785 unsigned api_minor_version,
786 const ucg_params_t *params,
787 const ucg_config_t *config,
788 ucg_context_h *context_p)
789 {
790 ucs_status_t status = ucp_init_version(api_major_version, api_minor_version,
791 params, config, (ucp_context_h*)context_p);
792 if (status == UCS_OK) {
793 status = ucg_extend_ucp(params, config, context_p);
794 }
795 return status;
796 }
797
ucg_init(const ucg_params_t * params,const ucg_config_t * config,ucg_context_h * context_p)798 ucs_status_t ucg_init(const ucg_params_t *params,
799 const ucg_config_t *config,
800 ucg_context_h *context_p)
801 {
802 ucs_status_t status = ucp_init(params, config, (ucp_context_h*)context_p);
803 if (status == UCS_OK) {
804 status = ucg_extend_ucp(params, config, context_p);
805 }
806 return status;
807 }
808
ucg_plan_connect(ucg_group_h group,ucg_group_member_index_t idx,enum ucg_plan_connect_flags flags,uct_ep_h * ep_p,const uct_iface_attr_t ** ep_attr_p,uct_md_h * md_p,const uct_md_attr_t ** md_attr_p)809 ucs_status_t ucg_plan_connect(ucg_group_h group,
810 ucg_group_member_index_t idx,
811 enum ucg_plan_connect_flags flags,
812 uct_ep_h *ep_p, const uct_iface_attr_t **ep_attr_p,
813 uct_md_h *md_p, const uct_md_attr_t **md_attr_p)
814 {
815 int ret;
816 ucs_status_t status;
817 size_t remote_addr_len;
818 ucp_address_t *remote_addr = NULL;
819
820 /* Look-up the UCP endpoint based on the index */
821 ucp_ep_h ucp_ep;
822 khiter_t iter = kh_get(ucg_group_ep, &group->eps, idx);
823 if (iter != kh_end(&group->eps)) {
824 /* Use the cached connection */
825 ucp_ep = kh_value(&group->eps, iter);
826 } else {
827 /* fill-in UCP connection parameters */
828 status = group->params.resolve_address_f(group->params.cb_group_obj,
829 idx, &remote_addr, &remote_addr_len);
830 if (status != UCS_OK) {
831 ucs_error("failed to obtain a UCP endpoint from the external callback");
832 return status;
833 }
834
835 /* special case: connecting to a zero-length address means it's "debugging" */
836 if (ucs_unlikely(remote_addr_len == 0)) {
837 *ep_p = NULL;
838 return UCS_OK;
839 }
840
841 /* create an endpoint for communication with the remote member */
842 ucp_ep_params_t ep_params = {
843 .field_mask = UCP_EP_PARAM_FIELD_REMOTE_ADDRESS,
844 .address = remote_addr
845 };
846 status = ucp_ep_create(group->worker, &ep_params, &ucp_ep);
847 group->params.release_address_f(remote_addr);
848 if (status != UCS_OK) {
849 return status;
850 }
851
852 /* Store this endpoint, for future reference */
853 iter = kh_put(ucg_group_ep, &group->eps, idx, &ret);
854 kh_value(&group->eps, iter) = ucp_ep;
855 }
856
857 /* Connect for point-to-point communication */
858 ucp_lane_index_t lane;
859 am_retry:
860 #ifdef UCT_COLLECTIVES
861 if (flags & UCG_PLAN_CONNECT_FLAG_WANT_INCAST) {
862 lane = ucp_ep_get_incast_lane(ucp_ep);
863 if (ucs_unlikely(lane == UCP_NULL_LANE)) {
864 ucs_warn("No transports with native incast support were found,"
865 " falling back to P2P transports (slower)");
866 return UCS_ERR_UNREACHABLE;
867 }
868 *ep_p = ucp_ep_get_incast_uct_ep(ucp_ep);
869 } else if (flags & UCG_PLAN_CONNECT_FLAG_WANT_BCAST) {
870 lane = ucp_ep_get_bcast_lane(ucp_ep);
871 if (ucs_unlikely(lane == UCP_NULL_LANE)) {
872 ucs_warn("No transports with native broadcast support were found,"
873 " falling back to P2P transports (slower)");
874 return UCS_ERR_UNREACHABLE;
875 }
876 *ep_p = ucp_ep_get_bcast_uct_ep(ucp_ep);
877 } else
878 #endif /* UCT_COLLECTIVES */
879 {
880 lane = ucp_ep_get_am_lane(ucp_ep);
881 *ep_p = ucp_ep_get_am_uct_ep(ucp_ep);
882 }
883
884 if (*ep_p == NULL) {
885 status = ucp_wireup_connect_remote(ucp_ep, lane);
886 if (status != UCS_OK) {
887 return status;
888 }
889 goto am_retry; /* Just to obtain the right lane */
890 }
891
892 if (ucp_proxy_ep_test(*ep_p)) {
893 ucp_proxy_ep_t *proxy_ep = ucs_derived_of(*ep_p, ucp_proxy_ep_t);
894 *ep_p = proxy_ep->uct_ep;
895 ucs_assert(*ep_p != NULL);
896 }
897
898 ucs_assert((*ep_p)->iface != NULL);
899 if ((*ep_p)->iface->ops.ep_am_short ==
900 (typeof((*ep_p)->iface->ops.ep_am_short))
901 ucs_empty_function_return_no_resource) {
902 ucp_worker_progress(group->worker);
903 goto am_retry;
904 }
905
906 /* Register interfaces to be progressed in future calls */
907 ucg_groups_t *gctx = UCG_WORKER_TO_GROUPS_CTX(group->worker);
908 UCG_GROUP_PROGRESS_ADD((*ep_p)->iface, gctx);
909 UCG_GROUP_PROGRESS_ADD((*ep_p)->iface, group);
910
911 *md_p = ucp_ep_md(ucp_ep, lane);
912 *md_attr_p = ucp_ep_md_attr(ucp_ep, lane);
913 *ep_attr_p = ucp_ep_get_iface_attr(ucp_ep, lane);
914
915 return UCS_OK;
916 }
917
ucg_plan_query_resources(ucg_group_h group,ucg_plan_resources_t ** resources)918 ucs_status_t ucg_plan_query_resources(ucg_group_h group,
919 ucg_plan_resources_t **resources)
920 {
921 return UCS_OK;
922 }
923