1 /*
2 * Copyright (c) 2015-2020 Intel, Inc. All rights reserved.
3 * Copyright (c) 2016-2018 IBM Corporation. All rights reserved.
4 * Copyright (c) 2018 Research Organization for Information Science
5 * and Technology (RIST). All rights reserved.
6 * Copyright (c) 2018-2020 Mellanox Technologies, Inc.
7 * All rights reserved.
8 *
9 * $COPYRIGHT$
10 *
11 * Additional copyrights may follow
12 *
13 * $HEADER$
14 */
15
16 #include "src/include/pmix_config.h"
17
18 #include <string.h>
19 #ifdef HAVE_UNISTD_H
20 #include <unistd.h>
21 #endif
22 #ifdef HAVE_SYS_TYPES_H
23 #include <sys/types.h>
24 #endif
25 #ifdef HAVE_SYS_STAT_H
26 #include <sys/stat.h>
27 #endif
28 #ifdef HAVE_FCNTL_H
29 #include <fcntl.h>
30 #endif
31 #include <time.h>
32
33 #include "include/pmix_common.h"
34
35 #include "src/include/pmix_globals.h"
36 #include "src/class/pmix_list.h"
37 #include "src/client/pmix_client_ops.h"
38 #include "src/server/pmix_server_ops.h"
39 #include "src/mca/pcompress/base/base.h"
40 #include "src/mca/preg/preg.h"
41 #include "src/mca/ptl/base/base.h"
42 #include "src/util/argv.h"
43 #include "src/util/error.h"
44 #include "src/util/hash.h"
45 #include "src/util/output.h"
46 #include "src/util/name_fns.h"
47 #include "src/util/pmix_environ.h"
48
49 #include "src/mca/gds/base/base.h"
50 #include "gds_hash.h"
51
/* Forward declarations of the static entry points that are bound to the
 * fields of the pmix_hash_module function table defined right after
 * these prototypes. */
static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo);
static void hash_finalize(void);

static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
                                        int *priority);

static pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
                                         pmix_info_t info[], size_t ninfo);

static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
                                            pmix_buffer_t *reply);

static pmix_status_t hash_store_job_info(const char *nspace,
                                         pmix_buffer_t *buf);

static pmix_status_t hash_store(const pmix_proc_t *proc,
                                pmix_scope_t scope,
                                pmix_kval_t *kv);

static pmix_status_t hash_store_modex(struct pmix_namespace_t *ns,
                                      pmix_buffer_t *buff,
                                      void *cbdata);

/* NOTE(review): not part of the module table below; presumably the
 * per-proc worker behind hash_store_modex - confirm at its definition. */
static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
                                       pmix_proc_t *proc,
                                       pmix_gds_modex_key_fmt_t key_fmt,
                                       char **kmap,
                                       pmix_buffer_t *pbkt);

static pmix_status_t hash_fetch(const pmix_proc_t *proc,
                                pmix_scope_t scope, bool copy,
                                const char *key,
                                pmix_info_t info[], size_t ninfo,
                                pmix_list_t *kvs);

static pmix_status_t setup_fork(const pmix_proc_t *peer, char ***env);

static pmix_status_t nspace_add(const char *nspace, uint32_t nlocalprocs,
                                pmix_info_t info[], size_t ninfo);

static pmix_status_t nspace_del(const char *nspace);

static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
                                    pmix_list_t *kvs,
                                    pmix_buffer_t *bo,
                                    void *cbdata);

static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf);
100
/* Function table for the "hash" GDS module. Every callback field is
 * bound to one of the static functions declared above; the module
 * advertises itself as not thread-safe (is_tsafe = false). */
pmix_gds_base_module_t pmix_hash_module = {
    .name = "hash",
    .is_tsafe = false,
    .init = hash_init,
    .finalize = hash_finalize,
    .assign_module = hash_assign_module,
    .cache_job_info = hash_cache_job_info,
    .register_job_info = hash_register_job_info,
    .store_job_info = hash_store_job_info,
    .store = hash_store,
    .store_modex = hash_store_modex,
    .fetch = hash_fetch,
    .setup_fork = setup_fork,
    .add_nspace = nspace_add,
    .del_nspace = nspace_del,
    .assemb_kvs_req = assemb_kvs_req,
    .accept_kvs_resp = accept_kvs_resp
};
119
/* Define a bitmask to track what information may not have
 * been provided but is computable from other info */
/* when clear, store_map generates the per-proc nodeid/rank entries */
#define PMIX_HASH_PROC_DATA     0x00000001
/* set by process_job_array once a PMIX_JOB_SIZE entry has been seen */
#define PMIX_HASH_JOB_SIZE      0x00000002
/* NOTE(review): not referenced in the visible part of the file */
#define PMIX_HASH_MAX_PROCS     0x00000004
/* when clear, store_map derives PMIX_NUM_NODES from the node list */
#define PMIX_HASH_NUM_NODES     0x00000008
/* set by process_job_array once a PMIX_PROC_MAP has been parsed */
#define PMIX_HASH_PROC_MAP      0x00000010
/* set by process_job_array once a PMIX_NODE_MAP has been parsed */
#define PMIX_HASH_NODE_MAP      0x00000020

/* module-wide trackers: mysessions holds pmix_session_t items and
 * myjobs holds pmix_job_t items; both are constructed in hash_init
 * and destructed in hash_finalize */
static pmix_list_t mysessions, myjobs;
130
131 /**********************************************/
132 /* struct definitions */
/* Tracker for one session; instances live on the mysessions list. */
typedef struct {
    pmix_list_item_t super;     /* list linkage (class parent) */
    uint32_t session;           /* session id; constructor sets UINT32_MAX */
    pmix_list_t sessioninfo;    /* pmix_kval_t items of session-level info */
    pmix_list_t nodeinfo;       /* pmix_nodeinfo_t items for this session */
} pmix_session_t;
139
/* Tracker for one namespace (job); instances live on the myjobs list. */
typedef struct {
    pmix_list_item_t super;     /* list linkage (class parent) */
    char *ns;                   /* strdup'd namespace name, freed by the destructor */
    pmix_namespace_t *nptr;     /* retained namespace object, released by the destructor */
    pmix_hash_table_t internal; /* hash table, initialised with size 256 */
    pmix_hash_table_t remote;   /* hash table, initialised with size 256 */
    pmix_hash_table_t local;    /* hash table, initialised with size 256 */
    bool gdata_added;           /* starts false; NOTE(review): users are outside this view */
    pmix_list_t jobinfo;        /* pmix_kval_t items of job-level info */
    pmix_list_t apps;           /* pmix_apptrkr_t items, one per app */
    pmix_list_t nodeinfo;       /* pmix_nodeinfo_t items for this job */
    pmix_session_t *session;    /* retained session tracker, or NULL */
} pmix_job_t;
153
/* Tracker for one application within a job; instances live on the
 * owning pmix_job_t's apps list. */
typedef struct {
    pmix_list_item_t super;     /* list linkage (class parent) */
    uint32_t appnum;            /* application number; constructor sets 0 */
    pmix_list_t appinfo;        /* pmix_kval_t items of app-level info */
    pmix_list_t nodeinfo;       /* pmix_nodeinfo_t items for this app */
    pmix_job_t *job;            /* retained pointer to the owning job, or NULL */
} pmix_apptrkr_t;
161
/* Tracker for one node. A node is identified by its nodeid, its
 * hostname, or any of its aliases (see check_node/check_nodename). */
typedef struct {
    pmix_list_item_t super;     /* list linkage (class parent) */
    uint32_t nodeid;            /* node id; UINT32_MAX means "not provided" */
    char *hostname;             /* strdup'd hostname, or NULL if not provided */
    char **aliases;             /* NULL-terminated argv of alias names, or NULL */
    pmix_list_t info;           /* pmix_kval_t items of node-level info */
} pmix_nodeinfo_t;
169
170 /**********************************************/
171 /* class instantiations */
/* Constructor for pmix_session_t: both lists start out empty and the
 * session id is initialised to UINT32_MAX. */
static void scon(pmix_session_t *s)
{
    PMIX_CONSTRUCT(&s->nodeinfo, pmix_list_t);
    PMIX_CONSTRUCT(&s->sessioninfo, pmix_list_t);
    s->session = UINT32_MAX;
}

/* Destructor for pmix_session_t: tears down both embedded lists. */
static void sdes(pmix_session_t *s)
{
    PMIX_LIST_DESTRUCT(&s->nodeinfo);
    PMIX_LIST_DESTRUCT(&s->sessioninfo);
}

static PMIX_CLASS_INSTANCE(pmix_session_t,
                           pmix_list_item_t,
                           scon, sdes);
186
/* Constructor for pmix_job_t: clears the scalar members, constructs the
 * embedded lists and sets up the three hash tables with 256 entries. */
static void htcon(pmix_job_t *p)
{
    pmix_hash_table_t *tables[3];
    size_t i;

    p->ns = NULL;
    p->nptr = NULL;

    PMIX_CONSTRUCT(&p->jobinfo, pmix_list_t);

    tables[0] = &p->internal;
    tables[1] = &p->remote;
    tables[2] = &p->local;
    for (i = 0; i < 3; i++) {
        PMIX_CONSTRUCT(tables[i], pmix_hash_table_t);
        pmix_hash_table_init(tables[i], 256);
    }

    p->gdata_added = false;
    PMIX_CONSTRUCT(&p->apps, pmix_list_t);
    PMIX_CONSTRUCT(&p->nodeinfo, pmix_list_t);
    p->session = NULL;
}

/* Destructor for pmix_job_t: frees the namespace name, drops the
 * references held on the namespace and session objects, empties the
 * three hash tables and tears down the embedded lists. */
static void htdes(pmix_job_t *p)
{
    pmix_hash_table_t *tables[3];
    size_t i;

    /* free(NULL) is a no-op, so no guard is required */
    free(p->ns);

    if (NULL != p->nptr) {
        PMIX_RELEASE(p->nptr);
    }

    PMIX_LIST_DESTRUCT(&p->jobinfo);

    tables[0] = &p->internal;
    tables[1] = &p->remote;
    tables[2] = &p->local;
    for (i = 0; i < 3; i++) {
        /* remove every rank's data before destructing the table */
        pmix_hash_remove_data(tables[i], PMIX_RANK_WILDCARD, NULL);
        PMIX_DESTRUCT(tables[i]);
    }

    PMIX_LIST_DESTRUCT(&p->apps);
    PMIX_LIST_DESTRUCT(&p->nodeinfo);

    if (NULL != p->session) {
        PMIX_RELEASE(p->session);
    }
}

static PMIX_CLASS_INSTANCE(pmix_job_t,
                           pmix_list_item_t,
                           htcon, htdes);
227
/* Constructor for pmix_apptrkr_t: app number 0, empty lists, no job. */
static void apcon(pmix_apptrkr_t *p)
{
    PMIX_CONSTRUCT(&p->appinfo, pmix_list_t);
    PMIX_CONSTRUCT(&p->nodeinfo, pmix_list_t);
    p->appnum = 0;
    p->job = NULL;
}

/* Destructor for pmix_apptrkr_t: tears down both lists and drops the
 * reference held on the owning job, if one was set. */
static void apdes(pmix_apptrkr_t *p)
{
    PMIX_LIST_DESTRUCT(&p->appinfo);
    PMIX_LIST_DESTRUCT(&p->nodeinfo);
    if (NULL == p->job) {
        return;
    }
    PMIX_RELEASE(p->job);
}

static PMIX_CLASS_INSTANCE(pmix_apptrkr_t,
                           pmix_list_item_t,
                           apcon, apdes);
246
/* Constructor for pmix_nodeinfo_t: no identity yet (nodeid UINT32_MAX,
 * no hostname, no aliases) and an empty info list. */
static void ndinfocon(pmix_nodeinfo_t *p)
{
    PMIX_CONSTRUCT(&p->info, pmix_list_t);
    p->aliases = NULL;
    p->hostname = NULL;
    p->nodeid = UINT32_MAX;
}

/* Destructor for pmix_nodeinfo_t: frees the hostname and alias array
 * and tears down the info list. */
static void ndinfodes(pmix_nodeinfo_t *p)
{
    /* free(NULL) is a no-op, so no guard is required */
    free(p->hostname);
    if (NULL != p->aliases) {
        pmix_argv_free(p->aliases);
    }
    PMIX_LIST_DESTRUCT(&p->info);
}

static PMIX_CLASS_INSTANCE(pmix_nodeinfo_t,
                           pmix_list_item_t,
                           ndinfocon, ndinfodes);
267
268 /**********************************************
269 * Local Functions
270 **********************************************/
get_tracker(const pmix_nspace_t nspace,bool create)271 static pmix_job_t* get_tracker(const pmix_nspace_t nspace, bool create)
272 {
273 pmix_job_t *trk, *t;
274 pmix_namespace_t *ns, *nptr;
275
276 /* find the hash table for this nspace */
277 trk = NULL;
278 PMIX_LIST_FOREACH(t, &myjobs, pmix_job_t) {
279 if (0 == strcmp(nspace, t->ns)) {
280 trk = t;
281 break;
282 }
283 }
284 if (NULL == trk && create) {
285 /* create one */
286 trk = PMIX_NEW(pmix_job_t);
287 trk->ns = strdup(nspace);
288 /* see if we already have this nspace */
289 nptr = NULL;
290 PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
291 if (0 == strcmp(ns->nspace, nspace)) {
292 nptr = ns;
293 break;
294 }
295 }
296 if (NULL == nptr) {
297 nptr = PMIX_NEW(pmix_namespace_t);
298 if (NULL == nptr) {
299 PMIX_RELEASE(trk);
300 return NULL;
301 }
302 nptr->nspace = strdup(nspace);
303 pmix_list_append(&pmix_globals.nspaces, &nptr->super);
304 }
305 PMIX_RETAIN(nptr);
306 trk->nptr = nptr;
307 pmix_list_append(&myjobs, &trk->super);
308 }
309 return trk;
310 }
311
/* Return true when the two host names are byte-for-byte identical.
 * Both arguments must be non-NULL, NUL-terminated strings. */
static bool check_hostname(char *h1, char *h2)
{
    return 0 == strcmp(h1, h2);
}
319
/* Decide whether two node trackers describe the same node.
 *
 * They match when both carry a valid (non-UINT32_MAX) nodeid and the
 * ids are equal, or when any name of n1 (hostname or alias) equals any
 * name of n2 (hostname or alias). If either tracker lacks a hostname,
 * no name comparison is attempted.
 *
 * Returns true on a match, false otherwise. */
static bool check_node(pmix_nodeinfo_t *n1,
                       pmix_nodeinfo_t *n2)
{
    int i, j;

    if (UINT32_MAX != n1->nodeid &&
        UINT32_MAX != n2->nodeid &&
        n1->nodeid == n2->nodeid) {
        return true;
    }

    if (NULL == n1->hostname || NULL == n2->hostname) {
        return false;
    }

    if (check_hostname(n1->hostname, n2->hostname)) {
        return true;
    }

    /* n1's hostname against n2's aliases - done independently of
     * whether n1 has aliases, so an empty (but non-NULL) alias array
     * on n1 cannot hide a match */
    if (NULL != n2->aliases) {
        for (j=0; NULL != n2->aliases[j]; j++) {
            if (check_hostname(n1->hostname, n2->aliases[j])) {
                return true;
            }
        }
    }

    /* n1's aliases against n2's hostname and n2's aliases */
    if (NULL != n1->aliases) {
        for (i=0; NULL != n1->aliases[i]; i++) {
            if (check_hostname(n1->aliases[i], n2->hostname)) {
                return true;
            }
            if (NULL == n2->aliases) {
                continue;
            }
            for (j=0; NULL != n2->aliases[j]; j++) {
                if (check_hostname(n1->aliases[i], n2->aliases[j])) {
                    return true;
                }
            }
        }
    }

    return false;
}
365
/* Return true when hostname equals the tracker's hostname or one of
 * its aliases. A tracker without a hostname never matches. */
static bool check_nodename(pmix_nodeinfo_t *nptr, char *hostname)
{
    char **alias;

    if (NULL == nptr->hostname) {
        return false;
    }
    if (check_hostname(nptr->hostname, hostname)) {
        return true;
    }
    if (NULL == nptr->aliases) {
        return false;
    }

    /* walk the NULL-terminated alias array */
    for (alias = nptr->aliases; NULL != *alias; alias++) {
        if (check_hostname(*alias, hostname)) {
            return true;
        }
    }
    return false;
}
387
388 /**********************************************
389 * Forward Declarations
390 **********************************************/
/* Prototypes only - the bodies are not in this part of the file.
 * NOTE(review): presumably these look up "key" in the node/app trackers
 * on "tgt" and append the results to "kvs"; confirm at the definitions. */
static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
                                    pmix_info_t *info, size_t ninfo,
                                    pmix_list_t *kvs);
static pmix_status_t fetch_appinfo(const char *key, pmix_list_t *tgt,
                                   pmix_info_t *info, size_t ninfo,
                                   pmix_list_t *kvs);
397
398 /**********************************************/
399
400 /* process a node array - contains an array of
401 * node-level info for a single node. Either the
402 * nodeid, hostname, or both must be included
403 * in the array to identify the node */
process_node_array(pmix_value_t * val,pmix_list_t * tgt)404 static pmix_status_t process_node_array(pmix_value_t *val,
405 pmix_list_t *tgt)
406 {
407 size_t size, j;
408 pmix_info_t *iptr;
409 pmix_status_t rc = PMIX_SUCCESS;
410 pmix_kval_t *kp2, *k1;
411 pmix_list_t cache;
412 pmix_nodeinfo_t *nd = NULL, *ndptr;
413 bool update;
414
415 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
416 "PROCESSING NODE ARRAY");
417
418 /* array of node-level info for a specific node */
419 if (PMIX_DATA_ARRAY != val->type) {
420 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
421 return PMIX_ERR_TYPE_MISMATCH;
422 }
423
424 /* setup arrays */
425 size = val->data.darray->size;
426 iptr = (pmix_info_t*)val->data.darray->array;
427 PMIX_CONSTRUCT(&cache, pmix_list_t);
428
429 /* cache the values while searching for the nodeid
430 * and/or hostname */
431 for (j=0; j < size; j++) {
432 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
433 "%s gds:hash:node_array for key %s",
434 PMIX_NAME_PRINT(&pmix_globals.myid), iptr[j].key);
435 if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODEID)) {
436 if (NULL == nd) {
437 nd = PMIX_NEW(pmix_nodeinfo_t);
438 }
439 PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, nd->nodeid, uint32_t);
440 if (PMIX_SUCCESS != rc) {
441 PMIX_ERROR_LOG(rc);
442 PMIX_RELEASE(nd);
443 PMIX_LIST_DESTRUCT(&cache);
444 return rc;
445 }
446 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_HOSTNAME)) {
447 if (NULL == nd) {
448 nd = PMIX_NEW(pmix_nodeinfo_t);
449 }
450 nd->hostname = strdup(iptr[j].value.data.string);
451 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_HOSTNAME_ALIASES)) {
452 if (NULL == nd) {
453 nd = PMIX_NEW(pmix_nodeinfo_t);
454 }
455 nd->aliases = pmix_argv_split(iptr[j].value.data.string, ',');
456 /* need to cache this value as well */
457 kp2 = PMIX_NEW(pmix_kval_t);
458 kp2->key = strdup(iptr[j].key);
459 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
460 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
461 if (PMIX_SUCCESS != rc) {
462 PMIX_ERROR_LOG(rc);
463 PMIX_RELEASE(kp2);
464 PMIX_RELEASE(nd);
465 PMIX_LIST_DESTRUCT(&cache);
466 return rc;
467 }
468 pmix_list_append(&cache, &kp2->super);
469 } else {
470 kp2 = PMIX_NEW(pmix_kval_t);
471 kp2->key = strdup(iptr[j].key);
472 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
473 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
474 if (PMIX_SUCCESS != rc) {
475 PMIX_ERROR_LOG(rc);
476 PMIX_RELEASE(kp2);
477 if (NULL != nd) {
478 PMIX_RELEASE(nd);
479 }
480 PMIX_LIST_DESTRUCT(&cache);
481 return rc;
482 }
483 pmix_list_append(&cache, &kp2->super);
484 }
485 }
486
487 if (NULL == nd) {
488 /* they forgot to pass us the ident for the node */
489 PMIX_LIST_DESTRUCT(&cache);
490 return PMIX_ERR_BAD_PARAM;
491 }
492
493 /* see if we already have this node on the
494 * provided list */
495 update = false;
496 PMIX_LIST_FOREACH(ndptr, tgt, pmix_nodeinfo_t) {
497 if (check_node(ndptr, nd)) {
498 /* we assume that the data is updating the current
499 * values */
500 if (NULL == ndptr->hostname && NULL != nd->hostname) {
501 ndptr->hostname = strdup(nd->hostname);
502 }
503 PMIX_RELEASE(nd);
504 nd = ndptr;
505 update = true;
506 break;
507 }
508 }
509 if (!update) {
510 pmix_list_append(tgt, &nd->super);
511 }
512
513 /* transfer the cached items to the nodeinfo list */
514 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
515 while (NULL != kp2) {
516 /* if this is an update, we have to ensure each data
517 * item only appears once on the list */
518 if (update) {
519 PMIX_LIST_FOREACH(k1, &nd->info, pmix_kval_t) {
520 if (PMIX_CHECK_KEY(k1, kp2->key)) {
521 pmix_list_remove_item(&nd->info, &k1->super);
522 PMIX_RELEASE(k1);
523 break;
524 }
525 }
526 }
527 pmix_list_append(&nd->info, &kp2->super);
528 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
529 }
530 PMIX_LIST_DESTRUCT(&cache);
531
532 return PMIX_SUCCESS;
533 }
534
535 /* process an app array - contains an array of
536 * app-level info for a single app. If the
537 * appnum is not included in the array, then
538 * it is assumed that only app is in the job.
539 * This assumption is checked and generates
540 * an error if violated */
process_app_array(pmix_value_t * val,pmix_job_t * trk)541 static pmix_status_t process_app_array(pmix_value_t *val,
542 pmix_job_t *trk)
543 {
544 pmix_list_t cache, ncache;
545 size_t size, j;
546 pmix_info_t *iptr;
547 pmix_status_t rc = PMIX_SUCCESS;
548 uint32_t appnum;
549 pmix_apptrkr_t *app = NULL, *apptr;
550 pmix_kval_t *kp2, *k1;
551 pmix_nodeinfo_t *nd;
552 bool update;
553
554 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
555 "PROCESSING APP ARRAY");
556
557 /* apps have to belong to a job */
558 if (NULL == trk) {
559 return PMIX_ERR_BAD_PARAM;
560 }
561
562 /* array of app-level info */
563 if (PMIX_DATA_ARRAY != val->type) {
564 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
565 return PMIX_ERR_TYPE_MISMATCH;
566 }
567
568 /* setup arrays and lists */
569 PMIX_CONSTRUCT(&cache, pmix_list_t);
570 PMIX_CONSTRUCT(&ncache, pmix_list_t);
571 size = val->data.darray->size;
572 iptr = (pmix_info_t*)val->data.darray->array;
573
574 for (j=0; j < size; j++) {
575 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
576 "%s gds:hash:app_array for key %s",
577 PMIX_NAME_PRINT(&pmix_globals.myid), iptr[j].key);
578 if (PMIX_CHECK_KEY(&iptr[j], PMIX_APPNUM)) {
579 PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, appnum, uint32_t);
580 if (PMIX_SUCCESS != rc) {
581 PMIX_ERROR_LOG(rc);
582 goto release;
583 }
584 if (NULL != app) {
585 /* this is an error - there can be only one app
586 * described in this array */
587 PMIX_RELEASE(app);
588 PMIX_LIST_DESTRUCT(&cache);
589 PMIX_LIST_DESTRUCT(&ncache);
590 return PMIX_ERR_BAD_PARAM;
591 }
592 app = PMIX_NEW(pmix_apptrkr_t);
593 app->appnum = appnum;
594 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
595 if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &ncache))) {
596 PMIX_ERROR_LOG(rc);
597 goto release;
598 }
599 } else {
600 kp2 = PMIX_NEW(pmix_kval_t);
601 kp2->key = strdup(iptr[j].key);
602 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
603 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
604 if (PMIX_SUCCESS != rc) {
605 PMIX_ERROR_LOG(rc);
606 PMIX_RELEASE(kp2);
607 goto release;
608 }
609 pmix_list_append(&cache, &kp2->super);
610 }
611 }
612 if (NULL == app) {
613 /* per the standard, they don't have to provide us with
614 * an appnum so long as only one app is in the job */
615 if (0 == pmix_list_get_size(&trk->apps)) {
616 app = PMIX_NEW(pmix_apptrkr_t);
617 app->appnum = 0;
618 } else {
619 /* this is not allowed to happen - they are required
620 * to provide us with an app number per the standard */
621 rc = PMIX_ERR_BAD_PARAM;
622 PMIX_ERROR_LOG(rc);
623 goto release;
624 }
625 }
626 /* see if we already have this app on the
627 * provided list */
628 update = false;
629 PMIX_LIST_FOREACH(apptr, &trk->apps, pmix_apptrkr_t) {
630 if (apptr->appnum == app->appnum) {
631 /* we assume that the data is updating the current
632 * values */
633 PMIX_RELEASE(app);
634 app = apptr;
635 update = true;
636 break;
637 }
638 }
639 if (!update) {
640 pmix_list_append(&trk->apps, &app->super);
641 }
642 /* point the app at its job */
643 if (NULL == app->job) {
644 PMIX_RETAIN(trk);
645 app->job = trk;
646 }
647
648 /* transfer the app-level data across */
649 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
650 while (NULL != kp2) {
651 /* if this is an update, we have to ensure each data
652 * item only appears once on the list */
653 if (update) {
654 PMIX_LIST_FOREACH(k1, &app->appinfo, pmix_kval_t) {
655 if (PMIX_CHECK_KEY(k1, kp2->key)) {
656 pmix_list_remove_item(&app->appinfo, &k1->super);
657 PMIX_RELEASE(k1);
658 break;
659 }
660 }
661 }
662 pmix_list_append(&app->appinfo, &kp2->super);
663 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
664 }
665 /* transfer the associated node-level data across */
666 nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
667 while (NULL != nd) {
668 pmix_list_append(&app->nodeinfo, &nd->super);
669 nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
670 }
671
672 release:
673 PMIX_LIST_DESTRUCT(&cache);
674 PMIX_LIST_DESTRUCT(&ncache);
675
676 return rc;
677 }
678
679 /* process a job array */
process_job_array(pmix_info_t * info,pmix_job_t * trk,uint32_t * flags,char *** procs,char *** nodes)680 static pmix_status_t process_job_array(pmix_info_t *info,
681 pmix_job_t *trk,
682 uint32_t *flags,
683 char ***procs,
684 char ***nodes)
685 {
686 pmix_list_t cache;
687 size_t j, size;
688 pmix_info_t *iptr;
689 pmix_kval_t *kp2;
690 pmix_status_t rc;
691
692 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
693 "PROCESSING JOB ARRAY");
694
695 /* array of job-level info */
696 if (PMIX_DATA_ARRAY != info->value.type) {
697 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
698 return PMIX_ERR_TYPE_MISMATCH;
699 }
700 size = info->value.data.darray->size;
701 iptr = (pmix_info_t*)info->value.data.darray->array;
702 PMIX_CONSTRUCT(&cache, pmix_list_t);
703 for (j=0; j < size; j++) {
704 if (PMIX_CHECK_KEY(&iptr[j], PMIX_APP_INFO_ARRAY)) {
705 if (PMIX_SUCCESS != (rc = process_app_array(&iptr[j].value, trk))) {
706 return rc;
707 }
708 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
709 if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &trk->nodeinfo))) {
710 PMIX_ERROR_LOG(rc);
711 return rc;
712 }
713 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_PROC_MAP)) {
714 /* not allowed to get this more than once */
715 if (*flags & PMIX_HASH_PROC_MAP) {
716 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
717 return PMIX_ERR_BAD_PARAM;
718 }
719 /* parse the regex to get the argv array containing proc ranks on each node */
720 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(iptr[j].value.data.bo.bytes, procs))) {
721 PMIX_ERROR_LOG(rc);
722 return rc;
723 }
724 /* mark that we got the map */
725 *flags |= PMIX_HASH_PROC_MAP;
726 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_MAP)) {
727 /* not allowed to get this more than once */
728 if (*flags & PMIX_HASH_NODE_MAP) {
729 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
730 return PMIX_ERR_BAD_PARAM;
731 }
732 /* parse the regex to get the argv array of node names */
733 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(iptr[j].value.data.bo.bytes, nodes))) {
734 PMIX_ERROR_LOG(rc);
735 return rc;
736 }
737 /* mark that we got the map */
738 *flags |= PMIX_HASH_NODE_MAP;
739 } else {
740 kp2 = PMIX_NEW(pmix_kval_t);
741 kp2->key = strdup(iptr[j].key);
742 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
743 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
744 if (PMIX_SUCCESS != rc) {
745 PMIX_RELEASE(kp2);
746 PMIX_LIST_DESTRUCT(&cache);
747 return rc;
748 }
749 pmix_list_append(&trk->jobinfo, &kp2->super);
750 /* check for job size */
751 if (PMIX_CHECK_KEY(&iptr[j], PMIX_JOB_SIZE) &&
752 !(PMIX_HASH_JOB_SIZE & *flags)) {
753 trk->nptr->nprocs = iptr[j].value.data.uint32;
754 *flags |= PMIX_HASH_JOB_SIZE;
755 }
756 }
757 }
758 return PMIX_SUCCESS;
759 }
760
process_session_array(pmix_value_t * val,pmix_job_t * trk)761 static pmix_status_t process_session_array(pmix_value_t *val,
762 pmix_job_t *trk)
763 {
764 pmix_session_t *s = NULL, *sptr;
765 size_t j, size;
766 pmix_info_t *iptr;
767 pmix_list_t cache, ncache;
768 pmix_status_t rc;
769 pmix_kval_t *kp2;
770 pmix_nodeinfo_t *nd;
771 uint32_t sid;
772
773 /* array of session-level info */
774 if (PMIX_DATA_ARRAY != val->type) {
775 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
776 return PMIX_ERR_TYPE_MISMATCH;
777 }
778 size = val->data.darray->size;
779 iptr = (pmix_info_t*)val->data.darray->array;
780
781 PMIX_CONSTRUCT(&cache, pmix_list_t);
782 PMIX_CONSTRUCT(&ncache, pmix_list_t);
783 for (j=0; j < size; j++) {
784 if (PMIX_CHECK_KEY(&iptr[j], PMIX_SESSION_ID)) {
785 PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, sid, uint32_t);
786 if (PMIX_SUCCESS != rc) {
787 PMIX_ERROR_LOG(rc);
788 PMIX_LIST_DESTRUCT(&cache);
789 PMIX_LIST_DESTRUCT(&ncache);
790 return rc;
791 }
792 /* see if we already have this session - it could have
793 * been defined by a separate PMIX_SESSION_ID key */
794 PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
795 if (sptr->session == sid) {
796 s = sptr;
797 break;
798 }
799 }
800 if (NULL == s) {
801 /* wasn't found, so create one */
802 s = PMIX_NEW(pmix_session_t);
803 s->session = sid;
804 pmix_list_append(&mysessions, &s->super);
805 }
806 } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
807 if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &ncache))) {
808 PMIX_ERROR_LOG(rc);
809 PMIX_LIST_DESTRUCT(&cache);
810 PMIX_LIST_DESTRUCT(&ncache);
811 return rc;
812 }
813 } else {
814 kp2 = PMIX_NEW(pmix_kval_t);
815 kp2->key = strdup(iptr[j].key);
816 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
817 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
818 if (PMIX_SUCCESS != rc) {
819 PMIX_ERROR_LOG(rc);
820 PMIX_RELEASE(kp2);
821 PMIX_LIST_DESTRUCT(&cache);
822 PMIX_LIST_DESTRUCT(&ncache);
823 return rc;
824 }
825 pmix_list_append(&cache, &kp2->super);
826 }
827 }
828 if (NULL == s) {
829 /* this is not allowed to happen - they are required
830 * to provide us with a session ID per the standard */
831 PMIX_LIST_DESTRUCT(&cache);
832 PMIX_LIST_DESTRUCT(&ncache);
833 rc = PMIX_ERR_BAD_PARAM;
834 PMIX_ERROR_LOG(rc);
835 return rc;
836 }
837 /* point the job at it */
838 if (NULL != trk->session) {
839 PMIX_RELEASE(trk->session);
840 }
841 PMIX_RETAIN(s);
842 trk->session = s;
843 /* transfer the data across */
844 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
845 while (NULL != kp2) {
846 pmix_list_append(&s->sessioninfo, &kp2->super);
847 kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
848 }
849 PMIX_LIST_DESTRUCT(&cache);
850 nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
851 while (NULL != nd) {
852 pmix_list_append(&s->nodeinfo, &nd->super);
853 nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
854 }
855 PMIX_LIST_DESTRUCT(&ncache);
856 return PMIX_SUCCESS;
857 }
858
hash_init(pmix_info_t info[],size_t ninfo)859 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo)
860 {
861 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
862 "gds: hash init");
863
864 PMIX_CONSTRUCT(&mysessions, pmix_list_t);
865 PMIX_CONSTRUCT(&myjobs, pmix_list_t);
866 return PMIX_SUCCESS;
867 }
868
hash_finalize(void)869 static void hash_finalize(void)
870 {
871 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
872 "gds: hash finalize");
873
874 PMIX_LIST_DESTRUCT(&mysessions);
875 PMIX_LIST_DESTRUCT(&myjobs);
876 }
877
hash_assign_module(pmix_info_t * info,size_t ninfo,int * priority)878 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
879 int *priority)
880 {
881 size_t n, m;
882 char **options;
883
884 *priority = 10;
885 if (NULL != info) {
886 for (n=0; n < ninfo; n++) {
887 if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) {
888 options = pmix_argv_split(info[n].value.data.string, ',');
889 for (m=0; NULL != options[m]; m++) {
890 if (0 == strcmp(options[m], "hash")) {
891 /* they specifically asked for us */
892 *priority = 100;
893 break;
894 }
895 }
896 pmix_argv_free(options);
897 break;
898 }
899 }
900 }
901 return PMIX_SUCCESS;
902 }
903
store_map(pmix_job_t * trk,char ** nodes,char ** ppn,uint32_t flags)904 static pmix_status_t store_map(pmix_job_t *trk,
905 char **nodes, char **ppn,
906 uint32_t flags)
907 {
908 pmix_status_t rc;
909 size_t m, n;
910 pmix_rank_t rank;
911 pmix_kval_t *kp1, *kp2;
912 char **procs;
913 uint32_t totalprocs=0;
914 pmix_hash_table_t *ht = &trk->internal;
915 pmix_nodeinfo_t *nd, *ndptr;
916
917 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
918 "[%s:%d] gds:hash:store_map",
919 pmix_globals.myid.nspace, pmix_globals.myid.rank);
920
921 /* if the lists don't match, then that's wrong */
922 if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) {
923 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
924 return PMIX_ERR_BAD_PARAM;
925 }
926
927 /* if they didn't provide the number of nodes, then
928 * compute it from the list of nodes */
929 if (!(PMIX_HASH_NUM_NODES & flags)) {
930 kp2 = PMIX_NEW(pmix_kval_t);
931 kp2->key = strdup(PMIX_NUM_NODES);
932 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
933 kp2->value->type = PMIX_UINT32;
934 kp2->value->data.uint32 = pmix_argv_count(nodes);
935 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
936 "[%s:%d] gds:hash:store_map adding key %s to job info",
937 pmix_globals.myid.nspace, pmix_globals.myid.rank,
938 kp2->key);
939 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
940 PMIX_ERROR_LOG(rc);
941 PMIX_RELEASE(kp2);
942 return rc;
943 }
944 PMIX_RELEASE(kp2); // maintain acctg
945 }
946
947 for (n=0; NULL != nodes[n]; n++) {
948 /* check and see if we already have this node */
949 nd = NULL;
950 PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
951 if (check_nodename(ndptr, nodes[n])) {
952 nd = ndptr;
953 break;
954 }
955 }
956 if (NULL == nd) {
957 nd = PMIX_NEW(pmix_nodeinfo_t);
958 nd->hostname = strdup(nodes[n]);
959 nd->nodeid = n;
960 pmix_list_append(&trk->nodeinfo, &nd->super);
961 }
962 /* store the proc list as-is */
963 kp2 = PMIX_NEW(pmix_kval_t);
964 if (NULL == kp2) {
965 return PMIX_ERR_NOMEM;
966 }
967 kp2->key = strdup(PMIX_LOCAL_PEERS);
968 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
969 if (NULL == kp2->value) {
970 PMIX_RELEASE(kp2);
971 return PMIX_ERR_NOMEM;
972 }
973 kp2->value->type = PMIX_STRING;
974 kp2->value->data.string = strdup(ppn[n]);
975 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
976 "[%s:%d] gds:hash:store_map adding key %s to node %s info",
977 pmix_globals.myid.nspace, pmix_globals.myid.rank,
978 kp2->key, nodes[n]);
979 /* ensure this item only appears once on the list */
980 PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
981 if (PMIX_CHECK_KEY(kp1, kp2->key)) {
982 pmix_list_remove_item(&nd->info, &kp1->super);
983 PMIX_RELEASE(kp1);
984 break;
985 }
986 }
987 pmix_list_append(&nd->info, &kp2->super);
988
989 /* save the local leader */
990 rank = strtoul(ppn[n], NULL, 10);
991 kp2 = PMIX_NEW(pmix_kval_t);
992 if (NULL == kp2) {
993 return PMIX_ERR_NOMEM;
994 }
995 kp2->key = strdup(PMIX_LOCALLDR);
996 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
997 if (NULL == kp2->value) {
998 PMIX_RELEASE(kp2);
999 return PMIX_ERR_NOMEM;
1000 }
1001 kp2->value->type = PMIX_PROC_RANK;
1002 kp2->value->data.rank = rank;
1003 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1004 "[%s:%d] gds:hash:store_map adding key %s to node %s info",
1005 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1006 kp2->key, nodes[n]);
1007 /* ensure this item only appears once on the list */
1008 PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
1009 if (PMIX_CHECK_KEY(kp1, kp2->key)) {
1010 pmix_list_remove_item(&nd->info, &kp1->super);
1011 PMIX_RELEASE(kp1);
1012 break;
1013 }
1014 }
1015 pmix_list_append(&nd->info, &kp2->super);
1016
1017 /* split the list of procs so we can store their
1018 * individual location data */
1019 procs = pmix_argv_split(ppn[n], ',');
1020 /* save the local size in case they don't
1021 * give it to us */
1022 kp2 = PMIX_NEW(pmix_kval_t);
1023 if (NULL == kp2) {
1024 return PMIX_ERR_NOMEM;
1025 }
1026 kp2->key = strdup(PMIX_LOCAL_SIZE);
1027 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1028 if (NULL == kp2->value) {
1029 PMIX_RELEASE(kp2);
1030 return PMIX_ERR_NOMEM;
1031 }
1032 kp2->value->type = PMIX_UINT32;
1033 kp2->value->data.uint32 = pmix_argv_count(procs);
1034 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1035 "[%s:%d] gds:hash:store_map adding key %s to node %s info",
1036 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1037 kp2->key, nodes[n]);
1038 /* ensure this item only appears once on the list */
1039 PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
1040 if (PMIX_CHECK_KEY(kp1, kp2->key)) {
1041 pmix_list_remove_item(&nd->info, &kp1->super);
1042 PMIX_RELEASE(kp1);
1043 break;
1044 }
1045 }
1046 pmix_list_append(&nd->info, &kp2->super);
1047 /* track total procs in job in case they
1048 * didn't give it to us */
1049 totalprocs += pmix_argv_count(procs);
1050 for (m=0; NULL != procs[m]; m++) {
1051 /* store the hostname for each proc */
1052 kp2 = PMIX_NEW(pmix_kval_t);
1053 kp2->key = strdup(PMIX_HOSTNAME);
1054 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1055 kp2->value->type = PMIX_STRING;
1056 kp2->value->data.string = strdup(nodes[n]);
1057 rank = strtol(procs[m], NULL, 10);
1058 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1059 "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1060 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1061 trk->ns, rank, kp2->key);
1062 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1063 PMIX_ERROR_LOG(rc);
1064 PMIX_RELEASE(kp2);
1065 pmix_argv_free(procs);
1066 return rc;
1067 }
1068 PMIX_RELEASE(kp2); // maintain acctg
1069 if (!(PMIX_HASH_PROC_DATA & flags)) {
1070 /* add an entry for the nodeid */
1071 kp2 = PMIX_NEW(pmix_kval_t);
1072 kp2->key = strdup(PMIX_NODEID);
1073 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1074 kp2->value->type = PMIX_UINT32;
1075 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1076 "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1077 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1078 trk->ns, rank, kp2->key);
1079 kp2->value->data.uint32 = n;
1080 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1081 PMIX_ERROR_LOG(rc);
1082 PMIX_RELEASE(kp2);
1083 pmix_argv_free(procs);
1084 return rc;
1085 }
1086 PMIX_RELEASE(kp2); // maintain acctg
1087 /* add an entry for the local rank */
1088 kp2 = PMIX_NEW(pmix_kval_t);
1089 kp2->key = strdup(PMIX_LOCAL_RANK);
1090 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1091 kp2->value->type = PMIX_UINT16;
1092 kp2->value->data.uint16 = m;
1093 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1094 "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1095 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1096 trk->ns, rank, kp2->key);
1097 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1098 PMIX_ERROR_LOG(rc);
1099 PMIX_RELEASE(kp2);
1100 pmix_argv_free(procs);
1101 return rc;
1102 }
1103 PMIX_RELEASE(kp2); // maintain acctg
1104 /* add an entry for the node rank - for now, we assume
1105 * only the one job is running */
1106 kp2 = PMIX_NEW(pmix_kval_t);
1107 kp2->key = strdup(PMIX_NODE_RANK);
1108 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1109 kp2->value->type = PMIX_UINT16;
1110 kp2->value->data.uint16 = m;
1111 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1112 "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1113 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1114 trk->ns, rank, kp2->key);
1115 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1116 PMIX_ERROR_LOG(rc);
1117 PMIX_RELEASE(kp2);
1118 pmix_argv_free(procs);
1119 return rc;
1120 }
1121 PMIX_RELEASE(kp2); // maintain acctg
1122 }
1123 }
1124 pmix_argv_free(procs);
1125 }
1126
1127 /* store the comma-delimited list of nodes hosting
1128 * procs in this nspace in case someone using PMIx v2
1129 * requests it */
1130 kp2 = PMIX_NEW(pmix_kval_t);
1131 kp2->key = strdup(PMIX_NODE_LIST);
1132 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1133 kp2->value->type = PMIX_STRING;
1134 kp2->value->data.string = pmix_argv_join(nodes, ',');
1135 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1136 "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1137 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1138 trk->ns, kp2->key);
1139 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1140 PMIX_ERROR_LOG(rc);
1141 PMIX_RELEASE(kp2);
1142 return rc;
1143 }
1144 PMIX_RELEASE(kp2); // maintain acctg
1145
1146 /* if they didn't provide the job size, compute it as
1147 * being the number of provided procs (i.e., size of
1148 * ppn list) */
1149 if (!(PMIX_HASH_JOB_SIZE & flags)) {
1150 kp2 = PMIX_NEW(pmix_kval_t);
1151 kp2->key = strdup(PMIX_JOB_SIZE);
1152 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1153 kp2->value->type = PMIX_UINT32;
1154 kp2->value->data.uint32 = totalprocs;
1155 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1156 "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1157 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1158 trk->ns, kp2->key);
1159 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1160 PMIX_ERROR_LOG(rc);
1161 PMIX_RELEASE(kp2);
1162 return rc;
1163 }
1164 PMIX_RELEASE(kp2); // maintain acctg
1165 flags |= PMIX_HASH_JOB_SIZE;
1166 trk->nptr->nprocs = totalprocs;
1167 }
1168
1169 /* if they didn't provide a value for max procs, just
1170 * assume it is the same as the number of procs in the
1171 * job and store it */
1172 if (!(PMIX_HASH_MAX_PROCS & flags)) {
1173 kp2 = PMIX_NEW(pmix_kval_t);
1174 kp2->key = strdup(PMIX_MAX_PROCS);
1175 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1176 kp2->value->type = PMIX_UINT32;
1177 kp2->value->data.uint32 = totalprocs;
1178 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1179 "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1180 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1181 trk->ns, kp2->key);
1182 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1183 PMIX_ERROR_LOG(rc);
1184 PMIX_RELEASE(kp2);
1185 return rc;
1186 }
1187 PMIX_RELEASE(kp2); // maintain acctg
1188 flags |= PMIX_HASH_MAX_PROCS;
1189 }
1190
1191
1192 return PMIX_SUCCESS;
1193 }
1194
hash_cache_job_info(struct pmix_namespace_t * ns,pmix_info_t info[],size_t ninfo)1195 pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
1196 pmix_info_t info[], size_t ninfo)
1197 {
1198 pmix_namespace_t *nptr = (pmix_namespace_t*)ns;
1199 pmix_job_t *trk;
1200 pmix_session_t *s = NULL, *sptr;
1201 pmix_hash_table_t *ht;
1202 pmix_kval_t *kp2, *kvptr;
1203 pmix_info_t *iptr;
1204 char **nodes=NULL, **procs=NULL;
1205 uint8_t *tmp;
1206 uint32_t sid=UINT32_MAX;
1207 pmix_rank_t rank;
1208 pmix_status_t rc=PMIX_SUCCESS;
1209 size_t n, j, size, len;
1210 uint32_t flags = 0;
1211 pmix_nodeinfo_t *nd, *ndptr;
1212 pmix_apptrkr_t *apptr;
1213 bool found;
1214
1215 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1216 "[%s:%d] gds:hash:cache_job_info for nspace %s with %lu info",
1217 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1218 nptr->nspace, ninfo);
1219
1220 trk = get_tracker(nptr->nspace, true);
1221 if (NULL == trk) {
1222 return PMIX_ERR_NOMEM;
1223 }
1224
1225 /* if there isn't any data, then be content with just
1226 * creating the tracker */
1227 if (NULL == info || 0 == ninfo) {
1228 return PMIX_SUCCESS;
1229 }
1230
1231 /* cache the job info on the internal hash table for this nspace */
1232 ht = &trk->internal;
1233 for (n=0; n < ninfo; n++) {
1234 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
1235 "%s gds:hash:cache_job_info for key %s",
1236 PMIX_NAME_PRINT(&pmix_globals.myid), info[n].key);
1237 if (PMIX_CHECK_KEY(&info[n], PMIX_SESSION_ID)) {
1238 PMIX_VALUE_GET_NUMBER(rc, &info[n].value, sid, uint32_t);
1239 if (PMIX_SUCCESS != rc) {
1240 PMIX_ERROR_LOG(rc);
1241 goto release;
1242 }
1243 /* see if we have this session */
1244 s = NULL;
1245 PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
1246 if (sptr->session == sid) {
1247 s = sptr;
1248 break;
1249 }
1250 }
1251 if (NULL == s) {
1252 s = PMIX_NEW(pmix_session_t);
1253 s->session = sid;
1254 pmix_list_append(&mysessions, &s->super);
1255 }
1256 /* point the job at it */
1257 if (NULL == trk->session) {
1258 PMIX_RETAIN(s);
1259 trk->session = s;
1260 }
1261 } else if (PMIX_CHECK_KEY(&info[n], PMIX_SESSION_INFO_ARRAY)) {
1262 if (PMIX_SUCCESS != (rc = process_session_array(&info[n].value, trk))) {
1263 PMIX_ERROR_LOG(rc);
1264 goto release;
1265 }
1266 } else if (PMIX_CHECK_KEY(&info[n], PMIX_JOB_INFO_ARRAY)) {
1267 if (PMIX_SUCCESS != (rc = process_job_array(&info[n], trk, &flags, &procs, &nodes))) {
1268 PMIX_ERROR_LOG(rc);
1269 goto release;
1270 }
1271 } else if (PMIX_CHECK_KEY(&info[n], PMIX_APP_INFO_ARRAY)) {
1272 if (PMIX_SUCCESS != (rc = process_app_array(&info[n].value, trk))) {
1273 PMIX_ERROR_LOG(rc);
1274 goto release;
1275 }
1276 } else if (PMIX_CHECK_KEY(&info[n], PMIX_NODE_INFO_ARRAY)) {
1277 if (PMIX_SUCCESS != (rc = process_node_array(&info[n].value, &trk->nodeinfo))) {
1278 PMIX_ERROR_LOG(rc);
1279 goto release;
1280 }
1281 } else if (PMIX_CHECK_KEY(&info[n], PMIX_NODE_MAP)) {
1282 /* not allowed to get this more than once */
1283 if (flags & PMIX_HASH_NODE_MAP) {
1284 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1285 return PMIX_ERR_BAD_PARAM;
1286 }
1287 /* parse the regex to get the argv array of node names */
1288 if (PMIX_REGEX == info[n].value.type) {
1289 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.bo.bytes, &nodes))) {
1290 PMIX_ERROR_LOG(rc);
1291 goto release;
1292 }
1293 } else if (PMIX_STRING == info[n].value.type) {
1294 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.string, &nodes))) {
1295 PMIX_ERROR_LOG(rc);
1296 goto release;
1297 }
1298 } else {
1299 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
1300 rc = PMIX_ERR_TYPE_MISMATCH;
1301 goto release;
1302 }
1303 /* mark that we got the map */
1304 flags |= PMIX_HASH_NODE_MAP;
1305 } else if (PMIX_CHECK_KEY(&info[n], PMIX_PROC_MAP)) {
1306 /* not allowed to get this more than once */
1307 if (flags & PMIX_HASH_PROC_MAP) {
1308 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1309 return PMIX_ERR_BAD_PARAM;
1310 }
1311 /* parse the regex to get the argv array containing proc ranks on each node */
1312 if (PMIX_REGEX == info[n].value.type) {
1313 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.bo.bytes, &procs))) {
1314 PMIX_ERROR_LOG(rc);
1315 goto release;
1316 }
1317 } else if (PMIX_STRING == info[n].value.type) {
1318 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.string, &procs))) {
1319 PMIX_ERROR_LOG(rc);
1320 goto release;
1321 }
1322 } else {
1323 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
1324 rc = PMIX_ERR_TYPE_MISMATCH;
1325 goto release;
1326 }
1327 /* mark that we got the map */
1328 flags |= PMIX_HASH_PROC_MAP;
1329 } else if (PMIX_CHECK_KEY(&info[n], PMIX_PROC_DATA)) {
1330 flags |= PMIX_HASH_PROC_DATA;
1331 found = false;
1332 /* an array of data pertaining to a specific proc */
1333 if (PMIX_DATA_ARRAY != info[n].value.type) {
1334 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1335 rc = PMIX_ERR_TYPE_MISMATCH;
1336 goto release;
1337 }
1338 size = info[n].value.data.darray->size;
1339 iptr = (pmix_info_t*)info[n].value.data.darray->array;
1340 /* first element of the array must be the rank */
1341 if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
1342 PMIX_PROC_RANK != iptr[0].value.type) {
1343 rc = PMIX_ERR_TYPE_MISMATCH;
1344 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1345 goto release;
1346 }
1347 rank = iptr[0].value.data.rank;
1348 /* cycle thru the values for this rank and store them */
1349 for (j=1; j < size; j++) {
1350 kp2 = PMIX_NEW(pmix_kval_t);
1351 if (NULL == kp2) {
1352 rc = PMIX_ERR_NOMEM;
1353 goto release;
1354 }
1355 kp2->key = strdup(iptr[j].key);
1356 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
1357 if (PMIX_SUCCESS != rc) {
1358 PMIX_ERROR_LOG(rc);
1359 PMIX_RELEASE(kp2);
1360 goto release;
1361 }
1362 /* if the value contains a string that is longer than the
1363 * limit, then compress it */
1364 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1365 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1366 if (NULL == tmp) {
1367 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1368 rc = PMIX_ERR_NOMEM;
1369 goto release;
1370 }
1371 kp2->value->type = PMIX_COMPRESSED_STRING;
1372 free(kp2->value->data.string);
1373 kp2->value->data.bo.bytes = (char*)tmp;
1374 kp2->value->data.bo.size = len;
1375 }
1376 }
1377 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1378 "[%s:%d] gds:hash:cache_job_info proc data for [%s:%u]: key %s",
1379 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1380 trk->ns, rank, kp2->key);
1381 /* store it in the hash_table */
1382 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1383 PMIX_ERROR_LOG(rc);
1384 PMIX_RELEASE(kp2);
1385 goto release;
1386 }
1387 /* if this is the appnum, pass it to the pmdl framework */
1388 if (PMIX_CHECK_KEY(kp2, PMIX_APPNUM)) {
1389 found = true;
1390 if (rank == pmix_globals.myid.rank) {
1391 pmix_globals.appnum = kp2->value->data.uint32;
1392 }
1393 }
1394 PMIX_RELEASE(kp2); // maintain acctg
1395 }
1396 if (!found) {
1397 /* if they didn't give us an appnum for this proc, we have
1398 * to assume it is appnum=0 */
1399 uint32_t zero = 0;
1400 kp2 = PMIX_NEW(pmix_kval_t);
1401 if (NULL == kp2) {
1402 rc = PMIX_ERR_NOMEM;
1403 goto release;
1404 }
1405 kp2->key = strdup(PMIX_APPNUM);
1406 PMIX_VALUE_CREATE(kp2->value, 1);
1407 PMIX_VALUE_LOAD(kp2->value, &zero, PMIX_UINT32);
1408 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1409 PMIX_ERROR_LOG(rc);
1410 PMIX_RELEASE(kp2);
1411 goto release;
1412 }
1413 PMIX_RELEASE(kp2); // maintain acctg
1414 }
1415 } else if (pmix_check_node_info(info[n].key)) {
1416 /* they are passing us the node-level info for just this
1417 * node - start by seeing if our node is on the list */
1418 nd = NULL;
1419 PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
1420 if (check_nodename(ndptr, pmix_globals.hostname)) {
1421 nd = ndptr;
1422 break;
1423 }
1424 }
1425 /* if not, then add it */
1426 if (NULL == nd) {
1427 nd = PMIX_NEW(pmix_nodeinfo_t);
1428 nd->hostname = strdup(pmix_globals.hostname);
1429 pmix_list_append(&trk->nodeinfo, &nd->super);
1430 }
1431 /* ensure the value isn't already on the node info */
1432 PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
1433 if (PMIX_CHECK_KEY(kp2, info[n].key)) {
1434 pmix_list_remove_item(&nd->info, &kp2->super);
1435 PMIX_RELEASE(kp2);
1436 break;
1437 }
1438 }
1439 /* add the provided value */
1440 kp2 = PMIX_NEW(pmix_kval_t);
1441 kp2->key = strdup(info[n].key);
1442 PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1443 pmix_list_append(&nd->info, &kp2->super);
1444 } else if (pmix_check_app_info(info[n].key)) {
1445 /* they are passing us app-level info for a default
1446 * app number - have to assume it is app=0 */
1447 if (0 == pmix_list_get_size(&trk->apps)) {
1448 apptr = PMIX_NEW(pmix_apptrkr_t);
1449 pmix_list_append(&trk->apps, &apptr->super);
1450 } else if (1 < pmix_list_get_size(&trk->apps)) {
1451 rc = PMIX_ERR_BAD_PARAM;
1452 goto release;
1453 } else {
1454 apptr = (pmix_apptrkr_t*)pmix_list_get_first(&trk->apps);
1455 }
1456 /* ensure the value isn't already on the app info */
1457 PMIX_LIST_FOREACH(kp2, &apptr->appinfo, pmix_kval_t) {
1458 if (PMIX_CHECK_KEY(kp2, info[n].key)) {
1459 pmix_list_remove_item(&apptr->appinfo, &kp2->super);
1460 PMIX_RELEASE(kp2);
1461 break;
1462 }
1463 }
1464 /* add the provided value */
1465 kp2 = PMIX_NEW(pmix_kval_t);
1466 kp2->key = strdup(info[n].key);
1467 PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1468 pmix_list_append(&apptr->appinfo, &kp2->super);
1469 } else {
1470 /* just a value relating to the entire job */
1471 kp2 = PMIX_NEW(pmix_kval_t);
1472 if (NULL == kp2) {
1473 rc = PMIX_ERR_NOMEM;
1474 goto release;
1475 }
1476 kp2->key = strdup(info[n].key);
1477 PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1478 if (PMIX_SUCCESS != rc) {
1479 PMIX_ERROR_LOG(rc);
1480 PMIX_RELEASE(kp2);
1481 goto release;
1482 }
1483 /* if the value contains a string that is longer than the
1484 * limit, then compress it */
1485 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1486 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1487 if (NULL == tmp) {
1488 rc = PMIX_ERR_NOMEM;
1489 PMIX_ERROR_LOG(rc);
1490 PMIX_RELEASE(kp2);
1491 goto release;
1492 }
1493 kp2->value->type = PMIX_COMPRESSED_STRING;
1494 free(kp2->value->data.string);
1495 kp2->value->data.bo.bytes = (char*)tmp;
1496 kp2->value->data.bo.size = len;
1497 }
1498 }
1499 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1500 PMIX_ERROR_LOG(rc);
1501 PMIX_RELEASE(kp2);
1502 goto release;
1503 }
1504 PMIX_RELEASE(kp2); // maintain acctg
1505 /* if this is the job size, then store it in
1506 * the nptr tracker and flag that we were given it */
1507 if (PMIX_CHECK_KEY(&info[n], PMIX_JOB_SIZE)) {
1508 nptr->nprocs = info[n].value.data.uint32;
1509 flags |= PMIX_HASH_JOB_SIZE;
1510 } else if (PMIX_CHECK_KEY(&info[n], PMIX_NUM_NODES)) {
1511 flags |= PMIX_HASH_NUM_NODES;
1512 } else if (PMIX_CHECK_KEY(&info[n], PMIX_MAX_PROCS)) {
1513 flags |= PMIX_HASH_MAX_PROCS;
1514 }
1515 }
1516 }
1517
1518 /* now add any global data that was provided */
1519 if (!trk->gdata_added) {
1520 PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) {
1521 /* sadly, the data cannot simultaneously exist on two lists,
1522 * so we must make a copy of it here */
1523 kp2 = PMIX_NEW(pmix_kval_t);
1524 if (NULL == kp2) {
1525 rc = PMIX_ERR_NOMEM;
1526 goto release;
1527 }
1528 kp2->key = strdup(kvptr->key);
1529 PMIX_VALUE_XFER(rc, kp2->value, kvptr->value);
1530 if (PMIX_SUCCESS != rc) {
1531 PMIX_ERROR_LOG(rc);
1532 PMIX_RELEASE(kp2);
1533 goto release;
1534 }
1535 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1536 PMIX_ERROR_LOG(rc);
1537 PMIX_RELEASE(kp2);
1538 break;
1539 }
1540 PMIX_RELEASE(kp2); // maintain acctg
1541 }
1542 trk->gdata_added = true;
1543 }
1544
1545 /* we must have the proc AND node maps */
1546 if (NULL != procs && NULL != nodes) {
1547 if (PMIX_SUCCESS != (rc = store_map(trk, nodes, procs, flags))) {
1548 PMIX_ERROR_LOG(rc);
1549 }
1550 }
1551
1552 release:
1553 if (NULL != nodes) {
1554 pmix_argv_free(nodes);
1555 }
1556 if (NULL != procs) {
1557 pmix_argv_free(procs);
1558 }
1559 return rc;
1560 }
1561
register_info(pmix_peer_t * peer,pmix_namespace_t * ns,pmix_buffer_t * reply)1562 static pmix_status_t register_info(pmix_peer_t *peer,
1563 pmix_namespace_t *ns,
1564 pmix_buffer_t *reply)
1565 {
1566 pmix_job_t *trk;
1567 pmix_hash_table_t *ht;
1568 pmix_value_t *val, blob;
1569 pmix_status_t rc = PMIX_SUCCESS;
1570 pmix_info_t *info;
1571 size_t ninfo, n;
1572 pmix_kval_t kv, *kvptr;
1573 pmix_buffer_t buf;
1574 pmix_rank_t rank;
1575 pmix_list_t results;
1576 char *hname;
1577
1578 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1579 "REGISTERING FOR PEER %s type %d.%d.%d", PMIX_PNAME_PRINT(&peer->info->pname),
1580 peer->proc_type.major, peer->proc_type.minor, peer->proc_type.release);
1581
1582 trk = get_tracker(ns->nspace, true);
1583 if (NULL == trk) {
1584 return PMIX_ERR_NOMEM;
1585 }
1586 /* the job data is stored on the internal hash table */
1587 ht = &trk->internal;
1588
1589 /* fetch all values from the hash table tied to rank=wildcard */
1590 val = NULL;
1591 rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
1592 if (PMIX_SUCCESS != rc) {
1593 PMIX_ERROR_LOG(rc);
1594 if (NULL != val) {
1595 PMIX_VALUE_RELEASE(val);
1596 }
1597 return rc;
1598 }
1599
1600 if (NULL == val || NULL == val->data.darray ||
1601 PMIX_INFO != val->data.darray->type ||
1602 0 == val->data.darray->size) {
1603 return PMIX_ERR_NOT_FOUND;
1604 }
1605 info = (pmix_info_t*)val->data.darray->array;
1606 ninfo = val->data.darray->size;
1607 for (n=0; n < ninfo; n++) {
1608 kv.key = info[n].key;
1609 kv.value = &info[n].value;
1610 PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1611 }
1612 if (NULL != val) {
1613 PMIX_VALUE_RELEASE(val);
1614 }
1615
1616 /* add all values in the jobinfo list */
1617 PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
1618 PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1619 }
1620
1621 /* get any node-level info for this job */
1622 PMIX_CONSTRUCT(&results, pmix_list_t);
1623 rc = fetch_nodeinfo(NULL, &trk->nodeinfo, NULL, 0, &results);
1624 if (PMIX_SUCCESS == rc) {
1625 PMIX_LIST_FOREACH(kvptr, &results, pmix_kval_t) {
1626 /* if the peer is earlier than v3.1.5, it is expecting
1627 * node info to be in the form of an array, but with the
1628 * hostname as the key. Detect and convert that here */
1629 if (PMIX_PEER_IS_EARLIER(peer, 3, 1, 5)) {
1630 info = (pmix_info_t*)kvptr->value->data.darray->array;
1631 ninfo = kvptr->value->data.darray->size;
1632 hname = NULL;
1633 /* find the hostname */
1634 for (n=0; n < ninfo; n++) {
1635 if (PMIX_CHECK_KEY(&info[n], PMIX_HOSTNAME)) {
1636 free(kvptr->key);
1637 kvptr->key = strdup(info[n].value.data.string);
1638 PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1639 hname = kvptr->key;
1640 break;
1641 }
1642 }
1643 if (NULL != hname && check_hostname(pmix_globals.hostname, hname)) {
1644 /* older versions are looking for node-level keys for
1645 * only their own node as standalone keys */
1646 for (n=0; n < ninfo; n++) {
1647 if (pmix_check_node_info(info[n].key)) {
1648 kv.key = strdup(info[n].key);
1649 kv.value = &info[n].value;
1650 PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1651 }
1652 }
1653 }
1654 } else {
1655 PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1656 }
1657 }
1658 }
1659 PMIX_LIST_DESTRUCT(&results);
1660
1661 /* get any app-level info for this job */
1662 PMIX_CONSTRUCT(&results, pmix_list_t);
1663 rc = fetch_appinfo(NULL, &trk->apps, NULL, 0, &results);
1664 if (PMIX_SUCCESS == rc) {
1665 PMIX_LIST_FOREACH(kvptr, &results, pmix_kval_t) {
1666 PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1667 }
1668 }
1669 PMIX_LIST_DESTRUCT(&results);
1670
1671 /* get the proc-level data for each proc in the job */
1672 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1673 "FETCHING PROC INFO FOR NSPACE %s NPROCS %u",
1674 ns->nspace, ns->nprocs);
1675 for (rank=0; rank < ns->nprocs; rank++) {
1676 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1677 "FETCHING PROC INFO FOR RANK %s", PMIX_RANK_PRINT(rank));
1678 val = NULL;
1679 rc = pmix_hash_fetch(ht, rank, NULL, &val);
1680 if (PMIX_SUCCESS != rc && PMIX_ERR_PROC_ENTRY_NOT_FOUND != rc) {
1681 PMIX_ERROR_LOG(rc);
1682 if (NULL != val) {
1683 PMIX_VALUE_RELEASE(val);
1684 }
1685 return rc;
1686 }
1687 PMIX_CONSTRUCT(&buf, pmix_buffer_t);
1688 PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK);
1689
1690 if (NULL != val) {
1691 info = (pmix_info_t*)val->data.darray->array;
1692 ninfo = val->data.darray->size;
1693 for (n=0; n < ninfo; n++) {
1694 kv.key = info[n].key;
1695 kv.value = &info[n].value;
1696 PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL);
1697 }
1698 }
1699 kv.key = PMIX_PROC_BLOB;
1700 kv.value = &blob;
1701 blob.type = PMIX_BYTE_OBJECT;
1702 PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size);
1703 PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1704 PMIX_VALUE_DESTRUCT(&blob);
1705 PMIX_DESTRUCT(&buf);
1706
1707 if (NULL != val) {
1708 PMIX_VALUE_RELEASE(val);
1709 }
1710 }
1711 return rc;
1712 }
1713
1714 /* the purpose of this function is to pack the job-level
1715 * info stored in the pmix_namespace_t into a buffer and send
1716 * it to the given client */
hash_register_job_info(struct pmix_peer_t * pr,pmix_buffer_t * reply)1717 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
1718 pmix_buffer_t *reply)
1719 {
1720 pmix_peer_t *peer = (pmix_peer_t*)pr;
1721 pmix_namespace_t *ns = peer->nptr;
1722 char *msg;
1723 pmix_status_t rc;
1724 pmix_job_t *trk;
1725
1726 if (!PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
1727 !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
1728 /* this function is only available on servers */
1729 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1730 return PMIX_ERR_NOT_SUPPORTED;
1731 }
1732
1733 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1734 "[%s:%d] gds:hash:register_job_info for peer [%s:%d]",
1735 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1736 peer->info->pname.nspace, peer->info->pname.rank);
1737
1738 /* first see if we already have processed this data
1739 * for another peer in this nspace so we don't waste
1740 * time doing it again */
1741 if (NULL != ns->jobbkt) {
1742 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1743 "[%s:%d] gds:hash:register_job_info copying prepacked payload",
1744 pmix_globals.myid.nspace, pmix_globals.myid.rank);
1745 /* we have packed this before - can just deliver it */
1746 PMIX_BFROPS_COPY_PAYLOAD(rc, peer, reply, ns->jobbkt);
1747 if (PMIX_SUCCESS != rc) {
1748 PMIX_ERROR_LOG(rc);
1749 }
1750 /* now see if we have delivered it to all our local
1751 * clients for this nspace */
1752 if (!PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer) && ns->ndelivered == ns->nlocalprocs) {
1753 /* we have, so let's get rid of the packed
1754 * copy of the data */
1755 PMIX_RELEASE(ns->jobbkt);
1756 ns->jobbkt = NULL;
1757 }
1758 return rc;
1759 }
1760
1761 /* setup a tracker for this nspace as we will likely
1762 * need it again */
1763 trk = get_tracker(ns->nspace, true);
1764 if (NULL == trk) {
1765 return PMIX_ERR_NOMEM;
1766 }
1767
1768 /* the job info for the specified nspace has
1769 * been given to us in the info array - pack
1770 * them for delivery */
1771 /* pack the name of the nspace */
1772 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1773 "[%s:%d] gds:hash:register_job_info packing new payload",
1774 pmix_globals.myid.nspace, pmix_globals.myid.rank);
1775 msg = ns->nspace;
1776 PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
1777 if (PMIX_SUCCESS != rc) {
1778 PMIX_ERROR_LOG(rc);
1779 return rc;
1780 }
1781
1782 rc = register_info(peer, ns, reply);
1783 if (PMIX_SUCCESS == rc) {
1784 /* if we have more than one local client for this nspace,
1785 * save this packed object so we don't do this again */
1786 if (PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer) || 1 < ns->nlocalprocs) {
1787 PMIX_RETAIN(reply);
1788 ns->jobbkt = reply;
1789 }
1790 } else {
1791 PMIX_ERROR_LOG(rc);
1792 }
1793
1794 return rc;
1795 }
1796
hash_store_job_info(const char * nspace,pmix_buffer_t * buf)1797 static pmix_status_t hash_store_job_info(const char *nspace,
1798 pmix_buffer_t *buf)
1799 {
1800 pmix_status_t rc = PMIX_SUCCESS;
1801 pmix_kval_t *kptr, *kp2, *kp3, kv;
1802 int32_t cnt;
1803 size_t nnodes, len;
1804 uint32_t i, j;
1805 char **procs = NULL;
1806 uint8_t *tmp;
1807 pmix_byte_object_t *bo;
1808 pmix_buffer_t buf2;
1809 int rank;
1810 pmix_job_t *trk;
1811 pmix_hash_table_t *ht;
1812 char **nodelist = NULL;
1813 pmix_nodeinfo_t *nd, *ndptr;
1814 pmix_namespace_t *ns, *nptr;
1815
1816 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1817 "[%s:%u] pmix:gds:hash store job info for nspace %s",
1818 pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
1819
1820 if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
1821 !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
1822 /* this function is NOT available on servers */
1823 PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1824 return PMIX_ERR_NOT_SUPPORTED;
1825 }
1826
1827 /* check buf data */
1828 if ((NULL == buf) || (0 == buf->bytes_used)) {
1829 rc = PMIX_ERR_BAD_PARAM;
1830 PMIX_ERROR_LOG(rc);
1831 return rc;
1832 }
1833
1834 trk = get_tracker(nspace, true);
1835 if (NULL == trk) {
1836 return PMIX_ERR_NOMEM;
1837 }
1838 ht = &trk->internal;
1839
1840 /* retrieve the nspace pointer */
1841 nptr = NULL;
1842 PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
1843 if (0 == strcmp(ns->nspace, nspace)) {
1844 nptr = ns;
1845 break;
1846 }
1847 }
1848 if (NULL == nptr) {
1849 /* only can happen if we are out of mem */
1850 return PMIX_ERR_NOMEM;
1851 }
1852
1853 cnt = 1;
1854 kptr = PMIX_NEW(pmix_kval_t);
1855 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1856 buf, kptr, &cnt, PMIX_KVAL);
1857 while (PMIX_SUCCESS == rc) {
1858 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1859 "[%s:%u] pmix:gds:hash store job info working key %s",
1860 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
1861 if (PMIX_CHECK_KEY(kptr, PMIX_PROC_BLOB)) {
1862 bo = &(kptr->value->data.bo);
1863 PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
1864 PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
1865 /* start by unpacking the rank */
1866 cnt = 1;
1867 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1868 &buf2, &rank, &cnt, PMIX_PROC_RANK);
1869 if (PMIX_SUCCESS != rc) {
1870 PMIX_ERROR_LOG(rc);
1871 PMIX_RELEASE(kptr);
1872 PMIX_DESTRUCT(&buf2);
1873 return rc;
1874 }
1875 /* unpack the blob and save the values for this rank */
1876 cnt = 1;
1877 kp2 = PMIX_NEW(pmix_kval_t);
1878 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1879 &buf2, kp2, &cnt, PMIX_KVAL);
1880 while (PMIX_SUCCESS == rc) {
1881 /* if the value contains a string that is longer than the
1882 * limit, then compress it */
1883 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1884 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1885 if (NULL == tmp) {
1886 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1887 rc = PMIX_ERR_NOMEM;
1888 return rc;
1889 }
1890 kp2->value->type = PMIX_COMPRESSED_STRING;
1891 free(kp2->value->data.string);
1892 kp2->value->data.bo.bytes = (char*)tmp;
1893 kp2->value->data.bo.size = len;
1894 }
1895 }
1896 /* this is data provided by a job-level exchange, so store it
1897 * in the job-level data hash_table */
1898 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1899 PMIX_ERROR_LOG(rc);
1900 PMIX_RELEASE(kp2);
1901 PMIX_RELEASE(kptr);
1902 PMIX_DESTRUCT(&buf2);
1903 return rc;
1904 }
1905 PMIX_RELEASE(kp2); // maintain accounting
1906 cnt = 1;
1907 kp2 = PMIX_NEW(pmix_kval_t);
1908 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1909 &buf2, kp2, &cnt, PMIX_KVAL);
1910 }
1911 /* cleanup */
1912 PMIX_DESTRUCT(&buf2); // releases the original kptr data
1913 PMIX_RELEASE(kp2);
1914 } else if (PMIX_CHECK_KEY(kptr, PMIX_MAP_BLOB)) {
1915 /* transfer the byte object for unpacking */
1916 bo = &(kptr->value->data.bo);
1917 PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
1918 PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
1919 /* start by unpacking the number of nodes */
1920 cnt = 1;
1921 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1922 &buf2, &nnodes, &cnt, PMIX_SIZE);
1923 if (PMIX_SUCCESS != rc) {
1924 PMIX_ERROR_LOG(rc);
1925 PMIX_RELEASE(kptr);
1926 PMIX_DESTRUCT(&buf2);
1927 return rc;
1928 }
1929 for (i=0; i < nnodes; i++) {
1930 /* unpack the list of procs on each node */
1931 cnt = 1;
1932 PMIX_CONSTRUCT(&kv, pmix_kval_t);
1933 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1934 &buf2, &kv, &cnt, PMIX_KVAL);
1935 if (PMIX_SUCCESS != rc) {
1936 PMIX_ERROR_LOG(rc);
1937 PMIX_RELEASE(kptr);
1938 PMIX_DESTRUCT(&buf2);
1939 PMIX_DESTRUCT(&kv);
1940 return rc;
1941 }
1942 /* track the nodes in this nspace */
1943 pmix_argv_append_nosize(&nodelist, kv.key);
1944 /* check and see if we already have this node */
1945 nd = NULL;
1946 PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
1947 if (check_nodename(ndptr, kv.key)) {
1948 /* we assume that the data is updating the current
1949 * values */
1950 nd = ndptr;
1951 break;
1952 }
1953 }
1954 if (NULL == nd) {
1955 nd = PMIX_NEW(pmix_nodeinfo_t);
1956 nd->hostname = strdup(kv.key);
1957 pmix_list_append(&trk->nodeinfo, &nd->super);
1958 }
1959 /* save the list of peers for this node */
1960 kp2 = PMIX_NEW(pmix_kval_t);
1961 if (NULL == kp2) {
1962 PMIX_RELEASE(kptr);
1963 return PMIX_ERR_NOMEM;
1964 }
1965 kp2->key = strdup(PMIX_LOCAL_PEERS);
1966 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1967 if (NULL == kp2->value) {
1968 PMIX_RELEASE(kp2);
1969 PMIX_RELEASE(kptr);
1970 return PMIX_ERR_NOMEM;
1971 }
1972 kp2->value->type = PMIX_STRING;
1973 kp2->value->data.string = strdup(kv.value->data.string);
1974 /* ensure this item only appears once on the list */
1975 PMIX_LIST_FOREACH(kp3, &nd->info, pmix_kval_t) {
1976 if (PMIX_CHECK_KEY(kp3, kp2->key)) {
1977 pmix_list_remove_item(&nd->info, &kp3->super);
1978 PMIX_RELEASE(kp3);
1979 break;
1980 }
1981 }
1982 pmix_list_append(&nd->info, &kp2->super);
1983 /* split the list of procs so we can store their
1984 * individual location data */
1985 procs = pmix_argv_split(kv.value->data.string, ',');
1986 for (j=0; NULL != procs[j]; j++) {
1987 /* store the hostname for each proc - again, this is
1988 * data obtained via a job-level exchange, so store it
1989 * in the job-level data hash_table */
1990 kp2 = PMIX_NEW(pmix_kval_t);
1991 kp2->key = strdup(PMIX_HOSTNAME);
1992 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1993 kp2->value->type = PMIX_STRING;
1994 kp2->value->data.string = strdup(kv.key);
1995 rank = strtol(procs[j], NULL, 10);
1996 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1997 PMIX_ERROR_LOG(rc);
1998 PMIX_RELEASE(kp2);
1999 PMIX_RELEASE(kptr);
2000 PMIX_DESTRUCT(&kv);
2001 PMIX_DESTRUCT(&buf2);
2002 pmix_argv_free(procs);
2003 return rc;
2004 }
2005 PMIX_RELEASE(kp2); // maintain acctg
2006 }
2007 pmix_argv_free(procs);
2008 PMIX_DESTRUCT(&kv);
2009 }
2010 if (NULL != nodelist) {
2011 /* store the comma-delimited list of nodes hosting
2012 * procs in this nspace */
2013 kp2 = PMIX_NEW(pmix_kval_t);
2014 kp2->key = strdup(PMIX_NODE_LIST);
2015 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2016 kp2->value->type = PMIX_STRING;
2017 kp2->value->data.string = pmix_argv_join(nodelist, ',');
2018 pmix_argv_free(nodelist);
2019 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
2020 PMIX_ERROR_LOG(rc);
2021 PMIX_RELEASE(kp2);
2022 PMIX_RELEASE(kptr);
2023 PMIX_DESTRUCT(&kv);
2024 PMIX_DESTRUCT(&buf2);
2025 return rc;
2026 }
2027 PMIX_RELEASE(kp2); // maintain acctg
2028 }
2029 /* cleanup */
2030 PMIX_DESTRUCT(&buf2);
2031 } else if (PMIX_CHECK_KEY(kptr, PMIX_APP_INFO_ARRAY)) {
2032 if (PMIX_SUCCESS != (rc = process_app_array(kptr->value, trk))) {
2033 PMIX_ERROR_LOG(rc);
2034 PMIX_RELEASE(kptr);
2035 return rc;
2036 }
2037 } else if (PMIX_CHECK_KEY(kptr, PMIX_NODE_INFO_ARRAY)) {
2038 if (PMIX_SUCCESS != (rc = process_node_array(kptr->value, &trk->nodeinfo))) {
2039 PMIX_ERROR_LOG(rc);
2040 PMIX_RELEASE(kptr);
2041 return rc;
2042 }
2043 } else {
2044 /* if the value contains a string that is longer than the
2045 * limit, then compress it */
2046 if (PMIX_STRING_SIZE_CHECK(kptr->value)) {
2047 if (pmix_compress.compress_string(kptr->value->data.string, &tmp, &len)) {
2048 if (NULL == tmp) {
2049 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2050 rc = PMIX_ERR_NOMEM;
2051 PMIX_RELEASE(kptr);
2052 return rc;
2053 }
2054 kptr->value->type = PMIX_COMPRESSED_STRING;
2055 free(kptr->value->data.string);
2056 kptr->value->data.bo.bytes = (char*)tmp;
2057 kptr->value->data.bo.size = len;
2058 }
2059 }
2060 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2061 "[%s:%u] pmix:gds:hash store job info storing key %s for WILDCARD rank",
2062 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
2063 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kptr))) {
2064 PMIX_ERROR_LOG(rc);
2065 PMIX_RELEASE(kptr);
2066 return rc;
2067 }
2068 /* if this is the job size, then store it in
2069 * the nptr tracker */
2070 if (0 == nptr->nprocs && PMIX_CHECK_KEY(kptr, PMIX_JOB_SIZE)) {
2071 nptr->nprocs = kptr->value->data.uint32;
2072 }
2073 }
2074 PMIX_RELEASE(kptr);
2075 kptr = PMIX_NEW(pmix_kval_t);
2076 cnt = 1;
2077 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
2078 buf, kptr, &cnt, PMIX_KVAL);
2079 }
2080 /* need to release the leftover kptr */
2081 PMIX_RELEASE(kptr);
2082
2083 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2084 PMIX_ERROR_LOG(rc);
2085 } else {
2086 rc = PMIX_SUCCESS;
2087 }
2088 return rc;
2089 }
2090
hash_store(const pmix_proc_t * proc,pmix_scope_t scope,pmix_kval_t * kv)2091 static pmix_status_t hash_store(const pmix_proc_t *proc,
2092 pmix_scope_t scope,
2093 pmix_kval_t *kv)
2094 {
2095 pmix_job_t *trk;
2096 pmix_status_t rc;
2097 pmix_kval_t *kp;
2098 pmix_rank_t rank;
2099 size_t j, size, len;
2100 pmix_info_t *iptr;
2101 uint8_t *tmp;
2102
2103 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2104 "%s gds:hash:hash_store for proc %s key %s type %s scope %s",
2105 PMIX_NAME_PRINT(&pmix_globals.myid),
2106 PMIX_NAME_PRINT(proc), kv->key,
2107 PMIx_Data_type_string(kv->value->type), PMIx_Scope_string(scope));
2108
2109 if (NULL == kv->key) {
2110 return PMIX_ERR_BAD_PARAM;
2111 }
2112
2113 /* find the hash table for this nspace */
2114 trk = get_tracker(proc->nspace, true);
2115 if (NULL == trk) {
2116 return PMIX_ERR_NOMEM;
2117 }
2118
2119 /* see if the proc is me - cannot use CHECK_PROCID as
2120 * we don't want rank=wildcard to match */
2121 if (proc->rank == pmix_globals.myid.rank &&
2122 PMIX_CHECK_NSPACE(proc->nspace, pmix_globals.myid.nspace)) {
2123 if (PMIX_INTERNAL != scope) {
2124 /* always maintain a copy of my own info here to simplify
2125 * later retrieval */
2126 kp = PMIX_NEW(pmix_kval_t);
2127 if (NULL == kp) {
2128 return PMIX_ERR_NOMEM;
2129 }
2130 kp->key = strdup(kv->key);
2131 kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2132 if (NULL == kp->value) {
2133 PMIX_RELEASE(kp);
2134 return PMIX_ERR_NOMEM;
2135 }
2136 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
2137 if (PMIX_SUCCESS != rc) {
2138 PMIX_RELEASE(kp);
2139 return rc;
2140 }
2141 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kp))) {
2142 PMIX_ERROR_LOG(rc);
2143 PMIX_RELEASE(kp);
2144 return rc;
2145 }
2146 PMIX_RELEASE(kp); // maintain accounting
2147 }
2148 }
2149
2150 /* if the number of procs for the nspace object is new, then update it */
2151 if (0 == trk->nptr->nprocs && PMIX_CHECK_KEY(kv, PMIX_JOB_SIZE)) {
2152 trk->nptr->nprocs = kv->value->data.uint32;
2153 }
2154
2155 /* store it in the corresponding hash table */
2156 if (PMIX_INTERNAL == scope) {
2157 /* if this is proc data, then we have to expand it and
2158 * store the values on that rank */
2159 if (PMIX_CHECK_KEY(kv, PMIX_PROC_DATA)) {
2160 /* an array of data pertaining to a specific proc */
2161 if (PMIX_DATA_ARRAY != kv->value->type) {
2162 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
2163 return PMIX_ERR_TYPE_MISMATCH;
2164 }
2165 size = kv->value->data.darray->size;
2166 iptr = (pmix_info_t*)kv->value->data.darray->array;
2167 /* first element of the array must be the rank */
2168 if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
2169 PMIX_PROC_RANK != iptr[0].value.type) {
2170 rc = PMIX_ERR_TYPE_MISMATCH;
2171 PMIX_ERROR_LOG(rc);
2172 return rc;
2173 }
2174 rank = iptr[0].value.data.rank;
2175 /* cycle thru the values for this rank and store them */
2176 for (j=1; j < size; j++) {
2177 kp = PMIX_NEW(pmix_kval_t);
2178 if (NULL == kp) {
2179 rc = PMIX_ERR_NOMEM;
2180 return rc;
2181 }
2182 kp->key = strdup(iptr[j].key);
2183 PMIX_VALUE_XFER(rc, kp->value, &iptr[j].value);
2184 if (PMIX_SUCCESS != rc) {
2185 PMIX_ERROR_LOG(rc);
2186 PMIX_RELEASE(kp);
2187 return rc;
2188 }
2189 /* if the value contains a string that is longer than the
2190 * limit, then compress it */
2191 if (PMIX_STRING_SIZE_CHECK(kp->value)) {
2192 if (pmix_compress.compress_string(kp->value->data.string, &tmp, &len)) {
2193 if (NULL == tmp) {
2194 PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2195 rc = PMIX_ERR_NOMEM;
2196 return rc;
2197 }
2198 kp->value->type = PMIX_COMPRESSED_STRING;
2199 free(kp->value->data.string);
2200 kp->value->data.bo.bytes = (char*)tmp;
2201 kp->value->data.bo.size = len;
2202 }
2203 }
2204 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2205 "%s gds:hash:STORE data for nspace %s rank %u: key %s",
2206 PMIX_NAME_PRINT(&pmix_globals.myid),
2207 trk->ns, rank, kp->key);
2208 /* store it in the hash_table */
2209 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, rank, kp))) {
2210 PMIX_ERROR_LOG(rc);
2211 PMIX_RELEASE(kp);
2212 return rc;
2213 }
2214 PMIX_RELEASE(kp); // maintain acctg
2215 }
2216 return PMIX_SUCCESS;
2217 }
2218 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kv))) {
2219 PMIX_ERROR_LOG(rc);
2220 return rc;
2221 }
2222 } else if (PMIX_REMOTE == scope) {
2223 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2224 PMIX_ERROR_LOG(rc);
2225 return rc;
2226 }
2227 } else if (PMIX_LOCAL == scope) {
2228 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kv))) {
2229 PMIX_ERROR_LOG(rc);
2230 return rc;
2231 }
2232 } else if (PMIX_GLOBAL == scope) {
2233 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2234 PMIX_ERROR_LOG(rc);
2235 return rc;
2236 }
2237 /* a pmix_kval_t can only be on one list at a time, so we
2238 * have to duplicate it here */
2239 kp = PMIX_NEW(pmix_kval_t);
2240 if (NULL == kp) {
2241 return PMIX_ERR_NOMEM;
2242 }
2243 kp->key = strdup(kv->key);
2244 kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2245 if (NULL == kp->value) {
2246 PMIX_RELEASE(kp);
2247 return PMIX_ERR_NOMEM;
2248 }
2249 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
2250 if (PMIX_SUCCESS != rc) {
2251 PMIX_ERROR_LOG(rc);
2252 PMIX_RELEASE(kp);
2253 return rc;
2254 }
2255 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kp))) {
2256 PMIX_ERROR_LOG(rc);
2257 PMIX_RELEASE(kp);
2258 return rc;
2259 }
2260 PMIX_RELEASE(kp); // maintain accounting
2261 } else {
2262 return PMIX_ERR_BAD_PARAM;
2263 }
2264
2265 return PMIX_SUCCESS;
2266 }
2267
2268 /* this function is only called by the PMIx server when its
2269 * host has received data from some other peer. It therefore
2270 * always contains data solely from remote procs, and we
2271 * shall store it accordingly */
hash_store_modex(struct pmix_namespace_t * nspace,pmix_buffer_t * buf,void * cbdata)2272 static pmix_status_t hash_store_modex(struct pmix_namespace_t *nspace,
2273 pmix_buffer_t *buf,
2274 void *cbdata) {
2275 return pmix_gds_base_store_modex(nspace, buf, NULL,
2276 _hash_store_modex, cbdata);
2277 }
2278
_hash_store_modex(pmix_gds_base_ctx_t ctx,pmix_proc_t * proc,pmix_gds_modex_key_fmt_t key_fmt,char ** kmap,pmix_buffer_t * pbkt)2279 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
2280 pmix_proc_t *proc,
2281 pmix_gds_modex_key_fmt_t key_fmt,
2282 char **kmap,
2283 pmix_buffer_t *pbkt)
2284 {
2285 pmix_job_t *trk;
2286 pmix_status_t rc = PMIX_SUCCESS;
2287 pmix_kval_t *kv;
2288
2289 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2290 "[%s:%d] gds:hash:store_modex for nspace %s",
2291 pmix_globals.myid.nspace, pmix_globals.myid.rank,
2292 proc->nspace);
2293
2294 /* find the hash table for this nspace */
2295 trk = get_tracker(proc->nspace, true);
2296 if (NULL == trk) {
2297 return PMIX_ERR_NOMEM;
2298 }
2299
2300 /* this is data returned via the PMIx_Fence call when
2301 * data collection was requested, so it only contains
2302 * REMOTE/GLOBAL data. The byte object contains
2303 * the rank followed by pmix_kval_t's. The list of callbacks
2304 * contains all local participants. */
2305
2306 /* unpack the remaining values until we hit the end of the buffer */
2307 kv = PMIX_NEW(pmix_kval_t);
2308 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2309
2310 while (PMIX_SUCCESS == rc) {
2311 if (PMIX_RANK_UNDEF == proc->rank) {
2312 /* if the rank is undefined, then we store it on the
2313 * remote table of rank=0 as we know that rank must
2314 * always exist */
2315 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, 0, kv))) {
2316 PMIX_ERROR_LOG(rc);
2317 return rc;
2318 }
2319 } else {
2320 /* store this in the hash table */
2321 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2322 PMIX_ERROR_LOG(rc);
2323 return rc;
2324 }
2325 }
2326 PMIX_RELEASE(kv); // maintain accounting as the hash increments the ref count
2327 /* continue along */
2328 kv = PMIX_NEW(pmix_kval_t);
2329 rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2330 }
2331 PMIX_RELEASE(kv); // maintain accounting
2332 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2333 PMIX_ERROR_LOG(rc);
2334 } else {
2335 rc = PMIX_SUCCESS;
2336 }
2337 return rc;
2338 }
2339
2340
dohash(pmix_hash_table_t * ht,const char * key,pmix_rank_t rank,int skip_genvals,pmix_list_t * kvs)2341 static pmix_status_t dohash(pmix_hash_table_t *ht,
2342 const char *key,
2343 pmix_rank_t rank,
2344 int skip_genvals,
2345 pmix_list_t *kvs)
2346 {
2347 pmix_status_t rc;
2348 pmix_value_t *val;
2349 pmix_kval_t *kv, *k2;
2350 pmix_info_t *info;
2351 size_t n, ninfo;
2352 bool found;
2353
2354 rc = pmix_hash_fetch(ht, rank, key, &val);
2355 if (PMIX_SUCCESS == rc) {
2356 /* if the key was NULL, then all found keys will be
2357 * returned as a pmix_data_array_t in the value */
2358 if (NULL == key) {
2359 if (NULL == val->data.darray ||
2360 PMIX_INFO != val->data.darray->type ||
2361 0 == val->data.darray->size) {
2362 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
2363 PMIX_RELEASE(val);
2364 return PMIX_ERR_NOT_FOUND;
2365 }
2366 /* if they want the value returned in its array form,
2367 * then we are done */
2368 if (2 == skip_genvals) {
2369 kv = PMIX_NEW(pmix_kval_t);
2370 if (NULL == kv) {
2371 PMIX_VALUE_RELEASE(val);
2372 return PMIX_ERR_NOMEM;
2373 }
2374 kv->value = val;
2375 pmix_list_append(kvs, &kv->super);
2376 return PMIX_SUCCESS;
2377 }
2378 info = (pmix_info_t*)val->data.darray->array;
2379 ninfo = val->data.darray->size;
2380 for (n=0; n < ninfo; n++) {
2381 /* if the rank is UNDEF, then we don't want
2382 * anything that starts with "pmix" */
2383 if (1 == skip_genvals &&
2384 0 == strncmp(info[n].key, "pmix", 4)) {
2385 continue;
2386 }
2387 /* see if we already have this on the list */
2388 found = false;
2389 PMIX_LIST_FOREACH(k2, kvs, pmix_kval_t) {
2390 if (PMIX_CHECK_KEY(&info[n], k2->key)) {
2391 found = true;
2392 break;
2393 }
2394 }
2395 if (found) {
2396 continue;
2397 }
2398 kv = PMIX_NEW(pmix_kval_t);
2399 if (NULL == kv) {
2400 PMIX_VALUE_RELEASE(val);
2401 return PMIX_ERR_NOMEM;
2402 }
2403 kv->key = strdup(info[n].key);
2404 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2405 if (NULL == kv->value) {
2406 PMIX_VALUE_RELEASE(val);
2407 PMIX_RELEASE(kv);
2408 return PMIX_ERR_NOMEM;
2409 }
2410 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
2411 kv->value, &info[n].value);
2412 if (PMIX_SUCCESS != rc) {
2413 PMIX_ERROR_LOG(rc);
2414 PMIX_VALUE_RELEASE(val);
2415 PMIX_RELEASE(kv);
2416 return rc;
2417 }
2418 pmix_list_append(kvs, &kv->super);
2419 }
2420 PMIX_VALUE_RELEASE(val);
2421 } else {
2422 kv = PMIX_NEW(pmix_kval_t);
2423 if (NULL == kv) {
2424 PMIX_VALUE_RELEASE(val);
2425 return PMIX_ERR_NOMEM;
2426 }
2427 kv->key = strdup(key);
2428 kv->value = val;
2429 pmix_list_append(kvs, &kv->super);
2430 }
2431 }
2432 return rc;
2433 }
2434
fetch_nodeinfo(const char * key,pmix_list_t * tgt,pmix_info_t * info,size_t ninfo,pmix_list_t * kvs)2435 static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
2436 pmix_info_t *info, size_t ninfo,
2437 pmix_list_t *kvs)
2438 {
2439 size_t n, nds;
2440 pmix_status_t rc;
2441 uint32_t nid=0;
2442 char *hostname = NULL;
2443 bool found = false;
2444 pmix_nodeinfo_t *nd, *ndptr;
2445 pmix_kval_t *kv, *kp2;
2446 pmix_data_array_t *darray;
2447 pmix_info_t *iptr;
2448
2449 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2450 "FETCHING NODE INFO");
2451
2452 /* scan for the nodeID or hostname to identify
2453 * which node they are asking about */
2454 for (n=0; n < ninfo; n++) {
2455 if (PMIX_CHECK_KEY(&info[n], PMIX_NODEID)) {
2456 PMIX_VALUE_GET_NUMBER(rc, &info[n].value, nid, uint32_t);
2457 if (PMIX_SUCCESS != rc) {
2458 return rc;
2459 }
2460 found = true;
2461 break;
2462 } else if (PMIX_CHECK_KEY(&info[n], PMIX_HOSTNAME)) {
2463 hostname = info[n].value.data.string;
2464 found = true;
2465 break;
2466 }
2467 }
2468 if (!found) {
2469 /* if the key is NULL, then they want all the info from
2470 * all nodes */
2471 if (NULL == key) {
2472 PMIX_LIST_FOREACH(nd, tgt, pmix_nodeinfo_t) {
2473 kv = PMIX_NEW(pmix_kval_t);
2474 kv->key = strdup(PMIX_NODE_INFO_ARRAY);
2475 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2476 if (NULL == kv->value) {
2477 PMIX_RELEASE(kv);
2478 return PMIX_ERR_NOMEM;
2479 }
2480 nds = pmix_list_get_size(&nd->info);
2481 if (NULL != nd->hostname) {
2482 ++nds;
2483 }
2484 if (UINT32_MAX != nd->nodeid) {
2485 ++nds;
2486 }
2487 PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2488 if (NULL == darray) {
2489 PMIX_RELEASE(kv);
2490 return PMIX_ERR_NOMEM;
2491 }
2492 iptr = (pmix_info_t*)darray->array;
2493 n = 0;
2494 if (NULL != nd->hostname) {
2495 PMIX_INFO_LOAD(&iptr[n], PMIX_HOSTNAME, nd->hostname, PMIX_STRING);
2496 ++n;
2497 }
2498 if (UINT32_MAX != nd->nodeid) {
2499 PMIX_INFO_LOAD(&iptr[n], PMIX_NODEID, &nd->nodeid, PMIX_UINT32);
2500 ++n;
2501 }
2502 PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2503 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2504 "%s gds:hash:fetch_nodearray adding key %s",
2505 PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2506 PMIX_LOAD_KEY(iptr[n].key, kp2->key);
2507 rc = pmix_value_xfer(&iptr[n].value, kp2->value);
2508 if (PMIX_SUCCESS != rc) {
2509 PMIX_ERROR_LOG(rc);
2510 PMIX_DATA_ARRAY_FREE(darray);
2511 PMIX_RELEASE(kv);
2512 return rc;
2513 }
2514 ++n;
2515 }
2516 kv->value->data.darray = darray;
2517 kv->value->type = PMIX_DATA_ARRAY;
2518 pmix_list_append(kvs, &kv->super);
2519 }
2520 return PMIX_SUCCESS;
2521
2522 }
2523 /* assume they want it from this node */
2524 hostname = pmix_globals.hostname;
2525 }
2526
2527 /* scan the list of nodes to find the matching entry */
2528 nd = NULL;
2529 PMIX_LIST_FOREACH(ndptr, tgt, pmix_nodeinfo_t) {
2530 if (NULL != hostname) {
2531 if (check_nodename(ndptr, hostname)) {
2532 nd = ndptr;
2533 break;
2534 }
2535 } else if (nid == ndptr->nodeid) {
2536 nd = ndptr;
2537 break;
2538 }
2539 }
2540 if (NULL == nd) {
2541 if (!found) {
2542 /* they didn't specify, so it is optional */
2543 return PMIX_ERR_DATA_VALUE_NOT_FOUND;
2544 }
2545 return PMIX_ERR_NOT_FOUND;
2546 }
2547
2548 /* if they want it all, give it to them */
2549 if (NULL == key) {
2550 kv = PMIX_NEW(pmix_kval_t);
2551 kv->key = strdup(PMIX_NODE_INFO_ARRAY);
2552 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2553 if (NULL == kv->value) {
2554 PMIX_RELEASE(kv);
2555 return PMIX_ERR_NOMEM;
2556 }
2557 nds = pmix_list_get_size(&nd->info);
2558 if (NULL != nd->hostname) {
2559 ++nds;
2560 }
2561 if (UINT32_MAX != nd->nodeid) {
2562 ++nds;
2563 }
2564 PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2565 if (NULL == darray) {
2566 PMIX_RELEASE(kv);
2567 return PMIX_ERR_NOMEM;
2568 }
2569 iptr = (pmix_info_t*)darray->array;
2570 n = 0;
2571 if (NULL != nd->hostname) {
2572 PMIX_INFO_LOAD(&iptr[n], PMIX_HOSTNAME, nd->hostname, PMIX_STRING);
2573 ++n;
2574 }
2575 if (UINT32_MAX != nd->nodeid) {
2576 PMIX_INFO_LOAD(&iptr[n], PMIX_NODEID, &nd->nodeid, PMIX_UINT32);
2577 ++n;
2578 }
2579 PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2580 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2581 "%s gds:hash:fetch_nodearray adding key %s",
2582 PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2583 PMIX_LOAD_KEY(iptr[n].key, kp2->key);
2584 rc = pmix_value_xfer(&iptr[n].value, kp2->value);
2585 if (PMIX_SUCCESS != rc) {
2586 PMIX_ERROR_LOG(rc);
2587 PMIX_DATA_ARRAY_FREE(darray);
2588 PMIX_RELEASE(kv);
2589 return rc;
2590 }
2591 ++n;
2592 }
2593 kv->value->data.darray = darray;
2594 kv->value->type = PMIX_DATA_ARRAY;
2595 pmix_list_append(kvs, &kv->super);
2596 return PMIX_SUCCESS;
2597 }
2598
2599 /* scan the info list of this node to find the key they want */
2600 rc = PMIX_ERR_NOT_FOUND;
2601 PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2602 if (PMIX_CHECK_KEY(kp2, key)) {
2603 pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2604 "%s gds:hash:fetch_nodearray adding key %s",
2605 PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2606 /* since they only asked for one key, return just that value */
2607 kv = PMIX_NEW(pmix_kval_t);
2608 kv->key = strdup(kp2->key);
2609 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2610 if (NULL == kv->value) {
2611 PMIX_RELEASE(kv);
2612 return PMIX_ERR_NOMEM;
2613 }
2614 rc = pmix_value_xfer(kv->value, kp2->value);
2615 if (PMIX_SUCCESS != rc) {
2616 PMIX_ERROR_LOG(rc);
2617 PMIX_RELEASE(kv);
2618 return rc;
2619 }
2620 pmix_list_append(kvs, &kv->super);
2621 break;
2622 }
2623 }
2624 return rc;
2625 }
2626
fetch_appinfo(const char * key,pmix_list_t * tgt,pmix_info_t * info,size_t ninfo,pmix_list_t * kvs)2627 static pmix_status_t fetch_appinfo(const char *key, pmix_list_t *tgt,
2628 pmix_info_t *info, size_t ninfo,
2629 pmix_list_t *kvs)
2630 {
2631 size_t n, nds;
2632 pmix_status_t rc;
2633 uint32_t appnum;
2634 bool found = false;
2635 pmix_apptrkr_t *app, *apptr;
2636 pmix_kval_t *kv, *kp2;
2637 pmix_data_array_t *darray;
2638
2639 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2640 "FETCHING APP INFO WITH %d APPS",
2641 (int)pmix_list_get_size(tgt));
2642
2643 /* scan for the appnum to identify
2644 * which app they are asking about */
2645 for (n=0; n < ninfo; n++) {
2646 if (PMIX_CHECK_KEY(&info[n], PMIX_APPNUM)) {
2647 PMIX_VALUE_GET_NUMBER(rc, &info[n].value, appnum, uint32_t);
2648 if (PMIX_SUCCESS != rc) {
2649 return rc;
2650 }
2651 found = true;
2652 break;
2653 }
2654 }
2655 if (!found) {
2656 /* if the key is NULL, then they want all the info from
2657 * all apps */
2658 if (NULL == key) {
2659 PMIX_LIST_FOREACH(apptr, tgt, pmix_apptrkr_t) {
2660 kv = PMIX_NEW(pmix_kval_t);
2661 kv->key = strdup(PMIX_APP_INFO_ARRAY);
2662 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2663 if (NULL == kv->value) {
2664 PMIX_RELEASE(kv);
2665 return PMIX_ERR_NOMEM;
2666 }
2667 nds = pmix_list_get_size(&apptr->appinfo) + 1;
2668 PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2669 if (NULL == darray) {
2670 PMIX_RELEASE(kv);
2671 return PMIX_ERR_NOMEM;
2672 }
2673 info = (pmix_info_t*)darray->array;
2674 n = 0;
2675 /* put in the appnum */
2676 PMIX_INFO_LOAD(&info[n], PMIX_APPNUM, &apptr->appnum, PMIX_UINT32);
2677 ++n;
2678 PMIX_LIST_FOREACH(kp2, &apptr->appinfo, pmix_kval_t) {
2679 PMIX_LOAD_KEY(info[n].key, kp2->key);
2680 rc = pmix_value_xfer(&info[n].value, kp2->value);
2681 if (PMIX_SUCCESS != rc) {
2682 PMIX_ERROR_LOG(rc);
2683 PMIX_DATA_ARRAY_FREE(darray);
2684 PMIX_RELEASE(kv);
2685 return rc;
2686 }
2687 ++n;
2688 }
2689 kv->value->data.darray = darray;
2690 kv->value->type = PMIX_DATA_ARRAY;
2691 pmix_list_append(kvs, &kv->super);
2692 }
2693 return PMIX_SUCCESS;
2694 }
2695 /* assume they are asking for our app */
2696 appnum = pmix_globals.appnum;
2697 }
2698
2699 /* scan the list of apps to find the matching entry */
2700 app = NULL;
2701 PMIX_LIST_FOREACH(apptr, tgt, pmix_apptrkr_t) {
2702 if (appnum == apptr->appnum) {
2703 app = apptr;
2704 break;
2705 }
2706 }
2707 if (NULL == app) {
2708 return PMIX_ERR_NOT_FOUND;
2709 }
2710
2711 /* see if they wanted to know something about a node that
2712 * is associated with this app */
2713 rc = fetch_nodeinfo(key, &app->nodeinfo, info, ninfo, kvs);
2714 if (PMIX_ERR_DATA_VALUE_NOT_FOUND != rc) {
2715 return rc;
2716 }
2717
2718 /* scan the info list of this app to generate the results */
2719 rc = PMIX_ERR_NOT_FOUND;
2720 PMIX_LIST_FOREACH(kv, &app->appinfo, pmix_kval_t) {
2721 if (NULL == key || PMIX_CHECK_KEY(kv, key)) {
2722 kp2 = PMIX_NEW(pmix_kval_t);
2723 kp2->key = strdup(kv->key);
2724 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2725 rc = pmix_value_xfer(kp2->value, kv->value);
2726 if (PMIX_SUCCESS != rc) {
2727 PMIX_ERROR_LOG(rc);
2728 PMIX_RELEASE(kp2);
2729 return rc;
2730 }
2731 pmix_list_append(kvs, &kp2->super);
2732 rc = PMIX_SUCCESS;
2733 if (NULL != key) {
2734 break;
2735 }
2736 }
2737 }
2738
2739 return rc;
2740 }
2741
hash_fetch(const pmix_proc_t * proc,pmix_scope_t scope,bool copy,const char * key,pmix_info_t qualifiers[],size_t nqual,pmix_list_t * kvs)2742 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
2743 pmix_scope_t scope, bool copy,
2744 const char *key,
2745 pmix_info_t qualifiers[], size_t nqual,
2746 pmix_list_t *kvs)
2747 {
2748 pmix_job_t *trk;
2749 pmix_status_t rc;
2750 pmix_kval_t *kv, *kvptr;
2751 pmix_info_t *info, *iptr;
2752 size_t m, n, ninfo, niptr;
2753 pmix_hash_table_t *ht;
2754 pmix_session_t *sptr;
2755 uint32_t sid;
2756 pmix_rank_t rnk;
2757 pmix_list_t rkvs;
2758 bool nodeinfo = false;
2759 bool appinfo = false;
2760
2761 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2762 "%s pmix:gds:hash fetch %s for proc %s on scope %s",
2763 PMIX_NAME_PRINT(&pmix_globals.myid),
2764 (NULL == key) ? "NULL" : key,
2765 PMIX_NAME_PRINT(proc), PMIx_Scope_string(scope));
2766
2767 /* if the rank is wildcard and the key is NULL, then
2768 * they are asking for a complete copy of the job-level
2769 * info for this nspace - retrieve it */
2770 if (NULL == key && PMIX_RANK_WILDCARD == proc->rank) {
2771 /* see if we have a tracker for this nspace - we will
2772 * if we already cached the job info for it */
2773 trk = get_tracker(proc->nspace, false);
2774 if (NULL == trk) {
2775 /* let the caller know */
2776 return PMIX_ERR_INVALID_NAMESPACE;
2777 }
2778 /* fetch all values from the hash table tied to rank=wildcard */
2779 dohash(&trk->internal, NULL, PMIX_RANK_WILDCARD, 0, kvs);
2780 /* also need to add any job-level info */
2781 PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
2782 kv = PMIX_NEW(pmix_kval_t);
2783 kv->key = strdup(kvptr->key);
2784 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2785 PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2786 if (PMIX_SUCCESS != rc) {
2787 PMIX_RELEASE(kv);
2788 return rc;
2789 }
2790 pmix_list_append(kvs, &kv->super);
2791 }
2792 /* collect the relevant node-level info */
2793 rc = fetch_nodeinfo(NULL, &trk->nodeinfo, qualifiers, nqual, kvs);
2794 if (PMIX_SUCCESS != rc) {
2795 return rc;
2796 }
2797 /* collect the relevant app-level info */
2798 rc = fetch_appinfo(NULL, &trk->apps, qualifiers, nqual, kvs);
2799 if (PMIX_SUCCESS != rc) {
2800 return rc;
2801 }
2802 /* finally, we need the job-level info for each rank in the job */
2803 for (rnk=0; rnk < trk->nptr->nprocs; rnk++) {
2804 PMIX_CONSTRUCT(&rkvs, pmix_list_t);
2805 rc = dohash(&trk->internal, NULL, rnk, 2, &rkvs);
2806 if (PMIX_ERR_NOMEM == rc) {
2807 return rc;
2808 }
2809 if (0 == pmix_list_get_size(&rkvs)) {
2810 PMIX_DESTRUCT(&rkvs);
2811 continue;
2812 }
2813 /* should only have one entry on list */
2814 kvptr = (pmix_kval_t*)pmix_list_get_first(&rkvs);
2815 /* we have to assemble the results into a proc blob
2816 * so the remote end will know what to do with it */
2817 info = (pmix_info_t*)kvptr->value->data.darray->array;
2818 ninfo = kvptr->value->data.darray->size;
2819 /* setup to return the result */
2820 kv = PMIX_NEW(pmix_kval_t);
2821 kv->key = strdup(PMIX_PROC_DATA);
2822 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2823 kv->value->type = PMIX_DATA_ARRAY;
2824 niptr = ninfo + 1; // need space for the rank
2825 PMIX_DATA_ARRAY_CREATE(kv->value->data.darray, niptr, PMIX_INFO);
2826 iptr = (pmix_info_t*)kv->value->data.darray->array;
2827 /* start with the rank */
2828 PMIX_INFO_LOAD(&iptr[0], PMIX_RANK, &rnk, PMIX_PROC_RANK);
2829 /* now transfer rest of data across */
2830 for (n=0; n < ninfo; n++) {
2831 PMIX_INFO_XFER(&iptr[n+1], &info[n]);
2832 }
2833 /* add to the results */
2834 pmix_list_append(kvs, &kv->super);
2835 /* release the search result */
2836 PMIX_LIST_DESTRUCT(&rkvs);
2837 }
2838 return PMIX_SUCCESS;
2839 }
2840
2841 /* see if they are asking for session, node, or app-level info */
2842 for (n=0; n < nqual; n++) {
2843 if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_SESSION_INFO)) {
2844 /* they must have provided a session ID */
2845 for (m=0; m < nqual; m++) {
2846 if (PMIX_CHECK_KEY(&qualifiers[m], PMIX_SESSION_ID)) {
2847 /* see if we have this session */
2848 PMIX_VALUE_GET_NUMBER(rc, &qualifiers[m].value, sid, uint32_t);
2849 if (PMIX_SUCCESS != rc) {
2850 /* didn't provide a correct value */
2851 PMIX_ERROR_LOG(rc);
2852 return rc;
2853 }
2854 PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
2855 if (sptr->session == sid) {
2856 /* see if they want info for a specific node */
2857 rc = fetch_nodeinfo(key, &sptr->nodeinfo, qualifiers, nqual, kvs);
2858 /* if they did, then we are done */
2859 if (PMIX_ERR_DATA_VALUE_NOT_FOUND != rc) {
2860 return rc;
2861 }
2862 /* check the session info */
2863 PMIX_LIST_FOREACH(kvptr, &sptr->sessioninfo, pmix_kval_t) {
2864 if (NULL == key || PMIX_CHECK_KEY(kvptr, key)) {
2865 kv = PMIX_NEW(pmix_kval_t);
2866 kv->key = strdup(kvptr->key);
2867 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2868 PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2869 if (PMIX_SUCCESS != rc) {
2870 PMIX_RELEASE(kv);
2871 return rc;
2872 }
2873 pmix_list_append(kvs, &kv->super);
2874 if (NULL != key) {
2875 /* we are done */
2876 return PMIX_SUCCESS;
2877 }
2878 }
2879 }
2880 }
2881 }
2882 }
2883 }
2884 /* if we get here, then the session wasn't found */
2885 return PMIX_ERR_NOT_FOUND;
2886 } else if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_NODE_INFO)) {
2887 nodeinfo = PMIX_INFO_TRUE(&qualifiers[n]);
2888 } else if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_APP_INFO)) {
2889 appinfo = PMIX_INFO_TRUE(&qualifiers[n]);
2890 }
2891 }
2892
2893 /* check for node/app keys in the absence of corresponding qualifier */
2894 if (NULL != key) {
2895 if (pmix_check_node_info(key)) {
2896 nodeinfo = true;
2897 } else if (pmix_check_app_info(key)) {
2898 appinfo = true;
2899 }
2900 }
2901
2902 /* find the hash table for this nspace */
2903 trk = get_tracker(proc->nspace, false);
2904 if (NULL == trk) {
2905 return PMIX_ERR_INVALID_NAMESPACE;
2906 }
2907
2908 if (nodeinfo) {
2909 rc = fetch_nodeinfo(key, &trk->nodeinfo, qualifiers, nqual, kvs);
2910 if (PMIX_SUCCESS != rc && PMIX_RANK_WILDCARD == proc->rank) {
2911 /* need to check internal as we might have an older peer */
2912 ht = &trk->internal;
2913 goto doover;
2914 }
2915 return rc;
2916 } else if (appinfo) {
2917 rc = fetch_appinfo(key, &trk->apps, qualifiers, nqual, kvs);
2918 if (PMIX_SUCCESS != rc && PMIX_RANK_WILDCARD == proc->rank) {
2919 /* need to check internal as we might have an older peer */
2920 ht = &trk->internal;
2921 goto doover;
2922 }
2923 return rc;
2924 }
2925
2926 /* fetch from the corresponding hash table - note that
2927 * we always provide a copy as we don't support
2928 * shared memory */
2929 if (PMIX_INTERNAL == scope ||
2930 PMIX_SCOPE_UNDEF == scope ||
2931 PMIX_GLOBAL == scope ||
2932 PMIX_RANK_WILDCARD == proc->rank) {
2933 ht = &trk->internal;
2934 } else if (PMIX_LOCAL == scope ||
2935 PMIX_GLOBAL == scope) {
2936 ht = &trk->local;
2937 } else if (PMIX_REMOTE == scope) {
2938 ht = &trk->remote;
2939 } else {
2940 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2941 return PMIX_ERR_BAD_PARAM;
2942 }
2943
2944 doover:
2945 /* if rank=PMIX_RANK_UNDEF, then we need to search all
2946 * known ranks for this nspace as any one of them could
2947 * be the source */
2948 if (PMIX_RANK_UNDEF == proc->rank) {
2949 for (rnk=0; rnk < trk->nptr->nprocs; rnk++) {
2950 rc = dohash(ht, key, rnk, true, kvs);
2951 if (PMIX_ERR_NOMEM == rc) {
2952 return rc;
2953 }
2954 if (PMIX_SUCCESS == rc && NULL != key) {
2955 return rc;
2956 }
2957 }
2958 /* also need to check any job-level info */
2959 PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
2960 if (NULL == key || PMIX_CHECK_KEY(kvptr, key)) {
2961 kv = PMIX_NEW(pmix_kval_t);
2962 kv->key = strdup(kvptr->key);
2963 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2964 PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2965 if (PMIX_SUCCESS != rc) {
2966 PMIX_RELEASE(kv);
2967 return rc;
2968 }
2969 pmix_list_append(kvs, &kv->super);
2970 if (NULL != key) {
2971 break;
2972 }
2973 }
2974 }
2975 if (NULL == key) {
2976 /* and need to add all job info just in case that was
2977 * passed via a different GDS component */
2978 rc = dohash(&trk->internal, NULL, PMIX_RANK_WILDCARD, false, kvs);
2979 } else {
2980 rc = PMIX_ERR_NOT_FOUND;
2981 }
2982 } else {
2983 rc = dohash(ht, key, proc->rank, false, kvs);
2984 }
2985 if (PMIX_SUCCESS == rc) {
2986 if (PMIX_GLOBAL == scope) {
2987 if (ht == &trk->local) {
2988 /* need to do this again for the remote data */
2989 ht = &trk->remote;
2990 goto doover;
2991 } else if (ht == &trk->internal) {
2992 /* check local */
2993 ht = &trk->local;
2994 goto doover;
2995 }
2996 }
2997 } else {
2998 if (PMIX_GLOBAL == scope ||
2999 PMIX_SCOPE_UNDEF == scope) {
3000 if (ht == &trk->internal) {
3001 /* need to also try the local data */
3002 ht = &trk->local;
3003 goto doover;
3004 } else if (ht == &trk->local) {
3005 /* need to also try the remote data */
3006 ht = &trk->remote;
3007 goto doover;
3008 }
3009 }
3010 }
3011 if (0 == pmix_list_get_size(kvs)) {
3012 rc = PMIX_ERR_NOT_FOUND;
3013 }
3014
3015 return rc;
3016 }
3017
setup_fork(const pmix_proc_t * proc,char *** env)3018 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env)
3019 {
3020 /* we don't need to add anything */
3021 return PMIX_SUCCESS;
3022 }
3023
nspace_add(const char * nspace,uint32_t nlocalprocs,pmix_info_t info[],size_t ninfo)3024 static pmix_status_t nspace_add(const char *nspace, uint32_t nlocalprocs,
3025 pmix_info_t info[], size_t ninfo)
3026 {
3027 /* we don't need to do anything here */
3028 return PMIX_SUCCESS;
3029 }
3030
nspace_del(const char * nspace)3031 static pmix_status_t nspace_del(const char *nspace)
3032 {
3033 pmix_job_t *t;
3034
3035 /* find the hash table for this nspace */
3036 PMIX_LIST_FOREACH(t, &myjobs, pmix_job_t) {
3037 if (0 == strcmp(nspace, t->ns)) {
3038 /* release it */
3039 pmix_list_remove_item(&myjobs, &t->super);
3040 PMIX_RELEASE(t);
3041 break;
3042 }
3043 }
3044 return PMIX_SUCCESS;
3045 }
3046
assemb_kvs_req(const pmix_proc_t * proc,pmix_list_t * kvs,pmix_buffer_t * buf,void * cbdata)3047 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
3048 pmix_list_t *kvs,
3049 pmix_buffer_t *buf,
3050 void *cbdata)
3051 {
3052 pmix_status_t rc = PMIX_SUCCESS;
3053 pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
3054 pmix_kval_t *kv;
3055
3056 if (!PMIX_PEER_IS_V1(cd->peer)) {
3057 PMIX_BFROPS_PACK(rc, cd->peer, buf, proc, 1, PMIX_PROC);
3058 if (PMIX_SUCCESS != rc) {
3059 return rc;
3060 }
3061 }
3062 PMIX_LIST_FOREACH(kv, kvs, pmix_kval_t) {
3063 PMIX_BFROPS_PACK(rc, cd->peer, buf, kv, 1, PMIX_KVAL);
3064 if (PMIX_SUCCESS != rc) {
3065 return rc;
3066 }
3067 }
3068 return rc;
3069 }
3070
store_session_info(pmix_nspace_t nspace,pmix_kval_t * kv)3071 static pmix_status_t store_session_info(pmix_nspace_t nspace,
3072 pmix_kval_t *kv)
3073 {
3074 pmix_job_t *trk;
3075 pmix_status_t rc;
3076
3077 /* find the hash table for this nspace */
3078 trk = get_tracker(nspace, true);
3079 if (NULL == trk) {
3080 return PMIX_ERR_NOMEM;
3081 }
3082 rc = process_session_array(kv->value, trk);
3083 return rc;
3084 }
3085
store_node_info(pmix_nspace_t nspace,pmix_kval_t * kv)3086 static pmix_status_t store_node_info(pmix_nspace_t nspace,
3087 pmix_kval_t *kv)
3088 {
3089 pmix_job_t *trk;
3090 pmix_status_t rc;
3091
3092 /* find the hash table for this nspace */
3093 trk = get_tracker(nspace, true);
3094 if (NULL == trk) {
3095 return PMIX_ERR_NOMEM;
3096 }
3097 rc = process_node_array(kv->value, &trk->nodeinfo);
3098 return rc;
3099 }
3100
store_app_info(pmix_nspace_t nspace,pmix_kval_t * kv)3101 static pmix_status_t store_app_info(pmix_nspace_t nspace,
3102 pmix_kval_t *kv)
3103 {
3104 pmix_job_t *trk;
3105 pmix_status_t rc;
3106
3107 /* find the hash table for this nspace */
3108 trk = get_tracker(nspace, true);
3109 if (NULL == trk) {
3110 return PMIX_ERR_NOMEM;
3111 }
3112 rc = process_app_array(kv->value, trk);
3113 return rc;
3114 }
3115
accept_kvs_resp(pmix_buffer_t * buf)3116 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
3117 {
3118 pmix_status_t rc = PMIX_SUCCESS;
3119 int32_t cnt;
3120 pmix_byte_object_t bo;
3121 pmix_buffer_t pbkt;
3122 pmix_kval_t *kv;
3123 pmix_proc_t proct;
3124
3125 /* the incoming payload is provided as a set of packed
3126 * byte objects, one for each rank. A pmix_proc_t is the first
3127 * entry in the byte object. If the rank=PMIX_RANK_WILDCARD,
3128 * then that byte object contains job level info
3129 * for the provided nspace. Otherwise, the byte
3130 * object contains the pmix_kval_t's that were "put" by the
3131 * referenced process */
3132 cnt = 1;
3133 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3134 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
3135 while (PMIX_SUCCESS == rc) {
3136 /* setup the byte object for unpacking */
3137 PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
3138 PMIX_LOAD_BUFFER(pmix_client_globals.myserver,
3139 &pbkt, bo.bytes, bo.size);
3140 /* unpack the id of the providing process */
3141 cnt = 1;
3142 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3143 &pbkt, &proct, &cnt, PMIX_PROC);
3144 if (PMIX_SUCCESS != rc) {
3145 PMIX_ERROR_LOG(rc);
3146 return rc;
3147 }
3148 /* if the rank is UNDEF, then we store this on our own
3149 * rank tables */
3150 if (PMIX_RANK_UNDEF == proct.rank) {
3151 proct.rank = pmix_globals.myid.rank;
3152 }
3153
3154 cnt = 1;
3155 kv = PMIX_NEW(pmix_kval_t);
3156 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3157 &pbkt, kv, &cnt, PMIX_KVAL);
3158 while (PMIX_SUCCESS == rc) {
3159 /* if this is an info array, then store it here as dstore
3160 * doesn't know how to handle it */
3161 if (PMIX_CHECK_KEY(kv, PMIX_SESSION_INFO_ARRAY)) {
3162 rc = store_session_info(proct.nspace, kv);
3163 } else if (PMIX_CHECK_KEY(kv, PMIX_NODE_INFO_ARRAY)) {
3164 rc = store_node_info(proct.nspace, kv);
3165 } else if (PMIX_CHECK_KEY(kv, PMIX_APP_INFO_ARRAY)) {
3166 rc = store_app_info(proct.nspace, kv);
3167 } else {
3168 rc = hash_store(&proct, PMIX_INTERNAL, kv);
3169 }
3170 if (PMIX_SUCCESS != rc) {
3171 PMIX_ERROR_LOG(rc);
3172 PMIX_RELEASE(kv);
3173 PMIX_DESTRUCT(&pbkt);
3174 return rc;
3175 }
3176 PMIX_RELEASE(kv); // maintain accounting
3177 /* get the next one */
3178 kv = PMIX_NEW(pmix_kval_t);
3179 cnt = 1;
3180 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3181 &pbkt, kv, &cnt, PMIX_KVAL);
3182 }
3183 PMIX_RELEASE(kv); // maintain accounting
3184 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
3185 PMIX_ERROR_LOG(rc);
3186 PMIX_DESTRUCT(&pbkt);
3187 return rc;
3188 }
3189 PMIX_DESTRUCT(&pbkt);
3190 /* get the next one */
3191 cnt = 1;
3192 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3193 buf, &bo, &cnt, PMIX_BYTE_OBJECT);
3194 }
3195 if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
3196 PMIX_ERROR_LOG(rc);
3197 return rc;
3198 }
3199 return rc;
3200 }
3201