/*****************************************************************************\
 *  launch_slurm.c - Define job launch using slurm.
 *****************************************************************************
 *  Copyright (C) 2012-2017 SchedMD LLC
 *  Written by Danny Auble <da@schedmd.com>
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include "config.h"

#define _GNU_SOURCE

#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>

#include "src/common/slurm_opt.h"
#include "src/common/slurm_xlator.h"
#include "src/common/slurm_resource_info.h"
#include "src/api/pmi_server.h"
#include "src/srun/libsrun/allocate.h"
#include "src/srun/libsrun/fname.h"
#include "src/srun/libsrun/launch.h"
#include "src/srun/libsrun/multi_prog.h"

#include "src/plugins/launch/slurm/task_state.h"

#ifndef OPEN_MPI_PORT_ERROR
/* This exit code indicates the launched Open MPI tasks could
 *	not open the reserved port. It was already in use by some
 *	other process. */
#define OPEN_MPI_PORT_ERROR 108
#endif

#define MAX_STEP_RETRIES 4

/*
 * These variables are required by the generic plugin interface.  If they
 * are not found in the plugin, the plugin loader will ignore it.
 *
 * plugin_name - a string giving a human-readable description of the
 * plugin.  There is no maximum length, but the symbol must refer to
 * a valid string.
 *
 * plugin_type - a string suggesting the type of the plugin or its
 * applicability to a particular form of data or method of data handling.
 * If the low-level plugin API is used, the contents of this string are
 * unimportant and may be anything.  Slurm uses the higher-level plugin
 * interface which requires this string to be of the form
 *
 *      <application>/<method>
 *
 * where <application> is a description of the intended application of
 * the plugin (e.g., "launch" for task launch) and <method> is a description
 * of how this plugin satisfies that application.  Slurm will only load
 * a launch plugin if the plugin_type string has a prefix of "launch/".
 *
 * plugin_version - an unsigned 32-bit integer containing the Slurm version
 * (major.minor.micro combined into a single number).
 */
const char plugin_name[]        = "launch Slurm plugin";
const char plugin_type[]        = "launch/slurm";
const uint32_t plugin_version   = SLURM_VERSION_NUMBER;

static List local_job_list = NULL;
static uint32_t *local_global_rc = NULL;
static pthread_mutex_t launch_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t het_job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  start_cond  = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
static slurm_opt_t *opt_save = NULL;

static List task_state_list = NULL;
static time_t launch_start_time;
static bool retry_step_begin = false;
static int  retry_step_cnt = 0;

static int _step_signal(int signal);
extern char **environ;

static char *_hostset_to_string(hostset_t hs)
{
	size_t n = 1024;
	size_t maxsize = 1024 * 64;
	char *str = NULL;

	/*
	 * Grow the buffer geometrically until the ranged string fits,
	 * never telling hostset_ranged_string() it has more space than
	 * has actually been allocated.
	 */
	do {
		str = xrealloc(str, n);
		if (hostset_ranged_string(hs, n, str) >= 0)
			return str;
		n *= 2;
	} while (n <= maxsize);

	/*
	 *  The string was truncated, indicate this with a '+' suffix.
	 */
	strcpy(str + (maxsize - 2), "+");

	return str;
}
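
/*
 * For illustration: a hostset containing tux1, tux2 and tux3 is
 * rendered by hostset_ranged_string() as "tux[1-3]", so even large
 * steps usually fit in the initial 1024 byte buffer.
 */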

/*
 * Convert an array of task IDs into a list of host names
 * RET: the string, caller must xfree() this value
 */
static char *_task_ids_to_host_list(int ntasks, uint32_t *taskids,
				    srun_job_t *my_srun_job)
{
	int i, task_cnt = 0;
	hostset_t hs;
	char *hosts;
	slurm_step_layout_t *sl;

	if ((sl = launch_common_get_slurm_step_layout(my_srun_job)) == NULL)
		return (xstrdup("Unknown"));

	/*
	 * If the overhead of determining the hostlist is too high then srun
	 * communications will time out and fail, so return "Unknown" instead.
	 *
	 * See slurm_step_layout_host_id() in src/common/slurm_step_layout.c
	 * for details.
	 */
	for (i = 0; i < sl->node_cnt; i++) {
		task_cnt += sl->tasks[i];
	}
	if (task_cnt > 100000)
		return (xstrdup("Unknown"));

	hs = hostset_create(NULL);
	for (i = 0; i < ntasks; i++) {
		char *host = slurm_step_layout_host_name(sl, taskids[i]);
		if (host) {
			hostset_insert(hs, host);
			free(host);
		} else {
			error("Could not identify host name for task %u",
			      taskids[i]);
		}
	}

	hosts = _hostset_to_string(hs);
	hostset_destroy(hs);

	return (hosts);
}

/*
 * Convert an array of task IDs into a string.
 * RET: the string, caller must xfree() this value
 * NOTE: the taskids array is not necessarily in numeric order,
 *       so we use existing bitmap functions to format
 */
static char *_task_array_to_string(int ntasks, uint32_t *taskids,
				   srun_job_t *my_srun_job)
{
	bitstr_t *tasks_bitmap = NULL;
	char *str;
	int i;

	tasks_bitmap = bit_alloc(my_srun_job->ntasks);
	if (!tasks_bitmap) {
		error("bit_alloc: memory allocation failure");
		exit(error_exit);
	}
	for (i = 0; i < ntasks; i++)
		bit_set(tasks_bitmap, taskids[i]);
	str = xmalloc(2048);
	bit_fmt(str, 2048, tasks_bitmap);
	FREE_NULL_BITMAP(tasks_bitmap);

	return str;
}
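
/*
 * For example, with my_srun_job->ntasks == 8 and taskids {0, 1, 2, 5},
 * bit_fmt() yields "0-2,5"; this is the task list shown in the exit
 * messages printed by _task_finish() below.
 */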

static void _update_task_exit_state(task_state_t task_state, uint32_t ntasks,
				    uint32_t *taskids, int abnormal)
{
	int i;
	task_state_type_t t = abnormal ? TS_ABNORMAL_EXIT : TS_NORMAL_EXIT;

	for (i = 0; i < ntasks; i++)
		task_state_update(task_state, taskids[i], t);
}

static int _kill_on_bad_exit(void)
{
	if (!opt_save)
		return slurm_get_kill_on_bad_exit();
	xassert(opt_save->srun_opt);
	if (opt_save->srun_opt->kill_bad_exit == NO_VAL)
		return slurm_get_kill_on_bad_exit();
	return opt_save->srun_opt->kill_bad_exit;
}

static void _setup_max_wait_timer(void)
{
	xassert(opt_save->srun_opt);
	/*
	 *  If these are the first tasks to finish we need to
	 *   start a timer to kill off the job step if the other
	 *   tasks don't finish within opt_save->srun_opt->max_wait seconds.
	 */
	verbose("First task exited. Terminating job in %ds",
		opt_save->srun_opt->max_wait);
	srun_max_timer = true;
	alarm(opt_save->srun_opt->max_wait);
}
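
/*
 * Note: this only arms the timer; the SIGALRM it raises is assumed to
 * be handled by srun's existing signal handling, which checks
 * srun_max_timer.
 */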

static const char *_taskstr(int n)
{
	if (n == 1)
		return "task";
	else
		return "tasks";
}

static int _is_openmpi_port_error(int errcode)
{
	if (errcode != OPEN_MPI_PORT_ERROR)
		return 0;
	if (opt_save && (opt_save->srun_opt->resv_port_cnt == NO_VAL))
		return 0;
	if (difftime(time(NULL), launch_start_time) > slurm_get_msg_timeout())
		return 0;
	return 1;
}

static void
_handle_openmpi_port_error(const char *tasks, const char *hosts,
			   slurm_step_ctx_t *step_ctx)
{
	uint32_t job_id, step_id;
	char *msg = "retrying";

	if (!retry_step_begin) {
		retry_step_begin = true;
		retry_step_cnt++;
	}

	if (retry_step_cnt >= MAX_STEP_RETRIES)
		msg = "aborting";
	error("%s: tasks %s unable to claim reserved port, %s.",
	      hosts, tasks, msg);

	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_JOBID, &job_id);
	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_STEPID, &step_id);
	info("Terminating job step %u.%u", job_id, step_id);
	slurm_kill_job_step(job_id, step_id, SIGKILL);
}

static void _task_start(launch_tasks_response_msg_t *msg)
{
	MPIR_PROCDESC *table;
	uint32_t local_task_id, global_task_id;
	int i;
	task_state_t task_state;

	if (msg->count_of_pids) {
		verbose("Node %s, %d tasks started",
			msg->node_name, msg->count_of_pids);
	} else {
		/*
		 * This message should be displayed through the API,
		 * hence it is a debug2() instead of error().
		 */
		debug2("No tasks started on node %s: %s",
		       msg->node_name, slurm_strerror(msg->return_code));
	}

	task_state = task_state_find(msg->job_id, msg->step_id, NO_VAL,
				     task_state_list);
	if (!task_state) {
		error("%s: Could not locate task state for step %u.%u",
		      __func__, msg->job_id, msg->step_id);
	}
	for (i = 0; i < msg->count_of_pids; i++) {
		local_task_id = msg->task_ids[i];
		global_task_id = task_state_global_id(task_state,
						      local_task_id);
		if (global_task_id >= MPIR_proctable_size) {
			error("%s: task_id too large (%u >= %d)", __func__,
			      global_task_id, MPIR_proctable_size);
			continue;
		}
		table = &MPIR_proctable[global_task_id];
		table->host_name = xstrdup(msg->node_name);
		/* table->executable_name set in mpir_set_executable_names() */
		table->pid = msg->local_pids[i];
		if (!task_state) {
			error("%s: Could not update task state for task ID %u",
			      __func__, global_task_id);
		} else if (msg->return_code == 0) {
			task_state_update(task_state, local_task_id,
					  TS_START_SUCCESS);
		} else {
			task_state_update(task_state, local_task_id,
					  TS_START_FAILURE);
		}
	}
}

static void _task_finish(task_exit_msg_t *msg)
{
	char *tasks = NULL, *hosts = NULL;
	bool build_task_string = false;
	uint32_t rc = 0;
	int normal_exit = 0;
	static int reduce_task_exit_msg = -1;
	static int msg_printed = 0, oom_printed = 0, last_task_exit_rc;
	task_state_t task_state;
	const char *task_str = _taskstr(msg->num_tasks);
	srun_job_t *my_srun_job;
	ListIterator iter;

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		if ((my_srun_job->jobid  == msg->job_id) &&
		    (my_srun_job->stepid == msg->step_id))
			break;
	}
	list_iterator_destroy(iter);
	if (!my_srun_job) {
		error("Ignoring exit message from unrecognized step %u.%u",
		      msg->job_id, msg->step_id);
		return;
	}

	if (reduce_task_exit_msg == -1) {
		char *ptr = getenv("SLURM_SRUN_REDUCE_TASK_EXIT_MSG");
		if (ptr && atoi(ptr) != 0)
			reduce_task_exit_msg = 1;
		else
			reduce_task_exit_msg = 0;
	}

	verbose("Received task exit notification for %d %s of step %u.%u (status=0x%04x).",
		msg->num_tasks, task_str, msg->job_id, msg->step_id,
		msg->return_code);

	/*
	 * Only build the "tasks" and "hosts" strings as needed.
	 * Building them can take multiple milliseconds.
	 */
	if (((msg->return_code & 0xff) == SIG_OOM) && !oom_printed) {
		build_task_string = true;
	} else if (WIFEXITED(msg->return_code)) {
		if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
			if (get_log_level() >= LOG_LEVEL_VERBOSE)
				build_task_string = true;
		} else {
			build_task_string = true;
		}
	} else if (WIFSIGNALED(msg->return_code)) {
		if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
			if (get_log_level() >= LOG_LEVEL_VERBOSE)
				build_task_string = true;
		} else {
			build_task_string = true;
		}
	}
	if (build_task_string) {
		tasks = _task_array_to_string(msg->num_tasks,
					      msg->task_id_list, my_srun_job);
		hosts = _task_ids_to_host_list(msg->num_tasks,
					       msg->task_id_list, my_srun_job);
	}

	slurm_mutex_lock(&launch_lock);
	if ((msg->return_code & 0xff) == SIG_OOM) {
		if (!oom_printed)
			error("%s: %s %s: Out Of Memory", hosts, task_str,
			      tasks);
		oom_printed = 1;
		*local_global_rc = msg->return_code;
	} else if (WIFEXITED(msg->return_code)) {
		if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
			verbose("%s: %s %s: Completed", hosts, task_str, tasks);
			normal_exit = 1;
		} else if (_is_openmpi_port_error(rc)) {
			_handle_openmpi_port_error(tasks, hosts,
						   my_srun_job->step_ctx);
		} else if ((reduce_task_exit_msg == 0) ||
			   (msg_printed == 0) ||
			   (msg->return_code != last_task_exit_rc)) {
			error("%s: %s %s: Exited with exit code %d",
			      hosts, task_str, tasks, rc);
			msg_printed = 1;
		}
		if (((*local_global_rc & 0xff) != SIG_OOM) &&
		    (!WIFSIGNALED(*local_global_rc)) &&
		    (!WIFEXITED(*local_global_rc) ||
		     (rc > WEXITSTATUS(*local_global_rc))))
			*local_global_rc = msg->return_code;
	} else if (WIFSIGNALED(msg->return_code)) {
		const char *signal_str = strsignal(WTERMSIG(msg->return_code));
		char *core_str = "";
#ifdef WCOREDUMP
		if (WCOREDUMP(msg->return_code))
			core_str = " (core dumped)";
#endif
		if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
			verbose("%s: %s %s: %s%s",
				hosts, task_str, tasks, signal_str, core_str);
		} else if ((reduce_task_exit_msg == 0) ||
			   (msg_printed == 0) ||
			   (msg->return_code != last_task_exit_rc)) {
			error("%s: %s %s: %s%s",
			      hosts, task_str, tasks, signal_str, core_str);
			msg_printed = 1;
		}
		/*
		 * Even though lower numbered signals can be stronger than
		 * higher numbered signals, keep the highest signal so that
		 * it's predictable to the user.
		 */
		rc = WTERMSIG(msg->return_code);
		if (((*local_global_rc & 0xff) != SIG_OOM) &&
		    (!WIFSIGNALED(*local_global_rc) ||
		     (rc > WTERMSIG(*local_global_rc))))
			*local_global_rc = msg->return_code;
	}
	xfree(tasks);
	xfree(hosts);

	task_state = task_state_find(msg->job_id, msg->step_id, NO_VAL,
				     task_state_list);
	if (task_state) {
		_update_task_exit_state(task_state, msg->num_tasks,
					msg->task_id_list, !normal_exit);
	} else {
		error("%s: Could not find task state for step %u.%u", __func__,
		      msg->job_id, msg->step_id);
	}

	if (task_state_first_abnormal_exit(task_state_list) &&
	    _kill_on_bad_exit())
		(void) _step_signal(SIG_TERM_KILL);

	if (task_state_first_exit(task_state_list) && opt_save &&
	    (opt_save->srun_opt->max_wait > 0))
		_setup_max_wait_timer();

	last_task_exit_rc = msg->return_code;
	slurm_mutex_unlock(&launch_lock);
}

/*
 * Load the multi_prog config file into argv, passing the entire file
 * contents in order to avoid having to read the file on every node.
 * We could also parse the information here for loading the MPIR
 * records for TotalView.
 */
static void _load_multi(int *argc, char **argv)
{
	int config_fd, data_read = 0, i;
	struct stat stat_buf;
	char *data_buf;

	if ((config_fd = open(argv[0], O_RDONLY)) == -1) {
		error("Could not open multi_prog config file %s",
		      argv[0]);
		exit(error_exit);
	}
	if (fstat(config_fd, &stat_buf) == -1) {
		error("Could not stat multi_prog config file %s",
		      argv[0]);
		exit(error_exit);
	}
	if (stat_buf.st_size > 60000) {
		error("Multi_prog config file %s is too large",
		      argv[0]);
		exit(error_exit);
	}
	data_buf = xmalloc(stat_buf.st_size + 1);
	while ((i = read(config_fd, &data_buf[data_read], stat_buf.st_size
			 - data_read)) != 0) {
		if (i < 0) {
			error("Error reading multi_prog config file %s",
			      argv[0]);
			exit(error_exit);
		} else
			data_read += i;
	}
	close(config_fd);

	for (i = *argc + 1; i > 1; i--)
		argv[i] = argv[i-1];
	argv[1] = data_buf;
	*argc += 1;
}
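
/*
 * Illustration of the argv rewrite above, assuming an invocation such
 * as "srun --multi-prog my.conf": argv {"my.conf", NULL} becomes
 * {"my.conf", "<contents of my.conf>", NULL} and *argc grows by one,
 * so the config text travels with the launch request rather than
 * being read from disk on every node.
 */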

/*
 * init() is called when the plugin is loaded, before any other functions
 *	are called.  Put global initialization here.
 */
extern int init(void)
{
	verbose("%s loaded", plugin_name);
	return SLURM_SUCCESS;
}

/*
 * fini() is called when the plugin is removed. Clear any allocated
 *	storage here.
 */
extern int fini(void)
{
	FREE_NULL_LIST(task_state_list);

	return SLURM_SUCCESS;
}

extern int launch_p_setup_srun_opt(char **rest, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);
	if (srun_opt->debugger_test)
		MPIR_being_debugged = 1;

	/*
	 * We need to do +2 here just in case multi-prog is needed
	 * (we add an extra argv entry, so make space for it).
	 */
	srun_opt->argv = xmalloc((srun_opt->argc + 2) * sizeof(char *));

	return 0;
}

extern int launch_p_handle_multi_prog_verify(int command_pos,
					     slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);

	if (srun_opt->multi_prog) {
		if (srun_opt->argc < 1) {
			error("configuration file not specified");
			exit(error_exit);
		}
		_load_multi(&srun_opt->argc, srun_opt->argv);
		if (verify_multi_name(srun_opt->argv[command_pos], opt_local))
			exit(error_exit);
		return 1;
	} else
		return 0;
}

extern int launch_p_create_job_step(srun_job_t *job, bool use_all_cpus,
				    void (*signal_function)(int),
				    sig_atomic_t *destroy_job,
				    slurm_opt_t *opt_local)
{
	if (launch_common_create_job_step(job, use_all_cpus,
					  signal_function, destroy_job,
					  opt_local) != SLURM_SUCCESS)
		return SLURM_ERROR;

	/* set the jobid for totalview */
	if (!totalview_jobid) {
		xstrfmtcat(totalview_jobid,  "%u", job->jobid);
		xstrfmtcat(totalview_stepid, "%u", job->stepid);
	}

	return SLURM_SUCCESS;
}

static char **_build_user_env(srun_job_t *job, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	char **dest_array = NULL;
	char *tmp_env, *tok, *save_ptr = NULL, *eq_ptr, *value;
	bool all;
	xassert(srun_opt);

	if (!srun_opt->export_env) {
		all = true;
	} else {
		all = false;
		tmp_env = xstrdup(srun_opt->export_env);
		tok = find_quote_token(tmp_env, ",", &save_ptr);
		while (tok) {
			if (xstrcasecmp(tok, "ALL") == 0)
				all = true;

			if (!xstrcasecmp(tok, "NONE"))
				break;
			eq_ptr = strchr(tok, '=');
			if (eq_ptr) {
				eq_ptr[0] = '\0';
				value = eq_ptr + 1;
				env_array_overwrite(&dest_array, tok, value);
			} else {
				value = getenv(tok);
				if (value) {
					env_array_overwrite(&dest_array, tok,
							    value);
				}
			}
			tok = find_quote_token(NULL, ",", &save_ptr);
		}
		xfree(tmp_env);
	}

	if (!job->env)
		fatal("%s: job env is NULL", __func__);
	else if (all)
		env_array_merge(&dest_array, (const char **) job->env);
	else
		env_array_merge_slurm(&dest_array, (const char **) job->env);

	return dest_array;
}
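
/*
 * Resulting semantics (illustrative values): with --export=NONE only
 * the SLURM_* variables from job->env are forwarded; with
 * --export=PATH,FOO=bar the named variables are forwarded or set
 * explicitly in addition to the SLURM_* variables; with --export=ALL,
 * or no --export at all, the entire job environment is merged in.
 */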

static void _task_state_del(void *x)
{
	task_state_t task_state = (task_state_t) x;

	task_state_destroy(task_state);
}

/*
 * Return only after all hetjob components reach this point (or timeout)
 */
static void _wait_all_het_job_comps_started(slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	static int start_cnt = 0;
	static int total_cnt = -1;
	struct timeval  now;
	struct timespec timeout;
	int rc;
	xassert(srun_opt);

	slurm_mutex_lock(&start_mutex);
	if (total_cnt == -1)
		total_cnt = srun_opt->het_step_cnt;
	start_cnt++;
	while (start_cnt < total_cnt) {
		gettimeofday(&now, NULL);
		timeout.tv_sec = now.tv_sec + 10;	/* 10 sec delay max */
		timeout.tv_nsec = now.tv_usec * 1000;
		rc = pthread_cond_timedwait(&start_cond, &start_mutex,
					    &timeout);
		if (rc == ETIMEDOUT)
			break;
	}
	slurm_cond_broadcast(&start_cond);
	slurm_mutex_unlock(&start_mutex);
}
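
/*
 * For example, a hetjob launched as "srun ... : ... : ..." has three
 * components; each component's launch thread increments start_cnt and
 * waits here, and the last one through (or the 10 second timeout)
 * releases the rest via the broadcast.
 */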

extern int launch_p_step_launch(srun_job_t *job, slurm_step_io_fds_t *cio_fds,
				uint32_t *global_rc,
				slurm_step_launch_callbacks_t *step_callbacks,
				slurm_opt_t *opt_local)
{
	srun_job_t *local_srun_job;
	srun_opt_t *srun_opt = opt_local->srun_opt;
	slurm_step_launch_params_t launch_params;
	slurm_step_launch_callbacks_t callbacks;
	int rc = SLURM_SUCCESS;
	task_state_t task_state;
	bool first_launch = false;
	uint32_t def_cpu_bind_type = 0;
	char tmp_str[128];
	xassert(srun_opt);

	slurm_step_launch_params_t_init(&launch_params);
	memcpy(&callbacks, step_callbacks, sizeof(callbacks));

	task_state = task_state_find(job->jobid, job->stepid,
				     job->het_job_offset, task_state_list);
	if (!task_state) {
		task_state = task_state_create(job->jobid, job->stepid,
					       job->het_job_offset, job->ntasks,
					       job->het_job_task_offset);
		slurm_mutex_lock(&het_job_lock);
		if (!local_job_list)
			local_job_list = list_create(NULL);
		if (!task_state_list)
			task_state_list = list_create(_task_state_del);
		slurm_mutex_unlock(&het_job_lock);
		local_srun_job = job;
		local_global_rc = global_rc;
		list_append(local_job_list, local_srun_job);
		list_append(task_state_list, task_state);
		first_launch = true;
	} else {
		/* Launching extra POE tasks */
		task_state_alter(task_state, job->ntasks);
	}

	launch_params.gid = opt_local->gid;
	launch_params.alias_list = job->alias_list;
	launch_params.argc = srun_opt->argc;
	launch_params.argv = srun_opt->argv;
	launch_params.multi_prog = srun_opt->multi_prog ? true : false;
	launch_params.cwd = opt_local->chdir;
	launch_params.slurmd_debug = srun_opt->slurmd_debug;
	launch_params.buffered_stdio = !srun_opt->unbuffered;
	launch_params.labelio = srun_opt->labelio ? true : false;
	launch_params.remote_output_filename = fname_remote_string(job->ofname);
	launch_params.remote_input_filename  = fname_remote_string(job->ifname);
	launch_params.remote_error_filename  = fname_remote_string(job->efname);
	launch_params.het_job_node_offset = job->het_job_node_offset;
	launch_params.het_job_id  = job->het_job_id;
	launch_params.het_job_nnodes = job->het_job_nnodes;
	launch_params.het_job_ntasks = job->het_job_ntasks;
	launch_params.het_job_offset = job->het_job_offset;
	launch_params.het_job_step_cnt = srun_opt->het_step_cnt;
	launch_params.het_job_task_offset = job->het_job_task_offset;
	launch_params.het_job_task_cnts = job->het_job_task_cnts;
	launch_params.het_job_tids = job->het_job_tids;
	launch_params.het_job_tid_offsets = job->het_job_tid_offsets;
	launch_params.het_job_node_list = job->het_job_node_list;
	launch_params.partition = job->partition;
	launch_params.profile = opt_local->profile;
	launch_params.task_prolog = srun_opt->task_prolog;
	launch_params.task_epilog = srun_opt->task_epilog;

	slurm_step_ctx_get(job->step_ctx, SLURM_STEP_CTX_DEF_CPU_BIND_TYPE,
			   &def_cpu_bind_type);
	if (slurm_verify_cpu_bind(NULL, &srun_opt->cpu_bind,
				  &srun_opt->cpu_bind_type,
				  def_cpu_bind_type)) {
		return SLURM_ERROR;
	}
	slurm_sprint_cpu_bind_type(tmp_str, srun_opt->cpu_bind_type);
	verbose("CpuBindType=%s", tmp_str);
	launch_params.cpu_bind = srun_opt->cpu_bind;
	launch_params.cpu_bind_type = srun_opt->cpu_bind_type;

	launch_params.mem_bind = opt_local->mem_bind;
	launch_params.mem_bind_type = opt_local->mem_bind_type;
	launch_params.accel_bind_type = srun_opt->accel_bind_type;
	launch_params.open_mode = srun_opt->open_mode;
	if (opt_local->acctg_freq)
		launch_params.acctg_freq = opt_local->acctg_freq;
	launch_params.pty = srun_opt->pty;
	if (opt_local->cpus_set)
		launch_params.cpus_per_task = opt_local->cpus_per_task;
	else
		launch_params.cpus_per_task = 1;
	launch_params.cpu_freq_min       = opt_local->cpu_freq_min;
	launch_params.cpu_freq_max       = opt_local->cpu_freq_max;
	launch_params.cpu_freq_gov       = opt_local->cpu_freq_gov;
	launch_params.tres_bind          = opt_local->tres_bind;
	launch_params.tres_freq          = opt_local->tres_freq;
	launch_params.task_dist          = opt_local->distribution;
	launch_params.preserve_env       = srun_opt->preserve_env;
	launch_params.spank_job_env      = opt_local->spank_job_env;
	launch_params.spank_job_env_size = opt_local->spank_job_env_size;
	launch_params.ntasks_per_board   = job->ntasks_per_board;
	launch_params.ntasks_per_core    = job->ntasks_per_core;
	launch_params.ntasks_per_socket  = job->ntasks_per_socket;
	launch_params.no_alloc           = srun_opt->no_alloc;
	launch_params.env = _build_user_env(job, opt_local);

	memcpy(&launch_params.local_fds, cio_fds, sizeof(slurm_step_io_fds_t));

	if (MPIR_being_debugged) {
		launch_params.parallel_debug = true;
		pmi_server_max_threads(1);
	} else {
		launch_params.parallel_debug = false;
	}
	/*
	 * Normally this isn't used, but if an outside process other than
	 * srun (e.g. poe) is using this logic to launch tasks, then we
	 * can use this to signal the step.
	 */
	callbacks.task_start = _task_start;
	/*
	 * If poe is using this code with multi-prog, it always returns
	 * 1 for each task, which could be confusing since no real
	 * error happened.
	 */
	if (!launch_params.multi_prog
	    || (!callbacks.step_signal
		|| (callbacks.step_signal == launch_g_fwd_signal))) {
		callbacks.task_finish = _task_finish;
		slurm_mutex_lock(&launch_lock);
		if (!opt_save) {
			/*
			 * Save opt_local parameters since _task_finish()
			 * will lack the values
			 */
			opt_save = xmalloc(sizeof(slurm_opt_t));
			memcpy(opt_save, opt_local, sizeof(slurm_opt_t));
			opt_save->srun_opt = xmalloc(sizeof(srun_opt_t));
			memcpy(opt_save->srun_opt, srun_opt,
			       sizeof(srun_opt_t));
		}
		slurm_mutex_unlock(&launch_lock);
	}

	update_job_state(job, SRUN_JOB_LAUNCHING);
	launch_start_time = time(NULL);
	if (first_launch) {
		if (slurm_step_launch(job->step_ctx, &launch_params,
				      &callbacks) != SLURM_SUCCESS) {
			rc = errno;
			*local_global_rc = errno;
			error("Application launch failed: %m");
			slurm_step_launch_abort(job->step_ctx);
			slurm_step_launch_wait_finish(job->step_ctx);
			goto cleanup;
		}
	} else {
		if (slurm_step_launch_add(job->step_ctx, job->step_ctx,
					  &launch_params, job->nodelist,
					  job->fir_nodeid) != SLURM_SUCCESS) {
			rc = errno;
			*local_global_rc = errno;
			error("Application launch add failed: %m");
			slurm_step_launch_abort(job->step_ctx);
			slurm_step_launch_wait_finish(job->step_ctx);
			goto cleanup;
		}
	}

	update_job_state(job, SRUN_JOB_STARTING);
	if (slurm_step_launch_wait_start(job->step_ctx) == SLURM_SUCCESS) {
		update_job_state(job, SRUN_JOB_RUNNING);
		/*
		 * Only set up MPIR structures if the step launched correctly
		 */
		if (srun_opt->multi_prog) {
			mpir_set_multi_name(job->ntasks,
					    launch_params.argv[0]);
		} else {
			mpir_set_executable_names(launch_params.argv[0],
						  job->het_job_task_offset,
						  job->ntasks);
		}

		_wait_all_het_job_comps_started(opt_local);
		MPIR_debug_state = MPIR_DEBUG_SPAWNED;
		if (srun_opt->debugger_test)
			mpir_dump_proctable();
		else if (srun_opt->parallel_debug)
			MPIR_Breakpoint(job);
	} else {
		info("Job step %u.%u aborted before step completely launched.",
		     job->jobid, job->stepid);
	}

cleanup:
	return rc;
}

extern int launch_p_step_wait(srun_job_t *job, bool got_alloc,
			      slurm_opt_t *opt_local)
{
	int rc = 0;

	slurm_step_launch_wait_finish(job->step_ctx);
	if ((MPIR_being_debugged == 0) && retry_step_begin &&
	    (retry_step_cnt < MAX_STEP_RETRIES) &&
	    (job->het_job_id == NO_VAL)) {	/* Not hetjob step */
		retry_step_begin = false;
		slurm_step_ctx_destroy(job->step_ctx);
		if (got_alloc)
			rc = create_job_step(job, true, opt_local);
		else
			rc = create_job_step(job, false, opt_local);
		if (rc < 0)
			exit(error_exit);
		rc = -1;
	}
	return rc;
}

static int _step_signal(int signal)
{
	srun_job_t *my_srun_job;
	ListIterator iter;
	int rc = SLURM_SUCCESS, rc2;

	if (!local_job_list) {
		debug("%s: local_job_list does not exist yet", __func__);
		return SLURM_ERROR;
	}

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		info("Terminating job step %u.%u",
		     my_srun_job->jobid, my_srun_job->stepid);
		rc2 = slurm_kill_job_step(my_srun_job->jobid,
					  my_srun_job->stepid, signal);
		if (rc2)
			rc = rc2;
	}
	list_iterator_destroy(iter);
	return rc;
}

extern int launch_p_step_terminate(void)
{
	return _step_signal(SIGKILL);
}

extern void launch_p_print_status(void)
{
	task_state_print(task_state_list, (log_f) info);
}

extern void launch_p_fwd_signal(int signal)
{
	srun_job_t *my_srun_job;
	ListIterator iter;

	if (!local_job_list) {
		debug("%s: local_job_list does not exist yet", __func__);
		return;
	}

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		switch (signal) {
		case SIGKILL:
			slurm_step_launch_abort(my_srun_job->step_ctx);
			break;
		default:
			slurm_step_launch_fwd_signal(my_srun_job->step_ctx,
						     signal);
			break;
		}
	}
	list_iterator_destroy(iter);
}