1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2011 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2006 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2006 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2006-2015 Cisco Systems, Inc. All rights reserved.
14 * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
15 * reserved.
16 * Copyright (c) 2013-2015 Intel, Inc. All rights reserved
17 * Copyright (c) 2014-2017 Research Organization for Information Science
18 * and Technology (RIST). All rights reserved.
19 * Copyright (c) 2015-2017 Mellanox Technologies. All rights reserved.
20 *
21 * $COPYRIGHT$
22 *
23 * Additional copyrights may follow
24 *
25 * $HEADER$
26 */
27
28 #include "ompi_config.h"
29
30 #include <string.h>
31 #include <strings.h>
32
33 #include "ompi/constants.h"
34 #include "opal/datatype/opal_convertor.h"
35 #include "opal/threads/mutex.h"
36 #include "opal/dss/dss.h"
37 #include "opal/util/arch.h"
38 #include "opal/util/show_help.h"
39 #include "opal/mca/hwloc/base/base.h"
40 #include "opal/mca/pmix/pmix.h"
41 #include "opal/util/argv.h"
42
43 #include "ompi/proc/proc.h"
44 #include "ompi/datatype/ompi_datatype.h"
45 #include "ompi/runtime/mpiruntime.h"
46 #include "ompi/runtime/params.h"
47 #include "ompi/mca/pml/pml.h"
48
/* Global list of all known ompi_proc_t objects (local and remote peers). */
opal_list_t ompi_proc_list = {{0}};
/* Lock protecting ompi_proc_list and ompi_proc_hash. */
static opal_mutex_t ompi_proc_lock;
/* Hash table mapping opal_process_name_t -> ompi_proc_t for O(1) lookup. */
static opal_hash_table_t ompi_proc_hash;

/* Cached pointer to the proc structure describing this (local) process. */
ompi_proc_t* ompi_proc_local_proc = NULL;

static void ompi_proc_construct(ompi_proc_t* proc);
static void ompi_proc_destruct(ompi_proc_t* proc);
/* variant of ompi_proc_for_name() that assumes ompi_proc_lock is held */
static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name);

OBJ_CLASS_INSTANCE(
    ompi_proc_t,
    opal_proc_t,
    ompi_proc_construct,
    ompi_proc_destruct
);
65
66
/**
 * Object constructor for ompi_proc_t.
 *
 * Zeroes the per-proc endpoint array and installs the local convertor
 * as the default; peers with a different architecture get a dedicated
 * convertor later (see ompi_proc_complete_init_single()).
 *
 * @param[in] proc  freshly allocated proc object (from OBJ_NEW)
 */
void ompi_proc_construct(ompi_proc_t* proc)
{
    /* memset replaces the legacy bzero(), which was removed from
     * POSIX.1-2008; behavior is identical */
    memset(proc->proc_endpoints, 0, sizeof(proc->proc_endpoints));

    /* By default all processors are supposedly having the same architecture as me. Thus,
     * by default we run in a homogeneous environment. Later, when the RTE can tell us
     * the arch of the remote nodes, we will have to set the convertors to the correct
     * architecture.
     */
    OBJ_RETAIN( ompi_mpi_local_convertor );
    proc->super.proc_convertor = ompi_mpi_local_convertor;
}
79
80
/**
 * Object destructor for ompi_proc_t.
 *
 * Releases the proc's convertor and hostname, then removes the proc
 * from the global list and hash table under the proc lock.
 *
 * @param[in] proc  proc object whose refcount dropped to zero
 */
void ompi_proc_destruct(ompi_proc_t* proc)
{
    /* As all the convertors are created with OBJ_NEW we can just call OBJ_RELEASE. All, except
     * the local convertor, will get destroyed at some point here. If the reference count is correct
     * the local convertor (who has the reference count increased in the datatype) will not get
     * destroyed here. It will be destroyed later when the ompi_datatype_finalize is called.
     */
    OBJ_RELEASE( proc->super.proc_convertor );
    /* free(NULL) is a well-defined no-op, so no NULL guard is needed */
    free(proc->super.proc_hostname);
    opal_mutex_lock (&ompi_proc_lock);
    opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
    opal_hash_table_remove_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name));
    opal_mutex_unlock (&ompi_proc_lock);
}
97
98 /**
99 * Allocate a new ompi_proc_T for the given jobid/vpid
100 *
101 * @param[in] jobid Job identifier
102 * @param[in] vpid Process identifier
103 * @param[out] procp New ompi_proc_t structure
104 *
105 * This function allocates a new ompi_proc_t and inserts it into
106 * the process list and hash table.
107 */
static int ompi_proc_allocate (ompi_jobid_t jobid, ompi_vpid_t vpid, ompi_proc_t **procp) {
    ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);
    /* OBJ_NEW can return NULL on allocation failure; previously this was
     * not checked and the function unconditionally returned success */
    if (OPAL_UNLIKELY(NULL == proc)) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* NB: the caller is expected to hold ompi_proc_lock */
    opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);

    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = jobid;
    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid = vpid;

    opal_hash_table_set_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name),
                                   proc);

    /* by default we consider process to be remote */
    proc->super.proc_flags = OPAL_PROC_NON_LOCAL;
    *procp = proc;

    return OMPI_SUCCESS;
}
125
126 /**
127 * Finish setting up an ompi_proc_t
128 *
129 * @param[in] proc ompi process structure
130 *
131 * This function contains the core code of ompi_proc_complete_init() and
132 * ompi_proc_refresh(). The tasks performed by this function include
133 * retrieving the hostname (if below the modex cutoff), determining the
134 * remote architecture, and calculating the locality of the process.
135 */
int ompi_proc_complete_init_single (ompi_proc_t *proc)
{
    int ret;

    if ((OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid == OMPI_PROC_MY_NAME->jobid) &&
        (OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid == OMPI_PROC_MY_NAME->vpid)) {
        /* this is the local proc - it was fully set up in ompi_proc_init(),
         * so nothing else to do */
        return OMPI_SUCCESS;
    }

    /* we can retrieve the hostname at no cost because it
     * was provided at startup - but make it optional so
     * we don't chase after it if some system doesn't
     * provide it */
    proc->super.proc_hostname = NULL;
    /* ret is deliberately not checked: a missing hostname simply leaves
     * proc_hostname NULL */
    OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_HOSTNAME, &proc->super.proc_name,
                                   (char**)&(proc->super.proc_hostname), OPAL_STRING);

#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* get the remote architecture - this might force a modex except
     * for those environments where the RM provides it */
    {
        uint32_t *ui32ptr;
        ui32ptr = &(proc->super.proc_arch);
        OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_ARCH, &proc->super.proc_name,
                              (void**)&ui32ptr, OPAL_UINT32);
        if (OPAL_SUCCESS == ret) {
            /* if arch is different than mine, create a new convertor for this proc */
            if (proc->super.proc_arch != opal_local_arch) {
                OBJ_RELEASE(proc->super.proc_convertor);
                proc->super.proc_convertor = opal_convertor_create(proc->super.proc_arch, 0);
            }
        } else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
            /* the RTE cannot report the peer's arch - assume homogeneous */
            proc->super.proc_arch = opal_local_arch;
        } else {
            /* any other modex failure is fatal for this proc */
            return ret;
        }
    }
#else
    /* must be same arch as my own */
    proc->super.proc_arch = opal_local_arch;
#endif

    return OMPI_SUCCESS;
}
181
ompi_proc_lookup(const opal_process_name_t proc_name)182 opal_proc_t *ompi_proc_lookup (const opal_process_name_t proc_name)
183 {
184 ompi_proc_t *proc = NULL;
185 int ret;
186
187 /* try to lookup the value in the hash table */
188 ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
189
190 if (OPAL_SUCCESS == ret) {
191 return &proc->super;
192 }
193
194 return NULL;
195 }
196
ompi_proc_for_name_nolock(const opal_process_name_t proc_name)197 static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name)
198 {
199 ompi_proc_t *proc = NULL;
200 int ret;
201
202 /* double-check that another competing thread has not added this proc */
203 ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
204 if (OPAL_SUCCESS == ret) {
205 goto exit;
206 }
207
208 /* allocate a new ompi_proc_t object for the process and insert it into the process table */
209 ret = ompi_proc_allocate (proc_name.jobid, proc_name.vpid, &proc);
210 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
211 /* allocation fail */
212 goto exit;
213 }
214
215 /* finish filling in the important proc data fields */
216 ret = ompi_proc_complete_init_single (proc);
217 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
218 goto exit;
219 }
220 exit:
221 return proc;
222 }
223
ompi_proc_for_name(const opal_process_name_t proc_name)224 opal_proc_t *ompi_proc_for_name (const opal_process_name_t proc_name)
225 {
226 ompi_proc_t *proc = NULL;
227 int ret;
228
229 /* try to lookup the value in the hash table */
230 ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
231 if (OPAL_SUCCESS == ret) {
232 return &proc->super;
233 }
234
235 opal_mutex_lock (&ompi_proc_lock);
236 proc = ompi_proc_for_name_nolock (proc_name);
237 opal_mutex_unlock (&ompi_proc_lock);
238
239 return (opal_proc_t *) proc;
240 }
241
ompi_proc_init(void)242 int ompi_proc_init(void)
243 {
244 int opal_proc_hash_init_size = (ompi_process_info.num_procs < ompi_add_procs_cutoff) ? ompi_process_info.num_procs :
245 1024;
246 ompi_proc_t *proc;
247 int ret;
248
249 OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
250 OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);
251 OBJ_CONSTRUCT(&ompi_proc_hash, opal_hash_table_t);
252
253 ret = opal_hash_table_init (&ompi_proc_hash, opal_proc_hash_init_size);
254 if (OPAL_SUCCESS != ret) {
255 return ret;
256 }
257
258 /* create a proc for the local process */
259 ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, &proc);
260 if (OMPI_SUCCESS != ret) {
261 return OMPI_ERR_OUT_OF_RESOURCE;
262 }
263
264 /* set local process data */
265 ompi_proc_local_proc = proc;
266 proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
267 proc->super.proc_hostname = strdup(ompi_process_info.nodename);
268 proc->super.proc_arch = opal_local_arch;
269 /* Register the local proc with OPAL */
270 opal_proc_local_set(&proc->super);
271 #if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
272 /* add our arch to the modex */
273 OPAL_MODEX_SEND_VALUE(ret, OPAL_PMIX_GLOBAL,
274 OPAL_PMIX_ARCH, &opal_local_arch, OPAL_UINT32);
275 if (OPAL_SUCCESS != ret) {
276 return ret;
277 }
278 #endif
279
280 return OMPI_SUCCESS;
281 }
282
/**
 * opal_list_sort comparator ordering procs by ascending vpid.
 *
 * vpids are unique within a job, so the equal case should never occur;
 * when it would, -1 is returned, matching the original behavior.
 */
static int ompi_proc_compare_vid (opal_list_item_t **a, opal_list_item_t **b)
{
    const ompi_proc_t *lhs = (const ompi_proc_t *) *a;
    const ompi_proc_t *rhs = (const ompi_proc_t *) *b;

    return (lhs->super.proc_name.vpid > rhs->super.proc_name.vpid) ? 1 : -1;
}
296
297 /**
298 * The process creation is split into two steps. The second step
299 * is the important one, it sets the properties of the remote
300 * process, such as architecture, node name and locality flags.
301 *
302 * This function is to be called __only__ after the modex exchange
303 * has been performed, in order to allow the modex to carry the data
304 * instead of requiring the runtime to provide it.
305 */
ompi_proc_complete_init(void)306 int ompi_proc_complete_init(void)
307 {
308 opal_process_name_t wildcard_rank;
309 ompi_proc_t *proc;
310 int ret, errcode = OMPI_SUCCESS;
311 char *val;
312
313 opal_mutex_lock (&ompi_proc_lock);
314
315 /* Add all local peers first */
316 wildcard_rank.jobid = OMPI_PROC_MY_NAME->jobid;
317 wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
318 /* retrieve the local peers */
319 OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_PEERS,
320 &wildcard_rank, &val, OPAL_STRING);
321 if (OPAL_SUCCESS == ret && NULL != val) {
322 char **peers = opal_argv_split(val, ',');
323 int i;
324 free(val);
325 for (i=0; NULL != peers[i]; i++) {
326 ompi_vpid_t local_rank = strtoul(peers[i], NULL, 10);
327 uint16_t u16, *u16ptr = &u16;
328 if (OMPI_PROC_MY_NAME->vpid == local_rank) {
329 continue;
330 }
331 ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, local_rank, &proc);
332 if (OMPI_SUCCESS != ret) {
333 return ret;
334 }
335 /* get the locality information - all RTEs are required
336 * to provide this information at startup */
337 OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_LOCALITY, &proc->super.proc_name, &u16ptr, OPAL_UINT16);
338 if (OPAL_SUCCESS == ret) {
339 proc->super.proc_flags = u16;
340 }
341 }
342 opal_argv_free(peers);
343 }
344
345 /* Complete initialization of node-local procs */
346 OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
347 ret = ompi_proc_complete_init_single (proc);
348 if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
349 errcode = ret;
350 break;
351 }
352 }
353
354 /* if cutoff is larger than # of procs - add all processes
355 * NOTE that local procs will be automatically skipped as they
356 * are already in the hash table
357 */
358 if (ompi_process_info.num_procs < ompi_add_procs_cutoff) {
359 /* sinse ompi_proc_for_name is locking internally -
360 * we need to release lock here
361 */
362 opal_mutex_unlock (&ompi_proc_lock);
363
364 for (ompi_vpid_t i = 0 ; i < ompi_process_info.num_procs ; ++i ) {
365 opal_process_name_t proc_name;
366 proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
367 proc_name.vpid = i;
368 (void) ompi_proc_for_name (proc_name);
369 }
370
371 /* acquire lock back for the next step - sort */
372 opal_mutex_lock (&ompi_proc_lock);
373 }
374
375 opal_list_sort (&ompi_proc_list, ompi_proc_compare_vid);
376
377 opal_mutex_unlock (&ompi_proc_lock);
378
379 return errcode;
380 }
381
/**
 * Tear down the proc subsystem at MPI_Finalize time.
 *
 * Drives the reference count of every proc on the global list to zero
 * (the destructor removes each item from the list), then destructs the
 * list, lock, and hash table.  Always returns OMPI_SUCCESS.
 */
int ompi_proc_finalize (void)
{
    ompi_proc_t *proc;

    /* Unregister the local proc from OPAL */
    opal_proc_local_set(NULL);

    /* remove all items from list and destroy them. Since we cannot know
     * the reference count of the procs for certain, it is possible that
     * a single OBJ_RELEASE won't drive the count to zero, and hence will
     * not release the memory. Accordingly, we cycle through the list here,
     * calling release on each item.
     *
     * This will cycle until it forces the reference count of each item
     * to zero, thus causing the destructor to run - which will remove
     * the item from the list!
     *
     * We cannot do this under the thread lock as the destructor will
     * call it when removing the item from the list. However, this function
     * is ONLY called from MPI_Finalize, and all threads are prohibited from
     * calling an MPI function once ANY thread has called MPI_Finalize. Of
     * course, multiple threads are allowed to call MPI_Finalize, so this
     * function may get called multiple times by various threads. We believe
     * it is thread safe to do so...though it may not -appear- to be so
     * without walking through the entire list/destructor sequence.
     */
    while ((ompi_proc_t *)opal_list_get_end(&ompi_proc_list) != (proc = (ompi_proc_t *)opal_list_get_first(&ompi_proc_list))) {
        OBJ_RELEASE(proc);
    }
    /* now destruct the list and thread lock */
    OBJ_DESTRUCT(&ompi_proc_list);
    OBJ_DESTRUCT(&ompi_proc_lock);
    OBJ_DESTRUCT(&ompi_proc_hash);

    return OMPI_SUCCESS;
}
418
ompi_proc_world_size(void)419 int ompi_proc_world_size (void)
420 {
421 return ompi_process_info.num_procs;
422 }
423
/**
 * Return a malloc'ed array of pointers to all already-allocated procs
 * sharing the local jobid.
 *
 * The caller must free() the array; the proc objects themselves are
 * deliberately NOT retained (see comment in the fill loop).  *size is
 * set to the number of entries on success.
 *
 * @param[out] size  number of procs in the returned array
 * @return the array, or NULL if the local proc is not yet set up or
 *         allocation fails
 */
ompi_proc_t **ompi_proc_get_allocated (size_t *size)
{
    ompi_proc_t **procs;
    ompi_proc_t *proc;
    size_t count = 0;
    ompi_rte_cmp_bitmask_t mask;
    ompi_process_name_t my_name;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }
    /* compare on jobid only - we want every proc in our job */
    mask = OMPI_RTE_CMP_JOBID;
    my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

    /* First count how many match this jobid */
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
            ++count;
        }
    }

    /* allocate an array */
    procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        opal_mutex_unlock (&ompi_proc_lock);
        return NULL;
    }

    /* now save only the procs that match this jobid
     * (safe: the list cannot change while we hold the lock, so the
     * count from the first pass is still valid) */
    count = 0;
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
            /* DO NOT RETAIN THIS OBJECT - the reference count on this
             * object will be adjusted by external callers. The intent
             * here is to allow the reference count to drop to zero if
             * the app no longer desires to communicate with this proc.
             * For example, the proc may call comm_disconnect on all
             * communicators involving this proc. In such cases, we want
             * the proc object to be removed from the list. By not incrementing
             * the reference count here, we allow this to occur.
             *
             * We don't implement that yet, but we are still safe for now as
             * the OBJ_NEW in ompi_proc_init owns the initial reference
             * count which cannot be released until ompi_proc_finalize is
             * called.
             */
            procs[count++] = proc;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);

    *size = count;
    return procs;
}
480
ompi_proc_world(size_t * size)481 ompi_proc_t **ompi_proc_world (size_t *size)
482 {
483 ompi_proc_t **procs;
484 size_t count = 0;
485
486 /* check bozo case */
487 if (NULL == ompi_proc_local_proc) {
488 return NULL;
489 }
490
491 /* First count how many match this jobid (we already know this from our process info) */
492 count = ompi_process_info.num_procs;
493
494 /* allocate an array */
495 procs = (ompi_proc_t **) malloc (count * sizeof(ompi_proc_t*));
496 if (NULL == procs) {
497 return NULL;
498 }
499
500 /* now get/allocate all the procs in this jobid */
501 for (size_t i = 0 ; i < count ; ++i) {
502 opal_process_name_t name = {.jobid = OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name)->jobid,
503 .vpid = i};
504
505 /* DO NOT RETAIN THIS OBJECT - the reference count on this
506 * object will be adjusted by external callers. The intent
507 * here is to allow the reference count to drop to zero if
508 * the app no longer desires to communicate with this proc.
509 * For example, the proc may call comm_disconnect on all
510 * communicators involving this proc. In such cases, we want
511 * the proc object to be removed from the list. By not incrementing
512 * the reference count here, we allow this to occur.
513 *
514 * We don't implement that yet, but we are still safe for now as
515 * the OBJ_NEW in ompi_proc_init owns the initial reference
516 * count which cannot be released until ompi_proc_finalize is
517 * called.
518 */
519 procs[i] = (ompi_proc_t*)ompi_proc_for_name (name);
520 }
521
522 *size = count;
523
524 return procs;
525 }
526
527
ompi_proc_all(size_t * size)528 ompi_proc_t** ompi_proc_all(size_t* size)
529 {
530 ompi_proc_t **procs =
531 (ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
532 ompi_proc_t *proc;
533 size_t count = 0;
534
535 if (NULL == procs) {
536 return NULL;
537 }
538
539 opal_mutex_lock (&ompi_proc_lock);
540 OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
541 /* We know this isn't consistent with the behavior in ompi_proc_world,
542 * but we are leaving the RETAIN for now because the code using this function
543 * assumes that the results need to be released when done. It will
544 * be cleaned up later as the "fix" will impact other places in
545 * the code
546 */
547 OBJ_RETAIN(proc);
548 procs[count++] = proc;
549 }
550 opal_mutex_unlock (&ompi_proc_lock);
551 *size = count;
552 return procs;
553 }
554
555
ompi_proc_self(size_t * size)556 ompi_proc_t** ompi_proc_self(size_t* size)
557 {
558 ompi_proc_t **procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
559 if (NULL == procs) {
560 return NULL;
561 }
562 /* We know this isn't consistent with the behavior in ompi_proc_world,
563 * but we are leaving the RETAIN for now because the code using this function
564 * assumes that the results need to be released when done. It will
565 * be cleaned up later as the "fix" will impact other places in
566 * the code
567 */
568 OBJ_RETAIN(ompi_proc_local_proc);
569 *procs = ompi_proc_local_proc;
570 *size = 1;
571 return procs;
572 }
573
ompi_proc_find(const ompi_process_name_t * name)574 ompi_proc_t * ompi_proc_find ( const ompi_process_name_t * name )
575 {
576 ompi_proc_t *proc, *rproc=NULL;
577 ompi_rte_cmp_bitmask_t mask;
578
579 /* return the proc-struct which matches this jobid+process id */
580 mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
581 opal_mutex_lock (&ompi_proc_lock);
582 OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
583 if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
584 rproc = proc;
585 break;
586 }
587 }
588 opal_mutex_unlock (&ompi_proc_lock);
589
590 return rproc;
591 }
592
593
/**
 * Refresh every proc on the global list after a jobid change (e.g.
 * following a runtime restart): re-stamps the jobid, clears locality
 * flags, and re-runs single-proc initialization for remote peers.
 *
 * @return OMPI_SUCCESS, or the first error from
 *         ompi_proc_complete_init_single()
 */
int ompi_proc_refresh(void)
{
    ompi_proc_t *proc = NULL;
    ompi_vpid_t i = 0;
    int ret=OMPI_SUCCESS;

    opal_mutex_lock (&ompi_proc_lock);

    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        /* Does not change: proc->super.proc_name.vpid */
        OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = OMPI_PROC_MY_NAME->jobid;

        /* Make sure to clear the local flag before we set it below */
        proc->super.proc_flags = 0;

        /* NOTE(review): 'i' is never incremented in this loop, so only the
         * first list entry can ever match OMPI_PROC_MY_NAME->vpid unless
         * that vpid is 0 - confirm whether this relies on the list being
         * sorted with the local proc first */
        if (i == OMPI_PROC_MY_NAME->vpid) {
            ompi_proc_local_proc = proc;
            proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
            /* NOTE(review): nodename is assigned without strdup here, while
             * ompi_proc_destruct() free()s proc_hostname and ompi_proc_init()
             * does strdup - potential invalid free; confirm ownership */
            proc->super.proc_hostname = ompi_process_info.nodename;
            proc->super.proc_arch = opal_local_arch;
            opal_proc_local_set(&proc->super);
        } else {
            ret = ompi_proc_complete_init_single (proc);
            if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                break;
            }
        }
    }

    opal_mutex_unlock (&ompi_proc_lock);

    return ret;
}
627
628 int
ompi_proc_pack(ompi_proc_t ** proclist,int proclistsize,opal_buffer_t * buf)629 ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
630 opal_buffer_t* buf)
631 {
632 int rc;
633 char *nspace;
634
635 opal_mutex_lock (&ompi_proc_lock);
636
637 /* cycle through the provided array, packing the OMPI level
638 * data for each proc. This data may or may not be included
639 * in any subsequent modex operation, so we include it here
640 * to ensure completion of a connect/accept handshake. See
641 * the ompi/mca/dpm framework for an example of where and how
642 * this info is used.
643 *
644 * Eventually, we will review the procedures that call this
645 * function to see if duplication of communication can be
646 * reduced. For now, just go ahead and pack the info so it
647 * can be sent.
648 */
649 for (int i = 0 ; i < proclistsize ; ++i) {
650 ompi_proc_t *proc = proclist[i];
651
652 if (ompi_proc_is_sentinel (proc)) {
653 proc = ompi_proc_for_name_nolock (ompi_proc_sentinel_to_name ((uintptr_t) proc));
654 }
655
656 /* send proc name */
657 rc = opal_dss.pack(buf, &(proc->super.proc_name), 1, OMPI_NAME);
658 if(rc != OPAL_SUCCESS) {
659 OMPI_ERROR_LOG(rc);
660 opal_mutex_unlock (&ompi_proc_lock);
661 return rc;
662 }
663 /* retrieve and send the corresponding nspace for this job
664 * as the remote side may not know the translation */
665 nspace = (char*)opal_pmix.get_nspace(proc->super.proc_name.jobid);
666 rc = opal_dss.pack(buf, &nspace, 1, OPAL_STRING);
667 if(rc != OPAL_SUCCESS) {
668 OMPI_ERROR_LOG(rc);
669 opal_mutex_unlock (&ompi_proc_lock);
670 return rc;
671 }
672 /* pack architecture flag */
673 rc = opal_dss.pack(buf, &(proc->super.proc_arch), 1, OPAL_UINT32);
674 if(rc != OPAL_SUCCESS) {
675 OMPI_ERROR_LOG(rc);
676 opal_mutex_unlock (&ompi_proc_lock);
677 return rc;
678 }
679 /* pass the name of the host this proc is on */
680 rc = opal_dss.pack(buf, &(proc->super.proc_hostname), 1, OPAL_STRING);
681 if(rc != OPAL_SUCCESS) {
682 OMPI_ERROR_LOG(rc);
683 opal_mutex_unlock (&ompi_proc_lock);
684 return rc;
685 }
686 }
687 opal_mutex_unlock (&ompi_proc_lock);
688 return OMPI_SUCCESS;
689 }
690
691 ompi_proc_t *
ompi_proc_find_and_add(const ompi_process_name_t * name,bool * isnew)692 ompi_proc_find_and_add(const ompi_process_name_t * name, bool* isnew)
693 {
694 ompi_proc_t *proc, *rproc = NULL;
695 ompi_rte_cmp_bitmask_t mask;
696
697 /* return the proc-struct which matches this jobid+process id */
698 mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
699 opal_mutex_lock (&ompi_proc_lock);
700 OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
701 if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
702 rproc = proc;
703 *isnew = false;
704 break;
705 }
706 }
707
708 /* if we didn't find this proc in the list, create a new
709 * proc_t and append it to the list
710 */
711 if (NULL == rproc) {
712 *isnew = true;
713 ompi_proc_allocate (name->jobid, name->vpid, &rproc);
714 }
715
716 opal_mutex_unlock (&ompi_proc_lock);
717
718 return rproc;
719 }
720
721
722 int
/**
 * Unpack proc descriptions previously written by ompi_proc_pack(),
 * adding any procs we did not already know about to the global list.
 *
 * @param[in]  buf             buffer positioned at the packed proc data
 * @param[in]  proclistsize    number of procs to unpack
 * @param[out] proclist        array of all unpacked procs (caller-owned
 *                             array; must NOT be freed per the note below)
 * @param[out] newproclistsize number of procs that were newly created
 * @param[out] newproclist     array of only the newly created procs
 *                             (caller frees the array)
 * @return OMPI_SUCCESS or the first unpack/support error
 */
int
ompi_proc_unpack(opal_buffer_t* buf,
                 int proclistsize, ompi_proc_t ***proclist,
                 int *newproclistsize, ompi_proc_t ***newproclist)
{
    size_t newprocs_len = 0;
    ompi_proc_t **plist=NULL, **newprocs = NULL;

    /* do not free plist *ever*, since it is used in the remote group
       structure of a communicator */
    plist = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if ( NULL == plist ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    /* free this on the way out */
    newprocs = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if (NULL == newprocs) {
        free(plist);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* cycle through the array of provided procs and unpack
     * their info - as packed by ompi_proc_pack
     */
    for (int i = 0; i < proclistsize ; ++i){
        int32_t count=1;
        ompi_process_name_t new_name;
        uint32_t new_arch;
        char *new_hostname;
        bool isnew = false;
        int rc;
        char *nspace;

        rc = opal_dss.unpack(buf, &new_name, &count, OMPI_NAME);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &nspace, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        /* teach the local PMIx layer the jobid->nspace translation
         * before any modex operations against this job */
        opal_pmix.register_jobid(new_name.jobid, nspace);
        free(nspace);
        rc = opal_dss.unpack(buf, &new_arch, &count, OPAL_UINT32);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &new_hostname, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        /* see if this proc is already on our ompi_proc_list */
        plist[i] = ompi_proc_find_and_add(&new_name, &isnew);
        if (isnew) {
            /* if not, then it was added, so update the values
             * in the proc_t struct with the info that was passed
             * to us
             */
            newprocs[newprocs_len++] = plist[i];

            /* update all the values */
            plist[i]->super.proc_arch = new_arch;
            /* if arch is different than mine, create a new convertor for this proc */
            if (plist[i]->super.proc_arch != opal_local_arch) {
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
                OBJ_RELEASE(plist[i]->super.proc_convertor);
                plist[i]->super.proc_convertor = opal_convertor_create(plist[i]->super.proc_arch, 0);
#else
                /* non-heterogeneous build cannot talk to a peer with a
                 * different architecture - report and bail */
                opal_show_help("help-mpi-runtime.txt",
                               "heterogeneous-support-unavailable",
                               true, ompi_process_info.nodename,
                               new_hostname == NULL ? "<hostname unavailable>" :
                               new_hostname);
                free(plist);
                free(newprocs);
                return OMPI_ERR_NOT_SUPPORTED;
#endif
            }

            if (NULL != new_hostname) {
                if (0 == strcmp(ompi_proc_local_proc->super.proc_hostname, new_hostname)) {
                    plist[i]->super.proc_flags |= (OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER);
                }

                /* Save the hostname (ownership transfers to the proc;
                 * freed by ompi_proc_destruct) */
                plist[i]->super.proc_hostname = new_hostname;
            }
        } else if (NULL != new_hostname) {
            /* existing proc already has a hostname - drop the duplicate */
            free(new_hostname);
        }
    }

    if (NULL != newproclistsize) *newproclistsize = newprocs_len;
    if (NULL != newproclist) {
        *newproclist = newprocs;
    } else if (newprocs != NULL) {
        free(newprocs);
    }

    *proclist = plist;
    return OMPI_SUCCESS;
}
836