1 /*
2  * Copyright (c) 2015-2020 Intel, Inc.  All rights reserved.
3  * Copyright (c) 2016-2018 IBM Corporation.  All rights reserved.
4  * Copyright (c) 2018      Research Organization for Information Science
5  *                         and Technology (RIST).  All rights reserved.
6  * Copyright (c) 2018-2020 Mellanox Technologies, Inc.
7  *                         All rights reserved.
8  *
9  * $COPYRIGHT$
10  *
11  * Additional copyrights may follow
12  *
13  * $HEADER$
14  */
15 
16 #include "src/include/pmix_config.h"
17 
18 #include <string.h>
19 #ifdef HAVE_UNISTD_H
20 #include <unistd.h>
21 #endif
22 #ifdef HAVE_SYS_TYPES_H
23 #include <sys/types.h>
24 #endif
25 #ifdef HAVE_SYS_STAT_H
26 #include <sys/stat.h>
27 #endif
28 #ifdef HAVE_FCNTL_H
29 #include <fcntl.h>
30 #endif
31 #include <time.h>
32 
33 #include "include/pmix_common.h"
34 
35 #include "src/include/pmix_globals.h"
36 #include "src/class/pmix_list.h"
37 #include "src/client/pmix_client_ops.h"
38 #include "src/server/pmix_server_ops.h"
39 #include "src/mca/pcompress/base/base.h"
40 #include "src/mca/preg/preg.h"
41 #include "src/mca/ptl/base/base.h"
42 #include "src/util/argv.h"
43 #include "src/util/error.h"
44 #include "src/util/hash.h"
45 #include "src/util/output.h"
46 #include "src/util/name_fns.h"
47 #include "src/util/pmix_environ.h"
48 
49 #include "src/mca/gds/base/base.h"
50 #include "gds_hash.h"
51 
52 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo);
53 static void hash_finalize(void);
54 
55 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
56                                         int *priority);
57 
58 static pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
59                                          pmix_info_t info[], size_t ninfo);
60 
61 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
62                                             pmix_buffer_t *reply);
63 
64 static pmix_status_t hash_store_job_info(const char *nspace,
65                                         pmix_buffer_t *buf);
66 
67 static pmix_status_t hash_store(const pmix_proc_t *proc,
68                                 pmix_scope_t scope,
69                                 pmix_kval_t *kv);
70 
71 static pmix_status_t hash_store_modex(struct pmix_namespace_t *ns,
72                                       pmix_buffer_t *buff,
73                                       void *cbdata);
74 
75 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
76                                        pmix_proc_t *proc,
77                                        pmix_gds_modex_key_fmt_t key_fmt,
78                                        char **kmap,
79                                        pmix_buffer_t *pbkt);
80 
81 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
82                                 pmix_scope_t scope, bool copy,
83                                 const char *key,
84                                 pmix_info_t info[], size_t ninfo,
85                                 pmix_list_t *kvs);
86 
87 static pmix_status_t setup_fork(const pmix_proc_t *peer, char ***env);
88 
89 static pmix_status_t nspace_add(const char *nspace, uint32_t nlocalprocs,
90                                 pmix_info_t info[], size_t ninfo);
91 
92 static pmix_status_t nspace_del(const char *nspace);
93 
94 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
95                               pmix_list_t *kvs,
96                               pmix_buffer_t *bo,
97                               void *cbdata);
98 
99 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf);
100 
101 pmix_gds_base_module_t pmix_hash_module = {
102     .name = "hash",
103     .is_tsafe = false,
104     .init = hash_init,
105     .finalize = hash_finalize,
106     .assign_module = hash_assign_module,
107     .cache_job_info = hash_cache_job_info,
108     .register_job_info = hash_register_job_info,
109     .store_job_info = hash_store_job_info,
110     .store = hash_store,
111     .store_modex = hash_store_modex,
112     .fetch = hash_fetch,
113     .setup_fork = setup_fork,
114     .add_nspace = nspace_add,
115     .del_nspace = nspace_del,
116     .assemb_kvs_req = assemb_kvs_req,
117     .accept_kvs_resp = accept_kvs_resp
118 };
119 
120 /* Define a bitmask to track what information may not have
121  * been provided but is computable from other info */
122 #define PMIX_HASH_PROC_DATA     0x00000001
123 #define PMIX_HASH_JOB_SIZE      0x00000002
124 #define PMIX_HASH_MAX_PROCS     0x00000004
125 #define PMIX_HASH_NUM_NODES     0x00000008
126 #define PMIX_HASH_PROC_MAP      0x00000010
127 #define PMIX_HASH_NODE_MAP      0x00000020
128 
129 static pmix_list_t mysessions, myjobs;
130 
131 /**********************************************/
132 /* struct definitions */
133 typedef struct {
134     pmix_list_item_t super;
135     uint32_t session;
136     pmix_list_t sessioninfo;
137     pmix_list_t nodeinfo;
138 } pmix_session_t;
139 
140 typedef struct {
141     pmix_list_item_t super;
142     char *ns;
143     pmix_namespace_t *nptr;
144     pmix_hash_table_t internal;
145     pmix_hash_table_t remote;
146     pmix_hash_table_t local;
147     bool gdata_added;
148     pmix_list_t jobinfo;
149     pmix_list_t apps;
150     pmix_list_t nodeinfo;
151     pmix_session_t *session;
152 } pmix_job_t;
153 
154 typedef struct {
155     pmix_list_item_t super;
156     uint32_t appnum;
157     pmix_list_t appinfo;
158     pmix_list_t nodeinfo;
159     pmix_job_t *job;
160 } pmix_apptrkr_t;
161 
162 typedef struct {
163     pmix_list_item_t super;
164     uint32_t nodeid;
165     char *hostname;
166     char **aliases;
167     pmix_list_t info;
168 } pmix_nodeinfo_t;
169 
170 /**********************************************/
171 /* class instantiations */
scon(pmix_session_t * s)172 static void scon(pmix_session_t *s)
173 {
174     s->session = UINT32_MAX;
175     PMIX_CONSTRUCT(&s->sessioninfo, pmix_list_t);
176     PMIX_CONSTRUCT(&s->nodeinfo, pmix_list_t);
177 }
sdes(pmix_session_t * s)178 static void sdes(pmix_session_t *s)
179 {
180     PMIX_LIST_DESTRUCT(&s->sessioninfo);
181     PMIX_LIST_DESTRUCT(&s->nodeinfo);
182 }
183 static PMIX_CLASS_INSTANCE(pmix_session_t,
184                            pmix_list_item_t,
185                            scon, sdes);
186 
htcon(pmix_job_t * p)187 static void htcon(pmix_job_t *p)
188 {
189     p->ns = NULL;
190     p->nptr = NULL;
191     PMIX_CONSTRUCT(&p->jobinfo, pmix_list_t);
192     PMIX_CONSTRUCT(&p->internal, pmix_hash_table_t);
193     pmix_hash_table_init(&p->internal, 256);
194     PMIX_CONSTRUCT(&p->remote, pmix_hash_table_t);
195     pmix_hash_table_init(&p->remote, 256);
196     PMIX_CONSTRUCT(&p->local, pmix_hash_table_t);
197     pmix_hash_table_init(&p->local, 256);
198     p->gdata_added = false;
199     PMIX_CONSTRUCT(&p->apps, pmix_list_t);
200     PMIX_CONSTRUCT(&p->nodeinfo, pmix_list_t);
201     p->session = NULL;
202 }
htdes(pmix_job_t * p)203 static void htdes(pmix_job_t *p)
204 {
205     if (NULL != p->ns) {
206         free(p->ns);
207     }
208     if (NULL != p->nptr) {
209         PMIX_RELEASE(p->nptr);
210     }
211     PMIX_LIST_DESTRUCT(&p->jobinfo);
212     pmix_hash_remove_data(&p->internal, PMIX_RANK_WILDCARD, NULL);
213     PMIX_DESTRUCT(&p->internal);
214     pmix_hash_remove_data(&p->remote, PMIX_RANK_WILDCARD, NULL);
215     PMIX_DESTRUCT(&p->remote);
216     pmix_hash_remove_data(&p->local, PMIX_RANK_WILDCARD, NULL);
217     PMIX_DESTRUCT(&p->local);
218     PMIX_LIST_DESTRUCT(&p->apps);
219     PMIX_LIST_DESTRUCT(&p->nodeinfo);
220     if (NULL != p->session) {
221         PMIX_RELEASE(p->session);
222     }
223 }
224 static PMIX_CLASS_INSTANCE(pmix_job_t,
225                            pmix_list_item_t,
226                            htcon, htdes);
227 
apcon(pmix_apptrkr_t * p)228 static void apcon(pmix_apptrkr_t *p)
229 {
230     p->appnum = 0;
231     PMIX_CONSTRUCT(&p->appinfo, pmix_list_t);
232     PMIX_CONSTRUCT(&p->nodeinfo, pmix_list_t);
233     p->job = NULL;
234 }
apdes(pmix_apptrkr_t * p)235 static void apdes(pmix_apptrkr_t *p)
236 {
237     PMIX_LIST_DESTRUCT(&p->appinfo);
238     PMIX_LIST_DESTRUCT(&p->nodeinfo);
239     if (NULL != p->job) {
240         PMIX_RELEASE(p->job);
241     }
242 }
243 static PMIX_CLASS_INSTANCE(pmix_apptrkr_t,
244                            pmix_list_item_t,
245                            apcon, apdes);
246 
ndinfocon(pmix_nodeinfo_t * p)247 static void ndinfocon(pmix_nodeinfo_t *p)
248 {
249     p->nodeid = UINT32_MAX;
250     p->hostname = NULL;
251     p->aliases = NULL;
252     PMIX_CONSTRUCT(&p->info, pmix_list_t);
253 }
ndinfodes(pmix_nodeinfo_t * p)254 static void ndinfodes(pmix_nodeinfo_t *p)
255 {
256     if (NULL != p->hostname) {
257         free(p->hostname);
258     }
259     if (NULL != p->aliases) {
260         pmix_argv_free(p->aliases);
261     }
262     PMIX_LIST_DESTRUCT(&p->info);
263 }
264 static PMIX_CLASS_INSTANCE(pmix_nodeinfo_t,
265                            pmix_list_item_t,
266                            ndinfocon, ndinfodes);
267 
268 /**********************************************
269  *  Local Functions
270  **********************************************/
get_tracker(const pmix_nspace_t nspace,bool create)271 static pmix_job_t* get_tracker(const pmix_nspace_t nspace, bool create)
272 {
273     pmix_job_t *trk, *t;
274     pmix_namespace_t *ns, *nptr;
275 
276     /* find the hash table for this nspace */
277     trk = NULL;
278     PMIX_LIST_FOREACH(t, &myjobs, pmix_job_t) {
279         if (0 == strcmp(nspace, t->ns)) {
280             trk = t;
281             break;
282         }
283     }
284     if (NULL == trk && create) {
285         /* create one */
286         trk = PMIX_NEW(pmix_job_t);
287         trk->ns = strdup(nspace);
288         /* see if we already have this nspace */
289         nptr = NULL;
290         PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
291             if (0 == strcmp(ns->nspace, nspace)) {
292                 nptr = ns;
293                 break;
294             }
295         }
296         if (NULL == nptr) {
297             nptr = PMIX_NEW(pmix_namespace_t);
298             if (NULL == nptr) {
299                 PMIX_RELEASE(trk);
300                 return NULL;
301             }
302             nptr->nspace = strdup(nspace);
303             pmix_list_append(&pmix_globals.nspaces, &nptr->super);
304         }
305         PMIX_RETAIN(nptr);
306         trk->nptr = nptr;
307         pmix_list_append(&myjobs, &trk->super);
308     }
309     return trk;
310 }
311 
check_hostname(char * h1,char * h2)312 static bool check_hostname(char *h1, char *h2)
313 {
314     if (0 == strcmp(h1, h2)) {
315         return true;
316     }
317     return false;
318 }
319 
check_node(pmix_nodeinfo_t * n1,pmix_nodeinfo_t * n2)320 static bool check_node(pmix_nodeinfo_t *n1,
321                        pmix_nodeinfo_t *n2)
322 {
323     int i, j;
324 
325     if (UINT32_MAX != n1->nodeid &&
326         UINT32_MAX != n2->nodeid &&
327         n1->nodeid == n2->nodeid) {
328         return true;
329     }
330 
331     if (NULL == n1->hostname || NULL == n2->hostname) {
332         return false;
333     }
334 
335     if (check_hostname(n1->hostname, n2->hostname)) {
336         return true;
337     }
338 
339     if (NULL != n1->aliases) {
340         for (i=0; NULL != n1->aliases[i]; i++) {
341             if (check_hostname(n1->aliases[i], n2->hostname)) {
342                 return true;
343             }
344             if (NULL != n2->aliases) {
345                 for (j=0; NULL != n2->aliases[j]; j++) {
346                     if (check_hostname(n1->hostname, n2->aliases[j])) {
347                         return true;
348                     }
349                     if (check_hostname(n1->aliases[i], n2->aliases[j])) {
350                         return true;
351                     }
352                 }
353             }
354         }
355     } else if (NULL != n2->aliases) {
356         for (j=0; NULL != n2->aliases[j]; j++) {
357             if (check_hostname(n1->hostname, n2->aliases[j])) {
358                 return true;
359             }
360         }
361     }
362 
363     return false;
364 }
365 
check_nodename(pmix_nodeinfo_t * nptr,char * hostname)366 static bool check_nodename(pmix_nodeinfo_t *nptr, char *hostname)
367 {
368     int i;
369 
370     if (NULL == nptr->hostname) {
371         return false;
372     }
373 
374     if (check_hostname(nptr->hostname, hostname)) {
375         return true;
376     }
377 
378     if (NULL != nptr->aliases) {
379         for (i=0; NULL != nptr->aliases[i]; i++) {
380             if (check_hostname(nptr->aliases[i], hostname)) {
381                 return true;
382             }
383         }
384     }
385     return false;
386 }
387 
388 /**********************************************
389  *   Forward Declarations
390  **********************************************/
391 static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
392                                     pmix_info_t *info, size_t ninfo,
393                                     pmix_list_t *kvs);
394 static pmix_status_t fetch_appinfo(const char *key, pmix_list_t *tgt,
395                                    pmix_info_t *info, size_t ninfo,
396                                    pmix_list_t *kvs);
397 
398 /**********************************************/
399 
400 /* process a node array - contains an array of
401  * node-level info for a single node. Either the
402  * nodeid, hostname, or both must be included
403  * in the array to identify the node */
process_node_array(pmix_value_t * val,pmix_list_t * tgt)404 static pmix_status_t process_node_array(pmix_value_t *val,
405                                         pmix_list_t *tgt)
406 {
407     size_t size, j;
408     pmix_info_t *iptr;
409     pmix_status_t rc = PMIX_SUCCESS;
410     pmix_kval_t *kp2, *k1;
411     pmix_list_t cache;
412     pmix_nodeinfo_t *nd = NULL, *ndptr;
413     bool update;
414 
415     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
416                         "PROCESSING NODE ARRAY");
417 
418     /* array of node-level info for a specific node */
419     if (PMIX_DATA_ARRAY != val->type) {
420         PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
421         return PMIX_ERR_TYPE_MISMATCH;
422     }
423 
424     /* setup arrays */
425     size = val->data.darray->size;
426     iptr = (pmix_info_t*)val->data.darray->array;
427     PMIX_CONSTRUCT(&cache, pmix_list_t);
428 
429     /* cache the values while searching for the nodeid
430      * and/or hostname */
431     for (j=0; j < size; j++) {
432         pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
433                             "%s gds:hash:node_array for key %s",
434                             PMIX_NAME_PRINT(&pmix_globals.myid), iptr[j].key);
435         if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODEID)) {
436             if (NULL == nd) {
437                 nd = PMIX_NEW(pmix_nodeinfo_t);
438             }
439             PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, nd->nodeid, uint32_t);
440             if (PMIX_SUCCESS != rc) {
441                 PMIX_ERROR_LOG(rc);
442                 PMIX_RELEASE(nd);
443                 PMIX_LIST_DESTRUCT(&cache);
444                 return rc;
445             }
446         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_HOSTNAME)) {
447             if (NULL == nd) {
448                 nd = PMIX_NEW(pmix_nodeinfo_t);
449             }
450             nd->hostname = strdup(iptr[j].value.data.string);
451         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_HOSTNAME_ALIASES)) {
452             if (NULL == nd) {
453                 nd = PMIX_NEW(pmix_nodeinfo_t);
454             }
455             nd->aliases = pmix_argv_split(iptr[j].value.data.string, ',');
456             /* need to cache this value as well */
457             kp2 = PMIX_NEW(pmix_kval_t);
458             kp2->key = strdup(iptr[j].key);
459             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
460             PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
461             if (PMIX_SUCCESS != rc) {
462                 PMIX_ERROR_LOG(rc);
463                 PMIX_RELEASE(kp2);
464                 PMIX_RELEASE(nd);
465                 PMIX_LIST_DESTRUCT(&cache);
466                 return rc;
467             }
468             pmix_list_append(&cache, &kp2->super);
469         } else {
470             kp2 = PMIX_NEW(pmix_kval_t);
471             kp2->key = strdup(iptr[j].key);
472             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
473             PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
474             if (PMIX_SUCCESS != rc) {
475                 PMIX_ERROR_LOG(rc);
476                 PMIX_RELEASE(kp2);
477                 if (NULL != nd) {
478                     PMIX_RELEASE(nd);
479                 }
480                 PMIX_LIST_DESTRUCT(&cache);
481                 return rc;
482             }
483             pmix_list_append(&cache, &kp2->super);
484         }
485     }
486 
487     if (NULL == nd) {
488         /* they forgot to pass us the ident for the node */
489         PMIX_LIST_DESTRUCT(&cache);
490         return PMIX_ERR_BAD_PARAM;
491     }
492 
493     /* see if we already have this node on the
494      * provided list */
495     update = false;
496     PMIX_LIST_FOREACH(ndptr, tgt, pmix_nodeinfo_t) {
497         if (check_node(ndptr, nd)) {
498             /* we assume that the data is updating the current
499              * values */
500             if (NULL == ndptr->hostname && NULL != nd->hostname) {
501                 ndptr->hostname = strdup(nd->hostname);
502             }
503             PMIX_RELEASE(nd);
504             nd = ndptr;
505             update = true;
506             break;
507         }
508     }
509     if (!update) {
510         pmix_list_append(tgt, &nd->super);
511     }
512 
513     /* transfer the cached items to the nodeinfo list */
514     kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
515     while (NULL != kp2) {
516         /* if this is an update, we have to ensure each data
517          * item only appears once on the list */
518         if (update) {
519             PMIX_LIST_FOREACH(k1, &nd->info, pmix_kval_t) {
520                 if (PMIX_CHECK_KEY(k1, kp2->key)) {
521                     pmix_list_remove_item(&nd->info, &k1->super);
522                     PMIX_RELEASE(k1);
523                     break;
524                 }
525             }
526         }
527         pmix_list_append(&nd->info, &kp2->super);
528         kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
529     }
530     PMIX_LIST_DESTRUCT(&cache);
531 
532     return PMIX_SUCCESS;
533 }
534 
535 /* process an app array - contains an array of
536  * app-level info for a single app. If the
537  * appnum is not included in the array, then
538  * it is assumed that only app is in the job.
539  * This assumption is checked and generates
540  * an error if violated */
process_app_array(pmix_value_t * val,pmix_job_t * trk)541 static pmix_status_t process_app_array(pmix_value_t *val,
542                                        pmix_job_t *trk)
543 {
544     pmix_list_t cache, ncache;
545     size_t size, j;
546     pmix_info_t *iptr;
547     pmix_status_t rc = PMIX_SUCCESS;
548     uint32_t appnum;
549     pmix_apptrkr_t *app = NULL, *apptr;
550     pmix_kval_t *kp2, *k1;
551     pmix_nodeinfo_t *nd;
552     bool update;
553 
554     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
555                         "PROCESSING APP ARRAY");
556 
557     /* apps have to belong to a job */
558     if (NULL == trk) {
559         return PMIX_ERR_BAD_PARAM;
560     }
561 
562     /* array of app-level info */
563     if (PMIX_DATA_ARRAY != val->type) {
564         PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
565         return PMIX_ERR_TYPE_MISMATCH;
566     }
567 
568     /* setup arrays and lists */
569     PMIX_CONSTRUCT(&cache, pmix_list_t);
570     PMIX_CONSTRUCT(&ncache, pmix_list_t);
571     size = val->data.darray->size;
572     iptr = (pmix_info_t*)val->data.darray->array;
573 
574     for (j=0; j < size; j++) {
575         pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
576                             "%s gds:hash:app_array for key %s",
577                             PMIX_NAME_PRINT(&pmix_globals.myid), iptr[j].key);
578         if (PMIX_CHECK_KEY(&iptr[j], PMIX_APPNUM)) {
579             PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, appnum, uint32_t);
580             if (PMIX_SUCCESS != rc) {
581                 PMIX_ERROR_LOG(rc);
582                 goto release;
583             }
584             if (NULL != app) {
585                 /* this is an error - there can be only one app
586                  * described in this array */
587                 PMIX_RELEASE(app);
588                 PMIX_LIST_DESTRUCT(&cache);
589                 PMIX_LIST_DESTRUCT(&ncache);
590                 return PMIX_ERR_BAD_PARAM;
591             }
592             app = PMIX_NEW(pmix_apptrkr_t);
593             app->appnum = appnum;
594         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
595             if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &ncache))) {
596                 PMIX_ERROR_LOG(rc);
597                 goto release;
598             }
599         } else {
600             kp2 = PMIX_NEW(pmix_kval_t);
601             kp2->key = strdup(iptr[j].key);
602             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
603             PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
604             if (PMIX_SUCCESS != rc) {
605                 PMIX_ERROR_LOG(rc);
606                 PMIX_RELEASE(kp2);
607                 goto release;
608             }
609             pmix_list_append(&cache, &kp2->super);
610         }
611     }
612     if (NULL == app) {
613         /* per the standard, they don't have to provide us with
614          * an appnum so long as only one app is in the job */
615         if (0 == pmix_list_get_size(&trk->apps)) {
616             app = PMIX_NEW(pmix_apptrkr_t);
617             app->appnum = 0;
618         } else {
619             /* this is not allowed to happen - they are required
620              * to provide us with an app number per the standard */
621             rc = PMIX_ERR_BAD_PARAM;
622             PMIX_ERROR_LOG(rc);
623             goto release;
624         }
625     }
626     /* see if we already have this app on the
627      * provided list */
628     update = false;
629     PMIX_LIST_FOREACH(apptr, &trk->apps, pmix_apptrkr_t) {
630         if (apptr->appnum == app->appnum) {
631             /* we assume that the data is updating the current
632              * values */
633             PMIX_RELEASE(app);
634             app = apptr;
635             update = true;
636             break;
637         }
638     }
639     if (!update) {
640         pmix_list_append(&trk->apps, &app->super);
641     }
642     /* point the app at its job */
643     if (NULL == app->job) {
644         PMIX_RETAIN(trk);
645         app->job = trk;
646     }
647 
648     /* transfer the app-level data across */
649     kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
650     while (NULL != kp2) {
651         /* if this is an update, we have to ensure each data
652          * item only appears once on the list */
653         if (update) {
654             PMIX_LIST_FOREACH(k1, &app->appinfo, pmix_kval_t) {
655                 if (PMIX_CHECK_KEY(k1, kp2->key)) {
656                     pmix_list_remove_item(&app->appinfo, &k1->super);
657                     PMIX_RELEASE(k1);
658                     break;
659                 }
660             }
661         }
662         pmix_list_append(&app->appinfo, &kp2->super);
663         kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
664     }
665     /* transfer the associated node-level data across */
666     nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
667     while (NULL != nd) {
668         pmix_list_append(&app->nodeinfo, &nd->super);
669         nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
670     }
671 
672   release:
673     PMIX_LIST_DESTRUCT(&cache);
674     PMIX_LIST_DESTRUCT(&ncache);
675 
676     return rc;
677 }
678 
679 /* process a job array */
process_job_array(pmix_info_t * info,pmix_job_t * trk,uint32_t * flags,char *** procs,char *** nodes)680 static pmix_status_t process_job_array(pmix_info_t *info,
681                                        pmix_job_t *trk,
682                                        uint32_t *flags,
683                                        char ***procs,
684                                        char ***nodes)
685 {
686     pmix_list_t cache;
687     size_t j, size;
688     pmix_info_t *iptr;
689     pmix_kval_t *kp2;
690     pmix_status_t rc;
691 
692     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
693                         "PROCESSING JOB ARRAY");
694 
695     /* array of job-level info */
696     if (PMIX_DATA_ARRAY != info->value.type) {
697         PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
698         return PMIX_ERR_TYPE_MISMATCH;
699     }
700     size = info->value.data.darray->size;
701     iptr = (pmix_info_t*)info->value.data.darray->array;
702     PMIX_CONSTRUCT(&cache, pmix_list_t);
703     for (j=0; j < size; j++) {
704         if (PMIX_CHECK_KEY(&iptr[j], PMIX_APP_INFO_ARRAY)) {
705             if (PMIX_SUCCESS != (rc = process_app_array(&iptr[j].value, trk))) {
706                 return rc;
707             }
708         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
709             if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &trk->nodeinfo))) {
710                 PMIX_ERROR_LOG(rc);
711                 return rc;
712             }
713         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_PROC_MAP)) {
714             /* not allowed to get this more than once */
715             if (*flags & PMIX_HASH_PROC_MAP) {
716                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
717                 return PMIX_ERR_BAD_PARAM;
718             }
719             /* parse the regex to get the argv array containing proc ranks on each node */
720             if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(iptr[j].value.data.bo.bytes, procs))) {
721                 PMIX_ERROR_LOG(rc);
722                 return rc;
723             }
724             /* mark that we got the map */
725             *flags |= PMIX_HASH_PROC_MAP;
726         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_MAP)) {
727             /* not allowed to get this more than once */
728             if (*flags & PMIX_HASH_NODE_MAP) {
729                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
730                 return PMIX_ERR_BAD_PARAM;
731             }
732             /* parse the regex to get the argv array of node names */
733             if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(iptr[j].value.data.bo.bytes, nodes))) {
734                 PMIX_ERROR_LOG(rc);
735                 return rc;
736             }
737             /* mark that we got the map */
738             *flags |= PMIX_HASH_NODE_MAP;
739         } else {
740             kp2 = PMIX_NEW(pmix_kval_t);
741             kp2->key = strdup(iptr[j].key);
742             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
743             PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
744             if (PMIX_SUCCESS != rc) {
745                 PMIX_RELEASE(kp2);
746                 PMIX_LIST_DESTRUCT(&cache);
747                 return rc;
748             }
749             pmix_list_append(&trk->jobinfo, &kp2->super);
750             /* check for job size */
751             if (PMIX_CHECK_KEY(&iptr[j], PMIX_JOB_SIZE) &&
752                 !(PMIX_HASH_JOB_SIZE & *flags)) {
753                 trk->nptr->nprocs = iptr[j].value.data.uint32;
754                 *flags |= PMIX_HASH_JOB_SIZE;
755             }
756         }
757     }
758     return PMIX_SUCCESS;
759 }
760 
process_session_array(pmix_value_t * val,pmix_job_t * trk)761 static pmix_status_t process_session_array(pmix_value_t *val,
762                                            pmix_job_t *trk)
763 {
764     pmix_session_t *s = NULL, *sptr;
765     size_t j, size;
766     pmix_info_t *iptr;
767     pmix_list_t cache, ncache;
768     pmix_status_t rc;
769     pmix_kval_t *kp2;
770     pmix_nodeinfo_t *nd;
771     uint32_t sid;
772 
773     /* array of session-level info */
774     if (PMIX_DATA_ARRAY != val->type) {
775         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
776         return PMIX_ERR_TYPE_MISMATCH;
777     }
778     size = val->data.darray->size;
779     iptr = (pmix_info_t*)val->data.darray->array;
780 
781     PMIX_CONSTRUCT(&cache, pmix_list_t);
782     PMIX_CONSTRUCT(&ncache, pmix_list_t);
783     for (j=0; j < size; j++) {
784         if (PMIX_CHECK_KEY(&iptr[j], PMIX_SESSION_ID)) {
785             PMIX_VALUE_GET_NUMBER(rc, &iptr[j].value, sid, uint32_t);
786             if (PMIX_SUCCESS != rc) {
787                 PMIX_ERROR_LOG(rc);
788                 PMIX_LIST_DESTRUCT(&cache);
789                 PMIX_LIST_DESTRUCT(&ncache);
790                 return rc;
791             }
792             /* see if we already have this session - it could have
793              * been defined by a separate PMIX_SESSION_ID key */
794             PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
795                 if (sptr->session == sid) {
796                     s = sptr;
797                     break;
798                 }
799             }
800             if (NULL == s) {
801                 /* wasn't found, so create one */
802                 s = PMIX_NEW(pmix_session_t);
803                 s->session = sid;
804                 pmix_list_append(&mysessions, &s->super);
805             }
806         } else if (PMIX_CHECK_KEY(&iptr[j], PMIX_NODE_INFO_ARRAY)) {
807             if (PMIX_SUCCESS != (rc = process_node_array(&iptr[j].value, &ncache))) {
808                 PMIX_ERROR_LOG(rc);
809                 PMIX_LIST_DESTRUCT(&cache);
810                 PMIX_LIST_DESTRUCT(&ncache);
811                 return rc;
812             }
813         } else {
814             kp2 = PMIX_NEW(pmix_kval_t);
815             kp2->key = strdup(iptr[j].key);
816             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
817             PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
818             if (PMIX_SUCCESS != rc) {
819                 PMIX_ERROR_LOG(rc);
820                 PMIX_RELEASE(kp2);
821                 PMIX_LIST_DESTRUCT(&cache);
822                 PMIX_LIST_DESTRUCT(&ncache);
823                 return rc;
824             }
825             pmix_list_append(&cache, &kp2->super);
826         }
827     }
828     if (NULL == s) {
829         /* this is not allowed to happen - they are required
830          * to provide us with a session ID per the standard */
831         PMIX_LIST_DESTRUCT(&cache);
832         PMIX_LIST_DESTRUCT(&ncache);
833         rc = PMIX_ERR_BAD_PARAM;
834         PMIX_ERROR_LOG(rc);
835         return rc;
836     }
837     /* point the job at it */
838     if (NULL != trk->session) {
839         PMIX_RELEASE(trk->session);
840     }
841     PMIX_RETAIN(s);
842     trk->session = s;
843     /* transfer the data across */
844     kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
845     while (NULL != kp2) {
846         pmix_list_append(&s->sessioninfo, &kp2->super);
847         kp2 = (pmix_kval_t*)pmix_list_remove_first(&cache);
848     }
849     PMIX_LIST_DESTRUCT(&cache);
850     nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
851     while (NULL != nd) {
852         pmix_list_append(&s->nodeinfo, &nd->super);
853         nd = (pmix_nodeinfo_t*)pmix_list_remove_first(&ncache);
854     }
855     PMIX_LIST_DESTRUCT(&ncache);
856     return PMIX_SUCCESS;
857 }
858 
hash_init(pmix_info_t info[],size_t ninfo)859 static pmix_status_t hash_init(pmix_info_t info[], size_t ninfo)
860 {
861     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
862                         "gds: hash init");
863 
864     PMIX_CONSTRUCT(&mysessions, pmix_list_t);
865     PMIX_CONSTRUCT(&myjobs, pmix_list_t);
866     return PMIX_SUCCESS;
867 }
868 
hash_finalize(void)869 static void hash_finalize(void)
870 {
871     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
872                         "gds: hash finalize");
873 
874     PMIX_LIST_DESTRUCT(&mysessions);
875     PMIX_LIST_DESTRUCT(&myjobs);
876 }
877 
hash_assign_module(pmix_info_t * info,size_t ninfo,int * priority)878 static pmix_status_t hash_assign_module(pmix_info_t *info, size_t ninfo,
879                                         int *priority)
880 {
881     size_t n, m;
882     char **options;
883 
884     *priority = 10;
885     if (NULL != info) {
886         for (n=0; n < ninfo; n++) {
887             if (0 == strncmp(info[n].key, PMIX_GDS_MODULE, PMIX_MAX_KEYLEN)) {
888                 options = pmix_argv_split(info[n].value.data.string, ',');
889                 for (m=0; NULL != options[m]; m++) {
890                     if (0 == strcmp(options[m], "hash")) {
891                         /* they specifically asked for us */
892                         *priority = 100;
893                         break;
894                     }
895                 }
896                 pmix_argv_free(options);
897                 break;
898             }
899         }
900     }
901     return PMIX_SUCCESS;
902 }
903 
store_map(pmix_job_t * trk,char ** nodes,char ** ppn,uint32_t flags)904 static pmix_status_t store_map(pmix_job_t *trk,
905                                char **nodes, char **ppn,
906                                uint32_t flags)
907 {
908     pmix_status_t rc;
909     size_t m, n;
910     pmix_rank_t rank;
911     pmix_kval_t *kp1, *kp2;
912     char **procs;
913     uint32_t totalprocs=0;
914     pmix_hash_table_t *ht = &trk->internal;
915     pmix_nodeinfo_t *nd, *ndptr;
916 
917     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
918                         "[%s:%d] gds:hash:store_map",
919                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
920 
921     /* if the lists don't match, then that's wrong */
922     if (pmix_argv_count(nodes) != pmix_argv_count(ppn)) {
923         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
924         return PMIX_ERR_BAD_PARAM;
925     }
926 
927     /* if they didn't provide the number of nodes, then
928      * compute it from the list of nodes */
929     if (!(PMIX_HASH_NUM_NODES & flags)) {
930         kp2 = PMIX_NEW(pmix_kval_t);
931         kp2->key = strdup(PMIX_NUM_NODES);
932         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
933         kp2->value->type = PMIX_UINT32;
934         kp2->value->data.uint32 = pmix_argv_count(nodes);
935         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
936                             "[%s:%d] gds:hash:store_map adding key %s to job info",
937                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
938                             kp2->key);
939         if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
940             PMIX_ERROR_LOG(rc);
941             PMIX_RELEASE(kp2);
942             return rc;
943         }
944         PMIX_RELEASE(kp2);  // maintain acctg
945     }
946 
947     for (n=0; NULL != nodes[n]; n++) {
948         /* check and see if we already have this node */
949         nd = NULL;
950         PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
951             if (check_nodename(ndptr, nodes[n])) {
952                 nd = ndptr;
953                 break;
954             }
955         }
956         if (NULL == nd) {
957             nd = PMIX_NEW(pmix_nodeinfo_t);
958             nd->hostname = strdup(nodes[n]);
959             nd->nodeid = n;
960             pmix_list_append(&trk->nodeinfo, &nd->super);
961         }
962         /* store the proc list as-is */
963         kp2 = PMIX_NEW(pmix_kval_t);
964         if (NULL == kp2) {
965             return PMIX_ERR_NOMEM;
966         }
967         kp2->key = strdup(PMIX_LOCAL_PEERS);
968         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
969         if (NULL == kp2->value) {
970             PMIX_RELEASE(kp2);
971             return PMIX_ERR_NOMEM;
972         }
973         kp2->value->type = PMIX_STRING;
974         kp2->value->data.string = strdup(ppn[n]);
975         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
976                             "[%s:%d] gds:hash:store_map adding key %s to node %s info",
977                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
978                             kp2->key, nodes[n]);
979         /* ensure this item only appears once on the list */
980         PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
981             if (PMIX_CHECK_KEY(kp1, kp2->key)) {
982                 pmix_list_remove_item(&nd->info, &kp1->super);
983                 PMIX_RELEASE(kp1);
984                 break;
985             }
986         }
987         pmix_list_append(&nd->info, &kp2->super);
988 
989         /* save the local leader */
990         rank = strtoul(ppn[n], NULL, 10);
991         kp2 = PMIX_NEW(pmix_kval_t);
992         if (NULL == kp2) {
993             return PMIX_ERR_NOMEM;
994         }
995         kp2->key = strdup(PMIX_LOCALLDR);
996         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
997         if (NULL == kp2->value) {
998             PMIX_RELEASE(kp2);
999             return PMIX_ERR_NOMEM;
1000         }
1001         kp2->value->type = PMIX_PROC_RANK;
1002         kp2->value->data.rank = rank;
1003         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1004                             "[%s:%d] gds:hash:store_map adding key %s to node %s info",
1005                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
1006                             kp2->key, nodes[n]);
1007         /* ensure this item only appears once on the list */
1008         PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
1009             if (PMIX_CHECK_KEY(kp1, kp2->key)) {
1010                 pmix_list_remove_item(&nd->info, &kp1->super);
1011                 PMIX_RELEASE(kp1);
1012                 break;
1013             }
1014         }
1015         pmix_list_append(&nd->info, &kp2->super);
1016 
1017         /* split the list of procs so we can store their
1018          * individual location data */
1019         procs = pmix_argv_split(ppn[n], ',');
1020         /* save the local size in case they don't
1021          * give it to us */
1022         kp2 = PMIX_NEW(pmix_kval_t);
1023         if (NULL == kp2) {
1024             return PMIX_ERR_NOMEM;
1025         }
1026         kp2->key = strdup(PMIX_LOCAL_SIZE);
1027         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1028         if (NULL == kp2->value) {
1029             PMIX_RELEASE(kp2);
1030             return PMIX_ERR_NOMEM;
1031         }
1032         kp2->value->type = PMIX_UINT32;
1033         kp2->value->data.uint32 = pmix_argv_count(procs);
1034         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1035                             "[%s:%d] gds:hash:store_map adding key %s to node %s info",
1036                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
1037                             kp2->key, nodes[n]);
1038         /* ensure this item only appears once on the list */
1039         PMIX_LIST_FOREACH(kp1, &nd->info, pmix_kval_t) {
1040             if (PMIX_CHECK_KEY(kp1, kp2->key)) {
1041                 pmix_list_remove_item(&nd->info, &kp1->super);
1042                 PMIX_RELEASE(kp1);
1043                 break;
1044             }
1045         }
1046         pmix_list_append(&nd->info, &kp2->super);
1047         /* track total procs in job in case they
1048          * didn't give it to us */
1049         totalprocs += pmix_argv_count(procs);
1050         for (m=0; NULL != procs[m]; m++) {
1051             /* store the hostname for each proc */
1052             kp2 = PMIX_NEW(pmix_kval_t);
1053             kp2->key = strdup(PMIX_HOSTNAME);
1054             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1055             kp2->value->type = PMIX_STRING;
1056             kp2->value->data.string = strdup(nodes[n]);
1057             rank = strtol(procs[m], NULL, 10);
1058             pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1059                                 "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1060                                 pmix_globals.myid.nspace, pmix_globals.myid.rank,
1061                                 trk->ns, rank, kp2->key);
1062             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1063                 PMIX_ERROR_LOG(rc);
1064                 PMIX_RELEASE(kp2);
1065                 pmix_argv_free(procs);
1066                 return rc;
1067             }
1068             PMIX_RELEASE(kp2);  // maintain acctg
1069             if (!(PMIX_HASH_PROC_DATA & flags)) {
1070                 /* add an entry for the nodeid */
1071                 kp2 = PMIX_NEW(pmix_kval_t);
1072                 kp2->key = strdup(PMIX_NODEID);
1073                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1074                 kp2->value->type = PMIX_UINT32;
1075                 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1076                                     "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1077                                     pmix_globals.myid.nspace, pmix_globals.myid.rank,
1078                                     trk->ns, rank, kp2->key);
1079                 kp2->value->data.uint32 = n;
1080                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1081                     PMIX_ERROR_LOG(rc);
1082                     PMIX_RELEASE(kp2);
1083                     pmix_argv_free(procs);
1084                     return rc;
1085                 }
1086                 PMIX_RELEASE(kp2);  // maintain acctg
1087                 /* add an entry for the local rank */
1088                 kp2 = PMIX_NEW(pmix_kval_t);
1089                 kp2->key = strdup(PMIX_LOCAL_RANK);
1090                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1091                 kp2->value->type = PMIX_UINT16;
1092                 kp2->value->data.uint16 = m;
1093                 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1094                                     "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1095                                     pmix_globals.myid.nspace, pmix_globals.myid.rank,
1096                                     trk->ns, rank, kp2->key);
1097                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1098                     PMIX_ERROR_LOG(rc);
1099                     PMIX_RELEASE(kp2);
1100                     pmix_argv_free(procs);
1101                     return rc;
1102                 }
1103                 PMIX_RELEASE(kp2);  // maintain acctg
1104                 /* add an entry for the node rank - for now, we assume
1105                  * only the one job is running */
1106                 kp2 = PMIX_NEW(pmix_kval_t);
1107                 kp2->key = strdup(PMIX_NODE_RANK);
1108                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1109                 kp2->value->type = PMIX_UINT16;
1110                 kp2->value->data.uint16 = m;
1111                 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1112                                     "[%s:%d] gds:hash:store_map for [%s:%u]: key %s",
1113                                     pmix_globals.myid.nspace, pmix_globals.myid.rank,
1114                                     trk->ns, rank, kp2->key);
1115                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1116                     PMIX_ERROR_LOG(rc);
1117                     PMIX_RELEASE(kp2);
1118                     pmix_argv_free(procs);
1119                     return rc;
1120                 }
1121                 PMIX_RELEASE(kp2);  // maintain acctg
1122             }
1123         }
1124         pmix_argv_free(procs);
1125     }
1126 
1127     /* store the comma-delimited list of nodes hosting
1128      * procs in this nspace in case someone using PMIx v2
1129      * requests it */
1130     kp2 = PMIX_NEW(pmix_kval_t);
1131     kp2->key = strdup(PMIX_NODE_LIST);
1132     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1133     kp2->value->type = PMIX_STRING;
1134     kp2->value->data.string = pmix_argv_join(nodes, ',');
1135     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1136                         "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1137                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1138                         trk->ns, kp2->key);
1139     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1140         PMIX_ERROR_LOG(rc);
1141         PMIX_RELEASE(kp2);
1142         return rc;
1143     }
1144     PMIX_RELEASE(kp2);  // maintain acctg
1145 
1146     /* if they didn't provide the job size, compute it as
1147      * being the number of provided procs (i.e., size of
1148      * ppn list) */
1149     if (!(PMIX_HASH_JOB_SIZE & flags)) {
1150         kp2 = PMIX_NEW(pmix_kval_t);
1151         kp2->key = strdup(PMIX_JOB_SIZE);
1152         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1153         kp2->value->type = PMIX_UINT32;
1154         kp2->value->data.uint32 = totalprocs;
1155         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1156                             "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1157                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
1158                             trk->ns, kp2->key);
1159         if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1160             PMIX_ERROR_LOG(rc);
1161             PMIX_RELEASE(kp2);
1162             return rc;
1163         }
1164         PMIX_RELEASE(kp2);  // maintain acctg
1165         flags |= PMIX_HASH_JOB_SIZE;
1166         trk->nptr->nprocs = totalprocs;
1167     }
1168 
1169     /* if they didn't provide a value for max procs, just
1170      * assume it is the same as the number of procs in the
1171      * job and store it */
1172     if (!(PMIX_HASH_MAX_PROCS & flags)) {
1173         kp2 = PMIX_NEW(pmix_kval_t);
1174         kp2->key = strdup(PMIX_MAX_PROCS);
1175         kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1176         kp2->value->type = PMIX_UINT32;
1177         kp2->value->data.uint32 = totalprocs;
1178         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1179                             "[%s:%d] gds:hash:store_map for nspace %s: key %s",
1180                             pmix_globals.myid.nspace, pmix_globals.myid.rank,
1181                             trk->ns, kp2->key);
1182         if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1183             PMIX_ERROR_LOG(rc);
1184             PMIX_RELEASE(kp2);
1185             return rc;
1186         }
1187         PMIX_RELEASE(kp2);  // maintain acctg
1188         flags |= PMIX_HASH_MAX_PROCS;
1189     }
1190 
1191 
1192     return PMIX_SUCCESS;
1193 }
1194 
hash_cache_job_info(struct pmix_namespace_t * ns,pmix_info_t info[],size_t ninfo)1195 pmix_status_t hash_cache_job_info(struct pmix_namespace_t *ns,
1196                                   pmix_info_t info[], size_t ninfo)
1197 {
1198     pmix_namespace_t *nptr = (pmix_namespace_t*)ns;
1199     pmix_job_t *trk;
1200     pmix_session_t *s = NULL, *sptr;
1201     pmix_hash_table_t *ht;
1202     pmix_kval_t *kp2, *kvptr;
1203     pmix_info_t *iptr;
1204     char **nodes=NULL, **procs=NULL;
1205     uint8_t *tmp;
1206     uint32_t sid=UINT32_MAX;
1207     pmix_rank_t rank;
1208     pmix_status_t rc=PMIX_SUCCESS;
1209     size_t n, j, size, len;
1210     uint32_t flags = 0;
1211     pmix_nodeinfo_t *nd, *ndptr;
1212     pmix_apptrkr_t *apptr;
1213     bool found;
1214 
1215     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1216                         "[%s:%d] gds:hash:cache_job_info for nspace %s with %lu info",
1217                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1218                         nptr->nspace, ninfo);
1219 
1220     trk = get_tracker(nptr->nspace, true);
1221     if (NULL == trk) {
1222         return PMIX_ERR_NOMEM;
1223     }
1224 
1225     /* if there isn't any data, then be content with just
1226      * creating the tracker */
1227     if (NULL == info || 0 == ninfo) {
1228         return PMIX_SUCCESS;
1229     }
1230 
1231     /* cache the job info on the internal hash table for this nspace */
1232     ht = &trk->internal;
1233     for (n=0; n < ninfo; n++) {
1234         pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
1235                             "%s gds:hash:cache_job_info for key %s",
1236                             PMIX_NAME_PRINT(&pmix_globals.myid), info[n].key);
1237         if (PMIX_CHECK_KEY(&info[n], PMIX_SESSION_ID)) {
1238             PMIX_VALUE_GET_NUMBER(rc, &info[n].value, sid, uint32_t);
1239             if (PMIX_SUCCESS != rc) {
1240                 PMIX_ERROR_LOG(rc);
1241                 goto release;
1242             }
1243             /* see if we have this session */
1244             s = NULL;
1245             PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
1246                 if (sptr->session == sid) {
1247                     s = sptr;
1248                     break;
1249                 }
1250             }
1251             if (NULL == s) {
1252                 s = PMIX_NEW(pmix_session_t);
1253                 s->session = sid;
1254                 pmix_list_append(&mysessions, &s->super);
1255             }
1256             /* point the job at it */
1257             if (NULL == trk->session) {
1258                 PMIX_RETAIN(s);
1259                 trk->session = s;
1260             }
1261         } else if (PMIX_CHECK_KEY(&info[n], PMIX_SESSION_INFO_ARRAY)) {
1262             if (PMIX_SUCCESS != (rc = process_session_array(&info[n].value, trk))) {
1263                 PMIX_ERROR_LOG(rc);
1264                 goto release;
1265             }
1266         } else if (PMIX_CHECK_KEY(&info[n], PMIX_JOB_INFO_ARRAY)) {
1267             if (PMIX_SUCCESS != (rc = process_job_array(&info[n], trk, &flags, &procs, &nodes))) {
1268                 PMIX_ERROR_LOG(rc);
1269                 goto release;
1270             }
1271         } else if (PMIX_CHECK_KEY(&info[n], PMIX_APP_INFO_ARRAY)) {
1272             if (PMIX_SUCCESS != (rc = process_app_array(&info[n].value, trk))) {
1273                 PMIX_ERROR_LOG(rc);
1274                 goto release;
1275             }
1276         } else if (PMIX_CHECK_KEY(&info[n], PMIX_NODE_INFO_ARRAY)) {
1277             if (PMIX_SUCCESS != (rc = process_node_array(&info[n].value, &trk->nodeinfo))) {
1278                 PMIX_ERROR_LOG(rc);
1279                 goto release;
1280             }
1281         } else if (PMIX_CHECK_KEY(&info[n], PMIX_NODE_MAP)) {
1282             /* not allowed to get this more than once */
1283             if (flags & PMIX_HASH_NODE_MAP) {
1284                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1285                 return PMIX_ERR_BAD_PARAM;
1286             }
1287             /* parse the regex to get the argv array of node names */
1288             if (PMIX_REGEX == info[n].value.type) {
1289                 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.bo.bytes, &nodes))) {
1290                     PMIX_ERROR_LOG(rc);
1291                     goto release;
1292                 }
1293             } else if (PMIX_STRING == info[n].value.type) {
1294                 if (PMIX_SUCCESS != (rc = pmix_preg.parse_nodes(info[n].value.data.string, &nodes))) {
1295                     PMIX_ERROR_LOG(rc);
1296                     goto release;
1297                 }
1298             } else {
1299                 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
1300                 rc = PMIX_ERR_TYPE_MISMATCH;
1301                 goto release;
1302             }
1303             /* mark that we got the map */
1304             flags |= PMIX_HASH_NODE_MAP;
1305         } else if (PMIX_CHECK_KEY(&info[n], PMIX_PROC_MAP)) {
1306             /* not allowed to get this more than once */
1307             if (flags & PMIX_HASH_PROC_MAP) {
1308                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1309                 return PMIX_ERR_BAD_PARAM;
1310             }
1311             /* parse the regex to get the argv array containing proc ranks on each node */
1312             if (PMIX_REGEX == info[n].value.type) {
1313                 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.bo.bytes, &procs))) {
1314                     PMIX_ERROR_LOG(rc);
1315                     goto release;
1316                 }
1317             } else if (PMIX_STRING == info[n].value.type) {
1318                 if (PMIX_SUCCESS != (rc = pmix_preg.parse_procs(info[n].value.data.string, &procs))) {
1319                     PMIX_ERROR_LOG(rc);
1320                     goto release;
1321                 }
1322             } else {
1323                 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
1324                 rc = PMIX_ERR_TYPE_MISMATCH;
1325                 goto release;
1326             }
1327             /* mark that we got the map */
1328             flags |= PMIX_HASH_PROC_MAP;
1329         } else if (PMIX_CHECK_KEY(&info[n], PMIX_PROC_DATA)) {
1330             flags |= PMIX_HASH_PROC_DATA;
1331             found = false;
1332             /* an array of data pertaining to a specific proc */
1333             if (PMIX_DATA_ARRAY != info[n].value.type) {
1334                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1335                 rc = PMIX_ERR_TYPE_MISMATCH;
1336                 goto release;
1337             }
1338             size = info[n].value.data.darray->size;
1339             iptr = (pmix_info_t*)info[n].value.data.darray->array;
1340             /* first element of the array must be the rank */
1341             if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
1342                 PMIX_PROC_RANK != iptr[0].value.type) {
1343                 rc = PMIX_ERR_TYPE_MISMATCH;
1344                 PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
1345                 goto release;
1346             }
1347             rank = iptr[0].value.data.rank;
1348             /* cycle thru the values for this rank and store them */
1349             for (j=1; j < size; j++) {
1350                 kp2 = PMIX_NEW(pmix_kval_t);
1351                 if (NULL == kp2) {
1352                     rc = PMIX_ERR_NOMEM;
1353                     goto release;
1354                 }
1355                 kp2->key = strdup(iptr[j].key);
1356                 PMIX_VALUE_XFER(rc, kp2->value, &iptr[j].value);
1357                 if (PMIX_SUCCESS != rc) {
1358                     PMIX_ERROR_LOG(rc);
1359                     PMIX_RELEASE(kp2);
1360                     goto release;
1361                 }
1362                 /* if the value contains a string that is longer than the
1363                  * limit, then compress it */
1364                 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1365                     if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1366                         if (NULL == tmp) {
1367                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1368                             rc = PMIX_ERR_NOMEM;
1369                             goto release;
1370                         }
1371                         kp2->value->type = PMIX_COMPRESSED_STRING;
1372                         free(kp2->value->data.string);
1373                         kp2->value->data.bo.bytes = (char*)tmp;
1374                         kp2->value->data.bo.size = len;
1375                     }
1376                 }
1377                 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1378                                     "[%s:%d] gds:hash:cache_job_info proc data for [%s:%u]: key %s",
1379                                     pmix_globals.myid.nspace, pmix_globals.myid.rank,
1380                                     trk->ns, rank, kp2->key);
1381                 /* store it in the hash_table */
1382                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1383                     PMIX_ERROR_LOG(rc);
1384                     PMIX_RELEASE(kp2);
1385                     goto release;
1386                 }
1387                 /* if this is the appnum, pass it to the pmdl framework */
1388                 if (PMIX_CHECK_KEY(kp2, PMIX_APPNUM)) {
1389                     found = true;
1390                     if (rank == pmix_globals.myid.rank) {
1391                         pmix_globals.appnum = kp2->value->data.uint32;
1392                     }
1393                 }
1394                 PMIX_RELEASE(kp2);  // maintain acctg
1395             }
1396             if (!found) {
1397                 /* if they didn't give us an appnum for this proc, we have
1398                  * to assume it is appnum=0 */
1399                 uint32_t zero = 0;
1400                 kp2 = PMIX_NEW(pmix_kval_t);
1401                 if (NULL == kp2) {
1402                     rc = PMIX_ERR_NOMEM;
1403                     goto release;
1404                 }
1405                 kp2->key = strdup(PMIX_APPNUM);
1406                 PMIX_VALUE_CREATE(kp2->value, 1);
1407                 PMIX_VALUE_LOAD(kp2->value, &zero, PMIX_UINT32);
1408                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1409                     PMIX_ERROR_LOG(rc);
1410                     PMIX_RELEASE(kp2);
1411                     goto release;
1412                 }
1413                 PMIX_RELEASE(kp2);  // maintain acctg
1414             }
1415         } else if (pmix_check_node_info(info[n].key)) {
1416             /* they are passing us the node-level info for just this
1417              * node - start by seeing if our node is on the list */
1418             nd = NULL;
1419             PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
1420                 if (check_nodename(ndptr, pmix_globals.hostname)) {
1421                     nd = ndptr;
1422                     break;
1423                 }
1424             }
1425             /* if not, then add it */
1426             if (NULL == nd) {
1427                 nd = PMIX_NEW(pmix_nodeinfo_t);
1428                 nd->hostname = strdup(pmix_globals.hostname);
1429                 pmix_list_append(&trk->nodeinfo, &nd->super);
1430             }
1431             /* ensure the value isn't already on the node info */
1432             PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
1433                 if (PMIX_CHECK_KEY(kp2, info[n].key)) {
1434                     pmix_list_remove_item(&nd->info, &kp2->super);
1435                     PMIX_RELEASE(kp2);
1436                     break;
1437                 }
1438             }
1439             /* add the provided value */
1440             kp2 = PMIX_NEW(pmix_kval_t);
1441             kp2->key = strdup(info[n].key);
1442             PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1443             pmix_list_append(&nd->info, &kp2->super);
1444         } else if (pmix_check_app_info(info[n].key)) {
1445             /* they are passing us app-level info for a default
1446              * app number - have to assume it is app=0 */
1447             if (0 == pmix_list_get_size(&trk->apps)) {
1448                 apptr = PMIX_NEW(pmix_apptrkr_t);
1449                 pmix_list_append(&trk->apps, &apptr->super);
1450             } else if (1 < pmix_list_get_size(&trk->apps)) {
1451                 rc = PMIX_ERR_BAD_PARAM;
1452                 goto release;
1453             } else {
1454                 apptr = (pmix_apptrkr_t*)pmix_list_get_first(&trk->apps);
1455             }
1456             /* ensure the value isn't already on the app info */
1457             PMIX_LIST_FOREACH(kp2, &apptr->appinfo, pmix_kval_t) {
1458                 if (PMIX_CHECK_KEY(kp2, info[n].key)) {
1459                     pmix_list_remove_item(&apptr->appinfo, &kp2->super);
1460                     PMIX_RELEASE(kp2);
1461                     break;
1462                 }
1463             }
1464             /* add the provided value */
1465             kp2 = PMIX_NEW(pmix_kval_t);
1466             kp2->key = strdup(info[n].key);
1467             PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1468             pmix_list_append(&apptr->appinfo, &kp2->super);
1469         } else {
1470             /* just a value relating to the entire job */
1471             kp2 = PMIX_NEW(pmix_kval_t);
1472             if (NULL == kp2) {
1473                 rc = PMIX_ERR_NOMEM;
1474                 goto release;
1475             }
1476             kp2->key = strdup(info[n].key);
1477             PMIX_VALUE_XFER(rc, kp2->value, &info[n].value);
1478             if (PMIX_SUCCESS != rc) {
1479                 PMIX_ERROR_LOG(rc);
1480                 PMIX_RELEASE(kp2);
1481                 goto release;
1482             }
1483             /* if the value contains a string that is longer than the
1484              * limit, then compress it */
1485             if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1486                 if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1487                     if (NULL == tmp) {
1488                         rc = PMIX_ERR_NOMEM;
1489                         PMIX_ERROR_LOG(rc);
1490                         PMIX_RELEASE(kp2);
1491                         goto release;
1492                     }
1493                     kp2->value->type = PMIX_COMPRESSED_STRING;
1494                     free(kp2->value->data.string);
1495                     kp2->value->data.bo.bytes = (char*)tmp;
1496                     kp2->value->data.bo.size = len;
1497                 }
1498             }
1499             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1500                 PMIX_ERROR_LOG(rc);
1501                 PMIX_RELEASE(kp2);
1502                 goto release;
1503             }
1504             PMIX_RELEASE(kp2);  // maintain acctg
1505             /* if this is the job size, then store it in
1506              * the nptr tracker and flag that we were given it */
1507             if (PMIX_CHECK_KEY(&info[n], PMIX_JOB_SIZE)) {
1508                 nptr->nprocs = info[n].value.data.uint32;
1509                 flags |= PMIX_HASH_JOB_SIZE;
1510             } else if (PMIX_CHECK_KEY(&info[n], PMIX_NUM_NODES)) {
1511                 flags |= PMIX_HASH_NUM_NODES;
1512             } else if (PMIX_CHECK_KEY(&info[n], PMIX_MAX_PROCS)) {
1513                 flags |= PMIX_HASH_MAX_PROCS;
1514             }
1515         }
1516     }
1517 
1518     /* now add any global data that was provided */
1519     if (!trk->gdata_added) {
1520         PMIX_LIST_FOREACH(kvptr, &pmix_server_globals.gdata, pmix_kval_t) {
1521             /* sadly, the data cannot simultaneously exist on two lists,
1522              * so we must make a copy of it here */
1523             kp2 = PMIX_NEW(pmix_kval_t);
1524             if (NULL == kp2) {
1525                 rc = PMIX_ERR_NOMEM;
1526                 goto release;
1527             }
1528             kp2->key = strdup(kvptr->key);
1529             PMIX_VALUE_XFER(rc, kp2->value, kvptr->value);
1530             if (PMIX_SUCCESS != rc) {
1531                 PMIX_ERROR_LOG(rc);
1532                 PMIX_RELEASE(kp2);
1533                 goto release;
1534             }
1535             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
1536                 PMIX_ERROR_LOG(rc);
1537                 PMIX_RELEASE(kp2);
1538                 break;
1539             }
1540             PMIX_RELEASE(kp2);  // maintain acctg
1541         }
1542         trk->gdata_added = true;
1543     }
1544 
1545     /* we must have the proc AND node maps */
1546     if (NULL != procs && NULL != nodes) {
1547         if (PMIX_SUCCESS != (rc = store_map(trk, nodes, procs, flags))) {
1548             PMIX_ERROR_LOG(rc);
1549         }
1550     }
1551 
1552   release:
1553     if (NULL != nodes) {
1554         pmix_argv_free(nodes);
1555     }
1556     if (NULL != procs) {
1557         pmix_argv_free(procs);
1558     }
1559     return rc;
1560 }
1561 
register_info(pmix_peer_t * peer,pmix_namespace_t * ns,pmix_buffer_t * reply)1562 static pmix_status_t register_info(pmix_peer_t *peer,
1563                                    pmix_namespace_t *ns,
1564                                    pmix_buffer_t *reply)
1565 {
1566     pmix_job_t *trk;
1567     pmix_hash_table_t *ht;
1568     pmix_value_t *val, blob;
1569     pmix_status_t rc = PMIX_SUCCESS;
1570     pmix_info_t *info;
1571     size_t ninfo, n;
1572     pmix_kval_t kv, *kvptr;
1573     pmix_buffer_t buf;
1574     pmix_rank_t rank;
1575     pmix_list_t results;
1576     char *hname;
1577 
1578     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1579                         "REGISTERING FOR PEER %s type %d.%d.%d", PMIX_PNAME_PRINT(&peer->info->pname),
1580                         peer->proc_type.major, peer->proc_type.minor, peer->proc_type.release);
1581 
1582     trk = get_tracker(ns->nspace, true);
1583     if (NULL == trk) {
1584         return PMIX_ERR_NOMEM;
1585     }
1586     /* the job data is stored on the internal hash table */
1587     ht = &trk->internal;
1588 
1589     /* fetch all values from the hash table tied to rank=wildcard */
1590     val = NULL;
1591     rc = pmix_hash_fetch(ht, PMIX_RANK_WILDCARD, NULL, &val);
1592     if (PMIX_SUCCESS != rc) {
1593         PMIX_ERROR_LOG(rc);
1594         if (NULL != val) {
1595             PMIX_VALUE_RELEASE(val);
1596         }
1597         return rc;
1598     }
1599 
1600     if (NULL == val || NULL == val->data.darray ||
1601         PMIX_INFO != val->data.darray->type ||
1602         0 == val->data.darray->size) {
1603         return PMIX_ERR_NOT_FOUND;
1604     }
1605     info = (pmix_info_t*)val->data.darray->array;
1606     ninfo = val->data.darray->size;
1607     for (n=0; n < ninfo; n++) {
1608         kv.key = info[n].key;
1609         kv.value = &info[n].value;
1610         PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1611     }
1612     if (NULL != val) {
1613         PMIX_VALUE_RELEASE(val);
1614     }
1615 
1616     /* add all values in the jobinfo list */
1617     PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
1618         PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1619     }
1620 
1621     /* get any node-level info for this job */
1622     PMIX_CONSTRUCT(&results, pmix_list_t);
1623     rc = fetch_nodeinfo(NULL, &trk->nodeinfo, NULL, 0, &results);
1624     if (PMIX_SUCCESS == rc) {
1625         PMIX_LIST_FOREACH(kvptr, &results, pmix_kval_t) {
1626             /* if the peer is earlier than v3.1.5, it is expecting
1627              * node info to be in the form of an array, but with the
1628              * hostname as the key. Detect and convert that here */
1629             if (PMIX_PEER_IS_EARLIER(peer, 3, 1, 5)) {
1630                 info = (pmix_info_t*)kvptr->value->data.darray->array;
1631                 ninfo = kvptr->value->data.darray->size;
1632                 hname = NULL;
1633                 /* find the hostname */
1634                 for (n=0; n < ninfo; n++) {
1635                     if (PMIX_CHECK_KEY(&info[n], PMIX_HOSTNAME)) {
1636                         free(kvptr->key);
1637                         kvptr->key = strdup(info[n].value.data.string);
1638                         PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1639                         hname = kvptr->key;
1640                         break;
1641                     }
1642                 }
1643                 if (NULL != hname && check_hostname(pmix_globals.hostname, hname)) {
1644                     /* older versions are looking for node-level keys for
1645                      * only their own node as standalone keys */
1646                     for (n=0; n < ninfo; n++) {
1647                         if (pmix_check_node_info(info[n].key)) {
1648                             kv.key = strdup(info[n].key);
1649                             kv.value = &info[n].value;
1650                             PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1651                         }
1652                     }
1653                 }
1654             } else {
1655                 PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1656             }
1657         }
1658     }
1659     PMIX_LIST_DESTRUCT(&results);
1660 
1661     /* get any app-level info for this job */
1662     PMIX_CONSTRUCT(&results, pmix_list_t);
1663     rc = fetch_appinfo(NULL, &trk->apps, NULL, 0, &results);
1664     if (PMIX_SUCCESS == rc) {
1665         PMIX_LIST_FOREACH(kvptr, &results, pmix_kval_t) {
1666             PMIX_BFROPS_PACK(rc, peer, reply, kvptr, 1, PMIX_KVAL);
1667         }
1668     }
1669     PMIX_LIST_DESTRUCT(&results);
1670 
1671     /* get the proc-level data for each proc in the job */
1672     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1673                         "FETCHING PROC INFO FOR NSPACE %s NPROCS %u",
1674                         ns->nspace, ns->nprocs);
1675     for (rank=0; rank < ns->nprocs; rank++) {
1676         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1677                             "FETCHING PROC INFO FOR RANK %s", PMIX_RANK_PRINT(rank));
1678         val = NULL;
1679         rc = pmix_hash_fetch(ht, rank, NULL, &val);
1680         if (PMIX_SUCCESS != rc && PMIX_ERR_PROC_ENTRY_NOT_FOUND != rc) {
1681             PMIX_ERROR_LOG(rc);
1682             if (NULL != val) {
1683                 PMIX_VALUE_RELEASE(val);
1684             }
1685             return rc;
1686         }
1687         PMIX_CONSTRUCT(&buf, pmix_buffer_t);
1688         PMIX_BFROPS_PACK(rc, peer, &buf, &rank, 1, PMIX_PROC_RANK);
1689 
1690         if (NULL != val) {
1691             info = (pmix_info_t*)val->data.darray->array;
1692             ninfo = val->data.darray->size;
1693             for (n=0; n < ninfo; n++) {
1694                 kv.key = info[n].key;
1695                 kv.value = &info[n].value;
1696                 PMIX_BFROPS_PACK(rc, peer, &buf, &kv, 1, PMIX_KVAL);
1697             }
1698         }
1699         kv.key = PMIX_PROC_BLOB;
1700         kv.value = &blob;
1701         blob.type = PMIX_BYTE_OBJECT;
1702         PMIX_UNLOAD_BUFFER(&buf, blob.data.bo.bytes, blob.data.bo.size);
1703         PMIX_BFROPS_PACK(rc, peer, reply, &kv, 1, PMIX_KVAL);
1704         PMIX_VALUE_DESTRUCT(&blob);
1705         PMIX_DESTRUCT(&buf);
1706 
1707         if (NULL != val) {
1708             PMIX_VALUE_RELEASE(val);
1709         }
1710     }
1711     return rc;
1712 }
1713 
1714 /* the purpose of this function is to pack the job-level
1715  * info stored in the pmix_namespace_t into a buffer and send
1716  * it to the given client */
hash_register_job_info(struct pmix_peer_t * pr,pmix_buffer_t * reply)1717 static pmix_status_t hash_register_job_info(struct pmix_peer_t *pr,
1718                                             pmix_buffer_t *reply)
1719 {
1720     pmix_peer_t *peer = (pmix_peer_t*)pr;
1721     pmix_namespace_t *ns = peer->nptr;
1722     char *msg;
1723     pmix_status_t rc;
1724     pmix_job_t *trk;
1725 
1726     if (!PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
1727         !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
1728         /* this function is only available on servers */
1729         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1730         return PMIX_ERR_NOT_SUPPORTED;
1731     }
1732 
1733     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1734                         "[%s:%d] gds:hash:register_job_info for peer [%s:%d]",
1735                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
1736                         peer->info->pname.nspace, peer->info->pname.rank);
1737 
1738     /* first see if we already have processed this data
1739      * for another peer in this nspace so we don't waste
1740      * time doing it again */
1741     if (NULL != ns->jobbkt) {
1742         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1743                             "[%s:%d] gds:hash:register_job_info copying prepacked payload",
1744                             pmix_globals.myid.nspace, pmix_globals.myid.rank);
1745         /* we have packed this before - can just deliver it */
1746         PMIX_BFROPS_COPY_PAYLOAD(rc, peer, reply, ns->jobbkt);
1747         if (PMIX_SUCCESS != rc) {
1748             PMIX_ERROR_LOG(rc);
1749         }
1750         /* now see if we have delivered it to all our local
1751          * clients for this nspace */
1752         if (!PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer) && ns->ndelivered == ns->nlocalprocs) {
1753             /* we have, so let's get rid of the packed
1754              * copy of the data */
1755             PMIX_RELEASE(ns->jobbkt);
1756             ns->jobbkt = NULL;
1757         }
1758         return rc;
1759     }
1760 
1761     /* setup a tracker for this nspace as we will likely
1762      * need it again */
1763     trk = get_tracker(ns->nspace, true);
1764     if (NULL == trk) {
1765         return PMIX_ERR_NOMEM;
1766     }
1767 
1768     /* the job info for the specified nspace has
1769      * been given to us in the info array - pack
1770      * them for delivery */
1771     /* pack the name of the nspace */
1772     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1773                         "[%s:%d] gds:hash:register_job_info packing new payload",
1774                         pmix_globals.myid.nspace, pmix_globals.myid.rank);
1775     msg = ns->nspace;
1776     PMIX_BFROPS_PACK(rc, peer, reply, &msg, 1, PMIX_STRING);
1777     if (PMIX_SUCCESS != rc) {
1778         PMIX_ERROR_LOG(rc);
1779         return rc;
1780     }
1781 
1782     rc = register_info(peer, ns, reply);
1783     if (PMIX_SUCCESS == rc) {
1784         /* if we have more than one local client for this nspace,
1785          * save this packed object so we don't do this again */
1786         if (PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer) || 1 < ns->nlocalprocs) {
1787             PMIX_RETAIN(reply);
1788             ns->jobbkt = reply;
1789         }
1790     } else {
1791         PMIX_ERROR_LOG(rc);
1792     }
1793 
1794     return rc;
1795 }
1796 
hash_store_job_info(const char * nspace,pmix_buffer_t * buf)1797 static pmix_status_t hash_store_job_info(const char *nspace,
1798                                          pmix_buffer_t *buf)
1799 {
1800     pmix_status_t rc = PMIX_SUCCESS;
1801     pmix_kval_t *kptr, *kp2, *kp3, kv;
1802     int32_t cnt;
1803     size_t nnodes, len;
1804     uint32_t i, j;
1805     char **procs = NULL;
1806     uint8_t *tmp;
1807     pmix_byte_object_t *bo;
1808     pmix_buffer_t buf2;
1809     int rank;
1810     pmix_job_t *trk;
1811     pmix_hash_table_t *ht;
1812     char **nodelist = NULL;
1813     pmix_nodeinfo_t *nd, *ndptr;
1814     pmix_namespace_t *ns, *nptr;
1815 
1816     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1817                         "[%s:%u] pmix:gds:hash store job info for nspace %s",
1818                         pmix_globals.myid.nspace, pmix_globals.myid.rank, nspace);
1819 
1820     if (PMIX_PEER_IS_SERVER(pmix_globals.mypeer) &&
1821         !PMIX_PEER_IS_LAUNCHER(pmix_globals.mypeer)) {
1822         /* this function is NOT available on servers */
1823         PMIX_ERROR_LOG(PMIX_ERR_NOT_SUPPORTED);
1824         return PMIX_ERR_NOT_SUPPORTED;
1825     }
1826 
1827     /* check buf data */
1828     if ((NULL == buf) || (0 == buf->bytes_used)) {
1829         rc = PMIX_ERR_BAD_PARAM;
1830         PMIX_ERROR_LOG(rc);
1831         return rc;
1832     }
1833 
1834     trk = get_tracker(nspace, true);
1835     if (NULL == trk) {
1836         return PMIX_ERR_NOMEM;
1837     }
1838     ht = &trk->internal;
1839 
1840     /* retrieve the nspace pointer */
1841     nptr = NULL;
1842     PMIX_LIST_FOREACH(ns, &pmix_globals.nspaces, pmix_namespace_t) {
1843         if (0 == strcmp(ns->nspace, nspace)) {
1844             nptr = ns;
1845             break;
1846         }
1847     }
1848     if (NULL == nptr) {
1849         /* only can happen if we are out of mem */
1850         return PMIX_ERR_NOMEM;
1851     }
1852 
1853     cnt = 1;
1854     kptr = PMIX_NEW(pmix_kval_t);
1855     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1856                        buf, kptr, &cnt, PMIX_KVAL);
1857     while (PMIX_SUCCESS == rc) {
1858         pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
1859                             "[%s:%u] pmix:gds:hash store job info working key %s",
1860                             pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
1861         if (PMIX_CHECK_KEY(kptr, PMIX_PROC_BLOB)) {
1862             bo = &(kptr->value->data.bo);
1863             PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
1864             PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
1865             /* start by unpacking the rank */
1866             cnt = 1;
1867             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1868                                &buf2, &rank, &cnt, PMIX_PROC_RANK);
1869             if (PMIX_SUCCESS != rc) {
1870                 PMIX_ERROR_LOG(rc);
1871                 PMIX_RELEASE(kptr);
1872                 PMIX_DESTRUCT(&buf2);
1873                 return rc;
1874             }
1875             /* unpack the blob and save the values for this rank */
1876             cnt = 1;
1877             kp2 = PMIX_NEW(pmix_kval_t);
1878             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1879                                &buf2, kp2, &cnt, PMIX_KVAL);
1880             while (PMIX_SUCCESS == rc) {
1881                 /* if the value contains a string that is longer than the
1882                  * limit, then compress it */
1883                 if (PMIX_STRING_SIZE_CHECK(kp2->value)) {
1884                     if (pmix_compress.compress_string(kp2->value->data.string, &tmp, &len)) {
1885                         if (NULL == tmp) {
1886                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
1887                             rc = PMIX_ERR_NOMEM;
1888                             return rc;
1889                         }
1890                         kp2->value->type = PMIX_COMPRESSED_STRING;
1891                         free(kp2->value->data.string);
1892                         kp2->value->data.bo.bytes = (char*)tmp;
1893                         kp2->value->data.bo.size = len;
1894                     }
1895                 }
1896                 /* this is data provided by a job-level exchange, so store it
1897                  * in the job-level data hash_table */
1898                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1899                     PMIX_ERROR_LOG(rc);
1900                     PMIX_RELEASE(kp2);
1901                     PMIX_RELEASE(kptr);
1902                     PMIX_DESTRUCT(&buf2);
1903                     return rc;
1904                 }
1905                 PMIX_RELEASE(kp2); // maintain accounting
1906                 cnt = 1;
1907                 kp2 = PMIX_NEW(pmix_kval_t);
1908                 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1909                                    &buf2, kp2, &cnt, PMIX_KVAL);
1910             }
1911             /* cleanup */
1912             PMIX_DESTRUCT(&buf2);  // releases the original kptr data
1913             PMIX_RELEASE(kp2);
1914         } else if (PMIX_CHECK_KEY(kptr, PMIX_MAP_BLOB)) {
1915             /* transfer the byte object for unpacking */
1916             bo = &(kptr->value->data.bo);
1917             PMIX_CONSTRUCT(&buf2, pmix_buffer_t);
1918             PMIX_LOAD_BUFFER(pmix_client_globals.myserver, &buf2, bo->bytes, bo->size);
1919             /* start by unpacking the number of nodes */
1920             cnt = 1;
1921             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1922                                &buf2, &nnodes, &cnt, PMIX_SIZE);
1923             if (PMIX_SUCCESS != rc) {
1924                 PMIX_ERROR_LOG(rc);
1925                 PMIX_RELEASE(kptr);
1926                 PMIX_DESTRUCT(&buf2);
1927                 return rc;
1928             }
1929             for (i=0; i < nnodes; i++) {
1930                 /* unpack the list of procs on each node */
1931                 cnt = 1;
1932                 PMIX_CONSTRUCT(&kv, pmix_kval_t);
1933                 PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
1934                                    &buf2, &kv, &cnt, PMIX_KVAL);
1935                 if (PMIX_SUCCESS != rc) {
1936                     PMIX_ERROR_LOG(rc);
1937                     PMIX_RELEASE(kptr);
1938                     PMIX_DESTRUCT(&buf2);
1939                     PMIX_DESTRUCT(&kv);
1940                     return rc;
1941                 }
1942                 /* track the nodes in this nspace */
1943                 pmix_argv_append_nosize(&nodelist, kv.key);
1944                 /* check and see if we already have this node */
1945                 nd = NULL;
1946                 PMIX_LIST_FOREACH(ndptr, &trk->nodeinfo, pmix_nodeinfo_t) {
1947                     if (check_nodename(ndptr, kv.key)) {
1948                         /* we assume that the data is updating the current
1949                          * values */
1950                         nd = ndptr;
1951                         break;
1952                     }
1953                 }
1954                 if (NULL == nd) {
1955                     nd = PMIX_NEW(pmix_nodeinfo_t);
1956                     nd->hostname = strdup(kv.key);
1957                     pmix_list_append(&trk->nodeinfo, &nd->super);
1958                 }
1959                 /* save the list of peers for this node */
1960                 kp2 = PMIX_NEW(pmix_kval_t);
1961                 if (NULL == kp2) {
1962                     PMIX_RELEASE(kptr);
1963                     return PMIX_ERR_NOMEM;
1964                 }
1965                 kp2->key = strdup(PMIX_LOCAL_PEERS);
1966                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1967                 if (NULL == kp2->value) {
1968                     PMIX_RELEASE(kp2);
1969                     PMIX_RELEASE(kptr);
1970                     return PMIX_ERR_NOMEM;
1971                 }
1972                 kp2->value->type = PMIX_STRING;
1973                 kp2->value->data.string = strdup(kv.value->data.string);
1974                 /* ensure this item only appears once on the list */
1975                 PMIX_LIST_FOREACH(kp3, &nd->info, pmix_kval_t) {
1976                     if (PMIX_CHECK_KEY(kp3, kp2->key)) {
1977                         pmix_list_remove_item(&nd->info, &kp3->super);
1978                         PMIX_RELEASE(kp3);
1979                         break;
1980                     }
1981                 }
1982                 pmix_list_append(&nd->info, &kp2->super);
1983                 /* split the list of procs so we can store their
1984                  * individual location data */
1985                 procs = pmix_argv_split(kv.value->data.string, ',');
1986                 for (j=0; NULL != procs[j]; j++) {
1987                     /* store the hostname for each proc - again, this is
1988                      * data obtained via a job-level exchange, so store it
1989                      * in the job-level data hash_table */
1990                     kp2 = PMIX_NEW(pmix_kval_t);
1991                     kp2->key = strdup(PMIX_HOSTNAME);
1992                     kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
1993                     kp2->value->type = PMIX_STRING;
1994                     kp2->value->data.string = strdup(kv.key);
1995                     rank = strtol(procs[j], NULL, 10);
1996                     if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, rank, kp2))) {
1997                         PMIX_ERROR_LOG(rc);
1998                         PMIX_RELEASE(kp2);
1999                         PMIX_RELEASE(kptr);
2000                         PMIX_DESTRUCT(&kv);
2001                         PMIX_DESTRUCT(&buf2);
2002                         pmix_argv_free(procs);
2003                         return rc;
2004                     }
2005                     PMIX_RELEASE(kp2);  // maintain acctg
2006                 }
2007                 pmix_argv_free(procs);
2008                 PMIX_DESTRUCT(&kv);
2009             }
2010             if (NULL != nodelist) {
2011                 /* store the comma-delimited list of nodes hosting
2012                  * procs in this nspace */
2013                 kp2 = PMIX_NEW(pmix_kval_t);
2014                 kp2->key = strdup(PMIX_NODE_LIST);
2015                 kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2016                 kp2->value->type = PMIX_STRING;
2017                 kp2->value->data.string = pmix_argv_join(nodelist, ',');
2018                 pmix_argv_free(nodelist);
2019                 if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kp2))) {
2020                     PMIX_ERROR_LOG(rc);
2021                     PMIX_RELEASE(kp2);
2022                     PMIX_RELEASE(kptr);
2023                     PMIX_DESTRUCT(&kv);
2024                     PMIX_DESTRUCT(&buf2);
2025                     return rc;
2026                 }
2027                 PMIX_RELEASE(kp2);  // maintain acctg
2028             }
2029             /* cleanup */
2030             PMIX_DESTRUCT(&buf2);
2031         } else if (PMIX_CHECK_KEY(kptr, PMIX_APP_INFO_ARRAY)) {
2032             if (PMIX_SUCCESS != (rc = process_app_array(kptr->value, trk))) {
2033                 PMIX_ERROR_LOG(rc);
2034                 PMIX_RELEASE(kptr);
2035                 return rc;
2036             }
2037         } else if (PMIX_CHECK_KEY(kptr, PMIX_NODE_INFO_ARRAY)) {
2038             if (PMIX_SUCCESS != (rc = process_node_array(kptr->value, &trk->nodeinfo))) {
2039                 PMIX_ERROR_LOG(rc);
2040                 PMIX_RELEASE(kptr);
2041                 return rc;
2042             }
2043         } else {
2044             /* if the value contains a string that is longer than the
2045              * limit, then compress it */
2046             if (PMIX_STRING_SIZE_CHECK(kptr->value)) {
2047                 if (pmix_compress.compress_string(kptr->value->data.string, &tmp, &len)) {
2048                     if (NULL == tmp) {
2049                         PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2050                         rc = PMIX_ERR_NOMEM;
2051                         PMIX_RELEASE(kptr);
2052                         return rc;
2053                     }
2054                     kptr->value->type = PMIX_COMPRESSED_STRING;
2055                     free(kptr->value->data.string);
2056                     kptr->value->data.bo.bytes = (char*)tmp;
2057                     kptr->value->data.bo.size = len;
2058                 }
2059             }
2060             pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2061                                 "[%s:%u] pmix:gds:hash store job info storing key %s for WILDCARD rank",
2062                                 pmix_globals.myid.nspace, pmix_globals.myid.rank, kptr->key);
2063             if (PMIX_SUCCESS != (rc = pmix_hash_store(ht, PMIX_RANK_WILDCARD, kptr))) {
2064                 PMIX_ERROR_LOG(rc);
2065                 PMIX_RELEASE(kptr);
2066                 return rc;
2067             }
2068             /* if this is the job size, then store it in
2069              * the nptr tracker */
2070             if (0 == nptr->nprocs && PMIX_CHECK_KEY(kptr, PMIX_JOB_SIZE)) {
2071                 nptr->nprocs = kptr->value->data.uint32;
2072             }
2073         }
2074         PMIX_RELEASE(kptr);
2075         kptr = PMIX_NEW(pmix_kval_t);
2076         cnt = 1;
2077         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
2078                            buf, kptr, &cnt, PMIX_KVAL);
2079     }
2080     /* need to release the leftover kptr */
2081     PMIX_RELEASE(kptr);
2082 
2083     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2084         PMIX_ERROR_LOG(rc);
2085     } else {
2086         rc = PMIX_SUCCESS;
2087     }
2088     return rc;
2089 }
2090 
hash_store(const pmix_proc_t * proc,pmix_scope_t scope,pmix_kval_t * kv)2091 static pmix_status_t hash_store(const pmix_proc_t *proc,
2092                                 pmix_scope_t scope,
2093                                 pmix_kval_t *kv)
2094 {
2095     pmix_job_t *trk;
2096     pmix_status_t rc;
2097     pmix_kval_t *kp;
2098     pmix_rank_t rank;
2099     size_t j, size, len;
2100     pmix_info_t *iptr;
2101     uint8_t *tmp;
2102 
2103     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2104                         "%s gds:hash:hash_store for proc %s key %s type %s scope %s",
2105                         PMIX_NAME_PRINT(&pmix_globals.myid),
2106                         PMIX_NAME_PRINT(proc), kv->key,
2107                         PMIx_Data_type_string(kv->value->type), PMIx_Scope_string(scope));
2108 
2109     if (NULL == kv->key) {
2110         return PMIX_ERR_BAD_PARAM;
2111     }
2112 
2113     /* find the hash table for this nspace */
2114     trk = get_tracker(proc->nspace, true);
2115     if (NULL == trk) {
2116         return PMIX_ERR_NOMEM;
2117     }
2118 
2119     /* see if the proc is me - cannot use CHECK_PROCID as
2120      * we don't want rank=wildcard to match */
2121     if (proc->rank == pmix_globals.myid.rank &&
2122         PMIX_CHECK_NSPACE(proc->nspace, pmix_globals.myid.nspace)) {
2123         if (PMIX_INTERNAL != scope) {
2124             /* always maintain a copy of my own info here to simplify
2125              * later retrieval */
2126             kp = PMIX_NEW(pmix_kval_t);
2127             if (NULL == kp) {
2128                 return PMIX_ERR_NOMEM;
2129             }
2130             kp->key = strdup(kv->key);
2131             kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2132             if (NULL == kp->value) {
2133                 PMIX_RELEASE(kp);
2134                 return PMIX_ERR_NOMEM;
2135             }
2136             PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
2137             if (PMIX_SUCCESS != rc) {
2138                 PMIX_RELEASE(kp);
2139                 return rc;
2140             }
2141             if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kp))) {
2142                 PMIX_ERROR_LOG(rc);
2143                 PMIX_RELEASE(kp);
2144                 return rc;
2145             }
2146             PMIX_RELEASE(kp);  // maintain accounting
2147         }
2148     }
2149 
2150     /* if the number of procs for the nspace object is new, then update it */
2151     if (0 == trk->nptr->nprocs && PMIX_CHECK_KEY(kv, PMIX_JOB_SIZE)) {
2152         trk->nptr->nprocs = kv->value->data.uint32;
2153     }
2154 
2155     /* store it in the corresponding hash table */
2156     if (PMIX_INTERNAL == scope) {
2157         /* if this is proc data, then we have to expand it and
2158          * store the values on that rank */
2159         if (PMIX_CHECK_KEY(kv, PMIX_PROC_DATA)) {
2160             /* an array of data pertaining to a specific proc */
2161             if (PMIX_DATA_ARRAY != kv->value->type) {
2162                 PMIX_ERROR_LOG(PMIX_ERR_TYPE_MISMATCH);
2163                 return PMIX_ERR_TYPE_MISMATCH;
2164             }
2165             size = kv->value->data.darray->size;
2166             iptr = (pmix_info_t*)kv->value->data.darray->array;
2167             /* first element of the array must be the rank */
2168             if (0 != strcmp(iptr[0].key, PMIX_RANK) ||
2169                 PMIX_PROC_RANK != iptr[0].value.type) {
2170                 rc = PMIX_ERR_TYPE_MISMATCH;
2171                 PMIX_ERROR_LOG(rc);
2172                 return rc;
2173             }
2174             rank = iptr[0].value.data.rank;
2175             /* cycle thru the values for this rank and store them */
2176             for (j=1; j < size; j++) {
2177                 kp = PMIX_NEW(pmix_kval_t);
2178                 if (NULL == kp) {
2179                     rc = PMIX_ERR_NOMEM;
2180                     return rc;
2181                 }
2182                 kp->key = strdup(iptr[j].key);
2183                 PMIX_VALUE_XFER(rc, kp->value, &iptr[j].value);
2184                 if (PMIX_SUCCESS != rc) {
2185                     PMIX_ERROR_LOG(rc);
2186                     PMIX_RELEASE(kp);
2187                     return rc;
2188                 }
2189                 /* if the value contains a string that is longer than the
2190                  * limit, then compress it */
2191                 if (PMIX_STRING_SIZE_CHECK(kp->value)) {
2192                     if (pmix_compress.compress_string(kp->value->data.string, &tmp, &len)) {
2193                         if (NULL == tmp) {
2194                             PMIX_ERROR_LOG(PMIX_ERR_NOMEM);
2195                             rc = PMIX_ERR_NOMEM;
2196                             return rc;
2197                         }
2198                         kp->value->type = PMIX_COMPRESSED_STRING;
2199                         free(kp->value->data.string);
2200                         kp->value->data.bo.bytes = (char*)tmp;
2201                         kp->value->data.bo.size = len;
2202                     }
2203                 }
2204                 pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2205                                     "%s gds:hash:STORE data for nspace %s rank %u: key %s",
2206                                     PMIX_NAME_PRINT(&pmix_globals.myid),
2207                                     trk->ns, rank, kp->key);
2208                 /* store it in the hash_table */
2209                 if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, rank, kp))) {
2210                     PMIX_ERROR_LOG(rc);
2211                     PMIX_RELEASE(kp);
2212                     return rc;
2213                 }
2214                 PMIX_RELEASE(kp);  // maintain acctg
2215             }
2216             return PMIX_SUCCESS;
2217         }
2218         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->internal, proc->rank, kv))) {
2219             PMIX_ERROR_LOG(rc);
2220             return rc;
2221         }
2222     } else if (PMIX_REMOTE == scope) {
2223         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2224             PMIX_ERROR_LOG(rc);
2225             return rc;
2226         }
2227     } else if (PMIX_LOCAL == scope) {
2228         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kv))) {
2229             PMIX_ERROR_LOG(rc);
2230             return rc;
2231         }
2232     } else if (PMIX_GLOBAL == scope) {
2233         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2234             PMIX_ERROR_LOG(rc);
2235             return rc;
2236         }
2237         /* a pmix_kval_t can only be on one list at a time, so we
2238          * have to duplicate it here */
2239         kp = PMIX_NEW(pmix_kval_t);
2240         if (NULL == kp) {
2241             return PMIX_ERR_NOMEM;
2242         }
2243         kp->key = strdup(kv->key);
2244         kp->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2245         if (NULL == kp->value) {
2246             PMIX_RELEASE(kp);
2247             return PMIX_ERR_NOMEM;
2248         }
2249         PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer, kp->value, kv->value);
2250         if (PMIX_SUCCESS != rc) {
2251             PMIX_ERROR_LOG(rc);
2252             PMIX_RELEASE(kp);
2253             return rc;
2254         }
2255         if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->local, proc->rank, kp))) {
2256             PMIX_ERROR_LOG(rc);
2257             PMIX_RELEASE(kp);
2258             return rc;
2259         }
2260         PMIX_RELEASE(kp);  // maintain accounting
2261     } else {
2262         return PMIX_ERR_BAD_PARAM;
2263     }
2264 
2265     return PMIX_SUCCESS;
2266 }
2267 
2268 /* this function is only called by the PMIx server when its
2269  * host has received data from some other peer. It therefore
2270  * always contains data solely from remote procs, and we
2271  * shall store it accordingly */
hash_store_modex(struct pmix_namespace_t * nspace,pmix_buffer_t * buf,void * cbdata)2272 static pmix_status_t hash_store_modex(struct pmix_namespace_t *nspace,
2273                                       pmix_buffer_t *buf,
2274                                       void *cbdata) {
2275     return pmix_gds_base_store_modex(nspace, buf, NULL,
2276                                      _hash_store_modex, cbdata);
2277 }
2278 
_hash_store_modex(pmix_gds_base_ctx_t ctx,pmix_proc_t * proc,pmix_gds_modex_key_fmt_t key_fmt,char ** kmap,pmix_buffer_t * pbkt)2279 static pmix_status_t _hash_store_modex(pmix_gds_base_ctx_t ctx,
2280                                        pmix_proc_t *proc,
2281                                        pmix_gds_modex_key_fmt_t key_fmt,
2282                                        char **kmap,
2283                                        pmix_buffer_t *pbkt)
2284 {
2285     pmix_job_t *trk;
2286     pmix_status_t rc = PMIX_SUCCESS;
2287     pmix_kval_t *kv;
2288 
2289     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2290                         "[%s:%d] gds:hash:store_modex for nspace %s",
2291                         pmix_globals.myid.nspace, pmix_globals.myid.rank,
2292                         proc->nspace);
2293 
2294     /* find the hash table for this nspace */
2295     trk = get_tracker(proc->nspace, true);
2296     if (NULL == trk) {
2297         return PMIX_ERR_NOMEM;
2298     }
2299 
2300     /* this is data returned via the PMIx_Fence call when
2301      * data collection was requested, so it only contains
2302      * REMOTE/GLOBAL data. The byte object contains
2303      * the rank followed by pmix_kval_t's. The list of callbacks
2304      * contains all local participants. */
2305 
2306     /* unpack the remaining values until we hit the end of the buffer */
2307     kv = PMIX_NEW(pmix_kval_t);
2308     rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2309 
2310     while (PMIX_SUCCESS == rc) {
2311         if (PMIX_RANK_UNDEF == proc->rank) {
2312             /* if the rank is undefined, then we store it on the
2313              * remote table of rank=0 as we know that rank must
2314              * always exist */
2315             if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, 0, kv))) {
2316                 PMIX_ERROR_LOG(rc);
2317                 return rc;
2318             }
2319         } else {
2320             /* store this in the hash table */
2321             if (PMIX_SUCCESS != (rc = pmix_hash_store(&trk->remote, proc->rank, kv))) {
2322                 PMIX_ERROR_LOG(rc);
2323                 return rc;
2324             }
2325         }
2326         PMIX_RELEASE(kv);  // maintain accounting as the hash increments the ref count
2327         /* continue along */
2328         kv = PMIX_NEW(pmix_kval_t);
2329         rc = pmix_gds_base_modex_unpack_kval(key_fmt, pbkt, kmap, kv);
2330     }
2331     PMIX_RELEASE(kv);  // maintain accounting
2332     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
2333         PMIX_ERROR_LOG(rc);
2334     } else {
2335         rc = PMIX_SUCCESS;
2336     }
2337     return rc;
2338 }
2339 
2340 
dohash(pmix_hash_table_t * ht,const char * key,pmix_rank_t rank,int skip_genvals,pmix_list_t * kvs)2341 static pmix_status_t dohash(pmix_hash_table_t *ht,
2342                             const char *key,
2343                             pmix_rank_t rank,
2344                             int skip_genvals,
2345                             pmix_list_t *kvs)
2346 {
2347     pmix_status_t rc;
2348     pmix_value_t *val;
2349     pmix_kval_t *kv, *k2;
2350     pmix_info_t *info;
2351     size_t n, ninfo;
2352     bool found;
2353 
2354     rc = pmix_hash_fetch(ht, rank, key, &val);
2355     if (PMIX_SUCCESS == rc) {
2356         /* if the key was NULL, then all found keys will be
2357          * returned as a pmix_data_array_t in the value */
2358         if (NULL == key) {
2359             if (NULL == val->data.darray ||
2360                 PMIX_INFO != val->data.darray->type ||
2361                 0 == val->data.darray->size) {
2362                 PMIX_ERROR_LOG(PMIX_ERR_NOT_FOUND);
2363                 PMIX_RELEASE(val);
2364                 return PMIX_ERR_NOT_FOUND;
2365             }
2366             /* if they want the value returned in its array form,
2367              * then we are done */
2368             if (2 == skip_genvals) {
2369                 kv = PMIX_NEW(pmix_kval_t);
2370                 if (NULL == kv) {
2371                     PMIX_VALUE_RELEASE(val);
2372                     return PMIX_ERR_NOMEM;
2373                 }
2374                 kv->value = val;
2375                 pmix_list_append(kvs, &kv->super);
2376                 return PMIX_SUCCESS;
2377             }
2378             info = (pmix_info_t*)val->data.darray->array;
2379             ninfo = val->data.darray->size;
2380             for (n=0; n < ninfo; n++) {
2381                 /* if the rank is UNDEF, then we don't want
2382                  * anything that starts with "pmix" */
2383                 if (1 == skip_genvals &&
2384                     0 == strncmp(info[n].key, "pmix", 4)) {
2385                     continue;
2386                 }
2387                 /* see if we already have this on the list */
2388                 found = false;
2389                 PMIX_LIST_FOREACH(k2, kvs, pmix_kval_t) {
2390                     if (PMIX_CHECK_KEY(&info[n], k2->key)) {
2391                         found = true;
2392                         break;
2393                     }
2394                 }
2395                 if (found) {
2396                     continue;
2397                 }
2398                 kv = PMIX_NEW(pmix_kval_t);
2399                 if (NULL == kv) {
2400                     PMIX_VALUE_RELEASE(val);
2401                     return PMIX_ERR_NOMEM;
2402                 }
2403                 kv->key = strdup(info[n].key);
2404                 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2405                 if (NULL == kv->value) {
2406                     PMIX_VALUE_RELEASE(val);
2407                     PMIX_RELEASE(kv);
2408                     return PMIX_ERR_NOMEM;
2409                 }
2410                 PMIX_BFROPS_VALUE_XFER(rc, pmix_globals.mypeer,
2411                                        kv->value, &info[n].value);
2412                 if (PMIX_SUCCESS != rc) {
2413                     PMIX_ERROR_LOG(rc);
2414                     PMIX_VALUE_RELEASE(val);
2415                     PMIX_RELEASE(kv);
2416                     return rc;
2417                 }
2418                 pmix_list_append(kvs, &kv->super);
2419             }
2420             PMIX_VALUE_RELEASE(val);
2421         } else {
2422             kv = PMIX_NEW(pmix_kval_t);
2423             if (NULL == kv) {
2424                 PMIX_VALUE_RELEASE(val);
2425                 return PMIX_ERR_NOMEM;
2426             }
2427             kv->key = strdup(key);
2428             kv->value = val;
2429             pmix_list_append(kvs, &kv->super);
2430         }
2431     }
2432     return rc;
2433 }
2434 
fetch_nodeinfo(const char * key,pmix_list_t * tgt,pmix_info_t * info,size_t ninfo,pmix_list_t * kvs)2435 static pmix_status_t fetch_nodeinfo(const char *key, pmix_list_t *tgt,
2436                                     pmix_info_t *info, size_t ninfo,
2437                                     pmix_list_t *kvs)
2438 {
2439     size_t n, nds;
2440     pmix_status_t rc;
2441     uint32_t nid=0;
2442     char *hostname = NULL;
2443     bool found = false;
2444     pmix_nodeinfo_t *nd, *ndptr;
2445     pmix_kval_t *kv, *kp2;
2446     pmix_data_array_t *darray;
2447     pmix_info_t *iptr;
2448 
2449     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2450                         "FETCHING NODE INFO");
2451 
2452     /* scan for the nodeID or hostname to identify
2453      * which node they are asking about */
2454     for (n=0; n < ninfo; n++) {
2455         if (PMIX_CHECK_KEY(&info[n], PMIX_NODEID)) {
2456             PMIX_VALUE_GET_NUMBER(rc, &info[n].value, nid, uint32_t);
2457             if (PMIX_SUCCESS != rc) {
2458                 return rc;
2459             }
2460             found = true;
2461             break;
2462         } else if (PMIX_CHECK_KEY(&info[n], PMIX_HOSTNAME)) {
2463             hostname = info[n].value.data.string;
2464             found = true;
2465             break;
2466         }
2467     }
2468     if (!found) {
2469         /* if the key is NULL, then they want all the info from
2470          * all nodes */
2471         if (NULL == key) {
2472      		PMIX_LIST_FOREACH(nd, tgt, pmix_nodeinfo_t) {
2473      			kv = PMIX_NEW(pmix_kval_t);
2474      			kv->key = strdup(PMIX_NODE_INFO_ARRAY);
2475      			kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2476      			if (NULL == kv->value) {
2477      				PMIX_RELEASE(kv);
2478      				return PMIX_ERR_NOMEM;
2479      			}
2480      			nds = pmix_list_get_size(&nd->info);
2481      			if (NULL != nd->hostname) {
2482      				++nds;
2483      			}
2484      			if (UINT32_MAX != nd->nodeid) {
2485      				++nds;
2486      			}
2487      			PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2488      			if (NULL == darray) {
2489      				PMIX_RELEASE(kv);
2490      				return PMIX_ERR_NOMEM;
2491      			}
2492      			iptr = (pmix_info_t*)darray->array;
2493      			n = 0;
2494      			if (NULL != nd->hostname) {
2495      				PMIX_INFO_LOAD(&iptr[n], PMIX_HOSTNAME, nd->hostname, PMIX_STRING);
2496      				++n;
2497      			}
2498      			if (UINT32_MAX != nd->nodeid) {
2499      				PMIX_INFO_LOAD(&iptr[n], PMIX_NODEID, &nd->nodeid, PMIX_UINT32);
2500      				++n;
2501      			}
2502      			PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2503                     pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2504                                         "%s gds:hash:fetch_nodearray adding key %s",
2505                                         PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2506      				PMIX_LOAD_KEY(iptr[n].key, kp2->key);
2507      				rc = pmix_value_xfer(&iptr[n].value, kp2->value);
2508      				if (PMIX_SUCCESS != rc) {
2509      					PMIX_ERROR_LOG(rc);
2510      					PMIX_DATA_ARRAY_FREE(darray);
2511      					PMIX_RELEASE(kv);
2512      					return rc;
2513      				}
2514      				++n;
2515      			}
2516      			kv->value->data.darray = darray;
2517      			kv->value->type = PMIX_DATA_ARRAY;
2518      			pmix_list_append(kvs, &kv->super);
2519      		}
2520      		return PMIX_SUCCESS;
2521 
2522         }
2523         /* assume they want it from this node */
2524         hostname = pmix_globals.hostname;
2525     }
2526 
2527     /* scan the list of nodes to find the matching entry */
2528     nd = NULL;
2529     PMIX_LIST_FOREACH(ndptr, tgt, pmix_nodeinfo_t) {
2530         if (NULL != hostname) {
2531             if (check_nodename(ndptr, hostname)) {
2532                 nd = ndptr;
2533                 break;
2534             }
2535         } else if (nid == ndptr->nodeid) {
2536             nd = ndptr;
2537             break;
2538         }
2539     }
2540     if (NULL == nd) {
2541         if (!found) {
2542             /* they didn't specify, so it is optional */
2543             return PMIX_ERR_DATA_VALUE_NOT_FOUND;
2544         }
2545         return PMIX_ERR_NOT_FOUND;
2546     }
2547 
2548     /* if they want it all, give it to them */
2549     if (NULL == key) {
2550         kv = PMIX_NEW(pmix_kval_t);
2551         kv->key = strdup(PMIX_NODE_INFO_ARRAY);
2552         kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2553         if (NULL == kv->value) {
2554             PMIX_RELEASE(kv);
2555             return PMIX_ERR_NOMEM;
2556         }
2557         nds = pmix_list_get_size(&nd->info);
2558         if (NULL != nd->hostname) {
2559             ++nds;
2560         }
2561         if (UINT32_MAX != nd->nodeid) {
2562             ++nds;
2563         }
2564         PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2565         if (NULL == darray) {
2566             PMIX_RELEASE(kv);
2567             return PMIX_ERR_NOMEM;
2568         }
2569         iptr = (pmix_info_t*)darray->array;
2570         n = 0;
2571         if (NULL != nd->hostname) {
2572             PMIX_INFO_LOAD(&iptr[n], PMIX_HOSTNAME, nd->hostname, PMIX_STRING);
2573             ++n;
2574         }
2575         if (UINT32_MAX != nd->nodeid) {
2576             PMIX_INFO_LOAD(&iptr[n], PMIX_NODEID, &nd->nodeid, PMIX_UINT32);
2577             ++n;
2578         }
2579         PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2580             pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2581                                 "%s gds:hash:fetch_nodearray adding key %s",
2582                                 PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2583             PMIX_LOAD_KEY(iptr[n].key, kp2->key);
2584             rc = pmix_value_xfer(&iptr[n].value, kp2->value);
2585             if (PMIX_SUCCESS != rc) {
2586                 PMIX_ERROR_LOG(rc);
2587                 PMIX_DATA_ARRAY_FREE(darray);
2588                 PMIX_RELEASE(kv);
2589                 return rc;
2590             }
2591             ++n;
2592         }
2593         kv->value->data.darray = darray;
2594         kv->value->type = PMIX_DATA_ARRAY;
2595         pmix_list_append(kvs, &kv->super);
2596         return PMIX_SUCCESS;
2597     }
2598 
2599     /* scan the info list of this node to find the key they want */
2600     rc = PMIX_ERR_NOT_FOUND;
2601     PMIX_LIST_FOREACH(kp2, &nd->info, pmix_kval_t) {
2602         if (PMIX_CHECK_KEY(kp2, key)) {
2603             pmix_output_verbose(12, pmix_gds_base_framework.framework_output,
2604                                 "%s gds:hash:fetch_nodearray adding key %s",
2605                                 PMIX_NAME_PRINT(&pmix_globals.myid), kp2->key);
2606             /* since they only asked for one key, return just that value */
2607             kv = PMIX_NEW(pmix_kval_t);
2608             kv->key = strdup(kp2->key);
2609             kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2610             if (NULL == kv->value) {
2611                 PMIX_RELEASE(kv);
2612                 return PMIX_ERR_NOMEM;
2613             }
2614             rc = pmix_value_xfer(kv->value, kp2->value);
2615             if (PMIX_SUCCESS != rc) {
2616                 PMIX_ERROR_LOG(rc);
2617                 PMIX_RELEASE(kv);
2618                 return rc;
2619             }
2620             pmix_list_append(kvs, &kv->super);
2621             break;
2622         }
2623     }
2624     return rc;
2625 }
2626 
fetch_appinfo(const char * key,pmix_list_t * tgt,pmix_info_t * info,size_t ninfo,pmix_list_t * kvs)2627 static pmix_status_t fetch_appinfo(const char *key, pmix_list_t *tgt,
2628                                    pmix_info_t *info, size_t ninfo,
2629                                    pmix_list_t *kvs)
2630 {
2631     size_t n, nds;
2632     pmix_status_t rc;
2633     uint32_t appnum;
2634     bool found = false;
2635     pmix_apptrkr_t *app, *apptr;
2636     pmix_kval_t *kv, *kp2;
2637     pmix_data_array_t *darray;
2638 
2639     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2640                         "FETCHING APP INFO WITH %d APPS",
2641                         (int)pmix_list_get_size(tgt));
2642 
2643     /* scan for the appnum to identify
2644      * which app they are asking about */
2645     for (n=0; n < ninfo; n++) {
2646         if (PMIX_CHECK_KEY(&info[n], PMIX_APPNUM)) {
2647             PMIX_VALUE_GET_NUMBER(rc, &info[n].value, appnum, uint32_t);
2648             if (PMIX_SUCCESS != rc) {
2649                 return rc;
2650             }
2651             found = true;
2652             break;
2653         }
2654     }
2655     if (!found) {
2656         /* if the key is NULL, then they want all the info from
2657          * all apps */
2658         if (NULL == key) {
2659     		PMIX_LIST_FOREACH(apptr, tgt, pmix_apptrkr_t) {
2660      			kv = PMIX_NEW(pmix_kval_t);
2661      			kv->key = strdup(PMIX_APP_INFO_ARRAY);
2662      			kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2663      			if (NULL == kv->value) {
2664      				PMIX_RELEASE(kv);
2665      				return PMIX_ERR_NOMEM;
2666      			}
2667      			nds = pmix_list_get_size(&apptr->appinfo) + 1;
2668      			PMIX_DATA_ARRAY_CREATE(darray, nds, PMIX_INFO);
2669      			if (NULL == darray) {
2670      				PMIX_RELEASE(kv);
2671      				return PMIX_ERR_NOMEM;
2672      			}
2673      			info = (pmix_info_t*)darray->array;
2674      			n = 0;
2675      			/* put in the appnum */
2676      			PMIX_INFO_LOAD(&info[n], PMIX_APPNUM, &apptr->appnum, PMIX_UINT32);
2677      			++n;
2678      			PMIX_LIST_FOREACH(kp2, &apptr->appinfo, pmix_kval_t) {
2679      				PMIX_LOAD_KEY(info[n].key, kp2->key);
2680      				rc = pmix_value_xfer(&info[n].value, kp2->value);
2681      				if (PMIX_SUCCESS != rc) {
2682      					PMIX_ERROR_LOG(rc);
2683      					PMIX_DATA_ARRAY_FREE(darray);
2684      					PMIX_RELEASE(kv);
2685      					return rc;
2686      				}
2687      				++n;
2688      			}
2689      			kv->value->data.darray = darray;
2690      			kv->value->type = PMIX_DATA_ARRAY;
2691      			pmix_list_append(kvs, &kv->super);
2692      		}
2693      		return PMIX_SUCCESS;
2694         }
2695         /* assume they are asking for our app */
2696         appnum = pmix_globals.appnum;
2697     }
2698 
2699     /* scan the list of apps to find the matching entry */
2700     app = NULL;
2701     PMIX_LIST_FOREACH(apptr, tgt, pmix_apptrkr_t) {
2702         if (appnum == apptr->appnum) {
2703             app = apptr;
2704             break;
2705         }
2706     }
2707     if (NULL == app) {
2708         return PMIX_ERR_NOT_FOUND;
2709     }
2710 
2711     /* see if they wanted to know something about a node that
2712      * is associated with this app */
2713     rc = fetch_nodeinfo(key, &app->nodeinfo, info, ninfo, kvs);
2714     if (PMIX_ERR_DATA_VALUE_NOT_FOUND != rc) {
2715         return rc;
2716     }
2717 
2718     /* scan the info list of this app to generate the results */
2719     rc = PMIX_ERR_NOT_FOUND;
2720     PMIX_LIST_FOREACH(kv, &app->appinfo, pmix_kval_t) {
2721         if (NULL == key || PMIX_CHECK_KEY(kv, key)) {
2722             kp2 = PMIX_NEW(pmix_kval_t);
2723             kp2->key = strdup(kv->key);
2724             kp2->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2725             rc = pmix_value_xfer(kp2->value, kv->value);
2726             if (PMIX_SUCCESS != rc) {
2727                 PMIX_ERROR_LOG(rc);
2728                 PMIX_RELEASE(kp2);
2729                 return rc;
2730             }
2731             pmix_list_append(kvs, &kp2->super);
2732             rc = PMIX_SUCCESS;
2733             if (NULL != key) {
2734                 break;
2735             }
2736         }
2737     }
2738 
2739     return rc;
2740 }
2741 
hash_fetch(const pmix_proc_t * proc,pmix_scope_t scope,bool copy,const char * key,pmix_info_t qualifiers[],size_t nqual,pmix_list_t * kvs)2742 static pmix_status_t hash_fetch(const pmix_proc_t *proc,
2743                                 pmix_scope_t scope, bool copy,
2744                                 const char *key,
2745                                 pmix_info_t qualifiers[], size_t nqual,
2746                                 pmix_list_t *kvs)
2747 {
2748     pmix_job_t *trk;
2749     pmix_status_t rc;
2750     pmix_kval_t *kv, *kvptr;
2751     pmix_info_t *info, *iptr;
2752     size_t m, n, ninfo, niptr;
2753     pmix_hash_table_t *ht;
2754     pmix_session_t *sptr;
2755     uint32_t sid;
2756     pmix_rank_t rnk;
2757     pmix_list_t rkvs;
2758     bool nodeinfo = false;
2759     bool appinfo = false;
2760 
2761     pmix_output_verbose(2, pmix_gds_base_framework.framework_output,
2762                         "%s pmix:gds:hash fetch %s for proc %s on scope %s",
2763                         PMIX_NAME_PRINT(&pmix_globals.myid),
2764                         (NULL == key) ? "NULL" : key,
2765                         PMIX_NAME_PRINT(proc), PMIx_Scope_string(scope));
2766 
2767     /* if the rank is wildcard and the key is NULL, then
2768      * they are asking for a complete copy of the job-level
2769      * info for this nspace - retrieve it */
2770     if (NULL == key && PMIX_RANK_WILDCARD == proc->rank) {
2771         /* see if we have a tracker for this nspace - we will
2772          * if we already cached the job info for it */
2773         trk = get_tracker(proc->nspace, false);
2774         if (NULL == trk) {
2775             /* let the caller know */
2776             return PMIX_ERR_INVALID_NAMESPACE;
2777         }
2778         /* fetch all values from the hash table tied to rank=wildcard */
2779         dohash(&trk->internal, NULL, PMIX_RANK_WILDCARD, 0, kvs);
2780         /* also need to add any job-level info */
2781         PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
2782             kv = PMIX_NEW(pmix_kval_t);
2783             kv->key = strdup(kvptr->key);
2784             kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2785             PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2786             if (PMIX_SUCCESS != rc) {
2787                 PMIX_RELEASE(kv);
2788                 return rc;
2789             }
2790             pmix_list_append(kvs, &kv->super);
2791         }
2792         /* collect the relevant node-level info */
2793         rc = fetch_nodeinfo(NULL, &trk->nodeinfo, qualifiers, nqual, kvs);
2794         if (PMIX_SUCCESS != rc) {
2795             return rc;
2796         }
2797         /* collect the relevant app-level info */
2798         rc = fetch_appinfo(NULL, &trk->apps, qualifiers, nqual, kvs);
2799         if (PMIX_SUCCESS != rc) {
2800             return rc;
2801         }
2802         /* finally, we need the job-level info for each rank in the job */
2803         for (rnk=0; rnk < trk->nptr->nprocs; rnk++) {
2804             PMIX_CONSTRUCT(&rkvs, pmix_list_t);
2805             rc = dohash(&trk->internal, NULL, rnk, 2, &rkvs);
2806             if (PMIX_ERR_NOMEM == rc) {
2807                 return rc;
2808             }
2809             if (0 == pmix_list_get_size(&rkvs)) {
2810                 PMIX_DESTRUCT(&rkvs);
2811                 continue;
2812             }
2813             /* should only have one entry on list */
2814             kvptr = (pmix_kval_t*)pmix_list_get_first(&rkvs);
2815             /* we have to assemble the results into a proc blob
2816              * so the remote end will know what to do with it */
2817             info = (pmix_info_t*)kvptr->value->data.darray->array;
2818             ninfo = kvptr->value->data.darray->size;
2819             /* setup to return the result */
2820             kv = PMIX_NEW(pmix_kval_t);
2821             kv->key = strdup(PMIX_PROC_DATA);
2822             kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2823             kv->value->type = PMIX_DATA_ARRAY;
2824             niptr = ninfo + 1;  // need space for the rank
2825             PMIX_DATA_ARRAY_CREATE(kv->value->data.darray, niptr, PMIX_INFO);
2826             iptr = (pmix_info_t*)kv->value->data.darray->array;
2827             /* start with the rank */
2828             PMIX_INFO_LOAD(&iptr[0], PMIX_RANK, &rnk, PMIX_PROC_RANK);
2829             /* now transfer rest of data across */
2830             for (n=0; n < ninfo; n++) {
2831                 PMIX_INFO_XFER(&iptr[n+1], &info[n]);
2832             }
2833             /* add to the results */
2834             pmix_list_append(kvs, &kv->super);
2835             /* release the search result */
2836             PMIX_LIST_DESTRUCT(&rkvs);
2837         }
2838         return PMIX_SUCCESS;
2839     }
2840 
2841     /* see if they are asking for session, node, or app-level info */
2842     for (n=0; n < nqual; n++) {
2843         if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_SESSION_INFO)) {
2844             /* they must have provided a session ID */
2845             for (m=0; m < nqual; m++) {
2846                 if (PMIX_CHECK_KEY(&qualifiers[m], PMIX_SESSION_ID)) {
2847                     /* see if we have this session */
2848                     PMIX_VALUE_GET_NUMBER(rc, &qualifiers[m].value, sid, uint32_t);
2849                     if (PMIX_SUCCESS != rc) {
2850                         /* didn't provide a correct value */
2851                         PMIX_ERROR_LOG(rc);
2852                         return rc;
2853                     }
2854                     PMIX_LIST_FOREACH(sptr, &mysessions, pmix_session_t) {
2855                         if (sptr->session == sid) {
2856                             /* see if they want info for a specific node */
2857                             rc = fetch_nodeinfo(key, &sptr->nodeinfo, qualifiers, nqual, kvs);
2858                             /* if they did, then we are done */
2859                             if (PMIX_ERR_DATA_VALUE_NOT_FOUND != rc) {
2860                                 return rc;
2861                             }
2862                             /* check the session info */
2863                             PMIX_LIST_FOREACH(kvptr, &sptr->sessioninfo, pmix_kval_t) {
2864                                 if (NULL == key || PMIX_CHECK_KEY(kvptr, key)) {
2865                                     kv = PMIX_NEW(pmix_kval_t);
2866                                     kv->key = strdup(kvptr->key);
2867                                     kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2868                                     PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2869                                     if (PMIX_SUCCESS != rc) {
2870                                         PMIX_RELEASE(kv);
2871                                         return rc;
2872                                     }
2873                                     pmix_list_append(kvs, &kv->super);
2874                                     if (NULL != key) {
2875                                         /* we are done */
2876                                         return PMIX_SUCCESS;
2877                                     }
2878                                 }
2879                             }
2880                         }
2881                     }
2882                 }
2883             }
2884             /* if we get here, then the session wasn't found */
2885             return PMIX_ERR_NOT_FOUND;
2886         } else if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_NODE_INFO)) {
2887             nodeinfo = PMIX_INFO_TRUE(&qualifiers[n]);
2888         } else if (PMIX_CHECK_KEY(&qualifiers[n], PMIX_APP_INFO)) {
2889             appinfo = PMIX_INFO_TRUE(&qualifiers[n]);
2890         }
2891     }
2892 
2893     /* check for node/app keys in the absence of corresponding qualifier */
2894     if (NULL != key) {
2895         if (pmix_check_node_info(key)) {
2896             nodeinfo = true;
2897         } else if (pmix_check_app_info(key)) {
2898             appinfo = true;
2899         }
2900     }
2901 
2902     /* find the hash table for this nspace */
2903     trk = get_tracker(proc->nspace, false);
2904     if (NULL == trk) {
2905         return PMIX_ERR_INVALID_NAMESPACE;
2906     }
2907 
2908     if (nodeinfo) {
2909         rc = fetch_nodeinfo(key, &trk->nodeinfo, qualifiers, nqual, kvs);
2910         if (PMIX_SUCCESS != rc && PMIX_RANK_WILDCARD == proc->rank) {
2911             /* need to check internal as we might have an older peer */
2912             ht = &trk->internal;
2913             goto doover;
2914         }
2915         return rc;
2916     } else if (appinfo) {
2917         rc = fetch_appinfo(key, &trk->apps, qualifiers, nqual, kvs);
2918         if (PMIX_SUCCESS != rc && PMIX_RANK_WILDCARD == proc->rank) {
2919             /* need to check internal as we might have an older peer */
2920             ht = &trk->internal;
2921             goto doover;
2922         }
2923         return rc;
2924     }
2925 
2926     /* fetch from the corresponding hash table - note that
2927      * we always provide a copy as we don't support
2928      * shared memory */
2929     if (PMIX_INTERNAL == scope ||
2930         PMIX_SCOPE_UNDEF == scope ||
2931         PMIX_GLOBAL == scope ||
2932         PMIX_RANK_WILDCARD == proc->rank) {
2933         ht = &trk->internal;
2934     } else if (PMIX_LOCAL == scope ||
2935                PMIX_GLOBAL == scope) {
2936         ht = &trk->local;
2937     } else if (PMIX_REMOTE == scope) {
2938         ht = &trk->remote;
2939     } else {
2940         PMIX_ERROR_LOG(PMIX_ERR_BAD_PARAM);
2941         return PMIX_ERR_BAD_PARAM;
2942     }
2943 
2944   doover:
2945     /* if rank=PMIX_RANK_UNDEF, then we need to search all
2946      * known ranks for this nspace as any one of them could
2947      * be the source */
2948     if (PMIX_RANK_UNDEF == proc->rank) {
2949         for (rnk=0; rnk < trk->nptr->nprocs; rnk++) {
2950             rc = dohash(ht, key, rnk, true, kvs);
2951             if (PMIX_ERR_NOMEM == rc) {
2952                 return rc;
2953             }
2954             if (PMIX_SUCCESS == rc && NULL != key) {
2955                 return rc;
2956             }
2957         }
2958         /* also need to check any job-level info */
2959         PMIX_LIST_FOREACH(kvptr, &trk->jobinfo, pmix_kval_t) {
2960             if (NULL == key || PMIX_CHECK_KEY(kvptr, key)) {
2961                 kv = PMIX_NEW(pmix_kval_t);
2962                 kv->key = strdup(kvptr->key);
2963                 kv->value = (pmix_value_t*)malloc(sizeof(pmix_value_t));
2964                 PMIX_VALUE_XFER(rc, kv->value, kvptr->value);
2965                 if (PMIX_SUCCESS != rc) {
2966                     PMIX_RELEASE(kv);
2967                     return rc;
2968                 }
2969                 pmix_list_append(kvs, &kv->super);
2970                 if (NULL != key) {
2971                     break;
2972                 }
2973             }
2974         }
2975         if (NULL == key) {
2976             /* and need to add all job info just in case that was
2977              * passed via a different GDS component */
2978             rc = dohash(&trk->internal, NULL, PMIX_RANK_WILDCARD, false, kvs);
2979         } else {
2980             rc = PMIX_ERR_NOT_FOUND;
2981         }
2982     } else {
2983         rc = dohash(ht, key, proc->rank, false, kvs);
2984     }
2985     if (PMIX_SUCCESS == rc) {
2986         if (PMIX_GLOBAL == scope) {
2987             if (ht == &trk->local) {
2988                 /* need to do this again for the remote data */
2989                 ht = &trk->remote;
2990                 goto doover;
2991             } else if (ht == &trk->internal) {
2992                 /* check local */
2993                 ht = &trk->local;
2994                 goto doover;
2995             }
2996         }
2997     } else {
2998         if (PMIX_GLOBAL == scope ||
2999             PMIX_SCOPE_UNDEF == scope) {
3000             if (ht == &trk->internal) {
3001                 /* need to also try the local data */
3002                 ht = &trk->local;
3003                 goto doover;
3004             } else if (ht == &trk->local) {
3005                 /* need to also try the remote data */
3006                 ht = &trk->remote;
3007                 goto doover;
3008             }
3009         }
3010     }
3011     if (0 == pmix_list_get_size(kvs)) {
3012         rc = PMIX_ERR_NOT_FOUND;
3013     }
3014 
3015     return rc;
3016 }
3017 
setup_fork(const pmix_proc_t * proc,char *** env)3018 static pmix_status_t setup_fork(const pmix_proc_t *proc, char ***env)
3019 {
3020     /* we don't need to add anything */
3021     return PMIX_SUCCESS;
3022 }
3023 
nspace_add(const char * nspace,uint32_t nlocalprocs,pmix_info_t info[],size_t ninfo)3024 static pmix_status_t nspace_add(const char *nspace, uint32_t nlocalprocs,
3025                                 pmix_info_t info[], size_t ninfo)
3026 {
3027     /* we don't need to do anything here */
3028     return PMIX_SUCCESS;
3029 }
3030 
nspace_del(const char * nspace)3031 static pmix_status_t nspace_del(const char *nspace)
3032 {
3033     pmix_job_t *t;
3034 
3035     /* find the hash table for this nspace */
3036     PMIX_LIST_FOREACH(t, &myjobs, pmix_job_t) {
3037         if (0 == strcmp(nspace, t->ns)) {
3038             /* release it */
3039             pmix_list_remove_item(&myjobs, &t->super);
3040             PMIX_RELEASE(t);
3041             break;
3042         }
3043     }
3044     return PMIX_SUCCESS;
3045 }
3046 
assemb_kvs_req(const pmix_proc_t * proc,pmix_list_t * kvs,pmix_buffer_t * buf,void * cbdata)3047 static pmix_status_t assemb_kvs_req(const pmix_proc_t *proc,
3048                               pmix_list_t *kvs,
3049                               pmix_buffer_t *buf,
3050                               void *cbdata)
3051 {
3052     pmix_status_t rc = PMIX_SUCCESS;
3053     pmix_server_caddy_t *cd = (pmix_server_caddy_t*)cbdata;
3054     pmix_kval_t *kv;
3055 
3056     if (!PMIX_PEER_IS_V1(cd->peer)) {
3057         PMIX_BFROPS_PACK(rc, cd->peer, buf, proc, 1, PMIX_PROC);
3058         if (PMIX_SUCCESS != rc) {
3059             return rc;
3060         }
3061     }
3062     PMIX_LIST_FOREACH(kv, kvs, pmix_kval_t) {
3063         PMIX_BFROPS_PACK(rc, cd->peer, buf, kv, 1, PMIX_KVAL);
3064         if (PMIX_SUCCESS != rc) {
3065             return rc;
3066         }
3067     }
3068     return rc;
3069 }
3070 
store_session_info(pmix_nspace_t nspace,pmix_kval_t * kv)3071 static pmix_status_t store_session_info(pmix_nspace_t nspace,
3072                                         pmix_kval_t *kv)
3073 {
3074     pmix_job_t *trk;
3075     pmix_status_t rc;
3076 
3077     /* find the hash table for this nspace */
3078     trk = get_tracker(nspace, true);
3079     if (NULL == trk) {
3080         return PMIX_ERR_NOMEM;
3081     }
3082     rc = process_session_array(kv->value, trk);
3083     return rc;
3084 }
3085 
store_node_info(pmix_nspace_t nspace,pmix_kval_t * kv)3086 static pmix_status_t store_node_info(pmix_nspace_t nspace,
3087                                      pmix_kval_t *kv)
3088 {
3089     pmix_job_t *trk;
3090     pmix_status_t rc;
3091 
3092     /* find the hash table for this nspace */
3093     trk = get_tracker(nspace, true);
3094     if (NULL == trk) {
3095         return PMIX_ERR_NOMEM;
3096     }
3097     rc = process_node_array(kv->value, &trk->nodeinfo);
3098     return rc;
3099 }
3100 
store_app_info(pmix_nspace_t nspace,pmix_kval_t * kv)3101 static pmix_status_t store_app_info(pmix_nspace_t nspace,
3102                                      pmix_kval_t *kv)
3103 {
3104     pmix_job_t *trk;
3105     pmix_status_t rc;
3106 
3107     /* find the hash table for this nspace */
3108     trk = get_tracker(nspace, true);
3109     if (NULL == trk) {
3110         return PMIX_ERR_NOMEM;
3111     }
3112     rc = process_app_array(kv->value, trk);
3113     return rc;
3114 }
3115 
accept_kvs_resp(pmix_buffer_t * buf)3116 static pmix_status_t accept_kvs_resp(pmix_buffer_t *buf)
3117 {
3118     pmix_status_t rc = PMIX_SUCCESS;
3119     int32_t cnt;
3120     pmix_byte_object_t bo;
3121     pmix_buffer_t pbkt;
3122     pmix_kval_t *kv;
3123     pmix_proc_t proct;
3124 
3125     /* the incoming payload is provided as a set of packed
3126      * byte objects, one for each rank. A pmix_proc_t is the first
3127      * entry in the byte object. If the rank=PMIX_RANK_WILDCARD,
3128      * then that byte object contains job level info
3129      * for the provided nspace. Otherwise, the byte
3130      * object contains the pmix_kval_t's that were "put" by the
3131      * referenced process */
3132     cnt = 1;
3133     PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3134                        buf, &bo, &cnt, PMIX_BYTE_OBJECT);
3135     while (PMIX_SUCCESS == rc) {
3136         /* setup the byte object for unpacking */
3137         PMIX_CONSTRUCT(&pbkt, pmix_buffer_t);
3138         PMIX_LOAD_BUFFER(pmix_client_globals.myserver,
3139                          &pbkt, bo.bytes, bo.size);
3140         /* unpack the id of the providing process */
3141         cnt = 1;
3142         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3143                            &pbkt, &proct, &cnt, PMIX_PROC);
3144         if (PMIX_SUCCESS != rc) {
3145             PMIX_ERROR_LOG(rc);
3146             return rc;
3147         }
3148         /* if the rank is UNDEF, then we store this on our own
3149          * rank tables */
3150         if (PMIX_RANK_UNDEF == proct.rank) {
3151             proct.rank = pmix_globals.myid.rank;
3152         }
3153 
3154         cnt = 1;
3155         kv = PMIX_NEW(pmix_kval_t);
3156         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3157                            &pbkt, kv, &cnt, PMIX_KVAL);
3158         while (PMIX_SUCCESS == rc) {
3159             /* if this is an info array, then store it here as dstore
3160              * doesn't know how to handle it */
3161             if (PMIX_CHECK_KEY(kv, PMIX_SESSION_INFO_ARRAY)) {
3162                 rc = store_session_info(proct.nspace, kv);
3163             } else if (PMIX_CHECK_KEY(kv, PMIX_NODE_INFO_ARRAY)) {
3164                 rc = store_node_info(proct.nspace, kv);
3165             } else if (PMIX_CHECK_KEY(kv, PMIX_APP_INFO_ARRAY)) {
3166                 rc = store_app_info(proct.nspace, kv);
3167             } else {
3168                 rc = hash_store(&proct, PMIX_INTERNAL, kv);
3169             }
3170             if (PMIX_SUCCESS != rc) {
3171                 PMIX_ERROR_LOG(rc);
3172                 PMIX_RELEASE(kv);
3173                 PMIX_DESTRUCT(&pbkt);
3174                 return rc;
3175             }
3176             PMIX_RELEASE(kv);  // maintain accounting
3177             /* get the next one */
3178             kv = PMIX_NEW(pmix_kval_t);
3179             cnt = 1;
3180             PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3181                                &pbkt, kv, &cnt, PMIX_KVAL);
3182         }
3183         PMIX_RELEASE(kv);  // maintain accounting
3184         if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
3185             PMIX_ERROR_LOG(rc);
3186             PMIX_DESTRUCT(&pbkt);
3187             return rc;
3188         }
3189         PMIX_DESTRUCT(&pbkt);
3190         /* get the next one */
3191         cnt = 1;
3192         PMIX_BFROPS_UNPACK(rc, pmix_client_globals.myserver,
3193                            buf, &bo, &cnt, PMIX_BYTE_OBJECT);
3194     }
3195     if (PMIX_ERR_UNPACK_READ_PAST_END_OF_BUFFER != rc) {
3196         PMIX_ERROR_LOG(rc);
3197         return rc;
3198     }
3199     return rc;
3200 }
3201