1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2007 The Trustees of Indiana University.
4 * All rights reserved.
5 * Copyright (c) 2011-2016 Cisco Systems, Inc. All rights reserved.
6 * Copyright (c) 2011-2017 Los Alamos National Security, LLC. All
7 * rights reserved.
8 * Copyright (c) 2013-2017 Intel, Inc. All rights reserved.
9 * Copyright (c) 2014-2016 Research Organization for Information Science
10 * and Technology (RIST). All rights reserved.
11 * $COPYRIGHT$
12 *
13 * Additional copyrights may follow
14 *
15 * $HEADER$
16 */
17
#include "opal_config.h"

#include <string.h>

#include "opal/constants.h"
#include "opal/types.h"
#include "opal_stdint.h"
#include "opal/mca/hwloc/base/base.h"
#include "opal/util/argv.h"
#include "opal/util/opal_environ.h"
#include "opal/util/output.h"
#include "opal/util/proc.h"
#include "opal/util/output.h"
#include "opal/util/show_help.h"
#include "opal/util/opal_getcwd.h"
#include "opal/constants.h"
#include "opal/mca/pmix/base/base.h"
#include "opal/mca/pmix/base/pmix_base_hash.h"
#include "pmix_cray.h"
34
35 static char cray_pmi_version[128];
36
37 static int cray_init(opal_list_t *ilist);
38 static int cray_fini(void);
39 static int cray_initialized(void);
40 static int cray_abort(int flat, const char *msg,
41 opal_list_t *procs);
42 static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid);
43 static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
44 opal_pmix_spawn_cbfunc_t cbfunc,
45 void *cbdata);
46 static int cray_job_connect(opal_list_t *procs);
47 static int cray_job_disconnect(opal_list_t *procs);
48 static int cray_job_disconnect_nb(opal_list_t *procs,
49 opal_pmix_op_cbfunc_t cbfunc,
50 void *cbdata);
51 static int cray_resolve_peers(const char *nodename,
52 opal_jobid_t jobid,
53 opal_list_t *procs);
54 static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist);
55 static int cray_put(opal_pmix_scope_t scope, opal_value_t *kv);
56 static int cray_fence(opal_list_t *procs, int collect_data);
57 static int cray_fencenb(opal_list_t *procs, int collect_data,
58 opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
59 static int cray_commit(void);
60 static int cray_get(const opal_process_name_t *id,
61 const char *key, opal_list_t *info,
62 opal_value_t **kv);
63 static int cray_get_nb(const opal_process_name_t *id, const char *key,
64 opal_list_t *info,
65 opal_pmix_value_cbfunc_t cbfunc, void *cbdata);
66 static int cray_publish(opal_list_t *info);
67 static int cray_publish_nb(opal_list_t *info,
68 opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
69 static int cray_lookup(opal_list_t *data, opal_list_t *info);
70 static int cray_lookup_nb(char **keys, opal_list_t *info,
71 opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata);
72 static int cray_unpublish(char **keys, opal_list_t *info);
73 static int cray_unpublish_nb(char **keys, opal_list_t *info,
74 opal_pmix_op_cbfunc_t cbfunc, void *cbdata);
75 static const char *cray_get_version(void);
76 static int cray_store_local(const opal_process_name_t *proc,
77 opal_value_t *val);
78 static const char *cray_get_nspace(opal_jobid_t jobid);
79 static void cray_register_jobid(opal_jobid_t jobid, const char *nspace);
80
81 #if 0
82 static bool cray_get_attr(const char *attr, opal_value_t **kv);
83 #endif
84
85 const opal_pmix_base_module_t opal_pmix_cray_module = {
86 .init = cray_init,
87 .finalize = cray_fini,
88 .initialized = cray_initialized,
89 .abort = cray_abort,
90 .commit = cray_commit,
91 .fence = cray_fence,
92 .fence_nb = cray_fencenb,
93 .put = cray_put,
94 .get = cray_get,
95 .get_nb = cray_get_nb,
96 .publish = cray_publish,
97 .publish_nb = cray_publish_nb,
98 .lookup = cray_lookup,
99 .lookup_nb = cray_lookup_nb,
100 .unpublish = cray_unpublish,
101 .unpublish_nb = cray_unpublish_nb,
102 .spawn = cray_spawn,
103 .spawn_nb = cray_spawn_nb,
104 .connect = cray_job_connect,
105 .disconnect = cray_job_disconnect,
106 .disconnect_nb = cray_job_disconnect_nb,
107 .resolve_peers = cray_resolve_peers,
108 .resolve_nodes = cray_resolve_nodes,
109 .get_version = cray_get_version,
110 .register_evhandler = opal_pmix_base_register_handler,
111 .deregister_evhandler = opal_pmix_base_deregister_handler,
112 .store_local = cray_store_local,
113 .get_nspace = cray_get_nspace,
114 .register_jobid = cray_register_jobid
115 };
116
117 // usage accounting
118 static int pmix_init_count = 0;
119
120 // local object
121 typedef struct {
122 opal_object_t super;
123 opal_event_t ev;
124 opal_pmix_op_cbfunc_t opcbfunc;
125 void *cbdata;
126 } pmi_opcaddy_t;
127 static OBJ_CLASS_INSTANCE(pmi_opcaddy_t,
128 opal_object_t,
129 NULL, NULL);
130
131 struct fence_result {
132 volatile int flag;
133 int status;
134 };
135
136 // PMI constant values:
137 static int pmix_kvslen_max = 0;
138 static int pmix_keylen_max = 0;
139 static int pmix_vallen_max = 0;
140 static int pmix_vallen_threshold = INT_MAX;
141
142 // Job environment description
143 static int pmix_size = 0;
144 static int pmix_rank = 0;
145 static int pmix_lrank = 0;
146 static int pmix_nrank = 0;
147 static int pmix_nlranks = 0;
148 static int pmix_appnum = 0;
149 static int pmix_usize = 0;
150 static char *pmix_kvs_name = NULL;
151 static int *pmix_lranks = NULL;
152 static opal_process_name_t pmix_pname;
153 static uint32_t pmix_jobid = -1;
154
static char* pmix_error(int pmix_err);

/*
 * Report a failed PMI call: prints the name of the PMI function, the
 * source location, and the text pmix_error() returns for pmi_err.
 *
 * The expansion deliberately has NO trailing semicolon so the macro
 * behaves like a single statement; callers write
 * OPAL_PMI_ERROR(rc, "...");  and it stays safe inside if/else.
 */
#define OPAL_PMI_ERROR(pmi_err, pmi_func)                       \
    do {                                                        \
        opal_output(0, "%s [%s:%d:%s]: %s\n",                   \
                    pmi_func, __FILE__, __LINE__, __func__,     \
                    pmix_error(pmi_err));                       \
    } while (0)
162
/*
 * Poll until the expression (a) evaluates to zero, sleeping 10
 * microseconds between checks.
 */
#define CRAY_WAIT_FOR_COMPLETION(a)             \
    do {                                        \
        for (;;) {                              \
            if (!(a)) {                         \
                break;                          \
            }                                   \
            usleep(10);                         \
        }                                       \
    } while (0)
169
cray_get_more_info(void)170 static void cray_get_more_info(void)
171 {
172 int alps_status = 0, i;
173 uint64_t apid;
174 size_t alps_count;
175 int lli_ret = 0, place_ret;
176 alpsAppLayout_t layout;
177 char *npstring;
178 char *firstrankstring;
179 char **nps, **firstranks;
180 int *base_pe_in_app;
181 int *pes_in_app;
182 char pbuf[OPAL_PATH_MAX];
183
184 /*
185 * First get our apid
186 */
187
188 lli_ret = alps_app_lli_lock();
189 if (0 != lli_ret) {
190 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
191 "%s pmix:cray: alps_app_lli_lock returned %d",
192 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
193 goto fn_exit;
194 }
195
196 lli_ret = alps_app_lli_put_request(ALPS_APP_LLI_ALPS_REQ_APID, NULL, 0);
197 if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
198 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
199 "%s pmix:cray: alps_app_lli_put_request - APID returned %d",
200 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
201 goto fn_exit_w_lock;
202 }
203
204 lli_ret = alps_app_lli_get_response (&alps_status, &alps_count);
205 if (ALPS_APP_LLI_ALPS_STAT_OK != alps_status) {
206 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
207 "%s pmix:cray: alps_app_lli_get_response returned %d",
208 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), alps_status));
209 goto fn_exit_w_lock;
210 }
211
212 lli_ret = alps_app_lli_get_response_bytes (&apid, sizeof(apid));
213 if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
214 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
215 "%s pmix:cray: alps_app_lli_get_response_bytes returned %d",
216 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
217 goto fn_exit_w_lock;
218 }
219
220 /*
221 * get some items from alps placement file
222 */
223
224 place_ret = alps_get_placement_info(apid,
225 &layout,
226 NULL,
227 NULL,
228 NULL,
229 NULL,
230 NULL,
231 &base_pe_in_app,
232 &pes_in_app,
233 NULL,
234 NULL);
235 if (1 != place_ret) {
236 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
237 "%s pmix:cray: alps_get_placement_info returned %d (%s)",
238 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, strerror(errno)));
239 goto fn_exit;
240 }
241
242 OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
243 "%s pmix:cray: alps_get_placement_info returned %d first pe on node is %d",
244 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), place_ret, layout.firstPe));
245
246 nps = NULL;
247 firstranks = NULL;
248 for (i=0; i < layout.numCmds; i++) {
249 snprintf(pbuf, sizeof(pbuf), "%d", pes_in_app[i]);
250 opal_argv_append_nosize(&nps, pbuf);
251 snprintf(pbuf, sizeof(pbuf), "%d", base_pe_in_app[i]);
252 opal_argv_append_nosize(&firstranks, pbuf);
253 }
254
255 npstring = opal_argv_join(nps, ' ');
256 firstrankstring = opal_argv_join(firstranks, ' ');
257 opal_argv_free(nps);
258 opal_argv_free(firstranks);
259
260 /*
261 * stuff values into environment variables
262 */
263
264 /* add these envars to prep MPI-2 info pre-defined key/values */
265 snprintf(pbuf, sizeof(pbuf), "%d", layout.numCmds);
266 opal_setenv("OMPI_NUM_APP_CTX", pbuf, true, &environ);
267 opal_setenv("OMPI_FIRST_RANKS", firstrankstring, true, &environ);
268 opal_setenv("OMPI_APP_CTX_NUM_PROCS", npstring, true, &environ);
269 free(firstrankstring);
270 free(npstring);
271 free(base_pe_in_app);
272 free(pes_in_app);
273
274 /*
275 * ALPS always starts the application in the directory
276 * where the aprun command was run to do the launch.
277 * For SLURM, we have to check the SLURM_WORKING_DIR env.
278 * variable. If it is set, we can't set wdir since
279 * we can't assume PWD is where we started.
280 */
281 if(getenv("SLURM_WORKING_DIR") == NULL) {
282 opal_getcwd(pbuf, OPAL_PATH_MAX);
283 opal_setenv("OMPI_MCA_initial_wdir", pbuf, true, &environ);
284 }
285
286 fn_exit_w_lock:
287 lli_ret = alps_app_lli_unlock();
288 if (ALPS_APP_LLI_ALPS_STAT_OK != lli_ret) {
289 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
290 "%s pmix:cray: alps_app_lli_unlock returned %d",
291 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), lli_ret));
292 }
293
294 fn_exit:
295 return;
296 }
297
cray_init(opal_list_t * ilist)298 static int cray_init(opal_list_t *ilist)
299 {
300 int i, spawned, size, rank, appnum, my_node;
301 int rc, ret = OPAL_ERROR;
302 char *pmapping = NULL;
303 char buf[PMI2_MAX_ATTRVALUE];
304 int found;
305 int major, minor, revision;
306 uint32_t jobfam;
307 opal_value_t kv;
308 opal_process_name_t ldr;
309 char nmtmp[64];
310 char *str, **localranks = NULL;
311 opal_process_name_t name;
312
313 ++pmix_init_count;
314
315 /* if we can't startup PMI, we can't be used */
316 if ( PMI2_Initialized () ) {
317 opal_output_verbose(10, opal_pmix_base_framework.framework_output,
318 "%s pmix:cray: pmi already initialized",
319 OPAL_NAME_PRINT(pmix_pname));
320 return OPAL_SUCCESS;
321 }
322 size = -1;
323 rank = -1;
324 appnum = -1;
325 if (PMI_SUCCESS != (rc = PMI2_Init(&spawned, &size, &rank, &appnum))) {
326 opal_show_help("help-pmix-base.txt", "pmix2-init-failed", true, rc);
327 return OPAL_ERROR;
328 }
329 if( size < 0 || rank < 0 ){
330 opal_show_help("help-pmix-base.txt", "pmix2-init-returned-bad-values", true);
331 goto err_exit;
332 }
333
334 pmix_size = size;
335 pmix_rank = rank;
336 pmix_appnum = appnum;
337
338 pmix_vallen_max = PMI2_MAX_VALLEN;
339 pmix_kvslen_max = PMI2_MAX_VALLEN; // FIX ME: What to put here for versatility?
340 pmix_keylen_max = PMI2_MAX_KEYLEN;
341 pmix_vallen_threshold = PMI2_MAX_VALLEN * 3;
342 pmix_vallen_threshold >>= 2;
343
344 /*
345 * get the version info
346 */
347
348 if (PMI_SUCCESS != PMI_Get_version_info(&major,&minor,&revision)) {
349 return OPAL_ERROR;
350 }
351
352 snprintf(cray_pmi_version, sizeof(cray_pmi_version),
353 "%d.%d.%d", major, minor, revision);
354
355 pmix_kvs_name = (char*)malloc(pmix_kvslen_max);
356 if( pmix_kvs_name == NULL ){
357 PMI2_Finalize();
358 ret = OPAL_ERR_OUT_OF_RESOURCE;
359 goto err_exit;
360 }
361
362 rc = PMI2_Job_GetId(pmix_kvs_name, pmix_kvslen_max);
363 if( PMI_SUCCESS != rc ) {
364 OPAL_PMI_ERROR(rc, "PMI2_Job_GetId");
365 goto err_exit;
366 }
367
368 rc = sscanf(pmix_kvs_name,"kvs_%u",&jobfam);
369 if (rc != 1) {
370 opal_output_verbose(10, opal_pmix_base_framework.framework_output,
371 "%s pmix:cray: pmix_kvs_name %s",
372 OPAL_NAME_PRINT(pmix_pname), pmix_kvs_name);
373 rc = OPAL_ERROR;
374 goto err_exit;
375 }
376
377 pmix_jobid = jobfam << 16;
378
379 /* store our name in the opal_proc_t so that
380 * debug messages will make sense - an upper
381 * layer will eventually overwrite it, but that
382 * won't do any harm */
383 pmix_pname.jobid = pmix_jobid;
384 pmix_pname.vpid = pmix_rank;
385 opal_proc_set_name(&pmix_pname);
386 opal_output_verbose(10, opal_pmix_base_framework.framework_output,
387 "%s pmix:cray: assigned tmp name %d %d pmix_kvs_name %s",
388 OPAL_NAME_PRINT(pmix_pname),pmix_pname.jobid,pmix_pname.vpid,pmix_kvs_name);
389
390 pmapping = (char*)malloc(PMI2_MAX_VALLEN);
391 if( pmapping == NULL ){
392 rc = OPAL_ERR_OUT_OF_RESOURCE;
393 OPAL_ERROR_LOG(rc);
394 return rc;
395 }
396
397 rc = PMI2_Info_GetJobAttr("PMI_process_mapping", pmapping, PMI2_MAX_VALLEN, &found);
398 if( !found || PMI_SUCCESS != rc ) {
399 OPAL_PMI_ERROR(rc,"PMI2_Info_GetJobAttr");
400 return OPAL_ERROR;
401 }
402
403 pmix_lranks = pmix_cray_parse_pmap(pmapping, pmix_rank, &my_node, &pmix_nlranks);
404 if (NULL == pmix_lranks) {
405 rc = OPAL_ERR_OUT_OF_RESOURCE;
406 OPAL_ERROR_LOG(rc);
407 return rc;
408 }
409
410 free(pmapping);
411
412 // setup hash table
413 opal_pmix_base_hash_init();
414
415 /* setup a name for retrieving data associated with the job */
416 name.jobid = pmix_jobid;
417 name.vpid = OPAL_VPID_WILDCARD;
418
419 /* save the job size */
420 OBJ_CONSTRUCT(&kv, opal_value_t);
421 kv.key = strdup(OPAL_PMIX_JOB_SIZE);
422 kv.type = OPAL_UINT32;
423 kv.data.uint32 = pmix_size;
424 if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
425 OPAL_ERROR_LOG(rc);
426 OBJ_DESTRUCT(&kv);
427 goto err_exit;
428 }
429 OBJ_DESTRUCT(&kv);
430
431 /* save the appnum */
432 OBJ_CONSTRUCT(&kv, opal_value_t);
433 kv.key = strdup(OPAL_PMIX_APPNUM);
434 kv.type = OPAL_UINT32;
435 kv.data.uint32 = pmix_appnum;
436 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
437 OPAL_ERROR_LOG(ret);
438 OBJ_DESTRUCT(&kv);
439 goto err_exit;
440 }
441 OBJ_DESTRUCT(&kv);
442
443 rc = PMI2_Info_GetJobAttr("universeSize", buf, 16, &found);
444 if( PMI_SUCCESS != rc ) {
445 OPAL_PMI_ERROR(rc, "PMI_Get_universe_size");
446 goto err_exit;
447 }
448
449 pmix_usize = atoi(buf);
450
451 OBJ_CONSTRUCT(&kv, opal_value_t);
452 kv.key = strdup(OPAL_PMIX_UNIV_SIZE);
453 kv.type = OPAL_UINT32;
454 kv.data.uint32 = pmix_usize;
455 if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
456 OPAL_ERROR_LOG(rc);
457 OBJ_DESTRUCT(&kv);
458 goto err_exit;
459 }
460 OBJ_DESTRUCT(&kv);
461
462 /* push this into the dstore for subsequent fetches */
463 OBJ_CONSTRUCT(&kv, opal_value_t);
464 kv.key = strdup(OPAL_PMIX_MAX_PROCS);
465 kv.type = OPAL_UINT32;
466 kv.data.uint32 = pmix_usize;
467 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
468 OPAL_ERROR_LOG(ret);
469 OBJ_DESTRUCT(&kv);
470 goto err_exit;
471 }
472 OBJ_DESTRUCT(&kv);
473
474 OBJ_CONSTRUCT(&kv, opal_value_t);
475 kv.key = strdup(OPAL_PMIX_JOBID);
476 kv.type = OPAL_UINT32;
477 kv.data.uint32 = pmix_jobid;
478 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
479 OPAL_ERROR_LOG(ret);
480 OBJ_DESTRUCT(&kv);
481 goto err_exit;
482 }
483 OBJ_DESTRUCT(&kv);
484
485 /* save the local size */
486 OBJ_CONSTRUCT(&kv, opal_value_t);
487 kv.key = strdup(OPAL_PMIX_LOCAL_SIZE);
488 kv.type = OPAL_UINT32;
489 kv.data.uint32 = pmix_nlranks;
490 if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&name, &kv))) {
491 OPAL_ERROR_LOG(rc);
492 OBJ_DESTRUCT(&kv);
493 goto err_exit;
494 }
495 OBJ_DESTRUCT(&kv);
496
497 ldr.vpid = pmix_lranks[0];
498 ldr.jobid = pmix_pname.jobid;
499
500 /* find ourselves and build up a string for local peer info */
501 memset(nmtmp, 0, 64);
502 for (i=0; i < pmix_nlranks; i++) {
503 ret = snprintf(nmtmp, 64, "%d", pmix_lranks[i]);
504 opal_argv_append_nosize(&localranks, nmtmp);
505 if (pmix_rank == pmix_lranks[i]) {
506 pmix_lrank = i;
507 pmix_nrank = i;
508 }
509 }
510
511 str = opal_argv_join(localranks, ',');
512 opal_argv_free(localranks);
513
514 OBJ_CONSTRUCT(&kv, opal_value_t);
515 kv.key = strdup(OPAL_PMIX_LOCAL_PEERS);
516 kv.type = OPAL_STRING;
517 kv.data.string = str;
518 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
519 OPAL_ERROR_LOG(ret);
520 OBJ_DESTRUCT(&kv);
521 goto err_exit;
522 }
523 OBJ_DESTRUCT(&kv);
524
525 /* save the local leader */
526 OBJ_CONSTRUCT(&kv, opal_value_t);
527 kv.key = strdup(OPAL_PMIX_LOCALLDR);
528 kv.type = OPAL_UINT64;
529 kv.data.uint64 = *(uint64_t*)&ldr;
530 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&name, &kv))) {
531 OPAL_ERROR_LOG(ret);
532 OBJ_DESTRUCT(&kv);
533 goto err_exit;
534 }
535
536 /* save our local rank */
537 OBJ_CONSTRUCT(&kv, opal_value_t);
538 kv.key = strdup(OPAL_PMIX_LOCAL_RANK);
539 kv.type = OPAL_UINT16;
540 kv.data.uint16 = pmix_lrank;
541 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
542 OPAL_ERROR_LOG(ret);
543 OBJ_DESTRUCT(&kv);
544 goto err_exit;
545 }
546
547 /* and our node rank */
548 OBJ_CONSTRUCT(&kv, opal_value_t);
549 kv.key = strdup(OPAL_PMIX_NODE_RANK);
550 kv.type = OPAL_UINT16;
551 kv.data.uint16 = pmix_nrank;
552 if (OPAL_SUCCESS != (ret = opal_pmix_base_store(&OPAL_PROC_MY_NAME, &kv))) {
553 OPAL_ERROR_LOG(ret);
554 OBJ_DESTRUCT(&kv);
555 goto err_exit;
556 }
557 OBJ_DESTRUCT(&kv);
558
559 cray_get_more_info();
560
561 return OPAL_SUCCESS;
562 err_exit:
563 PMI2_Finalize();
564 return ret;
565 }
566
cray_fini(void)567 static int cray_fini(void) {
568
569 if (0 == pmix_init_count) {
570 return OPAL_SUCCESS;
571 }
572
573 if (0 == --pmix_init_count) {
574
575 opal_output_verbose(10, opal_pmix_base_framework.framework_output,
576 "%s pmix:cray: calling PMI2_Finalize",
577 OPAL_NAME_PRINT(pmix_pname));
578
579 PMI2_Finalize();
580
581 if (NULL != pmix_kvs_name) {
582 free(pmix_kvs_name);
583 pmix_kvs_name = NULL;
584 }
585
586 if (NULL != pmix_lranks) {
587 free(pmix_lranks);
588 pmix_lranks = NULL;
589 }
590 }
591
592 return OPAL_SUCCESS;
593 }
594
cray_initialized(void)595 static int cray_initialized(void)
596 {
597 if (0 < pmix_init_count) {
598 return 1;
599 }
600 return 0;
601 }
602
/*
 * Abort through PMI2, forwarding flag and msg to PMI2_Abort.
 * procs is not used. Returns OPAL_SUCCESS if PMI2_Abort returns.
 */
static int cray_abort(int flag, const char *msg,
                      opal_list_t *procs)
{
    (void)procs;

    PMI2_Abort(flag, msg);

    return OPAL_SUCCESS;
}
609
/* Spawning is not supported by this component; no argument is examined. */
static int cray_spawn(opal_list_t *jobinfo, opal_list_t *apps, opal_jobid_t *jobid)
{
    (void)jobinfo;
    (void)apps;
    (void)jobid;

    return OPAL_ERR_NOT_SUPPORTED;
}
614
/*
 * Non-blocking spawn is not supported by this component; cbfunc is never
 * invoked.
 */
static int cray_spawn_nb(opal_list_t *jobinfo, opal_list_t *apps,
                         opal_pmix_spawn_cbfunc_t cbfunc,
                         void *cbdata)
{
    (void)jobinfo;
    (void)apps;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
621
/* Connect is not supported by this component; procs is not examined. */
static int cray_job_connect(opal_list_t *procs)
{
    (void)procs;

    return OPAL_ERR_NOT_SUPPORTED;
}
626
/* Disconnect is not supported by this component; procs is not examined. */
static int cray_job_disconnect(opal_list_t *procs)
{
    (void)procs;

    return OPAL_ERR_NOT_SUPPORTED;
}
631
/*
 * Non-blocking disconnect is not supported by this component; cbfunc is
 * never invoked.
 */
static int cray_job_disconnect_nb(opal_list_t *procs,
                                  opal_pmix_op_cbfunc_t cbfunc,
                                  void *cbdata)
{
    (void)procs;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
638
/* Peer resolution is not implemented; procs is left untouched. */
static int cray_resolve_peers(const char *nodename,
                              opal_jobid_t jobid,
                              opal_list_t *procs)
{
    (void)nodename;
    (void)jobid;
    (void)procs;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
645
/* Node resolution is not implemented; *nodelist is left untouched. */
static int cray_resolve_nodes(opal_jobid_t jobid, char **nodelist)
{
    (void)jobid;
    (void)nodelist;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
650
/*
 * Queue a key/value pair for the next fence.
 *
 * The value is packed into the component's global cache buffer, which is
 * created on first use; scope is only reported in the verbose output, all
 * data currently goes to the global cache.
 *
 * Returns OPAL_ERR_BAD_PARAM for a NULL kv, OPAL_ERROR when the component
 * has not been initialized, OPAL_ERR_OUT_OF_RESOURCE when the cache
 * buffer cannot be created, otherwise the result of opal_dss.pack().
 */
static int cray_put(opal_pmix_scope_t scope,
                    opal_value_t *kv)
{
    int rc;

    /* kv is dereferenced by the logging below and by the pack call */
    if (NULL == kv) {
        return OPAL_ERR_BAD_PARAM;
    }

    opal_output_verbose(10, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray cray_put key %s scope %d\n",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, scope);

    if (!pmix_init_count) {
        return OPAL_ERROR;
    }

    /*
     * for now just always just global cache
     */

    if (NULL == mca_pmix_cray_component.cache_global) {
        mca_pmix_cray_component.cache_global = OBJ_NEW(opal_buffer_t);
        if (NULL == mca_pmix_cray_component.cache_global) {
            rc = OPAL_ERR_OUT_OF_RESOURCE;
            OPAL_ERROR_LOG(rc);
            return rc;
        }
    }

    opal_output_verbose(20, opal_pmix_base_framework.framework_output,
                        "%s pmix:cray put global data for key %s type %d",
                        OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kv->key, kv->type);
    /* the pack routine takes an array of opal_value_t pointers, hence &kv */
    if (OPAL_SUCCESS != (rc = opal_dss.pack(mca_pmix_cray_component.cache_global, &kv, 1, OPAL_VALUE))) {
        OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.pack returned error");
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}
682
cray_commit(void)683 static int cray_commit(void)
684 {
685 return OPAL_SUCCESS;
686 }
687
fencenb(int sd,short args,void * cbdata)688 static void fencenb(int sd, short args, void *cbdata)
689 {
690 pmi_opcaddy_t *op = (pmi_opcaddy_t*)cbdata;
691 int rc, cnt;
692 int32_t i;
693 int *all_lens = NULL;
694 opal_value_t *kp, kvn;
695 opal_buffer_t *send_buffer = NULL;
696 opal_buffer_t *buf = NULL;
697 void *sbuf_ptr;
698 char *cptr, *rcv_buff = NULL;
699 opal_process_name_t id;
700 typedef struct {
701 uint32_t pmix_rank;
702 opal_process_name_t name;
703 int32_t nbytes;
704 } bytes_and_rank_t;
705 int32_t rcv_nbytes_tot;
706 bytes_and_rank_t s_bytes_and_rank;
707 bytes_and_rank_t *r_bytes_and_ranks = NULL;
708 opal_hwloc_locality_t locality;
709 opal_list_t vals;
710 char *cpuset = NULL;
711
712 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
713 "%s pmix:cray executing fence cache_global %p cache_local %p",
714 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
715 (void *)mca_pmix_cray_component.cache_global,
716 (void *)mca_pmix_cray_component.cache_local);
717
718
719 /*
720 * "unload" the cache_local/cache_global buffers, first copy
721 * it so we can continue to use the local buffers if further
722 * calls to put can be made
723 */
724
725 send_buffer = OBJ_NEW(opal_buffer_t);
726 if (NULL == send_buffer) {
727 rc = OPAL_ERR_OUT_OF_RESOURCE;
728 goto fn_exit;
729 }
730
731 opal_dss.copy_payload(send_buffer, mca_pmix_cray_component.cache_global);
732 opal_dss.unload(send_buffer, &sbuf_ptr, &s_bytes_and_rank.nbytes);
733 s_bytes_and_rank.pmix_rank = pmix_rank;
734 s_bytes_and_rank.name = OPAL_PROC_MY_NAME;
735
736 r_bytes_and_ranks = (bytes_and_rank_t *)malloc(pmix_size * sizeof(bytes_and_rank_t));
737 if (NULL == r_bytes_and_ranks) {
738 rc = OPAL_ERR_OUT_OF_RESOURCE;
739 goto fn_exit;
740 }
741
742 /*
743 * gather up all the buffer sizes and rank order.
744 * doing this step below since the cray pmi PMI_Allgather doesn't deliver
745 * the gathered data necessarily in PMI rank order, although the order stays
746 * the same for the duration of a job - assuming no node failures.
747 */
748
749 if (PMI_SUCCESS != (rc = PMI_Allgather(&s_bytes_and_rank,r_bytes_and_ranks,sizeof(bytes_and_rank_t)))) {
750 OPAL_PMI_ERROR(rc,"PMI_Allgather");
751 rc = OPAL_ERR_COMM_FAILURE;
752 goto fn_exit;
753 }
754
755
756 for (rcv_nbytes_tot=0,i=0; i < pmix_size; i++) {
757 rcv_nbytes_tot += r_bytes_and_ranks[i].nbytes;
758 }
759
760 opal_output_verbose(20, opal_pmix_base_framework.framework_output,
761 "%s pmix:cray total number of bytes to receive %d",
762 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rcv_nbytes_tot);
763
764 rcv_buff = (char *) malloc(rcv_nbytes_tot * sizeof(char));
765 if (NULL == rcv_buff) {
766 rc = OPAL_ERR_OUT_OF_RESOURCE;
767 goto fn_exit;
768 }
769
770 all_lens = (int *)malloc(sizeof(int) * pmix_size);
771 if (NULL == all_lens) {
772 rc = OPAL_ERR_OUT_OF_RESOURCE;
773 goto fn_exit;
774 }
775 for (i=0; i< pmix_size; i++) {
776 all_lens[r_bytes_and_ranks[i].pmix_rank] = r_bytes_and_ranks[i].nbytes;
777 }
778
779 if (PMI_SUCCESS != (rc = PMI_Allgatherv(sbuf_ptr,s_bytes_and_rank.nbytes,rcv_buff,all_lens))) {
780 OPAL_PMI_ERROR(rc,"PMI_Allgatherv");
781 rc = OPAL_ERR_COMM_FAILURE;
782 goto fn_exit;
783 }
784
785 OBJ_RELEASE(send_buffer);
786 send_buffer = NULL;
787
788 buf = OBJ_NEW(opal_buffer_t);
789 if (buf == NULL) {
790 rc = OPAL_ERR_OUT_OF_RESOURCE;
791 goto fn_exit;
792 }
793
794 for (cptr = rcv_buff, i=0; i < pmix_size; i++) {
795
796 id = r_bytes_and_ranks[i].name;
797
798 buf->base_ptr = NULL; /* TODO: ugh */
799 if (OPAL_SUCCESS != (rc = opal_dss.load(buf, (void *)cptr, r_bytes_and_ranks[i].nbytes))) {
800 OPAL_PMI_ERROR(rc,"pmix:cray opal_dss.load failed");
801 goto fn_exit;
802 }
803
804 /* unpack and stuff in to the dstore */
805 cnt = 1;
806 while (OPAL_SUCCESS == (rc = opal_dss.unpack(buf, &kp, &cnt, OPAL_VALUE))) {
807 OPAL_OUTPUT_VERBOSE((20, opal_pmix_base_framework.framework_output,
808 "%s pmix:cray unpacked kp with key %s type(%d) for id %s",
809 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), kp->key, kp->type, OPAL_NAME_PRINT(id)));
810
811 if (OPAL_SUCCESS != (rc = opal_pmix_base_store(&id, kp))) {
812 OPAL_ERROR_LOG(rc);
813 goto fn_exit;
814 }
815 OBJ_RELEASE(kp);
816 cnt = 1;
817 }
818
819 cptr += r_bytes_and_ranks[i].nbytes;
820
821 }
822
823 buf->base_ptr = NULL; /* TODO: ugh */
824 OBJ_RELEASE(buf);
825
826 opal_output_verbose(2, opal_pmix_base_framework.framework_output,
827 "%s pmix:cray kvs_fence complete",
828 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME));
829
830 /* fetch my cpuset */
831 OBJ_CONSTRUCT(&vals, opal_list_t);
832 if (OPAL_SUCCESS == (rc = opal_pmix_base_fetch(&pmix_pname,
833 OPAL_PMIX_CPUSET, &vals))) {
834 kp = (opal_value_t*)opal_list_get_first(&vals);
835 cpuset = strdup(kp->data.string);
836 } else {
837 cpuset = NULL;
838 }
839 OPAL_LIST_DESTRUCT(&vals);
840
841 /* Get the modex data from each local process and set the
842 * localities to avoid having the MPI layer fetch data
843 * for every process in the job.
844 *
845 * we only need to set locality for each local rank as "not found"
846 * equates to "non-local"
847 */
848
849 for (i=0; i < pmix_nlranks; i++) {
850 id.vpid = pmix_lranks[i];
851 id.jobid = pmix_jobid;
852 OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
853 "%s checking out if %s is local to me",
854 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
855 OPAL_NAME_PRINT(id)));
856 /* fetch cpuset for this vpid */
857 OBJ_CONSTRUCT(&vals, opal_list_t);
858 if (OPAL_SUCCESS != (rc = opal_pmix_base_fetch(&id,
859 OPAL_PMIX_CPUSET, &vals))) {
860 OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
861 "%s cpuset for local proc %s not found",
862 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
863 OPAL_NAME_PRINT(id)));
864 OPAL_LIST_DESTRUCT(&vals);
865 /* even though the cpuset wasn't found, we at least know it is
866 * on the same node with us */
867 locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
868 } else {
869 kp = (opal_value_t*)opal_list_get_first(&vals);
870 if (NULL == kp->data.string) {
871 /* if we share a node, but we don't know anything more, then
872 * mark us as on the node as this is all we know
873 */
874 locality = OPAL_PROC_ON_CLUSTER | OPAL_PROC_ON_CU | OPAL_PROC_ON_NODE;
875 } else {
876 /* determine relative location on our node */
877 locality = opal_hwloc_base_get_relative_locality(opal_hwloc_topology,
878 cpuset,
879 kp->data.string);
880 }
881 OPAL_LIST_DESTRUCT(&vals);
882 }
883 OPAL_OUTPUT_VERBOSE((1, opal_pmix_base_framework.framework_output,
884 "%s pmix:cray proc %s locality %s",
885 OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
886 OPAL_NAME_PRINT(id),
887 opal_hwloc_base_print_locality(locality)));
888
889 OBJ_CONSTRUCT(&kvn, opal_value_t);
890 kvn.key = strdup(OPAL_PMIX_LOCALITY);
891 kvn.type = OPAL_UINT16;
892 kvn.data.uint16 = locality;
893 opal_pmix_base_store(&id, &kvn);
894 OBJ_DESTRUCT(&kvn);
895 }
896
897 fn_exit:
898 if (NULL != cpuset) {
899 free(cpuset);
900 }
901 if (all_lens != NULL) {
902 free(all_lens);
903 }
904 if (rcv_buff != NULL) {
905 free(rcv_buff);
906 }
907 if (r_bytes_and_ranks != NULL) {
908 free(r_bytes_and_ranks);
909 }
910 if (NULL != op->opcbfunc) {
911 op->opcbfunc(rc, op->cbdata);
912 }
913 OBJ_RELEASE(op);
914 return;
915 }
916
fence_release(int status,void * cbdata)917 static void fence_release(int status, void *cbdata)
918 {
919 struct fence_result *res = (struct fence_result*)cbdata;
920 res->status = status;
921 opal_atomic_wmb();
922 res->flag = 0;
923 }
924
/*
 * Blocking fence: start the non-blocking fence with fence_release as the
 * completion callback and poll until that callback clears the flag.
 *
 * Returns the error from cray_fencenb if the operation could not be
 * started, otherwise the status reported to the callback.
 */
static int cray_fence(opal_list_t *procs, int collect_data)
{
    struct fence_result result = { 1, OPAL_SUCCESS };
    int rc;

    rc = cray_fencenb(procs, collect_data, fence_release, (void*)&result);
    if (OPAL_SUCCESS != rc) {
        /* assumes a failed start never invokes the callback, so waiting
         * would never terminate - TODO confirm if cray_fencenb changes */
        return rc;
    }

    CRAY_WAIT_FOR_COMPLETION(result.flag);

    /* pairs with the write barrier in fence_release: read the status
     * only after the cleared flag has been observed */
    opal_atomic_rmb();
    return result.status;
}

/*
 * Non-blocking fence: package the callback into a caddy and activate an
 * event on opal_pmix_base.evbase so that fencenb() does the work there.
 *
 * procs and collect_data are not used. cbfunc may be NULL; when set it is
 * invoked from fencenb() with the fence status and cbdata.
 *
 * Returns OPAL_ERR_OUT_OF_RESOURCE if the caddy cannot be allocated (the
 * callback is then never invoked), OPAL_SUCCESS otherwise.
 */
static int cray_fencenb(opal_list_t *procs, int collect_data,
                        opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    pmi_opcaddy_t *op;

    (void)procs;
    (void)collect_data;

    /* thread-shift this so we don't block in Cray's barrier */
    op = OBJ_NEW(pmi_opcaddy_t);
    if (NULL == op) {
        return OPAL_ERR_OUT_OF_RESOURCE;
    }
    op->opcbfunc = cbfunc;
    op->cbdata = cbdata;
    event_assign(&op->ev, opal_pmix_base.evbase, -1,
                 EV_WRITE, fencenb, op);
    event_active(&op->ev, EV_WRITE, 1);

    return OPAL_SUCCESS;
}
949
/*
 * Look up `key` for process `id` in the local hash table.
 *
 * On success *kv receives the first matching value, removed from the
 * result list; info is not used. On failure *kv is left untouched.
 *
 * Returns OPAL_SUCCESS, the error from opal_pmix_base_fetch(), or
 * OPAL_ERR_NOT_FOUND if the fetch succeeded but produced no value.
 */
static int cray_get(const opal_process_name_t *id, const char *key, opal_list_t *info, opal_value_t **kv)
{
    int rc;
    opal_list_t vals;
    opal_value_t *first;

    (void)info;

    OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                         "%s pmix:cray getting value for proc %s key %s",
                         OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
                         OPAL_NAME_PRINT(*id), key));

    OBJ_CONSTRUCT(&vals, opal_list_t);
    rc = opal_pmix_base_fetch(id, key, &vals);
    if (OPAL_SUCCESS == rc) {
        first = (opal_value_t*)opal_list_remove_first(&vals);
        if (NULL == first) {
            /* never hand back "success" together with a NULL value */
            rc = OPAL_ERR_NOT_FOUND;
        } else {
            *kv = first;
        }
    } else {
        OPAL_OUTPUT_VERBOSE((2, opal_pmix_base_framework.framework_output,
                             "%s pmix:cray fetch from dstore failed: %d",
                             OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), rc));
    }
    /* releases whatever is still on the list and the list itself, on the
     * success path as well */
    OPAL_LIST_DESTRUCT(&vals);

    return rc;
}
975
/* Non-blocking get is not implemented; cbfunc is never invoked. */
static int cray_get_nb(const opal_process_name_t *id, const char *key,
                       opal_list_t *info, opal_pmix_value_cbfunc_t cbfunc, void *cbdata)
{
    (void)id;
    (void)key;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_IMPLEMENTED;
}
981
/* Publish is not supported by this component; info is not examined. */
static int cray_publish(opal_list_t *info)
{
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
986
/*
 * Non-blocking publish is not supported by this component; cbfunc is
 * never invoked.
 */
static int cray_publish_nb(opal_list_t *info,
                           opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
992
/* Lookup is not supported by this component; data is left untouched. */
static int cray_lookup(opal_list_t *data, opal_list_t *info)
{
    (void)data;
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
997
/*
 * Non-blocking lookup is not supported by this component; cbfunc is
 * never invoked.
 */
static int cray_lookup_nb(char **keys, opal_list_t *info,
                          opal_pmix_lookup_cbfunc_t cbfunc, void *cbdata)
{
    (void)keys;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
1003
/* Unpublish is not supported by this component; no argument is examined. */
static int cray_unpublish(char **keys, opal_list_t *info)
{
    (void)keys;
    (void)info;

    return OPAL_ERR_NOT_SUPPORTED;
}
1008
/*
 * Non-blocking unpublish is not supported by this component; cbfunc is
 * never invoked.
 */
static int cray_unpublish_nb(char **keys, opal_list_t *info,
                             opal_pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)keys;
    (void)info;
    (void)cbfunc;
    (void)cbdata;

    return OPAL_ERR_NOT_SUPPORTED;
}
1014
cray_get_version(void)1015 static const char *cray_get_version(void)
1016 {
1017 return cray_pmi_version;
1018 }
1019
/*
 * Store val for proc in the local hash table.
 *
 * Returns the result of opal_pmix_base_store() so that a failed store is
 * visible to the caller instead of being reported as success.
 */
static int cray_store_local(const opal_process_name_t *proc,
                            opal_value_t *val)
{
    int rc;

    rc = opal_pmix_base_store(proc, val);
    if (OPAL_SUCCESS != rc) {
        OPAL_ERROR_LOG(rc);
    }

    return rc;
}
1027
/*
 * This component keeps no jobid-to-nspace mapping: the fixed string
 * "N/A" is returned for every jobid.
 */
static const char *cray_get_nspace(opal_jobid_t jobid)
{
    (void)jobid;

    return "N/A";
}
1032
/* Registering a jobid/nspace pair is a no-op in this component. */
static void cray_register_jobid(opal_jobid_t jobid, const char *nspace)
{
    (void)jobid;
    (void)nspace;
}
1037
pmix_error(int pmix_err)1038 static char* pmix_error(int pmix_err)
1039 {
1040 char * err_msg;
1041
1042 switch(pmix_err) {
1043 case PMI_FAIL: err_msg = "Operation failed"; break;
1044 case PMI_ERR_INIT: err_msg = "PMI is not initialized"; break;
1045 case PMI_ERR_NOMEM: err_msg = "Input buffer not large enough"; break;
1046 case PMI_ERR_INVALID_ARG: err_msg = "Invalid argument"; break;
1047 case PMI_ERR_INVALID_KEY: err_msg = "Invalid key argument"; break;
1048 case PMI_ERR_INVALID_KEY_LENGTH: err_msg = "Invalid key length argument"; break;
1049 case PMI_ERR_INVALID_VAL: err_msg = "Invalid value argument"; break;
1050 case PMI_ERR_INVALID_VAL_LENGTH: err_msg = "Invalid value length argument"; break;
1051 case PMI_ERR_INVALID_LENGTH: err_msg = "Invalid length argument"; break;
1052 case PMI_ERR_INVALID_NUM_ARGS: err_msg = "Invalid number of arguments"; break;
1053 case PMI_ERR_INVALID_ARGS: err_msg = "Invalid args argument"; break;
1054 case PMI_ERR_INVALID_NUM_PARSED: err_msg = "Invalid num_parsed length argument"; break;
1055 case PMI_ERR_INVALID_KEYVALP: err_msg = "Invalid keyvalp argument"; break;
1056 case PMI_ERR_INVALID_SIZE: err_msg = "Invalid size argument"; break;
1057 #if defined(PMI_ERR_INVALID_KVS)
1058 /* pmi.h calls this a valid return code but mpich doesn't define it (slurm does). */
1059 case PMI_ERR_INVALID_KVS: err_msg = "Invalid kvs argument"; break;
1060 #endif
1061 case PMI_SUCCESS: err_msg = "Success"; break;
1062 default: err_msg = "Unkown error";
1063 }
1064 return err_msg;
1065 }
1066