1 /*
2 * Copyright (c) 2004-2010 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2006-2013 Los Alamos National Security, LLC.
13 * All rights reserved.
14 * Copyright (c) 2009-2012 Cisco Systems, Inc. All rights reserved.
15 * Copyright (c) 2011 Oak Ridge National Labs. All rights reserved.
16 * Copyright (c) 2013-2019 Intel, Inc. All rights reserved.
17 * Copyright (c) 2015-2019 Research Organization for Information Science
18 * and Technology (RIST). All rights reserved.
19 * Copyright (c) 2016 IBM Corporation. All rights reserved.
20 * $COPYRIGHT$
21 *
22 * Additional copyrights may follow
23 *
24 * $HEADER$
25 *
26 */
27
28 #include <src/include/pmix_config.h>
29 #include <pmix_server.h>
30 #include <src/include/types.h>
31 #include <src/include/pmix_globals.h>
32
33 #include <stdio.h>
34 #include <stdlib.h>
35 #include <unistd.h>
36 #include <time.h>
37 #include <sys/types.h>
38 #include <sys/wait.h>
39 #include <errno.h>
40 #include <signal.h>
41
42 #include "src/class/pmix_list.h"
43 #include "src/util/pmix_environ.h"
44 #include "src/util/output.h"
45 #include "src/util/printf.h"
46 #include "src/util/argv.h"
47
48 #include "simptest.h"
49
50 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
51 pmix_op_cbfunc_t cbfunc, void *cbdata);
52 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
53 pmix_op_cbfunc_t cbfunc, void *cbdata);
54 static pmix_status_t abort_fn(const pmix_proc_t *proc, void *server_object,
55 int status, const char msg[],
56 pmix_proc_t procs[], size_t nprocs,
57 pmix_op_cbfunc_t cbfunc, void *cbdata);
58 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
59 const pmix_info_t info[], size_t ninfo,
60 char *data, size_t ndata,
61 pmix_modex_cbfunc_t cbfunc, void *cbdata);
62 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
63 const pmix_info_t info[], size_t ninfo,
64 pmix_modex_cbfunc_t cbfunc, void *cbdata);
65 static pmix_status_t publish_fn(const pmix_proc_t *proc,
66 const pmix_info_t info[], size_t ninfo,
67 pmix_op_cbfunc_t cbfunc, void *cbdata);
68 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
69 const pmix_info_t info[], size_t ninfo,
70 pmix_lookup_cbfunc_t cbfunc, void *cbdata);
71 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
72 const pmix_info_t info[], size_t ninfo,
73 pmix_op_cbfunc_t cbfunc, void *cbdata);
74 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
75 const pmix_info_t job_info[], size_t ninfo,
76 const pmix_app_t apps[], size_t napps,
77 pmix_spawn_cbfunc_t cbfunc, void *cbdata);
78 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
79 const pmix_info_t info[], size_t ninfo,
80 pmix_op_cbfunc_t cbfunc, void *cbdata);
81 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
82 const pmix_info_t info[], size_t ninfo,
83 pmix_op_cbfunc_t cbfunc, void *cbdata);
84 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
85 const pmix_info_t info[], size_t ninfo,
86 pmix_op_cbfunc_t cbfunc, void *cbdata);
87 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
88 pmix_op_cbfunc_t cbfunc, void *cbdata);
89 static pmix_status_t notify_event(pmix_status_t code,
90 const pmix_proc_t *source,
91 pmix_data_range_t range,
92 pmix_info_t info[], size_t ninfo,
93 pmix_op_cbfunc_t cbfunc, void *cbdata);
94 static pmix_status_t query_fn(pmix_proc_t *proct,
95 pmix_query_t *queries, size_t nqueries,
96 pmix_info_cbfunc_t cbfunc,
97 void *cbdata);
98 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
99 pmix_tool_connection_cbfunc_t cbfunc,
100 void *cbdata);
101 static void log_fn(const pmix_proc_t *client,
102 const pmix_info_t data[], size_t ndata,
103 const pmix_info_t directives[], size_t ndirs,
104 pmix_op_cbfunc_t cbfunc, void *cbdata);
105 static pmix_status_t alloc_fn(const pmix_proc_t *client,
106 pmix_alloc_directive_t directive,
107 const pmix_info_t data[], size_t ndata,
108 pmix_info_cbfunc_t cbfunc, void *cbdata);
109 static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
110 const pmix_proc_t targets[], size_t ntargets,
111 const pmix_info_t directives[], size_t ndirs,
112 pmix_info_cbfunc_t cbfunc, void *cbdata);
113 static pmix_status_t mon_fn(const pmix_proc_t *requestor,
114 const pmix_info_t *monitor, pmix_status_t error,
115 const pmix_info_t directives[], size_t ndirs,
116 pmix_info_cbfunc_t cbfunc, void *cbdata);
117
118 static pmix_server_module_t mymodule = {
119 .client_connected = connected,
120 .client_finalized = finalized,
121 .abort = abort_fn,
122 .fence_nb = fencenb_fn,
123 .direct_modex = dmodex_fn,
124 .publish = publish_fn,
125 .lookup = lookup_fn,
126 .unpublish = unpublish_fn,
127 .spawn = spawn_fn,
128 .connect = connect_fn,
129 .disconnect = disconnect_fn,
130 .register_events = register_event_fn,
131 .deregister_events = deregister_events,
132 .notify_event = notify_event,
133 .query = query_fn,
134 .tool_connected = tool_connect_fn,
135 .log = log_fn,
136 .allocate = alloc_fn,
137 .job_control = jctrl_fn,
138 .monitor = mon_fn
139 };
140
141 typedef struct {
142 pmix_list_item_t super;
143 pmix_pdata_t pdata;
144 } pmix_locdat_t;
145 PMIX_CLASS_INSTANCE(pmix_locdat_t,
146 pmix_list_item_t,
147 NULL, NULL);
148
149 typedef struct {
150 pmix_object_t super;
151 mylock_t lock;
152 pmix_event_t ev;
153 pmix_proc_t caller;
154 pmix_info_t *info;
155 size_t ninfo;
156 pmix_op_cbfunc_t cbfunc;
157 pmix_spawn_cbfunc_t spcbfunc;
158 pmix_release_cbfunc_t relcbfunc;
159 void *cbdata;
160 } myxfer_t;
xfcon(myxfer_t * p)161 static void xfcon(myxfer_t *p)
162 {
163 DEBUG_CONSTRUCT_LOCK(&p->lock);
164 p->info = NULL;
165 p->ninfo = 0;
166 p->cbfunc = NULL;
167 p->spcbfunc = NULL;
168 p->cbdata = NULL;
169 }
xfdes(myxfer_t * p)170 static void xfdes(myxfer_t *p)
171 {
172 DEBUG_DESTRUCT_LOCK(&p->lock);
173 if (NULL != p->info) {
174 PMIX_INFO_FREE(p->info, p->ninfo);
175 }
176 }
177 PMIX_CLASS_INSTANCE(myxfer_t,
178 pmix_object_t,
179 xfcon, xfdes);
180
181 typedef struct {
182 pmix_list_item_t super;
183 int exit_code;
184 pid_t pid;
185 } wait_tracker_t;
186 PMIX_CLASS_INSTANCE(wait_tracker_t,
187 pmix_list_item_t,
188 NULL, NULL);
189
190 static volatile int wakeup;
191 static int exit_code = 0;
192 static pmix_list_t pubdata;
193 static pmix_event_t handler;
194 static pmix_list_t children;
195 static bool istimeouttest = false;
196 static mylock_t globallock;
197 static bool arrays = false;
198
199 static void set_namespace(int nprocs, char *ranks, char *nspace,
200 pmix_op_cbfunc_t cbfunc, myxfer_t *x);
201 static void errhandler(size_t evhdlr_registration_id,
202 pmix_status_t status,
203 const pmix_proc_t *source,
204 pmix_info_t info[], size_t ninfo,
205 pmix_info_t results[], size_t nresults,
206 pmix_event_notification_cbfunc_fn_t cbfunc,
207 void *cbdata);
208 static void wait_signal_callback(int fd, short event, void *arg);
209 static void errhandler_reg_callbk (pmix_status_t status,
210 size_t errhandler_ref,
211 void *cbdata);
212
opcbfunc(pmix_status_t status,void * cbdata)213 static void opcbfunc(pmix_status_t status, void *cbdata)
214 {
215 myxfer_t *x = (myxfer_t*)cbdata;
216
217 /* release the caller, if necessary */
218 if (NULL != x->cbfunc) {
219 x->cbfunc(PMIX_SUCCESS, x->cbdata);
220 }
221 DEBUG_WAKEUP_THREAD(&x->lock);
222 }
223
224 /* this is an event notification function that we explicitly request
225 * be called when the PMIX_MODEL_DECLARED notification is issued.
226 * We could catch it in the general event notification function and test
227 * the status to see if the status matched, but it often is simpler
228 * to declare a use-specific notification callback point. In this case,
229 * we are asking to know whenever a model is declared as a means
230 * of testing server self-notification */
model_callback(size_t evhdlr_registration_id,pmix_status_t status,const pmix_proc_t * source,pmix_info_t info[],size_t ninfo,pmix_info_t results[],size_t nresults,pmix_event_notification_cbfunc_fn_t cbfunc,void * cbdata)231 static void model_callback(size_t evhdlr_registration_id,
232 pmix_status_t status,
233 const pmix_proc_t *source,
234 pmix_info_t info[], size_t ninfo,
235 pmix_info_t results[], size_t nresults,
236 pmix_event_notification_cbfunc_fn_t cbfunc,
237 void *cbdata)
238 {
239 size_t n;
240
241 /* just let us know it was received */
242 fprintf(stderr, "SIMPTEST: Model event handler called with status %d(%s)\n",
243 status, PMIx_Error_string(status));
244 for (n=0; n < ninfo; n++) {
245 if (PMIX_STRING == info[n].value.type) {
246 fprintf(stderr, "\t%s:\t%s\n", info[n].key, info[n].value.data.string);
247 }
248 }
249
250 /* we must NOT tell the event handler state machine that we
251 * are the last step as that will prevent it from notifying
252 * anyone else that might be listening for declarations */
253 if (NULL != cbfunc) {
254 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
255 }
256 DEBUG_WAKEUP_THREAD(&globallock);
257 }
258
259 /* event handler registration is done asynchronously */
model_registration_callback(pmix_status_t status,size_t evhandler_ref,void * cbdata)260 static void model_registration_callback(pmix_status_t status,
261 size_t evhandler_ref,
262 void *cbdata)
263 {
264 mylock_t *lock = (mylock_t*)cbdata;
265
266 if (PMIX_SUCCESS != status) {
267 fprintf(stderr, "simptest EVENT HANDLER REGISTRATION FAILED WITH STATUS %d, ref=%lu\n",
268 status, (unsigned long)evhandler_ref);
269 }
270 lock->status = status;
271 DEBUG_WAKEUP_THREAD(lock);
272 }
273
set_handler_default(int sig)274 static void set_handler_default(int sig)
275 {
276 struct sigaction act;
277
278 act.sa_handler = SIG_DFL;
279 act.sa_flags = 0;
280 sigemptyset(&act.sa_mask);
281
282 sigaction(sig, &act, (struct sigaction *)0);
283 }
284
main(int argc,char ** argv)285 int main(int argc, char **argv)
286 {
287 char **client_env=NULL;
288 char **client_argv=NULL;
289 char *tmp, **atmp, *executable=NULL;
290 int rc, nprocs=1, n, k;
291 uid_t myuid;
292 gid_t mygid;
293 pid_t pid;
294 myxfer_t *x;
295 pmix_proc_t proc;
296 wait_tracker_t *child;
297 pmix_info_t *info;
298 size_t ninfo;
299 bool cross_version = false;
300 bool usock = true;
301 mylock_t mylock;
302 pmix_status_t code;
303 sigset_t unblock;
304
305 /* smoke test */
306 if (PMIX_SUCCESS != 0) {
307 fprintf(stderr, "ERROR IN COMPUTING CONSTANTS: PMIX_SUCCESS = %d\n", PMIX_SUCCESS);
308 exit(1);
309 }
310
311 /* see if we were passed the number of procs to run or
312 * the executable to use */
313 for (n=1; n < argc; n++) {
314 if (0 == strcmp("-n", argv[n]) &&
315 NULL != argv[n+1]) {
316 nprocs = strtol(argv[n+1], NULL, 10);
317 ++n; // step over the argument
318 } else if (0 == strcmp("-e", argv[n]) &&
319 NULL != argv[n+1]) {
320 executable = strdup(argv[n+1]);
321 /* check for timeout test */
322 if (NULL != strstr(executable, "simptimeout")) {
323 istimeouttest = true;
324 }
325 for (k=n+2; NULL != argv[k]; k++) {
326 pmix_argv_append_nosize(&client_argv, argv[k]);
327 }
328 n += k;
329 } else if (0 == strcmp("-x", argv[n])) {
330 /* cross-version test - we will set one child to
331 * run at a different version. Requires -n >= 2 */
332 cross_version = true;
333 usock = false;
334 } else if (0 == strcmp("-u", argv[n])) {
335 /* enable usock */
336 usock = false;
337 } else if (0 == strcmp("-h", argv[n])) {
338 /* print the options and exit */
339 fprintf(stderr, "usage: simptest <options>\n");
340 fprintf(stderr, " -n N Number of clients to run\n");
341 fprintf(stderr, " -e foo Name of the client executable to run (default: simpclient\n");
342 fprintf(stderr, " -x Test cross-version support\n");
343 fprintf(stderr, " -u Enable legacy usock support\n");
344 fprintf(stderr, " -arrays Use the job session array to pass registration info\n");
345 exit(0);
346 } else if (0 == strcmp("-arrays", argv[n]) ||
347 0 == strcmp("--arrays", argv[n])) {
348 arrays = true;
349 }
350 }
351 if (NULL == executable) {
352 executable = strdup("./simpclient");
353 }
354 /* check for executable existence and permissions */
355 if (0 != access(executable, X_OK)) {
356 fprintf(stderr, "Executable %s not found or missing executable permissions\n", executable);
357 exit(1);
358 }
359
360 if (cross_version && nprocs < 2) {
361 fprintf(stderr, "Cross-version testing requires at least two clients\n");
362 exit(1);
363 }
364
365 fprintf(stderr, "Testing version %s\n", PMIx_Get_version());
366
367 /* ensure that SIGCHLD is unblocked as we need to capture it */
368 if (0 != sigemptyset(&unblock)) {
369 fprintf(stderr, "SIGEMPTYSET FAILED\n");
370 exit(1);
371 }
372 if (0 != sigaddset(&unblock, SIGCHLD)) {
373 fprintf(stderr, "SIGADDSET FAILED\n");
374 exit(1);
375 }
376 if (0 != sigprocmask(SIG_UNBLOCK, &unblock, NULL)) {
377 fprintf(stderr, "SIG_UNBLOCK FAILED\n");
378 exit(1);
379 }
380
381
382 /* setup the server library and tell it to support tool connections */
383 ninfo = 1;
384
385 PMIX_INFO_CREATE(info, ninfo);
386 PMIX_INFO_LOAD(&info[0], PMIX_SERVER_TOOL_SUPPORT, NULL, PMIX_BOOL);
387 if (PMIX_SUCCESS != (rc = PMIx_server_init(&mymodule, info, ninfo))) {
388 fprintf(stderr, "Init failed with error %d\n", rc);
389 return rc;
390 }
391 PMIX_INFO_FREE(info, ninfo);
392
393 /* register the default errhandler */
394 DEBUG_CONSTRUCT_LOCK(&mylock);
395 ninfo = 1;
396 PMIX_INFO_CREATE(info, ninfo);
397 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-DEFAULT", PMIX_STRING);
398 PMIx_Register_event_handler(NULL, 0, info, ninfo,
399 errhandler, errhandler_reg_callbk, (void*)&mylock);
400 DEBUG_WAIT_THREAD(&mylock);
401 PMIX_INFO_FREE(info, ninfo);
402 if (PMIX_SUCCESS != mylock.status) {
403 exit(mylock.status);
404 }
405 DEBUG_DESTRUCT_LOCK(&mylock);
406
407 /* register a handler specifically for when models declare */
408 DEBUG_CONSTRUCT_LOCK(&mylock);
409 ninfo = 1;
410 PMIX_INFO_CREATE(info, ninfo);
411 PMIX_INFO_LOAD(&info[0], PMIX_EVENT_HDLR_NAME, "SIMPTEST-MODEL", PMIX_STRING);
412 code = PMIX_MODEL_DECLARED;
413 PMIx_Register_event_handler(&code, 1, info, ninfo,
414 model_callback, model_registration_callback, (void*)&mylock);
415 DEBUG_WAIT_THREAD(&mylock);
416 PMIX_INFO_FREE(info, ninfo);
417 if (PMIX_SUCCESS != mylock.status) {
418 exit(mylock.status);
419 }
420 DEBUG_DESTRUCT_LOCK(&mylock);
421
422 /* setup the pub data, in case it is used */
423 PMIX_CONSTRUCT(&pubdata, pmix_list_t);
424
425 /* setup to see sigchld on the forked tests */
426 PMIX_CONSTRUCT(&children, pmix_list_t);
427 pmix_event_assign(&handler, pmix_globals.evbase, SIGCHLD,
428 EV_SIGNAL|EV_PERSIST,wait_signal_callback, &handler);
429 pmix_event_add(&handler, NULL);
430
431 /* we have a single namespace for all clients */
432 atmp = NULL;
433 for (n=0; n < nprocs; n++) {
434 asprintf(&tmp, "%d", n);
435 pmix_argv_append_nosize(&atmp, tmp);
436 free(tmp);
437 }
438 tmp = pmix_argv_join(atmp, ',');
439 pmix_argv_free(atmp);
440 x = PMIX_NEW(myxfer_t);
441 set_namespace(nprocs, tmp, "foobar", opcbfunc, x);
442
443 /* set common argv and env */
444 client_env = pmix_argv_copy(environ);
445 pmix_argv_prepend_nosize(&client_argv, executable);
446
447 wakeup = nprocs;
448 myuid = getuid();
449 mygid = getgid();
450
451 /* if the nspace registration hasn't completed yet,
452 * wait for it here */
453 DEBUG_WAIT_THREAD(&x->lock);
454 free(tmp);
455 PMIX_RELEASE(x);
456
457 /* fork/exec the test */
458 (void)strncpy(proc.nspace, "foobar", PMIX_MAX_NSLEN);
459 for (n = 0; n < nprocs; n++) {
460 proc.rank = n;
461 if (PMIX_SUCCESS != (rc = PMIx_server_setup_fork(&proc, &client_env))) {//n
462 fprintf(stderr, "Server fork setup failed with error %d\n", rc);
463 PMIx_server_finalize();
464 return rc;
465 }
466 /* if cross-version test is requested, then oscillate PTL support
467 * by rank */
468 if (cross_version) {
469 if (0 == n % 2) {
470 pmix_setenv("PMIX_MCA_ptl", "tcp", true, &client_env);
471 } else {
472 pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
473 }
474 } else if (!usock) {
475 /* don't disable usock => enable it on client */
476 pmix_setenv("PMIX_MCA_ptl", "usock", true, &client_env);
477 }
478 x = PMIX_NEW(myxfer_t);
479 if (PMIX_SUCCESS != (rc = PMIx_server_register_client(&proc, myuid, mygid,
480 NULL, opcbfunc, x))) {
481 fprintf(stderr, "Server register client failed with error %d\n", rc);
482 PMIx_server_finalize();
483 return rc;
484 }
485 /* don't fork/exec the client until we know it is registered
486 * so we avoid a potential race condition in the server */
487 DEBUG_WAIT_THREAD(&x->lock);
488 PMIX_RELEASE(x);
489 pid = fork();
490 if (pid < 0) {
491 fprintf(stderr, "Fork failed\n");
492 PMIx_server_finalize();
493 return -1;
494 }
495 if (pid == 0) {
496 sigset_t sigs;
497 set_handler_default(SIGTERM);
498 set_handler_default(SIGINT);
499 set_handler_default(SIGHUP);
500 set_handler_default(SIGPIPE);
501 set_handler_default(SIGCHLD);
502 sigprocmask(0, 0, &sigs);
503 sigprocmask(SIG_UNBLOCK, &sigs, 0);
504 execve(executable, client_argv, client_env);
505 /* Does not return */
506 exit(0);
507 } else {
508 child = PMIX_NEW(wait_tracker_t);
509 child->pid = pid;
510 pmix_list_append(&children, &child->super);
511 }
512 }
513 pmix_argv_free(client_argv);
514 pmix_argv_free(client_env);
515
516 /* hang around until the client(s) finalize */
517 while (0 < wakeup) {
518 struct timespec ts;
519 ts.tv_sec = 0;
520 ts.tv_nsec = 100000;
521 nanosleep(&ts, NULL);
522 }
523
524 /* see if anyone exited with non-zero status unless the test
525 * was expected to do so */
526 if (NULL == strstr(executable, "simpdie")) {
527 n=0;
528 PMIX_LIST_FOREACH(child, &children, wait_tracker_t) {
529 if (0 != child->exit_code) {
530 fprintf(stderr, "Child %d [%d] exited with status %d - test FAILED\n", n, child->pid, child->exit_code);
531 }
532 ++n;
533 }
534 } else if (1 == exit_code) {
535 exit_code = 0;
536 }
537 free(executable);
538
539 /* try notifying ourselves */
540 ninfo = 3;
541 PMIX_INFO_CREATE(info, ninfo);
542 PMIX_INFO_LOAD(&info[0], PMIX_PROGRAMMING_MODEL, "PMIX", PMIX_STRING);
543 PMIX_INFO_LOAD(&info[1], PMIX_MODEL_LIBRARY_NAME, "test", PMIX_STRING);
544 /* mark that it is not to go to any default handlers */
545 PMIX_INFO_LOAD(&info[2], PMIX_EVENT_NON_DEFAULT, NULL, PMIX_BOOL);
546 DEBUG_CONSTRUCT_LOCK(&globallock);
547 PMIx_Notify_event(PMIX_MODEL_DECLARED,
548 &pmix_globals.myid, PMIX_RANGE_PROC_LOCAL,
549 info, ninfo, NULL, NULL);
550 DEBUG_WAIT_THREAD(&globallock);
551 DEBUG_DESTRUCT_LOCK(&globallock);
552 PMIX_INFO_FREE(info, ninfo);
553
554 /* deregister the event handlers */
555 PMIx_Deregister_event_handler(0, NULL, NULL);
556
557 /* release any pub data */
558 PMIX_LIST_DESTRUCT(&pubdata);
559
560 /* release the child tracker */
561 PMIX_LIST_DESTRUCT(&children);
562
563 /* finalize the server library */
564 if (PMIX_SUCCESS != (rc = PMIx_server_finalize())) {
565 fprintf(stderr, "Finalize failed with error %d\n", rc);
566 exit_code = rc;
567 }
568
569 if (0 == exit_code) {
570 fprintf(stderr, "Test finished OK!\n");
571 } else {
572 fprintf(stderr, "TEST FAILED WITH ERROR %d\n", exit_code);
573 }
574
575 return exit_code;
576 }
577
set_namespace(int nprocs,char * ranks,char * nspace,pmix_op_cbfunc_t cbfunc,myxfer_t * x)578 static void set_namespace(int nprocs, char *ranks, char *nspace,
579 pmix_op_cbfunc_t cbfunc, myxfer_t *x)
580 {
581 char *regex, *ppn;
582 int n, m, k;
583 pmix_data_array_t *array;
584 pmix_info_t *info, *iptr;
585
586 if (arrays) {
587 x->ninfo = 15 + nprocs;
588 } else {
589 x->ninfo = 16 + nprocs;
590 }
591
592 PMIX_INFO_CREATE(x->info, x->ninfo);
593 n = 0;
594
595 PMIx_generate_regex("test000,test001,test002", ®ex);
596 PMIx_generate_ppn("0;1;2", &ppn);
597
598 if (arrays) {
599 (void)strncpy(x->info[n].key, PMIX_JOB_INFO_ARRAY, PMIX_MAX_KEYLEN);
600 x->info[n].value.type = PMIX_DATA_ARRAY;
601 PMIX_DATA_ARRAY_CREATE(x->info[n].value.data.darray, 2, PMIX_INFO);
602 iptr = (pmix_info_t*)x->info[n].value.data.darray->array;
603 (void)strncpy(iptr[0].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
604 iptr[0].value.type = PMIX_STRING;
605 iptr[0].value.data.string = regex;
606 (void)strncpy(iptr[1].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
607 iptr[1].value.type = PMIX_STRING;
608 iptr[1].value.data.string = ppn;
609 ++n;
610 } else {
611 (void)strncpy(x->info[n].key, PMIX_NODE_MAP, PMIX_MAX_KEYLEN);
612 x->info[n].value.type = PMIX_STRING;
613 x->info[n].value.data.string = regex;
614 ++n;
615
616 /* if we have some empty nodes, then fill their spots */
617 (void)strncpy(x->info[n].key, PMIX_PROC_MAP, PMIX_MAX_KEYLEN);
618 x->info[n].value.type = PMIX_STRING;
619 x->info[n].value.data.string = ppn;
620 ++n;
621 }
622
623 (void)strncpy(x->info[n].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
624 x->info[n].value.type = PMIX_UINT32;
625 x->info[n].value.data.uint32 = nprocs;
626 ++n;
627
628 (void)strncpy(x->info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN);
629 x->info[n].value.type = PMIX_UINT32;
630 x->info[n].value.data.uint32 = 0;
631 ++n;
632
633 (void)strncpy(x->info[n].key, PMIX_LOCAL_SIZE, PMIX_MAX_KEYLEN);
634 x->info[n].value.type = PMIX_UINT32;
635 x->info[n].value.data.uint32 = nprocs;
636 ++n;
637
638 (void)strncpy(x->info[n].key, PMIX_LOCAL_PEERS, PMIX_MAX_KEYLEN);
639 x->info[n].value.type = PMIX_STRING;
640 x->info[n].value.data.string = strdup(ranks);
641 ++n;
642
643 (void)strncpy(x->info[n].key, PMIX_JOB_SIZE, PMIX_MAX_KEYLEN);
644 x->info[n].value.type = PMIX_UINT32;
645 x->info[n].value.data.uint32 = nprocs;
646 ++n;
647
648 (void)strncpy(x->info[n].key, PMIX_JOBID, PMIX_MAX_KEYLEN);
649 x->info[n].value.type = PMIX_STRING;
650 x->info[n].value.data.string = strdup("1234");
651 ++n;
652
653 (void)strncpy(x->info[n].key, PMIX_NPROC_OFFSET, PMIX_MAX_KEYLEN);
654 x->info[n].value.type = PMIX_UINT32;
655 x->info[n].value.data.uint32 = 0;
656 ++n;
657
658 (void)strncpy(x->info[n].key, PMIX_NODEID, PMIX_MAX_KEYLEN);
659 x->info[n].value.type = PMIX_UINT32;
660 x->info[n].value.data.uint32 = 0;
661 ++n;
662
663 (void)strncpy(x->info[n].key, PMIX_NODE_SIZE, PMIX_MAX_KEYLEN);
664 x->info[n].value.type = PMIX_UINT32;
665 x->info[n].value.data.uint32 = nprocs;
666 ++n;
667
668 (void)strncpy(x->info[n].key, PMIX_NUM_NODES, PMIX_MAX_KEYLEN);
669 x->info[n].value.type = PMIX_UINT32;
670 x->info[n].value.data.uint32 = 1;
671 ++n;
672
673 (void)strncpy(x->info[n].key, PMIX_UNIV_SIZE, PMIX_MAX_KEYLEN);
674 x->info[n].value.type = PMIX_UINT32;
675 x->info[n].value.data.uint32 = nprocs;
676 ++n;
677
678 (void)strncpy(x->info[n].key, PMIX_MAX_PROCS, PMIX_MAX_KEYLEN);
679 x->info[n].value.type = PMIX_UINT32;
680 x->info[n].value.data.uint32 = nprocs;
681 ++n;
682
683 (void)strncpy(x->info[n].key, PMIX_JOB_NUM_APPS, PMIX_MAX_KEYLEN);
684 x->info[n].value.type = PMIX_UINT32;
685 x->info[n].value.data.uint32 = 1;
686 ++n;
687
688 (void)strncpy(x->info[n].key, PMIX_LOCALLDR, PMIX_MAX_KEYLEN);
689 x->info[n].value.type = PMIX_PROC_RANK;
690 x->info[n].value.data.uint32 = 0;
691 ++n;
692
693 /* add the proc-specific data */
694 for (m=0; m < nprocs; m++) {
695 (void)strncpy(x->info[n].key, PMIX_PROC_DATA, PMIX_MAX_KEYLEN);
696 x->info[n].value.type = PMIX_DATA_ARRAY;
697 PMIX_DATA_ARRAY_CREATE(array, 5, PMIX_INFO);
698 x->info[n].value.data.darray = array;
699 info = (pmix_info_t*)array->array;
700 k = 0;
701 (void)strncpy(info[k].key, PMIX_RANK, PMIX_MAX_KEYLEN);
702 info[k].value.type = PMIX_PROC_RANK;
703 info[k].value.data.rank = m;
704 ++k;
705 (void)strncpy(info[k].key, PMIX_GLOBAL_RANK, PMIX_MAX_KEYLEN);
706 info[k].value.type = PMIX_PROC_RANK;
707 info[k].value.data.rank = m;
708 ++k;
709 (void)strncpy(info[k].key, PMIX_LOCAL_RANK, PMIX_MAX_KEYLEN);
710 info[k].value.type = PMIX_UINT16;
711 info[k].value.data.uint16 = m;
712 ++k;
713
714 (void)strncpy(info[k].key, PMIX_NODE_RANK, PMIX_MAX_KEYLEN);
715 info[k].value.type = PMIX_UINT16;
716 info[k].value.data.uint16 = m;
717 ++k;
718
719 (void)strncpy(info[k].key, PMIX_NODEID, PMIX_MAX_KEYLEN);
720 info[k].value.type = PMIX_UINT32;
721 info[k].value.data.uint32 = 0;
722 ++k;
723 /* move to next proc */
724 ++n;
725 }
726 PMIx_server_register_nspace(nspace, nprocs, x->info, x->ninfo,
727 cbfunc, x);
728 }
729
errhandler(size_t evhdlr_registration_id,pmix_status_t status,const pmix_proc_t * source,pmix_info_t info[],size_t ninfo,pmix_info_t results[],size_t nresults,pmix_event_notification_cbfunc_fn_t cbfunc,void * cbdata)730 static void errhandler(size_t evhdlr_registration_id,
731 pmix_status_t status,
732 const pmix_proc_t *source,
733 pmix_info_t info[], size_t ninfo,
734 pmix_info_t results[], size_t nresults,
735 pmix_event_notification_cbfunc_fn_t cbfunc,
736 void *cbdata)
737 {
738 pmix_output(0, "SERVER: ERRHANDLER CALLED WITH STATUS %d", status);
739 /* we must NOT tell the event handler state machine that we
740 * are the last step as that will prevent it from notifying
741 * anyone else that might be listening for declarations */
742 if (NULL != cbfunc) {
743 cbfunc(PMIX_SUCCESS, NULL, 0, NULL, NULL, cbdata);
744 }
745 }
746
errhandler_reg_callbk(pmix_status_t status,size_t errhandler_ref,void * cbdata)747 static void errhandler_reg_callbk (pmix_status_t status,
748 size_t errhandler_ref,
749 void *cbdata)
750 {
751 mylock_t *lock = (mylock_t*)cbdata;
752
753 lock->status = status;
754 DEBUG_WAKEUP_THREAD(lock);
755 }
756
connected(const pmix_proc_t * proc,void * server_object,pmix_op_cbfunc_t cbfunc,void * cbdata)757 static pmix_status_t connected(const pmix_proc_t *proc, void *server_object,
758 pmix_op_cbfunc_t cbfunc, void *cbdata)
759 {
760 return PMIX_OPERATION_SUCCEEDED;
761 }
finalized(const pmix_proc_t * proc,void * server_object,pmix_op_cbfunc_t cbfunc,void * cbdata)762 static pmix_status_t finalized(const pmix_proc_t *proc, void *server_object,
763 pmix_op_cbfunc_t cbfunc, void *cbdata)
764 {
765 return PMIX_OPERATION_SUCCEEDED;
766 }
767
abcbfunc(pmix_status_t status,void * cbdata)768 static void abcbfunc(pmix_status_t status, void *cbdata)
769 {
770 myxfer_t *x = (myxfer_t*)cbdata;
771
772 /* be sure to release the caller */
773 if (NULL != x->cbfunc) {
774 x->cbfunc(status, x->cbdata);
775 }
776 PMIX_RELEASE(x);
777 }
778
abort_fn(const pmix_proc_t * proc,void * server_object,int status,const char msg[],pmix_proc_t procs[],size_t nprocs,pmix_op_cbfunc_t cbfunc,void * cbdata)779 static pmix_status_t abort_fn(const pmix_proc_t *proc,
780 void *server_object,
781 int status, const char msg[],
782 pmix_proc_t procs[], size_t nprocs,
783 pmix_op_cbfunc_t cbfunc, void *cbdata)
784 {
785 pmix_status_t rc;
786 myxfer_t *x;
787
788 if (NULL != procs) {
789 pmix_output(0, "SERVER: ABORT on %s:%d", procs[0].nspace, procs[0].rank);
790 } else {
791 pmix_output(0, "SERVER: ABORT OF ALL PROCS IN NSPACE %s", proc->nspace);
792 }
793
794 /* instead of aborting the specified procs, notify them
795 * (if they have registered their errhandler) */
796
797 /* use the myxfer_t object to ensure we release
798 * the caller when notification has been queued */
799 x = PMIX_NEW(myxfer_t);
800 (void)strncpy(x->caller.nspace, proc->nspace, PMIX_MAX_NSLEN);
801 x->caller.rank = proc->rank;
802
803 PMIX_INFO_CREATE(x->info, 2);
804 (void)strncpy(x->info[0].key, "DARTH", PMIX_MAX_KEYLEN);
805 x->info[0].value.type = PMIX_INT8;
806 x->info[0].value.data.int8 = 12;
807 (void)strncpy(x->info[1].key, "VADER", PMIX_MAX_KEYLEN);
808 x->info[1].value.type = PMIX_DOUBLE;
809 x->info[1].value.data.dval = 12.34;
810 x->cbfunc = cbfunc;
811 x->cbdata = cbdata;
812
813 if (PMIX_SUCCESS != (rc = PMIx_Notify_event(status, &x->caller,
814 PMIX_RANGE_NAMESPACE,
815 x->info, 2,
816 abcbfunc, x))) {
817 pmix_output(0, "SERVER: FAILED NOTIFY ERROR %d", (int)rc);
818 }
819
820 return PMIX_SUCCESS;
821 }
822
fencbfn(int sd,short args,void * cbdata)823 static void fencbfn(int sd, short args, void *cbdata)
824 {
825 pmix_shift_caddy_t *scd = (pmix_shift_caddy_t*)cbdata;
826
827 /* pass the provided data back to each participating proc */
828 if (NULL != scd->cbfunc.modexcbfunc) {
829 scd->cbfunc.modexcbfunc(scd->status, scd->data, scd->ndata, scd->cbdata, NULL, NULL);
830 }
831 PMIX_RELEASE(scd);
832 }
fencenb_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,char * data,size_t ndata,pmix_modex_cbfunc_t cbfunc,void * cbdata)833 static pmix_status_t fencenb_fn(const pmix_proc_t procs[], size_t nprocs,
834 const pmix_info_t info[], size_t ninfo,
835 char *data, size_t ndata,
836 pmix_modex_cbfunc_t cbfunc, void *cbdata)
837 {
838 pmix_shift_caddy_t *scd;
839
840 scd = PMIX_NEW(pmix_shift_caddy_t);
841 scd->status = PMIX_SUCCESS;
842 scd->data = data;
843 scd->ndata = ndata;
844 scd->cbfunc.modexcbfunc = cbfunc;
845 scd->cbdata = cbdata;
846 PMIX_THREADSHIFT(scd, fencbfn);
847 return PMIX_SUCCESS;
848 }
849
850
dmodex_fn(const pmix_proc_t * proc,const pmix_info_t info[],size_t ninfo,pmix_modex_cbfunc_t cbfunc,void * cbdata)851 static pmix_status_t dmodex_fn(const pmix_proc_t *proc,
852 const pmix_info_t info[], size_t ninfo,
853 pmix_modex_cbfunc_t cbfunc, void *cbdata)
854 {
855 pmix_shift_caddy_t *scd;
856
857 /* if this is a timeout test, then do nothing */
858 if (istimeouttest) {
859 return PMIX_SUCCESS;
860 }
861
862 scd = PMIX_NEW(pmix_shift_caddy_t);
863 scd->status = PMIX_ERR_NOT_FOUND;
864 scd->cbfunc.modexcbfunc = cbfunc;
865 scd->cbdata = cbdata;
866 PMIX_THREADSHIFT(scd, fencbfn);
867
868 return PMIX_SUCCESS;
869 }
870
871
publish_fn(const pmix_proc_t * proc,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)872 static pmix_status_t publish_fn(const pmix_proc_t *proc,
873 const pmix_info_t info[], size_t ninfo,
874 pmix_op_cbfunc_t cbfunc, void *cbdata)
875 {
876 pmix_locdat_t *p;
877 size_t n;
878
879 for (n=0; n < ninfo; n++) {
880 p = PMIX_NEW(pmix_locdat_t);
881 (void)strncpy(p->pdata.proc.nspace, proc->nspace, PMIX_MAX_NSLEN);
882 p->pdata.proc.rank = proc->rank;
883 (void)strncpy(p->pdata.key, info[n].key, PMIX_MAX_KEYLEN);
884 pmix_value_xfer(&p->pdata.value, (pmix_value_t*)&info[n].value);
885 pmix_list_append(&pubdata, &p->super);
886 }
887
888 return PMIX_OPERATION_SUCCEEDED;
889 }
890
891 typedef struct {
892 pmix_event_t ev;
893 pmix_pdata_t *pd;
894 size_t n;
895 pmix_lookup_cbfunc_t cbfunc;
896 void *cbdata;
897 } lkobj_t;
898
lkcbfn(int sd,short args,void * cbdata)899 static void lkcbfn(int sd, short args, void *cbdata)
900 {
901 lkobj_t *lk = (lkobj_t*)cbdata;
902
903 lk->cbfunc(PMIX_SUCCESS, lk->pd, lk->n, lk->cbdata);
904 PMIX_PDATA_FREE(lk->pd, lk->n);
905 free(lk);
906 }
907
lookup_fn(const pmix_proc_t * proc,char ** keys,const pmix_info_t info[],size_t ninfo,pmix_lookup_cbfunc_t cbfunc,void * cbdata)908 static pmix_status_t lookup_fn(const pmix_proc_t *proc, char **keys,
909 const pmix_info_t info[], size_t ninfo,
910 pmix_lookup_cbfunc_t cbfunc, void *cbdata)
911 {
912 pmix_locdat_t *p, *p2;
913 pmix_list_t results;
914 size_t i, n;
915 pmix_pdata_t *pd = NULL;
916 pmix_status_t ret = PMIX_ERR_NOT_FOUND;
917 lkobj_t *lk;
918
919 PMIX_CONSTRUCT(&results, pmix_list_t);
920
921 for (n=0; NULL != keys[n]; n++) {
922 PMIX_LIST_FOREACH(p, &pubdata, pmix_locdat_t) {
923 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
924 p2 = PMIX_NEW(pmix_locdat_t);
925 (void)strncpy(p2->pdata.proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
926 p2->pdata.proc.rank = p->pdata.proc.rank;
927 (void)strncpy(p2->pdata.key, p->pdata.key, PMIX_MAX_KEYLEN);
928 pmix_value_xfer(&p2->pdata.value, &p->pdata.value);
929 pmix_list_append(&results, &p2->super);
930 break;
931 }
932 }
933 }
934 if (0 < (n = pmix_list_get_size(&results))) {
935 ret = PMIX_SUCCESS;
936 PMIX_PDATA_CREATE(pd, n);
937 for (i=0; i < n; i++) {
938 p = (pmix_locdat_t*)pmix_list_remove_first(&results);
939 if (p) {
940 (void)strncpy(pd[i].proc.nspace, p->pdata.proc.nspace, PMIX_MAX_NSLEN);
941 pd[i].proc.rank = p->pdata.proc.rank;
942 (void)strncpy(pd[i].key, p->pdata.key, PMIX_MAX_KEYLEN);
943 pmix_value_xfer(&pd[i].value, &p->pdata.value);
944 }
945 }
946 }
947 PMIX_LIST_DESTRUCT(&results);
948 if (PMIX_SUCCESS == ret) {
949 lk = (lkobj_t*)malloc(sizeof(lkobj_t));
950 lk->pd = pd;
951 lk->n = n;
952 lk->cbfunc = cbfunc;
953 lk->cbdata = cbdata;
954 PMIX_THREADSHIFT(lk, lkcbfn);
955 }
956
957 return ret;
958 }
959
960
unpublish_fn(const pmix_proc_t * proc,char ** keys,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)961 static pmix_status_t unpublish_fn(const pmix_proc_t *proc, char **keys,
962 const pmix_info_t info[], size_t ninfo,
963 pmix_op_cbfunc_t cbfunc, void *cbdata)
964 {
965 pmix_locdat_t *p, *p2;
966 size_t n;
967
968 for (n=0; NULL != keys[n]; n++) {
969 PMIX_LIST_FOREACH_SAFE(p, p2, &pubdata, pmix_locdat_t) {
970 if (0 == strncmp(keys[n], p->pdata.key, PMIX_MAX_KEYLEN)) {
971 pmix_list_remove_item(&pubdata, &p->super);
972 PMIX_RELEASE(p);
973 break;
974 }
975 }
976 }
977 return PMIX_OPERATION_SUCCEEDED;
978 }
979
spcbfunc(pmix_status_t status,void * cbdata)980 static void spcbfunc(pmix_status_t status, void *cbdata)
981 {
982 myxfer_t *x = (myxfer_t*)cbdata;
983
984 if (NULL != x->spcbfunc) {
985 x->spcbfunc(PMIX_SUCCESS, "DYNSPACE", x->cbdata);
986 }
987 }
988
spawn_fn(const pmix_proc_t * proc,const pmix_info_t job_info[],size_t ninfo,const pmix_app_t apps[],size_t napps,pmix_spawn_cbfunc_t cbfunc,void * cbdata)989 static pmix_status_t spawn_fn(const pmix_proc_t *proc,
990 const pmix_info_t job_info[], size_t ninfo,
991 const pmix_app_t apps[], size_t napps,
992 pmix_spawn_cbfunc_t cbfunc, void *cbdata)
993 {
994 myxfer_t *x;
995 size_t n;
996 pmix_proc_t *pptr;
997 bool spawned;
998
999 /* check the job info for parent and spawned keys */
1000 for (n=0; n < ninfo; n++) {
1001 if (0 == strncmp(job_info[n].key, PMIX_PARENT_ID, PMIX_MAX_KEYLEN)) {
1002 pptr = job_info[n].value.data.proc;
1003 pmix_output(0, "SPAWN: Parent ID %s:%d", pptr->nspace, pptr->rank);
1004 } else if (0 == strncmp(job_info[n].key, PMIX_SPAWNED, PMIX_MAX_KEYLEN)) {
1005 spawned = PMIX_INFO_TRUE(&job_info[n]);
1006 pmix_output(0, "SPAWN: Spawned %s", spawned ? "TRUE" : "FALSE");
1007 }
1008 }
1009
1010 /* in practice, we would pass this request to the local
1011 * resource manager for launch, and then have that server
1012 * execute our callback function. For now, we will fake
1013 * the spawn and just pretend */
1014
1015 /* must register the nspace for the new procs before
1016 * we return to the caller */
1017 x = PMIX_NEW(myxfer_t);
1018 x->spcbfunc = cbfunc;
1019 x->cbdata = cbdata;
1020
1021 set_namespace(2, "0,1", "DYNSPACE", spcbfunc, x);
1022
1023 return PMIX_SUCCESS;
1024 }
1025
1026 static int numconnects = 0;
1027
connect_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1028 static pmix_status_t connect_fn(const pmix_proc_t procs[], size_t nprocs,
1029 const pmix_info_t info[], size_t ninfo,
1030 pmix_op_cbfunc_t cbfunc, void *cbdata)
1031 {
1032 /* in practice, we would pass this request to the local
1033 * resource manager for handling */
1034
1035 numconnects++;
1036
1037 return PMIX_OPERATION_SUCCEEDED;
1038 }
1039
1040
disconnect_fn(const pmix_proc_t procs[],size_t nprocs,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1041 static pmix_status_t disconnect_fn(const pmix_proc_t procs[], size_t nprocs,
1042 const pmix_info_t info[], size_t ninfo,
1043 pmix_op_cbfunc_t cbfunc, void *cbdata)
1044 {
1045 return PMIX_OPERATION_SUCCEEDED;
1046 }
1047
register_event_fn(pmix_status_t * codes,size_t ncodes,const pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1048 static pmix_status_t register_event_fn(pmix_status_t *codes, size_t ncodes,
1049 const pmix_info_t info[], size_t ninfo,
1050 pmix_op_cbfunc_t cbfunc, void *cbdata)
1051 {
1052 return PMIX_OPERATION_SUCCEEDED;
1053 }
1054
deregister_events(pmix_status_t * codes,size_t ncodes,pmix_op_cbfunc_t cbfunc,void * cbdata)1055 static pmix_status_t deregister_events(pmix_status_t *codes, size_t ncodes,
1056 pmix_op_cbfunc_t cbfunc, void *cbdata)
1057 {
1058 return PMIX_OPERATION_SUCCEEDED;
1059 }
1060
notify_event(pmix_status_t code,const pmix_proc_t * source,pmix_data_range_t range,pmix_info_t info[],size_t ninfo,pmix_op_cbfunc_t cbfunc,void * cbdata)1061 static pmix_status_t notify_event(pmix_status_t code,
1062 const pmix_proc_t *source,
1063 pmix_data_range_t range,
1064 pmix_info_t info[], size_t ninfo,
1065 pmix_op_cbfunc_t cbfunc, void *cbdata)
1066 {
1067 return PMIX_OPERATION_SUCCEEDED;
1068 }
1069
1070 typedef struct query_data_t {
1071 pmix_event_t ev;
1072 pmix_info_t *data;
1073 size_t ndata;
1074 pmix_info_cbfunc_t cbfunc;
1075 void *cbdata;
1076 } query_data_t;
1077
qfn(int sd,short args,void * cbdata)1078 static void qfn(int sd, short args, void *cbdata)
1079 {
1080 query_data_t *qd = (query_data_t*)cbdata;
1081
1082 qd->cbfunc(PMIX_SUCCESS, qd->data, qd->ndata, qd->cbdata, NULL, NULL);
1083 PMIX_INFO_FREE(qd->data, qd->ndata);
1084 }
1085
query_fn(pmix_proc_t * proct,pmix_query_t * queries,size_t nqueries,pmix_info_cbfunc_t cbfunc,void * cbdata)1086 static pmix_status_t query_fn(pmix_proc_t *proct,
1087 pmix_query_t *queries, size_t nqueries,
1088 pmix_info_cbfunc_t cbfunc,
1089 void *cbdata)
1090 {
1091 size_t n;
1092 pmix_info_t *info;
1093 query_data_t qd;
1094
1095 if (NULL == cbfunc) {
1096 return PMIX_ERROR;
1097 }
1098 /* keep this simple */
1099 PMIX_INFO_CREATE(info, nqueries);
1100 for (n=0; n < nqueries; n++) {
1101 pmix_output(0, "\tKey: %s", queries[n].keys[0]);
1102 (void)strncpy(info[n].key, queries[n].keys[0], PMIX_MAX_KEYLEN);
1103 info[n].value.type = PMIX_STRING;
1104 if (0 > asprintf(&info[n].value.data.string, "%d", (int)n)) {
1105 return PMIX_ERROR;
1106 }
1107 }
1108 qd.data = info;
1109 qd.ndata = nqueries;
1110 qd.cbfunc = cbfunc;
1111 qd.cbdata = cbdata;
1112 PMIX_THREADSHIFT(&qd, qfn);
1113 return PMIX_SUCCESS;
1114 }
1115
tool_connect_fn(pmix_info_t * info,size_t ninfo,pmix_tool_connection_cbfunc_t cbfunc,void * cbdata)1116 static void tool_connect_fn(pmix_info_t *info, size_t ninfo,
1117 pmix_tool_connection_cbfunc_t cbfunc,
1118 void *cbdata)
1119 {
1120 pmix_proc_t proc;
1121
1122 /* just pass back an arbitrary nspace */
1123 (void)strncpy(proc.nspace, "TOOL", PMIX_MAX_NSLEN);
1124 proc.rank = 0;
1125
1126 if (NULL != cbfunc) {
1127 cbfunc(PMIX_SUCCESS, &proc, cbdata);
1128 }
1129 }
1130
1131 typedef struct {
1132 pmix_event_t ev;
1133 pmix_op_cbfunc_t cbfunc;
1134 void *cbdata;
1135 } mylog_t;
1136
foobar(int sd,short args,void * cbdata)1137 static void foobar(int sd, short args, void *cbdata)
1138 {
1139 mylog_t *lg = (mylog_t*)cbdata;
1140 lg->cbfunc(PMIX_SUCCESS, lg->cbdata);
1141 }
log_fn(const pmix_proc_t * client,const pmix_info_t data[],size_t ndata,const pmix_info_t directives[],size_t ndirs,pmix_op_cbfunc_t cbfunc,void * cbdata)1142 static void log_fn(const pmix_proc_t *client,
1143 const pmix_info_t data[], size_t ndata,
1144 const pmix_info_t directives[], size_t ndirs,
1145 pmix_op_cbfunc_t cbfunc, void *cbdata)
1146 {
1147 mylog_t *lg = (mylog_t *)malloc(sizeof(mylog_t));
1148
1149 lg->cbfunc = cbfunc;
1150 lg->cbdata = cbdata;
1151 PMIX_THREADSHIFT(lg, foobar);
1152 }
1153
alloc_fn(const pmix_proc_t * client,pmix_alloc_directive_t directive,const pmix_info_t data[],size_t ndata,pmix_info_cbfunc_t cbfunc,void * cbdata)1154 static pmix_status_t alloc_fn(const pmix_proc_t *client,
1155 pmix_alloc_directive_t directive,
1156 const pmix_info_t data[], size_t ndata,
1157 pmix_info_cbfunc_t cbfunc, void *cbdata)
1158 {
1159 return PMIX_OPERATION_SUCCEEDED;
1160 }
1161
jctrl_fn(const pmix_proc_t * requestor,const pmix_proc_t targets[],size_t ntargets,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)1162 static pmix_status_t jctrl_fn(const pmix_proc_t *requestor,
1163 const pmix_proc_t targets[], size_t ntargets,
1164 const pmix_info_t directives[], size_t ndirs,
1165 pmix_info_cbfunc_t cbfunc, void *cbdata)
1166 {
1167 return PMIX_OPERATION_SUCCEEDED;
1168 }
1169
mon_fn(const pmix_proc_t * requestor,const pmix_info_t * monitor,pmix_status_t error,const pmix_info_t directives[],size_t ndirs,pmix_info_cbfunc_t cbfunc,void * cbdata)1170 static pmix_status_t mon_fn(const pmix_proc_t *requestor,
1171 const pmix_info_t *monitor, pmix_status_t error,
1172 const pmix_info_t directives[], size_t ndirs,
1173 pmix_info_cbfunc_t cbfunc, void *cbdata)
1174 {
1175 return PMIX_ERR_NOT_SUPPORTED;
1176 }
1177
1178
wait_signal_callback(int fd,short event,void * arg)1179 static void wait_signal_callback(int fd, short event, void *arg)
1180 {
1181 pmix_event_t *sig = (pmix_event_t*) arg;
1182 int status;
1183 pid_t pid;
1184 wait_tracker_t *t2;
1185
1186 if (SIGCHLD != event_get_signal(sig)) {
1187 return;
1188 }
1189
1190 /* we can have multiple children leave but only get one
1191 * sigchild callback, so reap all the waitpids until we
1192 * don't get anything valid back */
1193 while (1) {
1194 pid = waitpid(-1, &status, WNOHANG);
1195 if (-1 == pid && EINTR == errno) {
1196 /* try it again */
1197 continue;
1198 }
1199 /* if we got garbage, then nothing we can do */
1200 if (pid <= 0) {
1201 return;
1202 }
1203
1204 /* we are already in an event, so it is safe to access the list */
1205 PMIX_LIST_FOREACH(t2, &children, wait_tracker_t) {
1206 if (pid == t2->pid) {
1207 /* found it! */
1208 if (WIFEXITED(status)) {
1209 t2->exit_code = WEXITSTATUS(status);
1210 } else {
1211 if (WIFSIGNALED(status)) {
1212 t2->exit_code = WTERMSIG(status) + 128;
1213 }
1214 }
1215 if (0 != t2->exit_code && 0 == exit_code) {
1216 exit_code = t2->exit_code;
1217 }
1218 --wakeup;
1219 break;
1220 }
1221 }
1222 }
1223 fprintf(stderr, "ENDLOOP\n");
1224 }
1225