1 /*****************************************************************************\
2  **  spawn.c - PMI job spawn handling
3  *****************************************************************************
4  *  Copyright (C) 2011-2012 National University of Defense Technology.
5  *  Written by Hongjia Cao <hjcao@nudt.edu.cn>.
6  *  All rights reserved.
7  *
8  *  This file is part of Slurm, a resource management program.
9  *  For details, see <https://slurm.schedmd.com/>.
10  *  Please also read the included file: DISCLAIMER.
11  *
12  *  Slurm is free software; you can redistribute it and/or modify it under
13  *  the terms of the GNU General Public License as published by the Free
14  *  Software Foundation; either version 2 of the License, or (at your option)
15  *  any later version.
16  *
17  *  In addition, as a special exception, the copyright holders give permission
18  *  to link the code of portions of this program with the OpenSSL library under
19  *  certain conditions as described in each individual source file, and
20  *  distribute linked combinations including the two. You must obey the GNU
21  *  General Public License in all respects for all of the code used other than
22  *  OpenSSL. If you modify file(s) with this exception, you may extend this
23  *  exception to your version of the file(s), but you are not obligated to do
24  *  so. If you do not wish to do so, delete this exception statement from your
25  *  version.  If you delete this exception statement from all source files in
26  *  the program, then also delete it here.
27  *
28  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
31  *  details.
32  *
33  *  You should have received a copy of the GNU General Public License along
34  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
35  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
36 \*****************************************************************************/
37 
38 #include <fcntl.h>
39 #include <stdio.h>
40 #include <stdlib.h>
41 #include <sys/stat.h>
42 #include <sys/types.h>
43 
44 #include "src/common/slurm_xlator.h"
45 #include "src/common/xmalloc.h"
46 #include "src/common/xstring.h"
47 #include "src/common/list.h"
48 #include "src/common/slurm_protocol_interface.h"
49 
50 #include "spawn.h"
51 #include "setup.h"
52 #include "tree.h"
53 #include "pmi.h"
54 
/*
 * Sequence number returned by the next call to spawn_seq_next().
 * Starts at 1 because 0 means "not spawned".
 */
static uint32_t spawn_seq = 1;

/* PIDs of the forked srun processes, indexed by spawn sequence number. */
static pid_t *spawned_srun_pids = NULL;

/*
 * One queued spawn request, kept in a singly linked list until it is
 * removed again by sequence number.
 */
typedef struct pending_spawn_req psr_t;

struct pending_spawn_req {
	uint32_t seq;		/* key used to find the entry again */
	int fd;
	int lrank;
	char *from_node;	/* for srun */
	psr_t *next;		/* next entry, NULL at the tail */
};

/* Head of the pending spawn request list; new entries are pushed here. */
static psr_t *psr_list = NULL;
67 
68 extern spawn_subcmd_t *
spawn_subcmd_new(void)69 spawn_subcmd_new(void)
70 {
71 	spawn_subcmd_t *subcmd;
72 
73 	subcmd = xmalloc(sizeof(spawn_subcmd_t));
74 	return subcmd;
75 }
76 
77 extern void
spawn_subcmd_free(spawn_subcmd_t * subcmd)78 spawn_subcmd_free(spawn_subcmd_t *subcmd)
79 {
80 	int i;
81 
82 	if (subcmd) {
83 		xfree(subcmd->cmd);
84 		if (subcmd->argv) {
85 			for (i = 0; i < subcmd->argc; i ++) {
86 				xfree(subcmd->argv[i]);
87 			}
88 			xfree(subcmd->argv);
89 		}
90 		if (subcmd->info_keys) {
91 			for (i = 0; i < subcmd->info_cnt; i ++) {
92 				xfree(subcmd->info_keys[i]);
93 			}
94 			xfree(subcmd->info_keys);
95 		}
96 		if (subcmd->info_vals) {
97 			for (i = 0; i < subcmd->info_cnt; i ++) {
98 				xfree(subcmd->info_vals[i]);
99 			}
100 			xfree(subcmd->info_vals);
101 		}
102 		xfree(subcmd);
103 	}
104 }
105 
106 extern spawn_req_t *
spawn_req_new(void)107 spawn_req_new(void)
108 {
109 	spawn_req_t *req;
110 
111 	req = xmalloc(sizeof(spawn_req_t));
112 	req->seq = 0;
113 	req->from_node = xstrdup(tree_info.this_node);
114 	return req;
115 }
116 
117 extern void
spawn_req_free(spawn_req_t * req)118 spawn_req_free(spawn_req_t *req)
119 {
120 	int i;
121 
122 	if (req) {
123 		xfree(req->from_node);
124 		if (req->pp_keys) {
125 			for (i = 0; i < req->preput_cnt; i ++) {
126 				xfree(req->pp_keys[i]);
127 			}
128 			xfree(req->pp_keys);
129 		}
130 		if (req->pp_vals) {
131 			for (i = 0; i < req->preput_cnt; i ++) {
132 				xfree(req->pp_vals[i]);
133 			}
134 			xfree(req->pp_vals);
135 		}
136 		if (req->subcmds) {
137 			for (i = 0; i < req->subcmd_cnt; i ++) {
138 				spawn_subcmd_free(req->subcmds[i]);
139 			}
140 			xfree(req->subcmds);
141 		}
142 		xfree(req);
143 	}
144 }
145 
146 extern void
spawn_req_pack(spawn_req_t * req,Buf buf)147 spawn_req_pack(spawn_req_t *req, Buf buf)
148 {
149 	int i, j;
150 	spawn_subcmd_t *subcmd;
151 	void *auth_cred;
152 	char *auth_info = slurm_get_auth_info();
153 
154 	auth_cred = g_slurm_auth_create(AUTH_DEFAULT_INDEX, auth_info);
155 	xfree(auth_info);
156 	if (auth_cred == NULL) {
157 		error("authentication: %m");
158 		return;
159 	}
160 
161 	/*
162 	 * We can use SLURM_PROTOCOL_VERSION here since there is no possibility
163 	 * of protocol mismatch.
164 	 */
165 	(void) g_slurm_auth_pack(auth_cred, buf, SLURM_PROTOCOL_VERSION);
166 	(void) g_slurm_auth_destroy(auth_cred);
167 
168 	pack32(req->seq, buf);
169 	packstr(req->from_node, buf);
170 	pack32(req->subcmd_cnt, buf);
171 	pack32(req->preput_cnt, buf);
172 	for (i = 0; i < req->preput_cnt; i ++) {
173 		packstr(req->pp_keys[i], buf);
174 		packstr(req->pp_vals[i], buf);
175 	}
176 	for (i = 0; i < req->subcmd_cnt; i ++) {
177 		subcmd = req->subcmds[i];
178 
179 		packstr(subcmd->cmd, buf);
180 		pack32(subcmd->max_procs, buf);
181 		pack32(subcmd->argc, buf);
182 		for (j = 0; j < subcmd->argc; j ++) {
183 			packstr(subcmd->argv[j], buf);
184 		}
185 		pack32(subcmd->info_cnt, buf);
186 		for (j = 0; j < subcmd->info_cnt; j ++) {
187 			packstr(subcmd->info_keys[j], buf);
188 			packstr(subcmd->info_vals[j], buf);
189 		}
190 	}
191 }
192 
193 extern int
spawn_req_unpack(spawn_req_t ** req_ptr,Buf buf)194 spawn_req_unpack(spawn_req_t **req_ptr, Buf buf)
195 {
196 	spawn_req_t *req = NULL;
197 	spawn_subcmd_t *subcmd = NULL;
198 	uint32_t temp32;
199 	int i, j;
200 	void *auth_cred;
201 	char *auth_info;
202 	uid_t auth_uid, my_uid;
203 
204 	/*
205 	 * We can use SLURM_PROTOCOL_VERSION here since there is no possibility
206 	 * of protocol mismatch.
207 	 */
208 	auth_cred = g_slurm_auth_unpack(buf, SLURM_PROTOCOL_VERSION);
209 	if (auth_cred == NULL) {
210 		error("authentication: %m");
211 		return SLURM_ERROR;
212 	}
213 	auth_info = slurm_get_auth_info();
214 	if (g_slurm_auth_verify(auth_cred, auth_info)) {
215 		error("authentication: %m");
216 		xfree(auth_info);
217 		return SLURM_ERROR;
218 	}
219 	xfree(auth_info);
220 	auth_uid = g_slurm_auth_get_uid(auth_cred);
221 	(void) g_slurm_auth_destroy(auth_cred);
222 	my_uid = getuid();
223 	if ((auth_uid != 0) && (auth_uid != my_uid)) {
224 		error("mpi/pmi2: spawn request apparently from uid %u",
225 		      (uint32_t) auth_uid);
226 		return SLURM_ERROR;
227 	}
228 
229 	req = xmalloc(sizeof(spawn_req_t));
230 
231 	safe_unpack32(&req->seq, buf);
232 	safe_unpackstr_xmalloc(&req->from_node, &temp32, buf);
233 	safe_unpack32(&req->subcmd_cnt, buf);
234 	/* subcmd_cnt must be greater than 0 */
235 	safe_xcalloc(req->subcmds, req->subcmd_cnt, sizeof(spawn_subcmd_t *));
236 	safe_unpack32(&req->preput_cnt, buf);
237 	if (req->preput_cnt > 0) {
238 		safe_xcalloc(req->pp_keys, req->preput_cnt, sizeof(char *));
239 		safe_xcalloc(req->pp_vals, req->preput_cnt, sizeof(char *));
240 		for (i = 0; i < req->preput_cnt; i ++) {
241 			safe_unpackstr_xmalloc(&req->pp_keys[i], &temp32, buf);
242 			safe_unpackstr_xmalloc(&req->pp_vals[i], &temp32, buf);
243 		}
244 	}
245 	for (i = 0; i < req->subcmd_cnt; i ++) {
246 		req->subcmds[i] = spawn_subcmd_new();
247 		subcmd = req->subcmds[i];
248 
249 		safe_unpackstr_xmalloc(&(subcmd->cmd), &temp32, buf);
250 		safe_unpack32(&(subcmd->max_procs), buf);
251 		safe_unpack32(&(subcmd->argc), buf);
252 		if (subcmd->argc > 0) {
253 			safe_xcalloc(subcmd->argv, subcmd->argc,
254 				     sizeof(char *));
255 			for (j = 0; j < subcmd->argc; j ++) {
256 				safe_unpackstr_xmalloc(&(subcmd->argv[j]),
257 						       &temp32, buf);
258 			}
259 		}
260 		safe_unpack32(&(subcmd->info_cnt), buf);
261 		if (subcmd->info_cnt > 0) {
262 			safe_xcalloc(subcmd->info_keys, subcmd->info_cnt,
263 				     sizeof(char *));
264 			safe_xcalloc(subcmd->info_vals, subcmd->info_cnt,
265 				     sizeof(char *));
266 			for (j = 0; j < subcmd->info_cnt; j ++) {
267 				safe_unpackstr_xmalloc(&(subcmd->info_keys[j]),
268 						       &temp32, buf);
269 				safe_unpackstr_xmalloc(&(subcmd->info_vals[j]),
270 						       &temp32, buf);
271 			}
272 		}
273 	}
274 	*req_ptr = req;
275 	return SLURM_SUCCESS;
276 
277 unpack_error:
278 	spawn_req_free(req);
279 	return SLURM_ERROR;
280 }
281 
282 extern int
spawn_req_send_to_srun(spawn_req_t * req,spawn_resp_t ** resp_ptr)283 spawn_req_send_to_srun(spawn_req_t *req, spawn_resp_t **resp_ptr)
284 {
285 	Buf req_buf = NULL, resp_buf = NULL;
286 	int rc;
287 	uint16_t cmd;
288 
289 	req_buf = init_buf(2048);
290 	cmd = TREE_CMD_SPAWN;
291 	pack16(cmd, req_buf);
292 	spawn_req_pack(req, req_buf);
293 	rc = tree_msg_to_srun_with_resp(get_buf_offset(req_buf),
294 					get_buf_data(req_buf), &resp_buf);
295 	free_buf(req_buf);
296 
297 	if (rc == SLURM_SUCCESS) {
298 		rc = spawn_resp_unpack(resp_ptr, resp_buf);
299 		free_buf(resp_buf);
300 	}
301 	return rc;
302 }
303 /**************************************************************/
304 
305 extern spawn_resp_t *
spawn_resp_new(void)306 spawn_resp_new(void)
307 {
308 	spawn_resp_t *resp;
309 
310 	resp = xmalloc(sizeof(spawn_resp_t));
311 	return resp;
312 }
313 
314 extern void
spawn_resp_free(spawn_resp_t * resp)315 spawn_resp_free(spawn_resp_t *resp)
316 {
317 	if (resp) {
318 		xfree(resp->jobid);
319 		xfree(resp->error_codes);
320 		xfree(resp);
321 	}
322 }
323 
324 extern void
spawn_resp_pack(spawn_resp_t * resp,Buf buf)325 spawn_resp_pack(spawn_resp_t *resp, Buf buf)
326 {
327 	int i;
328 
329 	pack32(resp->seq, buf);
330 	pack32((uint32_t)resp->rc, buf);
331 	pack16((uint16_t)resp->pmi_port, buf);
332 	packstr(resp->jobid, buf);
333 	pack32(resp->error_cnt, buf);
334 	for (i = 0; i < resp->error_cnt; i ++) {
335 		pack32((uint32_t)resp->error_codes[i], buf);
336 	}
337 }
338 
339 extern int
spawn_resp_unpack(spawn_resp_t ** resp_ptr,Buf buf)340 spawn_resp_unpack(spawn_resp_t **resp_ptr, Buf buf)
341 {
342 	spawn_resp_t *resp = NULL;
343 	uint32_t temp32;
344 	int i;
345 
346 	resp = xmalloc(sizeof(spawn_resp_t));
347 
348 	safe_unpack32(&resp->seq, buf);
349 	safe_unpack32((uint32_t *)&resp->rc, buf);
350 	safe_unpack16((uint16_t *)&resp->pmi_port, buf);
351 	safe_unpackstr_xmalloc(&resp->jobid, &temp32, buf);
352 	safe_unpack32(&resp->error_cnt, buf);
353 	if (resp->error_cnt > 0) {
354 		safe_xcalloc(resp->error_codes, resp->error_cnt, sizeof(int));
355 		for (i = 0; i < resp->error_cnt; i ++) {
356 			safe_unpack32((uint32_t *)&(resp->error_codes[i]), buf);
357 		}
358 	}
359 	*resp_ptr = resp;
360 	return SLURM_SUCCESS;
361 
362 unpack_error:
363 	spawn_resp_free(resp);
364 	return SLURM_ERROR;
365 }
366 
367 extern int
spawn_resp_send_to_stepd(spawn_resp_t * resp,char ** node)368 spawn_resp_send_to_stepd(spawn_resp_t *resp, char **node)
369 {
370 	Buf buf;
371 	int rc;
372 	uint16_t cmd;
373 
374 	buf = init_buf(1024);
375 
376 	cmd = TREE_CMD_SPAWN_RESP;
377 	pack16(cmd, buf);
378 	spawn_resp_pack(resp, buf);
379 
380 	rc = slurm_forward_data(node, tree_sock_addr,
381 				get_buf_offset(buf),
382 				get_buf_data(buf));
383 	free_buf(buf);
384 	return rc;
385 }
386 
387 extern int
spawn_resp_send_to_srun(spawn_resp_t * resp)388 spawn_resp_send_to_srun(spawn_resp_t *resp)
389 {
390 	Buf buf;
391 	int rc;
392 	uint16_t cmd;
393 
394 	buf = init_buf(1024);
395 
396 	cmd = TREE_CMD_SPAWN_RESP;
397 	pack16(cmd, buf);
398 	spawn_resp_pack(resp, buf);
399 
400 	rc = tree_msg_to_srun(get_buf_offset(buf), get_buf_data(buf));
401 	free_buf(buf);
402 	return rc;
403 }
404 
405 extern int
spawn_resp_send_to_fd(spawn_resp_t * resp,int fd)406 spawn_resp_send_to_fd(spawn_resp_t *resp, int fd)
407 {
408 	Buf buf;
409 	int rc;
410 
411 	buf = init_buf(1024);
412 
413 	/* sync with spawn_req_send_to_srun */
414 /* 	cmd = TREE_CMD_SPAWN_RESP; */
415 /* 	pack16(cmd, buf); */
416 	spawn_resp_pack(resp, buf);
417 	rc = slurm_msg_sendto(fd, get_buf_data(buf), get_buf_offset(buf));
418 	free_buf(buf);
419 
420 	return rc;
421 }
422 
423 /**************************************************************/
424 
425 extern int
spawn_psr_enqueue(uint32_t seq,int fd,int lrank,char * from_node)426 spawn_psr_enqueue(uint32_t seq, int fd, int lrank, char *from_node)
427 {
428 	psr_t *psr;
429 
430 	psr = xmalloc(sizeof(psr_t));
431 	psr->seq = seq;
432 	psr->fd = fd;
433 	psr->lrank = lrank;
434 	psr->from_node = xstrdup(from_node);
435 	psr->next = psr_list;
436 	psr_list = psr;
437 	return SLURM_SUCCESS;
438 }
439 
440 extern int
spawn_psr_dequeue(uint32_t seq,int * fd,int * lrank,char ** from_node)441 spawn_psr_dequeue(uint32_t seq, int *fd, int *lrank, char **from_node)
442 {
443 	psr_t *psr, **pprev;
444 
445 	pprev = &psr_list;
446 	psr = *pprev;
447 	while(psr != NULL) {
448 		if (psr->seq != seq) {
449 			pprev = &(psr->next);
450 			psr = *pprev;
451 			continue;
452 		}
453 		/* found. remove the psr. */
454 		*fd = psr->fd;
455 		*lrank = psr->lrank;
456 		*from_node = psr->from_node; /* take over ownership */
457 		*pprev = psr->next;
458 		xfree(psr);
459 		return SLURM_SUCCESS;
460 	}
461 	return SLURM_ERROR;
462 }
463 
464 extern uint32_t
spawn_seq_next(void)465 spawn_seq_next(void)
466 {
467 	return spawn_seq ++;
468 }
469 
470 static int
_exec_srun_single(spawn_req_t * req,char ** env)471 _exec_srun_single(spawn_req_t *req, char **env)
472 {
473 	int argc, i, j;
474 	char **argv = NULL;
475 	spawn_subcmd_t *subcmd;
476 
477 	debug3("mpi/mpi2: in _exec_srun_single");
478 	subcmd = req->subcmds[0];
479 	argc = subcmd->argc + 7;
480 	xrealloc(argv, (argc + 1) * sizeof(char *));
481 
482 	j = 0;
483 	argv[j ++] = "srun";
484 	argv[j ++] = "--mpi=pmi2";
485 	if (job_info.srun_opt && job_info.srun_opt->srun_opt->no_alloc) {
486 		argv[j ++] = "--no-alloc";
487 		xstrfmtcat(argv[j ++], "--nodelist=%s",
488 			   job_info.srun_opt->nodelist);
489 	}
490 
491 	xstrfmtcat(argv[j ++], "--ntasks=%d", subcmd->max_procs);
492 	/* TODO: inherit options from srun_opt. */
493 	for (i = 0; i < subcmd->info_cnt; i ++) {
494 		if (0) {
495 
496 		} else if (! xstrcmp(subcmd->info_keys[i], "host")) {
497 			xstrfmtcat(argv[j ++], "--nodelist=%s",
498 				   subcmd->info_vals[i]);
499 
500 		} else if (! xstrcmp(subcmd->info_keys[i], "arch")) {
501 			error("mpi/pmi2: spawn info key 'arch' not supported");
502 
503 		} else if (! xstrcmp(subcmd->info_keys[i], "wdir")) {
504 			xstrfmtcat(argv[j ++], "--chdir=%s",
505 				   subcmd->info_vals[i]);
506 
507 		} else if (! xstrcmp(subcmd->info_keys[i], "path")) {
508 			env_array_overwrite_fmt(&env, "PATH", "%s",
509 						subcmd->info_vals[i]);
510 
511 		} else if (! xstrcmp(subcmd->info_keys[i], "file")) {
512 			error("mpi/pmi2: spawn info key 'file' not supported");
513 
514 		} else if (! xstrcmp(subcmd->info_keys[i], "soft")) {
515 			error("mpi/pmi2: spawn info key 'soft' not supported");
516 
517 		} else {
518 			error("mpi/pmi2: unknown spawn info key '%s' ignored",
519 				subcmd->info_keys[i]);
520 		}
521 	}
522 	argv[j ++] = subcmd->cmd;
523 	for (i = 0; i < subcmd->argc; i ++) {
524 		argv[j ++] = subcmd->argv[i];
525 	}
526 	argv[j ++] = NULL;
527 
528 	{
529 		debug3("mpi/mpi2: to execve");
530 		for (i = 0; i < j; i ++) {
531 			debug3("mpi/pmi2:   argv[%d]=%s", i, argv[i]);
532 		}
533 	}
534 	execve(SLURM_PREFIX"/bin/srun", argv, env);
535 	error("mpi/pmi2: failed to exec srun: %m");
536 	return SLURM_ERROR;
537 }
538 
539 static int
_exec_srun_multiple(spawn_req_t * req,char ** env)540 _exec_srun_multiple(spawn_req_t *req, char **env)
541 {
542 	int argc, ntasks, i, j, spawn_cnt, fd;
543 	char **argv = NULL, *buf = NULL;
544 	spawn_subcmd_t *subcmd = NULL;
545 	char fbuf[128];
546 
547 	debug3("mpi/pmi2: in _exec_srun_multiple");
548 	/* create a tmp multi_prog file */
549 	/* TODO: how to delete the file? */
550 	sprintf(fbuf, "/tmp/%d.XXXXXX", getpid());
551 	fd = mkstemp(fbuf);
552 	if (fd < 0) {
553 		error("mpi/pmi2: failed to open multi-prog file %s: %m", fbuf);
554 		return SLURM_ERROR;
555 	}
556 	ntasks = 0;
557 	for (spawn_cnt = 0; spawn_cnt < req->subcmd_cnt; spawn_cnt ++) {
558 		subcmd = req->subcmds[spawn_cnt];
559 		/* TODO: write a wrapper program to handle the info */
560 		if (subcmd->info_cnt > 0) {
561 			error("mpi/pmi2: spawn info ignored");
562 		}
563 		if (subcmd->max_procs == 1) {
564 			xstrfmtcat(buf, "%d  %s", ntasks, subcmd->cmd);
565 		} else {
566 			xstrfmtcat(buf, "%d-%d  %s", ntasks,
567 				   ntasks + subcmd->max_procs - 1, subcmd->cmd);
568 		}
569 		for (i = 0; i < subcmd->argc; i ++) {
570 			xstrfmtcat(buf, " %s", subcmd->argv[i]);
571 		}
572 		xstrcat(buf, "\n");
573 		ntasks += subcmd->max_procs;
574 	}
575 	if (buf) {
576 		safe_write(fd, buf, strlen(buf));
577 		xfree(buf);
578 	}
579 	close(fd);
580 
581 	argc = 7;
582 	xrealloc(argv, argc * sizeof(char *));
583 
584 	j = 0;
585 	argv[j ++] = "srun";
586 	argv[j ++] = "--mpi=pmi2";
587 	xstrfmtcat(argv[j ++], "--ntasks=%d", ntasks);
588 	if (job_info.srun_opt && job_info.srun_opt->srun_opt->no_alloc) {
589 		argv[j ++] = "--no-alloc";
590 		xstrfmtcat(argv[j ++], "--nodelist=%s",
591 			   job_info.srun_opt->nodelist);
592 	}
593 	argv[j ++] = "--multi-prog";
594 	argv[j ++] = fbuf;
595 	argv[j ++] = NULL;
596 
597 	debug3("mpi/mpi2: to execve");
598 
599 	execve(SLURM_PREFIX"/bin/srun", argv, env);
600 	error("mpi/pmi2: failed to exec srun: %m");
601 	return SLURM_ERROR;
602 rwfail:
603 	error("mpi/pmi2: failed to generate multi-prog file");
604 	return SLURM_ERROR;
605 }
606 
607 static void
_setup_exec_srun(spawn_req_t * req)608 _setup_exec_srun(spawn_req_t *req)
609 {
610 	char **env, env_key[32];
611 	int i, rc;
612 	spawn_resp_t *resp;
613 
614 	debug3("mpi/pmi2: in _setup_exec_srun");
615 
616 	/* setup environments */
617 	env = env_array_copy((const char **)job_info.job_env);
618 	/* TODO: unset some env-vars */
619 
620 	env_array_overwrite_fmt(&env, "SLURM_JOB_ID", "%u", job_info.jobid);
621 	env_array_overwrite_fmt(&env, PMI2_SPAWNER_JOBID_ENV, "%s",
622 				job_info.pmi_jobid);
623 	env_array_overwrite_fmt(&env, PMI2_PMI_JOBID_ENV, "%s-%u",
624 				job_info.pmi_jobid, req->seq);
625 	env_array_overwrite_fmt(&env, PMI2_SPAWN_SEQ_ENV, "%u", req->seq);
626 	env_array_overwrite_fmt(&env, PMI2_SPAWNER_PORT_ENV, "%hu",
627 				tree_info.pmi_port);
628 	/* preput kvs */
629 	env_array_overwrite_fmt(&env, PMI2_PREPUT_CNT_ENV, "%d",
630 				req->preput_cnt);
631 	for (i = 0; i < req->preput_cnt; i ++) {
632 		snprintf(env_key, 32, PMI2_PPKEY_ENV"%d", i);
633 		env_array_overwrite_fmt(&env, env_key, "%s", req->pp_keys[i]);
634 		snprintf(env_key, 32, PMI2_PPVAL_ENV"%d", i);
635 		env_array_overwrite_fmt(&env, env_key, "%s", req->pp_vals[i]);
636 	}
637 
638 	if (req->subcmd_cnt == 1) {
639 		/* no return if success */
640 		rc = _exec_srun_single(req, env);
641 	} else {
642 		/* no return if success */
643 		rc = _exec_srun_multiple(req, env);
644 	}
645 
646 	resp = spawn_resp_new();
647 	resp->seq = req->seq;
648 	xstrfmtcat(resp->jobid, "%s-%u", job_info.pmi_jobid, req->seq);
649 	resp->error_cnt = 0;
650 	resp->rc = rc;
651 
652 	/* fake a srun address */
653 	tree_info.srun_addr = xmalloc(sizeof(slurm_addr_t));
654 	slurm_set_addr(tree_info.srun_addr, tree_info.pmi_port,
655 		       "127.0.0.1");
656 	spawn_resp_send_to_srun(resp);
657 	spawn_resp_free(resp);
658 	_exit(errno);
659 }
660 
661 extern int
spawn_job_do_spawn(spawn_req_t * req)662 spawn_job_do_spawn(spawn_req_t *req)
663 {
664 	pid_t child_pid;
665 
666 	child_pid = fork();
667 	if (child_pid < 0) {
668 		error("mpi/pmi2: failed to fork srun");
669 		return SLURM_ERROR;
670 	} else if (child_pid == 0) { /* child */
671 		_setup_exec_srun(req);
672 	} else {
673 		/* always serially executed, spawn_seq == req->seq + 1 */
674 		xrealloc(spawned_srun_pids, spawn_seq * sizeof(pid_t));
675 		spawned_srun_pids[req->seq] = child_pid;
676 		return SLURM_SUCCESS;
677 	}
678 	return SLURM_ERROR;
679 }
680 
681 static int
_wait_for_all(void)682 _wait_for_all(void)
683 {
684 	pid_t child;
685 	int i, status, exited;
686 
687 	exited = 0;
688 	for (i = 1; i < spawn_seq; i ++) { /* seq 0 not used */
689 		if (! spawned_srun_pids[i])
690 			continue;
691 		child = waitpid(spawned_srun_pids[i], &status, WNOHANG);
692 		if (child == spawned_srun_pids[i]) {
693 			spawned_srun_pids[i] = 0;
694 			exited ++;
695 		}
696 	}
697 	return exited;
698 }
699 
700 extern void
spawn_job_wait(void)701 spawn_job_wait(void)
702 {
703 	int exited, i, wait;
704 
705 	if (job_info.srun_opt) {
706 		wait = job_info.srun_opt->srun_opt->max_wait;
707 	} else {
708 		wait = 0;
709 	}
710 
711 	if (wait == 0)		/* TODO: wait indefinitely */
712 		wait = 60;
713 	exited = _wait_for_all();
714 	while(wait > 0 && exited != spawn_seq - 1) {
715 		sleep(1);
716 		exited += _wait_for_all();
717 		wait --;
718 	}
719 	for (i = 1; i < spawn_seq; i ++) {
720 		if (!spawned_srun_pids[i])
721 			continue;
722 		/* terminte it */
723 		kill(spawned_srun_pids[i], SIGTERM);
724 	}
725 }
726