1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2007      The Trustees of Indiana University.
4  *                         All rights reserved.
5  * Copyright (c) 2011-2016 Cisco Systems, Inc.  All rights reserved.
6  * Copyright (c) 2011-2017 Los Alamos National Security, LLC. All
7  *                         rights reserved.
8  * Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
9  * Copyright (c) 2014-2016 Research Organization for Information Science
10  *                         and Technology (RIST). All rights reserved.
11  * $COPYRIGHT$
12  *
13  * Additional copyrights may follow
14  *
15  * $HEADER$
16  */
17 
18 #include "opal_config.h"
19 #include "opal/constants.h"
20 #include "opal/types.h"
21 #include "opal_stdint.h"
22 #include "opal/mca/hwloc/base/base.h"
23 #include "opal/util/argv.h"
24 #include "opal/util/opal_environ.h"
25 #include "opal/util/output.h"
26 #include "opal/util/proc.h"
27 #include "opal/util/output.h"
28 #include "opal/util/show_help.h"
29 #include "opal/util/opal_getcwd.h"
30 #include "opal/constants.h"
31 #include "opal/mca/pmix/base/base.h"
32 #include "opal/mca/pmix/base/pmix_base_hash.h"
33 #include "pmix_cray.h"
34 
35 static char cray_pmi_version[128];
36 
37 static int cray_init(opal_list_t *ilist);
38 static int cray_fini(void);
39 static int cray_initialized(void);
40 static int cray_abort(int flat, const char *msg,
41                       opal_list_t *procs);
42 static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
43 static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
44                          opal_pmix_spawn_cbfunc_t cbfunc,
45                          void *cbdata);
46 static int cray_job_connect(opal_list_t *procs);
47 static int cray_job_disconnect(opal_list_t *procs);
48 static int cray_job_disconnect_nb(opal_list_t *procs,
49                                   opal_pmix_op_cbfunc_t cbfunc,
50                                   void *cbdata);
51 static int cray_resolve_peers(const char *nodename,
52                               opal_jobid_t jobid,
53                               opal_list_t *procs);
54 static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist);
55 static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv);
56 static int cray_fence(opal_list_t *procs, int collect_data);
57 static int cray_fencenb(opal_list_t *procs, int collect_data,
58                         opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
59 static int cray_commit(void);
60 static int cray_get(const opal_process_name_t *id,
61                     const char *key, opal_list_t *info,
62                     opal_value_t **kv);
63 static int cray_get_nb(const opal_process_name_t *id, const char *key,
64                        opal_list_t *info,
65                        opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
66 static int cray_publish(opal_list_t *info);
67 static int cray_publish_nb(opal_list_t *info,
68                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
69 static int cray_lookup(opal_list_t *data, opal_list_t *info);
70 static int cray_lookup_nb(char **keys, opal_list_t *info,
71                           opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
72 static int cray_unpublish(char **keys, opal_list_t *info);
73 static int cray_unpublish_nb(char **keys, opal_list_t *info,
74                             opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
75 static const char *cray_get_version(void);
76 static int cray_store_local(const opal_process_name_t *proc,
77                           opal_value_t *val);
78 static const char *cray_get_nspace(opal_jobid_t jobid);
79 static void cray_register_jobid(opal_jobid_t jobid, const char *nspace);
80 
81 #if 0
82 static bool cray_get_attr(const char *attr, opal_value_t **kv);
83 #endif
84 
85 const opal_pmix_base_module_t opal_pmix_cray_module = {
86     .init = cray_init,
87     .finalize = cray_fini,
88     .initialized = cray_initialized,
89     .abort = cray_abort,
90     .commit = cray_commit,
91     .fence = cray_fence,
92     .fence_nb = cray_fencenb,
93     .put = cray_put,
94     .get = cray_get,
95     .get_nb = cray_get_nb,
96     .publish = cray_publish,
97     .publish_nb = cray_publish_nb,
98     .lookup = cray_lookup,
99     .lookup_nb = cray_lookup_nb,
100     .unpublish = cray_unpublish,
101     .unpublish_nb = cray_unpublish_nb,
102     .spawn = cray_spawn,
103     .spawn_nb = cray_spawn_nb,
104     .connect = cray_job_connect,
105     .disconnect = cray_job_disconnect,
106     .disconnect_nb = cray_job_disconnect_nb,
107     .resolve_peers = cray_resolve_peers,
108     .resolve_nodes = cray_resolve_nodes,
109     .get_version = cray_get_version,
110     .register_evhandler = opal_pmix_base_register_handler,
111     .deregister_evhandler = opal_pmix_base_deregister_handler,
112     .store_local = cray_store_local,
113     .get_nspace = cray_get_nspace,
114     .register_jobid = cray_register_jobid
115 };
116 
117 // usage accounting
118 static int pmix_init_count = 0;
119 
120 // local object
121 typedef struct {
122     opal_object_t super;
123     opal_event_t ev;
124     opal_pmix_op_cbfunc_t opcbfunc;
125     void *cbdata;
126 } pmi_opcaddy_t;
127 static OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
128                           opal_object_t,
129                           NULL, NULL);
130 
/*
 * Pairs a flag word with a status code.
 * NOTE(review): presumably `flag` is polled by a waiter while a fence
 * completes and `status` carries the outcome -- confirm against the users
 * of this struct.
 */
struct fence_result {
    volatile int flag;  /* volatile: every access goes to memory */
    int status;         /* status code */
};
135 
136 // PMI constant values:
137 static int pmix_kvslen_max = 0;
138 static int pmix_keylen_max = 0;
139 static int pmix_vallen_max = 0;
140 static int pmix_vallen_threshold = INT_MAX;
141 
142 // Job environment description
143 static int pmix_size = 0;
144 static int pmix_rank = 0;
145 static int pmix_lrank = 0;
146 static int pmix_nrank = 0;
147 static int pmix_nlranks = 0;
148 static int pmix_appnum = 0;
149 static int pmix_usize = 0;
150 static char *pmix_kvs_name = NULL;
151 static int *pmix_lranks = NULL;
152 static opal_process_name_t pmix_pname;
153 static uint32_t pmix_jobid = -1;
154 
/* Maps a PMI error code to printable text; defined elsewhere in this file. */
static char* pmix_error(int pmix_err);

/*
 * Report a failed PMI call through opal_output stream 0.
 *
 *   pmi_err   error code, rendered with pmix_error()
 *   pmi_func  string naming the call that failed
 *
 * The expansion deliberately has NO trailing semicolon: the caller's own ';'
 * completes the statement, so the macro behaves as a single statement even
 * in an unbraced if/else.
 */
#define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
    do {                                                        \
        opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
                    pmi_func, __FILE__, __LINE__, __func__,     \
                    pmix_error(pmi_err));                       \
    } while (0)
162 
/*
 * Poll until the expression `a` evaluates to false, sleeping 10 microseconds
 * between evaluations.  `a` is re-evaluated on every pass, so it must be an
 * expression whose value can change while we wait.
 */
#define CRAY_WAIT_FOR_COMPLETION(a)               \
    do {                                          \
        for (;;) {                                \
            if (!(a)) {                           \
                break;                            \
            }                                     \
            usleep(10);                           \
        }                                         \
    } while (0)
169 
cray_get_more_info(void)170 static void cray_get_more_info(void)
171 {
172     int alps_status = 0, i;
173     uint64_t apid;
174     size_t alps_count;
175     int lli_ret = 0, place_ret;
176     alpsAppLayout_t layout;
177     char *npstring;
178     char *firstrankstring;
179     char **nps, **firstranks;
180     int *base_pe_in_app;
181     int *pes_in_app;
182     char pbuf[OPAL_PATH_MAX];
183 
184     /*
185      * First get our apid
186      */
187 
188     lli_ret = alps_app_lli_lock();
189     if (0 != lli_ret) {
190         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
191                              "%s pmix:cray: alps_app_lli_lock returned %d",
192                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
193         goto fn_exit;
194     }
195 
196     lli_ret = alps_app_lli_put_request(ALPS_APP_LLI_ALPS_REQ_APID, NULL, 0);
197     if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
198         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
199                              "%s pmix:cray: alps_app_lli_put_request - APID returned %d",
200                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
201         goto fn_exit_w_lock;
202     }
203 
204     lli_ret = alps_app_lli_get_response (&alps_status, &alps_count);
205     if (ALPS_APP_LLI_ALPS_STAT_OK != alps_status) {
206         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
207                              "%s pmix:cray: alps_app_lli_get_response returned %d",
208                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), alps_status));
209         goto fn_exit_w_lock;
210     }
211 
212     lli_ret = alps_app_lli_get_response_bytes (&apid, sizeof(apid));
213     if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
214         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
215                              "%s pmix:cray: alps_app_lli_get_response_bytes returned %d",
216                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
217         goto fn_exit_w_lock;
218     }
219 
220     /*
221      * get some items from alps placement file
222      */
223 
224     place_ret = alps_get_placement_info(apid,
225                                         &layout,
226                                         NULL,
227                                         NULL,
228                                         NULL,
229                                         NULL,
230                                         NULL,
231                                         &base_pe_in_app,
232                                         &pes_in_app,
233                                         NULL,
234                                         NULL);
235     if (1 != place_ret) {
236         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
237                              "%s pmix:cray: alps_get_placement_info returned %d (%s)",
238                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, strerror(errno)));
239         goto fn_exit;
240     }
241 
242     OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
243                            "%s pmix:cray: alps_get_placement_info returned %d first pe on node is %d",
244                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, layout.firstPe));
245 
246     nps = NULL;
247     firstranks = NULL;
248     for (i=0; i < layout.numCmds; i++) {
249         snprintf(pbuf, sizeof(pbuf), "%d", pes_in_app[i]);
250         opal_argv_append_nosize(&nps, pbuf);
251         snprintf(pbuf, sizeof(pbuf), "%d", base_pe_in_app[i]);
252         opal_argv_append_nosize(&firstranks, pbuf);
253     }
254 
255     npstring = opal_argv_join(nps, ' ');
256     firstrankstring = opal_argv_join(firstranks, ' ');
257     opal_argv_free(nps);
258     opal_argv_free(firstranks);
259 
260     /*
261      * stuff values into environment variables
262      */
263 
264     /* add these envars to prep MPI-2 info pre-defined key/values */
265     snprintf(pbuf, sizeof(pbuf), "%d", layout.numCmds);
266     opal_setenv("OMPI_NUM_APP_CTX", pbuf, true, &environ);
267     opal_setenv("OMPI_FIRST_RANKS", firstrankstring, true, &environ);
268     opal_setenv("OMPI_APP_CTX_NUM_PROCS", npstring, true, &environ);
269     free(firstrankstring);
270     free(npstring);
271     free(base_pe_in_app);
272     free(pes_in_app);
273 
274     /*
275      * ALPS always starts the application in the directory
276      * where the aprun command was run to do the launch.
277      * For SLURM, we have to check the SLURM_WORKING_DIR env.
278      * variable.  If it is set, we can't set wdir since
279      * we can't assume PWD is where we started.
280      */
281     if(getenv("SLURM_WORKING_DIR") == NULL) {
282         opal_getcwd(pbuf, OPAL_PATH_MAX);
283         opal_setenv("OMPI_MCA_initial_wdir", pbuf, true, &environ);
284     }
285 
286    fn_exit_w_lock:
287     lli_ret = alps_app_lli_unlock();
288     if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
289         OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
290                              "%s pmix:cray: alps_app_lli_unlock returned %d",
291                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
292     }
293 
294    fn_exit:
295     return;
296 }
297 
cray_init(opal_list_t * ilist)298 static int cray_init(opal_list_t *ilist)
299 {
300     int i, spawned, size, rank, appnum, my_node;
301     int rc, ret = OPAL_ERROR;
302     char *pmapping = NULL;
303     char buf[PMI2_MAX_ATTRVALUE];
304     int found;
305     int major, minor, revision;
306     uint32_t jobfam;
307     opal_value_t kv;
308     opal_process_name_t ldr;
309     char nmtmp[64];
310     char *str, **localranks = NULL;
311     opal_process_name_t name;
312 
313     ++pmix_init_count;
314 
315     /* if we can't startup PMI, we can't be used */
316     if ( PMI2_Initialized () ) {
317         opal_output_verbose(10, opal_pmix_base_framework.framework_output,
318                         "%s pmix:cray: pmi already initialized",
319                         OPAL_NAME_PRINT(pmix_pname));
320         return OPAL_SUCCESS;
321     }
322     size = -1;
323     rank = -1;
324     appnum = -1;
325     if (PMI_SUCCESS != (rc = PMI2_Init(&spawned, &size, &rank, &appnum))) {
326         opal_show_help("help-pmix-base.txt", "pmix2-init-failed", true, rc);
327         return OPAL_ERROR;
328     }
329     if( size < 0 || rank < 0 ){
330         opal_show_help("help-pmix-base.txt", "pmix2-init-returned-bad-values", true);
331         goto err_exit;
332     }
333 
334     pmix_size = size;
335     pmix_rank = rank;
336     pmix_appnum = appnum;
337 
338     pmix_vallen_max = PMI2_MAX_VALLEN;
339     pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
340     pmix_keylen_max = PMI2_MAX_KEYLEN;
341     pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
342     pmix_vallen_threshold >>= 2;
343 
344     /*
345      * get the version info
346      */
347 
348     if (PMI_SUCCESS != PMI_Get_version_info(&major,&minor,&revision)) {
349         return OPAL_ERROR;
350     }
351 
352     snprintf(cray_pmi_version, sizeof(cray_pmi_version),
353              "%d.%d.%d", major, minor, revision);
354 
355     pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
356     if( pmix_kvs_name == NULL ){
357          PMI2_Finalize();
358          ret = OPAL_ERR_OUT_OF_RESOURCE;
359          goto err_exit;
360     }
361 
362     rc = PMI2_Job_GetId(pmix_kvs_name, pmix_kvslen_max);
363     if( PMI_SUCCESS != rc ) {
364         OPAL_PMI_ERROR(rc, "PMI2_Job_GetId");
365         goto err_exit;
366     }
367 
368     rc = sscanf(pmix_kvs_name,"kvs_%u",&jobfam);
369     if (rc != 1) {
370         opal_output_verbose(10, opal_pmix_base_framework.framework_output,
371                            "%s pmix:cray: pmix_kvs_name %s",
372                             OPAL_NAME_PRINT(pmix_pname), pmix_kvs_name);
373         rc = OPAL_ERROR;
374         goto err_exit;
375     }
376 
377     pmix_jobid = jobfam << 16;
378 
379     /* store our name in the opal_proc_t so that
380      * debug messages will make sense - an upper
381      * layer will eventually overwrite it, but that
382      * won't do any harm */
383     pmix_pname.jobid = pmix_jobid;
384     pmix_pname.vpid = pmix_rank;
385     opal_proc_set_name(&pmix_pname);
386     opal_output_verbose(10, opal_pmix_base_framework.framework_output,
387                         "%s pmix:cray: assigned tmp name %d %d pmix_kvs_name %s",
388                         OPAL_NAME_PRINT(pmix_pname),pmix_pname.jobid,pmix_pname.vpid,pmix_kvs_name);
389 
390     pmapping = (char*)malloc(PMI2_MAX_VALLEN);
391     if( pmapping == NULL ){
392         rc = OPAL_ERR_OUT_OF_RESOURCE;
393         OPAL_ERROR_LOG(rc);
394         return rc;
395     }
396 
397     rc = PMI2_Info_GetJobAttr("PMI_process_mapping", pmapping, PMI2_MAX_VALLEN, &found);
398     if( !found || PMI_SUCCESS != rc ) {
399         OPAL_PMI_ERROR(rc,"PMI2_Info_GetJobAttr");
400         return OPAL_ERROR;
401     }
402 
403     pmix_lranks = pmix_cray_parse_pmap(pmapping, pmix_rank, &my_node, &pmix_nlranks);
404     if (NULL == pmix_lranks) {
405         rc = OPAL_ERR_OUT_OF_RESOURCE;
406         OPAL_ERROR_LOG(rc);
407         return rc;
408     }
409 
410     free(pmapping);
411 
412     // setup hash table
413     opal_pmix_base_hash_init();
414 
415     /* setup a name for retrieving data associated with the job */
416     name.jobid = pmix_jobid;
417     name.vpid = OPAL_VPID_WILDCARD;
418 
419     /* save the job size */
420     OBJ_CONSTRUCT(&kv, opal_value_t);
421     kv.key = strdup(OPAL_PMIX_JOB_SIZE);
422     kv.type = OPAL_UINT32;
423     kv.data.uint32 = pmix_size;
424     if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
425         OPAL_ERROR_LOG(rc);
426         OBJ_DESTRUCT(&kv);
427         goto err_exit;
428     }
429     OBJ_DESTRUCT(&kv);
430 
431     /* save the appnum */
432     OBJ_CONSTRUCT(&kv, opal_value_t);
433     kv.key = strdup(OPAL_PMIX_APPNUM);
434     kv.type = OPAL_UINT32;
435     kv.data.uint32 = pmix_appnum;
436     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
437         OPAL_ERROR_LOG(ret);
438         OBJ_DESTRUCT(&kv);
439         goto err_exit;
440     }
441     OBJ_DESTRUCT(&kv);
442 
443     rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
444     if( PMI_SUCCESS != rc ) {
445         OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
446         goto err_exit;
447     }
448 
449     pmix_usize = atoi(buf);
450 
451     OBJ_CONSTRUCT(&kv, opal_value_t);
452     kv.key = strdup(OPAL_PMIX_UNIV_SIZE);
453     kv.type = OPAL_UINT32;
454     kv.data.uint32 = pmix_usize;
455     if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
456         OPAL_ERROR_LOG(rc);
457         OBJ_DESTRUCT(&kv);
458         goto err_exit;
459     }
460     OBJ_DESTRUCT(&kv);
461 
462     /* push this into the dstore for subsequent fetches */
463     OBJ_CONSTRUCT(&kv, opal_value_t);
464     kv.key = strdup(OPAL_PMIX_MAX_PROCS);
465     kv.type = OPAL_UINT32;
466     kv.data.uint32 = pmix_usize;
467     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
468         OPAL_ERROR_LOG(ret);
469         OBJ_DESTRUCT(&kv);
470         goto err_exit;
471     }
472     OBJ_DESTRUCT(&kv);
473 
474     OBJ_CONSTRUCT(&kv, opal_value_t);
475     kv.key = strdup(OPAL_PMIX_JOBID);
476     kv.type = OPAL_UINT32;
477     kv.data.uint32 = pmix_jobid;
478     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
479         OPAL_ERROR_LOG(ret);
480         OBJ_DESTRUCT(&kv);
481         goto err_exit;
482     }
483     OBJ_DESTRUCT(&kv);
484 
485     /* save the local size */
486     OBJ_CONSTRUCT(&kv, opal_value_t);
487     kv.key = strdup(OPAL_PMIX_LOCAL_SIZE);
488     kv.type = OPAL_UINT32;
489     kv.data.uint32 = pmix_nlranks;
490     if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
491         OPAL_ERROR_LOG(rc);
492         OBJ_DESTRUCT(&kv);
493         goto err_exit;
494     }
495     OBJ_DESTRUCT(&kv);
496 
497     ldr.vpid = pmix_lranks[0];
498     ldr.jobid = pmix_pname.jobid;
499 
500     /* find ourselves and build up a string for local peer info */
501     memset(nmtmp, 0, 64);
502     for (i=0; i < pmix_nlranks; i++) {
503         ret = snprintf(nmtmp, 64, "%d", pmix_lranks[i]);
504         opal_argv_append_nosize(&localranks, nmtmp);
505         if (pmix_rank == pmix_lranks[i]) {
506             pmix_lrank = i;
507             pmix_nrank = i;
508         }
509     }
510 
511     str = opal_argv_join(localranks, ',');
512     opal_argv_free(localranks);
513 
514     OBJ_CONSTRUCT(&kv, opal_value_t);
515     kv.key = strdup(OPAL_PMIX_LOCAL_PEERS);
516     kv.type = OPAL_STRING;
517     kv.data.string = str;
518     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
519         OPAL_ERROR_LOG(ret);
520         OBJ_DESTRUCT(&kv);
521         goto err_exit;
522     }
523     OBJ_DESTRUCT(&kv);
524 
525     /* save the local leader */
526     OBJ_CONSTRUCT(&kv, opal_value_t);
527     kv.key = strdup(OPAL_PMIX_LOCALLDR);
528     kv.type = OPAL_UINT64;
529     kv.data.uint64 = *(uint64_t*)&ldr;
530     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
531         OPAL_ERROR_LOG(ret);
532         OBJ_DESTRUCT(&kv);
533         goto err_exit;
534     }
535 
536     /* save our local rank */
537     OBJ_CONSTRUCT(&kv, opal_value_t);
538     kv.key = strdup(OPAL_PMIX_LOCAL_RANK);
539     kv.type = OPAL_UINT16;
540     kv.data.uint16 = pmix_lrank;
541     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
542         OPAL_ERROR_LOG(ret);
543         OBJ_DESTRUCT(&kv);
544         goto err_exit;
545     }
546 
547     /* and our node rank */
548     OBJ_CONSTRUCT(&kv, opal_value_t);
549     kv.key = strdup(OPAL_PMIX_NODE_RANK);
550     kv.type = OPAL_UINT16;
551     kv.data.uint16 = pmix_nrank;
552     if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
553         OPAL_ERROR_LOG(ret);
554         OBJ_DESTRUCT(&kv);
555         goto err_exit;
556     }
557     OBJ_DESTRUCT(&kv);
558 
559     cray_get_more_info();
560 
561     return OPAL_SUCCESS;
562 err_exit:
563     PMI2_Finalize();
564     return ret;
565 }
566 
cray_fini(void)567 static int cray_fini(void) {
568 
569     if (0 == pmix_init_count) {
570         return OPAL_SUCCESS;
571     }
572 
573     if (0 == --pmix_init_count) {
574 
575         opal_output_verbose(10, opal_pmix_base_framework.framework_output,
576                         "%s pmix:cray: calling PMI2_Finalize",
577                         OPAL_NAME_PRINT(pmix_pname));
578 
579         PMI2_Finalize();
580 
581         if (NULL != pmix_kvs_name) {
582             free(pmix_kvs_name);
583             pmix_kvs_name = NULL;
584         }
585 
586         if (NULL != pmix_lranks) {
587             free(pmix_lranks);
588             pmix_lranks = NULL;
589         }
590     }
591 
592     return OPAL_SUCCESS;
593 }
594 
cray_initialized(void)595 static int cray_initialized(void)
596 {
597     if (0 < pmix_init_count) {
598         return 1;
599     }
600     return 0;
601 }
602 
/*
 * Abort through PMI2, forwarding the flag and message unchanged.
 * The procs list is not consulted.  Returns OPAL_SUCCESS if PMI2_Abort
 * comes back.
 */
static int cray_abort(int flag, const char *msg,
                      opal_list_t *procs)
{
    (void)procs;

    PMI2_Abort(flag, msg);

    return OPAL_SUCCESS;
}
609 
/*
 * Blocking spawn entry point: not provided by this module.
 * No argument is touched; always returns OPAL_ERR_NOT_SUPPORTED.
 */
static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
    (void)jobinfo;
    (void)apps;
    (void)jobid;

    return OPAL_ERR_NOT_SUPPORTED;
}
614 
/*
 * Non-blocking spawn entry point: not provided by this module.
 * The callback is never invoked; always returns OPAL_ERR_NOT_SUPPORTED.
 */
static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
                         opal_pmix_spawn_cbfunc_t cbfunc,
                         void *cbdata)
{
    (void)jobinfo;
    (void)apps;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
621 
/*
 * Connect entry point: not provided by this module.
 * Always returns OPAL_ERR_NOT_SUPPORTED.
 */
static int cray_job_connect(opal_list_t *procs)
{
    (void)procs;

    return OPAL_ERR_NOT_SUPPORTED;
}
626 
/*
 * Blocking disconnect entry point: not provided by this module.
 * Always returns OPAL_ERR_NOT_SUPPORTED.
 */
static int cray_job_disconnect(opal_list_t *procs)
{
    (void)procs;

    return OPAL_ERR_NOT_SUPPORTED;
}
631 
/*
 * Non-blocking disconnect entry point: not provided by this module.
 * The callback is never invoked; always returns OPAL_ERR_NOT_SUPPORTED.
 */
static int cray_job_disconnect_nb(opal_list_t *procs,
                                  opal_pmix_op_cbfunc_t cbfunc,
                                  void *cbdata)
{
    (void)procs;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
638 
/*
 * Peer resolution entry point: not implemented in this module.
 * The procs list is left untouched; always returns OPAL_ERR_NOT_IMPLEMENTED.
 */
static int cray_resolve_peers(const char *nodename,
                              opal_jobid_t jobid,
                              opal_list_t *procs)
{
    (void)nodename;
    (void)jobid;
    (void)procs;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
645 
/*
 * Node resolution entry point: not implemented in this module.
 * *nodelist is left untouched; always returns OPAL_ERR_NOT_IMPLEMENTED.
 */
static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist)
{
    (void)jobid;
    (void)nodelist;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
650 
/*
 * Queue a key/value pair for exchange at the next fence.
 *
 * The value is packed into the component's global cache buffer, which is
 * created on first use; the scope argument is only logged, since every put
 * currently goes to the global cache.
 *
 * Returns OPAL_ERROR when the module has not been initialized,
 * OPAL_ERR_OUT_OF_RESOURCE when the cache buffer cannot be created, and
 * otherwise the status of opal_dss.pack().
 */
static int cray_put(opal_pmix_scope_t scope,
                  opal_value_t *kv)
{
    int rc;

    opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray cray_put key %s scope %d\n",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);

    if (!pmix_init_count) {
        return OPAL_ERROR;
    }

    /*
     * for now just always just global cache
     */

    if (NULL == mca_pmix_cray_component.cache_global) {
        mca_pmix_cray_component.cache_global = OBJ_NEW(opal_buffer_t);
        /* do not hand a NULL buffer to the packer */
        if (NULL == mca_pmix_cray_component.cache_global) {
            rc = OPAL_ERR_OUT_OF_RESOURCE;
            OPAL_ERROR_LOG(rc);
            return rc;
        }
    }

    opal_output_verbose(20, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray put global data for key %s type %d",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, kv->type);
    /* pack takes the address of the value pointer, with a count of one */
    if (OPAL_SUCCESS != (rc = opal_dss.pack(mca_pmix_cray_component.cache_global, &kv, 1, OPAL_VALUE))) {
        OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.pack returned error");
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}
682 
cray_commit(void)683 static int cray_commit(void)
684 {
685     return OPAL_SUCCESS;
686 }
687 
fencenb(int sd,short args,void * cbdata)688 static void fencenb(int sd, short args, void *cbdata)
689 {
690     pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
691     int rc, cnt;
692     int32_t i;
693     int *all_lens = NULL;
694     opal_value_t *kp, kvn;
695     opal_buffer_t *send_buffer = NULL;
696     opal_buffer_t *buf = NULL;
697     void *sbuf_ptr;
698     char *cptr, *rcv_buff = NULL;
699     opal_process_name_t id;
700     typedef struct {
701         uint32_t pmix_rank;
702         opal_process_name_t name;
703         int32_t nbytes;
704     } bytes_and_rank_t;
705     int32_t rcv_nbytes_tot;
706     bytes_and_rank_t s_bytes_and_rank;
707     bytes_and_rank_t *r_bytes_and_ranks = NULL;
708     opal_hwloc_locality_t locality;
709     opal_list_t vals;
710     char *cpuset = NULL;
711 
712     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
713                         "%s pmix:cray executing fence cache_global %p cache_local %p",
714                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
715                         (void *)mca_pmix_cray_component.cache_global,
716                         (void *)mca_pmix_cray_component.cache_local);
717 
718 
719     /*
720      * "unload" the cache_local/cache_global buffers, first copy
721      * it so we can continue to use the local buffers if further
722      * calls to put can be made
723      */
724 
725     send_buffer = OBJ_NEW(opal_buffer_t);
726     if (NULL == send_buffer) {
727         rc = OPAL_ERR_OUT_OF_RESOURCE;
728         goto fn_exit;
729     }
730 
731     opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
732     opal_dss.unload(send_buffer, &sbuf_ptr, &s_bytes_and_rank.nbytes);
733     s_bytes_and_rank.pmix_rank = pmix_rank;
734     s_bytes_and_rank.name = OPAL_PROC_MY_NAME;
735 
736     r_bytes_and_ranks = (bytes_and_rank_t *)malloc(pmix_size * sizeof(bytes_and_rank_t));
737     if (NULL == r_bytes_and_ranks) {
738         rc = OPAL_ERR_OUT_OF_RESOURCE;
739         goto fn_exit;
740     }
741 
742     /*
743      * gather up all the buffer sizes and rank order.
744      * doing this step below since the cray pmi PMI_Allgather doesn't deliver
745      * the gathered data necessarily in PMI rank order, although the order stays
746      * the same for the duration of a job - assuming no node failures.
747      */
748 
749     if (PMI_SUCCESS != (rc = PMI_Allgather(&s_bytes_and_rank,r_bytes_and_ranks,sizeof(bytes_and_rank_t)))) {
750         OPAL_PMI_ERROR(rc,"PMI_Allgather");
751         rc = OPAL_ERR_COMM_FAILURE;
752         goto fn_exit;
753     }
754 
755 
756     for (rcv_nbytes_tot=0,i=0; i < pmix_size; i++) {
757         rcv_nbytes_tot += r_bytes_and_ranks[i].nbytes;
758     }
759 
760     opal_output_verbose(20, opal_pmix_base_framework.framework_output,
761                         "%s pmix:cray total number of bytes to receive %d",
762                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rcv_nbytes_tot);
763 
764     rcv_buff = (char *) malloc(rcv_nbytes_tot * sizeof(char));
765     if (NULL == rcv_buff) {
766         rc = OPAL_ERR_OUT_OF_RESOURCE;
767         goto fn_exit;
768     }
769 
770     all_lens = (int *)malloc(sizeof(int) * pmix_size);
771     if (NULL == all_lens) {
772         rc = OPAL_ERR_OUT_OF_RESOURCE;
773         goto fn_exit;
774     }
775     for (i=0; i< pmix_size; i++) {
776         all_lens[r_bytes_and_ranks[i].pmix_rank] = r_bytes_and_ranks[i].nbytes;
777     }
778 
779     if (PMI_SUCCESS != (rc = PMI_Allgatherv(sbuf_ptr,s_bytes_and_rank.nbytes,rcv_buff,all_lens))) {
780         OPAL_PMI_ERROR(rc,"PMI_Allgatherv");
781         rc = OPAL_ERR_COMM_FAILURE;
782         goto fn_exit;
783     }
784 
785     OBJ_RELEASE(send_buffer);
786     send_buffer  = NULL;
787 
788     buf = OBJ_NEW(opal_buffer_t);
789     if (buf == NULL) {
790         rc = OPAL_ERR_OUT_OF_RESOURCE;
791         goto fn_exit;
792     }
793 
794     for (cptr = rcv_buff, i=0; i < pmix_size; i++) {
795 
796         id = r_bytes_and_ranks[i].name;
797 
798         buf->base_ptr = NULL;  /* TODO: ugh */
799         if (OPAL_SUCCESS != (rc = opal_dss.load(buf, (void *)cptr, r_bytes_and_ranks[i].nbytes))) {
800             OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.load failed");
801             goto fn_exit;
802         }
803 
804         /* unpack and stuff in to the dstore */
805         cnt = 1;
806         while (OPAL_SUCCESS == (rc = opal_dss.unpack(buf, &kp, &cnt, OPAL_VALUE))) {
807             OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
808                                  "%s pmix:cray unpacked kp with key %s type(%d) for id  %s",
809                                  OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, kp->type, OPAL_NAME_PRINT(id)));
810 
811             if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&id, kp))) {
812                 OPAL_ERROR_LOG(rc);
813                 goto fn_exit;
814             }
815             OBJ_RELEASE(kp);
816             cnt = 1;
817         }
818 
819         cptr += r_bytes_and_ranks[i].nbytes;
820 
821     }
822 
823     buf->base_ptr = NULL;  /* TODO: ugh */
824     OBJ_RELEASE(buf);
825 
826     opal_output_verbose(2, opal_pmix_base_framework.framework_output,
827                         "%s pmix:cray kvs_fence complete",
828                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
829 
830     /* fetch my cpuset */
831     OBJ_CONSTRUCT(&vals, opal_list_t);
832     if (OPAL_SUCCESS == (rc = opal_pmix_base_fetch(&pmix_pname,
833                                                    OPAL_PMIX_CPUSET, &vals))) {
834         kp = (opal_value_t*)opal_list_get_first(&vals);
835         cpuset = strdup(kp->data.string);
836     } else {
837         cpuset = NULL;
838     }
839     OPAL_LIST_DESTRUCT(&vals);
840 
841     /* Get the modex data from each local process and set the
842      * localities to avoid having the MPI layer fetch data
843      * for every process in the job.
844      *
845      *  we only need to set locality for each local rank as "not found"
846      * equates to "non-local"
847      */
848 
849     for (i=0; i < pmix_nlranks; i++) {
850         id.vpid = pmix_lranks[i];
851         id.jobid = pmix_jobid;
852         OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
853                              "%s checking out if %s is local to me",
854                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
855                              OPAL_NAME_PRINT(id)));
856         /* fetch cpuset for this vpid */
857         OBJ_CONSTRUCT(&vals, opal_list_t);
858         if (OPAL_SUCCESS != (rc = opal_pmix_base_fetch(&id,
859                                                     OPAL_PMIX_CPUSET, &vals))) {
860             OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
861                                  "%s cpuset for local proc %s not found",
862                                  OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
863                                  OPAL_NAME_PRINT(id)));
864             OPAL_LIST_DESTRUCT(&vals);
865             /* even though the cpuset wasn't found, we at least know it is
866              * on the same node with us */
867             locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
868         } else {
869             kp = (opal_value_t*)opal_list_get_first(&vals);
870             if (NULL == kp->data.string) {
871                 /* if we share a node, but we don't know anything more, then
872                  * mark us as on the node as this is all we know
873                  */
874                 locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
875             } else {
876                 /* determine relative location on our node */
877                 locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
878                                                                  cpuset,
879                                                                  kp->data.string);
880             }
881             OPAL_LIST_DESTRUCT(&vals);
882         }
883         OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
884                              "%s pmix:cray proc %s locality %s",
885                              OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
886                              OPAL_NAME_PRINT(id),
887                              opal_hwloc_base_print_locality(locality)));
888 
889         OBJ_CONSTRUCT(&kvn, opal_value_t);
890         kvn.key = strdup(OPAL_PMIX_LOCALITY);
891         kvn.type = OPAL_UINT16;
892         kvn.data.uint16 = locality;
893         opal_pmix_base_store(&id, &kvn);
894         OBJ_DESTRUCT(&kvn);
895     }
896 
897 fn_exit:
898     if (NULL != cpuset) {
899         free(cpuset);
900     }
901     if (all_lens != NULL) {
902         free(all_lens);
903     }
904     if (rcv_buff != NULL) {
905         free(rcv_buff);
906     }
907     if (r_bytes_and_ranks != NULL) {
908         free(r_bytes_and_ranks);
909     }
910     if (NULL != op->opcbfunc) {
911         op->opcbfunc(rc, op->cbdata);
912     }
913     OBJ_RELEASE(op);
914     return;
915 }
916 
fence_release(int status,void * cbdata)917 static void fence_release(int status, void *cbdata)
918 {
919     struct fence_result *res = (struct fence_result*)cbdata;
920     res->status = status;
921     opal_atomic_wmb();
922     res->flag = 0;
923 }
924 
cray_fence(opal_list_t * procs,int collect_data)925 static int cray_fence(opal_list_t *procs, int collect_data)
926 {
927     struct fence_result result = { 1, OPAL_SUCCESS };
928     cray_fencenb(procs, collect_data, fence_release, (void*)&result);
929     CRAY_WAIT_FOR_COMPLETION(result.flag);
930     return result.status;
931 }
932 
933 
cray_fencenb(opal_list_t * procs,int collect_data,opal_pmix_op_cbfunc_t cbfunc,void * cbdata)934 static int cray_fencenb(opal_list_t *procs, int collect_data,
935                       opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
936 {
937     pmi_opcaddy_t *op;
938 
939     /* thread-shift this so we don't block in Cray's barrier */
940     op = OBJ_NEW(pmi_opcaddy_t);
941     op->opcbfunc = cbfunc;
942     op->cbdata = cbdata;
943     event_assign(&op->ev, opal_pmix_base.evbase, -1,
944                  EV_WRITE, fencenb, op);
945     event_active(&op->ev, EV_WRITE, 1);
946 
947     return OPAL_SUCCESS;
948 }
949 
/**
 * Look up the value stored under a key for a given process.
 *
 * @param id    process whose data is queried
 * @param key   key to fetch
 * @param info  not used by this implementation
 * @param kv    on success, receives the first item removed from the list
 *              filled in by opal_pmix_base_fetch(); left untouched on
 *              failure
 *
 * @return OPAL_SUCCESS when the fetch succeeded, otherwise the error code
 *         returned by opal_pmix_base_fetch().
 */
static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv)
{
    int rc;
    opal_list_t vals;

    OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                         "%s pmix:cray getting value for proc %s key %s",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                         OPAL_NAME_PRINT(*id), key));

    OBJ_CONSTRUCT(&vals, opal_list_t);
    rc = opal_pmix_base_fetch(id, key, &vals);
    if (OPAL_SUCCESS == rc) {
        /* detach the first value from the list and give it to the caller;
         * it is no longer on the list when the list is torn down below */
        *kv = (opal_value_t*)opal_list_remove_first(&vals);
    } else {
        OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray fetch from dstore failed: %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc));
    }

    /* tear the temporary list down on every path; the success path used to
     * return early and skip this, leaving the list (and any items still on
     * it) undestructed */
    OPAL_LIST_DESTRUCT(&vals);

    return rc;
}
975 
/**
 * Non-blocking get.  Not implemented by this component: every argument is
 * ignored, cbfunc is never called, and OPAL_ERR_NOT_IMPLEMENTED is always
 * returned.
 */
static int cray_get_nb(const opal_process_name_t *id, const char *key,
                       opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
{
    (void)id;
    (void)key;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
981 
/**
 * Publish.  Not supported by this component: info is ignored and
 * OPAL_ERR_NOT_SUPPORTED is always returned.
 */
static int cray_publish(opal_list_t *info)
{
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
986 
/**
 * Non-blocking publish.  Not supported by this component: the arguments
 * are ignored, cbfunc is never called, and OPAL_ERR_NOT_SUPPORTED is
 * always returned.
 */
static int cray_publish_nb(opal_list_t *info,
                           opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
992 
/**
 * Lookup.  Not supported by this component: both lists are left untouched
 * and OPAL_ERR_NOT_SUPPORTED is always returned.
 */
static int cray_lookup(opal_list_t *data, opal_list_t *info)
{
    (void)data;
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
997 
/**
 * Non-blocking lookup.  Not supported by this component: the arguments
 * are ignored, cbfunc is never called, and OPAL_ERR_NOT_SUPPORTED is
 * always returned.
 */
static int cray_lookup_nb(char **keys, opal_list_t *info,
                          opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
    (void)keys;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
1003 
/**
 * Unpublish.  Not supported by this component: the arguments are ignored
 * and OPAL_ERR_NOT_SUPPORTED is always returned.
 */
static int cray_unpublish(char **keys, opal_list_t *info)
{
    (void)keys;
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
1008 
/**
 * Non-blocking unpublish.  Not supported by this component: the arguments
 * are ignored, cbfunc is never called, and OPAL_ERR_NOT_SUPPORTED is
 * always returned.
 */
static int cray_unpublish_nb(char **keys, opal_list_t *info,
                            opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)keys;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
1014 
cray_get_version(void)1015 static const char *cray_get_version(void)
1016 {
1017     return cray_pmi_version;
1018 }
1019 
/**
 * Store a value for a process via opal_pmix_base_store().
 *
 * @param proc  process the value belongs to
 * @param val   value to store
 *
 * @return OPAL_SUCCESS on success, otherwise the error code reported by
 *         opal_pmix_base_store().  A failure is logged and returned to
 *         the caller rather than being reported as success.
 */
static int cray_store_local(const opal_process_name_t *proc,
                          opal_value_t *val)
{
    int rc;

    rc = opal_pmix_base_store(proc, val);
    if (OPAL_SUCCESS != rc) {
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}
1027 
/**
 * Return the namespace string for a jobid.
 *
 * The jobid is not consulted; the fixed placeholder "N/A" is returned for
 * every input.  The result is a string literal and must not be freed.
 */
static const char *cray_get_nspace(opal_jobid_t jobid)
{
    (void)jobid;

    return "N/A";
}
1032 
/**
 * Register a jobid/namespace pair.
 *
 * This is a no-op in this component: neither argument is used and nothing
 * is recorded.
 */
static void cray_register_jobid(opal_jobid_t jobid, const char *nspace)
{
    (void)jobid;
    (void)nspace;
}
1037 
pmix_error(int pmix_err)1038 static char* pmix_error(int pmix_err)
1039 {
1040     char * err_msg;
1041 
1042     switch(pmix_err) {
1043         case PMI_FAIL: err_msg = "Operation failed"; break;
1044         case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
1045         case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
1046         case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
1047         case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
1048         case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
1049         case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
1050         case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
1051         case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
1052         case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
1053         case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
1054         case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
1055         case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
1056         case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
1057 #if defined(PMI_ERR_INVALID_KVS)
1058         /* pmi.h calls this a valid return code but mpich doesn't define it (slurm does). */
1059         case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
1060 #endif
1061         case PMI_SUCCESS: err_msg = "Success"; break;
1062         default: err_msg = "Unkown error";
1063     }
1064     return err_msg;
1065 }
1066