1 /*
2  * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2011 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
13  *                         All rights reserved.
14  * Copyright (c) 2009-2012 Cisco Systems, Inc.  All rights reserved.
15  * Copyright (c) 2011      Oak Ridge National Labs.  All rights reserved.
16  * Copyright (c) 2013-2019 Intel, Inc.  All rights reserved.
17  * Copyright (c) 2015-2019 Research Organization for Information Science
18  *                         and Technology (RIST).  All rights reserved.
19  * Copyright (c) 2016      IBM Corporation.  All rights reserved.
20  * $COPYRIGHT$
21  *
22  * Additional copyrights may follow
23  *
24  * $HEADER$
25  *
26  */
27 
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32 
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41 
42 #include "src/class/pmix_list.h"
43 #include "src/util/pmix_environ.h"
44 #include "src/util/output.h"
45 #include "src/util/printf.h"
46 #include "src/util/argv.h"
47 
48 #include "simptest.h"
49 
/* Forward declarations of the host-server callbacks that are installed
 * in the pmix_server_module_t function table (mymodule) below. They are
 * declared up front so the table can be initialized before the
 * definitions appear later in the file. */
static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
                               pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
                               pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
                              int status, const char msg[],
                              pmix_proc_t procs[], size_t nprocs,
                              pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
                                const pmix_info_t info[], size_t ninfo,
                                char *data, size_t ndata,
                                pmix_modex_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
                               const pmix_info_t info[], size_t ninfo,
                               pmix_modex_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t publish_fn(const pmix_proc_t *proc,
                                const pmix_info_t info[], size_t ninfo,
                                pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
                               const pmix_info_t info[], size_t ninfo,
                               pmix_lookup_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
                                  const pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t spawn_fn(const pmix_proc_t *proc,
                              const pmix_info_t job_info[], size_t ninfo,
                              const pmix_app_t apps[], size_t napps,
                              pmix_spawn_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
                                const pmix_info_t info[], size_t ninfo,
                                pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
                                   const pmix_info_t info[], size_t ninfo,
                                   pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
                                       const pmix_info_t info[], size_t ninfo,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t notify_event(pmix_status_t code,
                                  const pmix_proc_t *source,
                                  pmix_data_range_t range,
                                  pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t query_fn(pmix_proc_t *proct,
                              pmix_query_t *queries, size_t nqueries,
                              pmix_info_cbfunc_t cbfunc,
                              void *cbdata);
static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
                            pmix_tool_connection_cbfunc_t cbfunc,
                            void *cbdata);
static void log_fn(const pmix_proc_t *client,
                   const pmix_info_t data[], size_t ndata,
                   const pmix_info_t directives[], size_t ndirs,
                   pmix_op_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t alloc_fn(const pmix_proc_t *client,
                              pmix_alloc_directive_t directive,
                              const pmix_info_t data[], size_t ndata,
                              pmix_info_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
                              const pmix_proc_t targets[], size_t ntargets,
                              const pmix_info_t directives[], size_t ndirs,
                              pmix_info_cbfunc_t cbfunc, void *cbdata);
static pmix_status_t mon_fn(const pmix_proc_t *requestor,
                            const pmix_info_t *monitor, pmix_status_t error,
                            const pmix_info_t directives[], size_t ndirs,
                            pmix_info_cbfunc_t cbfunc, void *cbdata);
117 
/* Function table handed to PMIx_server_init() in main(). Each entry
 * points at the local callback that services the corresponding
 * request type. */
static pmix_server_module_t mymodule = {
    .client_connected = connected,
    .client_finalized = finalized,
    .abort = abort_fn,
    .fence_nb = fencenb_fn,
    .direct_modex = dmodex_fn,
    .publish = publish_fn,
    .lookup = lookup_fn,
    .unpublish = unpublish_fn,
    .spawn = spawn_fn,
    .connect = connect_fn,
    .disconnect = disconnect_fn,
    .register_events = register_event_fn,
    .deregister_events = deregister_events,
    .notify_event = notify_event,
    .query = query_fn,
    .tool_connected = tool_connect_fn,
    .log = log_fn,
    .allocate = alloc_fn,
    .job_control = jctrl_fn,
    .monitor = mon_fn
};
140 
/* List item that carries a single pmix_pdata_t entry.
 * NOTE(review): presumably these are the elements stored on the
 * "pubdata" list by the publish/lookup callbacks - confirm against
 * those functions. */
typedef struct {
    pmix_list_item_t super;     /* list linkage - must be first */
    pmix_pdata_t pdata;         /* the stored data element */
} pmix_locdat_t;
/* no constructor or destructor is registered for this class */
PMIX_CLASS_INSTANCE(pmix_locdat_t,
                    pmix_list_item_t,
                    NULL, NULL);
148 
/* Tracking object passed as cbdata through asynchronous PMIx calls.
 * It carries both a lock the initiating thread can wait on and the
 * caller's own callback/cbdata so the caller can be released once
 * the operation completes. */
typedef struct {
    pmix_object_t super;            /* object base - must be first */
    mylock_t lock;                  /* constructed in xfcon, woken by opcbfunc */
    pmix_event_t ev;                /* NOTE(review): not referenced in the code visible here */
    pmix_proc_t caller;             /* filled in by abort_fn with the requesting proc */
    pmix_info_t *info;              /* info array owned by this object, freed in xfdes */
    size_t ninfo;                   /* number of elements in info */
    pmix_op_cbfunc_t cbfunc;        /* caller's callback, invoked by opcbfunc/abcbfunc */
    pmix_spawn_cbfunc_t spcbfunc;   /* NOTE(review): presumably the spawn callback - confirm */
    pmix_release_cbfunc_t relcbfunc;/* NOTE(review): not referenced in the code visible here */
    void *cbdata;                   /* opaque argument handed back to cbfunc */
} myxfer_t;
/* Constructor for myxfer_t: set up the lock and put every pointer
 * member into a known (NULL/zero) state so that later NULL checks
 * (e.g. in xfdes, opcbfunc and abcbfunc) are reliable. */
static void xfcon(myxfer_t *p)
{
    DEBUG_CONSTRUCT_LOCK(&p->lock);
    p->info = NULL;
    p->ninfo = 0;
    p->cbfunc = NULL;
    p->spcbfunc = NULL;
    /* previously left uninitialized - the other callback members are
     * explicitly cleared, so clear this one as well */
    p->relcbfunc = NULL;
    p->cbdata = NULL;
}
/* Destructor for myxfer_t: tear down the lock, then release the info
 * array if one was attached to the object. */
static void xfdes(myxfer_t *p)
{
    DEBUG_DESTRUCT_LOCK(&p->lock);

    /* nothing was attached - nothing to free */
    if (NULL == p->info) {
        return;
    }
    PMIX_INFO_FREE(p->info, p->ninfo);
}
/* Register myxfer_t as a class derived from pmix_object_t, using
 * xfcon as its constructor and xfdes as its destructor */
PMIX_CLASS_INSTANCE(myxfer_t,
                    pmix_object_t,
                    xfcon, xfdes);
180 
/* List item tracking one forked client process. main() creates one
 * per child, records its pid, and stores it on the "children" list;
 * exit_code is inspected by main() after all clients are done. */
typedef struct {
    pmix_list_item_t super;     /* list linkage - must be first */
    int exit_code;              /* NOTE(review): presumably filled in by wait_signal_callback - confirm */
    pid_t pid;                  /* pid returned by fork() for this client */
} wait_tracker_t;
/* no constructor or destructor is registered for this class */
PMIX_CLASS_INSTANCE(wait_tracker_t,
                    pmix_list_item_t,
                    NULL, NULL);
189 
/* main() sets this to the number of clients and polls until it is no
 * longer positive.
 * NOTE(review): presumably decremented as clients terminate - confirm
 * against wait_signal_callback and the server callbacks. */
static volatile int wakeup;
/* overall result returned from main() */
static int exit_code = 0;
/* list constructed/destructed in main() ("pub data") */
static pmix_list_t pubdata;
/* SIGCHLD event registered in main() with wait_signal_callback */
static pmix_event_t handler;
/* list of wait_tracker_t objects, one per forked client */
static pmix_list_t children;
/* set when the client executable name contains "simptimeout" */
static bool istimeouttest = false;
/* main() waits on this after notifying itself; model_callback wakes it */
static mylock_t globallock;
/* set by the -arrays/--arrays option; selects the job-info-array
 * layout in set_namespace */
static bool arrays = false;
198 
/* Forward declarations of local helpers and callbacks that main()
 * references before their definitions appear later in the file. */
static void set_namespace(int nprocs, char *ranks, char *nspace,
                          pmix_op_cbfunc_t cbfunc, myxfer_t *x);
static void errhandler(size_t evhdlr_registration_id,
                       pmix_status_t status,
                       const pmix_proc_t *source,
                       pmix_info_t info[], size_t ninfo,
                       pmix_info_t results[], size_t nresults,
                       pmix_event_notification_cbfunc_fn_t cbfunc,
                       void *cbdata);
static void wait_signal_callback(int fd, short event, void *arg);
static void errhandler_reg_callbk (pmix_status_t status,
                                   size_t errhandler_ref,
                                   void *cbdata);
212 
/*
 * Generic operation-complete callback.
 *
 * status  result of the operation that just completed
 * cbdata  the myxfer_t tracking object that was passed to the operation
 *
 * If the tracking object carries a caller callback it is invoked with
 * the actual result of the operation (matching what abcbfunc does)
 * instead of an unconditional PMIX_SUCCESS, so a failure is not
 * reported to the caller as success. Any thread blocked on the
 * object's lock is then released. The object itself is NOT released
 * here - that is left to whoever is waiting on the lock.
 */
static void opcbfunc(pmix_status_t status, void *cbdata)
{
    myxfer_t *x = (myxfer_t*)cbdata;

    /* release the caller, if necessary */
    if (NULL != x->cbfunc) {
        x->cbfunc(status, x->cbdata);
    }
    DEBUG_WAKEUP_THREAD(&x->lock);
}
223 
224 /* this is an event notification function that we explicitly request
225  * be called when the PMIX_MODEL_DECLARED notification is issued.
226  * We could catch it in the general event notification function and test
227  * the status to see if the status matched, but it often is simpler
228  * to declare a use-specific notification callback point. In this case,
229  * we are asking to know whenever a model is declared as a means
230  * of testing server self-notification */
model_callback(size_t evhdlr_registration_id,pmix_status_t status,const pmix_proc_t * source,pmix_info_t info[],size_t ninfo,pmix_info_t results[],size_t nresults,pmix_event_notification_cbfunc_fn_t cbfunc,void * cbdata)231 static void model_callback(size_t evhdlr_registration_id,
232                            pmix_status_t status,
233                            const pmix_proc_t *source,
234                            pmix_info_t info[], size_t ninfo,
235                            pmix_info_t results[], size_t nresults,
236                            pmix_event_notification_cbfunc_fn_t cbfunc,
237                            void *cbdata)
238 {
239     size_t n;
240 
241     /* just let us know it was received */
242     fprintf(stderr, "SIMPTEST: Model event handler called with status %d(%s)\n",
243             status, PMIx_Error_string(status));
244     for (n=0; n < ninfo; n++) {
245         if (PMIX_STRING == info[n].value.type) {
246             fprintf(stderr, "\t%s:\t%s\n", info[n].key, info[n].value.data.string);
247         }
248     }
249 
250     /* we must NOT tell the event handler state machine that we
251      * are the last step as that will prevent it from notifying
252      * anyone else that might be listening for declarations */
253     if (NULL != cbfunc) {
254         cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
255     }
256     DEBUG_WAKEUP_THREAD(&globallock);
257 }
258 
259 /* event handler registration is done asynchronously */
model_registration_callback(pmix_status_t status,size_t evhandler_ref,void * cbdata)260 static void model_registration_callback(pmix_status_t status,
261                                         size_t evhandler_ref,
262                                         void *cbdata)
263 {
264     mylock_t *lock = (mylock_t*)cbdata;
265 
266     if (PMIX_SUCCESS != status) {
267         fprintf(stderr, "simptest EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
268                    status, (unsigned long)evhandler_ref);
269     }
270     lock->status = status;
271     DEBUG_WAKEUP_THREAD(lock);
272 }
273 
set_handler_default(int sig)274 static void set_handler_default(int sig)
275 {
276     struct sigaction act;
277 
278     act.sa_handler = SIG_DFL;
279     act.sa_flags = 0;
280     sigemptyset(&act.sa_mask);
281 
282     sigaction(sig, &act, (struct sigaction *)0);
283 }
284 
main(int argc,char ** argv)285 int main(int argc, char **argv)
286 {
287     char **client_env=NULL;
288     char **client_argv=NULL;
289     char *tmp, **atmp, *executable=NULL;
290     int rc, nprocs=1, n, k;
291     uid_t myuid;
292     gid_t mygid;
293     pid_t pid;
294     myxfer_t *x;
295     pmix_proc_t proc;
296     wait_tracker_t *child;
297     pmix_info_t *info;
298     size_t ninfo;
299     bool cross_version = false;
300     bool usock = true;
301     mylock_t mylock;
302     pmix_status_t code;
303     sigset_t unblock;
304 
305     /* smoke test */
306     if (PMIX_SUCCESS != 0) {
307         fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
308         exit(1);
309     }
310 
311     /* see if we were passed the number of procs to run or
312      * the executable to use */
313     for (n=1; n < argc; n++) {
314         if (0 == strcmp("-n", argv[n]) &&
315             NULL != argv[n+1]) {
316             nprocs = strtol(argv[n+1], NULL, 10);
317             ++n;  // step over the argument
318         } else if (0 == strcmp("-e", argv[n]) &&
319                    NULL != argv[n+1]) {
320             executable = strdup(argv[n+1]);
321             /* check for timeout test */
322             if (NULL != strstr(executable, "simptimeout")) {
323                 istimeouttest = true;
324             }
325             for (k=n+2; NULL != argv[k]; k++) {
326                 pmix_argv_append_nosize(&client_argv, argv[k]);
327             }
328             n += k;
329         } else if (0 == strcmp("-x", argv[n])) {
330             /* cross-version test - we will set one child to
331              * run at a different version. Requires -n >= 2 */
332             cross_version = true;
333             usock = false;
334         } else if (0 == strcmp("-u", argv[n])) {
335             /* enable usock */
336             usock = false;
337         } else if (0 == strcmp("-h", argv[n])) {
338             /* print the options and exit */
339             fprintf(stderr, "usage: simptest <options>\n");
340             fprintf(stderr, "    -n N     Number of clients to run\n");
341             fprintf(stderr, "    -e foo   Name of the client executable to run (default: simpclient\n");
342             fprintf(stderr, "    -x       Test cross-version support\n");
343             fprintf(stderr, "    -u       Enable legacy usock support\n");
344             fprintf(stderr, "    -arrays  Use the job session array to pass registration info\n");
345             exit(0);
346         } else if (0 == strcmp("-arrays", argv[n]) ||
347                    0 == strcmp("--arrays", argv[n])) {
348             arrays = true;
349         }
350     }
351     if (NULL == executable) {
352         executable = strdup("./simpclient");
353     }
354     /* check for executable existence and permissions */
355     if (0 != access(executable, X_OK)) {
356         fprintf(stderr, "Executable %s not found or missing executable permissions\n", executable);
357         exit(1);
358     }
359 
360     if (cross_version && nprocs < 2) {
361         fprintf(stderr, "Cross-version testing requires at least two clients\n");
362         exit(1);
363     }
364 
365     fprintf(stderr, "Testing version %s\n", PMIx_Get_version());
366 
367     /* ensure that SIGCHLD is unblocked as we need to capture it */
368     if (0 != sigemptyset(&unblock)) {
369         fprintf(stderr, "SIGEMPTYSET FAILED\n");
370         exit(1);
371     }
372     if (0 != sigaddset(&unblock, SIGCHLD)) {
373         fprintf(stderr, "SIGADDSET FAILED\n");
374         exit(1);
375     }
376     if (0 != sigprocmask(SIG_UNBLOCK, &unblock, NULL)) {
377         fprintf(stderr, "SIG_UNBLOCK FAILED\n");
378         exit(1);
379     }
380 
381 
382     /* setup the server library and tell it to support tool connections */
383     ninfo = 1;
384 
385     PMIX_INFO_CREATE(info, ninfo);
386     PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
387     if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
388         fprintf(stderr, "Init failed with error %d\n", rc);
389         return rc;
390     }
391     PMIX_INFO_FREE(info, ninfo);
392 
393     /* register the default errhandler */
394     DEBUG_CONSTRUCT_LOCK(&mylock);
395     ninfo = 1;
396     PMIX_INFO_CREATE(info, ninfo);
397     PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-DEFAULT", PMIX_STRING);
398     PMIx_Register_event_handler(NULL, 0, info, ninfo,
399                                 errhandler, errhandler_reg_callbk, (void*)&mylock);
400     DEBUG_WAIT_THREAD(&mylock);
401     PMIX_INFO_FREE(info, ninfo);
402     if (PMIX_SUCCESS != mylock.status) {
403         exit(mylock.status);
404     }
405     DEBUG_DESTRUCT_LOCK(&mylock);
406 
407     /* register a handler specifically for when models declare */
408     DEBUG_CONSTRUCT_LOCK(&mylock);
409     ninfo = 1;
410     PMIX_INFO_CREATE(info, ninfo);
411     PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-MODEL", PMIX_STRING);
412     code = PMIX_MODEL_DECLARED;
413     PMIx_Register_event_handler(&code, 1, info, ninfo,
414                                 model_callback, model_registration_callback, (void*)&mylock);
415     DEBUG_WAIT_THREAD(&mylock);
416     PMIX_INFO_FREE(info, ninfo);
417     if (PMIX_SUCCESS != mylock.status) {
418         exit(mylock.status);
419     }
420     DEBUG_DESTRUCT_LOCK(&mylock);
421 
422     /* setup the pub data, in case it is used */
423     PMIX_CONSTRUCT(&pubdata, pmix_list_t);
424 
425     /* setup to see sigchld on the forked tests */
426     PMIX_CONSTRUCT(&children, pmix_list_t);
427     pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
428                       EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
429     pmix_event_add(&handler, NULL);
430 
431     /* we have a single namespace for all clients */
432     atmp = NULL;
433     for (n=0; n < nprocs; n++) {
434         asprintf(&tmp, "%d", n);
435         pmix_argv_append_nosize(&atmp, tmp);
436         free(tmp);
437     }
438     tmp = pmix_argv_join(atmp, ',');
439     pmix_argv_free(atmp);
440     x = PMIX_NEW(myxfer_t);
441     set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
442 
443     /* set common argv and env */
444     client_env = pmix_argv_copy(environ);
445     pmix_argv_prepend_nosize(&client_argv, executable);
446 
447     wakeup = nprocs;
448     myuid = getuid();
449     mygid = getgid();
450 
451     /* if the nspace registration hasn't completed yet,
452      * wait for it here */
453     DEBUG_WAIT_THREAD(&x->lock);
454     free(tmp);
455     PMIX_RELEASE(x);
456 
457     /* fork/exec the test */
458     (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
459     for (n = 0; n < nprocs; n++) {
460         proc.rank = n;
461         if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {//n
462             fprintf(stderr, "Server fork setup failed with error %d\n", rc);
463             PMIx_server_finalize();
464             return rc;
465         }
466         /* if cross-version test is requested, then oscillate PTL support
467          * by rank */
468         if (cross_version) {
469             if (0 == n % 2) {
470                 pmix_setenv("PMIX_MCA_ptl", "tcp", true, &client_env);
471             } else {
472                 pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
473             }
474         } else if (!usock) {
475             /* don't disable usock => enable it on client */
476             pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
477         }
478         x = PMIX_NEW(myxfer_t);
479         if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
480                                                               NULL, opcbfunc, x))) {
481             fprintf(stderr, "Server register client failed with error %d\n", rc);
482             PMIx_server_finalize();
483             return rc;
484         }
485         /* don't fork/exec the client until we know it is registered
486          * so we avoid a potential race condition in the server */
487         DEBUG_WAIT_THREAD(&x->lock);
488         PMIX_RELEASE(x);
489         pid = fork();
490         if (pid < 0) {
491             fprintf(stderr, "Fork failed\n");
492             PMIx_server_finalize();
493             return -1;
494         }
495         if (pid == 0) {
496             sigset_t sigs;
497             set_handler_default(SIGTERM);
498             set_handler_default(SIGINT);
499             set_handler_default(SIGHUP);
500             set_handler_default(SIGPIPE);
501             set_handler_default(SIGCHLD);
502             sigprocmask(0, 0, &sigs);
503             sigprocmask(SIG_UNBLOCK, &sigs, 0);
504             execve(executable, client_argv, client_env);
505             /* Does not return */
506             exit(0);
507         } else {
508             child = PMIX_NEW(wait_tracker_t);
509             child->pid = pid;
510             pmix_list_append(&children, &child->super);
511         }
512     }
513     pmix_argv_free(client_argv);
514     pmix_argv_free(client_env);
515 
516     /* hang around until the client(s) finalize */
517     while (0 < wakeup) {
518         struct timespec ts;
519         ts.tv_sec = 0;
520         ts.tv_nsec = 100000;
521         nanosleep(&ts, NULL);
522     }
523 
524     /* see if anyone exited with non-zero status unless the test
525      * was expected to do so */
526     if (NULL == strstr(executable, "simpdie")) {
527       n=0;
528       PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
529           if (0 != child->exit_code) {
530               fprintf(stderr, "Child %d [%d] exited with status %d - test FAILED\n", n, child->pid, child->exit_code);
531           }
532           ++n;
533       }
534     } else if (1 == exit_code) {
535       exit_code = 0;
536     }
537     free(executable);
538 
539     /* try notifying ourselves */
540     ninfo = 3;
541     PMIX_INFO_CREATE(info, ninfo);
542     PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "PMIX", PMIX_STRING);
543     PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "test", PMIX_STRING);
544     /* mark that it is not to go to any default handlers */
545     PMIX_INFO_LOAD(&info[2], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
546     DEBUG_CONSTRUCT_LOCK(&globallock);
547     PMIx_Notify_event(PMIX_MODEL_DECLARED,
548                       &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
549                       info, ninfo, NULL, NULL);
550     DEBUG_WAIT_THREAD(&globallock);
551     DEBUG_DESTRUCT_LOCK(&globallock);
552     PMIX_INFO_FREE(info, ninfo);
553 
554     /* deregister the event handlers */
555     PMIx_Deregister_event_handler(0, NULL, NULL);
556 
557     /* release any pub data */
558     PMIX_LIST_DESTRUCT(&pubdata);
559 
560     /* release the child tracker */
561     PMIX_LIST_DESTRUCT(&children);
562 
563     /* finalize the server library */
564     if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
565         fprintf(stderr, "Finalize failed with error %d\n", rc);
566         exit_code = rc;
567     }
568 
569     if (0 == exit_code) {
570         fprintf(stderr, "Test finished OK!\n");
571     } else {
572         fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
573     }
574 
575     return exit_code;
576 }
577 
set_namespace(int nprocs,char * ranks,char * nspace,pmix_op_cbfunc_t cbfunc,myxfer_t * x)578 static void set_namespace(int nprocs, char *ranks, char *nspace,
579                           pmix_op_cbfunc_t cbfunc, myxfer_t *x)
580 {
581     char *regex, *ppn;
582     int n, m, k;
583     pmix_data_array_t *array;
584     pmix_info_t *info, *iptr;
585 
586     if (arrays) {
587         x->ninfo = 15 + nprocs;
588     } else {
589         x->ninfo = 16 + nprocs;
590     }
591 
592     PMIX_INFO_CREATE(x->info, x->ninfo);
593     n = 0;
594 
595     PMIx_generate_regex("test000,test001,test002", &regex);
596     PMIx_generate_ppn("0;1;2", &ppn);
597 
598     if (arrays) {
599         (void)strncpy(x->info[n].key, PMIX_JOB_INFO_ARRAY, PMIX_MAX_KEYLEN);
600         x->info[n].value.type = PMIX_DATA_ARRAY;
601         PMIX_DATA_ARRAY_CREATE(x->info[n].value.data.darray, 2, PMIX_INFO);
602         iptr = (pmix_info_t*)x->info[n].value.data.darray->array;
603         (void)strncpy(iptr[0].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
604         iptr[0].value.type = PMIX_STRING;
605         iptr[0].value.data.string = regex;
606         (void)strncpy(iptr[1].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
607         iptr[1].value.type = PMIX_STRING;
608         iptr[1].value.data.string = ppn;
609         ++n;
610     } else {
611         (void)strncpy(x->info[n].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
612         x->info[n].value.type = PMIX_STRING;
613         x->info[n].value.data.string = regex;
614         ++n;
615 
616         /* if we have some empty nodes, then fill their spots */
617         (void)strncpy(x->info[n].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
618         x->info[n].value.type = PMIX_STRING;
619         x->info[n].value.data.string = ppn;
620         ++n;
621     }
622 
623     (void)strncpy(x->info[n].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
624     x->info[n].value.type = PMIX_UINT32;
625     x->info[n].value.data.uint32 = nprocs;
626     ++n;
627 
628     (void)strncpy(x->info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
629     x->info[n].value.type = PMIX_UINT32;
630     x->info[n].value.data.uint32 = 0;
631     ++n;
632 
633     (void)strncpy(x->info[n].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
634     x->info[n].value.type = PMIX_UINT32;
635     x->info[n].value.data.uint32 = nprocs;
636     ++n;
637 
638     (void)strncpy(x->info[n].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
639     x->info[n].value.type = PMIX_STRING;
640     x->info[n].value.data.string = strdup(ranks);
641     ++n;
642 
643     (void)strncpy(x->info[n].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
644     x->info[n].value.type = PMIX_UINT32;
645     x->info[n].value.data.uint32 = nprocs;
646     ++n;
647 
648     (void)strncpy(x->info[n].key, PMIX_JOBID, PMIX_MAX_KEYLEN);
649     x->info[n].value.type = PMIX_STRING;
650     x->info[n].value.data.string = strdup("1234");
651     ++n;
652 
653     (void)strncpy(x->info[n].key, PMIX_NPROC_OFFSET, PMIX_MAX_KEYLEN);
654     x->info[n].value.type = PMIX_UINT32;
655     x->info[n].value.data.uint32 = 0;
656     ++n;
657 
658     (void)strncpy(x->info[n].key, PMIX_NODEID, PMIX_MAX_KEYLEN);
659     x->info[n].value.type = PMIX_UINT32;
660     x->info[n].value.data.uint32 = 0;
661     ++n;
662 
663     (void)strncpy(x->info[n].key, PMIX_NODE_SIZE, PMIX_MAX_KEYLEN);
664     x->info[n].value.type = PMIX_UINT32;
665     x->info[n].value.data.uint32 = nprocs;
666     ++n;
667 
668     (void)strncpy(x->info[n].key, PMIX_NUM_NODES, PMIX_MAX_KEYLEN);
669     x->info[n].value.type = PMIX_UINT32;
670     x->info[n].value.data.uint32 = 1;
671     ++n;
672 
673     (void)strncpy(x->info[n].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
674     x->info[n].value.type = PMIX_UINT32;
675     x->info[n].value.data.uint32 = nprocs;
676     ++n;
677 
678     (void)strncpy(x->info[n].key, PMIX_MAX_PROCS, PMIX_MAX_KEYLEN);
679     x->info[n].value.type = PMIX_UINT32;
680     x->info[n].value.data.uint32 = nprocs;
681     ++n;
682 
683     (void)strncpy(x->info[n].key, PMIX_JOB_NUM_APPS, PMIX_MAX_KEYLEN);
684     x->info[n].value.type = PMIX_UINT32;
685     x->info[n].value.data.uint32 = 1;
686     ++n;
687 
688     (void)strncpy(x->info[n].key, PMIX_LOCALLDR, PMIX_MAX_KEYLEN);
689     x->info[n].value.type = PMIX_PROC_RANK;
690     x->info[n].value.data.uint32 = 0;
691     ++n;
692 
693     /* add the proc-specific data */
694     for (m=0; m < nprocs; m++) {
695         (void)strncpy(x->info[n].key, PMIX_PROC_DATA, PMIX_MAX_KEYLEN);
696         x->info[n].value.type = PMIX_DATA_ARRAY;
697         PMIX_DATA_ARRAY_CREATE(array, 5, PMIX_INFO);
698         x->info[n].value.data.darray = array;
699         info = (pmix_info_t*)array->array;
700         k = 0;
701         (void)strncpy(info[k].key, PMIX_RANK, PMIX_MAX_KEYLEN);
702         info[k].value.type = PMIX_PROC_RANK;
703         info[k].value.data.rank = m;
704         ++k;
705         (void)strncpy(info[k].key, PMIX_GLOBAL_RANK, PMIX_MAX_KEYLEN);
706         info[k].value.type = PMIX_PROC_RANK;
707         info[k].value.data.rank = m;
708         ++k;
709         (void)strncpy(info[k].key, PMIX_LOCAL_RANK, PMIX_MAX_KEYLEN);
710         info[k].value.type = PMIX_UINT16;
711         info[k].value.data.uint16 = m;
712         ++k;
713 
714         (void)strncpy(info[k].key, PMIX_NODE_RANK, PMIX_MAX_KEYLEN);
715         info[k].value.type = PMIX_UINT16;
716         info[k].value.data.uint16 = m;
717         ++k;
718 
719         (void)strncpy(info[k].key, PMIX_NODEID, PMIX_MAX_KEYLEN);
720         info[k].value.type = PMIX_UINT32;
721         info[k].value.data.uint32 = 0;
722         ++k;
723         /* move to next proc */
724         ++n;
725     }
726     PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
727                                 cbfunc, x);
728 }
729 
/* Default event handler registered by main(). It only logs the
 * status it was called with and then passes control on; none of the
 * other arguments are examined. */
static void errhandler(size_t evhdlr_registration_id,
                       pmix_status_t status,
                       const pmix_proc_t *source,
                       pmix_info_t info[], size_t ninfo,
                       pmix_info_t results[], size_t nresults,
                       pmix_event_notification_cbfunc_fn_t cbfunc,
                       void *cbdata)
{
    pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);

    if (NULL == cbfunc) {
        return;
    }
    /* report plain success rather than declaring ourselves the last
     * step of the chain, so that any other registered handlers are
     * still notified */
    cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
}
746 
/* Completion callback for registration of the default event handler.
 * cbdata is the mylock_t that main() is waiting on: the registration
 * status is recorded in it and the waiter is released. The handler
 * reference is not used. */
static void errhandler_reg_callbk (pmix_status_t status,
                                   size_t errhandler_ref,
                                   void *cbdata)
{
    mylock_t *waiter = (mylock_t*)cbdata;

    (void)errhandler_ref;
    waiter->status = status;
    DEBUG_WAKEUP_THREAD(waiter);
}
756 
connected(const pmix_proc_t * proc,void * server_object,pmix_op_cbfunc_t cbfunc,void * cbdata)757 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
758                                pmix_op_cbfunc_t cbfunc, void *cbdata)
759 {
760     return PMIX_OPERATION_SUCCEEDED;
761 }
finalized(const pmix_proc_t * proc,void * server_object,pmix_op_cbfunc_t cbfunc,void * cbdata)762 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
763                      pmix_op_cbfunc_t cbfunc, void *cbdata)
764 {
765     return PMIX_OPERATION_SUCCEEDED;
766 }
767 
/*
 * Completion callback for the notification issued by abort_fn.
 *
 * cbdata is the myxfer_t built by abort_fn. The status is forwarded to
 * the callback stored in it (if any) and the object is then released.
 */
static void abcbfunc(pmix_status_t status, void *cbdata)
{
    myxfer_t *xfer = (myxfer_t*)cbdata;
    pmix_op_cbfunc_t release_fn = xfer->cbfunc;

    /* let the original requester continue */
    if (NULL != release_fn) {
        release_fn(status, xfer->cbdata);
    }

    PMIX_RELEASE(xfer);
}
778 
abort_fn(const pmix_proc_t * proc,void * server_object,int status,const char msg[],pmix_proc_t procs[],size_t nprocs,pmix_op_cbfunc_t cbfunc,void * cbdata)779 static pmix_status_t abort_fn(const pmix_proc_t *proc,
780                               void *server_object,
781                               int status, const char msg[],
782                               pmix_proc_t procs[], size_t nprocs,
783                               pmix_op_cbfunc_t cbfunc, void *cbdata)
784 {
785     pmix_status_t rc;
786     myxfer_t *x;
787 
788     if (NULL != procs) {
789         pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
790     } else {
791         pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
792     }
793 
794     /* instead of aborting the specified procs, notify them
795      * (if they have registered their errhandler) */
796 
797     /* use the myxfer_t object to ensure we release
798      * the caller when notification has been queued */
799     x = PMIX_NEW(myxfer_t);
800     (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
801     x->caller.rank = proc->rank;
802 
803     PMIX_INFO_CREATE(x->info, 2);
804     (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
805     x->info[0].value.type = PMIX_INT8;
806     x->info[0].value.data.int8 = 12;
807     (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
808     x->info[1].value.type = PMIX_DOUBLE;
809     x->info[1].value.data.dval = 12.34;
810     x->cbfunc = cbfunc;
811     x->cbdata = cbdata;
812 
813     if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
814                                                 PMIX_RANGE_NAMESPACE,
815                                                 x->info, 2,
816                                                 abcbfunc, x))) {
817         pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
818     }
819 
820     return PMIX_SUCCESS;
821 }
822 
fencbfn(int sd,short args,void * cbdata)823 static void fencbfn(int sd, short args, void *cbdata)
824 {
825     pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
826 
827     /* pass the provided data back to each participating proc */
828     if (NULL != scd->cbfunc.modexcbfunc) {
829         scd->cbfunc.modexcbfunc(scd->status, scd->data, scd->ndata, scd->cbdata, NULL, NULL);
830     }
831     PMIX_RELEASE(scd);
832 }
fencenb_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,char * data,size_t ndata,pmix_modex_cbfunc_t cbfunc,void * cbdata)833 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
834                       const pmix_info_t info[], size_t ninfo,
835                       char *data, size_t ndata,
836                       pmix_modex_cbfunc_t cbfunc, void *cbdata)
837 {
838     pmix_shift_caddy_t *scd;
839 
840     scd = PMIX_NEW(pmix_shift_caddy_t);
841     scd->status = PMIX_SUCCESS;
842     scd->data = data;
843     scd->ndata = ndata;
844     scd->cbfunc.modexcbfunc = cbfunc;
845     scd->cbdata = cbdata;
846     PMIX_THREADSHIFT(scd, fencbfn);
847     return PMIX_SUCCESS;
848 }
849 
850 
dmodex_fn(const pmix_proc_t * proc,const pmix_info_t info[],size_t ninfo,pmix_modex_cbfunc_t cbfunc,void * cbdata)851 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
852                      const pmix_info_t info[], size_t ninfo,
853                      pmix_modex_cbfunc_t cbfunc, void *cbdata)
854 {
855     pmix_shift_caddy_t *scd;
856 
857     /* if this is a timeout test, then do nothing */
858     if (istimeouttest) {
859         return PMIX_SUCCESS;
860     }
861 
862     scd = PMIX_NEW(pmix_shift_caddy_t);
863     scd->status = PMIX_ERR_NOT_FOUND;
864     scd->cbfunc.modexcbfunc = cbfunc;
865     scd->cbdata = cbdata;
866     PMIX_THREADSHIFT(scd, fencbfn);
867 
868     return PMIX_SUCCESS;
869 }
870 
871 
publish_fn(const pmix_proc_t * proc,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)872 static pmix_status_t publish_fn(const pmix_proc_t *proc,
873                       const pmix_info_t info[], size_t ninfo,
874                       pmix_op_cbfunc_t cbfunc, void *cbdata)
875 {
876     pmix_locdat_t *p;
877     size_t n;
878 
879     for (n=0; n < ninfo; n++) {
880         p = PMIX_NEW(pmix_locdat_t);
881         (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
882         p->pdata.proc.rank = proc->rank;
883         (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
884         pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
885         pmix_list_append(&pubdata, &p->super);
886     }
887 
888     return PMIX_OPERATION_SUCCEEDED;
889 }
890 
/* Carrier that lookup_fn fills in and thread-shifts to lkcbfn */
typedef struct {
    pmix_event_t ev;              /* not accessed directly in this file section;
                                   * presumably the event PMIX_THREADSHIFT uses */
    pmix_pdata_t *pd;             /* array of lookup results, freed by lkcbfn */
    size_t n;                     /* number of entries in pd */
    pmix_lookup_cbfunc_t cbfunc;  /* function the results are delivered to */
    void *cbdata;                 /* opaque pointer handed back to cbfunc */
} lkobj_t;
898 
lkcbfn(int sd,short args,void * cbdata)899 static void lkcbfn(int sd, short args, void *cbdata)
900 {
901     lkobj_t *lk = (lkobj_t*)cbdata;
902 
903     lk->cbfunc(PMIX_SUCCESS, lk->pd, lk->n, lk->cbdata);
904     PMIX_PDATA_FREE(lk->pd, lk->n);
905     free(lk);
906 }
907 
lookup_fn(const pmix_proc_t * proc,char ** keys,const pmix_info_t info[],size_t ninfo,pmix_lookup_cbfunc_t cbfunc,void * cbdata)908 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
909                      const pmix_info_t info[], size_t ninfo,
910                      pmix_lookup_cbfunc_t cbfunc, void *cbdata)
911 {
912     pmix_locdat_t *p, *p2;
913     pmix_list_t results;
914     size_t i, n;
915     pmix_pdata_t *pd = NULL;
916     pmix_status_t ret = PMIX_ERR_NOT_FOUND;
917     lkobj_t *lk;
918 
919     PMIX_CONSTRUCT(&results, pmix_list_t);
920 
921     for (n=0; NULL != keys[n]; n++) {
922         PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
923             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
924                 p2 = PMIX_NEW(pmix_locdat_t);
925                 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
926                 p2->pdata.proc.rank = p->pdata.proc.rank;
927                 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
928                 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
929                 pmix_list_append(&results, &p2->super);
930                 break;
931             }
932         }
933     }
934     if (0 < (n = pmix_list_get_size(&results))) {
935         ret = PMIX_SUCCESS;
936         PMIX_PDATA_CREATE(pd, n);
937         for (i=0; i < n; i++) {
938             p = (pmix_locdat_t*)pmix_list_remove_first(&results);
939             if (p) {
940                 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
941                 pd[i].proc.rank = p->pdata.proc.rank;
942                 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
943                 pmix_value_xfer(&pd[i].value, &p->pdata.value);
944             }
945         }
946     }
947     PMIX_LIST_DESTRUCT(&results);
948     if (PMIX_SUCCESS == ret) {
949         lk = (lkobj_t*)malloc(sizeof(lkobj_t));
950         lk->pd = pd;
951         lk->n = n;
952         lk->cbfunc = cbfunc;
953         lk->cbdata = cbdata;
954         PMIX_THREADSHIFT(lk, lkcbfn);
955     }
956 
957     return ret;
958 }
959 
960 
unpublish_fn(const pmix_proc_t * proc,char ** keys,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)961 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
962                         const pmix_info_t info[], size_t ninfo,
963                         pmix_op_cbfunc_t cbfunc, void *cbdata)
964 {
965     pmix_locdat_t *p, *p2;
966     size_t n;
967 
968     for (n=0; NULL != keys[n]; n++) {
969         PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
970             if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
971                 pmix_list_remove_item(&pubdata, &p->super);
972                 PMIX_RELEASE(p);
973                 break;
974             }
975         }
976     }
977     return PMIX_OPERATION_SUCCEEDED;
978 }
979 
/*
 * Completion callback for the namespace registration started by spawn_fn.
 *
 * cbdata is the myxfer_t allocated in spawn_fn. The stored spawn callback
 * (if any) is told that the fake job "DYNSPACE" was started, and the
 * tracking object is then released - nobody else holds a reference to it.
 *
 * NOTE(review): the registration status is ignored and PMIX_SUCCESS is
 * always reported to the spawn requester; kept as-is.
 */
static void spcbfunc(pmix_status_t status, void *cbdata)
{
    myxfer_t *x = (myxfer_t*)cbdata;

    if (NULL != x->spcbfunc) {
        x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
    }
    /* spawn_fn handed ownership to us; without this the object leaks
     * on every spawn request */
    PMIX_RELEASE(x);
}
988 
spawn_fn(const pmix_proc_t * proc,const pmix_info_t job_info[],size_t ninfo,const pmix_app_t apps[],size_t napps,pmix_spawn_cbfunc_t cbfunc,void * cbdata)989 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
990                     const pmix_info_t job_info[], size_t ninfo,
991                     const pmix_app_t apps[], size_t napps,
992                     pmix_spawn_cbfunc_t cbfunc, void *cbdata)
993 {
994     myxfer_t *x;
995     size_t n;
996     pmix_proc_t *pptr;
997     bool spawned;
998 
999     /* check the job info for parent and spawned keys */
1000     for (n=0; n < ninfo; n++) {
1001         if (0 == strncmp(job_info[n].key, PMIX_PARENT_ID, PMIX_MAX_KEYLEN)) {
1002             pptr = job_info[n].value.data.proc;
1003             pmix_output(0, "SPAWN: Parent ID %s:%d", pptr->nspace, pptr->rank);
1004         } else if (0 == strncmp(job_info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN)) {
1005             spawned = PMIX_INFO_TRUE(&job_info[n]);
1006             pmix_output(0, "SPAWN: Spawned %s", spawned ? "TRUE" : "FALSE");
1007         }
1008     }
1009 
1010     /* in practice, we would pass this request to the local
1011      * resource manager for launch, and then have that server
1012      * execute our callback function. For now, we will fake
1013      * the spawn and just pretend */
1014 
1015     /* must register the nspace for the new procs before
1016      * we return to the caller */
1017     x = PMIX_NEW(myxfer_t);
1018     x->spcbfunc = cbfunc;
1019     x->cbdata = cbdata;
1020 
1021     set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
1022 
1023     return PMIX_SUCCESS;
1024 }
1025 
/* number of times connect_fn has been called */
static int numconnects = 0;
1027 
connect_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1028 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
1029                                 const pmix_info_t info[], size_t ninfo,
1030                                 pmix_op_cbfunc_t cbfunc, void *cbdata)
1031 {
1032     /* in practice, we would pass this request to the local
1033      * resource manager for handling */
1034 
1035     numconnects++;
1036 
1037     return PMIX_OPERATION_SUCCEEDED;
1038 }
1039 
1040 
disconnect_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1041 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
1042                                    const pmix_info_t info[], size_t ninfo,
1043                                    pmix_op_cbfunc_t cbfunc, void *cbdata)
1044 {
1045     return PMIX_OPERATION_SUCCEEDED;
1046 }
1047 
/*
 * Event-registration callback - stub. The codes and info are not
 * recorded, cbfunc is never invoked and PMIX_OPERATION_SUCCEEDED is
 * returned.
 */
static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
                                       const pmix_info_t info[], size_t ninfo,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)codes;
    (void)ncodes;
    (void)info;
    (void)ninfo;
    (void)cbfunc;
    (void)cbdata;

    return PMIX_OPERATION_SUCCEEDED;
}
1054 
/*
 * Event-deregistration callback - stub. All arguments are ignored,
 * cbfunc is never invoked and PMIX_OPERATION_SUCCEEDED is returned.
 */
static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
                                       pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)codes;
    (void)ncodes;
    (void)cbfunc;
    (void)cbdata;

    return PMIX_OPERATION_SUCCEEDED;
}
1060 
/*
 * Event-notification callback - stub. The event is not forwarded
 * anywhere: all arguments are ignored, cbfunc is never invoked and
 * PMIX_OPERATION_SUCCEEDED is returned.
 */
static pmix_status_t notify_event(pmix_status_t code,
                                  const pmix_proc_t *source,
                                  pmix_data_range_t range,
                                  pmix_info_t info[], size_t ninfo,
                                  pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    (void)code;
    (void)source;
    (void)range;
    (void)info;
    (void)ninfo;
    (void)cbfunc;
    (void)cbdata;

    return PMIX_OPERATION_SUCCEEDED;
}
1069 
/* Carrier that query_fn fills in and thread-shifts to qfn */
typedef struct query_data_t {
    pmix_event_t ev;            /* not accessed directly in this file section;
                                 * presumably the event PMIX_THREADSHIFT uses */
    pmix_info_t *data;          /* array of answers, freed by qfn */
    size_t ndata;               /* number of entries in data */
    pmix_info_cbfunc_t cbfunc;  /* function the answers are delivered to */
    void *cbdata;               /* opaque pointer handed back to cbfunc */
} query_data_t;
1077 
qfn(int sd,short args,void * cbdata)1078 static void qfn(int sd, short args, void *cbdata)
1079 {
1080     query_data_t *qd = (query_data_t*)cbdata;
1081 
1082     qd->cbfunc(PMIX_SUCCESS, qd->data, qd->ndata, qd->cbdata, NULL, NULL);
1083     PMIX_INFO_FREE(qd->data, qd->ndata);
1084 }
1085 
query_fn(pmix_proc_t * proct,pmix_query_t * queries,size_t nqueries,pmix_info_cbfunc_t cbfunc,void * cbdata)1086 static pmix_status_t query_fn(pmix_proc_t *proct,
1087                               pmix_query_t *queries, size_t nqueries,
1088                               pmix_info_cbfunc_t cbfunc,
1089                               void *cbdata)
1090 {
1091     size_t n;
1092     pmix_info_t *info;
1093     query_data_t qd;
1094 
1095     if (NULL == cbfunc) {
1096         return PMIX_ERROR;
1097     }
1098     /* keep this simple */
1099     PMIX_INFO_CREATE(info, nqueries);
1100     for (n=0; n < nqueries; n++) {
1101         pmix_output(0, "\tKey: %s", queries[n].keys[0]);
1102         (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
1103         info[n].value.type = PMIX_STRING;
1104         if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
1105             return PMIX_ERROR;
1106         }
1107     }
1108     qd.data = info;
1109     qd.ndata = nqueries;
1110     qd.cbfunc = cbfunc;
1111     qd.cbdata = cbdata;
1112     PMIX_THREADSHIFT(&qd, qfn);
1113     return PMIX_SUCCESS;
1114 }
1115 
/*
 * Tool-connection callback.
 *
 * Every tool is given the same made-up identity, namespace "TOOL" with
 * rank 0, which is reported to cbfunc (when one was provided) together
 * with PMIX_SUCCESS. info/ninfo are not examined.
 */
static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
                            pmix_tool_connection_cbfunc_t cbfunc,
                            void *cbdata)
{
    pmix_proc_t tool_id;

    /* any nspace will do for this test */
    tool_id.rank = 0;
    (void)strncpy(tool_id.nspace, "TOOL", PMIX_MAX_NSLEN);

    if (NULL == cbfunc) {
        return;
    }
    cbfunc(PMIX_SUCCESS, &tool_id, cbdata);
}
1130 
/* Carrier that log_fn fills in and thread-shifts to foobar */
typedef struct {
    pmix_event_t ev;          /* not accessed directly in this file section;
                               * presumably the event PMIX_THREADSHIFT uses */
    pmix_op_cbfunc_t cbfunc;  /* completion function of the log request */
    void *cbdata;             /* opaque pointer handed back to cbfunc */
} mylog_t;
1136 
foobar(int sd,short args,void * cbdata)1137 static void foobar(int sd, short args, void *cbdata)
1138 {
1139     mylog_t *lg = (mylog_t*)cbdata;
1140     lg->cbfunc(PMIX_SUCCESS, lg->cbdata);
1141 }
/*
 * Log callback.
 *
 * Nothing is actually logged: the data and directives are ignored and
 * the request is simply completed asynchronously by scheduling foobar,
 * which calls cbfunc with PMIX_SUCCESS.
 *
 * If the carrier object cannot be allocated, cbfunc (when provided) is
 * called right away with PMIX_ERR_NOMEM so the requester is not left
 * waiting.
 */
static void log_fn(const pmix_proc_t *client,
                   const pmix_info_t data[], size_t ndata,
                   const pmix_info_t directives[], size_t ndirs,
                   pmix_op_cbfunc_t cbfunc, void *cbdata)
{
    mylog_t *lg = (mylog_t *)malloc(sizeof(*lg));

    if (NULL == lg) {
        /* this function has no return value, so the callback is the
         * only way to report the failure */
        if (NULL != cbfunc) {
            cbfunc(PMIX_ERR_NOMEM, cbdata);
        }
        return;
    }

    lg->cbfunc = cbfunc;
    lg->cbdata = cbdata;
    PMIX_THREADSHIFT(lg, foobar);
}
1153 
alloc_fn(const pmix_proc_t * client,pmix_alloc_directive_t directive,const pmix_info_t data[],size_t ndata,pmix_info_cbfunc_t cbfunc,void * cbdata)1154 static pmix_status_t alloc_fn(const pmix_proc_t *client,
1155                               pmix_alloc_directive_t directive,
1156                               const pmix_info_t data[], size_t ndata,
1157                               pmix_info_cbfunc_t cbfunc, void *cbdata)
1158 {
1159     return PMIX_OPERATION_SUCCEEDED;
1160 }
1161 
jctrl_fn(const pmix_proc_t * requestor,const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)1162 static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
1163                               const pmix_proc_t targets[], size_t ntargets,
1164                               const pmix_info_t directives[], size_t ndirs,
1165                               pmix_info_cbfunc_t cbfunc, void *cbdata)
1166 {
1167     return PMIX_OPERATION_SUCCEEDED;
1168 }
1169 
/*
 * Monitoring callback - not implemented by this test server. All
 * arguments are ignored, cbfunc is never invoked and
 * PMIX_ERR_NOT_SUPPORTED is returned.
 */
static pmix_status_t mon_fn(const pmix_proc_t *requestor,
                            const pmix_info_t *monitor, pmix_status_t error,
                            const pmix_info_t directives[], size_t ndirs,
                            pmix_info_cbfunc_t cbfunc, void *cbdata)
{
    (void)requestor;
    (void)monitor;
    (void)error;
    (void)directives;
    (void)ndirs;
    (void)cbfunc;
    (void)cbdata;

    return PMIX_ERR_NOT_SUPPORTED;
}
1177 
1178 
wait_signal_callback(int fd,short event,void * arg)1179 static void wait_signal_callback(int fd, short event, void *arg)
1180 {
1181     pmix_event_t *sig = (pmix_event_t*) arg;
1182     int status;
1183     pid_t pid;
1184     wait_tracker_t *t2;
1185 
1186     if (SIGCHLD != event_get_signal(sig)) {
1187         return;
1188     }
1189 
1190     /* we can have multiple children leave but only get one
1191      * sigchild callback, so reap all the waitpids until we
1192      * don't get anything valid back */
1193     while (1) {
1194         pid = waitpid(-1, &status, WNOHANG);
1195         if (-1 == pid && EINTR == errno) {
1196             /* try it again */
1197             continue;
1198         }
1199         /* if we got garbage, then nothing we can do */
1200         if (pid <= 0) {
1201             return;
1202         }
1203 
1204         /* we are already in an event, so it is safe to access the list */
1205         PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
1206             if (pid == t2->pid) {
1207                 /* found it! */
1208                 if (WIFEXITED(status)) {
1209                     t2->exit_code = WEXITSTATUS(status);
1210                 } else {
1211                     if (WIFSIGNALED(status)) {
1212                         t2->exit_code = WTERMSIG(status) + 128;
1213                     }
1214                 }
1215                 if (0 != t2->exit_code && 0 == exit_code) {
1216                     exit_code = t2->exit_code;
1217                 }
1218                 --wakeup;
1219                 break;
1220             }
1221         }
1222     }
1223     fprintf(stderr, "ENDLOOP\n");
1224 }
1225