1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * (C) 2001 by Argonne National Laboratory.
4 * See COPYRIGHT in top-level directory.
5 */
6 #include "ipmi.h"
7 #ifdef HAVE_CTYPE_H
8 #include <ctype.h>
9 #endif
10
11 /* pmiimpl.h */
12
13 static int root_smpd(void *p);
14
15 /* Define to prevent an smpd root thread or process from being created when
16 there is only one process. */
17 /* Currently, defining this prevents the use of the spawn command. */
18 /*#define SINGLE_PROCESS_OPTIMIZATION*/
19
/* Size limits for PMI strings.  The KVS-name, host-name and string-value
 * limits are used below as char buffer sizes (terminator included); the key
 * and value limits are not referenced in this part of the file. */
#define PMI_MAX_KEY_LEN 256
#define PMI_MAX_VALUE_LEN 8192
#define PMI_MAX_KVS_NAME_LENGTH 100
#define PMI_MAX_HOST_NAME_LENGTH 100
#define PMI_MAX_STR_VAL_LENGTH 100

/* Type of, and "no process" sentinel for, the value returned by
 * launch_mpiexec_process(): a process HANDLE on Windows, a pid elsewhere. */
#ifdef HAVE_WINDOWS_H
#define PMII_PROCESS_HANDLE_TYPE HANDLE
#define PMII_PROCESS_INVALID_HANDLE NULL
#else
#define PMII_PROCESS_HANDLE_TYPE int
#define PMII_PROCESS_INVALID_HANDLE -1
#endif
33
/* Lifecycle state stored in pmi_process.init_finalized.
 * The declaration order matters: rPMI_Finalize() tests
 * "init_finalized < PMI_INITIALIZED", so every state listed before
 * PMI_INITIALIZED is treated there as "nothing to finalize". */
typedef enum {PMI_UNINITIALIZED,
PMI_SINGLETON_INIT_BUT_NO_PM,
PMI_SINGLETON_INIT_WITH_PM,
PMI_INITIALIZED,
PMI_FINALIZED} PMIState;

/*
#define PMI_INITIALIZED 0
#define PMI_FINALIZED 1
*/

/* Boolean values used for the int flags in pmi_process_t and for the
 * result of iPMI_Initialized(). */
#define PMI_TRUE 1
#define PMI_FALSE 0
47
/* Per-process PMI client state.  A single static instance (pmi_process) is
 * defined below with a positional initializer, so the field order here and
 * the initializer order must be kept in sync. */
typedef struct pmi_process_t
{
    int rpmi;                       /* PMI_TRUE once rPMI_Init/PMIi_InitSingleton selected the remote PMI path */
#ifdef HAVE_WINDOWS_H
    HANDLE hRootThread;             /* thread running root_smpd (rank 0 with PMI_ROOT_LOCAL=1) */
    HANDLE hRootThreadReadyEvent;   /* event rPMI_Init waits on after starting the root thread */
#else
    int root_pid;                   /* pid of the forked root_smpd child (rank 0 with PMI_ROOT_LOCAL=1) */
#endif
    char root_host[100];            /* host to connect to; from PMI_ROOT_HOST or the singleton handshake */
    int root_port;                  /* port to connect to; from PMI_ROOT_PORT or the singleton handshake */
    int local_kvs;                  /* PMI_TRUE while the local dbs engine created by pmi_create_localKVS is active */
    char kvs_name[PMI_MAX_KVS_NAME_LENGTH];     /* name of this process group's KVS */
    char domain_name[PMI_MAX_KVS_NAME_LENGTH];  /* KVS domain name */
    SMPDU_Sock_t sock;              /* socket obtained from SMPDU_Sock_post_connect */
    SMPDU_Sock_set_t set;           /* sock set driving the smpd state machine for this client */
    int iproc;                      /* rank of this process (PMI_RANK); also the "[rank]" prefix in messages */
    int nproc;                      /* number of processes (PMI_SIZE) */
    PMIState init_finalized;        /* lifecycle state, see PMIState */
    int smpd_id;                    /* id passed as the source of commands and sent as "node_id" */
    SMPDU_SOCK_NATIVE_FD smpd_fd;   /* not referenced in this part of the file */
    int smpd_key;                   /* sent as the "ctx_key" argument of every command */
    smpd_context_t *context;        /* smpd context used to post command writes/reads */
    int clique_size;                /* number of entries in clique_ranks */
    int *clique_ranks;              /* ranks parsed from PMI_CLIQUE by parse_clique (heap allocated) */
    char host[PMI_MAX_HOST_NAME_LENGTH];        /* copy of smpd_process.host set during singleton init */
    int port;                       /* copy of smpd_process.port set during singleton init */
    int appnum;                     /* not referenced in this part of the file */
    PMII_PROCESS_HANDLE_TYPE singleton_mpiexec_fd;  /* handle/pid of the mpiexec started by singleton init */
    char kvs_name_singleton_nopm[PMI_MAX_KVS_NAME_LENGTH];  /* local KVS name saved before switching to the PM's KVS */
} pmi_process_t;
79
80 /* global variables */
/* The one PMI client state object for this process.
 * This is a positional initializer: each entry corresponds to the field of
 * pmi_process_t at the same position, so any change to the struct layout
 * must be mirrored here. */
static pmi_process_t pmi_process =
{
    PMI_FALSE, /* rpmi */
#ifdef HAVE_WINDOWS_H
    NULL, /* root thread */
    NULL, /* hRootThreadReadyEvent */
#else
    0, /* root pid */
#endif
    "", /* root host */
    0, /* root port */
    PMI_FALSE, /* local_kvs */
    "", /* kvs_name */
    "", /* domain_name */
    SMPDU_SOCK_INVALID_SOCK, /* sock */
    SMPDU_SOCK_INVALID_SET, /* set */
    -1, /* iproc */
    -1, /* nproc */
    PMI_UNINITIALIZED, /* init_finalized */
    -1, /* smpd_id */
    0, /* smpd_fd */
    0, /* smpd_key */
    NULL, /* context */
    0, /* clique_size */
    NULL, /* clique_ranks */
    "", /* host */
    -1, /* port */
    0, /* appnum */
    PMII_PROCESS_INVALID_HANDLE, /* singleton mpiexec proc handle/pid */
    "" /* kvs_name of singleton proc with no PM */
};
112
113
pmi_init_printf(void)114 void pmi_init_printf(void)
115 {
116 char *env;
117
118 env = getenv("SMPD_DBG_OUTPUT");
119 if(env != NULL){
120 /* We only support tracing for now */
121 smpd_process.verbose = SMPD_TRUE;
122 smpd_process.dbg_state |= SMPD_DBG_STATE_ERROUT | SMPD_DBG_STATE_STDOUT | SMPD_DBG_STATE_TRACE;
123 }
124 }
125
126 static int silence = 0;
pmi_err_printf(char * str,...)127 static int pmi_err_printf(char *str, ...)
128 {
129 int n=0;
130 va_list list;
131
132 if (!silence)
133 {
134 printf("[%d] ", pmi_process.iproc);
135 va_start(list, str);
136 n = vprintf(str, list);
137 va_end(list);
138
139 fflush(stdout);
140 }
141
142 return n;
143 }
144
#ifdef PMII_DEBUG_
/*
 * Debug-build variant of pmi_err_printf: always prints "[rank] " followed by
 * the formatted message on stdout, flushes, and returns vprintf's result.
 * Without PMII_DEBUG_ the name is a macro that evaluates to 1 and prints
 * nothing.
 */
static int pmi_dbg_printf(char *str, ...)
{
    va_list args;
    int written;

    printf("[%d] ", pmi_process.iproc);

    va_start(args, str);
    written = vprintf(str, args);
    va_end(args);

    fflush(stdout);
    return written;
}
#else
# define pmi_dbg_printf(...) 1
#endif
162
pmi_mpi_err_printf(int mpi_errno,char * fmt,...)163 static int pmi_mpi_err_printf(int mpi_errno, char *fmt, ... )
164 {
165 int n;
166 va_list list;
167
168 /* convert the error code to a string */
169 printf("mpi_errno: %d\n", mpi_errno);
170
171 printf("[%d] ", pmi_process.iproc);
172 va_start(list, fmt);
173 n = vprintf(fmt, list);
174 va_end(list);
175
176 fflush(stdout);
177
178 MPIR_Err_return_comm(NULL, "", mpi_errno);
179
180 return n;
181 }
182
pmi_create_post_command(const char * command,const char * name,const char * key,const char * value)183 static int pmi_create_post_command(const char *command, const char *name, const char *key, const char *value)
184 {
185 int result;
186 smpd_command_t *cmd_ptr;
187 int dest = 1;
188 int add_id = 0;
189
190 if (!pmi_process.rpmi)
191 {
192 if (strcmp(command, "done") == 0)
193 {
194 /* done commands go to the immediate smpd, not the root */
195 dest = pmi_process.smpd_id;
196 }
197 }
198 if ((strcmp(command, "init") == 0) || (strcmp(command, "finalize") == 0))
199 {
200 add_id = 1;
201 dest = 0;
202 }
203
204 result = smpd_create_command((char*)command, pmi_process.smpd_id, dest, SMPD_TRUE, &cmd_ptr);
205 if (result != SMPD_SUCCESS)
206 {
207 pmi_err_printf("unable to create a %s command.\n", command);
208 return PMI_FAIL;
209 }
210 result = smpd_add_command_int_arg(cmd_ptr, "ctx_key", pmi_process.smpd_key);
211 if (result != SMPD_SUCCESS)
212 {
213 pmi_err_printf("unable to add the key to the %s command.\n", command);
214 return PMI_FAIL;
215 }
216
217 if (name != NULL)
218 {
219 result = smpd_add_command_arg(cmd_ptr, "name", (char*)name);
220 if (result != SMPD_SUCCESS)
221 {
222 pmi_err_printf("unable to add the kvs name('%s') to the %s command.\n", name, command);
223 return PMI_FAIL;
224 }
225 }
226
227 if (key != NULL)
228 {
229 result = smpd_add_command_arg(cmd_ptr, "key", (char*)key);
230 if (result != SMPD_SUCCESS)
231 {
232 pmi_err_printf("unable to add the key('%s') to the %s command.\n", key, command);
233 return PMI_FAIL;
234 }
235 }
236
237 if (value != NULL)
238 {
239 result = smpd_add_command_arg(cmd_ptr, "value", (char*)value);
240 if (result != SMPD_SUCCESS)
241 {
242 pmi_err_printf("unable to add the value('%s') to the %s command.\n", value, command);
243 return PMI_FAIL;
244 }
245 }
246
247 if (add_id)
248 {
249 result = smpd_add_command_int_arg(cmd_ptr, "node_id", pmi_process.smpd_id);
250 if (result != SMPD_SUCCESS)
251 {
252 pmi_err_printf("unable to add the node_id(%d) to the %s command.\n", pmi_process.smpd_id, command);
253 return PMI_FAIL;
254 }
255 }
256
257 /* post the write of the command */
258 /*
259 printf("posting write of dbs command to %s context, sock %d: '%s'\n",
260 smpd_get_context_str(pmi_process.context), SMPDU_Sock_getid(pmi_process.context->sock), cmd_ptr->cmd);
261 fflush(stdout);
262 */
263 /* If proc_info command add the proc_info args */
264 if(strcmp(command, "proc_info") == 0){
265 /* FIXME - Send the actual exe name */
266 result = smpd_add_command_arg(cmd_ptr, "c", "singleton_client");
267 if(result != SMPD_SUCCESS){
268 smpd_err_printf("Unable to add executable name to 'proc_info' cmd\n");
269 }
270 result = smpd_add_command_int_arg(cmd_ptr, "i", pmi_process.iproc);
271 if(result != SMPD_SUCCESS){
272 smpd_err_printf("Unable to add rank to 'proc_info' cmd\n");
273 }
274 result = smpd_add_command_int_arg(cmd_ptr, "n", pmi_process.nproc);
275 if(result != SMPD_SUCCESS){
276 smpd_err_printf("Unable to add nprocs to 'proc_info' cmd\n");
277 }
278 result = smpd_add_command_int_arg(cmd_ptr, "s", smpd_process.is_singleton_client ? 1 : 0);
279 if(result != SMPD_SUCCESS){
280 smpd_err_printf("Unable to add 'is_singleton_client' to 'proc_info' cmd\n");
281 }
282 #ifndef HAVE_WINDOWS_H
283 /* For non-windows systems send the PID in 'proc_info' */
284 /* FIXME: Can we send a pid_t as an int ? */
285 result = smpd_add_command_int_arg(cmd_ptr, "p", getpid());
286 if(result != SMPD_SUCCESS){
287 smpd_err_printf("Unable to add PID to 'proc_info' cmd \n");
288 }
289 #endif
290 }
291
292 result = smpd_post_write_command(pmi_process.context, cmd_ptr);
293 if (result != SMPD_SUCCESS)
294 {
295 pmi_err_printf("unable to post a write of the %s command.\n", command);
296 return PMI_FAIL;
297 }
298 if (strcmp(command, "done"))
299 {
300 /* and post a read for the result if it is not a done command */
301 result = smpd_post_read_command(pmi_process.context);
302 if (result != SMPD_SUCCESS)
303 {
304 pmi_err_printf("unable to post a read of the next command on the pmi context.\n");
305 return PMI_FAIL;
306 }
307 }
308
309 /* let the state machine send the command and receive the result */
310 result = smpd_enter_at_state(pmi_process.set, SMPD_WRITING_CMD);
311 if (result != SMPD_SUCCESS)
312 {
313 pmi_err_printf("the state machine logic failed to get the result of the %s command.\n", command);
314 return PMI_FAIL;
315 }
316 return PMI_SUCCESS;
317 }
318
/*
 * Connect this PMI client to the process manager at host:port and run the
 * smpd state machine starting at `state`.
 *
 *   host  - host name to connect to
 *   port  - port to connect to
 *   state - initial state for the new context; when it is
 *           SMPD_CONNECTING_RPMI the smpd key is taken from the context's
 *           session string after the connection is established
 *
 * The smpd passphrase is loaded (or prompted for, unless noprompt is set)
 * before connecting.  On success pmi_process.context and pmi_process.sock
 * refer to the new connection.
 *
 * Returns SMPD_SUCCESS on success and PMI_FAIL on failure; callers in this
 * file compare the result against SMPD_SUCCESS.
 */
static int uPMI_ConnectToHost(char *host, int port, smpd_state_t state)
{
    int result;
    char error_msg[MPI_MAX_ERROR_STRING];
    int len;

    /* Make sure that we have the smpd passphrase before connecting to PM */
    if (smpd_process.passphrase[0] == '\0')
    {
        smpd_get_smpd_data("phrase", smpd_process.passphrase, SMPD_PASSPHRASE_MAX_LENGTH);
    }
    if (smpd_process.passphrase[0] == '\0')
    {
        if (smpd_process.noprompt)
        {
            pmi_err_printf("Error: No smpd passphrase specified through the registry or .smpd file, exiting.\n");
            return PMI_FAIL;
        }
        printf("Please specify an authentication passphrase for smpd: "); fflush(stdout);
        smpd_get_password(smpd_process.passphrase);
    }

    /*printf("posting a connect to %s:%d\n", host, port);fflush(stdout);*/
    result = smpd_create_context(SMPD_CONTEXT_PMI, pmi_process.set, SMPDU_SOCK_INVALID_SOCK/*pmi_process.sock*/, smpd_process.id, &pmi_process.context);
    if (result != SMPD_SUCCESS)
    {
        pmi_err_printf("PMI_ConnectToHost failed: unable to create a context to connect to %s:%d with.\n", host, port);
        return PMI_FAIL;
    }

    result = SMPDU_Sock_post_connect(pmi_process.set, pmi_process.context, host, port, &pmi_process.sock);
    if (result != SMPD_SUCCESS)
    {
        printf("SMPDU_Sock_post_connect failed.\n");fflush(stdout);
        len = MPI_MAX_ERROR_STRING;
        PMPI_Error_string(result, error_msg, &len);
        pmi_err_printf("PMI_ConnectToHost failed: unable to post a connect to %s:%d, error: %s\n", host, port, error_msg);

        /* No connection was posted, so nothing else refers to the context
         * created above; release it instead of leaking it. */
        result = smpd_free_context(pmi_process.context);
        if (result != SMPD_SUCCESS)
        {
            pmi_err_printf("smpd_free_context failed, error = %d\n", result);
        }
        pmi_process.context = NULL;

        printf("uPMI_ConnectToHost returning PMI_FAIL\n");fflush(stdout);
        return PMI_FAIL;
    }

    pmi_process.context->sock = pmi_process.sock;
    pmi_process.context->state = state;

    /* smpd_enter_at_state results are compared with SMPD_SUCCESS everywhere
     * else in this file; do the same here. */
    result = smpd_enter_at_state(pmi_process.set, state);
    if (result != SMPD_SUCCESS)
    {
        pmi_mpi_err_printf(result, "PMI_ConnectToHost failed: unable to connect to %s:%d.\n", host, port);
        return PMI_FAIL;
    }

    if (state == SMPD_CONNECTING_RPMI)
    {
        /* remote pmi processes receive their smpd_key when they connect to the smpd pmi server */
        pmi_process.smpd_key = atoi(pmi_process.context->session);
    }

    return SMPD_SUCCESS;
}
377
pmi_create_localKVS(void)378 static int pmi_create_localKVS(void ){
379 /* Its ok to init here since we can only have one local db */
380 if (smpd_dbs_init() != SMPD_DBS_SUCCESS){
381 pmi_err_printf("unable to initialize the local dbs engine.\n");
382 return PMI_FAIL;
383 }
384
385 if (smpd_dbs_create(pmi_process.kvs_name) != SMPD_DBS_SUCCESS){
386 pmi_err_printf("unable to create the process group kvs\n");
387 return PMI_FAIL;
388 }
389 /* smpd_process.domain_name is created in smpd_dbs_init() */
390 MPIU_Strncpy(pmi_process.domain_name, smpd_process.domain_name,
391 PMI_MAX_KVS_NAME_LENGTH);
392 pmi_process.local_kvs = PMI_TRUE;
393 return PMI_SUCCESS;
394 }
395
pmi_destroy_localKVS(void)396 static int pmi_destroy_localKVS(void ){
397 /* Its ok to finalize here since we can only have one local db */
398 if(smpd_dbs_finalize() != SMPD_DBS_SUCCESS){
399 pmi_err_printf("unable to finalize the local dbs engine.\n");
400 return PMI_FAIL;
401 }
402 pmi_process.local_kvs = PMI_FALSE;
403 return PMI_SUCCESS;
404 }
405
406 /* FIXME : Currently only used for singleton init -- mostly only one
407 * pair of (key, val) . Inefficient for large number of (key,val)s
408 */
409
pmi_rsync_localKVS(const char * localKVSName,const char * remoteKVSName)410 static int pmi_rsync_localKVS(const char *localKVSName,
411 const char *remoteKVSName){
412 smpd_dbsIter_t localKVSIter;
413 char key[SMPD_MAX_DBS_KEY_LEN], value[SMPD_MAX_DBS_VALUE_LEN];
414 if(smpd_dbsIter_init(localKVSName, &localKVSIter) !=
415 SMPD_DBS_SUCCESS){
416 pmi_err_printf("Error initializing local KVS iterator\n");
417 return PMI_FAIL;
418 }
419 while(smpd_dbs_hasMoreKeys(localKVSIter)){
420 if(smpd_dbs_getNextKeyVal(&localKVSIter, key, value)
421 != SMPD_DBS_SUCCESS){
422 smpd_err_printf("Error reading key/val from localKVS\n");
423 return PMI_FAIL;
424 }
425 if(PMI_KVS_Put(remoteKVSName, key, value) != PMI_SUCCESS){
426 smpd_err_printf("Error syncing localKVS to remoteKVS\n");
427 return PMI_FAIL;
428 }
429 }
430 smpd_dbsIter_finalize(&localKVSIter);
431 return PMI_SUCCESS;
432 }
433
434 /* Launch an instance of mpiexec which will connect to SMPD and start a PMI service.
435 * This instance of mpiexec will connect back using the portNo specified in the "-port" option
436 * and provide info about the new PMI service.
437 */
launch_mpiexec_process(int portNo)438 static PMII_PROCESS_HANDLE_TYPE launch_mpiexec_process(int portNo){
439 #ifdef HAVE_WINDOWS_H
440 #define PMII_MAX_MPIEXEC_CMD_STR_LENGTH 100
441 char progName[PMII_MAX_MPIEXEC_CMD_STR_LENGTH];
442 STARTUPINFO sInfo;
443 PROCESS_INFORMATION pInfo = { 0 };
444 ZeroMemory(&sInfo, sizeof(sInfo));
445 sInfo.cb = sizeof(sInfo);
446 ZeroMemory(&pInfo, sizeof(pInfo));
447 snprintf(progName, PMII_MAX_MPIEXEC_CMD_STR_LENGTH,
448 "mpiexec -pmiserver 1 -port %d -hide_console", portNo);
449 if(!CreateProcess(NULL, progName, NULL, NULL, TRUE,
450 NORMAL_PRIORITY_CLASS | CREATE_NO_WINDOW, NULL, NULL, &sInfo, &pInfo)){
451 pmi_err_printf("Error creating mpiexec process...%d\n", GetLastError());
452 pmi_err_printf("This singleton init program tried to access a feature which requires PM support\n");
453 pmi_err_printf("eg: spawn, universe_size etc\n");
454 pmi_err_printf("The program failed because mpiexec could not be located\n");
455 return PMII_PROCESS_INVALID_HANDLE;
456 }
457 return pInfo.hProcess;
458 #else
459 #define PMII_MPIEXEC_CMDLINE_ARGV_SIZE 6
460 int pid, rc;
461 char *mpiexecArgv[PMII_MPIEXEC_CMDLINE_ARGV_SIZE];
462 char port[16];
463 pid = fork();
464 if(pid < 0){
465 pmi_err_printf("Error creating mpiexec process...\n");
466 return PMII_PROCESS_INVALID_HANDLE;
467 }
468 else if(pid == 0){
469 MPIU_Snprintf(port, sizeof(port), "%d", portNo);
470 mpiexecArgv[0] = "mpiexec";
471 mpiexecArgv[1] = "-pmiserver";
472 mpiexecArgv[2] = "1";
473 mpiexecArgv[3] = "-port";
474 mpiexecArgv[4] = port;
475 mpiexecArgv[5] = NULL;
476 rc = execvp(mpiexecArgv[0], mpiexecArgv);
477 pmi_err_printf("Error Singinit execv'ing mpiexec failed\n");
478 pmi_err_printf("This singleton init program tried to access a feature which requires PM support\n");
479 pmi_err_printf("eg: spawn, universe_size etc\n");
480 pmi_err_printf("The program failed because mpiexec could not be located\n");
481 exit(-1);
482 }
483 else{
484 return pid;
485 }
486 #endif
487 }
488
/* Print msg through pmi_err_printf, store errcode in the local variable
 * `retval` and jump to the local label `fn_fail`.  The enclosing function
 * must provide both.  Wrapped in do/while(0) so that the macro followed by
 * ';' is a single statement, including in an unbraced if/else. */
#define PMII_ERR_SETPRINTANDJUMP(msg, errcode) do { pmi_err_printf("%s", (msg)); retval = (errcode); goto fn_fail; } while (0)
/* Size of the buffers used to format messages for the macro above. */
#define PMII_MAX_ERR_MSG_LENGTH 100
491
PMIi_InitSingleton(void)492 static int PMIi_InitSingleton(void ){
493 SMPDU_Sock_set_t singleton_client_set;
494 SMPDU_Sock_t singleton_client_sock;
495 smpd_context_t *p_singleton_context=NULL;
496 char err_msg[PMII_MAX_ERR_MSG_LENGTH];
497 int singleton_client_lport;
498 int result, retval = PMI_SUCCESS;
499 char rank_str[PMI_MAX_STR_VAL_LENGTH], size_str[PMI_MAX_STR_VAL_LENGTH];
500 char str[PMI_MAX_STR_VAL_LENGTH];
501
502 /* Enable singleton_init state machine tracing */
503 /*
504 smpd_process.verbose = SMPD_TRUE;
505 smpd_process.dbg_state |= SMPD_DBG_STATE_ERROUT | SMPD_DBG_STATE_STDOUT | SMPD_DBG_STATE_TRACE;
506 */
507
508 result = SMPDU_Sock_create_set(&singleton_client_set);
509 if(result != SMPD_SUCCESS){
510 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "SMPDU_Sock_create_set failed: unable to create a sock set, error: %d\n", result);
511 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
512 }
513
514 /* Assign an ephemeral port */
515 singleton_client_lport = 0;
516 result = SMPDU_Sock_listen(singleton_client_set, NULL, &singleton_client_lport, &singleton_client_sock);
517 if (result != SMPD_SUCCESS){
518 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "SMPDU_Sock_listen failed,\nsock error: %s\n", get_sock_error_string(result));
519 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
520 }
521
522 result = smpd_create_context(SMPD_CONTEXT_SINGLETON_INIT_CLIENT, singleton_client_set, singleton_client_sock,
523 -1, &p_singleton_context);
524 if (result != SMPD_SUCCESS){
525 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "smpd_create_context failed, error = %d\n", result);
526 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
527 }
528
529 result = SMPDU_Sock_set_user_ptr(singleton_client_sock, p_singleton_context);
530 if (result != SMPD_SUCCESS){
531 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "SMPDU_Sock_set_user_ptr failed,\nsock error: %s\n", get_sock_error_string(result));
532 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
533 }
534
535 p_singleton_context->state = SMPD_SINGLETON_CLIENT_LISTENING;
536
537 /* Create an instance of mpiexec that will connect back and give us information about the PM to connect to */
538 pmi_process.singleton_mpiexec_fd = launch_mpiexec_process(singleton_client_lport);
539 if(pmi_process.singleton_mpiexec_fd == PMII_PROCESS_INVALID_HANDLE){
540 result = -1;
541 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "launchMpiexecProcess failed\n");
542 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
543 }
544 /* FIXME: Switch to PMI v2 to recognize non-MPICH2 mpiexecs */
545 /* SMPD state machine will accept connection from mpiexec & get information about the PM */
546 result = smpd_enter_at_state(singleton_client_set, SMPD_SINGLETON_CLIENT_LISTENING);
547 if (result != SMPD_SUCCESS) {
548 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "smpd state machine failed, error = %d\n", result);
549 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
550 }
551 /* SMPD state machine has set the PMI info for smpd_process */
552 /* Now we have PMI_KVS, PMI_HOST and PMI_PORT info */
553 if ((smpd_process.port > 0) &&
554 (strlen(smpd_process.host) > 0) &&
555 (strlen(smpd_process.kvs_name) > 0)){
556 /* Save the current local KVS name */
557 MPIU_Strncpy(pmi_process.kvs_name_singleton_nopm, pmi_process.kvs_name, PMI_MAX_KVS_NAME_LENGTH);
558 /* Update the pmi process structs with the new remote KVS info */
559 MPIU_Strncpy(pmi_process.kvs_name, smpd_process.kvs_name, PMI_MAX_KVS_NAME_LENGTH);
560 MPIU_Strncpy(pmi_process.domain_name, smpd_process.domain_name, PMI_MAX_KVS_NAME_LENGTH);
561 MPIU_Strncpy(pmi_process.host, smpd_process.host, PMI_MAX_HOST_NAME_LENGTH);
562 MPIU_Strncpy(pmi_process.root_host, smpd_process.host, PMI_MAX_HOST_NAME_LENGTH);
563 pmi_process.root_port = smpd_process.port;
564 pmi_process.port = smpd_process.port;
565 /*
566 printf("Received:\nkvs_name = %s\nhost = %s\nport = %d\n",
567 pmi_process.kvs_name, pmi_process.host, pmi_process.port); fflush(stdout);
568 */
569
570 smpd_process.id = 1;
571 pmi_process.smpd_id = 1;
572 pmi_process.smpd_key = 0;
573 pmi_process.rpmi = PMI_TRUE;
574 pmi_process.local_kvs = PMI_FALSE;
575 pmi_process.iproc = 0;
576 pmi_process.nproc = 1;
577
578 smpd_process.is_singleton_client = SMPD_TRUE;
579
580 result = SMPDU_Sock_create_set(&pmi_process.set);
581 if (result != SMPD_SUCCESS){
582 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "SMPDU_Sock_create_set failed: unable to create a sock set, error: %d\n", result);
583 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
584 }
585
586 /* Connect to PM */
587 result = uPMI_ConnectToHost(pmi_process.root_host, pmi_process.root_port, SMPD_CONNECTING_RPMI);
588 if (result != SMPD_SUCCESS){
589 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH, "uPMI_ConnectToHost failed: error: %d\n", result);
590 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
591 }
592
593 /* FIXME: Reduce size of rank_str & size_str */
594 MPIU_Snprintf(rank_str, PMI_MAX_STR_VAL_LENGTH, "%d", pmi_process.iproc);
595 MPIU_Snprintf(size_str, PMI_MAX_STR_VAL_LENGTH, "%d", pmi_process.nproc);
596
597 result = pmi_create_post_command("init", pmi_process.kvs_name, rank_str, size_str);
598 if (result != PMI_SUCCESS){
599 pmi_err_printf("PMIi_InitSingleton failed: unable to create an init command.\n");
600 return PMI_FAIL;
601 }
602
603 /* parse the result of the command */
604 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, PMI_MAX_STR_VAL_LENGTH)
605 != MPIU_STR_SUCCESS){
606 pmi_err_printf("PMIi_InitSingleton failed: no result string in the 'init' result command.\n");
607 return PMI_FAIL;
608 }
609
610 if (strncmp(str, SMPD_SUCCESS_STR, PMI_MAX_STR_VAL_LENGTH)){
611 pmi_err_printf("PMIi_InitSingleton failed: %s\n", str);
612 return PMI_FAIL;
613 }
614
615 /* Send info about the process to PM */
616 result = pmi_create_post_command("proc_info", pmi_process.kvs_name, rank_str, size_str);
617 if (result != PMI_SUCCESS){
618 pmi_dbg_printf("PMIi_InitSingleton failed: unable to create a 'proc_info' command.\n");
619 return PMI_FAIL;
620 }
621
622 /* parse the result of the command */
623 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, PMI_MAX_STR_VAL_LENGTH)
624 != MPIU_STR_SUCCESS){
625 pmi_err_printf("PMIi_InitSingleton failed: no result string in the 'proc_info' result command.\n");
626 return PMI_FAIL;
627 }
628
629 if (strncmp(str, SMPD_SUCCESS_STR, PMI_MAX_STR_VAL_LENGTH)){
630 pmi_err_printf("PMIi_InitSingleton failed: %s\n", str);
631 return PMI_FAIL;
632 }
633
634 pmi_process.init_finalized = PMI_INITIALIZED;
635 /* Sync old local KVS with new remote KVS */
636 if(pmi_rsync_localKVS(pmi_process.kvs_name_singleton_nopm, pmi_process.kvs_name)
637 != PMI_SUCCESS){
638 PMII_ERR_SETPRINTANDJUMP("Sync'ing local KVS in singleton proc to remote KVS in PM failed",
639 PMI_FAIL);
640 }
641
642 /* Remove local KVS */
643 if(pmi_destroy_localKVS() != PMI_SUCCESS){
644 PMII_ERR_SETPRINTANDJUMP("Error removing local KVS in singleton proc\n", PMI_FAIL);
645 }
646 }
647 else{
648 MPIU_Snprintf(err_msg, PMII_MAX_ERR_MSG_LENGTH,
649 "No mechanism specified for connecting to the process manager - host %s or port %d provided.\n",
650 pmi_process.host, pmi_process.port);
651 PMII_ERR_SETPRINTANDJUMP(err_msg, result);
652 }
653
654 fn_exit:
655 if(singleton_client_set){
656 result = SMPDU_Sock_destroy_set(singleton_client_set);
657 if(result != SMPD_SUCCESS){
658 pmi_err_printf("SMPDU_Sock_destroy_set failed: unable to destroy a sock set, error: %d\n", result);
659 }
660 }
661 /* Make sure we return the error code set within the funcn */
662 return retval;
663 fn_fail:
664 /* FIXME : Make sure the newly created mpiexec process is also killed in the case of an error */
665 /* FIXME : On failure do we have a local KVS ? */
666 if(p_singleton_context){
667 result = smpd_free_context(p_singleton_context);
668 if(result != SMPD_SUCCESS){
669 pmi_err_printf("smpd_free_context failed, error = %d\n", result);
670 }
671 }
672 goto fn_exit;
673 }
674
iPMI_Initialized(int * initialized)675 int iPMI_Initialized(int *initialized)
676 {
677 if (initialized == NULL)
678 return PMI_ERR_INVALID_ARG;
679 if (pmi_process.init_finalized == PMI_INITIALIZED)
680 {
681 *initialized = PMI_TRUE;
682 }
683 else
684 {
685 *initialized = PMI_FALSE;
686 }
687 return PMI_SUCCESS;
688 }
689
parse_clique(const char * str_orig)690 static int parse_clique(const char *str_orig)
691 {
692 int count, i;
693 char *str, *token;
694 int first, last;
695
696 /* count clique */
697 count = 0;
698 str = MPIU_Strdup(str_orig);
699 if (str == NULL)
700 return PMI_FAIL;
701 token = strtok(str, ",");
702 while (token)
703 {
704 first = atoi(token);
705 while (isdigit(*token))
706 token++;
707 if (*token == '\0')
708 count++;
709 else
710 {
711 if (*token == '.')
712 {
713 token++;
714 token++;
715 last = atoi(token);
716 count += last - first + 1;
717 }
718 else
719 {
720 pmi_err_printf("unexpected clique token: '%s'\n", token);
721 MPIU_Free(str);
722 return PMI_FAIL;
723 }
724 }
725 token = strtok(NULL, ",");
726 }
727 MPIU_Free(str);
728
729 /* allocate array */
730 pmi_process.clique_ranks = (int*)MPIU_Malloc(count * sizeof(int));
731 if (pmi_process.clique_ranks == NULL)
732 return PMI_FAIL;
733 pmi_process.clique_size = count;
734
735 /* populate array */
736 count = 0;
737 str = MPIU_Strdup(str_orig);
738 if (str == NULL)
739 return PMI_FAIL;
740 token = strtok(str, ",");
741 while (token)
742 {
743 first = atoi(token);
744 while (isdigit(*token))
745 token++;
746 if (*token == '\0')
747 {
748 pmi_process.clique_ranks[count] = first;
749 count++;
750 }
751 else
752 {
753 if (*token == '.')
754 {
755 token++;
756 token++;
757 last = atoi(token);
758 for (i=first; i<=last; i++)
759 {
760 pmi_process.clique_ranks[count] = i;
761 count++;
762 }
763 }
764 else
765 {
766 pmi_err_printf("unexpected clique token: '%s'\n", token);
767 MPIU_Free(str);
768 return PMI_FAIL;
769 }
770 }
771 token = strtok(NULL, ",");
772 }
773 MPIU_Free(str);
774
775 /*
776 printf("clique: %d [", pmi_process.iproc);
777 for (i=0; i<pmi_process.clique_size; i++)
778 {
779 printf("%d,", pmi_process.clique_ranks[i]);
780 }
781 printf("]\n");
782 fflush(stdout);
783 */
784 return PMI_SUCCESS;
785 }
786
787
rPMI_Init(int * spawned)788 static int rPMI_Init(int *spawned)
789 {
790 char *p;
791 int result;
792 char rank_str[100], size_str[100];
793 char str[1024];
794
795 if (spawned == NULL)
796 return PMI_ERR_INVALID_ARG;
797
798 /* Enable state machine tracing
799 smpd_process.verbose = SMPD_TRUE;
800 smpd_process.dbg_state |= SMPD_DBG_STATE_ERROUT | SMPD_DBG_STATE_STDOUT | SMPD_DBG_STATE_TRACE;
801 */
802
803 /* initialize to defaults */
804 smpd_process.id = 1;
805 pmi_process.smpd_id = 1;
806 pmi_process.rpmi = PMI_TRUE;
807 pmi_process.iproc = 0;
808 pmi_process.nproc = 1;
809
810 p = getenv("PMI_ROOT_HOST");
811 if (p == NULL)
812 {
813 pmi_err_printf("unable to initialize the rPMI library: no PMI_ROOT_HOST specified.\n");
814 return PMI_FAIL;
815 }
816 strncpy(pmi_process.root_host, p, 100);
817
818 p = getenv("PMI_ROOT_PORT");
819 if (p == NULL)
820 {
821 /* set to default port? */
822 pmi_err_printf("unable to initialize the rPMI library: no PMI_ROOT_PORT specified.\n");
823 return PMI_FAIL;
824 }
825 pmi_process.root_port = atoi(p);
826 if (pmi_process.root_port < 1)
827 {
828 pmi_err_printf("invalid root port specified: %s\n", p);
829 return PMI_FAIL;
830 }
831 smpd_process.port = pmi_process.root_port;
832 strcpy(smpd_process.host, pmi_process.root_host);
833
834 p = getenv("PMI_SPAWN");
835 if (p)
836 {
837 *spawned = atoi(p);
838 }
839 else
840 {
841 *spawned = 0;
842 }
843
844 p = getenv("PMI_KVS");
845 if (p != NULL)
846 {
847 /* use specified kvs name */
848 strncpy(pmi_process.kvs_name, p, PMI_MAX_KVS_NAME_LENGTH);
849 strncpy(smpd_process.kvs_name, p, PMI_MAX_KVS_NAME_LENGTH);
850 }
851 else
852 {
853 /* use default kvs name */
854 strncpy(pmi_process.kvs_name, "default_mpich_kvs_name", PMI_MAX_KVS_NAME_LENGTH);
855 strncpy(smpd_process.kvs_name, "default_mpich_kvs_name", PMI_MAX_KVS_NAME_LENGTH);
856 }
857
858 p = getenv("PMI_DOMAIN");
859 if (p != NULL)
860 {
861 strncpy(pmi_process.domain_name, p, PMI_MAX_KVS_NAME_LENGTH);
862 strncpy(smpd_process.domain_name, p, PMI_MAX_KVS_NAME_LENGTH);
863 }
864 else
865 {
866 strncpy(pmi_process.domain_name, "mpich2", PMI_MAX_KVS_NAME_LENGTH);
867 strncpy(smpd_process.domain_name, "mpich2", PMI_MAX_KVS_NAME_LENGTH);
868 }
869
870 p = getenv("PMI_RANK");
871 if (p != NULL)
872 {
873 pmi_process.iproc = atoi(p);
874 if (pmi_process.iproc < 0)
875 {
876 pmi_err_printf("invalid rank %d\n", pmi_process.iproc);
877 return PMI_FAIL;
878 }
879 }
880
881 p = getenv("PMI_SIZE");
882 if (p != NULL)
883 {
884 pmi_process.nproc = atoi(p);
885 if (pmi_process.nproc < 1)
886 {
887 pmi_err_printf("invalid size %d\n", pmi_process.nproc);
888 return PMI_FAIL;
889 }
890 }
891 smpd_process.nproc = pmi_process.nproc;
892 #ifdef SINGLE_PROCESS_OPTIMIZATION
893 /* leave this code #ifdef'd out so we can test rPMI stuff with one process */
894 if (pmi_process.nproc == 1)
895 {
896 pmi_process.local_kvs = PMI_TRUE;
897 result = smpd_dbs_init();
898 if (result != SMPD_SUCCESS)
899 {
900 pmi_err_printf("unable to initialize the local dbs engine.\n");
901 return PMI_FAIL;
902 }
903 result = smpd_dbs_create(pmi_process.kvs_name);
904 if (result != SMPD_SUCCESS)
905 {
906 pmi_err_printf("unable to create the process group kvs\n");
907 return PMI_FAIL;
908 }
909 pmi_process.init_finalized = PMI_INITIALIZED;
910 return PMI_SUCCESS;
911 }
912 #endif
913
914 p = getenv("PMI_CLIQUE");
915 if (p != NULL)
916 {
917 parse_clique(p);
918 }
919
920 /*
921 printf("PMI_ROOT_HOST=%s PMI_ROOT_PORT=%s PMI_RANK=%s PMI_SIZE=%s PMI_KVS=%s PMI_CLIQUE=%s\n",
922 getenv("PMI_ROOT_HOST"), getenv("PMI_ROOT_PORT"), getenv("PMI_RANK"), getenv("PMI_SIZE"),
923 getenv("PMI_KVS"), getenv("PMI_CLIQUE"));
924 fflush(stdout);
925 */
926
927 if (pmi_process.iproc == 0)
928 {
929 p = getenv("PMI_ROOT_LOCAL");
930 if (p && strcmp(p, "1") == 0)
931 {
932 #ifdef HAVE_WINDOWS_H
933 pmi_process.hRootThreadReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
934 if (pmi_process.hRootThreadReadyEvent == NULL)
935 {
936 pmi_err_printf("unable to create the root listener synchronization event, error: %d\n", GetLastError());
937 return PMI_FAIL;
938 }
939 pmi_process.hRootThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)root_smpd, NULL, 0, NULL);
940 if (pmi_process.hRootThread == NULL)
941 {
942 pmi_err_printf("unable to create the root listener thread: error %d\n", GetLastError());
943 return PMI_FAIL;
944 }
945 if (WaitForSingleObject(pmi_process.hRootThreadReadyEvent, 60000) != WAIT_OBJECT_0)
946 {
947 pmi_err_printf("the root process thread failed to initialize.\n");
948 return PMI_FAIL;
949 }
950 #else
951 result = fork();
952 if (result == -1)
953 {
954 pmi_err_printf("unable to fork the root listener, errno %d\n", errno);
955 return PMI_FAIL;
956 }
957 if (result == 0)
958 {
959 root_smpd(NULL);
960 exit(0);
961 }
962 pmi_process.root_pid = result;
963 #endif
964 }
965 }
966
967 /* connect to the root */
968
969 result = SMPDU_Sock_create_set(&pmi_process.set);
970 if (result != SMPD_SUCCESS)
971 {
972 pmi_err_printf("PMI_Init failed: unable to create a sock set, error: %d\n", result);
973 return PMI_FAIL;
974 }
975
976 result = uPMI_ConnectToHost(pmi_process.root_host, pmi_process.root_port, SMPD_CONNECTING_RPMI);
977 if (result != SMPD_SUCCESS)
978 {
979 pmi_err_printf("PMI_Init failed.\n");
980 return PMI_FAIL;
981 }
982
983 pmi_process.init_finalized = PMI_INITIALIZED;
984
985 sprintf(rank_str, "%d", pmi_process.iproc);
986 sprintf(size_str, "%d", pmi_process.nproc);
987 result = pmi_create_post_command("init", pmi_process.kvs_name, rank_str, size_str);
988 if (result != PMI_SUCCESS)
989 {
990 pmi_err_printf("PMI_Init failed: unable to create an init command.\n");
991 return PMI_FAIL;
992 }
993
994 /* parse the result of the command */
995 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
996 {
997 pmi_err_printf("PMI_Init failed: no result string in the result command.\n");
998 return PMI_FAIL;
999 }
1000 if (strcmp(str, SMPD_SUCCESS_STR))
1001 {
1002 pmi_err_printf("PMI_Init failed: %s\n", str);
1003 return PMI_FAIL;
1004 }
1005
1006 return PMI_SUCCESS;
1007 }
1008
rPMI_Finalize()1009 static int rPMI_Finalize()
1010 {
1011 int result;
1012 char rank_str[100];
1013 char str[1024];
1014 #ifndef HAVE_WINDOWS_H
1015 int status;
1016 #endif
1017
1018 if (pmi_process.init_finalized == PMI_FINALIZED)
1019 return PMI_SUCCESS;
1020
1021 if(pmi_process.init_finalized < PMI_INITIALIZED)
1022 return PMI_SUCCESS;
1023
1024 if (pmi_process.local_kvs)
1025 {
1026 if(pmi_destroy_localKVS() != PMI_SUCCESS){
1027 pmi_dbg_printf("Failed to destroy local KVS\n");
1028 }
1029 if(pmi_process.singleton_mpiexec_fd != PMII_PROCESS_INVALID_HANDLE){
1030 #ifdef HAVE_WINDOWS_H
1031 WaitForSingleObject(pmi_process.singleton_mpiexec_fd, INFINITE);
1032 #else
1033 waitpid(pmi_process.singleton_mpiexec_fd, &status, WUNTRACED);
1034 #endif
1035 }
1036
1037 result = SMPDU_Sock_finalize();
1038 pmi_process.init_finalized = PMI_FINALIZED;
1039 return PMI_SUCCESS;
1040 }
1041
1042 sprintf(rank_str, "%d", pmi_process.iproc);
1043 result = pmi_create_post_command("finalize", pmi_process.kvs_name, rank_str, NULL);
1044 if (result != PMI_SUCCESS)
1045 {
1046 pmi_err_printf("PMI_Finalize failed: unable to create an finalize command.\n");
1047 return PMI_FAIL;
1048 }
1049
1050 /* parse the result of the command */
1051 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1052 {
1053 pmi_err_printf("PMI_Finalize failed: no result string in the result command.\n");
1054 return PMI_FAIL;
1055 }
1056 if (strcmp(str, SMPD_SUCCESS_STR))
1057 {
1058 pmi_err_printf("PMI_Finalize failed: %s\n", str);
1059 return PMI_FAIL;
1060 }
1061
1062 if (pmi_process.iproc == 0)
1063 {
1064 /* the root process tells the root to exit when all the pmi contexts have exited */
1065 result = pmi_create_post_command("exit_on_done", NULL, NULL, NULL);
1066 if (result != PMI_SUCCESS)
1067 {
1068 pmi_err_printf("exit_on_done command failed.\n");
1069 return PMI_FAIL;
1070 }
1071 /*printf("exit_on_done command returned successfully.\n");fflush(stdout);*/
1072 }
1073
1074 /*printf("entering finalize pmi_barrier.\n");fflush(stdout);*/
1075 PMI_Barrier();
1076 /*printf("after finalize pmi_barrier, posting done command.\n");fflush(stdout);*/
1077
1078 /* post a done command to close the pmi context */
1079 result = pmi_create_post_command("done", NULL, NULL, NULL);
1080 if (result != PMI_SUCCESS)
1081 {
1082 pmi_err_printf("failed.\n");
1083 return PMI_FAIL;
1084 }
1085
1086 if (pmi_process.iproc == 0)
1087 {
1088 #ifdef HAVE_WINDOWS_H
1089 WaitForSingleObject(pmi_process.hRootThread, INFINITE);
1090 if(pmi_process.singleton_mpiexec_fd != PMII_PROCESS_INVALID_HANDLE){
1091 WaitForSingleObject(pmi_process.singleton_mpiexec_fd, INFINITE);
1092 }
1093 #else
1094 waitpid(pmi_process.root_pid, &status, WUNTRACED);
1095 if(pmi_process.singleton_mpiexec_fd != PMII_PROCESS_INVALID_HANDLE){
1096 waitpid(pmi_process.singleton_mpiexec_fd, &status, WUNTRACED);
1097 }
1098 #endif
1099 }
1100
1101 /*if (pmi_process.sock != SMPDU_SOCK_INVALID_SOCK)*/
1102 {
1103 result = SMPDU_Sock_finalize();
1104 if (result != SMPD_SUCCESS)
1105 {
1106 /*pmi_err_printf("SMPDU_Sock_finalize failed, error: %d\n", result);*/
1107 }
1108 }
1109
1110 pmi_process.init_finalized = PMI_FINALIZED;
1111
1112 return PMI_SUCCESS;
1113 }
1114
iPMI_Init(int * spawned)1115 int iPMI_Init(int *spawned)
1116 {
1117 char *p;
1118 int result;
1119 char rank_str[100], size_str[100];
1120 char str[1024];
1121
1122 if (spawned == NULL){
1123 return PMI_ERR_INVALID_ARG;
1124 }
1125
1126 /* Enable smpd state machine tracing */
1127 /*
1128 smpd_process.verbose = SMPD_TRUE;
1129 smpd_process.dbg_state |= SMPD_DBG_STATE_ERROUT | SMPD_DBG_STATE_STDOUT | SMPD_DBG_STATE_TRACE;
1130 */
1131
1132 pmi_init_printf();
1133
1134 /* don't allow pmi_init to be called more than once */
1135 if (pmi_process.init_finalized == PMI_INITIALIZED)
1136 return PMI_SUCCESS;
1137
1138 /* initialize to defaults */
1139
1140 result = SMPDU_Sock_init();
1141 if (result != SMPD_SUCCESS)
1142 {
1143 pmi_err_printf("SMPDU_Sock_init failed,\nsock error: %s\n", get_sock_error_string(result));
1144 return PMI_FAIL;
1145 }
1146
1147 result = smpd_init_process();
1148 if (result != SMPD_SUCCESS)
1149 {
1150 pmi_err_printf("unable to initialize the smpd global process structure.\n");
1151 return PMI_FAIL;
1152 }
1153
1154 p = getenv("PMI_ROOT_HOST");
1155 if (p != NULL)
1156 {
1157 return rPMI_Init(spawned);
1158 }
1159
1160 pmi_process.iproc = 0;
1161 pmi_process.nproc = 1;
1162
1163 p = getenv("PMI_SPAWN");
1164 if (p)
1165 {
1166 *spawned = atoi(p);
1167 }
1168 else
1169 {
1170 *spawned = 0;
1171 }
1172
1173 p = getenv("PMI_APPNUM");
1174 if (p)
1175 {
1176 pmi_process.appnum = atoi(p);
1177 }
1178 else
1179 {
1180 pmi_process.appnum = 0;
1181 }
1182
1183 /* Determine If singleton */
1184 p = getenv("PMI_SMPD_FD");
1185 if( p == NULL){
1186 p = getenv("PMI_HOST");
1187 if( p == NULL){
1188 /* FIXME: Do we need a check for PMI_KVS to determine if
1189 * client is singleton ?
1190 */
1191 p = getenv("PMI_KVS");
1192 if(p == NULL){
1193 /* Assume singleton.
1194 * Setup the PMI service when required i.e., later
1195 */
1196 pmi_process.init_finalized =
1197 PMI_SINGLETON_INIT_BUT_NO_PM;
1198 /* Rank & Nprocs initialized by default above*/
1199 /* Create a local KVS which will be used until
1200 spawn(), universe_size, kvs_get() is called*/
1201 if(pmi_create_localKVS() == PMI_SUCCESS){
1202 return PMI_SUCCESS;
1203 }
1204 else{
1205 pmi_err_printf("Unable to create local KVS\n");
1206 return PMI_FAIL;
1207 }
1208 }
1209 }
1210 }
1211 else{
1212 /* decode PMI_SMPD_FD */
1213 #ifdef HAVE_WINDOWS_H
1214 pmi_process.smpd_fd = smpd_decode_handle(p);
1215 #else
1216 pmi_process.smpd_fd = (SMPDU_SOCK_NATIVE_FD)atoi(p);
1217 #endif
1218 if(pmi_process.smpd_fd <= 0){
1219 /* FIXME: hack - Is there a better way ? */
1220 /* mpiexec sets smpd_fd<=0 to distinguish itself from
1221 * a singleton MPI process
1222 */
1223 /* FIXME: Get rid of this hack - we already create
1224 * local KVS for all singleton clients by default
1225 */
1226 pmi_process.smpd_fd = 0;
1227 putenv("PMI_SMPD_FD=");
1228 }
1229 }
1230
1231 p = getenv("PMI_KVS");
1232 if (p != NULL)
1233 {
1234 strncpy(pmi_process.kvs_name, p, PMI_MAX_KVS_NAME_LENGTH);
1235 }
1236 else
1237 {
1238 /* mpiexec/smpd don't set PMI_KVS */
1239 if(pmi_create_localKVS() == PMI_SUCCESS){
1240 pmi_process.init_finalized = PMI_INITIALIZED;
1241 return PMI_SUCCESS;
1242 }
1243 else{
1244 pmi_err_printf("unable to create local KVS\n");
1245 return PMI_FAIL;
1246 }
1247 }
1248
1249 p = getenv("PMI_DOMAIN");
1250 if (p != NULL)
1251 {
1252 strncpy(pmi_process.domain_name, p, PMI_MAX_KVS_NAME_LENGTH);
1253 }
1254 else
1255 {
1256 strncpy(pmi_process.domain_name, "mpich2", PMI_MAX_KVS_NAME_LENGTH);
1257 }
1258
1259 p = getenv("PMI_RANK");
1260 if (p != NULL)
1261 {
1262 pmi_process.iproc = atoi(p);
1263 if (pmi_process.iproc < 0)
1264 {
1265 pmi_err_printf("invalid rank %d, setting to 0\n", pmi_process.iproc);
1266 pmi_process.iproc = 0;
1267 }
1268 }
1269
1270 p = getenv("PMI_SIZE");
1271 if (p != NULL)
1272 {
1273 pmi_process.nproc = atoi(p);
1274 if (pmi_process.nproc < 1)
1275 {
1276 pmi_err_printf("invalid size %d, setting to 1\n", pmi_process.nproc);
1277 pmi_process.nproc = 1;
1278 }
1279 }
1280
1281 p = getenv("PMI_SMPD_ID");
1282 if (p != NULL){
1283 pmi_process.smpd_id = atoi(p);
1284 smpd_process.id = pmi_process.smpd_id;
1285 }
1286
1287 p = getenv("PMI_SMPD_KEY");
1288 if (p != NULL)
1289 {
1290 pmi_process.smpd_key = atoi(p);
1291 }
1292
1293 p = getenv("PMI_SMPD_FD");
1294 if (p != NULL)
1295 {
1296 result = SMPDU_Sock_create_set(&pmi_process.set);
1297 if (result != SMPD_SUCCESS)
1298 {
1299 pmi_err_printf("PMI_Init failed: unable to create a sock set, error:\n%s\n",
1300 get_sock_error_string(result));
1301 return PMI_FAIL;
1302 }
1303 /* pmi_process.smpd_fd is decoded when checking for Singleton Init */
1304 result = SMPDU_Sock_native_to_sock(pmi_process.set, pmi_process.smpd_fd, NULL, &pmi_process.sock);
1305 if (result != SMPD_SUCCESS)
1306 {
1307 pmi_err_printf("SMPDU_Sock_native_to_sock failed, error %s\n", get_sock_error_string(result));
1308 return PMI_FAIL;
1309 }
1310 result = smpd_create_context(SMPD_CONTEXT_PMI, pmi_process.set, pmi_process.sock, pmi_process.smpd_id, &pmi_process.context);
1311 if (result != SMPD_SUCCESS)
1312 {
1313 pmi_err_printf("unable to create a pmi context.\n");
1314 return PMI_FAIL;
1315 }
1316 }
1317 else
1318 {
1319 p = getenv("PMI_HOST");
1320 if (p != NULL)
1321 {
1322 strncpy(pmi_process.host, p, PMI_MAX_HOST_NAME_LENGTH);
1323 p = getenv("PMI_PORT");
1324 if (p != NULL)
1325 {
1326 pmi_process.port = atoi(p);
1327
1328 result = SMPDU_Sock_create_set(&pmi_process.set);
1329 if (result != SMPD_SUCCESS)
1330 {
1331 pmi_err_printf("PMI_Init failed: unable to create a sock set, error: %d\n", result);
1332 return PMI_FAIL;
1333 }
1334
1335 result = uPMI_ConnectToHost(pmi_process.host, pmi_process.port, SMPD_CONNECTING_PMI);
1336 if (result != SMPD_SUCCESS)
1337 {
1338 pmi_err_printf("PMI_Init failed.\n");
1339 return PMI_FAIL;
1340 }
1341 }
1342 else
1343 {
1344 pmi_err_printf("No mechanism specified for connecting to the process manager - host %s but no port provided.\n", pmi_process.host);
1345 return PMI_FAIL;
1346 }
1347 }
1348 else
1349 {
1350 /* SINGLETON: Assume singleton here and initialize to SINGLETON_INIT_BUT_NO_PM
1351 * Also set PMI_KVS & PMI_DOMAIN after this step...
1352 */
1353 pmi_err_printf("No mechanism specified for connecting to the process manager.\n");
1354 return PMI_FAIL;
1355 }
1356 }
1357
1358 p = getenv("PMI_CLIQUE");
1359 if (p != NULL)
1360 {
1361 parse_clique(p);
1362 }
1363 /*
1364 printf("PMI_RANK=%s PMI_SIZE=%s PMI_KVS=%s PMI_SMPD_ID=%s PMI_SMPD_FD=%s PMI_SMPD_KEY=%s\n PMI_SPAWN=%s",
1365 getenv("PMI_RANK"), getenv("PMI_SIZE"), getenv("PMI_KVS"), getenv("PMI_SMPD_ID"),
1366 getenv("PMI_SMPD_FD"), getenv("PMI_SMPD_KEY"), getenv("PMI_SPAWN"));
1367 fflush(stdout);
1368 */
1369
1370 pmi_process.init_finalized = PMI_INITIALIZED;
1371
1372 sprintf(rank_str, "%d", pmi_process.iproc);
1373 sprintf(size_str, "%d", pmi_process.nproc);
1374 result = pmi_create_post_command("init", pmi_process.kvs_name, rank_str, size_str);
1375 if (result != PMI_SUCCESS)
1376 {
1377 pmi_err_printf("PMI_Init failed: unable to create an init command.\n");
1378 return PMI_FAIL;
1379 }
1380
1381 /* parse the result of the command */
1382 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1383 {
1384 pmi_err_printf("PMI_Init failed: no result string in the result command.\n");
1385 return PMI_FAIL;
1386 }
1387 if (strcmp(str, SMPD_SUCCESS_STR))
1388 {
1389 pmi_err_printf("PMI_Init failed: %s\n", str);
1390 return PMI_FAIL;
1391 }
1392
1393 /*
1394 if (*spawned && pmi_process.iproc == 0)
1395 {
1396 char key[1024], val[8192];
1397 key[0] = '\0';
1398 result = PMI_KVS_Iter_first(pmi_process.kvs_name, key, 1024, val, 8192);
1399 if (result != PMI_SUCCESS || key[0] == '\0')
1400 {
1401 printf("No preput values in %s\n", pmi_process.kvs_name);
1402 }
1403 while (result == PMI_SUCCESS && key[0] != '\0')
1404 {
1405 printf("PREPUT key=%s, val=%s\n", key, val);
1406 result = PMI_KVS_Iter_next(pmi_process.kvs_name, key, 1024, val, 8192);
1407 }
1408 fflush(stdout);
1409 }
1410 iPMI_Barrier();
1411 */
1412
1413 /*printf("iPMI_Init returning success.\n");fflush(stdout);*/
1414 return PMI_SUCCESS;
1415 }
1416
iPMI_Finalize()1417 int iPMI_Finalize()
1418 {
1419 int result;
1420 char rank_str[100];
1421 char str[1024];
1422
1423 if (pmi_process.init_finalized == PMI_FINALIZED)
1424 return PMI_SUCCESS;
1425
1426 if (pmi_process.rpmi)
1427 {
1428 return rPMI_Finalize();
1429 }
1430
1431 if(pmi_process.init_finalized < PMI_INITIALIZED)
1432 return PMI_SUCCESS;
1433
1434 if (pmi_process.local_kvs)
1435 {
1436 if(pmi_destroy_localKVS() != PMI_SUCCESS){
1437 pmi_dbg_printf("Failed to destroy local KVS\n");
1438 }
1439 result = SMPDU_Sock_finalize();
1440 pmi_process.init_finalized = PMI_FINALIZED;
1441 return PMI_SUCCESS;
1442 }
1443
1444 sprintf(rank_str, "%d", pmi_process.iproc);
1445 result = pmi_create_post_command("finalize", pmi_process.kvs_name, rank_str, NULL);
1446 if (result != PMI_SUCCESS)
1447 {
1448 pmi_err_printf("PMI_Finalize failed: unable to create an finalize command.\n");
1449 goto fn_fail;
1450 }
1451
1452 /* parse the result of the command */
1453 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1454 {
1455 pmi_err_printf("PMI_Finalize failed: no result string in the result command.\n");
1456 goto fn_fail;
1457 }
1458 if (strcmp(str, SMPD_SUCCESS_STR))
1459 {
1460 pmi_err_printf("PMI_Finalize failed: %s\n", str);
1461 goto fn_fail;
1462 }
1463
1464 PMI_Barrier();
1465
1466 /* post the done command and wait for the result */
1467 result = pmi_create_post_command("done", NULL, NULL, NULL);
1468 if (result != PMI_SUCCESS)
1469 {
1470 pmi_err_printf("failed.\n");
1471 goto fn_fail;
1472 }
1473
1474 /*if (pmi_process.sock != SMPDU_SOCK_INVALID_SOCK)*/
1475 {
1476 result = SMPDU_Sock_finalize();
1477 if (result != SMPD_SUCCESS)
1478 {
1479 /*pmi_err_printf("SMPDU_Sock_finalize failed,\nsock error: %s\n", get_sock_error_string(result));*/
1480 }
1481 }
1482
1483 pmi_process.init_finalized = PMI_FINALIZED;
1484 /*printf("iPMI_Finalize success.\n");fflush(stdout);*/
1485 return PMI_SUCCESS;
1486
1487 fn_fail:
1488 /* set the state to finalized so PMI_Abort will not dereference mangled structures due to a failure */
1489 pmi_process.init_finalized = PMI_FINALIZED;
1490 return PMI_FAIL;
1491 }
1492
iPMI_Abort(int exit_code,const char error_msg[])1493 int iPMI_Abort(int exit_code, const char error_msg[])
1494 {
1495 int result;
1496 smpd_command_t *cmd_ptr;
1497
1498 /* flush any output before aborting */
1499 /* This doesn't work because it flushes output from the mpich dll but does not flush the main module's output */
1500 fflush(stdout);
1501 fflush(stderr);
1502
1503 if (pmi_process.init_finalized == PMI_FINALIZED)
1504 {
1505 printf("PMI_Abort called after PMI_Finalize, error message:\n%s\n", error_msg);
1506 fflush(stdout);
1507 #ifdef HAVE_WINDOWS_H
1508 /* ExitProcess(exit_code); */
1509 TerminateProcess(GetCurrentProcess(), exit_code);
1510 #else
1511 exit(exit_code);
1512 return PMI_FAIL;
1513 #endif
1514 }
1515
1516 if (pmi_process.local_kvs)
1517 {
1518 if (smpd_process.verbose_abort_output)
1519 {
1520 printf("\njob aborted:\n");
1521 printf("process: node: exit code: error message:\n");
1522 printf("0: localhost: %d", exit_code);
1523 if (error_msg != NULL)
1524 {
1525 printf(": %s", error_msg);
1526 }
1527 printf("\n");
1528 }
1529 else
1530 {
1531 if (error_msg != NULL)
1532 {
1533 printf("%s\n", error_msg);
1534 }
1535 }
1536 fflush(stdout);
1537 if(pmi_destroy_localKVS() != PMI_SUCCESS){
1538 pmi_dbg_printf("Failed to destroy local KVS\n");
1539 }
1540 pmi_process.init_finalized = PMI_FINALIZED;
1541 #ifdef HAVE_WINDOWS_H
1542 /* ExitProcess(exit_code); */
1543 TerminateProcess(GetCurrentProcess(), exit_code);
1544 #else
1545 exit(exit_code);
1546 return PMI_FAIL;
1547 #endif
1548 }
1549 if(pmi_process.init_finalized < PMI_INITIALIZED)
1550 return PMI_FAIL;
1551
1552 result = smpd_create_command("abort_job", pmi_process.smpd_id, 0, SMPD_FALSE, &cmd_ptr);
1553 if (result != SMPD_SUCCESS)
1554 {
1555 pmi_err_printf("unable to create an abort command.\n");
1556 return PMI_FAIL;
1557 }
1558
1559 result = smpd_add_command_arg(cmd_ptr, "name", pmi_process.kvs_name);
1560 if (result != SMPD_SUCCESS)
1561 {
1562 pmi_err_printf("unable to add the kvs name('%s') to the abort command.\n", pmi_process.kvs_name);
1563 return PMI_FAIL;
1564 }
1565
1566 result = smpd_add_command_int_arg(cmd_ptr, "rank", pmi_process.iproc);
1567 if (result != SMPD_SUCCESS)
1568 {
1569 pmi_err_printf("unable to add the rank %d to the abort command.\n", pmi_process.iproc);
1570 return PMI_FAIL;
1571 }
1572
1573 result = smpd_add_command_arg(cmd_ptr, "error", (char*)error_msg);
1574 if (result != SMPD_SUCCESS)
1575 {
1576 pmi_err_printf("unable to add the error message('%s') to the abort command.\n", error_msg);
1577 return PMI_FAIL;
1578 }
1579
1580 result = smpd_add_command_int_arg(cmd_ptr, "exit_code", exit_code);
1581 if (result != SMPD_SUCCESS)
1582 {
1583 pmi_err_printf("unable to add the exit code(%d) to the abort command.\n", exit_code);
1584 return PMI_FAIL;
1585 }
1586
1587 /* post the write of the command */
1588 result = smpd_post_write_command(pmi_process.context, cmd_ptr);
1589 if (result != SMPD_SUCCESS)
1590 {
1591 pmi_err_printf("unable to post a write of the abort command.\n");
1592 return PMI_FAIL;
1593 }
1594
1595 /* and post a read for the result */
1596 /*
1597 result = smpd_post_read_command(pmi_process.context);
1598 if (result != SMPD_SUCCESS)
1599 {
1600 pmi_err_printf("unable to post a read of the next command on the pmi context.\n");
1601 return PMI_FAIL;
1602 }
1603 */
1604
1605 /* let the state machine send the command and receive the result */
1606 result = smpd_enter_at_state(pmi_process.set, SMPD_WRITING_CMD);
1607 if (result != SMPD_SUCCESS)
1608 {
1609 pmi_err_printf("the state machine logic failed to handle the abort command.\n");
1610 return PMI_FAIL;
1611 }
1612
1613 if(pmi_process.iproc == 0 &&
1614 pmi_process.singleton_mpiexec_fd != PMII_PROCESS_INVALID_HANDLE){
1615 int status;
1616 #ifdef HAVE_WINDOWS_H
1617 WaitForSingleObject(pmi_process.singleton_mpiexec_fd, INFINITE);
1618 #else
1619 waitpid(pmi_process.singleton_mpiexec_fd, &status, WUNTRACED);
1620 #endif
1621 }
1622
1623 #ifdef HAVE_WINDOWS_H
1624 /* ExitProcess(exit_code); */
1625 TerminateProcess(GetCurrentProcess(), exit_code);
1626 #else
1627 exit(exit_code);
1628 return PMI_FAIL;
1629 #endif
1630 }
1631
iPMI_Get_size(int * size)1632 int iPMI_Get_size(int *size)
1633 {
1634 if (pmi_process.init_finalized == PMI_FINALIZED)
1635 return PMI_ERR_INIT;
1636 if (size == NULL)
1637 return PMI_ERR_INVALID_ARG;
1638
1639 *size = pmi_process.nproc;
1640
1641 return PMI_SUCCESS;
1642 }
1643
iPMI_Get_rank(int * rank)1644 int iPMI_Get_rank(int *rank)
1645 {
1646 if (pmi_process.init_finalized == PMI_FINALIZED)
1647 return PMI_ERR_INIT;
1648 if (rank == NULL)
1649 return PMI_ERR_INVALID_ARG;
1650
1651 *rank = pmi_process.iproc;
1652
1653 return PMI_SUCCESS;
1654 }
1655
iPMI_Get_universe_size(int * size)1656 int iPMI_Get_universe_size(int *size)
1657 {
1658 if (pmi_process.init_finalized == PMI_FINALIZED)
1659 return PMI_ERR_INIT;
1660 /* Singleton init */
1661 if(pmi_process.init_finalized == PMI_SINGLETON_INIT_BUT_NO_PM){
1662 if(PMIi_InitSingleton() != PMI_SUCCESS){
1663 return PMI_ERR_INIT;
1664 }
1665 }
1666 if (size == NULL)
1667 return PMI_ERR_INVALID_ARG;
1668
1669 *size = -1;
1670
1671 return PMI_SUCCESS;
1672 }
1673
iPMI_Get_appnum(int * appnum)1674 int iPMI_Get_appnum(int *appnum)
1675 {
1676 if (pmi_process.init_finalized == PMI_FINALIZED)
1677 return PMI_ERR_INIT;
1678 if (appnum == NULL)
1679 return PMI_ERR_INVALID_ARG;
1680
1681 *appnum = pmi_process.appnum;
1682
1683 return PMI_SUCCESS;
1684 }
1685
iPMI_Get_clique_size(int * size)1686 int iPMI_Get_clique_size( int *size )
1687 {
1688 if (pmi_process.init_finalized == PMI_FINALIZED)
1689 return PMI_ERR_INIT;
1690 if (size == NULL)
1691 return PMI_ERR_INVALID_ARG;
1692
1693 if (pmi_process.clique_size == 0)
1694 *size = 1;
1695 else
1696 *size = pmi_process.clique_size;
1697 return PMI_SUCCESS;
1698 }
1699
iPMI_Get_clique_ranks(int ranks[],int length)1700 int iPMI_Get_clique_ranks( int ranks[], int length )
1701 {
1702 int i;
1703
1704 if (pmi_process.init_finalized == PMI_FINALIZED)
1705 return PMI_ERR_INIT;
1706 if (ranks == NULL)
1707 return PMI_ERR_INVALID_ARG;
1708 if (length < pmi_process.clique_size)
1709 return PMI_ERR_INVALID_LENGTH;
1710
1711 if (pmi_process.clique_size == 0)
1712 {
1713 *ranks = 0;
1714 }
1715 else
1716 {
1717 for (i=0; i<pmi_process.clique_size; i++)
1718 {
1719 ranks[i] = pmi_process.clique_ranks[i];
1720 }
1721 }
1722 return PMI_SUCCESS;
1723 }
1724
/* iPMI_Get_id - forwards to iPMI_KVS_Get_my_name and returns its result. */
int iPMI_Get_id( char id_str[], int length )
{
    const int rc = iPMI_KVS_Get_my_name(id_str, length);

    return rc;
}
1729
/* iPMI_Get_id_length_max - forwards to iPMI_KVS_Get_name_length_max and
   returns its result. */
int iPMI_Get_id_length_max(int *maxlen)
{
    const int rc = iPMI_KVS_Get_name_length_max(maxlen);

    return rc;
}
1734
iPMI_Get_kvs_domain_id(char id_str[],int length)1735 int iPMI_Get_kvs_domain_id(char id_str[], int length)
1736 {
1737 if (pmi_process.init_finalized == PMI_FINALIZED)
1738 return PMI_ERR_INIT;
1739 if (id_str == NULL)
1740 return PMI_ERR_INVALID_ARG;
1741 if (length < PMI_MAX_KVS_NAME_LENGTH)
1742 return PMI_ERR_INVALID_LENGTH;
1743
1744 strncpy(id_str, pmi_process.domain_name, length);
1745
1746 return PMI_SUCCESS;
1747 }
1748
iPMI_Barrier()1749 int iPMI_Barrier()
1750 {
1751 int result;
1752 char count_str[20];
1753 char str[1024];
1754
1755 if (pmi_process.init_finalized == PMI_FINALIZED)
1756 return PMI_ERR_INIT;
1757
1758 if (pmi_process.nproc == 1)
1759 return PMI_SUCCESS;
1760
1761 /*printf("entering barrier %d, %s\n", pmi_process.nproc, pmi_process.kvs_name);fflush(stdout);*/
1762
1763 /* encode the size of the barrier */
1764 snprintf(count_str, 20, "%d", pmi_process.nproc);
1765
1766 /* post the command and wait for the result */
1767 result = pmi_create_post_command("barrier", pmi_process.kvs_name, NULL, count_str);
1768 if (result != PMI_SUCCESS)
1769 {
1770 pmi_err_printf("PMI_Barrier failed.\n");
1771 return PMI_FAIL;
1772 }
1773
1774 /* interpret the result */
1775 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1776 {
1777 pmi_err_printf("PMI_Barrier failed: no result string in the result command.\n");
1778 return PMI_FAIL;
1779 }
1780 if (strcmp(str, DBS_SUCCESS_STR))
1781 {
1782 pmi_err_printf("PMI_Barrier failed: '%s'\n", str);
1783 return PMI_FAIL;
1784 }
1785
1786 /*printf("iPMI_Barrier success.\n");fflush(stdout);*/
1787 return PMI_SUCCESS;
1788 }
1789
iPMI_KVS_Get_my_name(char kvsname[],int length)1790 int iPMI_KVS_Get_my_name(char kvsname[], int length)
1791 {
1792 if (pmi_process.init_finalized == PMI_FINALIZED)
1793 return PMI_ERR_INIT;
1794 if (kvsname == NULL)
1795 return PMI_ERR_INVALID_ARG;
1796 if (length < PMI_MAX_KVS_NAME_LENGTH)
1797 return PMI_ERR_INVALID_LENGTH;
1798
1799 strncpy(kvsname, pmi_process.kvs_name, length);
1800
1801 /*
1802 printf("my kvs name is %s\n", kvsname);fflush(stdout);
1803 */
1804
1805 return PMI_SUCCESS;
1806 }
1807
iPMI_KVS_Get_name_length_max(int * maxlen)1808 int iPMI_KVS_Get_name_length_max(int *maxlen)
1809 {
1810 if (pmi_process.init_finalized == PMI_FINALIZED)
1811 return PMI_ERR_INIT;
1812 if (maxlen == NULL)
1813 return PMI_ERR_INVALID_ARG;
1814 *maxlen = PMI_MAX_KVS_NAME_LENGTH;
1815 return PMI_SUCCESS;
1816 }
1817
iPMI_KVS_Get_key_length_max(int * maxlen)1818 int iPMI_KVS_Get_key_length_max(int *maxlen)
1819 {
1820 if (pmi_process.init_finalized == PMI_FINALIZED)
1821 return PMI_ERR_INIT;
1822 if (maxlen == NULL)
1823 return PMI_ERR_INVALID_ARG;
1824 *maxlen = PMI_MAX_KEY_LEN;
1825 return PMI_SUCCESS;
1826 }
1827
iPMI_KVS_Get_value_length_max(int * maxlen)1828 int iPMI_KVS_Get_value_length_max(int *maxlen)
1829 {
1830 if (pmi_process.init_finalized == PMI_FINALIZED)
1831 return PMI_ERR_INIT;
1832 if (maxlen == NULL)
1833 return PMI_ERR_INVALID_ARG;
1834 *maxlen = PMI_MAX_VALUE_LEN;
1835 return PMI_SUCCESS;
1836 }
1837
iPMI_KVS_Create(char kvsname[],int length)1838 int iPMI_KVS_Create(char kvsname[], int length)
1839 {
1840 int result;
1841 char str[1024];
1842
1843 if (pmi_process.init_finalized == PMI_FINALIZED)
1844 return PMI_ERR_INIT;
1845 if (kvsname == NULL)
1846 return PMI_ERR_INVALID_ARG;
1847 if (length < PMI_MAX_KVS_NAME_LENGTH)
1848 return PMI_ERR_INVALID_LENGTH;
1849
1850 if(pmi_process.init_finalized == PMI_SINGLETON_INIT_BUT_NO_PM){
1851 if(PMIi_InitSingleton() != PMI_SUCCESS){
1852 return PMI_ERR_INIT;
1853 }
1854 }
1855
1856 if (pmi_process.local_kvs)
1857 {
1858 result = smpd_dbs_create(kvsname);
1859 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
1860 }
1861
1862 result = pmi_create_post_command("dbcreate", NULL, NULL, NULL);
1863 if (result != PMI_SUCCESS)
1864 {
1865 pmi_err_printf("PMI_KVS_Create failed: unable to create a pmi kvs space.\n");
1866 return PMI_FAIL;
1867 }
1868
1869 /* parse the result of the command */
1870 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1871 {
1872 pmi_err_printf("PMI_KVS_Create failed: no result string in the result command.\n");
1873 return PMI_FAIL;
1874 }
1875 if (strcmp(str, DBS_SUCCESS_STR))
1876 {
1877 pmi_err_printf("PMI_KVS_Create failed: %s\n", str);
1878 return PMI_FAIL;
1879 }
1880 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "name", str, 1024) != MPIU_STR_SUCCESS)
1881 {
1882 pmi_err_printf("PMI_KVS_Create failed: no kvs name in the dbcreate result command.\n");
1883 return PMI_FAIL;
1884 }
1885 strncpy(kvsname, str, PMI_MAX_KVS_NAME_LENGTH);
1886
1887 /*printf("iPMI_KVS_Create success.\n");fflush(stdout);*/
1888 return PMI_SUCCESS;
1889 }
1890
iPMI_KVS_Destroy(const char kvsname[])1891 int iPMI_KVS_Destroy(const char kvsname[])
1892 {
1893 int result;
1894 char str[1024];
1895
1896 if (pmi_process.init_finalized == PMI_FINALIZED)
1897 return PMI_ERR_INIT;
1898 if (kvsname == NULL)
1899 return PMI_ERR_INVALID_ARG;
1900
1901 if (pmi_process.local_kvs)
1902 {
1903 result = smpd_dbs_destroy(kvsname);
1904 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
1905 }
1906 else{
1907 /* FIXME: Test only for singleton init proc */
1908 int len = 0;
1909 if((len = strlen(kvsname)) > PMI_MAX_KVS_NAME_LENGTH){
1910 return PMI_ERR_INVALID_LENGTH;
1911 }
1912 /* Is the destroy req for stale kvsname before
1913 * singleton init ?
1914 */
1915 if(strncmp(kvsname, pmi_process.kvs_name_singleton_nopm, len)
1916 == 0){
1917 return PMI_SUCCESS;
1918 }
1919 }
1920
1921 result = pmi_create_post_command("dbdestroy", kvsname, NULL, NULL);
1922 if (result != PMI_SUCCESS)
1923 {
1924 pmi_err_printf("PMI_KVS_Destroy failed: unable to destroy the pmi kvs space named '%s'.\n", kvsname);
1925 return PMI_FAIL;
1926 }
1927
1928 /* parse the result of the command */
1929 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1930 {
1931 pmi_err_printf("PMI_KVS_Destroy failed: no result string in the result command.\n");
1932 return PMI_FAIL;
1933 }
1934 if (strcmp(str, DBS_SUCCESS_STR))
1935 {
1936 pmi_err_printf("PMI_KVS_Destroy failed: %s\n", str);
1937 return PMI_FAIL;
1938 }
1939
1940 return PMI_SUCCESS;
1941 }
1942
iPMI_KVS_Put(const char kvsname[],const char key[],const char value[])1943 int iPMI_KVS_Put(const char kvsname[], const char key[], const char value[])
1944 {
1945 int result;
1946 char str[1024];
1947 const char *kvsname_ = NULL;
1948
1949 if (pmi_process.init_finalized == PMI_FINALIZED)
1950 return PMI_ERR_INIT;
1951 if (kvsname == NULL)
1952 return PMI_ERR_INVALID_ARG;
1953 if (key == NULL)
1954 return PMI_ERR_INVALID_KEY;
1955 if (value == NULL)
1956 return PMI_ERR_INVALID_VAL;
1957
1958 kvsname_ = kvsname;
1959 /*printf("putting <%s><%s><%s>\n", kvsname, key, value);fflush(stdout);*/
1960
1961 if (pmi_process.local_kvs)
1962 {
1963 result = smpd_dbs_put(kvsname_, key, value);
1964 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
1965 }
1966 else{
1967 int len = 0;
1968 if((len = strlen(kvsname)) > PMI_MAX_KVS_NAME_LENGTH){
1969 return PMI_ERR_INVALID_LENGTH;
1970 }
1971 /* Update kvsname if the caller has the stale kvsname before
1972 * singleton init
1973 */
1974 if(strncmp(kvsname, pmi_process.kvs_name_singleton_nopm, len)
1975 == 0){
1976 kvsname_ = pmi_process.kvs_name;
1977 }
1978 }
1979
1980 result = pmi_create_post_command("dbput", kvsname_, key, value);
1981 if (result != PMI_SUCCESS)
1982 {
1983 pmi_err_printf("PMI_KVS_Put failed: unable to put '%s:%s:%s'\n", kvsname_, key, value);
1984 return PMI_FAIL;
1985 }
1986
1987 /* parse the result of the command */
1988 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
1989 {
1990 pmi_err_printf("PMI_KVS_Put failed: no result string in the result command.\n");
1991 return PMI_FAIL;
1992 }
1993 if (strcmp(str, DBS_SUCCESS_STR))
1994 {
1995 pmi_err_printf("PMI_KVS_Put failed: '%s'\n", str);
1996 return PMI_FAIL;
1997 }
1998
1999 /*printf("iPMI_KVS_Put success.\n");fflush(stdout);*/
2000 return PMI_SUCCESS;
2001 }
2002
iPMI_KVS_Commit(const char kvsname[])2003 int iPMI_KVS_Commit(const char kvsname[])
2004 {
2005 if (pmi_process.init_finalized == PMI_FINALIZED)
2006 return PMI_ERR_INIT;
2007 if (kvsname == NULL)
2008 return PMI_ERR_INVALID_ARG;
2009
2010 if (pmi_process.local_kvs)
2011 {
2012 return PMI_SUCCESS;
2013 }
2014
2015 /* Make the puts return when the commands are written but not acknowledged.
2016 Then have this function wait until all outstanding puts are acknowledged.
2017 */
2018
2019 return PMI_SUCCESS;
2020 }
2021
iPMI_KVS_Get(const char kvsname[],const char key[],char value[],int length)2022 int iPMI_KVS_Get(const char kvsname[], const char key[], char value[], int length)
2023 {
2024 int result;
2025 char str[1024];
2026 const char *kvsname_ = NULL;
2027
2028 if (pmi_process.init_finalized == PMI_FINALIZED)
2029 return PMI_ERR_INIT;
2030
2031 if (kvsname == NULL)
2032 return PMI_ERR_INVALID_ARG;
2033 if (key == NULL)
2034 return PMI_ERR_INVALID_KEY;
2035 if (value == NULL)
2036 return PMI_ERR_INVALID_VAL;
2037
2038 kvsname_ = kvsname;
2039
2040 /* We need singleton init only for spawn, universe_size
2041 if(pmi_process.init_finalized == PMI_SINGLETON_INIT_BUT_NO_PM){
2042 if(PMIi_InitSingleton() != PMI_SUCCESS){
2043 return PMI_ERR_INIT;
2044 }
2045 }
2046 */
2047
2048 if (pmi_process.local_kvs)
2049 {
2050 result = smpd_dbs_get(kvsname_, key, value);
2051 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
2052 }
2053 else{
2054 int len = 0;
2055 if((len = strlen(kvsname)) > PMI_MAX_KVS_NAME_LENGTH){
2056 return PMI_ERR_INVALID_LENGTH;
2057 }
2058 /* Update kvsname if the caller has the stale kvsname before
2059 * singleton init
2060 */
2061 if(strncmp(kvsname, pmi_process.kvs_name_singleton_nopm, len)
2062 == 0){
2063 kvsname_ = pmi_process.kvs_name;
2064 }
2065 }
2066
2067 result = pmi_create_post_command("dbget", kvsname_, key, NULL);
2068 if (result != PMI_SUCCESS)
2069 {
2070 pmi_err_printf("PMI_KVS_Get failed: unable to get '%s:%s'\n", kvsname_, key);
2071 return PMI_FAIL;
2072 }
2073
2074 /* parse the result of the command */
2075 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
2076 {
2077 pmi_err_printf("PMI_KVS_Get failed: no result string in the result command.\n");
2078 return PMI_FAIL;
2079 }
2080 if (strcmp(str, DBS_SUCCESS_STR))
2081 {
2082 /* Unable to find the keyval in db. The caller should handle this error */
2083 pmi_dbg_printf("PMI_KVS_Get failed: '%s'\n", str);
2084 return PMI_FAIL;
2085 }
2086 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "value", value, length) != MPIU_STR_SUCCESS)
2087 {
2088 pmi_err_printf("PMI_KVS_Get failed: no value in the result command for the get: '%s'\n", pmi_process.context->read_cmd.cmd);
2089 return PMI_FAIL;
2090 }
2091
2092 /*
2093 printf("iPMI_KVS_Get success.\n");fflush(stdout);
2094 printf("get <%s><%s><%s>\n", kvsname, key, value);
2095 fflush(stdout);
2096 */
2097 return PMI_SUCCESS;
2098 }
2099
iPMI_KVS_Iter_first(const char kvsname[],char key[],int key_len,char value[],int val_len)2100 int iPMI_KVS_Iter_first(const char kvsname[], char key[], int key_len, char value[], int val_len)
2101 {
2102 int result;
2103 char str[1024];
2104 const char *kvsname_ = NULL;
2105
2106 if (pmi_process.init_finalized == PMI_FINALIZED)
2107 return PMI_ERR_INIT;
2108 if (kvsname == NULL)
2109 return PMI_ERR_INVALID_ARG;
2110 if (key == NULL)
2111 return PMI_ERR_INVALID_KEY;
2112 if (key_len < PMI_MAX_KEY_LEN)
2113 return PMI_ERR_INVALID_KEY_LENGTH;
2114 if (value == NULL)
2115 return PMI_ERR_INVALID_VAL;
2116 if (val_len < PMI_MAX_VALUE_LEN)
2117 return PMI_ERR_INVALID_VAL_LENGTH;
2118
2119 kvsname_ = kvsname;
2120 if (pmi_process.local_kvs)
2121 {
2122 result = smpd_dbs_first(kvsname_, key, value);
2123 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
2124 }
2125 else{
2126 int len = 0;
2127 if((len = strlen(kvsname)) > PMI_MAX_KVS_NAME_LENGTH){
2128 return PMI_ERR_INVALID_LENGTH;
2129 }
2130 /* Update kvsname if the caller has the stale kvsname before
2131 * singleton init
2132 */
2133 if(strncmp(kvsname, pmi_process.kvs_name_singleton_nopm, len)
2134 == 0){
2135 kvsname_ = pmi_process.kvs_name;
2136 }
2137 }
2138
2139 result = pmi_create_post_command("dbfirst", kvsname_, NULL, NULL);
2140 if (result != PMI_SUCCESS)
2141 {
2142 pmi_err_printf("PMI_KVS_Iter_first failed: unable to get the first key/value pair from '%s'\n", kvsname_);
2143 return PMI_FAIL;
2144 }
2145
2146 /* parse the result of the command */
2147 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
2148 {
2149 pmi_err_printf("PMI_KVS_Iter_first failed: no result string in the result command.\n");
2150 return PMI_FAIL;
2151 }
2152 if (strcmp(str, DBS_SUCCESS_STR))
2153 {
2154 pmi_err_printf("PMI_KVS_Iter_first failed: %s\n", str);
2155 return PMI_FAIL;
2156 }
2157 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "key", str, PMI_MAX_KEY_LEN) != MPIU_STR_SUCCESS)
2158 {
2159 pmi_err_printf("PMI_KVS_Iter_first failed: no key in the result command for the pmi iter_first: '%s'\n", pmi_process.context->read_cmd.cmd);
2160 return PMI_FAIL;
2161 }
2162 if (strcmp(str, DBS_END_STR) == 0)
2163 {
2164 *key = '\0';
2165 *value = '\0';
2166 return PMI_SUCCESS;
2167 }
2168 strcpy(key, str);
2169 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "value", value, PMI_MAX_VALUE_LEN) != MPIU_STR_SUCCESS)
2170 {
2171 pmi_err_printf("PMI_KVS_Iter_first failed: no value in the result command for the pmi iter_first: '%s'\n", pmi_process.context->read_cmd.cmd);
2172 return PMI_FAIL;
2173 }
2174
2175 return PMI_SUCCESS;
2176 }
2177
iPMI_KVS_Iter_next(const char kvsname[],char key[],int key_len,char value[],int val_len)2178 int iPMI_KVS_Iter_next(const char kvsname[], char key[], int key_len, char value[], int val_len)
2179 {
2180 int result;
2181 char str[1024];
2182 const char *kvsname_ = NULL;
2183
2184 if (pmi_process.init_finalized == PMI_FINALIZED)
2185 return PMI_ERR_INIT;
2186 if (kvsname == NULL)
2187 return PMI_ERR_INVALID_ARG;
2188 if (key == NULL)
2189 return PMI_ERR_INVALID_KEY;
2190 if (key_len < PMI_MAX_KEY_LEN)
2191 return PMI_ERR_INVALID_KEY_LENGTH;
2192 if (value == NULL)
2193 return PMI_ERR_INVALID_VAL;
2194 if (val_len < PMI_MAX_VALUE_LEN)
2195 return PMI_ERR_INVALID_VAL_LENGTH;
2196
2197 kvsname_ = kvsname;
2198
2199 if (pmi_process.local_kvs)
2200 {
2201 result = smpd_dbs_next(kvsname_, key, value);
2202 return (result == SMPD_SUCCESS) ? PMI_SUCCESS : PMI_FAIL;
2203 }
2204 else{
2205 int len = 0;
2206 if((len = strlen(kvsname)) > PMI_MAX_KVS_NAME_LENGTH){
2207 return PMI_ERR_INVALID_LENGTH;
2208 }
2209 /* Update kvsname if the caller has the stale kvsname before
2210 * singleton init
2211 */
2212 if(strncmp(kvsname, pmi_process.kvs_name_singleton_nopm, len)
2213 == 0){
2214 kvsname_ = pmi_process.kvs_name;
2215 }
2216 }
2217
2218 result = pmi_create_post_command("dbnext", kvsname_, NULL, NULL);
2219 if (result != PMI_SUCCESS)
2220 {
2221 pmi_err_printf("PMI_KVS_Iter_next failed: unable to get the next key/value pair from '%s'\n", kvsname_);
2222 return PMI_FAIL;
2223 }
2224
2225 /* parse the result of the command */
2226 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "result", str, 1024) != MPIU_STR_SUCCESS)
2227 {
2228 pmi_err_printf("PMI_KVS_Iter_next failed: no result string in the result command.\n");
2229 return PMI_FAIL;
2230 }
2231 if (strcmp(str, DBS_SUCCESS_STR))
2232 {
2233 pmi_err_printf("PMI_KVS_Iter_next failed: %s\n", str);
2234 return PMI_FAIL;
2235 }
2236 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "key", str, PMI_MAX_KEY_LEN) != MPIU_STR_SUCCESS)
2237 {
2238 pmi_err_printf("PMI_KVS_Iter_next failed: no key in the result command for the pmi iter_next: '%s'\n", pmi_process.context->read_cmd.cmd);
2239 return PMI_FAIL;
2240 }
2241 if (strcmp(str, DBS_END_STR) == 0)
2242 {
2243 *key = '\0';
2244 *value = '\0';
2245 return PMI_SUCCESS;
2246 }
2247 strcpy(key, str);
2248 if (MPIU_Str_get_string_arg(pmi_process.context->read_cmd.cmd, "value", value, PMI_MAX_VALUE_LEN) != MPIU_STR_SUCCESS)
2249 {
2250 pmi_err_printf("PMI_KVS_Iter_next failed: no value in the result command for the pmi iter_next: '%s'\n", pmi_process.context->read_cmd.cmd);
2251 return PMI_FAIL;
2252 }
2253
2254 return PMI_SUCCESS;
2255 }
2256
iPMI_Spawn_multiple(int count,const char * cmds[],const char ** argvs[],const int maxprocs[],const int cinfo_keyval_sizes[],const PMI_keyval_t * info_keyval_vectors[],int preput_keyval_size,const PMI_keyval_t preput_keyval_vector[],int errors[])2257 int iPMI_Spawn_multiple(int count,
2258 const char * cmds[],
2259 const char ** argvs[],
2260 const int maxprocs[],
2261 const int cinfo_keyval_sizes[],
2262 const PMI_keyval_t * info_keyval_vectors[],
2263 int preput_keyval_size,
2264 const PMI_keyval_t preput_keyval_vector[],
2265 int errors[])
2266 {
2267 int result;
2268 smpd_command_t *cmd_ptr;
2269 int dest = 0;
2270 char buffer[SMPD_MAX_CMD_LENGTH];
2271 char keyval_buf[SMPD_MAX_CMD_LENGTH];
2272 char key[100];
2273 char *iter, *iter2;
2274 int i, j, maxlen, maxlen2;
2275 int path_specified = 0, wdir_specified = 0;
2276 char path[SMPD_MAX_PATH_LENGTH] = "";
2277 int *info_keyval_sizes;
2278 int total_num_processes;
2279 int appnum = 0;
2280
2281 if (pmi_process.init_finalized == PMI_FINALIZED)
2282 return PMI_ERR_INIT;
2283 if(pmi_process.init_finalized == PMI_SINGLETON_INIT_BUT_NO_PM){
2284 if(PMIi_InitSingleton() != PMI_SUCCESS){
2285 return PMI_ERR_INIT;
2286 }
2287 }
2288
2289 if (count < 1 || cmds == NULL || maxprocs == NULL || preput_keyval_size < 0)
2290 return PMI_ERR_INVALID_ARG;
2291
2292 if (pmi_process.local_kvs)
2293 {
2294 return PMI_FAIL;
2295 }
2296
2297 /*printf("creating spawn command.\n");fflush(stdout);*/
2298 result = smpd_create_command("spawn", pmi_process.smpd_id, dest, SMPD_TRUE, &cmd_ptr);
2299 if (result != SMPD_SUCCESS)
2300 {
2301 pmi_err_printf("unable to create a spawn command.\n");
2302 return PMI_FAIL;
2303 }
2304 result = smpd_add_command_int_arg(cmd_ptr, "ctx_key", pmi_process.smpd_key);
2305 if (result != SMPD_SUCCESS)
2306 {
2307 pmi_err_printf("unable to add the key to the spawn command.\n");
2308 return PMI_FAIL;
2309 }
2310
2311 /* add the number of commands */
2312 result = smpd_add_command_int_arg(cmd_ptr, "ncmds", count);
2313 if (result != SMPD_SUCCESS)
2314 {
2315 pmi_err_printf("unable to add the ncmds field to the spawn command.\n");
2316 return PMI_FAIL;
2317 }
2318 /* add the commands and their argv arrays */
2319 for (i=0; i<count; i++)
2320 {
2321 sprintf(key, "cmd%d", i);
2322 #ifdef HAVE_WINDOWS_H
2323 if (strlen(cmds[i]) > 2)
2324 {
2325 if (cmds[i][0] == '.' && cmds[i][1] == '/')
2326 {
2327 result = smpd_add_command_arg(cmd_ptr, key, (char*)&cmds[i][2]);
2328 }
2329 else
2330 {
2331 result = smpd_add_command_arg(cmd_ptr, key, (char*)cmds[i]);
2332 }
2333 }
2334 else
2335 {
2336 result = smpd_add_command_arg(cmd_ptr, key, (char*)cmds[i]);
2337 }
2338 #else
2339 result = smpd_add_command_arg(cmd_ptr, key, (char*)cmds[i]);
2340 #endif
2341 if (result != SMPD_SUCCESS)
2342 {
2343 pmi_err_printf("unable to add %s(%s) to the spawn command.\n", key, cmds[i]);
2344 return PMI_FAIL;
2345 }
2346 if (argvs)
2347 {
2348 buffer[0] = '\0';
2349 iter = buffer;
2350 maxlen = SMPD_MAX_CMD_LENGTH;
2351 if (argvs[i] != NULL)
2352 {
2353 for (j=0; argvs[i][j] != NULL; j++)
2354 {
2355 result = MPIU_Str_add_string(&iter, &maxlen, argvs[i][j]);
2356 }
2357 if (iter > buffer)
2358 {
2359 iter--;
2360 *iter = '\0'; /* erase the trailing space */
2361 }
2362 }
2363 sprintf(key, "argv%d", i);
2364 result = smpd_add_command_arg(cmd_ptr, key, buffer);
2365 if (result != SMPD_SUCCESS)
2366 {
2367 pmi_err_printf("unable to add %s(%s) to the spawn command.\n", key, buffer);
2368 return PMI_FAIL;
2369 }
2370 }
2371 }
2372 /* add the maxprocs array and count the total number of processes */
2373 total_num_processes = 0;
2374 buffer[0] = '\0';
2375 for (i=0; i<count; i++)
2376 {
2377 total_num_processes += maxprocs[i];
2378 if (i < count-1)
2379 sprintf(key, "%d ", maxprocs[i]);
2380 else
2381 sprintf(key, "%d", maxprocs[i]);
2382 strcat(buffer, key);
2383 }
2384 result = smpd_add_command_arg(cmd_ptr, "maxprocs", buffer);
2385 if (result != SMPD_SUCCESS)
2386 {
2387 pmi_err_printf("unable to add maxprocs(%s) to the spawn command.\n", buffer);
2388 return PMI_FAIL;
2389 }
2390
2391 #ifdef HAVE_WINDOWS_H
2392 {
2393 HMODULE hModule;
2394 char exe_path[SMPD_MAX_PATH_LENGTH];
2395 char *iter;
2396 int length;
2397
2398 GetCurrentDirectory(SMPD_MAX_PATH_LENGTH, path);
2399 hModule = GetModuleHandle(NULL);
2400 if (GetModuleFileName(hModule, exe_path, SMPD_MAX_PATH_LENGTH))
2401 {
2402 iter = strrchr(exe_path, '\\');
2403 if (iter != NULL)
2404 {
2405 if (iter == (exe_path + 2) && *(iter-1) == ':')
2406 {
2407 /* leave the \ if the path is at the root, like c:\foo.exe */
2408 iter++;
2409 }
2410 *iter = '\0'; /* erase the file name leaving only the path */
2411 }
2412 length = (int)strlen(path);
2413 iter = &path[length];
2414 MPIU_Snprintf(iter, SMPD_MAX_PATH_LENGTH-length, ";%s", exe_path);
2415 }
2416 }
2417 #else
2418 getcwd(path, SMPD_MAX_PATH_LENGTH);
2419 #endif
2420
2421 /* create a copy of the sizes so we can change the values locally */
2422 info_keyval_sizes = (int*)MPIU_Malloc(count * sizeof(int));
2423 if (info_keyval_sizes == NULL)
2424 {
2425 pmi_err_printf("unable to allocate an array of kevval sizes.\n");
2426 return PMI_FAIL;
2427 }
2428 for (i=0; i<count; i++)
2429 {
2430 info_keyval_sizes[i] = cinfo_keyval_sizes[i];
2431 }
2432
2433 /* add the keyvals */
2434 if (info_keyval_sizes && info_keyval_vectors){
2435 for (i=0; i<count; i++){
2436 path_specified = 0;
2437 wdir_specified = 0;
2438 buffer[0] = '\0';
2439 iter = buffer;
2440 maxlen = SMPD_MAX_CMD_LENGTH;
2441
2442 for (j=0; j<info_keyval_sizes[i]; j++){
2443 keyval_buf[0] = '\0';
2444 iter2 = keyval_buf;
2445 maxlen2 = SMPD_MAX_CMD_LENGTH;
2446 if (strcmp(info_keyval_vectors[i][j].key, "path") == 0){
2447 size_t val2len;
2448 char *val2;
2449 val2len = sizeof(char) * strlen(info_keyval_vectors[i][j].val) + 1 + strlen(path) + 1;
2450 val2 = (char*)MPIU_Malloc(val2len);
2451 if (val2 == NULL){
2452 pmi_err_printf("unable to allocate memory for the path key.\n");
2453 return PMI_FAIL;
2454 }
2455 /*printf("creating path %d: <%s>;<%s>\n", val2len, info_keyval_vectors[i][j].val, path);fflush(stdout);*/
2456 MPIU_Snprintf(val2, val2len, "%s;%s", info_keyval_vectors[i][j].val, path);
2457 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, info_keyval_vectors[i][j].key, val2);
2458 if (result != MPIU_STR_SUCCESS){
2459 pmi_err_printf("unable to add %s=%s to the spawn command.\n", info_keyval_vectors[i][j].key, val2);
2460 MPIU_Free(val2);
2461 return PMI_FAIL;
2462 }
2463 MPIU_Free(val2);
2464 path_specified = 1;
2465 }
2466 else{
2467 if(strcmp(info_keyval_vectors[i][j].key, "wdir") == 0){
2468 wdir_specified = 1;
2469 }
2470 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, info_keyval_vectors[i][j].key, info_keyval_vectors[i][j].val);
2471 if (result != MPIU_STR_SUCCESS){
2472 pmi_err_printf("unable to add %s=%s to the spawn command.\n", info_keyval_vectors[i][j].key, info_keyval_vectors[i][j].val);
2473 return PMI_FAIL;
2474 }
2475 }
2476 if (iter2 > keyval_buf){
2477 iter2--;
2478 *iter2 = '\0'; /* remove the trailing space */
2479 }
2480 sprintf(key, "%d", j);
2481 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2482 if (result != MPIU_STR_SUCCESS){
2483 pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf);
2484 return PMI_FAIL;
2485 }
2486 }
2487 /* add the current directory as the default path if a path has not been specified */
2488 if (!path_specified){
2489 keyval_buf[0] = '\0';
2490 iter2 = keyval_buf;
2491 maxlen2 = SMPD_MAX_CMD_LENGTH;
2492 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, "path", path);
2493 iter2--;
2494 *iter2 = '\0';
2495 sprintf(key, "%d", j++);
2496 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2497 if (result != MPIU_STR_SUCCESS){
2498 pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf);
2499 return PMI_FAIL;
2500 }
2501 info_keyval_sizes[i]++;
2502 }
2503 if(!wdir_specified){
2504 char wdir[SMPD_MAX_DIR_LENGTH];
2505 if(getcwd(wdir, SMPD_MAX_DIR_LENGTH)){
2506 keyval_buf[0] = '\0';
2507 iter2 = keyval_buf;
2508 maxlen2 = SMPD_MAX_CMD_LENGTH;
2509 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, "wdir", wdir);
2510 if(result != MPIU_STR_SUCCESS){
2511 pmi_err_printf("Unable to add wdir to keyval_buf\n");
2512 return PMI_FAIL;
2513 }
2514 *(--iter2) = '\0';
2515 sprintf(key, "%d", j++);
2516 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2517 if(result != MPIU_STR_SUCCESS){
2518 pmi_err_printf("unable to add %s=%s to the spawn command\n", key, keyval_buf);
2519 return PMI_FAIL;
2520 }
2521 info_keyval_sizes[i]++;
2522 }
2523 }
2524 #ifdef HAVE_WINDOWS_H
2525 /* FIXME: We don't support user environment infos for spawn() */
2526 if(pmi_process.rpmi == PMI_TRUE){
2527 /* Add channel environment for rpmi/singleton_init procs */
2528 char *env, env_str[SMPD_MAX_ENV_LENGTH];
2529 env = getenv("MPICH2_CHANNEL");
2530 if(env != NULL){
2531 snprintf(env_str, SMPD_MAX_ENV_LENGTH, "MPICH2_CHANNEL=%s", env);
2532 keyval_buf[0] = '\0';
2533 iter2 = keyval_buf;
2534 maxlen2 = SMPD_MAX_CMD_LENGTH;
2535 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, "env", env_str);
2536 iter2--;
2537 *iter2 = '\0';
2538 sprintf(key, "%d", j++);
2539 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2540 if (result != MPIU_STR_SUCCESS){
2541 pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf);
2542 return PMI_FAIL;
2543 }
2544 info_keyval_sizes[i]++;
2545 }
2546 }
2547 #endif
2548 if (iter != buffer){
2549 iter--;
2550 *iter = '\0'; /* remove the trailing space */
2551 }
2552 sprintf(key, "keyvals%d", i);
2553 result = smpd_add_command_arg(cmd_ptr, key, buffer);
2554 if (result != SMPD_SUCCESS){
2555 pmi_err_printf("unable to add %s(%s) to the spawn command.\n", key, buffer);
2556 return PMI_FAIL;
2557 }
2558 }
2559 }
2560 else
2561 {
2562 if (!info_keyval_sizes)
2563 {
2564 buffer[0] = '\0';
2565 for (i=0; i<count; i++)
2566 {
2567 if (i < count-1)
2568 strcat(buffer, "1 ");
2569 else
2570 strcat(buffer, "1");
2571 }
2572 result = smpd_add_command_arg(cmd_ptr, "nkeyvals", buffer);
2573 if (result != SMPD_SUCCESS)
2574 {
2575 pmi_err_printf("unable to add nkeyvals(%s) to the spawn command.\n", buffer);
2576 return PMI_FAIL;
2577 }
2578 }
2579 for (i=0; i<count; i++)
2580 {
2581 buffer[0] = '\0';
2582 iter = buffer;
2583 maxlen = SMPD_MAX_CMD_LENGTH;
2584 /* add the current directory as the default path if a path has not been specified */
2585 keyval_buf[0] = '\0';
2586 iter2 = keyval_buf;
2587 maxlen2 = SMPD_MAX_CMD_LENGTH;
2588 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, "path", path);
2589 iter2--;
2590 *iter2 = '\0';
2591 strcpy(key, "0");
2592 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2593 if (result != MPIU_STR_SUCCESS)
2594 {
2595 pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf);
2596 return PMI_FAIL;
2597 }
2598 sprintf(key, "keyvals%d", i);
2599 result = smpd_add_command_arg(cmd_ptr, key, buffer);
2600 if (result != SMPD_SUCCESS)
2601 {
2602 pmi_err_printf("unable to add %s(%s) to the spawn command.\n", key, buffer);
2603 return PMI_FAIL;
2604 }
2605 }
2606 }
2607
2608 /* add the keyval sizes array */
2609 if (info_keyval_sizes)
2610 {
2611 buffer[0] = '\0';
2612 for (i=0; i<count; i++)
2613 {
2614 if (i < count-1)
2615 sprintf(key, "%d ", info_keyval_sizes[i] > 0 ? info_keyval_sizes[i] : 1);
2616 else
2617 sprintf(key, "%d", info_keyval_sizes[i] > 0 ? info_keyval_sizes[i] : 1);
2618 strcat(buffer, key);
2619 }
2620 result = smpd_add_command_arg(cmd_ptr, "nkeyvals", buffer);
2621 if (result != SMPD_SUCCESS)
2622 {
2623 pmi_err_printf("unable to add nkeyvals(%s) to the spawn command.\n", buffer);
2624 return PMI_FAIL;
2625 }
2626 }
2627
2628 MPIU_Free(info_keyval_sizes);
2629
2630 /* add the pre-put keyvals */
2631 result = smpd_add_command_int_arg(cmd_ptr, "npreput", preput_keyval_size);
2632 if (result != SMPD_SUCCESS)
2633 {
2634 pmi_err_printf("unable to add npreput=%d to the spawn command.\n", preput_keyval_size);
2635 return PMI_FAIL;
2636 }
2637 if (preput_keyval_size > 0 && preput_keyval_vector)
2638 {
2639 buffer[0] = '\0';
2640 iter = buffer;
2641 maxlen = SMPD_MAX_CMD_LENGTH;
2642 for (i=0; i<preput_keyval_size; i++)
2643 {
2644 keyval_buf[0] = '\0';
2645 iter2 = keyval_buf;
2646 maxlen2 = SMPD_MAX_CMD_LENGTH;
2647 result = MPIU_Str_add_string_arg(&iter2, &maxlen2, preput_keyval_vector[i].key, preput_keyval_vector[i].val);
2648 if (result != MPIU_STR_SUCCESS)
2649 {
2650 pmi_err_printf("unable to add %s=%s to the spawn command.\n", preput_keyval_vector[i].key, preput_keyval_vector[i].val);
2651 return PMI_FAIL;
2652 }
2653 if (iter2 > keyval_buf)
2654 {
2655 iter2--;
2656 *iter2 = '\0'; /* remove the trailing space */
2657 }
2658 sprintf(key, "%d", i);
2659 result = MPIU_Str_add_string_arg(&iter, &maxlen, key, keyval_buf);
2660 if (result != MPIU_STR_SUCCESS)
2661 {
2662 pmi_err_printf("unable to add %s=%s to the spawn command.\n", key, keyval_buf);
2663 return PMI_FAIL;
2664 }
2665 }
2666 result = smpd_add_command_arg(cmd_ptr, "preput", buffer);
2667 if (result != SMPD_SUCCESS)
2668 {
2669 pmi_err_printf("unable to add preput(%s) to the spawn command.\n", buffer);
2670 return PMI_FAIL;
2671 }
2672 }
2673
2674 /*printf("spawn command: <%s>\n", cmd_ptr->cmd);*/
2675
2676 /* post the write of the command */
2677 /*
2678 printf("posting write of spawn command to %s context, sock %d: '%s'\n",
2679 smpd_get_context_str(pmi_process.context), SMPDU_Sock_get_sock_id(pmi_process.context->sock), cmd_ptr->cmd);
2680 fflush(stdout);
2681 */
2682
2683 result = smpd_post_write_command(pmi_process.context, cmd_ptr);
2684 if (result != SMPD_SUCCESS)
2685 {
2686 pmi_err_printf("unable to post a write of the spawn command.\n");
2687 return PMI_FAIL;
2688 }
2689
2690 /* post a read for the result*/
2691 result = smpd_post_read_command(pmi_process.context);
2692 if (result != SMPD_SUCCESS)
2693 {
2694 pmi_err_printf("unable to post a read of the next command on the pmi context.\n");
2695 return PMI_FAIL;
2696 }
2697
2698 /* let the state machine send the command and receive the result */
2699 result = smpd_enter_at_state(pmi_process.set, SMPD_WRITING_CMD);
2700 if (result != SMPD_SUCCESS)
2701 {
2702 /*printf("PMI_Spawn_multiple returning failure.\n");fflush(stdout);*/
2703 pmi_err_printf("the state machine logic failed to get the result of the spawn command.\n");
2704 return PMI_FAIL;
2705 }
2706
2707 for (i=0; i<total_num_processes; i++)
2708 {
2709 errors[i] = PMI_SUCCESS;
2710 }
2711 /*printf("PMI_Spawn_multiple returning success.\n");fflush(stdout);*/
2712 return PMI_SUCCESS;
2713 }
2714
/*
 * iPMI_Parse_option - validate the arguments and report that nothing was
 * parsed.
 *
 * On success *num_parsed is set to 0, *keyvalp to NULL and *size to 0.
 * Each invalid argument is reported with its own PMI_ERR_* code; the checks
 * run in parameter order and the first failure wins.
 */
int iPMI_Parse_option(int num_args, char *args[], int *num_parsed, PMI_keyval_t **keyvalp, int *size)
{
    int rc = PMI_SUCCESS;

    if (num_args < 1)
    {
        rc = PMI_ERR_INVALID_NUM_ARGS;
    }
    else if (args == NULL)
    {
        rc = PMI_ERR_INVALID_ARGS;
    }
    else if (num_parsed == NULL)
    {
        rc = PMI_ERR_INVALID_NUM_PARSED;
    }
    else if (keyvalp == NULL)
    {
        rc = PMI_ERR_INVALID_KEYVALP;
    }
    else if (size == NULL)
    {
        rc = PMI_ERR_INVALID_SIZE;
    }
    else
    {
        /* no option parsing is performed: hand back empty results */
        *num_parsed = 0;
        *keyvalp = NULL;
        *size = 0;
    }

    return rc;
}
2732
2733 int iPMI_Args_to_keyval(int *argcp, char *((*argvp)[]), PMI_keyval_t **keyvalp, int *size)
2734 {
2735 if (argcp == NULL || argvp == NULL || keyvalp == NULL || size == NULL)
2736 return PMI_ERR_INVALID_ARG;
2737 return PMI_SUCCESS;
2738 }
2739
/*
 * iPMI_Free_keyvals - argument check only; nothing is released here.
 *
 * Returns PMI_ERR_INVALID_ARG when keyvalp is NULL or size is negative and
 * PMI_SUCCESS for every other input (size zero or positive alike).
 */
int iPMI_Free_keyvals(PMI_keyval_t keyvalp[], int size)
{
    return (keyvalp != NULL && size >= 0) ? PMI_SUCCESS : PMI_ERR_INVALID_ARG;
}
2749
2750 static char * namepub_kvs = NULL;
setup_name_service()2751 static int setup_name_service()
2752 {
2753 int result;
2754 char *pmi_namepub_kvs;
2755
2756 if (namepub_kvs != NULL)
2757 {
2758 /* FIXME: Should it be an error to call setup_name_service twice? */
2759 MPIU_Free(namepub_kvs);
2760 }
2761
2762 namepub_kvs = (char*)MPIU_Malloc(PMI_MAX_KVS_NAME_LENGTH);
2763 if (!namepub_kvs)
2764 {
2765 pmi_err_printf("unable to allocate memory for the name publisher kvs.\n");
2766 return PMI_FAIL;
2767 }
2768
2769 pmi_namepub_kvs = getenv("PMI_NAMEPUB_KVS");
2770 if (pmi_namepub_kvs)
2771 {
2772 strncpy(namepub_kvs, pmi_namepub_kvs, PMI_MAX_KVS_NAME_LENGTH);
2773 }
2774 else
2775 {
2776 /*result = PMI_KVS_Create(namepub_kvs, PMI_MAX_KVS_NAME_LENGTH);*/
2777 result = iPMI_Get_kvs_domain_id(namepub_kvs, PMI_MAX_KVS_NAME_LENGTH);
2778 if (result != PMI_SUCCESS)
2779 {
2780 pmi_err_printf("unable to get the name publisher kvs name.\n");
2781 return result;
2782 }
2783 }
2784
2785 /*printf("namepub kvs: <%s>\n", namepub_kvs);fflush(stdout);*/
2786 return PMI_SUCCESS;
2787 }
2788
iPMI_Publish_name(const char service_name[],const char port[])2789 int iPMI_Publish_name( const char service_name[], const char port[] )
2790 {
2791 int result;
2792 if (service_name == NULL || port == NULL)
2793 return PMI_ERR_INVALID_ARG;
2794 if (namepub_kvs == NULL)
2795 {
2796 result = setup_name_service();
2797 if (result != PMI_SUCCESS)
2798 return result;
2799 }
2800 /*printf("publish kvs: <%s>\n", namepub_kvs);fflush(stdout);*/
2801 result = iPMI_KVS_Put(namepub_kvs, service_name, port);
2802 if (result != PMI_SUCCESS)
2803 {
2804 pmi_err_printf("unable to put the service name and port into the name publisher kvs.\n");
2805 return result;
2806 }
2807 result = iPMI_KVS_Commit(namepub_kvs);
2808 if (result != PMI_SUCCESS)
2809 {
2810 pmi_err_printf("unable to commit the name publisher kvs.\n");
2811 return result;
2812 }
2813 return PMI_SUCCESS;
2814 }
2815
iPMI_Unpublish_name(const char service_name[])2816 int iPMI_Unpublish_name( const char service_name[] )
2817 {
2818 int result;
2819 if (service_name == NULL)
2820 return PMI_ERR_INVALID_ARG;
2821 if (namepub_kvs == NULL)
2822 {
2823 result = setup_name_service();
2824 if (result != PMI_SUCCESS)
2825 return result;
2826 }
2827 /*printf("unpublish kvs: <%s>\n", namepub_kvs);fflush(stdout);*/
2828 /* This assumes you can put the same key more than once which breaks the PMI specification */
2829 result = iPMI_KVS_Put(namepub_kvs, service_name, "");
2830 if (result != PMI_SUCCESS)
2831 {
2832 pmi_err_printf("unable to put the blank service name and port into the name publisher kvs.\n");
2833 return result;
2834 }
2835 result = iPMI_KVS_Commit(namepub_kvs);
2836 if (result != PMI_SUCCESS)
2837 {
2838 pmi_err_printf("unable to commit the name publisher kvs.\n");
2839 return result;
2840 }
2841 return PMI_SUCCESS;
2842 }
2843
iPMI_Lookup_name(const char service_name[],char port[])2844 int iPMI_Lookup_name( const char service_name[], char port[] )
2845 {
2846 int result;
2847 if (service_name == NULL || port == NULL)
2848 return PMI_ERR_INVALID_ARG;
2849 if (namepub_kvs == NULL)
2850 {
2851 result = setup_name_service();
2852 if (result != PMI_SUCCESS)
2853 return result;
2854 }
2855 /*printf("lookup kvs: <%s>\n", namepub_kvs);fflush(stdout);*/
2856 silence = 1;
2857 result = iPMI_KVS_Get(namepub_kvs, service_name, port, MPI_MAX_PORT_NAME);
2858 silence = 0;
2859 if (result != PMI_SUCCESS)
2860 {
2861 /* fail silently */
2862 /*pmi_err_printf("unable to get the service name and port from the name publisher kvs.\n");*/
2863 return result;
2864 }
2865
2866 if (port[0] == '\0')
2867 {
2868 return MPI_ERR_NAME;
2869 }
2870 return PMI_SUCCESS;
2871 }
2872
2873 #ifndef HAVE_WINDOWS_H
/*
 * writebuf - write exactly length bytes from buffer to fd.
 *
 * Short writes are continued and writes interrupted by a signal (EINTR) are
 * retried.  Returns 0 once everything has been written, otherwise the errno
 * value of the write() call that failed.
 */
static int writebuf(int fd, void *buffer, int length)
{
    unsigned char *cursor = (unsigned char *)buffer;
    int remaining = length;
    int n;

    while (remaining != 0)
    {
        n = write(fd, cursor, remaining);
        if (n >= 0)
        {
            /* advance past what was accepted */
            cursor += n;
            remaining -= n;
            continue;
        }
        if (errno != EINTR)
        {
            return errno;
        }
        /* interrupted: try the same range again */
    }
    return 0;
}
2896
/*
 * readbuf - read exactly length bytes from fd into buffer.
 *
 * Short reads are continued and reads interrupted by a signal (EINTR) are
 * retried.  Returns 0 once length bytes have been read, -1 if end of file is
 * reached first, otherwise the errno value of the read() call that failed.
 */
static int readbuf(int fd, void *buffer, int length)
{
    unsigned char *cursor = (unsigned char *)buffer;
    int remaining = length;
    int n;

    while (remaining != 0)
    {
        n = read(fd, cursor, remaining);
        if (n == 0)
        {
            /* end of file before the request was satisfied */
            return -1;
        }
        if (n < 0)
        {
            if (errno != EINTR)
            {
                return errno;
            }
            /* interrupted: try the same range again */
            continue;
        }
        cursor += n;
        remaining -= n;
    }
    return 0;
}
2923 #endif
2924
PMIX_Start_root_smpd(int nproc,char * host,int len,int * port)2925 int PMIX_Start_root_smpd(int nproc, char *host, int len, int *port)
2926 {
2927 #ifdef HAVE_WINDOWS_H
2928 DWORD dwLength = len;
2929 #else
2930 int pipe_fd[2];
2931 int result;
2932 #endif
2933
2934 pmi_process.nproc = nproc;
2935
2936 #ifdef HAVE_WINDOWS_H
2937 pmi_process.hRootThreadReadyEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
2938 if (pmi_process.hRootThreadReadyEvent == NULL)
2939 {
2940 pmi_err_printf("unable to create the root listener synchronization event, error: %d\n", GetLastError());
2941 return PMI_FAIL;
2942 }
2943 pmi_process.hRootThread = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)root_smpd, NULL, 0, NULL);
2944 if (pmi_process.hRootThread == NULL)
2945 {
2946 pmi_err_printf("unable to create the root listener thread: error %d\n", GetLastError());
2947 return PMI_FAIL;
2948 }
2949 if (WaitForSingleObject(pmi_process.hRootThreadReadyEvent, 60000) != WAIT_OBJECT_0)
2950 {
2951 pmi_err_printf("the root process thread failed to initialize.\n");
2952 return PMI_FAIL;
2953 }
2954 /*GetComputerName(host, &dwLength);*/
2955 GetComputerNameEx(ComputerNameDnsFullyQualified, host, &dwLength);
2956 #else
2957 pipe(pipe_fd);
2958 result = fork();
2959 if (result == -1)
2960 {
2961 pmi_err_printf("unable to fork the root listener, errno %d\n", errno);
2962 return PMI_FAIL;
2963 }
2964 if (result == 0)
2965 {
2966 close(pipe_fd[0]); /* close the read end of the pipe */
2967 result = root_smpd(&pipe_fd[1]);
2968 exit(result);
2969 }
2970
2971 /* close the write end of the pipe */
2972 close(pipe_fd[1]);
2973 /* read the port from the root_smpd process */
2974 readbuf(pipe_fd[0], &pmi_process.root_port, sizeof(int));
2975 /* read the kvs name */
2976 readbuf(pipe_fd[0], smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN);
2977 /* close the read end of the pipe */
2978 close(pipe_fd[0]);
2979 pmi_process.root_pid = result;
2980 gethostname(host, len);
2981 #endif
2982
2983 *port = pmi_process.root_port;
2984
2985 return PMI_SUCCESS;
2986 }
2987
PMIX_Stop_root_smpd()2988 int PMIX_Stop_root_smpd()
2989 {
2990 #ifdef HAVE_WINDOWS_H
2991 DWORD result;
2992 #else
2993 int status;
2994 #endif
2995
2996 #ifdef HAVE_WINDOWS_H
2997 result = WaitForSingleObject(pmi_process.hRootThread, INFINITE);
2998 if (result != WAIT_OBJECT_0)
2999 {
3000 return PMI_FAIL;
3001 }
3002 #else
3003 kill(pmi_process.root_pid, SIGKILL);
3004 /*
3005 if (waitpid(pmi_process.root_pid, &status, WUNTRACED) == -1)
3006 {
3007 return PMI_FAIL;
3008 }
3009 */
3010 #endif
3011 return PMI_SUCCESS;
3012 }
3013
3014 /* FIXME: Why is this func defined here ?
3015 * - shouldn't this be in smpd_util*.lib ?
3016 */
root_smpd(void * p)3017 static int root_smpd(void *p)
3018 {
3019 int result;
3020 SMPDU_Sock_set_t set;
3021 SMPDU_Sock_t listener;
3022 smpd_process_group_t *pg;
3023 int i;
3024 #ifndef HAVE_WINDOWS_H
3025 int send_kvs = 0;
3026 int pipe_fd;
3027 #endif
3028
3029 /* unreferenced parameter */
3030 SMPD_UNREFERENCED_ARG(p);
3031
3032 smpd_process.id = 1;
3033 smpd_process.root_smpd = SMPD_FALSE;
3034 smpd_process.map0to1 = SMPD_TRUE;
3035
3036 result = SMPDU_Sock_create_set(&set);
3037 if (result != SMPD_SUCCESS)
3038 {
3039 pmi_mpi_err_printf(result, "SMPDU_Sock_create_set failed.\n");
3040 return PMI_FAIL;
3041 }
3042 smpd_process.set = set;
3043 smpd_dbg_printf("created a set for the listener: %d\n", SMPDU_Sock_get_sock_set_id(set));
3044 result = SMPDU_Sock_listen(set, NULL, &pmi_process.root_port, &listener);
3045 if (result != SMPD_SUCCESS)
3046 {
3047 pmi_mpi_err_printf(result, "SMPDU_Sock_listen failed.\n");
3048 return PMI_FAIL;
3049 }
3050 smpd_dbg_printf("smpd listening on port %d\n", pmi_process.root_port);
3051
3052 result = smpd_create_context(SMPD_CONTEXT_LISTENER, set, listener, -1, &smpd_process.listener_context);
3053 if (result != SMPD_SUCCESS)
3054 {
3055 pmi_err_printf("unable to create a context for the smpd listener.\n");
3056 return PMI_FAIL;
3057 }
3058 result = SMPDU_Sock_set_user_ptr(listener, smpd_process.listener_context);
3059 if (result != SMPD_SUCCESS)
3060 {
3061 pmi_mpi_err_printf(result, "SMPDU_Sock_set_user_ptr failed.\n");
3062 return PMI_FAIL;
3063 }
3064 smpd_process.listener_context->state = SMPD_SMPD_LISTENING;
3065
3066 smpd_dbs_init();
3067 smpd_process.have_dbs = SMPD_TRUE;
3068 if (smpd_process.kvs_name[0] != '\0')
3069 {
3070 result = smpd_dbs_create_name_in(smpd_process.kvs_name);
3071 }
3072 else
3073 {
3074 result = smpd_dbs_create(smpd_process.kvs_name);
3075 #ifndef HAVE_WINDOWS_H
3076 send_kvs = 1;
3077 #endif
3078 }
3079 if (result != SMPD_DBS_SUCCESS)
3080 {
3081 pmi_err_printf("unable to create a kvs database: name = <%s>.\n", smpd_process.kvs_name);
3082 return PMI_FAIL;
3083 }
3084
3085 /* Set up the process group */
3086 /* initialize a new process group structure */
3087 pg = (smpd_process_group_t*)MPIU_Malloc(sizeof(smpd_process_group_t));
3088 if (pg == NULL)
3089 {
3090 pmi_err_printf("unable to allocate memory for a process group structure.\n");
3091 return PMI_FAIL;
3092 }
3093 pg->aborted = SMPD_FALSE;
3094 pg->any_init_received = SMPD_FALSE;
3095 pg->any_noinit_process_exited = SMPD_FALSE;
3096 strncpy(pg->kvs, smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN);
3097 pg->num_procs = pmi_process.nproc;
3098 pg->processes = (smpd_exit_process_t*)MPIU_Malloc(pmi_process.nproc * sizeof(smpd_exit_process_t));
3099 if (pg->processes == NULL)
3100 {
3101 pmi_err_printf("unable to allocate an array of %d process exit structures.\n", pmi_process.nproc);
3102 return PMI_FAIL;
3103 }
3104 for (i=0; i<pmi_process.nproc; i++)
3105 {
3106 pg->processes[i].ctx_key[0] = '\0';
3107 pg->processes[i].errmsg = NULL;
3108 pg->processes[i].exitcode = -1;
3109 pg->processes[i].exited = SMPD_FALSE;
3110 pg->processes[i].finalize_called = SMPD_FALSE;
3111 pg->processes[i].init_called = SMPD_FALSE;
3112 pg->processes[i].node_id = i+1;
3113 pg->processes[i].host[0] = '\0';
3114 pg->processes[i].suspended = SMPD_FALSE;
3115 pg->processes[i].suspend_cmd = NULL;
3116 }
3117 /* add the process group to the global list */
3118 pg->next = smpd_process.pg_list;
3119 smpd_process.pg_list = pg;
3120
3121 #ifdef HAVE_WINDOWS_H
3122 SetEvent(pmi_process.hRootThreadReadyEvent);
3123 #else
3124 if (p != NULL)
3125 {
3126 pipe_fd = *(int*)p;
3127 /* send the root port back over the pipe */
3128 writebuf(pipe_fd, &pmi_process.root_port, sizeof(int));
3129 if (send_kvs)
3130 {
3131 writebuf(pipe_fd, smpd_process.kvs_name, SMPD_MAX_DBS_NAME_LEN);
3132 }
3133 close(pipe_fd);
3134 }
3135 #endif
3136
3137 result = smpd_enter_at_state(set, SMPD_SMPD_LISTENING);
3138 if (result != SMPD_SUCCESS)
3139 {
3140 pmi_err_printf("root_smpd state machine failed.\n");
3141 return PMI_FAIL;
3142 }
3143
3144 result = SMPDU_Sock_destroy_set(set);
3145 if (result != SMPD_SUCCESS)
3146 {
3147 pmi_mpi_err_printf(result, "unable to destroy the set.\n");
3148 }
3149
3150 return PMI_SUCCESS;
3151 }
3152