1 /****************************************************************************\
2  *  srun_job.c - job data structure creation functions
3  *****************************************************************************
4  *  Copyright (C) 2002-2007 The Regents of the University of California.
5  *  Copyright (C) 2008 Lawrence Livermore National Security.
6  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7  *  Written by Mark Grondona <grondona@llnl.gov>.
8  *  CODE-OCEC-09-009. All rights reserved.
9  *
10  *  This file is part of Slurm, a resource management program.
11  *  For details, see <https://slurm.schedmd.com/>.
12  *  Please also read the included file: DISCLAIMER.
13  *
14  *  Slurm is free software; you can redistribute it and/or modify it under
15  *  the terms of the GNU General Public License as published by the Free
16  *  Software Foundation; either version 2 of the License, or (at your option)
17  *  any later version.
18  *
19  *  In addition, as a special exception, the copyright holders give permission
20  *  to link the code of portions of this program with the OpenSSL library under
21  *  certain conditions as described in each individual source file, and
22  *  distribute linked combinations including the two. You must obey the GNU
23  *  General Public License in all respects for all of the code used other than
24  *  OpenSSL. If you modify file(s) with this exception, you may extend this
25  *  exception to your version of the file(s), but you are not obligated to do
26  *  so. If you do not wish to do so, delete this exception statement from your
27  *  version.  If you delete this exception statement from all source files in
28  *  the program, then also delete it here.
29  *
30  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
31  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
32  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
33  *  details.
34  *
35  *  You should have received a copy of the GNU General Public License along
36  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
37  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
38 \*****************************************************************************/
39 
40 #include "config.h"
41 
42 #include <fcntl.h>
43 #include <grp.h>
44 #include <netdb.h>
45 #include <signal.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <sys/param.h>           /* MAXPATHLEN */
49 #include <sys/resource.h>
50 #include <sys/stat.h>
51 #include <sys/types.h>
52 #include <sys/wait.h>
53 #include <unistd.h>
54 
55 #include "src/common/bitstring.h"
56 #include "src/common/cli_filter.h"
57 #include "src/common/cbuf.h"
58 #include "src/common/fd.h"
59 #include "src/common/forward.h"
60 #include "src/common/hostlist.h"
61 #include "src/common/io_hdr.h"
62 #include "src/common/log.h"
63 #include "src/common/macros.h"
64 #include "src/common/plugstack.h"
65 #include "src/common/proc_args.h"
66 #include "src/common/read_config.h"
67 #include "src/common/slurm_opt.h"
68 #include "src/common/slurm_protocol_api.h"
69 #include "src/common/slurm_rlimits_info.h"
70 #include "src/common/uid.h"
71 #include "src/common/xmalloc.h"
72 #include "src/common/xsignal.h"
73 #include "src/common/xstring.h"
74 
75 #include "src/api/step_launch.h"
76 
77 #include "src/srun/libsrun/allocate.h"
78 #include "src/srun/libsrun/debugger.h"
79 #include "src/srun/libsrun/fname.h"
80 #include "src/srun/libsrun/launch.h"
81 #include "src/srun/libsrun/opt.h"
82 #include "src/srun/libsrun/multi_prog.h"
83 #include "src/srun/libsrun/srun_job.h"
84 
85 /*
86  * allocation information structure used to store general information
87  * about node allocation to be passed to _job_create_structure()
88  */
89 typedef struct allocation_info {
90 	char                   *alias_list;
91 	uint16_t               *cpus_per_node;
92 	uint32_t               *cpu_count_reps;
93 	uint32_t                jobid;
94 	uint32_t                nnodes;
95 	char                   *nodelist;
96 	uint16_t ntasks_per_board;/* number of tasks to invoke on each board */
97 	uint16_t ntasks_per_core; /* number of tasks to invoke on each core */
98 	uint16_t ntasks_per_socket;/* number of tasks to invoke on
99 				    * each socket */
100 	uint32_t                num_cpu_groups;
101 	char                   *partition;
102 	dynamic_plugin_data_t  *select_jobinfo;
103 	uint32_t                stepid;
104 } allocation_info_t;
105 
106 typedef struct het_job_resp_struct {
107 	char **alias_list;
108 	uint16_t *cpu_cnt;
109 	hostlist_t host_list;
110 	uint32_t node_cnt;
111 } het_job_resp_struct_t;
112 
113 
114 static int shepherd_fd = -1;
115 static pthread_t signal_thread = (pthread_t) 0;
116 static int pty_sigarray[] = { SIGWINCH, 0 };
117 
118 extern char **environ;
119 
120 /*
121  * Prototypes:
122  */
123 
124 static int  _become_user(void);
125 static void _call_spank_fini(void);
126 static int  _call_spank_local_user(srun_job_t *job, slurm_opt_t *opt_local);
127 static void _default_sigaction(int sig);
128 static long _diff_tv_str(struct timeval *tv1, struct timeval *tv2);
129 static void _handle_intr(srun_job_t *job);
130 static void _handle_pipe(void);
131 static srun_job_t *_job_create_structure(allocation_info_t *ainfo,
132 					 slurm_opt_t *opt_local);
133 static char *_normalize_hostlist(const char *hostlist);
134 static void _print_job_information(resource_allocation_response_msg_t *resp);
135 static void _run_srun_epilog (srun_job_t *job);
136 static void _run_srun_prolog (srun_job_t *job);
137 static int  _run_srun_script (srun_job_t *job, char *script);
138 static void _set_env_vars(resource_allocation_response_msg_t *resp,
139 			  int het_job_offset);
140 static void _set_env_vars2(resource_allocation_response_msg_t *resp,
141 			   int het_job_offset);
142 static void _set_ntasks(allocation_info_t *ai, slurm_opt_t *opt_local);
143 static void _set_prio_process_env(void);
144 static int  _set_rlimit_env(void);
145 static void _set_submit_dir_env(void);
146 static int  _set_umask_env(void);
147 static void _shepherd_notify(int shepherd_fd);
148 static int  _shepherd_spawn(srun_job_t *job, List srun_job_list,
149 			     bool got_alloc);
150 static void *_srun_signal_mgr(void *no_data);
151 static void _srun_cli_filter_post_submit(uint32_t jobid, uint32_t stepid);
152 static void _step_opt_exclusive(slurm_opt_t *opt_local);
153 static int  _validate_relative(resource_allocation_response_msg_t *resp,
154 			       slurm_opt_t *opt_local);
155 
156 
157 /*
158  * Create an srun job structure w/out an allocation response msg.
159  * (i.e. use the command line options)
160  */
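/*
 * Illustrative use (a sketch mirroring the --no-allocation path in
 * create_srun_job() below; error handling elided):
 *
 *	srun_job_t *job = job_create_noalloc();
 *	if (!job || (create_job_step(job, false, &opt) < 0))
 *		exit(error_exit);
 */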
161 srun_job_t *
job_create_noalloc(void)
163 {
164 	srun_job_t *job = NULL;
165 	allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
166 	uint16_t cpn[1];
167 	uint32_t cpu_count_reps[1];
168 	slurm_opt_t *opt_local = &opt;
169 	hostlist_t  hl = hostlist_create(opt_local->nodelist);
170 
171 	if (!hl) {
172 		error("Invalid node list `%s' specified", opt_local->nodelist);
173 		goto error;
174 	}
175 	srand48(getpid());
176 	ai->jobid          = MIN_NOALLOC_JOBID +
177 			     ((uint32_t) lrand48() %
178 			      (MAX_NOALLOC_JOBID - MIN_NOALLOC_JOBID + 1));
179 	ai->stepid         = (uint32_t) (lrand48());
180 	ai->nodelist       = opt_local->nodelist;
181 	ai->nnodes         = hostlist_count(hl);
182 
183 	hostlist_destroy(hl);
184 
185 	cpn[0] = (opt_local->ntasks + ai->nnodes - 1) / ai->nnodes;
186 	ai->cpus_per_node  = cpn;
187 	cpu_count_reps[0] = ai->nnodes;
188 	ai->cpu_count_reps = cpu_count_reps;
189 	ai->num_cpu_groups = 1;
190 
191 	/*
192 	 * Create job, then fill in host addresses
193 	 */
194 	job = _job_create_structure(ai, opt_local);
195 
196 	if (job != NULL)
197 		job_update_io_fnames(job, opt_local);
198 
199 error:
200 	xfree(ai);
201 	return (job);
202 
203 }
204 
205 /*
206  * Create an srun job structure for a step w/out an allocation response msg.
207  * (i.e. inside an allocation)
208  */
extern srun_job_t *job_step_create_allocation(
210 			resource_allocation_response_msg_t *resp,
211 			slurm_opt_t *opt_local)
212 {
213 	srun_opt_t *srun_opt = opt_local->srun_opt;
214 	uint32_t job_id = resp->job_id;
215 	srun_job_t *job = NULL;
216 	allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
217 	hostlist_t hl = NULL;
218 	char *buf = NULL;
219 	int count = 0;
220 	uint32_t alloc_count = 0;
221 	char *step_nodelist = NULL;
222 	xassert(srun_opt);
223 
224 	ai->jobid          = job_id;
225 	ai->stepid         = NO_VAL;
226 	ai->alias_list     = resp->alias_list;
227 	if (srun_opt->alloc_nodelist)
228 		ai->nodelist = xstrdup(srun_opt->alloc_nodelist);
229 	else
230 		ai->nodelist = xstrdup(resp->node_list);
231 	hl = hostlist_create(ai->nodelist);
232 	hostlist_uniq(hl);
233 	alloc_count = hostlist_count(hl);
234 	ai->nnodes = alloc_count;
235 	hostlist_destroy(hl);
236 
237 	if (opt_local->exclude) {
238 		hostlist_t exc_hl = hostlist_create(opt_local->exclude);
239 		hostlist_t inc_hl = NULL;
240 		char *node_name = NULL;
241 
242 		hl = hostlist_create(ai->nodelist);
243 		if (opt_local->nodelist)
244 			inc_hl = hostlist_create(opt_local->nodelist);
245 		hostlist_uniq(hl);
246 		//info("using %s or %s", opt_local->nodelist, ai->nodelist);
247 		while ((node_name = hostlist_shift(exc_hl))) {
248 			int inx = hostlist_find(hl, node_name);
249 			if (inx >= 0) {
250 				debug("excluding node %s", node_name);
251 				hostlist_delete_nth(hl, inx);
252 				ai->nnodes--;	/* decrement node count */
253 			}
254 			if (inc_hl) {
255 				inx = hostlist_find(inc_hl, node_name);
256 				if (inx >= 0) {
257 					error("Requested node %s is also "
258 					      "in the excluded list.",
259 					      node_name);
260 					error("Job not submitted.");
261 					hostlist_destroy(exc_hl);
262 					hostlist_destroy(inc_hl);
263 					goto error;
264 				}
265 			}
266 			free(node_name);
267 		}
268 		hostlist_destroy(exc_hl);
269 
		/* Set the node counts here so that, if more nodes are
		 * available than were requested, we can correct the
		 * values. If there is no exclude list, the same
		 * variables are set in the else branch below.
		 */
275 		if (!opt_local->nodes_set) {
276 			/* we don't want to set the number of nodes =
277 			 * to the number of requested processes unless we
278 			 * know it is less than the number of nodes
279 			 * in the allocation
280 			 */
281 			if (opt_local->ntasks_set &&
282 			    (opt_local->ntasks < ai->nnodes))
283 				opt_local->min_nodes = opt_local->ntasks;
284 			else
285 				opt_local->min_nodes = ai->nnodes;
286 			opt_local->nodes_set = true;
287 		}
288 		if (!opt_local->max_nodes)
289 			opt_local->max_nodes = opt_local->min_nodes;
290 		if ((opt_local->max_nodes > 0) &&
291 		    (opt_local->max_nodes < ai->nnodes))
292 			ai->nnodes = opt_local->max_nodes;
293 
294 		count = hostlist_count(hl);
295 		if (!count) {
296 			error("Hostlist is empty!  Can't run job.");
297 			hostlist_destroy(hl);
298 			goto error;
299 		}
300 		if (inc_hl) {
301 			count = hostlist_count(inc_hl);
302 			if (count < ai->nnodes) {
303 				/* add more nodes to get correct number for
304 				   allocation */
305 				hostlist_t tmp_hl = hostlist_copy(hl);
306 				int i = 0;
307 				int diff = ai->nnodes - count;
308 				buf = hostlist_ranged_string_xmalloc(inc_hl);
309 				hostlist_delete(tmp_hl, buf);
310 				xfree(buf);
311 				while ((i < diff) &&
312 				       (node_name = hostlist_shift(tmp_hl))) {
313 					hostlist_push_host(inc_hl, node_name);
314 					free(node_name);
315 					i++;
316 				}
317 				hostlist_destroy(tmp_hl);
318 			}
319 			buf = hostlist_ranged_string_xmalloc(inc_hl);
320 			hostlist_destroy(inc_hl);
321 			xfree(opt_local->nodelist);
322 			opt_local->nodelist = buf;
323 		} else {
324 			if (count > ai->nnodes) {
325 				/* remove more nodes than needed for
326 				 * allocation */
327 				int i;
328 				for (i = count; i >= ai->nnodes; i--)
329 					hostlist_delete_nth(hl, i);
330 			}
331 			xfree(opt_local->nodelist);
332 			opt_local->nodelist = hostlist_ranged_string_xmalloc(hl);
333 		}
334 
335 		hostlist_destroy(hl);
336 	} else {
337 		if (!opt_local->nodes_set) {
338 			/* we don't want to set the number of nodes =
339 			 * to the number of requested processes unless we
340 			 * know it is less than the number of nodes
341 			 * in the allocation
342 			 */
343 			if (opt_local->ntasks_set &&
344 			    (opt_local->ntasks < ai->nnodes))
345 				opt_local->min_nodes = opt_local->ntasks;
346 			else
347 				opt_local->min_nodes = ai->nnodes;
348 			opt_local->nodes_set = true;
349 		}
350 		if (!opt_local->max_nodes)
351 			opt_local->max_nodes = opt_local->min_nodes;
352 		if ((opt_local->max_nodes > 0) &&
353 		    (opt_local->max_nodes < ai->nnodes))
354 			ai->nnodes = opt_local->max_nodes;
		/* Don't reset ai->nodelist because that is the nodelist
		 * we want to report the allocation as being under;
		 * opt_local->nodelist is what is used for the step.
		 */
359 		/* xfree(ai->nodelist); */
360 		/* ai->nodelist = xstrdup(buf); */
361 	}
362 
363 	/* get the correct number of hosts to run tasks on */
364 	if (opt_local->nodelist)
365 		step_nodelist = opt_local->nodelist;
366 	else if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
367 		  SLURM_DIST_ARBITRARY) && (count == 0))
368 		step_nodelist = getenv("SLURM_ARBITRARY_NODELIST");
369 	if (step_nodelist) {
370 		hl = hostlist_create(step_nodelist);
371 		if ((opt_local->distribution & SLURM_DIST_STATE_BASE) !=
372 		    SLURM_DIST_ARBITRARY)
373 			hostlist_uniq(hl);
374 		if (!hostlist_count(hl)) {
375 			error("Hostlist is empty!  Can not run job.");
376 			hostlist_destroy(hl);
377 			goto error;
378 		}
379 
380 		buf = hostlist_ranged_string_xmalloc(hl);
381 		count = hostlist_count(hl);
382 		hostlist_destroy(hl);
383 		/*
		 * Don't reset ai->nodelist because that is the nodelist
		 * we want to report the allocation as being under;
		 * opt_local->nodelist is what is used for the step.
387 		 */
388 		/* xfree(ai->nodelist); */
389 		/* ai->nodelist = xstrdup(buf); */
390 		xfree(opt_local->nodelist);
391 		opt_local->nodelist = buf;
392 	}
393 
394 	if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
395 	     SLURM_DIST_ARBITRARY) && (count != opt_local->ntasks)) {
396 		error("You asked for %d tasks but hostlist specified %d nodes",
397 		      opt_local->ntasks, count);
398 		goto error;
399 	}
400 
401 	if (ai->nnodes == 0) {
402 		error("No nodes in allocation, can't run job");
403 		goto error;
404 	}
405 
406 	ai->num_cpu_groups = resp->num_cpu_groups;
407 	ai->cpus_per_node  = resp->cpus_per_node;
408 	ai->cpu_count_reps = resp->cpu_count_reps;
409 	ai->ntasks_per_board = resp->ntasks_per_board;
410 
411 	/* Here let the srun options override the allocation resp */
412 	ai->ntasks_per_core = (opt_local->ntasks_per_core != NO_VAL) ?
413 		opt_local->ntasks_per_core : resp->ntasks_per_core;
414 	ai->ntasks_per_socket = (opt_local->ntasks_per_socket != NO_VAL) ?
415 		opt_local->ntasks_per_socket : resp->ntasks_per_socket;
416 
417 	ai->partition = resp->partition;
418 
419 /* 	info("looking for %d nodes out of %s with a must list of %s", */
420 /* 	     ai->nnodes, ai->nodelist, opt_local->nodelist); */
421 	/*
422 	 * Create job
423 	 */
424 	job = _job_create_structure(ai, opt_local);
425 error:
426    	xfree(ai);
427 	return (job);
428 
429 }
430 
431 /*
432  * Create an srun job structure from a resource allocation response msg
433  */
extern srun_job_t *job_create_allocation(
435 			resource_allocation_response_msg_t *resp,
436 			slurm_opt_t *opt_local)
437 {
438 	srun_job_t *job;
439 	allocation_info_t *i = xmalloc(sizeof(allocation_info_t));
440 
441 	i->alias_list     = resp->alias_list;
442 	i->nodelist       = _normalize_hostlist(resp->node_list);
443 	i->nnodes	  = resp->node_cnt;
444 	i->partition      = resp->partition;
445 	i->jobid          = resp->job_id;
446 	i->stepid         = NO_VAL;
447 	i->num_cpu_groups = resp->num_cpu_groups;
448 	i->cpus_per_node  = resp->cpus_per_node;
449 	i->cpu_count_reps = resp->cpu_count_reps;
450 	i->ntasks_per_board = resp->ntasks_per_board;
451 	i->ntasks_per_core = resp->ntasks_per_core;
452 	i->ntasks_per_socket = resp->ntasks_per_socket;
453 
454 	i->select_jobinfo = select_g_select_jobinfo_copy(resp->select_jobinfo);
455 
456 	job = _job_create_structure(i, opt_local);
457 	if (job) {
458 		job->account = xstrdup(resp->account);
459 		job->qos = xstrdup(resp->qos);
460 		job->resv_name = xstrdup(resp->resv_name);
461 	}
462 
463 	xfree(i->nodelist);
464 	xfree(i);
465 
466 	return (job);
467 }
468 
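/*
 * Copy the command (argc/argv) from opt_master to every option structure
 * queued on missing_argc_list (hetjob components given without their own
 * command), then drop those entries from the list.
 */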
static void _copy_args(List missing_argc_list, slurm_opt_t *opt_master)
470 {
471 	srun_opt_t *srun_master = opt_master->srun_opt;
472 	ListIterator iter;
473 	slurm_opt_t *opt_local;
474 	int i;
475 	xassert(srun_master);
476 
477 	iter = list_iterator_create(missing_argc_list);
478 	while ((opt_local = list_next(iter))) {
479 		srun_opt_t *srun_opt = opt_local->srun_opt;
480 		xassert(srun_opt);
481 		srun_opt->argc = srun_master->argc;
482 		srun_opt->argv = xmalloc(sizeof(char *) * (srun_opt->argc+1));
483 		for (i = 0; i < srun_opt->argc; i++)
484 			srun_opt->argv[i] = xstrdup(srun_master->argv[i]);
485 		list_remove(iter);
486 	}
487 	list_iterator_destroy(iter);
488 }
489 
490 /*
491  * Build "het_group" string. If set on execute line, it may need to be
492  * rebuilt for multiple option structures ("--het-group=1,2" becomes two
 * opt structures). Clear "het_grp_bits" if determined not to be a hetjob.
494  */
static void _het_grp_test(List opt_list)
496 {
497 	ListIterator iter;
498 	int het_job_offset;
499 	bitstr_t *master_map = NULL;
500 	List missing_argv_list = NULL;
501 	bool multi_comp = false, multi_prog = false;
502 
503 	if (opt_list) {
504 		slurm_opt_t *opt_local;
505 		missing_argv_list = list_create(NULL);
506 		iter = list_iterator_create(opt_list);
507 		while ((opt_local = list_next(iter))) {
508 			srun_opt_t *srun_opt = opt_local->srun_opt;
509 			xassert(srun_opt);
510 			if (srun_opt->argc == 0)
511 				list_append(missing_argv_list, opt_local);
512 			else
513 				_copy_args(missing_argv_list, opt_local);
514 			xfree(srun_opt->het_group);
515 			if (srun_opt->het_grp_bits &&
516 			    ((het_job_offset =
517 			      bit_ffs(srun_opt->het_grp_bits)) >= 0)) {
518 				xstrfmtcat(srun_opt->het_group, "%d",
519 					   het_job_offset);
520 			}
521 			if (!srun_opt->het_grp_bits) {
522 				error("%s: het_grp_bits is NULL", __func__);
523 			} else if (!master_map) {
524 				master_map
525 					= bit_copy(srun_opt->het_grp_bits);
526 			} else {
527 				if (bit_overlap_any(master_map,
528 						    srun_opt->het_grp_bits)) {
529 					fatal("Duplicate het groups in single srun not supported");
530 				}
531 				bit_or(master_map, srun_opt->het_grp_bits);
532 			}
533 			if (srun_opt->multi_prog)
534 				multi_prog = true;
535 		}
536 		if (master_map && (bit_set_count(master_map) > 1))
537 			multi_comp = true;
538 		FREE_NULL_BITMAP(master_map);
539 		list_iterator_destroy(iter);
540 		list_destroy(missing_argv_list);
541 	} else if (!sropt.het_group && !getenv("SLURM_HET_SIZE")) {
542 		FREE_NULL_BITMAP(sropt.het_grp_bits);
543 		/* het_group is already NULL */
544 	} else if (!sropt.het_group && sropt.het_grp_bits) {
545 		if ((het_job_offset = bit_ffs(sropt.het_grp_bits)) < 0)
546 			het_job_offset = 0;
547 		else if (bit_set_count(sropt.het_grp_bits) > 1)
548 			multi_comp = true;
549 		if (sropt.multi_prog)
550 			multi_prog = true;
551 		xstrfmtcat(sropt.het_group, "%d", het_job_offset);
552 	}
553 
554 	if (multi_comp && multi_prog)
555 		fatal("--multi-prog option not supported with multiple het groups");
556 }
557 
558 /*
559  * Copy job name from last component to all hetjob components unless
560  * explicitly set.
561  */
static void _match_job_name(List opt_list)
563 {
564 	int cnt;
565 	ListIterator iter;
566 	slurm_opt_t *opt_local;
567 
568 	if (!opt_list)
569 		return;
570 
571 	cnt = list_count(opt_list);
572 	if (cnt < 2)
573 		return;
574 
575 	iter = list_iterator_create(opt_list);
576 	while ((opt_local = list_next(iter))) {
577 		if (!opt_local->job_name)
578 			opt_local->job_name = xstrdup(opt.job_name);
579 		if (opt_local->srun_opt &&
580 		    (opt_local->srun_opt->open_mode == 0)) {
581 			opt_local->srun_opt->open_mode = OPEN_MODE_APPEND;
582 		}
583 	}
584 	list_iterator_destroy(iter);
585 }
586 
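/* List sort helper: order option structures by their first het group offset */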
static int _sort_by_offset(void *x, void *y)
588 {
589 	slurm_opt_t *opt_local1 = *(slurm_opt_t **) x;
590 	slurm_opt_t *opt_local2 = *(slurm_opt_t **) y;
591 	int offset1 = -1, offset2 = -1;
592 
593 	if (opt_local1->srun_opt->het_grp_bits)
594 		offset1 = bit_ffs(opt_local1->srun_opt->het_grp_bits);
595 	if (opt_local2->srun_opt->het_grp_bits)
596 		offset2 = bit_ffs(opt_local2->srun_opt->het_grp_bits);
597 	if (offset1 < offset2)
598 		return -1;
599 	if (offset1 > offset2)
600 		return 1;
601 	return 0;
602 }
603 
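/*
 * Final option post-processing: validate/rebuild het group information,
 * propagate the job name to all components, and sort components by het
 * group offset.
 */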
static void _post_opts(List opt_list)
605 {
606 	_het_grp_test(opt_list);
607 	_match_job_name(opt_list);
608 	if (opt_list)
609 		list_sort(opt_list, _sort_by_offset);
610 }
611 
extern void init_srun(int argc, char **argv,
613 		      log_options_t *logopt, int debug_level,
614 		      bool handle_signals)
615 {
616 	bool het_job_fini = false;
617 	int i, het_job_argc, het_job_inx, het_job_argc_off;
618 	char **het_job_argv;
619 
620 	/*
621 	 * This must happen before we spawn any threads
622 	 * which are not designed to handle arbitrary signals
623 	 */
624 	if (handle_signals) {
625 		if (xsignal_block(sig_array) < 0)
626 			error("Unable to block signals");
627 	}
628 	xsignal_block(pty_sigarray);
629 
630 	/*
631 	 * Initialize plugin stack, read options from plugins, etc.
632 	 */
633 	init_spank_env();
634 	if (spank_init(NULL) < 0) {
635 		error("Plug-in initialization failed");
636 		exit(error_exit);
637 	}
638 
639 	/*
640 	 * Be sure to call spank_fini when srun exits.
641 	 */
642 	if (atexit(_call_spank_fini) < 0)
643 		error("Failed to register atexit handler for plugins: %m");
644 
645 	het_job_argc = argc;
646 	het_job_argv = argv;
647 	for (het_job_inx = 0; !het_job_fini; het_job_inx++) {
648 		het_job_argc_off = -1;
649 		if (initialize_and_process_args(het_job_argc, het_job_argv,
650 						&het_job_argc_off) < 0) {
651 			error("srun parameter parsing");
652 			exit(1);
653 		}
654 		if ((het_job_argc_off >= 0) &&
655 		    (het_job_argc_off < het_job_argc)) {
656 			for (i = het_job_argc_off; i < het_job_argc; i++) {
657 				if (!xstrcmp(het_job_argv[i], ":")) {
658 					het_job_argc_off = i;
659 					break;
660 				}
661 			}
662 		}
663 		if ((het_job_argc_off >= 0) &&
664 		    (het_job_argc_off < het_job_argc) &&
665 		    !xstrcmp(het_job_argv[het_job_argc_off], ":")) {
666 			/*
667 			 * move het_job_argv[0] from "srun" to ":"
668 			 */
669 			het_job_argc -= het_job_argc_off;
670 			het_job_argv += het_job_argc_off;
671 		} else {
672 			het_job_fini = true;
673 		}
674 	}
675 	_post_opts(opt_list);
676 
677 	/*
678 	 * reinit log with new verbosity (if changed by command line)
679 	 */
680 	if (logopt && (opt.verbose || opt.quiet)) {
681 		/*
682 		 * If log level is already increased, only increment the
		 * level to the difference of opt.verbose and LOG_LEVEL_INFO
684 		 */
685 		if ((opt.verbose -= (logopt->stderr_level - LOG_LEVEL_INFO)) > 0)
686 			logopt->stderr_level += opt.verbose;
687 		logopt->stderr_level -= opt.quiet;
688 		logopt->prefix_level = 1;
689 		log_alter(*logopt, 0, NULL);
690 	} else
691 		opt.verbose = debug_level;
692 
693 	(void) _set_rlimit_env();
694 	_set_prio_process_env();
695 	(void) _set_umask_env();
696 	_set_submit_dir_env();
697 
698 	/*
699 	 * Set up slurmctld message handler
700 	 */
701 	slurmctld_msg_init();
702 
703 	/*
704 	 * save process startup time to be used with -I<timeout>
705 	 */
706 	srun_begin_time = time(NULL);
707 }
708 
709 /*
 * Modify options for a job step (after the job allocation is complete)
711  */
static void _set_step_opts(slurm_opt_t *opt_local)
713 {
714 	srun_opt_t *srun_opt = opt_local->srun_opt;
715 	xassert(srun_opt);
716 
717 	opt_local->time_limit = NO_VAL;/* not applicable for step, only job */
718 	xfree(opt_local->constraint);	/* not applicable for this step */
719 	if ((srun_opt->core_spec_set || srun_opt->exclusive)
720 	    && opt_local->cpus_set) {
		/* Step gets the specified CPU count, which may be only
		 * part of the job allocation. */
723 		srun_opt->exclusive = true;
724 	} else {
725 		/* Step gets all CPUs in the job allocation. */
726 		srun_opt->exclusive = false;
727 	}
728 }
729 
730 /*
731  * Create the job step(s). For a heterogeneous job, each step is requested in
732  * a separate RPC. create_job_step() references "opt", so we need to match up
733  * the job allocation request with its requested options.
734  */
static int _create_job_step(srun_job_t *job, bool use_all_cpus,
736 			    List srun_job_list, uint32_t het_job_id,
737 			    char *het_job_nodelist)
738 {
739 	ListIterator opt_iter = NULL, job_iter;
740 	slurm_opt_t *opt_local = &opt;
741 	uint32_t node_offset = 0, het_job_nnodes = 0, step_id = NO_VAL;
742 	uint32_t het_job_ntasks = 0, task_offset = 0;
743 
744 	job_step_create_response_msg_t *step_resp;
745 	char *resv_ports = NULL;
746 	int rc = 0;
747 
748 	if (srun_job_list) {
749 		if (opt_list)
750 			opt_iter = list_iterator_create(opt_list);
751 		job_iter = list_iterator_create(srun_job_list);
752 		while ((job = list_next(job_iter))) {
753 			if (het_job_id)
754 				job->het_job_id = het_job_id;
755 			job->stepid = NO_VAL;
756 			het_job_nnodes += job->nhosts;
757 			het_job_ntasks += job->ntasks;
758 		}
759 
760 		list_iterator_reset(job_iter);
761 		while ((job = list_next(job_iter))) {
762 			if (opt_list)
763 				opt_local = list_next(opt_iter);
764 			if (!opt_local)
765 				fatal("%s: opt_list too short", __func__);
766 			job->het_job_node_offset = node_offset;
767 			job->het_job_nnodes = het_job_nnodes;
768 			job->het_job_ntasks = het_job_ntasks;
769 			job->het_job_task_offset = task_offset;
770 			if (step_id != NO_VAL)
771 				job->stepid = step_id;
772 			rc = create_job_step(job, use_all_cpus, opt_local);
773 			if (rc < 0)
774 				break;
775 			if (step_id == NO_VAL)
776 				step_id = job->stepid;
777 
778 			if ((slurm_step_ctx_get(job->step_ctx,
779 						SLURM_STEP_CTX_RESP,
780 						&step_resp) == SLURM_SUCCESS) &&
781 			    step_resp->resv_ports &&
782 			    strcmp(step_resp->resv_ports, "(null)")) {
783 				if (resv_ports)
784 					xstrcat(resv_ports, ",");
785 				xstrcat(resv_ports, step_resp->resv_ports);
786 			}
787 			node_offset += job->nhosts;
788 			task_offset += job->ntasks;
789 		}
790 
791 		if (resv_ports) {
792 			/*
793 			 * Merge numeric values into single range
794 			 * (e.g. "10-12,13-15,16-18" -> "10-18")
795 			 */
796 			hostset_t hs;
797 			char *tmp = NULL, *sep;
798 			xstrfmtcat(tmp, "[%s]", resv_ports);
799 			hs = hostset_create(tmp);
800 			hostset_ranged_string(hs, strlen(tmp) + 1, tmp);
801 			sep = strchr(tmp, ']');
802 			if (sep)
803 				sep[0] = '\0';
804 			xfree(resv_ports);
805 			resv_ports = xstrdup(tmp + 1);
806 			xfree(tmp);
807 			hostset_destroy(hs);
808 
809 			list_iterator_reset(job_iter);
810 			while ((job = list_next(job_iter))) {
811 				if (slurm_step_ctx_get(job->step_ctx,
812 						SLURM_STEP_CTX_RESP,
813 						&step_resp) == SLURM_SUCCESS) {
814 					xfree(step_resp->resv_ports);
815 					step_resp->resv_ports =
816 						xstrdup(resv_ports);
817 				}
818 			}
819 			xfree(resv_ports);
820 		}
821 		list_iterator_destroy(job_iter);
822 		if (opt_iter)
823 			list_iterator_destroy(opt_iter);
824 		return rc;
825 	} else if (job) {
826 		if (het_job_id) {
827 			job->het_job_id  = het_job_id;
828 			job->het_job_nnodes = job->nhosts;
829 			job->het_job_ntasks = job->ntasks;
830 			job->het_job_task_offset = 0;
831 		}
832 		return create_job_step(job, use_all_cpus, &opt);
833 	} else {
834 		return -1;
835 	}
836 }
837 
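/*
 * Send REQUEST_STEP_COMPLETE for every step already created so slurmctld
 * can clean them up (used when job step creation fails within an existing
 * allocation).
 */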
static void _cancel_steps(List srun_job_list)
839 {
840 	srun_job_t *job;
841 	ListIterator job_iter;
842 	slurm_msg_t req;
843 	step_complete_msg_t msg;
844 	int rc = 0;
845 
846 	if (!srun_job_list)
847 		return;
848 
849 	slurm_msg_t_init(&req);
850 	req.msg_type = REQUEST_STEP_COMPLETE;
851 	req.data = &msg;
852 	memset(&msg, 0, sizeof(step_complete_msg_t));
853 	msg.step_rc = 0;
854 
855 	job_iter = list_iterator_create(srun_job_list);
856 	while ((job = list_next(job_iter))) {
857 		if (job->stepid == NO_VAL)
858 			continue;
859 		msg.job_id	= job->jobid;
860 		msg.job_step_id	= job->stepid;
861 		msg.range_first	= 0;
862 		msg.range_last	= job->nhosts - 1;
863 		(void) slurm_send_recv_controller_rc_msg(&req, &rc,
864 							 working_cluster_rec);
865 	}
866 	list_iterator_destroy(job_iter);
867 }
868 
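/* List delete function: free a het_job_resp_struct_t and all of its contents */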
static void _het_job_struct_del(void *x)
870 {
871 	het_job_resp_struct_t *het_job_resp = (het_job_resp_struct_t *) x;
872 	int i;
873 
874 	if (het_job_resp->alias_list) {
875 		for (i = 0; i < het_job_resp->node_cnt; i++)
876 			xfree(het_job_resp->alias_list[i]);
877 		xfree(het_job_resp->alias_list);
878 	}
879 	xfree(het_job_resp->cpu_cnt);
880 	if (het_job_resp->host_list)
881 		hostlist_destroy(het_job_resp->host_list);
882 	xfree(het_job_resp);
883 }
884 
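/*
 * Build a single ranged nodelist spanning all hetjob components and set
 * SLURM_JOB_CPUS_PER_NODE (plus SLURM_NODE_ALIASES when aliases exist) to
 * match. Returns an xmalloc()'d string that the caller must xfree().
 */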
static char *_compress_het_job_nodelist(List used_resp_list)
886 {
887 	resource_allocation_response_msg_t *resp;
888 	het_job_resp_struct_t *het_job_resp;
889 	List het_job_resp_list;
890 	ListIterator resp_iter;
891 	char *aliases = NULL, *save_ptr = NULL, *tok, *tmp;
892 	char *het_job_nodelist = NULL, *node_name;
893 	hostset_t hs;
894 	int cnt, i, j, k, len = 0;
895 	uint16_t *cpus;
896 	uint32_t *reps, cpu_inx;
897 	bool have_aliases = false;
898 
899 	if (!used_resp_list)
900 		return het_job_nodelist;
901 
902 	cnt = list_count(used_resp_list);
903 	het_job_resp_list = list_create(_het_job_struct_del);
904 	hs = hostset_create("");
905 	resp_iter = list_iterator_create(used_resp_list);
906 	while ((resp = list_next(resp_iter))) {
907 		if (!resp->node_list)
908 			continue;
909 		len += strlen(resp->node_list);
910 		hostset_insert(hs, resp->node_list);
911 		het_job_resp = xmalloc(sizeof(het_job_resp_struct_t));
912 		het_job_resp->node_cnt = resp->node_cnt;
913 		/*
914 		 * alias_list contains <NodeName>:<NodeAddr>:<NodeHostName>
915 		 * values in comma separated list
916 		 */
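		/* e.g. "tux0:10.1.1.10:tux0,tux1:10.1.1.11:tux1" (illustrative values) */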
917 		if (resp->alias_list) {
918 			have_aliases = true;
919 			het_job_resp->alias_list = xmalloc(sizeof(char *) *
920 							   resp->node_cnt);
921 			tmp = xstrdup(resp->alias_list);
922 			i = 0;
923 			tok = strtok_r(tmp, ",", &save_ptr);
924 			while (tok) {
925 				if (i >= resp->node_cnt) {
926 					fatal("%s: Invalid alias_list",
927 					      __func__);
928 				}
929 				het_job_resp->alias_list[i++] = xstrdup(tok);
930 				tok = strtok_r(NULL, ",", &save_ptr);
931 			}
932 			xfree(tmp);
933 		}
934 		het_job_resp->cpu_cnt =
935 			xmalloc(sizeof(uint16_t) * resp->node_cnt);
936 		het_job_resp->host_list = hostlist_create(resp->node_list);
937 		for (i = 0, k = 0;
938 		     (i < resp->num_cpu_groups) && (k < resp->node_cnt); i++) {
939 			for (j = 0; j < resp->cpu_count_reps[i]; j++) {
940 				het_job_resp->cpu_cnt[k++] =
941 					resp->cpus_per_node[i];
942 				if (k >= resp->node_cnt)
943 					break;
944 			}
945 			if (k >= resp->node_cnt)
946 				break;
947 		}
948 		list_append(het_job_resp_list, het_job_resp);
949 	}
950 	list_iterator_destroy(resp_iter);
951 
952 	len += (cnt + 16);
953 	het_job_nodelist = xmalloc(len);
954 	(void) hostset_ranged_string(hs, len, het_job_nodelist);
955 
956 	cpu_inx = 0;
957 	cnt = hostset_count(hs);
958 	cpus = xmalloc(sizeof(uint16_t) * (cnt + 1));
959 	reps = xmalloc(sizeof(uint32_t) * (cnt + 1));
960 	for (i = 0; i < cnt; i++) {
961 		node_name = hostset_nth(hs, i);
962 		resp_iter = list_iterator_create(het_job_resp_list);
963 		while ((het_job_resp = list_next(resp_iter))) {
964 			j = hostlist_find(het_job_resp->host_list, node_name);
965 			if ((j == -1) || !het_job_resp->cpu_cnt)
966 				continue;	/* node not in this hetjob */
967 			if (have_aliases) {
968 				if (aliases)
969 					xstrcat(aliases, ",");
970 				if (het_job_resp->alias_list &&
971 				    het_job_resp->alias_list[j]) {
972 					xstrcat(aliases,
973 						het_job_resp->alias_list[j]);
974 				} else {
975 					xstrfmtcat(aliases, "%s:%s:%s",
976 						   node_name, node_name,
977 						   node_name);
978 				}
979 			}
980 			if (cpus[cpu_inx] == het_job_resp->cpu_cnt[j]) {
981 				reps[cpu_inx]++;
982 			} else {
983 				if (cpus[cpu_inx] != 0)
984 					cpu_inx++;
985 				cpus[cpu_inx] = het_job_resp->cpu_cnt[j];
986 				reps[cpu_inx]++;
987 			}
988 			break;
989 		}
990 		list_iterator_destroy(resp_iter);
991 		free(node_name);
992 	}
993 
994 	cpu_inx++;
995 	tmp = uint32_compressed_to_str(cpu_inx, cpus, reps);
996 	if (setenv("SLURM_JOB_CPUS_PER_NODE", tmp, 1) < 0) {
997 		error("%s: Unable to set SLURM_JOB_CPUS_PER_NODE in environment",
998 		      __func__);
999 	}
1000 	xfree(tmp);
1001 
1002 	if (aliases) {
1003 		if (setenv("SLURM_NODE_ALIASES", aliases, 1) < 0) {
1004 			error("%s: Unable to set SLURM_NODE_ALIASES in environment",
1005 			      __func__);
1006 		}
1007 		xfree(aliases);
1008 	}
1009 
1010 	xfree(reps);
1011 	xfree(cpus);
1012 	hostset_destroy(hs);
1013 	list_destroy(het_job_resp_list);
1014 
1015 	return het_job_nodelist;
1016 }
1017 
extern void create_srun_job(void **p_job, bool *got_alloc,
1019 			    bool slurm_started, bool handle_signals)
1020 {
1021 	resource_allocation_response_msg_t *resp;
1022 	List job_resp_list = NULL, srun_job_list = NULL;
1023 	List used_resp_list = NULL;
1024 	ListIterator opt_iter, resp_iter;
1025 	srun_job_t *job = NULL;
1026 	int i, max_list_offset, max_het_job_offset, het_job_offset = -1,
1027 		het_step_offset = 0;
1028 	uint32_t my_job_id = 0, het_job_id = 0;
1029 	char *het_job_nodelist = NULL;
1030 	bool begin_error_logged = false;
1031 	bool core_spec_error_logged = false;
1032 #ifdef HAVE_NATIVE_CRAY
1033 	bool network_error_logged = false;
1034 #endif
1035 	bool node_cnt_error_logged = false;
1036 	bool x11_error_logged = false;
1037 
1038 	/*
1039 	 * now global "opt" should be filled in and available,
1040 	 * create a job from opt
1041 	 */
1042 	if (sropt.test_only) {
1043 		int rc = allocate_test();
1044 		if (rc) {
1045 			slurm_perror("allocation failure");
1046 			exit (1);
1047 		}
1048 		exit (0);
1049 
1050 	} else if (sropt.no_alloc) {
1051 		if (opt_list ||
1052 		    (sropt.het_grp_bits && (bit_fls(sropt.het_grp_bits) > 0)))
1053 			fatal("--no-allocation option not supported for heterogeneous jobs");
1054 		info("do not allocate resources");
1055 		job = job_create_noalloc();
1056 		if (job == NULL) {
1057 			error("Job creation failure.");
1058 			exit(error_exit);
1059 		}
1060 		if (create_job_step(job, false, &opt) < 0)
1061 			exit(error_exit);
1062 	} else if ((job_resp_list = existing_allocation())) {
1063 		slurm_opt_t *opt_local;
1064 
1065 		max_list_offset = 0;
1066 		max_het_job_offset = list_count(job_resp_list) - 1;
1067 		if (opt_list) {
1068 			opt_iter = list_iterator_create(opt_list);
1069 			while ((opt_local = list_next(opt_iter))) {
1070 				srun_opt_t *srun_opt = opt_local->srun_opt;
1071 				xassert(srun_opt);
1072 				if (srun_opt->het_grp_bits) {
1073 					i = bit_fls(srun_opt->het_grp_bits);
1074 					max_list_offset = MAX(max_list_offset,
1075 							      i);
1076 				}
1077 			}
1078 			list_iterator_destroy(opt_iter);
1079 			if (max_list_offset > max_het_job_offset) {
1080 				error("Attempt to run a job step with het group value of %d, "
1081 				      "but the job allocation has maximum value of %d",
1082 				      max_list_offset, max_het_job_offset);
1083 				exit(1);
1084 			}
1085 		}
1086 		srun_job_list = list_create(NULL);
1087 		used_resp_list = list_create(NULL);
1088 		if (max_het_job_offset > 0)
1089 			het_job_offset = 0;
1090 		resp_iter = list_iterator_create(job_resp_list);
1091 		while ((resp = list_next(resp_iter))) {
1092 			bool merge_nodelist = true;
1093 			if (my_job_id == 0) {
1094 				my_job_id = resp->job_id;
1095 				if (resp->working_cluster_rec)
1096 					slurm_setup_remote_working_cluster(resp);
1097 			}
1098 			_print_job_information(resp);
1099 			(void) get_next_opt(-2);
1100 			/*
1101 			 * Check using het_job_offset here, but we use
1102 			 * het_step_offset for the job being added.
1103 			 */
1104 			while ((opt_local = get_next_opt(het_job_offset))) {
1105 				srun_opt_t *srun_opt = opt_local->srun_opt;
1106 				xassert(srun_opt);
1107 				if (merge_nodelist) {
1108 					merge_nodelist = false;
1109 					list_append(used_resp_list, resp);
1110 				}
1111 				if (slurm_option_set_by_env(opt_local, 'N') &&
1112 				    (opt_local->min_nodes > resp->node_cnt)) {
					/*
					 * This signifies that the job either
					 * used the --no-kill option and a node
					 * went DOWN, or it used a node count
					 * range specification, was checkpointed
					 * from one size and restarted at a
					 * different size.
					 */
1121 					if (!node_cnt_error_logged) {
1122 						error("SLURM_JOB_NUM_NODES environment variable conflicts with allocated node count (%u != %u).",
1123 						      opt_local->min_nodes,
1124 						      resp->node_cnt);
1125 						node_cnt_error_logged = true;
1126 					}
1127 					/*
1128 					 * Modify options to match resource
1129 					 * allocation.
1130 					 * NOTE: Some options are not supported
1131 					 */
1132 					opt_local->min_nodes = resp->node_cnt;
1133 					xfree(srun_opt->alloc_nodelist);
1134 					if (!opt_local->ntasks_set) {
1135 						opt_local->ntasks =
1136 							opt_local->min_nodes;
1137 					}
1138 				}
1139 				if (srun_opt->core_spec_set &&
1140 				    !core_spec_error_logged) {
1141 					/*
1142 					 * NOTE: Silently ignore specialized
1143 					 * core count set with SLURM_CORE_SPEC
1144 					 * environment variable
1145 					 */
1146 					error("Ignoring --core-spec value for a job step "
1147 					      "within an existing job. Set specialized cores "
1148 					      "at job allocation time.");
1149 					core_spec_error_logged = true;
1150 				}
1151 #ifdef HAVE_NATIVE_CRAY
1152 				if (opt_local->network &&
1153 				    !network_error_logged) {
1154 					if (slurm_option_set_by_env(opt_local,
1155 								    LONG_OPT_NETWORK)) {
1156 						debug2("Ignoring SLURM_NETWORK value for a "
1157 						       "job step within an existing job. "
1158 						       "Using what was set at job "
1159 						       "allocation time.  Most likely this "
1160 						       "variable was set by sbatch or salloc.");
1161 					} else {
1162 						error("Ignoring --network value for a job step "
1163 						      "within an existing job. Set network "
1164 						      "options at job allocation time.");
1165 					}
1166 					network_error_logged = true;
1167 				}
1168 				xfree(opt_local->network);
1169 				/*
1170 				 * Here we send the het job groups to the
1171 				 * slurmctld to set up the interconnect
1172 				 * correctly.  We only ever need to send it to
1173 				 * the first component of the step.
1174 				 */
1175 				if (g_het_grp_bits)
1176 					opt_local->network = bit_fmt_hexmask(
1177 						g_het_grp_bits);
1178 #endif
1179 
1180 				if (srun_opt->exclusive)
1181 					_step_opt_exclusive(opt_local);
1182 				_set_env_vars(resp, het_step_offset);
1183 				if (_validate_relative(resp, opt_local))
1184 					exit(error_exit);
1185 				if (opt_local->begin && !begin_error_logged) {
1186 					error("--begin is ignored because nodes are already allocated.");
1187 					begin_error_logged = true;
1188 				}
1189 				if (opt_local->x11 && !x11_error_logged) {
1190 					error("Ignoring --x11 option for a job step within an "
1191 					      "existing job. Set x11 options at job allocation time.");
1192 					x11_error_logged = true;
1193 				}
1194 				job = job_step_create_allocation(resp,
1195 								 opt_local);
1196 				if (!job)
1197 					exit(error_exit);
1198 				if (max_het_job_offset > 0)
1199 					job->het_job_offset = het_step_offset;
1200 				list_append(srun_job_list, job);
1201 				het_step_offset++;
1202 			}	/* While more option structures */
1203 			het_job_offset++;
1204 		}	/* More hetjob components */
1205 		list_iterator_destroy(resp_iter);
1206 
1207 		max_het_job_offset = get_max_het_group();
1208 		het_job_offset = list_count(job_resp_list) - 1;
1209 		if (max_het_job_offset > het_job_offset) {
1210 			error("Requested het-group offset exceeds highest hetjob index (%d > %d)",
1211 			      max_het_job_offset, het_job_offset);
1212 			exit(error_exit);
1213 		}
1214 		i = list_count(srun_job_list);
1215 		if (i == 0) {
1216 			error("No directives to start application on any available hetjob components");
1217 			exit(error_exit);
1218 		}
1219 		if (i == 1)
1220 			FREE_NULL_LIST(srun_job_list);	/* Just use "job" */
1221 		if (list_count(job_resp_list) > 1) {
1222 			if (my_job_id)
1223 				het_job_id = my_job_id;
1224 			het_job_nodelist =
1225 				_compress_het_job_nodelist(used_resp_list);
1226 		}
1227 		list_destroy(used_resp_list);
1228 		if (_create_job_step(job, false, srun_job_list, het_job_id,
1229 				     het_job_nodelist) < 0) {
1230 			if (*got_alloc)
1231 				slurm_complete_job(my_job_id, 1);
1232 			else
1233 				_cancel_steps(srun_job_list);
1234 			exit(error_exit);
1235 		}
1236 		xfree(het_job_nodelist);
1237 	} else {
1238 		/* Combined job allocation and job step launch */
1239 #if defined HAVE_FRONT_END
1240 		uid_t my_uid = getuid();
1241 		if ((my_uid != 0) &&
1242 		    (my_uid != slurm_get_slurm_user_id())) {
1243 			error("srun task launch not supported on this system");
1244 			exit(error_exit);
1245 		}
1246 #endif
1247 		if (slurm_option_set_by_cli(&opt, 'J'))
1248 			setenvfs("SLURM_JOB_NAME=%s", opt.job_name);
1249 		else if (!slurm_option_set_by_env(&opt, 'J') && sropt.argc)
1250 			setenvfs("SLURM_JOB_NAME=%s", sropt.argv[0]);
1251 
1252 		if (opt_list) {
1253 			job_resp_list = allocate_het_job_nodes(handle_signals);
1254 			if (!job_resp_list)
1255 				exit(error_exit);
1256 			srun_job_list = list_create(NULL);
1257 			opt_iter  = list_iterator_create(opt_list);
1258 			resp_iter = list_iterator_create(job_resp_list);
1259 			while ((resp = list_next(resp_iter))) {
1260 				slurm_opt_t *opt_local;
1261 
1262 				if (my_job_id == 0) {
1263 					my_job_id = resp->job_id;
1264 					*got_alloc = true;
1265 				}
1266 				opt_local = list_next(opt_iter);
1267 				if (!opt_local)
1268 					break;
1269 				_print_job_information(resp);
1270 				_set_env_vars(resp, ++het_job_offset);
1271 				_set_env_vars2(resp, het_job_offset);
1272 				if (_validate_relative(resp, opt_local)) {
1273 					slurm_complete_job(my_job_id, 1);
1274 					exit(error_exit);
1275 				}
1276 				job = job_create_allocation(resp, opt_local);
1277 				job->het_job_offset = het_job_offset;
1278 				list_append(srun_job_list, job);
1279 				_set_step_opts(opt_local);
1280 			}
1281 			list_iterator_destroy(opt_iter);
1282 			list_iterator_destroy(resp_iter);
1283 			/* Continue support for old hetjob terminology. */
1284 			setenvfs("SLURM_PACK_SIZE=%d", het_job_offset + 1);
1285 			setenvfs("SLURM_HET_SIZE=%d", het_job_offset + 1);
1286 		} else {
1287 			if (!(resp = allocate_nodes(handle_signals, &opt)))
1288 				exit(error_exit);
1289 			*got_alloc = true;
1290 			my_job_id = resp->job_id;
1291 			_print_job_information(resp);
1292 			_set_env_vars(resp, -1);
1293 			if (_validate_relative(resp, &opt)) {
1294 				slurm_complete_job(resp->job_id, 1);
1295 				exit(error_exit);
1296 			}
1297 			job = job_create_allocation(resp, &opt);
1298 			_set_step_opts(&opt);
1299 		}
1300 		if (srun_job_list && (list_count(srun_job_list) > 1) &&
1301 		    opt_list && (list_count(opt_list) > 1) && my_job_id) {
1302 			het_job_id = my_job_id;
1303 			het_job_nodelist =
1304 				_compress_het_job_nodelist(job_resp_list);
1305 		}
1306 
1307 		/*
1308 		 *  Become --uid user
1309 		 */
1310 		if (_become_user () < 0)
1311 			fatal("Unable to assume uid=%u", opt.uid);
1312 		if (_create_job_step(job, true, srun_job_list, het_job_id,
1313 				     het_job_nodelist) < 0) {
1314 			slurm_complete_job(my_job_id, 1);
1315 			exit(error_exit);
1316 		}
1317 		xfree(het_job_nodelist);
1318 
1319 		if (opt_list) {
1320 			resp_iter = list_iterator_create(job_resp_list);
1321 			while ((resp = list_next(resp_iter))) {
1322 				slurm_free_resource_allocation_response_msg(
1323 									resp);
1324 			}
1325 			list_iterator_destroy(resp_iter);
1326 		} else {
1327 			slurm_free_resource_allocation_response_msg(resp);
1328 		}
1329 	}
1330 
1331 	/*
1332 	 *  Become --uid user
1333 	 */
1334 	if (_become_user () < 0)
1335 		fatal("Unable to assume uid=%u", opt.uid);
1336 
1337 	if (!slurm_started) {
1338 		/*
1339 		 * Spawn process to ensure clean-up of job and/or step
1340 		 * on abnormal termination
1341 		 */
1342 		shepherd_fd = _shepherd_spawn(job, srun_job_list, *got_alloc);
1343 	}
1344 
1345 	if (opt_list)
1346 		*p_job = (void *) srun_job_list;
1347 	else
1348 		*p_job = (void *) job;
1349 
1350 	if (job)
1351 	        _srun_cli_filter_post_submit(my_job_id, job->stepid);
1352 }
1353 
extern void pre_launch_srun_job(srun_job_t *job, bool slurm_started,
1355 				bool handle_signals, slurm_opt_t *opt_local)
1356 {
1357 	if (handle_signals && !signal_thread) {
1358 		slurm_thread_create(&signal_thread, _srun_signal_mgr, job);
1359 	}
1360 
	/* If running from poe, this already happened in srun. */
1362 	if (slurm_started)
1363 		return;
1364 
1365 	_run_srun_prolog(job);
1366 	if (_call_spank_local_user(job, opt_local) < 0) {
1367 		error("Failure in local plugin stack");
1368 		slurm_step_launch_abort(job->step_ctx);
1369 		exit(error_exit);
1370 	}
1371 
1372 	env_array_merge(&job->env, (const char **)environ);
1373 }
1374 
extern void fini_srun(srun_job_t *job, bool got_alloc, uint32_t *global_rc,
1376 		      bool slurm_started)
1377 {
1378 	/* If running from poe, most of this already happened in srun. */
1379 	if (slurm_started)
1380 		goto cleanup;
1381 	if (got_alloc) {
1382 		cleanup_allocation();
1383 
1384 		/* Tell slurmctld that we were cancelled */
1385 		if (job->state >= SRUN_JOB_CANCELLED)
1386 			slurm_complete_job(job->jobid, NO_VAL);
1387 		else
1388 			slurm_complete_job(job->jobid, *global_rc);
1389 	}
1390 	_shepherd_notify(shepherd_fd);
1391 
1392 cleanup:
1393 	if (signal_thread) {
1394 		srun_shutdown = true;
1395 		pthread_kill(signal_thread, SIGINT);
1396 		pthread_join(signal_thread,  NULL);
1397 	}
1398 
1399 	if (!slurm_started)
1400 		_run_srun_epilog(job);
1401 
1402 	slurm_step_ctx_destroy(job->step_ctx);
1403 
1404 	if (WIFEXITED(*global_rc))
1405 		*global_rc = WEXITSTATUS(*global_rc);
1406 	else if (WIFSIGNALED(*global_rc))
1407 		*global_rc = 128 + WTERMSIG(*global_rc);
1408 
1409 	mpir_cleanup();
1410 }
1411 
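/*
 * Advance the job state (state only moves forward) and wake any thread
 * waiting on job->state_cond.
 */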
1412 void
update_job_state(srun_job_t *job, srun_job_state_t state)
1414 {
1415 	slurm_mutex_lock(&job->state_mutex);
1416 	if (job->state < state) {
1417 		job->state = state;
1418 		slurm_cond_signal(&job->state_cond);
1419 
1420 	}
1421 	slurm_mutex_unlock(&job->state_mutex);
1422 	return;
1423 }
1424 
1425 srun_job_state_t
job_state(srun_job_t *job)
1427 {
1428 	srun_job_state_t state;
1429 	slurm_mutex_lock(&job->state_mutex);
1430 	state = job->state;
1431 	slurm_mutex_unlock(&job->state_mutex);
1432 	return state;
1433 }
1434 
1435 
1436 void
job_force_termination(srun_job_t *job)
1438 {
1439 	static int kill_sent = 0;
1440 	static time_t last_msg = 0;
1441 
1442 	if (kill_sent == 0) {
1443 		info("forcing job termination");
1444 		/* Sends SIGKILL to tasks directly */
1445 		update_job_state(job, SRUN_JOB_FORCETERM);
1446 	} else {
1447 		time_t now = time(NULL);
1448 		if (last_msg != now) {
1449 			info("job abort in progress");
1450 			last_msg = now;
1451 		}
1452 		if (kill_sent == 1) {
1453 			/* Try sending SIGKILL through slurmctld */
1454 			slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
1455 		}
1456 	}
1457 	kill_sent++;
1458 }
1459 
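/*
 * If the task count was not set explicitly, derive it from --ntasks-per-node
 * or from the allocated CPUs and --cpus-per-task, with at least one task per
 * node.
 */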
static void _set_ntasks(allocation_info_t *ai, slurm_opt_t *opt_local)
1461 {
1462 	int cnt = 0;
1463 
1464 	if (opt_local->ntasks_set)
1465 		return;
1466 
1467 	if (opt_local->ntasks_per_node != NO_VAL) {
1468 		cnt = ai->nnodes * opt_local->ntasks_per_node;
1469 		opt_local->ntasks_set = true;	/* implicit */
1470 	} else if (opt_local->cpus_set) {
1471 		int i;
1472 
1473 		for (i = 0; i < ai->num_cpu_groups; i++)
1474 			cnt += (ai->cpu_count_reps[i] *
1475 				(ai->cpus_per_node[i] /
1476 				 opt_local->cpus_per_task));
1477 		opt_local->ntasks_set = true;	/* implicit */
1478 	}
1479 
1480 	opt_local->ntasks = (cnt < ai->nnodes) ? ai->nnodes : cnt;
1481 }
1482 
1483 /*
1484  * Create an srun job structure from a resource allocation response msg
1485  */
static srun_job_t *_job_create_structure(allocation_info_t *ainfo,
1487 					 slurm_opt_t *opt_local)
1488 {
1489 	srun_job_t *job = xmalloc(sizeof(srun_job_t));
1490 	int i;
1491 
1492 	_set_ntasks(ainfo, opt_local);
1493 	debug2("creating job with %d tasks", opt_local->ntasks);
1494 
1495 	slurm_mutex_init(&job->state_mutex);
1496 	slurm_cond_init(&job->state_cond, NULL);
1497 	job->state = SRUN_JOB_INIT;
1498 
1499  	job->alias_list = xstrdup(ainfo->alias_list);
1500  	job->nodelist = xstrdup(ainfo->nodelist);
1501  	job->partition = xstrdup(ainfo->partition);
1502 	job->stepid  = ainfo->stepid;
1503 	job->het_job_id  = NO_VAL;
1504 	job->het_job_nnodes = NO_VAL;
1505 	job->het_job_ntasks = NO_VAL;
1506  	job->het_job_offset = NO_VAL;
1507 	job->het_job_task_offset = NO_VAL;
1508 	job->nhosts   = ainfo->nnodes;
1509 
1510 #if defined HAVE_FRONT_END
1511 	/* Limited job step support */
1512 	opt_local->overcommit = true;
1513 #else
1514 	if (opt_local->min_nodes > job->nhosts) {
1515 		error("Only allocated %d nodes asked for %d",
1516 		      job->nhosts, opt_local->min_nodes);
1517 		if (opt_local->exclude) {
1518 			/* When resources are pre-allocated and some nodes
1519 			 * are explicitly excluded, this error can occur. */
1520 			error("Are required nodes explicitly excluded?");
1521 		}
1522 		xfree(job);
1523 		return NULL;
1524 	}
1525 	if ((ainfo->cpus_per_node == NULL) ||
1526 	    (ainfo->cpu_count_reps == NULL)) {
1527 		error("cpus_per_node array is not set");
1528 		xfree(job);
1529 		return NULL;
1530 	}
1531 #endif
1532 	job->select_jobinfo = ainfo->select_jobinfo;
1533 	job->jobid   = ainfo->jobid;
1534 
1535 	job->ntasks  = opt_local->ntasks;
1536 	job->ntasks_per_board = ainfo->ntasks_per_board;
1537 	job->ntasks_per_core = ainfo->ntasks_per_core;
1538 	job->ntasks_per_socket = ainfo->ntasks_per_socket;
1539 
1540 	/*
1541 	 * If cpus_per_task is set then get the exact count of cpus for the
1542 	 * requested step (we might very well use less, especially if
1543 	 * --exclusive is used).  Else get the total for the allocation given.
1544 	 */
1545 	if (opt_local->cpus_set)
1546 		job->cpu_count = opt_local->ntasks * opt_local->cpus_per_task;
1547 	else {
1548 		for (i = 0; i < ainfo->num_cpu_groups; i++) {
1549 			job->cpu_count += ainfo->cpus_per_node[i] *
1550 				ainfo->cpu_count_reps[i];
1551 		}
1552 	}
1553 
1554 	job->rc       = -1;
1555 
1556 	job_update_io_fnames(job, opt_local);
1557 
1558 	return (job);
1559 }
1560 
extern void job_update_io_fnames(srun_job_t *job, slurm_opt_t *opt_local)
1562 {
1563 	job->ifname = fname_create(job, opt_local->ifname, opt_local->ntasks);
1564 	job->ofname = fname_create(job, opt_local->ofname, opt_local->ntasks);
1565 	job->efname = opt_local->efname ?
1566 		      fname_create(job, opt_local->efname, opt_local->ntasks) :
1567 		      job->ofname;
1568 }
1569 
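/*
 * Return the hostlist expressed in canonical ranged form (caller must xfree);
 * if the list cannot be parsed, return a copy of the input unchanged.
 */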
1570 static char *
_normalize_hostlist(const char *hostlist)
1572 {
1573 	char *buf = NULL;
1574 	hostlist_t hl = hostlist_create(hostlist);
1575 
1576 	if (hl)	{
1577 		buf = hostlist_ranged_string_xmalloc(hl);
1578 		hostlist_destroy(hl);
1579 	}
1580 	if (!buf)
1581 		return xstrdup(hostlist);
1582 
1583 	return buf;
1584 }
1585 
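/* Change to the user/group requested via --uid/--gid; no-op if already that user */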
static int _become_user (void)
1587 {
1588 	char *user;
1589 
1590 	/* Already the user, so there's nothing to change. Return early. */
1591 	if (opt.uid == getuid())
1592 		return 0;
1593 
1594 	if (!(user = uid_to_string_or_null(opt.uid))) {
1595 		xfree(user);
1596 		return (error ("Invalid user id %u: %m", opt.uid));
1597 	}
1598 
1599 	if ((opt.gid != getgid()) && (setgid(opt.gid) < 0)) {
1600 		xfree(user);
1601 		return (error ("setgid: %m"));
1602 	}
1603 
1604 	if (initgroups(user, gid_from_uid(opt.uid)))
1605 		return (error ("initgroups: %m"));
1606 
1607 	xfree(user);
1608 
1609 	if (setuid (opt.uid) < 0)
1610 		return (error ("setuid: %m"));
1611 
1612 	return (0);
1613 }
1614 
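/* Invoke the spank "local user" callbacks with this step's launch information */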
static int _call_spank_local_user(srun_job_t *job, slurm_opt_t *opt_local)
1616 {
1617 	srun_opt_t *srun_opt = opt_local->srun_opt;
1618 	struct spank_launcher_job_info info[1];
1619 	xassert(srun_opt);
1620 
1621 	info->argc = srun_opt->argc;
1622 	info->argv = srun_opt->argv;
1623 	info->gid	= opt_local->gid;
1624 	info->jobid	= job->jobid;
1625 	info->stepid	= job->stepid;
1626 	info->step_layout = launch_common_get_slurm_step_layout(job);
1627 	info->uid	= opt_local->uid;
1628 
1629 	return spank_local_user(info);
1630 }
1631 
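/* Restore the default handler for a signal that is currently being ignored */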
static void _default_sigaction(int sig)
1633 {
1634 	struct sigaction act;
1635 	if (sigaction(sig, NULL, &act)) {
1636 		error("sigaction(%d): %m", sig);
1637 		return;
1638 	}
1639 	if (act.sa_handler != SIG_IGN)
1640 		return;
1641 
1642 	act.sa_handler = SIG_DFL;
1643 	if (sigaction(sig, &act, NULL))
1644 		error("sigaction(%d): %m", sig);
1645 }
1646 
/* Return the number of microseconds between tv1 and tv2, capped at a
 * maximum value of 10,000,000 to prevent overflows */
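/*
 * Illustrative example (hypothetical values):
 *	tv1 = {10, 500000}, tv2 = {11, 250000}
 *	delta_t = MIN(1, 10) * USEC_IN_SEC + (250000 - 500000) = 750000 usec
 */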
static long _diff_tv_str(struct timeval *tv1, struct timeval *tv2)
1650 {
1651 	long delta_t;
1652 
1653 	delta_t  = MIN((tv2->tv_sec - tv1->tv_sec), 10);
1654 	delta_t *= USEC_IN_SEC;
1655 	delta_t +=  tv2->tv_usec - tv1->tv_usec;
1656 	return delta_t;
1657 }
1658 
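/*
 * Handle SIGINT: a single Ctrl-C reports step status (or forwards SIGINT when
 * --disable-status is set); a second Ctrl-C within one second (or the first,
 * with --quit-on-interrupt) cancels the job and escalates to SIGKILL and
 * forced termination.
 */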
static void _handle_intr(srun_job_t *job)
1660 {
1661 	static struct timeval last_intr = { 0, 0 };
1662 	static struct timeval last_intr_sent = { 0, 0 };
1663 	struct timeval now;
1664 
1665 	gettimeofday(&now, NULL);
1666 	if (!sropt.quit_on_intr && (_diff_tv_str(&last_intr, &now) > 1000000)) {
1667 		if (sropt.disable_status) {
1668 			info("sending Ctrl-C to job %u.%u",
1669 			     job->jobid, job->stepid);
1670 			launch_g_fwd_signal(SIGINT);
1671 		} else if (job->state < SRUN_JOB_FORCETERM) {
1672 			info("interrupt (one more within 1 sec to abort)");
1673 			launch_g_print_status();
1674 		} else {
1675 			info("interrupt (abort already in progress)");
1676 			launch_g_print_status();
1677 		}
1678 		last_intr = now;
1679 	} else  { /* second Ctrl-C in half as many seconds */
1680 		update_job_state(job, SRUN_JOB_CANCELLED);
1681 		/* terminate job */
1682 		if (job->state < SRUN_JOB_FORCETERM) {
1683 			if (_diff_tv_str(&last_intr_sent, &now) < 1000000) {
1684 				job_force_termination(job);
1685 				launch_g_fwd_signal(SIGKILL);
1686 				return;
1687 			}
1688 
1689 			info("sending Ctrl-C to job %u.%u",
1690 			     job->jobid, job->stepid);
1691 			last_intr_sent = now;
1692 			launch_g_fwd_signal(SIGINT);
1693 		} else
1694 			job_force_termination(job);
1695 
1696 		launch_g_fwd_signal(SIGKILL);
1697 	}
1698 }
1699 
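/* Handle SIGPIPE by killing the step, but only do so once */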
1700 static void _handle_pipe(void)
1701 {
1702 	static int ending = 0;
1703 
1704 	if (ending)
1705 		return;
1706 	ending = 1;
1707 	launch_g_fwd_signal(SIGKILL);
1708 }
1709 
1710 
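/* In verbose mode, log the allocation: job id, node count/list and CPU counts */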
1711 static void _print_job_information(resource_allocation_response_msg_t *resp)
1712 {
1713 	int i;
1714 	char *str = NULL;
1715 	char *sep = "";
1716 
1717 	if (!opt.verbose)
1718 		return;
1719 
1720 	xstrfmtcat(str, "jobid %u: nodes(%u):`%s', cpu counts: ",
1721 		   resp->job_id, resp->node_cnt, resp->node_list);
1722 
1723 	for (i = 0; i < resp->num_cpu_groups; i++) {
1724 		xstrfmtcat(str, "%s%u(x%u)",
1725 			   sep, resp->cpus_per_node[i],
1726 			   resp->cpu_count_reps[i]);
1727 		sep = ",";
1728 	}
1729 	verbose("%s", str);
1730 	xfree(str);
1731 }
1732 
1733 /* NOTE: Executed once for entire hetjob */
1734 static void _run_srun_epilog (srun_job_t *job)
1735 {
1736 	int rc;
1737 
1738 	if (sropt.epilog && xstrcasecmp(sropt.epilog, "none") != 0) {
1739 		if (setenvf(NULL, "SLURM_SCRIPT_CONTEXT", "epilog_srun") < 0)
1740 			error("unable to set SLURM_SCRIPT_CONTEXT in environment");
1741 		rc = _run_srun_script(job, sropt.epilog);
1742 		debug("srun epilog rc = %d", rc);
1743 	}
1744 }
1745 
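/* Run the srun prolog script (--prolog), unless it is unset or "none" */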
1746 static void _run_srun_prolog (srun_job_t *job)
1747 {
1748 	int rc;
1749 
1750 	if (sropt.prolog && xstrcasecmp(sropt.prolog, "none") != 0) {
1751 		if (setenvf(NULL, "SLURM_SCRIPT_CONTEXT", "prolog_srun") < 0)
1752 			error("unable to set SLURM_SCRIPT_CONTEXT in environment");
1753 		rc = _run_srun_script(job, sropt.prolog);
1754 		debug("srun prolog rc = %d", rc);
1755 	}
1756 }
1757 
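/*
 * Fork and exec a prolog or epilog script, passing it the application's
 * arguments. Returns the script's exit status from waitpid(), 0 if the
 * script is empty or unreadable, or -1 if the fork fails.
 */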
1758 static int _run_srun_script (srun_job_t *job, char *script)
1759 {
1760 	int status;
1761 	pid_t cpid;
1762 	int i;
1763 	char **args = NULL;
1764 
1765 	if (script == NULL || script[0] == '\0')
1766 		return 0;
1767 
1768 	if (access(script, R_OK | X_OK) < 0) {
1769 		info("Access denied for %s: %m", script);
1770 		return 0;
1771 	}
1772 
1773 	if ((cpid = fork()) < 0) {
1774 		error ("run_srun_script: fork: %m");
1775 		return -1;
1776 	}
1777 	if (cpid == 0) {
1778 		/*
1779 		 * set the prolog/epilog scripts command line arguments to the
1780 		 * application arguments (for last hetjob component), but
1781 		 * shifted one higher
1782 		 */
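		/*
		 * For example, if the step command is "a.out arg1 arg2",
		 * the script is invoked as "<script> a.out arg1 arg2".
		 */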
1783 		args = xmalloc(sizeof(char *) * (sropt.argc + 2));
1784 		args[0] = script;
1785 		for (i = 0; i < sropt.argc; i++) {
1786 			args[i+1] = sropt.argv[i];
1787 		}
1788 		args[i+1] = NULL;
1789 		execv(script, args);
1790 		error("execv(%s): %m", script);
1791 		_exit(127);
1792 	}
1793 
1794 	do {
1795 		if (waitpid(cpid, &status, 0) < 0) {
1796 			if (errno == EINTR)
1797 				continue;
1798 			error("waitpid: %m");
1799 			return 0;
1800 		} else
1801 			return status;
1802 	} while(1);
1803 
1804 	/* NOTREACHED */
1805 }
1806 
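/*
 * Build an environment variable name: for hetjob components the offset is
 * appended, e.g. ("SLURM_JOB_ID", 1) yields "SLURM_JOB_ID_PACK_GROUP_1";
 * an offset of -1 returns the base name unchanged. Caller must xfree().
 */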
1807 static char *_build_key(char *base, int het_job_offset)
1808 {
1809 	char *key = NULL;
1810 
1811 	if (het_job_offset == -1)
1812 		key = xstrdup(base);
1813 	else
1814 		xstrfmtcat(key, "%s_PACK_GROUP_%d", base, het_job_offset);
1815 
1816 	return key;
1817 }
1818 
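/*
 * Export SLURM_JOB_CPUS_PER_NODE, SLURM_NODE_ALIASES and any burst buffer
 * environment variables carried in the allocation response.
 */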
1819 static void _set_env_vars(resource_allocation_response_msg_t *resp,
1820 			  int het_job_offset)
1821 {
1822 	char *key, *value, *tmp;
1823 	int i;
1824 
1825 	key = _build_key("SLURM_JOB_CPUS_PER_NODE", het_job_offset);
1826 	if (!getenv(key)) {
1827 		tmp = uint32_compressed_to_str(resp->num_cpu_groups,
1828 					       resp->cpus_per_node,
1829 					       resp->cpu_count_reps);
1830 		if (setenvf(NULL, key, "%s", tmp) < 0)
1831 			error("unable to set %s in environment", key);
1832 		xfree(tmp);
1833 	}
1834 	xfree(key);
1835 
1836 	key = _build_key("SLURM_NODE_ALIASES", het_job_offset);
1837 	if (resp->alias_list) {
1838 		if (setenv(key, resp->alias_list, 1) < 0)
1839 			error("unable to set %s in environment", key);
1840 	} else {
1841 		unsetenv(key);
1842 	}
1843 	xfree(key);
1844 
1845 	if (resp->env_size) {	/* Used to set Burst Buffer environment */
1846 		for (i = 0; i < resp->env_size; i++) {
1847 			tmp = xstrdup(resp->environment[i]);
1848 			key = tmp;
1849 			value = strchr(tmp, '=');
1850 			if (value) {
1851 				value[0] = '\0';
1852 				value++;
1853 				setenv(key, value, 0);
1854 			}
1855 			xfree(tmp);
1856 		}
1857 	}
1858 
1859 	return;
1860 }
1861 
1862 /*
1863  * Set some hetjob environment variables for combined job & step allocation
1864  */
1865 static void _set_env_vars2(resource_allocation_response_msg_t *resp,
1866 			   int het_job_offset)
1867 {
1868 	char *key;
1869 
1870 	if (resp->account) {
1871 		key = _build_key("SLURM_JOB_ACCOUNT", het_job_offset);
1872 		if (!getenv(key) &&
1873 		    (setenvf(NULL, key, "%s", resp->account) < 0)) {
1874 			error("unable to set %s in environment", key);
1875 		}
1876 		xfree(key);
1877 	}
1878 
1879 	key = _build_key("SLURM_JOB_ID", het_job_offset);
1880 	if (!getenv(key) &&
1881 	    (setenvf(NULL, key, "%u", resp->job_id) < 0)) {
1882 		error("unable to set %s in environment", key);
1883 	}
1884 	xfree(key);
1885 
1886 	key = _build_key("SLURM_JOB_NODELIST", het_job_offset);
1887 	if (!getenv(key) &&
1888 	    (setenvf(NULL, key, "%s", resp->node_list) < 0)) {
1889 		error("unable to set %s in environment", key);
1890 	}
1891 	xfree(key);
1892 
1893 	key = _build_key("SLURM_JOB_PARTITION", het_job_offset);
1894 	if (!getenv(key) &&
1895 	    (setenvf(NULL, key, "%s", resp->partition) < 0)) {
1896 		error("unable to set %s in environment", key);
1897 	}
1898 	xfree(key);
1899 
1900 	if (resp->qos) {
1901 		key = _build_key("SLURM_JOB_QOS", het_job_offset);
1902 		if (!getenv(key) &&
1903 		    (setenvf(NULL, key, "%s", resp->qos) < 0)) {
1904 			error("unable to set %s in environment", key);
1905 		}
1906 		xfree(key);
1907 	}
1908 
1909 	if (resp->resv_name) {
1910 		key = _build_key("SLURM_JOB_RESERVATION", het_job_offset);
1911 		if (!getenv(key) &&
1912 		    (setenvf(NULL, key, "%s", resp->resv_name) < 0)) {
1913 			error("unable to set %s in environment", key);
1914 		}
1915 		xfree(key);
1916 	}
1917 
1918 	if (resp->alias_list) {
1919 		key = _build_key("SLURM_NODE_ALIASES", het_job_offset);
1920 		if (!getenv(key) &&
1921 		    (setenvf(NULL, key, "%s", resp->alias_list) < 0)) {
1922 			error("unable to set %s in environment", key);
1923 		}
1924 		xfree(key);
1925 	}
1926 }
1927 
1928 /*
1929  * _set_prio_process_env
1930  *
1931  * Set the internal SLURM_PRIO_PROCESS environment variable to support
1932  * the propagation of the users nice value and the "PropagatePrioProcess"
1933  * config keyword.
1934  */
1935 static void  _set_prio_process_env(void)
1936 {
1937 	int retval;
1938 
1939 	errno = 0; /* needed to detect a real failure since prio can be -1 */
1940 
1941 	if ((retval = getpriority (PRIO_PROCESS, 0)) == -1)  {
1942 		if (errno) {
1943 			error ("getpriority(PRIO_PROCESS): %m");
1944 			return;
1945 		}
1946 	}
1947 
1948 	if (setenvf (NULL, "SLURM_PRIO_PROCESS", "%d", retval) < 0) {
1949 		error ("unable to set SLURM_PRIO_PROCESS in environment");
1950 		return;
1951 	}
1952 
1953 	debug ("propagating SLURM_PRIO_PROCESS=%d", retval);
1954 }
1955 
1956 /* Set SLURM_RLIMIT_* environment variables with current resource
1957  * limit values, reset RLIMIT_NOFILE to maximum possible value */
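/*
 * For example, a current soft limit of 1024 open files is exported as
 * SLURM_RLIMIT_NOFILE=1024, or U1024 if propagation was explicitly
 * requested with --propagate.
 */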
1958 static int _set_rlimit_env(void)
1959 {
1960 	int                  rc = SLURM_SUCCESS;
1961 	struct rlimit        rlim[1];
1962 	unsigned long        cur;
1963 	char                 name[64], *format;
1964 	slurm_rlimits_info_t *rli;
1965 
1966 	/* Modify limits with any command-line options */
1967 	if (sropt.propagate
1968 	    && parse_rlimits(sropt.propagate, PROPAGATE_RLIMITS)) {
1969 		error( "--propagate=%s is not valid.", sropt.propagate );
1970 		exit(error_exit);
1971 	}
1972 
1973 	for (rli = get_slurm_rlimits_info(); rli->name != NULL; rli++ ) {
1974 
1975 		if (rli->propagate_flag != PROPAGATE_RLIMITS)
1976 			continue;
1977 
1978 		if (getrlimit (rli->resource, rlim) < 0) {
1979 			error ("getrlimit (RLIMIT_%s): %m", rli->name);
1980 			rc = SLURM_ERROR;
1981 			continue;
1982 		}
1983 
1984 		cur = (unsigned long) rlim->rlim_cur;
1985 		snprintf(name, sizeof(name), "SLURM_RLIMIT_%s", rli->name);
1986 		if (sropt.propagate && (rli->propagate_flag == PROPAGATE_RLIMITS))
1987 			/*
1988 			 * Prepend 'U' to indicate user requested propagate
1989 			 */
1990 			format = "U%lu";
1991 		else
1992 			format = "%lu";
1993 
1994 		if (setenvf (NULL, name, format, cur) < 0) {
1995 			error ("unable to set %s in environment", name);
1996 			rc = SLURM_ERROR;
1997 			continue;
1998 		}
1999 
2000 		debug ("propagating RLIMIT_%s=%lu", rli->name, cur);
2001 	}
2002 
2003 	/*
2004 	 *  Now increase NOFILE to the max available for this srun
2005 	 */
2006 	rlimits_maximize_nofile();
2007 
2008 	return rc;
2009 }
2010 
2011 /* Set SLURM_CLUSTER_NAME, SLURM_SUBMIT_DIR and SLURM_SUBMIT_HOST environment
2012  * variables with current state */
2013 static void _set_submit_dir_env(void)
2014 {
2015 	char buf[MAXPATHLEN + 1], host[256];
2016 	char *cluster_name;
2017 
2018 	cluster_name = slurm_get_cluster_name();
2019 	if (cluster_name) {
2020 		if (setenvf(NULL, "SLURM_CLUSTER_NAME", "%s", cluster_name) < 0)
2021 			error("unable to set SLURM_CLUSTER_NAME in environment");
2022 		xfree(cluster_name);
2023 	}
2024 
2025 	if ((getcwd(buf, MAXPATHLEN)) == NULL)
2026 		error("getcwd failed: %m");
2027 	else if (setenvf(NULL, "SLURM_SUBMIT_DIR", "%s", buf) < 0)
2028 		error("unable to set SLURM_SUBMIT_DIR in environment");
2029 
2030 	if ((gethostname(host, sizeof(host))))
2031 		error("gethostname failed: %m");
2032 	else if (setenvf(NULL, "SLURM_SUBMIT_HOST", "%s", host) < 0)
2033 		error("unable to set SLURM_SUBMIT_HOST in environment");
2034 }
2035 
2036 /* Set SRUN_DEBUG and SLURM_UMASK environment variables from current state */
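/* For example, a umask of 022 is exported as SLURM_UMASK=0022 */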
2037 static int _set_umask_env(void)
2038 {
2039 	if (!getenv("SRUN_DEBUG")) {	/* do not change current value */
2040 		/* NOTE: Default debug level is 3 (info) */
2041 		int log_level = LOG_LEVEL_INFO + opt.verbose - opt.quiet;
2042 
2043 		if (setenvf(NULL, "SRUN_DEBUG", "%d", log_level) < 0)
2044 			error ("unable to set SRUN_DEBUG in environment");
2045 	}
2046 
2047 	if (!getenv("SLURM_UMASK")) {	/* do not change current value */
2048 		char mask_char[5];
2049 		mode_t mask;
2050 
2051 		mask = (int)umask(0);
2052 		umask(mask);
2053 
2054 		sprintf(mask_char, "0%d%d%d",
2055 			((mask>>6)&07), ((mask>>3)&07), mask&07);
2056 		if (setenvf(NULL, "SLURM_UMASK", "%s", mask_char) < 0) {
2057 			error ("unable to set SLURM_UMASK in environment");
2058 			return SLURM_ERROR;
2059 		}
2060 		debug ("propagating UMASK=%s", mask_char);
2061 	}
2062 
2063 	return SLURM_SUCCESS;
2064 }
2065 
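/* Notify the shepherd process of normal completion by writing one byte,
 * then close the pipe */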
2066 static void _shepherd_notify(int shepherd_fd)
2067 {
2068 	int rc;
2069 
2070 	while (1) {
2071 		rc = write(shepherd_fd, "", 1);
2072 		if (rc == -1) {
2073 			if ((errno == EAGAIN) || (errno == EINTR))
2074 				continue;
2075 			error("write(shepherd): %m");
2076 		}
2077 		break;
2078 	}
2079 	close(shepherd_fd);
2080 }
2081 
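/*
 * Fork a shepherd process that blocks reading from a pipe. If srun exits
 * without writing to the pipe (EOF or read error), the shepherd kills the
 * job step(s) and, when an allocation was created here, completes the job.
 * Returns the write end of the pipe to the caller, or -1 on error.
 */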
2082 static int _shepherd_spawn(srun_job_t *job, List srun_job_list, bool got_alloc)
2083 {
2084 	int shepherd_pipe[2], rc;
2085 	pid_t shepherd_pid;
2086 	char buf[1];
2087 
2088 	if (pipe(shepherd_pipe)) {
2089 		error("pipe: %m");
2090 		return -1;
2091 	}
2092 
2093 	shepherd_pid = fork();
2094 	if (shepherd_pid == -1) {
2095 		error("fork: %m");
2096 		return -1;
2097 	}
2098 	if (shepherd_pid != 0) {
2099 		close(shepherd_pipe[0]);
2100 		return shepherd_pipe[1];
2101 	}
2102 
2103 	/* Wait for parent to notify of completion or I/O error on abort */
2104 	close(shepherd_pipe[1]);
2105 	while (1) {
2106 		rc = read(shepherd_pipe[0], buf, 1);
2107 		if (rc == 1) {
2108 			_exit(0);
2109 		} else if (rc == 0) {
2110 			break;	/* EOF */
2111 		} else if (rc == -1) {
2112 			if ((errno == EAGAIN) || (errno == EINTR))
2113 				continue;
2114 			break;
2115 		}
2116 	}
2117 
2118 	if (srun_job_list) {
2119 		ListIterator job_iter;
2120 		job_iter  = list_iterator_create(srun_job_list);
2121 		while ((job = list_next(job_iter))) {
2122 			(void) slurm_kill_job_step(job->jobid, job->stepid,
2123 						   SIGKILL);
2124 			if (got_alloc)
2125 				slurm_complete_job(job->jobid, NO_VAL);
2126 		}
2127 		list_iterator_destroy(job_iter);
2128 	} else {
2129 		(void) slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
2130 		if (got_alloc)
2131 			slurm_complete_job(job->jobid, NO_VAL);
2132 	}
2133 
2134 	_exit(0);
2135 	return -1;
2136 }
2137 
2138 /* _srun_signal_mgr - Process daemon-wide signals */
2139 static void *_srun_signal_mgr(void *job_ptr)
2140 {
2141 	int sig;
2142 	int i, rc;
2143 	sigset_t set;
2144 	srun_job_t *job = (srun_job_t *)job_ptr;
2145 
2146 	/* Make sure no required signals are ignored (possibly inherited) */
2147 	for (i = 0; sig_array[i]; i++)
2148 		_default_sigaction(sig_array[i]);
2149 	while (!srun_shutdown) {
2150 		xsignal_sigset_create(sig_array, &set);
2151 		rc = sigwait(&set, &sig);
2152 		if (rc == EINTR)
2153 			continue;
2154 		switch (sig) {
2155 		case SIGINT:
2156 			if (!srun_shutdown)
2157 				_handle_intr(job);
2158 			break;
2159 		case SIGQUIT:
2160 			info("Quit");
2161 			/* fall through and abort the step, as for SIGTERM/SIGHUP */
2162 		case SIGTERM:
2163 		case SIGHUP:
2164 			/* No need to call job_force_termination here since we
2165 			 * are ending the job now and we don't need to update
2166 			 * the state. */
2167 			info("forcing job termination");
2168 			launch_g_fwd_signal(SIGKILL);
2169 			break;
2170 		case SIGCONT:
2171 			info("got SIGCONT");
2172 			break;
2173 		case SIGPIPE:
2174 			_handle_pipe();
2175 			break;
2176 		case SIGALRM:
2177 			if (srun_max_timer) {
2178 				info("First task exited %ds ago", sropt.max_wait);
2179 				launch_g_print_status();
2180 				launch_g_step_terminate();
2181 			}
2182 			break;
2183 		default:
2184 			launch_g_fwd_signal(sig);
2185 			break;
2186 		}
2187 	}
2188 	return NULL;
2189 }
2190 
2191 /* if srun_opt->exclusive is set, disable user task layout controls */
2192 static void _step_opt_exclusive(slurm_opt_t *opt_local)
2193 {
2194 	srun_opt_t *srun_opt = opt_local->srun_opt;
2195 	xassert(srun_opt);
2196 
2197 	if (!opt_local->ntasks_set) {
2198 		error("--ntasks must be set with --exclusive");
2199 		exit(error_exit);
2200 	}
2201 	if (srun_opt->relative != NO_VAL) {
2202 		error("--relative disabled, incompatible with --exclusive");
2203 		exit(error_exit);
2204 	}
2205 	if (opt_local->exclude) {
2206 		error("--exclude is incompatible with --exclusive");
2207 		exit(error_exit);
2208 	}
2209 }
2210 
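/*
 * Verify that the --relative offset plus the requested node count fits
 * within the number of allocated nodes.
 */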
2211 static int _validate_relative(resource_allocation_response_msg_t *resp,
2212 			      slurm_opt_t *opt_local)
2213 {
2214 	srun_opt_t *srun_opt = opt_local->srun_opt;
2215 	xassert(srun_opt);
2216 
2217 	if ((srun_opt->relative != NO_VAL) &&
2218 	    ((srun_opt->relative + opt_local->min_nodes)
2219 	     > resp->node_cnt)) {
2220 		if (slurm_option_set_by_cli(opt_local, 'N')) {
2221 			/* -N command line option used */
2222 			error("--relative and --nodes option incompatible "
2223 			      "with count of allocated nodes (%d+%d>%d)",
2224 			      srun_opt->relative,
2225 			      opt_local->min_nodes,
2226 			      resp->node_cnt);
2227 		} else {		/* SLURM_JOB_NUM_NODES option used */
2228 			error("--relative and SLURM_JOB_NUM_NODES option incompatible with count of allocated nodes (%d+%d>%d)",
2229 			      srun_opt->relative,
2230 			      opt_local->min_nodes,
2231 			      resp->node_cnt);
2232 		}
2233 		return SLURM_ERROR;
2234 	}
2235 	return SLURM_SUCCESS;
2236 }
2237 
2238 static void _call_spank_fini(void)
2239 {
2240 	if (-1 != shepherd_fd)
2241 		spank_fini(NULL);
2242 }
2243 
2244 /*
2245  * Run cli_filter_post_submit on all opt structures
2246  * Convenience function since this might need to run in two spots
2247  */
2248 static void _srun_cli_filter_post_submit(uint32_t jobid, uint32_t stepid)
2249 {
2250 	static bool post_submit_ran = false;
2251 	int idx = 0, components = 1;
2252 
2253 	if (post_submit_ran)
2254 		return;
2255 
2256 	if (opt_list)
2257 		components = list_count(opt_list);
2258 
2259 	for (idx = 0; idx < components; idx++)
2260 		cli_filter_plugin_post_submit(idx, jobid, stepid);
2261 
2262 	post_submit_ran = true;
2263 }
2264