/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2006 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2011 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2006 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2006 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2006-2015 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2012-2015 Los Alamos National Security, LLC.  All rights
 *                         reserved.
 * Copyright (c) 2013-2015 Intel, Inc. All rights reserved
 * Copyright (c) 2014-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2015-2017 Mellanox Technologies. All rights reserved.
 *
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include <string.h>
#include <strings.h>

#include "ompi/constants.h"
#include "opal/datatype/opal_convertor.h"
#include "opal/threads/mutex.h"
#include "opal/dss/dss.h"
#include "opal/util/arch.h"
#include "opal/util/show_help.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/mca/pmix/pmix.h"
#include "opal/util/argv.h"

#include "ompi/proc/proc.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/runtime/mpiruntime.h"
#include "ompi/runtime/params.h"
#include "ompi/mca/pml/pml.h"

opal_list_t  ompi_proc_list = {{0}};
static opal_mutex_t ompi_proc_lock;
static opal_hash_table_t ompi_proc_hash;

ompi_proc_t* ompi_proc_local_proc = NULL;

static void ompi_proc_construct(ompi_proc_t* proc);
static void ompi_proc_destruct(ompi_proc_t* proc);
static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name);

OBJ_CLASS_INSTANCE(
    ompi_proc_t,
    opal_proc_t,
    ompi_proc_construct,
    ompi_proc_destruct
);

void ompi_proc_construct(ompi_proc_t* proc)
{
    bzero(proc->proc_endpoints, sizeof(proc->proc_endpoints));

    /* By default we assume all processes have the same architecture as the local
     * one, i.e. that we run in a homogeneous environment. Later, once the RTE can
     * tell us the architecture of the remote nodes, we set the convertors to the
     * correct architecture.
     */
    OBJ_RETAIN( ompi_mpi_local_convertor );
    proc->super.proc_convertor = ompi_mpi_local_convertor;
}


void ompi_proc_destruct(ompi_proc_t* proc)
{
    /* As all the convertors are created with OBJ_NEW we can simply call OBJ_RELEASE.
     * All of them, except the local convertor, will get destroyed at some point here.
     * If the reference counts are correct, the local convertor (whose reference count
     * was increased by the datatype engine) will not get destroyed here; it will be
     * destroyed later, when ompi_datatype_finalize is called.
     */
    OBJ_RELEASE( proc->super.proc_convertor );
    if (NULL != proc->super.proc_hostname) {
        free(proc->super.proc_hostname);
    }
    opal_mutex_lock (&ompi_proc_lock);
    opal_list_remove_item(&ompi_proc_list, (opal_list_item_t*)proc);
    opal_hash_table_remove_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name));
    opal_mutex_unlock (&ompi_proc_lock);
}
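
/*
 * Lifecycle sketch (illustrative comment only, not compiled): ompi_proc_t is
 * an OPAL object, so the constructor/destructor above run implicitly through
 * OBJ_NEW/OBJ_RELEASE. Within this file procs are created via
 * ompi_proc_allocate() (below), which OBJ_NEWs the proc and inserts it into
 * the global list and hash table; releasing the last reference runs
 * ompi_proc_destruct, which removes it again. Here "jobid"/"vpid" stand in
 * for a real process name:
 *
 *     ompi_proc_t *proc;
 *     ompi_proc_allocate (jobid, vpid, &proc);  // OBJ_NEW + list/hash insert
 *     ...
 *     OBJ_RELEASE(proc);                        // on the last reference the
 *                                               // destructor releases the
 *                                               // convertor and removes the
 *                                               // proc from list and hash
 */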

/**
 * Allocate a new ompi_proc_t for the given jobid/vpid
 *
 * @param[in]  jobid Job identifier
 * @param[in]  vpid  Process identifier
 * @param[out] procp New ompi_proc_t structure
 *
 * This function allocates a new ompi_proc_t and inserts it into
 * the process list and hash table.
 */
static int ompi_proc_allocate (ompi_jobid_t jobid, ompi_vpid_t vpid, ompi_proc_t **procp) {
    ompi_proc_t *proc = OBJ_NEW(ompi_proc_t);

    opal_list_append(&ompi_proc_list, (opal_list_item_t*)proc);

    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = jobid;
    OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid = vpid;

    opal_hash_table_set_value_ptr (&ompi_proc_hash, &proc->super.proc_name, sizeof (proc->super.proc_name),
                                   proc);

    /* by default we consider the process to be remote */
    proc->super.proc_flags = OPAL_PROC_NON_LOCAL;
    *procp = proc;

    return OMPI_SUCCESS;
}

/**
 * Finish setting up an ompi_proc_t
 *
 * @param[in] proc ompi process structure
 *
 * This function contains the core code of ompi_proc_complete_init() and
 * ompi_proc_refresh(). The tasks performed by this function include
 * retrieving the hostname (if below the modex cutoff), determining the
 * remote architecture, and calculating the locality of the process.
 */
int ompi_proc_complete_init_single (ompi_proc_t *proc)
{
    int ret;

    if ((OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid == OMPI_PROC_MY_NAME->jobid) &&
        (OMPI_CAST_RTE_NAME(&proc->super.proc_name)->vpid  == OMPI_PROC_MY_NAME->vpid)) {
        /* nothing else to do */
        return OMPI_SUCCESS;
    }

    /* we can retrieve the hostname at no cost because it
     * was provided at startup - but make it optional so
     * we don't chase after it if some system doesn't
     * provide it */
    proc->super.proc_hostname = NULL;
    OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_HOSTNAME, &proc->super.proc_name,
                                   (char**)&(proc->super.proc_hostname), OPAL_STRING);

#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* get the remote architecture - this might force a modex except
     * for those environments where the RM provides it */
    {
        uint32_t *ui32ptr;
        ui32ptr = &(proc->super.proc_arch);
        OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_ARCH, &proc->super.proc_name,
                              (void**)&ui32ptr, OPAL_UINT32);
        if (OPAL_SUCCESS == ret) {
            /* if arch is different than mine, create a new convertor for this proc */
            if (proc->super.proc_arch != opal_local_arch) {
                OBJ_RELEASE(proc->super.proc_convertor);
                proc->super.proc_convertor = opal_convertor_create(proc->super.proc_arch, 0);
            }
        } else if (OMPI_ERR_NOT_IMPLEMENTED == ret) {
            proc->super.proc_arch = opal_local_arch;
        } else {
            return ret;
        }
    }
#else
    /* must be same arch as my own */
    proc->super.proc_arch = opal_local_arch;
#endif

    return OMPI_SUCCESS;
}

opal_proc_t *ompi_proc_lookup (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* try to look up the value in the hash table */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);

    if (OPAL_SUCCESS == ret) {
        return &proc->super;
    }

    return NULL;
}

static ompi_proc_t *ompi_proc_for_name_nolock (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* double-check that another competing thread has not added this proc */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
    if (OPAL_SUCCESS == ret) {
        goto exit;
    }

    /* allocate a new ompi_proc_t object for the process and insert it into the process table */
    ret = ompi_proc_allocate (proc_name.jobid, proc_name.vpid, &proc);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        /* allocation failed */
        goto exit;
    }

    /* finish filling in the important proc data fields */
    ret = ompi_proc_complete_init_single (proc);
    if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
        goto exit;
    }
exit:
    return proc;
}

opal_proc_t *ompi_proc_for_name (const opal_process_name_t proc_name)
{
    ompi_proc_t *proc = NULL;
    int ret;

    /* try to look up the value in the hash table */
    ret = opal_hash_table_get_value_ptr (&ompi_proc_hash, &proc_name, sizeof (proc_name), (void **) &proc);
    if (OPAL_SUCCESS == ret) {
        return &proc->super;
    }

    opal_mutex_lock (&ompi_proc_lock);
    proc = ompi_proc_for_name_nolock (proc_name);
    opal_mutex_unlock (&ompi_proc_lock);

    return (opal_proc_t *) proc;
}
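
/*
 * Usage sketch (illustrative comment only, not compiled): a component that
 * holds an opal_process_name_t - here "peer", assumed to come from an
 * incoming message header - can resolve it to a proc as follows:
 *
 *     ompi_proc_t *proc = (ompi_proc_t *) ompi_proc_for_name (peer);
 *     if (NULL == proc) {
 *         // allocation failed
 *     }
 *
 * ompi_proc_for_name first tries the unlocked hash lookup and only takes
 * ompi_proc_lock to allocate and complete the proc on demand; use
 * ompi_proc_lookup instead when the proc must not be created as a side effect.
 */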

int ompi_proc_init(void)
{
    int opal_proc_hash_init_size = (ompi_process_info.num_procs < ompi_add_procs_cutoff) ?
        ompi_process_info.num_procs : 1024;
    ompi_proc_t *proc;
    int ret;

    OBJ_CONSTRUCT(&ompi_proc_list, opal_list_t);
    OBJ_CONSTRUCT(&ompi_proc_lock, opal_mutex_t);
    OBJ_CONSTRUCT(&ompi_proc_hash, opal_hash_table_t);

    ret = opal_hash_table_init (&ompi_proc_hash, opal_proc_hash_init_size);
    if (OPAL_SUCCESS != ret) {
        return ret;
    }

    /* create a proc for the local process */
    ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, OMPI_PROC_MY_NAME->vpid, &proc);
    if (OMPI_SUCCESS != ret) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* set local process data */
    ompi_proc_local_proc = proc;
    proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
    proc->super.proc_hostname = strdup(ompi_process_info.nodename);
    proc->super.proc_arch = opal_local_arch;
    /* Register the local proc with OPAL */
    opal_proc_local_set(&proc->super);
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
    /* add our arch to the modex */
    OPAL_MODEX_SEND_VALUE(ret, OPAL_PMIX_GLOBAL,
                          OPAL_PMIX_ARCH, &opal_local_arch, OPAL_UINT32);
    if (OPAL_SUCCESS != ret) {
        return ret;
    }
#endif

    return OMPI_SUCCESS;
}

static int ompi_proc_compare_vid (opal_list_item_t **a, opal_list_item_t **b)
{
    ompi_proc_t *proca = (ompi_proc_t *) *a;
    ompi_proc_t *procb = (ompi_proc_t *) *b;

    if (proca->super.proc_name.vpid > procb->super.proc_name.vpid) {
        return 1;
    } else {
        return -1;
    }

    /* they should never be equal */
}

/**
 * The process creation is split into two steps. The second step
 * is the important one: it sets the properties of the remote
 * process, such as architecture, node name and locality flags.
 *
 * This function is to be called __only__ after the modex exchange
 * has been performed, in order to allow the modex to carry the data
 * instead of requiring the runtime to provide it.
 */
int ompi_proc_complete_init(void)
{
    opal_process_name_t wildcard_rank;
    ompi_proc_t *proc;
    int ret, errcode = OMPI_SUCCESS;
    char *val;

    opal_mutex_lock (&ompi_proc_lock);

    /* Add all local peers first */
    wildcard_rank.jobid = OMPI_PROC_MY_NAME->jobid;
    wildcard_rank.vpid = OMPI_NAME_WILDCARD->vpid;
    /* retrieve the local peers */
    OPAL_MODEX_RECV_VALUE(ret, OPAL_PMIX_LOCAL_PEERS,
                          &wildcard_rank, &val, OPAL_STRING);
    if (OPAL_SUCCESS == ret && NULL != val) {
        char **peers = opal_argv_split(val, ',');
        int i;
        free(val);
        for (i=0; NULL != peers[i]; i++) {
            ompi_vpid_t local_rank = strtoul(peers[i], NULL, 10);
            uint16_t u16, *u16ptr = &u16;
            if (OMPI_PROC_MY_NAME->vpid == local_rank) {
                continue;
            }
            ret = ompi_proc_allocate (OMPI_PROC_MY_NAME->jobid, local_rank, &proc);
            if (OMPI_SUCCESS != ret) {
                /* do not return with ompi_proc_lock still held */
                opal_argv_free(peers);
                opal_mutex_unlock (&ompi_proc_lock);
                return ret;
            }
            /* get the locality information - all RTEs are required
             * to provide this information at startup */
            OPAL_MODEX_RECV_VALUE_OPTIONAL(ret, OPAL_PMIX_LOCALITY, &proc->super.proc_name, &u16ptr, OPAL_UINT16);
            if (OPAL_SUCCESS == ret) {
                proc->super.proc_flags = u16;
            }
        }
        opal_argv_free(peers);
    }

    /* Complete initialization of node-local procs */
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        ret = ompi_proc_complete_init_single (proc);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            errcode = ret;
            break;
        }
    }

    /* if the cutoff is larger than the # of procs - add all processes.
     * NOTE that local procs will be automatically skipped as they
     * are already in the hash table
     */
    if (ompi_process_info.num_procs < ompi_add_procs_cutoff) {
        /* since ompi_proc_for_name locks internally,
         * we need to release the lock here
         */
        opal_mutex_unlock (&ompi_proc_lock);

        for (ompi_vpid_t i = 0 ; i < ompi_process_info.num_procs ; ++i ) {
            opal_process_name_t proc_name;
            proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
            proc_name.vpid = i;
            (void) ompi_proc_for_name (proc_name);
        }

        /* re-acquire the lock for the next step - the sort */
        opal_mutex_lock (&ompi_proc_lock);
    }

    opal_list_sort (&ompi_proc_list, ompi_proc_compare_vid);

    opal_mutex_unlock (&ompi_proc_lock);

    return errcode;
}
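
/*
 * Init-time sequence sketch (illustrative comment only; the exact call sites
 * live in ompi_mpi_init and are an assumption here): the two-step creation
 * described above maps onto MPI initialization roughly as
 *
 *     ompi_proc_init ();             // before the modex: local proc only,
 *                                    // plus the list/hash/lock bookkeeping
 *     ...                            // modex / PMIx exchange carries arch,
 *                                    // hostname and locality
 *     ompi_proc_complete_init ();    // after the modex: fill in local peers
 *                                    // and, below the cutoff, all procs
 */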

int ompi_proc_finalize (void)
{
    ompi_proc_t *proc;

    /* Unregister the local proc from OPAL */
    opal_proc_local_set(NULL);

    /* remove all items from the list and destroy them. Since we cannot know
     * the reference count of the procs for certain, it is possible that
     * a single OBJ_RELEASE won't drive the count to zero, and hence will
     * not release the memory. Accordingly, we cycle through the list here,
     * calling release on each item.
     *
     * This will cycle until it forces the reference count of each item
     * to zero, thus causing the destructor to run - which will remove
     * the item from the list!
     *
     * We cannot do this under the thread lock as the destructor will
     * take it when removing the item from the list. However, this function
     * is ONLY called from MPI_Finalize, and all threads are prohibited from
     * calling an MPI function once ANY thread has called MPI_Finalize. Of
     * course, multiple threads are allowed to call MPI_Finalize, so this
     * function may get called multiple times by various threads. We believe
     * it is thread safe to do so...though it may not -appear- to be so
     * without walking through the entire list/destructor sequence.
     */
    while ((ompi_proc_t *)opal_list_get_end(&ompi_proc_list) != (proc = (ompi_proc_t *)opal_list_get_first(&ompi_proc_list))) {
        OBJ_RELEASE(proc);
    }
    /* now destruct the list and thread lock */
    OBJ_DESTRUCT(&ompi_proc_list);
    OBJ_DESTRUCT(&ompi_proc_lock);
    OBJ_DESTRUCT(&ompi_proc_hash);

    return OMPI_SUCCESS;
}

int ompi_proc_world_size (void)
{
    return ompi_process_info.num_procs;
}

ompi_proc_t **ompi_proc_get_allocated (size_t *size)
{
    ompi_proc_t **procs;
    ompi_proc_t *proc;
    size_t count = 0;
    ompi_rte_cmp_bitmask_t mask;
    ompi_process_name_t my_name;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }
    mask = OMPI_RTE_CMP_JOBID;
    my_name = *OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name);

    /* First count how many match this jobid */
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, OMPI_CAST_RTE_NAME(&proc->super.proc_name), &my_name)) {
            ++count;
        }
    }

    /* allocate an array */
    procs = (ompi_proc_t**) malloc(count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        opal_mutex_unlock (&ompi_proc_lock);
        return NULL;
    }

    /* now save only the procs that match this jobid */
    count = 0;
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, &my_name)) {
            /* DO NOT RETAIN THIS OBJECT - the reference count on this
             * object will be adjusted by external callers. The intent
             * here is to allow the reference count to drop to zero if
             * the app no longer desires to communicate with this proc.
             * For example, the proc may call comm_disconnect on all
             * communicators involving this proc. In such cases, we want
             * the proc object to be removed from the list. By not incrementing
             * the reference count here, we allow this to occur.
             *
             * We don't implement that yet, but we are still safe for now as
             * the OBJ_NEW in ompi_proc_init owns the initial reference
             * count which cannot be released until ompi_proc_finalize is
             * called.
             */
            procs[count++] = proc;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);

    *size = count;
    return procs;
}

ompi_proc_t **ompi_proc_world (size_t *size)
{
    ompi_proc_t **procs;
    size_t count = 0;

    /* check bozo case */
    if (NULL == ompi_proc_local_proc) {
        return NULL;
    }

    /* First count how many match this jobid (we already know this from our process info) */
    count = ompi_process_info.num_procs;

    /* allocate an array */
    procs = (ompi_proc_t **) malloc (count * sizeof(ompi_proc_t*));
    if (NULL == procs) {
        return NULL;
    }

    /* now get/allocate all the procs in this jobid */
    for (size_t i = 0 ; i < count ; ++i) {
        opal_process_name_t name = {.jobid = OMPI_CAST_RTE_NAME(&ompi_proc_local_proc->super.proc_name)->jobid,
                                    .vpid = i};

        /* DO NOT RETAIN THIS OBJECT - the reference count on this
         * object will be adjusted by external callers. The intent
         * here is to allow the reference count to drop to zero if
         * the app no longer desires to communicate with this proc.
         * For example, the proc may call comm_disconnect on all
         * communicators involving this proc. In such cases, we want
         * the proc object to be removed from the list. By not incrementing
         * the reference count here, we allow this to occur.
         *
         * We don't implement that yet, but we are still safe for now as
         * the OBJ_NEW in ompi_proc_init owns the initial reference
         * count which cannot be released until ompi_proc_finalize is
         * called.
         */
        procs[i] = (ompi_proc_t*)ompi_proc_for_name (name);
    }

    *size = count;

    return procs;
}


ompi_proc_t** ompi_proc_all(size_t* size)
{
    ompi_proc_t **procs =
        (ompi_proc_t**) malloc(opal_list_get_size(&ompi_proc_list) * sizeof(ompi_proc_t*));
    ompi_proc_t *proc;
    size_t count = 0;

    if (NULL == procs) {
        return NULL;
    }

    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        /* We know this isn't consistent with the behavior in ompi_proc_world,
         * but we are leaving the RETAIN for now because the code using this function
         * assumes that the results need to be released when done. It will
         * be cleaned up later as the "fix" will impact other places in
         * the code
         */
        OBJ_RETAIN(proc);
        procs[count++] = proc;
    }
    opal_mutex_unlock (&ompi_proc_lock);
    *size = count;
    return procs;
}


ompi_proc_t** ompi_proc_self(size_t* size)
{
    ompi_proc_t **procs = (ompi_proc_t**) malloc(sizeof(ompi_proc_t*));
    if (NULL == procs) {
        return NULL;
    }
    /* We know this isn't consistent with the behavior in ompi_proc_world,
     * but we are leaving the RETAIN for now because the code using this function
     * assumes that the results need to be released when done. It will
     * be cleaned up later as the "fix" will impact other places in
     * the code
     */
    OBJ_RETAIN(ompi_proc_local_proc);
    *procs = ompi_proc_local_proc;
    *size = 1;
    return procs;
}
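
/*
 * Caller-side sketch of the refcount contract above (illustrative comment
 * only, not compiled): the arrays returned by ompi_proc_get_allocated() and
 * ompi_proc_world() are owned by the caller, but the proc objects in them
 * were NOT retained on the caller's behalf; by contrast, ompi_proc_all() and
 * ompi_proc_self() retain each proc they return.
 *
 *     size_t nprocs;
 *     ompi_proc_t **procs = ompi_proc_world (&nprocs);
 *     ...
 *     free (procs);                      // free the array only
 *
 *     procs = ompi_proc_all (&nprocs);
 *     ...
 *     for (size_t i = 0 ; i < nprocs ; ++i) {
 *         OBJ_RELEASE(procs[i]);         // drop the reference taken above
 *     }
 *     free (procs);
 */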

ompi_proc_t * ompi_proc_find ( const ompi_process_name_t * name )
{
    ompi_proc_t *proc, *rproc=NULL;
    ompi_rte_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
            rproc = proc;
            break;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);

    return rproc;
}

int ompi_proc_refresh(void)
{
    ompi_proc_t *proc = NULL;
    ompi_vpid_t i = 0;
    int ret=OMPI_SUCCESS;

    opal_mutex_lock (&ompi_proc_lock);

    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        /* Does not change: proc->super.proc_name.vpid */
        OMPI_CAST_RTE_NAME(&proc->super.proc_name)->jobid = OMPI_PROC_MY_NAME->jobid;

        /* Make sure to clear the local flag before we set it below */
        proc->super.proc_flags = 0;

        if (i == OMPI_PROC_MY_NAME->vpid) {
            ompi_proc_local_proc = proc;
            proc->super.proc_flags = OPAL_PROC_ALL_LOCAL;
            /* strdup so the destructor can safely free proc_hostname */
            proc->super.proc_hostname = strdup(ompi_process_info.nodename);
            proc->super.proc_arch = opal_local_arch;
            opal_proc_local_set(&proc->super);
        } else {
            ret = ompi_proc_complete_init_single (proc);
            if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
                break;
            }
        }
    }

    opal_mutex_unlock (&ompi_proc_lock);

    return ret;
}

int
ompi_proc_pack(ompi_proc_t **proclist, int proclistsize,
               opal_buffer_t* buf)
{
    int rc;
    char *nspace;

    opal_mutex_lock (&ompi_proc_lock);

    /* cycle through the provided array, packing the OMPI level
     * data for each proc. This data may or may not be included
     * in any subsequent modex operation, so we include it here
     * to ensure completion of a connect/accept handshake. See
     * the ompi/mca/dpm framework for an example of where and how
     * this info is used.
     *
     * Eventually, we will review the procedures that call this
     * function to see if duplication of communication can be
     * reduced. For now, just go ahead and pack the info so it
     * can be sent.
     */
    for (int i = 0 ; i < proclistsize ; ++i) {
        ompi_proc_t *proc = proclist[i];

        if (ompi_proc_is_sentinel (proc)) {
            proc = ompi_proc_for_name_nolock (ompi_proc_sentinel_to_name ((uintptr_t) proc));
        }

        /* send proc name */
        rc = opal_dss.pack(buf, &(proc->super.proc_name), 1, OMPI_NAME);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* retrieve and send the corresponding nspace for this job
         * as the remote side may not know the translation */
        nspace = (char*)opal_pmix.get_nspace(proc->super.proc_name.jobid);
        rc = opal_dss.pack(buf, &nspace, 1, OPAL_STRING);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* pack architecture flag */
        rc = opal_dss.pack(buf, &(proc->super.proc_arch), 1, OPAL_UINT32);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
        /* pass the name of the host this proc is on */
        rc = opal_dss.pack(buf, &(proc->super.proc_hostname), 1, OPAL_STRING);
        if(rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            opal_mutex_unlock (&ompi_proc_lock);
            return rc;
        }
    }
    opal_mutex_unlock (&ompi_proc_lock);
    return OMPI_SUCCESS;
}
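
/*
 * Connect/accept sketch (illustrative comment only, not compiled; the buffer
 * exchange step is an assumption - in practice it is handled by the
 * ompi/mca/dpm framework): ompi_proc_pack and ompi_proc_unpack (below) form
 * a round trip over an opal_buffer_t.
 *
 *     opal_buffer_t *buf = OBJ_NEW(opal_buffer_t);
 *     ompi_proc_pack (procs, nprocs, buf);       // local side: name, nspace,
 *                                                // arch, hostname per proc
 *     ... exchange the buffer with the remote side ...
 *     ompi_proc_t **rprocs, **newprocs;
 *     int nnew;
 *     ompi_proc_unpack (buf, nprocs, &rprocs, &nnew, &newprocs);
 *     free (newprocs);                           // rprocs is kept: it becomes
 *                                                // part of the remote group
 */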

ompi_proc_t *
ompi_proc_find_and_add(const ompi_process_name_t * name, bool* isnew)
{
    ompi_proc_t *proc, *rproc = NULL;
    ompi_rte_cmp_bitmask_t mask;

    /* return the proc-struct which matches this jobid+process id */
    mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
    opal_mutex_lock (&ompi_proc_lock);
    OPAL_LIST_FOREACH(proc, &ompi_proc_list, ompi_proc_t) {
        if (OPAL_EQUAL == ompi_rte_compare_name_fields(mask, &proc->super.proc_name, name)) {
            rproc = proc;
            *isnew = false;
            break;
        }
    }

    /* if we didn't find this proc in the list, create a new
     * proc_t and append it to the list
     */
    if (NULL == rproc) {
        *isnew = true;
        ompi_proc_allocate (name->jobid, name->vpid, &rproc);
    }

    opal_mutex_unlock (&ompi_proc_lock);

    return rproc;
}


int
ompi_proc_unpack(opal_buffer_t* buf,
                 int proclistsize, ompi_proc_t ***proclist,
                 int *newproclistsize, ompi_proc_t ***newproclist)
{
    size_t newprocs_len = 0;
    ompi_proc_t **plist=NULL, **newprocs = NULL;

    /* do not free plist *ever*, since it is used in the remote group
       structure of a communicator */
    plist = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if ( NULL == plist ) {
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    /* free this on the way out */
    newprocs = (ompi_proc_t **) calloc (proclistsize, sizeof (ompi_proc_t *));
    if (NULL == newprocs) {
        free(plist);
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    /* cycle through the array of provided procs and unpack
     * their info - as packed by ompi_proc_pack
     */
    for (int i = 0; i < proclistsize ; ++i){
        int32_t count=1;
        ompi_process_name_t new_name;
        uint32_t new_arch;
        char *new_hostname;
        bool isnew = false;
        int rc;
        char *nspace;

        rc = opal_dss.unpack(buf, &new_name, &count, OMPI_NAME);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &nspace, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        opal_pmix.register_jobid(new_name.jobid, nspace);
        free(nspace);
        rc = opal_dss.unpack(buf, &new_arch, &count, OPAL_UINT32);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        rc = opal_dss.unpack(buf, &new_hostname, &count, OPAL_STRING);
        if (rc != OPAL_SUCCESS) {
            OMPI_ERROR_LOG(rc);
            free(plist);
            free(newprocs);
            return rc;
        }
        /* see if this proc is already on our ompi_proc_list */
        plist[i] = ompi_proc_find_and_add(&new_name, &isnew);
        if (isnew) {
            /* if not, then it was added, so update the values
             * in the proc_t struct with the info that was passed
             * to us
             */
            newprocs[newprocs_len++] = plist[i];

            /* update all the values */
            plist[i]->super.proc_arch = new_arch;
            /* if arch is different than mine, create a new convertor for this proc */
            if (plist[i]->super.proc_arch != opal_local_arch) {
#if OPAL_ENABLE_HETEROGENEOUS_SUPPORT
                OBJ_RELEASE(plist[i]->super.proc_convertor);
                plist[i]->super.proc_convertor = opal_convertor_create(plist[i]->super.proc_arch, 0);
#else
                opal_show_help("help-mpi-runtime.txt",
                               "heterogeneous-support-unavailable",
                               true, ompi_process_info.nodename,
                               new_hostname == NULL ? "<hostname unavailable>" :
                               new_hostname);
                free(plist);
                free(newprocs);
                return OMPI_ERR_NOT_SUPPORTED;
#endif
            }

            if (NULL != new_hostname) {
                if (0 == strcmp(ompi_proc_local_proc->super.proc_hostname, new_hostname)) {
                    plist[i]->super.proc_flags |= (OPAL_PROC_ON_NODE | OPAL_PROC_ON_CU | OPAL_PROC_ON_CLUSTER);
                }

                /* Save the hostname */
                plist[i]->super.proc_hostname = new_hostname;
            }
        } else if (NULL != new_hostname) {
            free(new_hostname);
        }
    }

    if (NULL != newproclistsize) *newproclistsize = newprocs_len;
    if (NULL != newproclist) {
        *newproclist = newprocs;
    } else if (newprocs != NULL) {
        free(newprocs);
    }

    *proclist = plist;
    return OMPI_SUCCESS;
}