1 /****************************************************************************\
2 * srun_job.c - job data structure creation functions
3 *****************************************************************************
4 * Copyright (C) 2002-2007 The Regents of the University of California.
5 * Copyright (C) 2008 Lawrence Livermore National Security.
6 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7 * Written by Mark Grondona <grondona@llnl.gov>.
8 * CODE-OCEC-09-009. All rights reserved.
9 *
10 * This file is part of Slurm, a resource management program.
11 * For details, see <https://slurm.schedmd.com/>.
12 * Please also read the included file: DISCLAIMER.
13 *
14 * Slurm is free software; you can redistribute it and/or modify it under
15 * the terms of the GNU General Public License as published by the Free
16 * Software Foundation; either version 2 of the License, or (at your option)
17 * any later version.
18 *
19 * In addition, as a special exception, the copyright holders give permission
20 * to link the code of portions of this program with the OpenSSL library under
21 * certain conditions as described in each individual source file, and
22 * distribute linked combinations including the two. You must obey the GNU
23 * General Public License in all respects for all of the code used other than
24 * OpenSSL. If you modify file(s) with this exception, you may extend this
25 * exception to your version of the file(s), but you are not obligated to do
26 * so. If you do not wish to do so, delete this exception statement from your
27 * version. If you delete this exception statement from all source files in
28 * the program, then also delete it here.
29 *
30 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
31 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
32 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
33 * details.
34 *
35 * You should have received a copy of the GNU General Public License along
36 * with Slurm; if not, write to the Free Software Foundation, Inc.,
37 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
38 \*****************************************************************************/
39
40 #include "config.h"
41
42 #include <fcntl.h>
43 #include <grp.h>
44 #include <netdb.h>
45 #include <signal.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <sys/param.h> /* MAXPATHLEN */
49 #include <sys/resource.h>
50 #include <sys/stat.h>
51 #include <sys/types.h>
52 #include <sys/wait.h>
53 #include <unistd.h>
54
55 #include "src/common/bitstring.h"
56 #include "src/common/cli_filter.h"
57 #include "src/common/cbuf.h"
58 #include "src/common/fd.h"
59 #include "src/common/forward.h"
60 #include "src/common/hostlist.h"
61 #include "src/common/io_hdr.h"
62 #include "src/common/log.h"
63 #include "src/common/macros.h"
64 #include "src/common/plugstack.h"
65 #include "src/common/proc_args.h"
66 #include "src/common/read_config.h"
67 #include "src/common/slurm_opt.h"
68 #include "src/common/slurm_protocol_api.h"
69 #include "src/common/slurm_rlimits_info.h"
70 #include "src/common/uid.h"
71 #include "src/common/xmalloc.h"
72 #include "src/common/xsignal.h"
73 #include "src/common/xstring.h"
74
75 #include "src/api/step_launch.h"
76
77 #include "src/srun/libsrun/allocate.h"
78 #include "src/srun/libsrun/debugger.h"
79 #include "src/srun/libsrun/fname.h"
80 #include "src/srun/libsrun/launch.h"
81 #include "src/srun/libsrun/opt.h"
82 #include "src/srun/libsrun/multi_prog.h"
83 #include "src/srun/libsrun/srun_job.h"
84
85 /*
86 * allocation information structure used to store general information
87 * about node allocation to be passed to _job_create_structure()
88 */
89 typedef struct allocation_info {
90 char *alias_list;
91 uint16_t *cpus_per_node;
92 uint32_t *cpu_count_reps;
93 uint32_t jobid;
94 uint32_t nnodes;
95 char *nodelist;
96 uint16_t ntasks_per_board;/* number of tasks to invoke on each board */
97 uint16_t ntasks_per_core; /* number of tasks to invoke on each core */
98 uint16_t ntasks_per_socket;/* number of tasks to invoke on
99 * each socket */
100 uint32_t num_cpu_groups;
101 char *partition;
102 dynamic_plugin_data_t *select_jobinfo;
103 uint32_t stepid;
104 } allocation_info_t;
105
106 typedef struct het_job_resp_struct {
107 char **alias_list;
108 uint16_t *cpu_cnt;
109 hostlist_t host_list;
110 uint32_t node_cnt;
111 } het_job_resp_struct_t;
112
113
114 static int shepherd_fd = -1;
115 static pthread_t signal_thread = (pthread_t) 0;
116 static int pty_sigarray[] = { SIGWINCH, 0 };
117
118 extern char **environ;
119
120 /*
121 * Prototypes:
122 */
123
124 static int _become_user(void);
125 static void _call_spank_fini(void);
126 static int _call_spank_local_user(srun_job_t *job, slurm_opt_t *opt_local);
127 static void _default_sigaction(int sig);
128 static long _diff_tv_str(struct timeval *tv1, struct timeval *tv2);
129 static void _handle_intr(srun_job_t *job);
130 static void _handle_pipe(void);
131 static srun_job_t *_job_create_structure(allocation_info_t *ainfo,
132 slurm_opt_t *opt_local);
133 static char *_normalize_hostlist(const char *hostlist);
134 static void _print_job_information(resource_allocation_response_msg_t *resp);
135 static void _run_srun_epilog (srun_job_t *job);
136 static void _run_srun_prolog (srun_job_t *job);
137 static int _run_srun_script (srun_job_t *job, char *script);
138 static void _set_env_vars(resource_allocation_response_msg_t *resp,
139 int het_job_offset);
140 static void _set_env_vars2(resource_allocation_response_msg_t *resp,
141 int het_job_offset);
142 static void _set_ntasks(allocation_info_t *ai, slurm_opt_t *opt_local);
143 static void _set_prio_process_env(void);
144 static int _set_rlimit_env(void);
145 static void _set_submit_dir_env(void);
146 static int _set_umask_env(void);
147 static void _shepherd_notify(int shepherd_fd);
148 static int _shepherd_spawn(srun_job_t *job, List srun_job_list,
149 bool got_alloc);
150 static void *_srun_signal_mgr(void *no_data);
151 static void _srun_cli_filter_post_submit(uint32_t jobid, uint32_t stepid);
152 static void _step_opt_exclusive(slurm_opt_t *opt_local);
153 static int _validate_relative(resource_allocation_response_msg_t *resp,
154 slurm_opt_t *opt_local);
155
156
157 /*
158 * Create an srun job structure w/out an allocation response msg.
159 * (i.e. use the command line options)
160 */
161 srun_job_t *
162 job_create_noalloc(void)
163 {
164 srun_job_t *job = NULL;
165 allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
166 uint16_t cpn[1];
167 uint32_t cpu_count_reps[1];
168 slurm_opt_t *opt_local = &opt;
169 hostlist_t hl = hostlist_create(opt_local->nodelist);
170
171 if (!hl) {
172 error("Invalid node list `%s' specified", opt_local->nodelist);
173 goto error;
174 }
175 srand48(getpid());
176 ai->jobid = MIN_NOALLOC_JOBID +
177 ((uint32_t) lrand48() %
178 (MAX_NOALLOC_JOBID - MIN_NOALLOC_JOBID + 1));
179 ai->stepid = (uint32_t) (lrand48());
180 ai->nodelist = opt_local->nodelist;
181 ai->nnodes = hostlist_count(hl);
182
183 hostlist_destroy(hl);
184
185 cpn[0] = (opt_local->ntasks + ai->nnodes - 1) / ai->nnodes;
186 ai->cpus_per_node = cpn;
187 cpu_count_reps[0] = ai->nnodes;
188 ai->cpu_count_reps = cpu_count_reps;
189 ai->num_cpu_groups = 1;
190
191 /*
192 * Create job, then fill in host addresses
193 */
194 job = _job_create_structure(ai, opt_local);
195
196 if (job != NULL)
197 job_update_io_fnames(job, opt_local);
198
199 error:
200 xfree(ai);
201 return (job);
202
203 }
204
205 /*
206 * Create an srun job structure for a step w/out an allocation response msg.
207 * (i.e. inside an allocation)
208 */
209 extern srun_job_t *job_step_create_allocation(
210 resource_allocation_response_msg_t *resp,
211 slurm_opt_t *opt_local)
212 {
213 srun_opt_t *srun_opt = opt_local->srun_opt;
214 uint32_t job_id = resp->job_id;
215 srun_job_t *job = NULL;
216 allocation_info_t *ai = xmalloc(sizeof(allocation_info_t));
217 hostlist_t hl = NULL;
218 char *buf = NULL;
219 int count = 0;
220 uint32_t alloc_count = 0;
221 char *step_nodelist = NULL;
222 xassert(srun_opt);
223
224 ai->jobid = job_id;
225 ai->stepid = NO_VAL;
226 ai->alias_list = resp->alias_list;
227 if (srun_opt->alloc_nodelist)
228 ai->nodelist = xstrdup(srun_opt->alloc_nodelist);
229 else
230 ai->nodelist = xstrdup(resp->node_list);
231 hl = hostlist_create(ai->nodelist);
232 hostlist_uniq(hl);
233 alloc_count = hostlist_count(hl);
234 ai->nnodes = alloc_count;
235 hostlist_destroy(hl);
236
237 if (opt_local->exclude) {
238 hostlist_t exc_hl = hostlist_create(opt_local->exclude);
239 hostlist_t inc_hl = NULL;
240 char *node_name = NULL;
241
242 hl = hostlist_create(ai->nodelist);
243 if (opt_local->nodelist)
244 inc_hl = hostlist_create(opt_local->nodelist);
245 hostlist_uniq(hl);
246 //info("using %s or %s", opt_local->nodelist, ai->nodelist);
247 while ((node_name = hostlist_shift(exc_hl))) {
248 int inx = hostlist_find(hl, node_name);
249 if (inx >= 0) {
250 debug("excluding node %s", node_name);
251 hostlist_delete_nth(hl, inx);
252 ai->nnodes--; /* decrement node count */
253 }
254 if (inc_hl) {
255 inx = hostlist_find(inc_hl, node_name);
256 if (inx >= 0) {
257 error("Requested node %s is also "
258 "in the excluded list.",
259 node_name);
260 error("Job not submitted.");
261 hostlist_destroy(exc_hl);
262 hostlist_destroy(inc_hl);
263 goto error;
264 }
265 }
266 free(node_name);
267 }
268 hostlist_destroy(exc_hl);
269
270 /* we need to set this here so if there are more nodes
271 * available than we requested we can set it
272 	 * straight. If there is no exclude list then we set
273 	 * the vars in the else block below.
274 */
275 if (!opt_local->nodes_set) {
276 		/* we don't want to set the number of nodes equal
277 * to the number of requested processes unless we
278 * know it is less than the number of nodes
279 * in the allocation
280 */
281 if (opt_local->ntasks_set &&
282 (opt_local->ntasks < ai->nnodes))
283 opt_local->min_nodes = opt_local->ntasks;
284 else
285 opt_local->min_nodes = ai->nnodes;
286 opt_local->nodes_set = true;
287 }
288 if (!opt_local->max_nodes)
289 opt_local->max_nodes = opt_local->min_nodes;
290 if ((opt_local->max_nodes > 0) &&
291 (opt_local->max_nodes < ai->nnodes))
292 ai->nnodes = opt_local->max_nodes;
293
294 count = hostlist_count(hl);
295 if (!count) {
296 error("Hostlist is empty! Can't run job.");
297 hostlist_destroy(hl);
298 goto error;
299 }
300 if (inc_hl) {
301 count = hostlist_count(inc_hl);
302 if (count < ai->nnodes) {
303 /* add more nodes to get correct number for
304 allocation */
305 hostlist_t tmp_hl = hostlist_copy(hl);
306 int i = 0;
307 int diff = ai->nnodes - count;
308 buf = hostlist_ranged_string_xmalloc(inc_hl);
309 hostlist_delete(tmp_hl, buf);
310 xfree(buf);
311 while ((i < diff) &&
312 (node_name = hostlist_shift(tmp_hl))) {
313 hostlist_push_host(inc_hl, node_name);
314 free(node_name);
315 i++;
316 }
317 hostlist_destroy(tmp_hl);
318 }
319 buf = hostlist_ranged_string_xmalloc(inc_hl);
320 hostlist_destroy(inc_hl);
321 xfree(opt_local->nodelist);
322 opt_local->nodelist = buf;
323 } else {
324 if (count > ai->nnodes) {
325 /* remove more nodes than needed for
326 * allocation */
327 int i;
328 for (i = count; i >= ai->nnodes; i--)
329 hostlist_delete_nth(hl, i);
330 }
331 xfree(opt_local->nodelist);
332 opt_local->nodelist = hostlist_ranged_string_xmalloc(hl);
333 }
334
335 hostlist_destroy(hl);
336 } else {
337 if (!opt_local->nodes_set) {
338 			/* we don't want to set the number of nodes equal
339 * to the number of requested processes unless we
340 * know it is less than the number of nodes
341 * in the allocation
342 */
343 if (opt_local->ntasks_set &&
344 (opt_local->ntasks < ai->nnodes))
345 opt_local->min_nodes = opt_local->ntasks;
346 else
347 opt_local->min_nodes = ai->nnodes;
348 opt_local->nodes_set = true;
349 }
350 if (!opt_local->max_nodes)
351 opt_local->max_nodes = opt_local->min_nodes;
352 if ((opt_local->max_nodes > 0) &&
353 (opt_local->max_nodes < ai->nnodes))
354 ai->nnodes = opt_local->max_nodes;
355 /* Don't reset the ai->nodelist because that is the
356 		 * nodelist we want to say the allocation is under.
357 * opt_local->nodelist is what is used for the allocation.
358 */
359 /* xfree(ai->nodelist); */
360 /* ai->nodelist = xstrdup(buf); */
361 }
362
363 /* get the correct number of hosts to run tasks on */
364 if (opt_local->nodelist)
365 step_nodelist = opt_local->nodelist;
366 else if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
367 SLURM_DIST_ARBITRARY) && (count == 0))
368 step_nodelist = getenv("SLURM_ARBITRARY_NODELIST");
369 if (step_nodelist) {
370 hl = hostlist_create(step_nodelist);
371 if ((opt_local->distribution & SLURM_DIST_STATE_BASE) !=
372 SLURM_DIST_ARBITRARY)
373 hostlist_uniq(hl);
374 if (!hostlist_count(hl)) {
375 error("Hostlist is empty! Can not run job.");
376 hostlist_destroy(hl);
377 goto error;
378 }
379
380 buf = hostlist_ranged_string_xmalloc(hl);
381 count = hostlist_count(hl);
382 hostlist_destroy(hl);
383 /*
384 * Don't reset the ai->nodelist because that is the
385 		 * nodelist we want to say the allocation is under.
386 * opt_local->nodelist is what is used for the allocation.
387 */
388 /* xfree(ai->nodelist); */
389 /* ai->nodelist = xstrdup(buf); */
390 xfree(opt_local->nodelist);
391 opt_local->nodelist = buf;
392 }
393
394 if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
395 SLURM_DIST_ARBITRARY) && (count != opt_local->ntasks)) {
396 error("You asked for %d tasks but hostlist specified %d nodes",
397 opt_local->ntasks, count);
398 goto error;
399 }
400
401 if (ai->nnodes == 0) {
402 error("No nodes in allocation, can't run job");
403 goto error;
404 }
405
406 ai->num_cpu_groups = resp->num_cpu_groups;
407 ai->cpus_per_node = resp->cpus_per_node;
408 ai->cpu_count_reps = resp->cpu_count_reps;
409 ai->ntasks_per_board = resp->ntasks_per_board;
410
411 /* Here let the srun options override the allocation resp */
412 ai->ntasks_per_core = (opt_local->ntasks_per_core != NO_VAL) ?
413 opt_local->ntasks_per_core : resp->ntasks_per_core;
414 ai->ntasks_per_socket = (opt_local->ntasks_per_socket != NO_VAL) ?
415 opt_local->ntasks_per_socket : resp->ntasks_per_socket;
416
417 ai->partition = resp->partition;
418
419 /* info("looking for %d nodes out of %s with a must list of %s", */
420 /* ai->nnodes, ai->nodelist, opt_local->nodelist); */
421 /*
422 * Create job
423 */
424 job = _job_create_structure(ai, opt_local);
425 error:
426 xfree(ai);
427 return (job);
428
429 }
430
431 /*
432 * Create an srun job structure from a resource allocation response msg
433 */
434 extern srun_job_t *job_create_allocation(
435 resource_allocation_response_msg_t *resp,
436 slurm_opt_t *opt_local)
437 {
438 srun_job_t *job;
439 allocation_info_t *i = xmalloc(sizeof(allocation_info_t));
440
441 i->alias_list = resp->alias_list;
442 i->nodelist = _normalize_hostlist(resp->node_list);
443 i->nnodes = resp->node_cnt;
444 i->partition = resp->partition;
445 i->jobid = resp->job_id;
446 i->stepid = NO_VAL;
447 i->num_cpu_groups = resp->num_cpu_groups;
448 i->cpus_per_node = resp->cpus_per_node;
449 i->cpu_count_reps = resp->cpu_count_reps;
450 i->ntasks_per_board = resp->ntasks_per_board;
451 i->ntasks_per_core = resp->ntasks_per_core;
452 i->ntasks_per_socket = resp->ntasks_per_socket;
453
454 i->select_jobinfo = select_g_select_jobinfo_copy(resp->select_jobinfo);
455
456 job = _job_create_structure(i, opt_local);
457 if (job) {
458 job->account = xstrdup(resp->account);
459 job->qos = xstrdup(resp->qos);
460 job->resv_name = xstrdup(resp->resv_name);
461 }
462
463 xfree(i->nodelist);
464 xfree(i);
465
466 return (job);
467 }
468
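/*
 * Copy the command (argc/argv) from opt_master into every option structure
 * queued on missing_argc_list (hetjob components given without their own
 * command), then remove those entries from the list.
 */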
469 static void _copy_args(List missing_argc_list, slurm_opt_t *opt_master)
470 {
471 srun_opt_t *srun_master = opt_master->srun_opt;
472 ListIterator iter;
473 slurm_opt_t *opt_local;
474 int i;
475 xassert(srun_master);
476
477 iter = list_iterator_create(missing_argc_list);
478 while ((opt_local = list_next(iter))) {
479 srun_opt_t *srun_opt = opt_local->srun_opt;
480 xassert(srun_opt);
481 srun_opt->argc = srun_master->argc;
482 srun_opt->argv = xmalloc(sizeof(char *) * (srun_opt->argc+1));
483 for (i = 0; i < srun_opt->argc; i++)
484 srun_opt->argv[i] = xstrdup(srun_master->argv[i]);
485 list_remove(iter);
486 }
487 list_iterator_destroy(iter);
488 }
489
490 /*
491 * Build "het_group" string. If set on execute line, it may need to be
492 * rebuilt for multiple option structures ("--het-group=1,2" becomes two
493  * opt structures). Clear "het_grp_bits" if determined to not be a hetjob.
494 */
495 static void _het_grp_test(List opt_list)
496 {
497 ListIterator iter;
498 int het_job_offset;
499 bitstr_t *master_map = NULL;
500 List missing_argv_list = NULL;
501 bool multi_comp = false, multi_prog = false;
502
503 if (opt_list) {
504 slurm_opt_t *opt_local;
505 missing_argv_list = list_create(NULL);
506 iter = list_iterator_create(opt_list);
507 while ((opt_local = list_next(iter))) {
508 srun_opt_t *srun_opt = opt_local->srun_opt;
509 xassert(srun_opt);
510 if (srun_opt->argc == 0)
511 list_append(missing_argv_list, opt_local);
512 else
513 _copy_args(missing_argv_list, opt_local);
514 xfree(srun_opt->het_group);
515 if (srun_opt->het_grp_bits &&
516 ((het_job_offset =
517 bit_ffs(srun_opt->het_grp_bits)) >= 0)) {
518 xstrfmtcat(srun_opt->het_group, "%d",
519 het_job_offset);
520 }
521 if (!srun_opt->het_grp_bits) {
522 error("%s: het_grp_bits is NULL", __func__);
523 } else if (!master_map) {
524 master_map
525 = bit_copy(srun_opt->het_grp_bits);
526 } else {
527 if (bit_overlap_any(master_map,
528 srun_opt->het_grp_bits)) {
529 fatal("Duplicate het groups in single srun not supported");
530 }
531 bit_or(master_map, srun_opt->het_grp_bits);
532 }
533 if (srun_opt->multi_prog)
534 multi_prog = true;
535 }
536 if (master_map && (bit_set_count(master_map) > 1))
537 multi_comp = true;
538 FREE_NULL_BITMAP(master_map);
539 list_iterator_destroy(iter);
540 list_destroy(missing_argv_list);
541 } else if (!sropt.het_group && !getenv("SLURM_HET_SIZE")) {
542 FREE_NULL_BITMAP(sropt.het_grp_bits);
543 /* het_group is already NULL */
544 } else if (!sropt.het_group && sropt.het_grp_bits) {
545 if ((het_job_offset = bit_ffs(sropt.het_grp_bits)) < 0)
546 het_job_offset = 0;
547 else if (bit_set_count(sropt.het_grp_bits) > 1)
548 multi_comp = true;
549 if (sropt.multi_prog)
550 multi_prog = true;
551 xstrfmtcat(sropt.het_group, "%d", het_job_offset);
552 }
553
554 if (multi_comp && multi_prog)
555 fatal("--multi-prog option not supported with multiple het groups");
556 }
557
558 /*
559 * Copy job name from last component to all hetjob components unless
560 * explicitly set.
561 */
562 static void _match_job_name(List opt_list)
563 {
564 int cnt;
565 ListIterator iter;
566 slurm_opt_t *opt_local;
567
568 if (!opt_list)
569 return;
570
571 cnt = list_count(opt_list);
572 if (cnt < 2)
573 return;
574
575 iter = list_iterator_create(opt_list);
576 while ((opt_local = list_next(iter))) {
577 if (!opt_local->job_name)
578 opt_local->job_name = xstrdup(opt.job_name);
579 if (opt_local->srun_opt &&
580 (opt_local->srun_opt->open_mode == 0)) {
581 opt_local->srun_opt->open_mode = OPEN_MODE_APPEND;
582 }
583 }
584 list_iterator_destroy(iter);
585 }
586
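/*
 * List sort callback: order option structures by the lowest het group
 * index set in het_grp_bits.
 */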
587 static int _sort_by_offset(void *x, void *y)
588 {
589 slurm_opt_t *opt_local1 = *(slurm_opt_t **) x;
590 slurm_opt_t *opt_local2 = *(slurm_opt_t **) y;
591 int offset1 = -1, offset2 = -1;
592
593 if (opt_local1->srun_opt->het_grp_bits)
594 offset1 = bit_ffs(opt_local1->srun_opt->het_grp_bits);
595 if (opt_local2->srun_opt->het_grp_bits)
596 offset2 = bit_ffs(opt_local2->srun_opt->het_grp_bits);
597 if (offset1 < offset2)
598 return -1;
599 if (offset1 > offset2)
600 return 1;
601 return 0;
602 }
603
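/*
 * Final option processing once all components are parsed: validate and
 * rebuild the het group strings, propagate the job name, and sort the
 * components by het group offset.
 */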
604 static void _post_opts(List opt_list)
605 {
606 _het_grp_test(opt_list);
607 _match_job_name(opt_list);
608 if (opt_list)
609 list_sort(opt_list, _sort_by_offset);
610 }
611
612 extern void init_srun(int argc, char **argv,
613 log_options_t *logopt, int debug_level,
614 bool handle_signals)
615 {
616 bool het_job_fini = false;
617 int i, het_job_argc, het_job_inx, het_job_argc_off;
618 char **het_job_argv;
619
620 /*
621 * This must happen before we spawn any threads
622 * which are not designed to handle arbitrary signals
623 */
624 if (handle_signals) {
625 if (xsignal_block(sig_array) < 0)
626 error("Unable to block signals");
627 }
628 xsignal_block(pty_sigarray);
629
630 /*
631 * Initialize plugin stack, read options from plugins, etc.
632 */
633 init_spank_env();
634 if (spank_init(NULL) < 0) {
635 error("Plug-in initialization failed");
636 exit(error_exit);
637 }
638
639 /*
640 * Be sure to call spank_fini when srun exits.
641 */
642 if (atexit(_call_spank_fini) < 0)
643 error("Failed to register atexit handler for plugins: %m");
644
645 het_job_argc = argc;
646 het_job_argv = argv;
647 for (het_job_inx = 0; !het_job_fini; het_job_inx++) {
648 het_job_argc_off = -1;
649 if (initialize_and_process_args(het_job_argc, het_job_argv,
650 &het_job_argc_off) < 0) {
651 error("srun parameter parsing");
652 exit(1);
653 }
654 if ((het_job_argc_off >= 0) &&
655 (het_job_argc_off < het_job_argc)) {
656 for (i = het_job_argc_off; i < het_job_argc; i++) {
657 if (!xstrcmp(het_job_argv[i], ":")) {
658 het_job_argc_off = i;
659 break;
660 }
661 }
662 }
663 if ((het_job_argc_off >= 0) &&
664 (het_job_argc_off < het_job_argc) &&
665 !xstrcmp(het_job_argv[het_job_argc_off], ":")) {
666 /*
667 * move het_job_argv[0] from "srun" to ":"
668 */
669 het_job_argc -= het_job_argc_off;
670 het_job_argv += het_job_argc_off;
671 } else {
672 het_job_fini = true;
673 }
674 }
675 _post_opts(opt_list);
676
677 /*
678 * reinit log with new verbosity (if changed by command line)
679 */
680 if (logopt && (opt.verbose || opt.quiet)) {
681 /*
682 * If log level is already increased, only increment the
683 		 * level to the difference of opt.verbose and LOG_LEVEL_INFO
684 */
685 if ((opt.verbose -= (logopt->stderr_level - LOG_LEVEL_INFO)) > 0)
686 logopt->stderr_level += opt.verbose;
687 logopt->stderr_level -= opt.quiet;
688 logopt->prefix_level = 1;
689 log_alter(*logopt, 0, NULL);
690 } else
691 opt.verbose = debug_level;
692
693 (void) _set_rlimit_env();
694 _set_prio_process_env();
695 (void) _set_umask_env();
696 _set_submit_dir_env();
697
698 /*
699 * Set up slurmctld message handler
700 */
701 slurmctld_msg_init();
702
703 /*
704 * save process startup time to be used with -I<timeout>
705 */
706 srun_begin_time = time(NULL);
707 }
708
709 /*
710  * Modify options for a job step (after job allocation is complete)
711 */
712 static void _set_step_opts(slurm_opt_t *opt_local)
713 {
714 srun_opt_t *srun_opt = opt_local->srun_opt;
715 xassert(srun_opt);
716
717 opt_local->time_limit = NO_VAL;/* not applicable for step, only job */
718 xfree(opt_local->constraint); /* not applicable for this step */
719 if ((srun_opt->core_spec_set || srun_opt->exclusive)
720 && opt_local->cpus_set) {
721 		/* Step gets specified CPU count, which may be only part
722 * of the job allocation. */
723 srun_opt->exclusive = true;
724 } else {
725 /* Step gets all CPUs in the job allocation. */
726 srun_opt->exclusive = false;
727 }
728 }
729
730 /*
731 * Create the job step(s). For a heterogeneous job, each step is requested in
732 * a separate RPC. create_job_step() references "opt", so we need to match up
733 * the job allocation request with its requested options.
734 */
735 static int _create_job_step(srun_job_t *job, bool use_all_cpus,
736 List srun_job_list, uint32_t het_job_id,
737 char *het_job_nodelist)
738 {
739 ListIterator opt_iter = NULL, job_iter;
740 slurm_opt_t *opt_local = &opt;
741 uint32_t node_offset = 0, het_job_nnodes = 0, step_id = NO_VAL;
742 uint32_t het_job_ntasks = 0, task_offset = 0;
743
744 job_step_create_response_msg_t *step_resp;
745 char *resv_ports = NULL;
746 int rc = 0;
747
748 if (srun_job_list) {
749 if (opt_list)
750 opt_iter = list_iterator_create(opt_list);
751 job_iter = list_iterator_create(srun_job_list);
752 while ((job = list_next(job_iter))) {
753 if (het_job_id)
754 job->het_job_id = het_job_id;
755 job->stepid = NO_VAL;
756 het_job_nnodes += job->nhosts;
757 het_job_ntasks += job->ntasks;
758 }
759
760 list_iterator_reset(job_iter);
761 while ((job = list_next(job_iter))) {
762 if (opt_list)
763 opt_local = list_next(opt_iter);
764 if (!opt_local)
765 fatal("%s: opt_list too short", __func__);
766 job->het_job_node_offset = node_offset;
767 job->het_job_nnodes = het_job_nnodes;
768 job->het_job_ntasks = het_job_ntasks;
769 job->het_job_task_offset = task_offset;
770 if (step_id != NO_VAL)
771 job->stepid = step_id;
772 rc = create_job_step(job, use_all_cpus, opt_local);
773 if (rc < 0)
774 break;
775 if (step_id == NO_VAL)
776 step_id = job->stepid;
777
778 if ((slurm_step_ctx_get(job->step_ctx,
779 SLURM_STEP_CTX_RESP,
780 &step_resp) == SLURM_SUCCESS) &&
781 step_resp->resv_ports &&
782 strcmp(step_resp->resv_ports, "(null)")) {
783 if (resv_ports)
784 xstrcat(resv_ports, ",");
785 xstrcat(resv_ports, step_resp->resv_ports);
786 }
787 node_offset += job->nhosts;
788 task_offset += job->ntasks;
789 }
790
791 if (resv_ports) {
792 /*
793 * Merge numeric values into single range
794 * (e.g. "10-12,13-15,16-18" -> "10-18")
795 */
796 hostset_t hs;
797 char *tmp = NULL, *sep;
798 xstrfmtcat(tmp, "[%s]", resv_ports);
799 hs = hostset_create(tmp);
800 hostset_ranged_string(hs, strlen(tmp) + 1, tmp);
801 sep = strchr(tmp, ']');
802 if (sep)
803 sep[0] = '\0';
804 xfree(resv_ports);
805 resv_ports = xstrdup(tmp + 1);
806 xfree(tmp);
807 hostset_destroy(hs);
808
809 list_iterator_reset(job_iter);
810 while ((job = list_next(job_iter))) {
811 if (slurm_step_ctx_get(job->step_ctx,
812 SLURM_STEP_CTX_RESP,
813 &step_resp) == SLURM_SUCCESS) {
814 xfree(step_resp->resv_ports);
815 step_resp->resv_ports =
816 xstrdup(resv_ports);
817 }
818 }
819 xfree(resv_ports);
820 }
821 list_iterator_destroy(job_iter);
822 if (opt_iter)
823 list_iterator_destroy(opt_iter);
824 return rc;
825 } else if (job) {
826 if (het_job_id) {
827 job->het_job_id = het_job_id;
828 job->het_job_nnodes = job->nhosts;
829 job->het_job_ntasks = job->ntasks;
830 job->het_job_task_offset = 0;
831 }
832 return create_job_step(job, use_all_cpus, &opt);
833 } else {
834 return -1;
835 }
836 }
837
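/*
 * Send REQUEST_STEP_COMPLETE for every step already created so that its
 * resources are released if a later component's step creation failed.
 */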
838 static void _cancel_steps(List srun_job_list)
839 {
840 srun_job_t *job;
841 ListIterator job_iter;
842 slurm_msg_t req;
843 step_complete_msg_t msg;
844 int rc = 0;
845
846 if (!srun_job_list)
847 return;
848
849 slurm_msg_t_init(&req);
850 req.msg_type = REQUEST_STEP_COMPLETE;
851 req.data = &msg;
852 memset(&msg, 0, sizeof(step_complete_msg_t));
853 msg.step_rc = 0;
854
855 job_iter = list_iterator_create(srun_job_list);
856 while ((job = list_next(job_iter))) {
857 if (job->stepid == NO_VAL)
858 continue;
859 msg.job_id = job->jobid;
860 msg.job_step_id = job->stepid;
861 msg.range_first = 0;
862 msg.range_last = job->nhosts - 1;
863 (void) slurm_send_recv_controller_rc_msg(&req, &rc,
864 working_cluster_rec);
865 }
866 list_iterator_destroy(job_iter);
867 }
868
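/* List delete function for het_job_resp_struct_t entries. */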
869 static void _het_job_struct_del(void *x)
870 {
871 het_job_resp_struct_t *het_job_resp = (het_job_resp_struct_t *) x;
872 int i;
873
874 if (het_job_resp->alias_list) {
875 for (i = 0; i < het_job_resp->node_cnt; i++)
876 xfree(het_job_resp->alias_list[i]);
877 xfree(het_job_resp->alias_list);
878 }
879 xfree(het_job_resp->cpu_cnt);
880 if (het_job_resp->host_list)
881 hostlist_destroy(het_job_resp->host_list);
882 xfree(het_job_resp);
883 }
884
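/*
 * Merge the node lists of all hetjob components into one ranged hostlist
 * string and set SLURM_JOB_CPUS_PER_NODE (and SLURM_NODE_ALIASES when
 * aliases are present) for the combined node set.
 * NOTE: Caller must xfree() the returned string.
 */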
885 static char *_compress_het_job_nodelist(List used_resp_list)
886 {
887 resource_allocation_response_msg_t *resp;
888 het_job_resp_struct_t *het_job_resp;
889 List het_job_resp_list;
890 ListIterator resp_iter;
891 char *aliases = NULL, *save_ptr = NULL, *tok, *tmp;
892 char *het_job_nodelist = NULL, *node_name;
893 hostset_t hs;
894 int cnt, i, j, k, len = 0;
895 uint16_t *cpus;
896 uint32_t *reps, cpu_inx;
897 bool have_aliases = false;
898
899 if (!used_resp_list)
900 return het_job_nodelist;
901
902 cnt = list_count(used_resp_list);
903 het_job_resp_list = list_create(_het_job_struct_del);
904 hs = hostset_create("");
905 resp_iter = list_iterator_create(used_resp_list);
906 while ((resp = list_next(resp_iter))) {
907 if (!resp->node_list)
908 continue;
909 len += strlen(resp->node_list);
910 hostset_insert(hs, resp->node_list);
911 het_job_resp = xmalloc(sizeof(het_job_resp_struct_t));
912 het_job_resp->node_cnt = resp->node_cnt;
913 /*
914 * alias_list contains <NodeName>:<NodeAddr>:<NodeHostName>
915 * values in comma separated list
916 */
917 if (resp->alias_list) {
918 have_aliases = true;
919 het_job_resp->alias_list = xmalloc(sizeof(char *) *
920 resp->node_cnt);
921 tmp = xstrdup(resp->alias_list);
922 i = 0;
923 tok = strtok_r(tmp, ",", &save_ptr);
924 while (tok) {
925 if (i >= resp->node_cnt) {
926 fatal("%s: Invalid alias_list",
927 __func__);
928 }
929 het_job_resp->alias_list[i++] = xstrdup(tok);
930 tok = strtok_r(NULL, ",", &save_ptr);
931 }
932 xfree(tmp);
933 }
934 het_job_resp->cpu_cnt =
935 xmalloc(sizeof(uint16_t) * resp->node_cnt);
936 het_job_resp->host_list = hostlist_create(resp->node_list);
937 for (i = 0, k = 0;
938 (i < resp->num_cpu_groups) && (k < resp->node_cnt); i++) {
939 for (j = 0; j < resp->cpu_count_reps[i]; j++) {
940 het_job_resp->cpu_cnt[k++] =
941 resp->cpus_per_node[i];
942 if (k >= resp->node_cnt)
943 break;
944 }
945 if (k >= resp->node_cnt)
946 break;
947 }
948 list_append(het_job_resp_list, het_job_resp);
949 }
950 list_iterator_destroy(resp_iter);
951
952 len += (cnt + 16);
953 het_job_nodelist = xmalloc(len);
954 (void) hostset_ranged_string(hs, len, het_job_nodelist);
955
956 cpu_inx = 0;
957 cnt = hostset_count(hs);
958 cpus = xmalloc(sizeof(uint16_t) * (cnt + 1));
959 reps = xmalloc(sizeof(uint32_t) * (cnt + 1));
960 for (i = 0; i < cnt; i++) {
961 node_name = hostset_nth(hs, i);
962 resp_iter = list_iterator_create(het_job_resp_list);
963 while ((het_job_resp = list_next(resp_iter))) {
964 j = hostlist_find(het_job_resp->host_list, node_name);
965 if ((j == -1) || !het_job_resp->cpu_cnt)
966 continue; /* node not in this hetjob */
967 if (have_aliases) {
968 if (aliases)
969 xstrcat(aliases, ",");
970 if (het_job_resp->alias_list &&
971 het_job_resp->alias_list[j]) {
972 xstrcat(aliases,
973 het_job_resp->alias_list[j]);
974 } else {
975 xstrfmtcat(aliases, "%s:%s:%s",
976 node_name, node_name,
977 node_name);
978 }
979 }
980 if (cpus[cpu_inx] == het_job_resp->cpu_cnt[j]) {
981 reps[cpu_inx]++;
982 } else {
983 if (cpus[cpu_inx] != 0)
984 cpu_inx++;
985 cpus[cpu_inx] = het_job_resp->cpu_cnt[j];
986 reps[cpu_inx]++;
987 }
988 break;
989 }
990 list_iterator_destroy(resp_iter);
991 free(node_name);
992 }
993
994 cpu_inx++;
995 tmp = uint32_compressed_to_str(cpu_inx, cpus, reps);
996 if (setenv("SLURM_JOB_CPUS_PER_NODE", tmp, 1) < 0) {
997 error("%s: Unable to set SLURM_JOB_CPUS_PER_NODE in environment",
998 __func__);
999 }
1000 xfree(tmp);
1001
1002 if (aliases) {
1003 if (setenv("SLURM_NODE_ALIASES", aliases, 1) < 0) {
1004 error("%s: Unable to set SLURM_NODE_ALIASES in environment",
1005 __func__);
1006 }
1007 xfree(aliases);
1008 }
1009
1010 xfree(reps);
1011 xfree(cpus);
1012 hostset_destroy(hs);
1013 list_destroy(het_job_resp_list);
1014
1015 return het_job_nodelist;
1016 }
1017
1018 extern void create_srun_job(void **p_job, bool *got_alloc,
1019 bool slurm_started, bool handle_signals)
1020 {
1021 resource_allocation_response_msg_t *resp;
1022 List job_resp_list = NULL, srun_job_list = NULL;
1023 List used_resp_list = NULL;
1024 ListIterator opt_iter, resp_iter;
1025 srun_job_t *job = NULL;
1026 int i, max_list_offset, max_het_job_offset, het_job_offset = -1,
1027 het_step_offset = 0;
1028 uint32_t my_job_id = 0, het_job_id = 0;
1029 char *het_job_nodelist = NULL;
1030 bool begin_error_logged = false;
1031 bool core_spec_error_logged = false;
1032 #ifdef HAVE_NATIVE_CRAY
1033 bool network_error_logged = false;
1034 #endif
1035 bool node_cnt_error_logged = false;
1036 bool x11_error_logged = false;
1037
1038 /*
1039 * now global "opt" should be filled in and available,
1040 * create a job from opt
1041 */
1042 if (sropt.test_only) {
1043 int rc = allocate_test();
1044 if (rc) {
1045 slurm_perror("allocation failure");
1046 exit (1);
1047 }
1048 exit (0);
1049
1050 } else if (sropt.no_alloc) {
1051 if (opt_list ||
1052 (sropt.het_grp_bits && (bit_fls(sropt.het_grp_bits) > 0)))
1053 fatal("--no-allocation option not supported for heterogeneous jobs");
1054 info("do not allocate resources");
1055 job = job_create_noalloc();
1056 if (job == NULL) {
1057 error("Job creation failure.");
1058 exit(error_exit);
1059 }
1060 if (create_job_step(job, false, &opt) < 0)
1061 exit(error_exit);
1062 } else if ((job_resp_list = existing_allocation())) {
1063 slurm_opt_t *opt_local;
1064
1065 max_list_offset = 0;
1066 max_het_job_offset = list_count(job_resp_list) - 1;
1067 if (opt_list) {
1068 opt_iter = list_iterator_create(opt_list);
1069 while ((opt_local = list_next(opt_iter))) {
1070 srun_opt_t *srun_opt = opt_local->srun_opt;
1071 xassert(srun_opt);
1072 if (srun_opt->het_grp_bits) {
1073 i = bit_fls(srun_opt->het_grp_bits);
1074 max_list_offset = MAX(max_list_offset,
1075 i);
1076 }
1077 }
1078 list_iterator_destroy(opt_iter);
1079 if (max_list_offset > max_het_job_offset) {
1080 error("Attempt to run a job step with het group value of %d, "
1081 "but the job allocation has maximum value of %d",
1082 max_list_offset, max_het_job_offset);
1083 exit(1);
1084 }
1085 }
1086 srun_job_list = list_create(NULL);
1087 used_resp_list = list_create(NULL);
1088 if (max_het_job_offset > 0)
1089 het_job_offset = 0;
1090 resp_iter = list_iterator_create(job_resp_list);
1091 while ((resp = list_next(resp_iter))) {
1092 bool merge_nodelist = true;
1093 if (my_job_id == 0) {
1094 my_job_id = resp->job_id;
1095 if (resp->working_cluster_rec)
1096 slurm_setup_remote_working_cluster(resp);
1097 }
1098 _print_job_information(resp);
1099 (void) get_next_opt(-2);
1100 /*
1101 * Check using het_job_offset here, but we use
1102 * het_step_offset for the job being added.
1103 */
1104 while ((opt_local = get_next_opt(het_job_offset))) {
1105 srun_opt_t *srun_opt = opt_local->srun_opt;
1106 xassert(srun_opt);
1107 if (merge_nodelist) {
1108 merge_nodelist = false;
1109 list_append(used_resp_list, resp);
1110 }
1111 if (slurm_option_set_by_env(opt_local, 'N') &&
1112 (opt_local->min_nodes > resp->node_cnt)) {
1113 /*
1114 * This signifies the job used the
1115 * --no-kill option and a node went DOWN
1116 * or it used a node count range
1117 * specification, was checkpointed from
1118 * one size and restarted at a different
1119 * size
1120 */
1121 if (!node_cnt_error_logged) {
1122 error("SLURM_JOB_NUM_NODES environment variable conflicts with allocated node count (%u != %u).",
1123 opt_local->min_nodes,
1124 resp->node_cnt);
1125 node_cnt_error_logged = true;
1126 }
1127 /*
1128 * Modify options to match resource
1129 * allocation.
1130 * NOTE: Some options are not supported
1131 */
1132 opt_local->min_nodes = resp->node_cnt;
1133 xfree(srun_opt->alloc_nodelist);
1134 if (!opt_local->ntasks_set) {
1135 opt_local->ntasks =
1136 opt_local->min_nodes;
1137 }
1138 }
1139 if (srun_opt->core_spec_set &&
1140 !core_spec_error_logged) {
1141 /*
1142 * NOTE: Silently ignore specialized
1143 * core count set with SLURM_CORE_SPEC
1144 * environment variable
1145 */
1146 error("Ignoring --core-spec value for a job step "
1147 "within an existing job. Set specialized cores "
1148 "at job allocation time.");
1149 core_spec_error_logged = true;
1150 }
1151 #ifdef HAVE_NATIVE_CRAY
1152 if (opt_local->network &&
1153 !network_error_logged) {
1154 if (slurm_option_set_by_env(opt_local,
1155 LONG_OPT_NETWORK)) {
1156 debug2("Ignoring SLURM_NETWORK value for a "
1157 "job step within an existing job. "
1158 "Using what was set at job "
1159 "allocation time. Most likely this "
1160 "variable was set by sbatch or salloc.");
1161 } else {
1162 error("Ignoring --network value for a job step "
1163 "within an existing job. Set network "
1164 "options at job allocation time.");
1165 }
1166 network_error_logged = true;
1167 }
1168 xfree(opt_local->network);
1169 /*
1170 * Here we send the het job groups to the
1171 * slurmctld to set up the interconnect
1172 * correctly. We only ever need to send it to
1173 * the first component of the step.
1174 */
1175 if (g_het_grp_bits)
1176 opt_local->network = bit_fmt_hexmask(
1177 g_het_grp_bits);
1178 #endif
1179
1180 if (srun_opt->exclusive)
1181 _step_opt_exclusive(opt_local);
1182 _set_env_vars(resp, het_step_offset);
1183 if (_validate_relative(resp, opt_local))
1184 exit(error_exit);
1185 if (opt_local->begin && !begin_error_logged) {
1186 error("--begin is ignored because nodes are already allocated.");
1187 begin_error_logged = true;
1188 }
1189 if (opt_local->x11 && !x11_error_logged) {
1190 error("Ignoring --x11 option for a job step within an "
1191 "existing job. Set x11 options at job allocation time.");
1192 x11_error_logged = true;
1193 }
1194 job = job_step_create_allocation(resp,
1195 opt_local);
1196 if (!job)
1197 exit(error_exit);
1198 if (max_het_job_offset > 0)
1199 job->het_job_offset = het_step_offset;
1200 list_append(srun_job_list, job);
1201 het_step_offset++;
1202 } /* While more option structures */
1203 het_job_offset++;
1204 } /* More hetjob components */
1205 list_iterator_destroy(resp_iter);
1206
1207 max_het_job_offset = get_max_het_group();
1208 het_job_offset = list_count(job_resp_list) - 1;
1209 if (max_het_job_offset > het_job_offset) {
1210 error("Requested het-group offset exceeds highest hetjob index (%d > %d)",
1211 max_het_job_offset, het_job_offset);
1212 exit(error_exit);
1213 }
1214 i = list_count(srun_job_list);
1215 if (i == 0) {
1216 error("No directives to start application on any available hetjob components");
1217 exit(error_exit);
1218 }
1219 if (i == 1)
1220 FREE_NULL_LIST(srun_job_list); /* Just use "job" */
1221 if (list_count(job_resp_list) > 1) {
1222 if (my_job_id)
1223 het_job_id = my_job_id;
1224 het_job_nodelist =
1225 _compress_het_job_nodelist(used_resp_list);
1226 }
1227 list_destroy(used_resp_list);
1228 if (_create_job_step(job, false, srun_job_list, het_job_id,
1229 het_job_nodelist) < 0) {
1230 if (*got_alloc)
1231 slurm_complete_job(my_job_id, 1);
1232 else
1233 _cancel_steps(srun_job_list);
1234 exit(error_exit);
1235 }
1236 xfree(het_job_nodelist);
1237 } else {
1238 /* Combined job allocation and job step launch */
1239 #if defined HAVE_FRONT_END
1240 uid_t my_uid = getuid();
1241 if ((my_uid != 0) &&
1242 (my_uid != slurm_get_slurm_user_id())) {
1243 error("srun task launch not supported on this system");
1244 exit(error_exit);
1245 }
1246 #endif
1247 if (slurm_option_set_by_cli(&opt, 'J'))
1248 setenvfs("SLURM_JOB_NAME=%s", opt.job_name);
1249 else if (!slurm_option_set_by_env(&opt, 'J') && sropt.argc)
1250 setenvfs("SLURM_JOB_NAME=%s", sropt.argv[0]);
1251
1252 if (opt_list) {
1253 job_resp_list = allocate_het_job_nodes(handle_signals);
1254 if (!job_resp_list)
1255 exit(error_exit);
1256 srun_job_list = list_create(NULL);
1257 opt_iter = list_iterator_create(opt_list);
1258 resp_iter = list_iterator_create(job_resp_list);
1259 while ((resp = list_next(resp_iter))) {
1260 slurm_opt_t *opt_local;
1261
1262 if (my_job_id == 0) {
1263 my_job_id = resp->job_id;
1264 *got_alloc = true;
1265 }
1266 opt_local = list_next(opt_iter);
1267 if (!opt_local)
1268 break;
1269 _print_job_information(resp);
1270 _set_env_vars(resp, ++het_job_offset);
1271 _set_env_vars2(resp, het_job_offset);
1272 if (_validate_relative(resp, opt_local)) {
1273 slurm_complete_job(my_job_id, 1);
1274 exit(error_exit);
1275 }
1276 job = job_create_allocation(resp, opt_local);
1277 job->het_job_offset = het_job_offset;
1278 list_append(srun_job_list, job);
1279 _set_step_opts(opt_local);
1280 }
1281 list_iterator_destroy(opt_iter);
1282 list_iterator_destroy(resp_iter);
1283 /* Continue support for old hetjob terminology. */
1284 setenvfs("SLURM_PACK_SIZE=%d", het_job_offset + 1);
1285 setenvfs("SLURM_HET_SIZE=%d", het_job_offset + 1);
1286 } else {
1287 if (!(resp = allocate_nodes(handle_signals, &opt)))
1288 exit(error_exit);
1289 *got_alloc = true;
1290 my_job_id = resp->job_id;
1291 _print_job_information(resp);
1292 _set_env_vars(resp, -1);
1293 if (_validate_relative(resp, &opt)) {
1294 slurm_complete_job(resp->job_id, 1);
1295 exit(error_exit);
1296 }
1297 job = job_create_allocation(resp, &opt);
1298 _set_step_opts(&opt);
1299 }
1300 if (srun_job_list && (list_count(srun_job_list) > 1) &&
1301 opt_list && (list_count(opt_list) > 1) && my_job_id) {
1302 het_job_id = my_job_id;
1303 het_job_nodelist =
1304 _compress_het_job_nodelist(job_resp_list);
1305 }
1306
1307 /*
1308 * Become --uid user
1309 */
1310 if (_become_user () < 0)
1311 fatal("Unable to assume uid=%u", opt.uid);
1312 if (_create_job_step(job, true, srun_job_list, het_job_id,
1313 het_job_nodelist) < 0) {
1314 slurm_complete_job(my_job_id, 1);
1315 exit(error_exit);
1316 }
1317 xfree(het_job_nodelist);
1318
1319 if (opt_list) {
1320 resp_iter = list_iterator_create(job_resp_list);
1321 while ((resp = list_next(resp_iter))) {
1322 slurm_free_resource_allocation_response_msg(
1323 resp);
1324 }
1325 list_iterator_destroy(resp_iter);
1326 } else {
1327 slurm_free_resource_allocation_response_msg(resp);
1328 }
1329 }
1330
1331 /*
1332 * Become --uid user
1333 */
1334 if (_become_user () < 0)
1335 fatal("Unable to assume uid=%u", opt.uid);
1336
1337 if (!slurm_started) {
1338 /*
1339 * Spawn process to ensure clean-up of job and/or step
1340 * on abnormal termination
1341 */
1342 shepherd_fd = _shepherd_spawn(job, srun_job_list, *got_alloc);
1343 }
1344
1345 if (opt_list)
1346 *p_job = (void *) srun_job_list;
1347 else
1348 *p_job = (void *) job;
1349
1350 if (job)
1351 _srun_cli_filter_post_submit(my_job_id, job->stepid);
1352 }
1353
1354 extern void pre_launch_srun_job(srun_job_t *job, bool slurm_started,
1355 bool handle_signals, slurm_opt_t *opt_local)
1356 {
1357 if (handle_signals && !signal_thread) {
1358 slurm_thread_create(&signal_thread, _srun_signal_mgr, job);
1359 }
1360
1361 	/* If running from poe, this already happened in srun. */
1362 if (slurm_started)
1363 return;
1364
1365 _run_srun_prolog(job);
1366 if (_call_spank_local_user(job, opt_local) < 0) {
1367 error("Failure in local plugin stack");
1368 slurm_step_launch_abort(job->step_ctx);
1369 exit(error_exit);
1370 }
1371
1372 env_array_merge(&job->env, (const char **)environ);
1373 }
1374
1375 extern void fini_srun(srun_job_t *job, bool got_alloc, uint32_t *global_rc,
1376 bool slurm_started)
1377 {
1378 /* If running from poe, most of this already happened in srun. */
1379 if (slurm_started)
1380 goto cleanup;
1381 if (got_alloc) {
1382 cleanup_allocation();
1383
1384 /* Tell slurmctld that we were cancelled */
1385 if (job->state >= SRUN_JOB_CANCELLED)
1386 slurm_complete_job(job->jobid, NO_VAL);
1387 else
1388 slurm_complete_job(job->jobid, *global_rc);
1389 }
1390 _shepherd_notify(shepherd_fd);
1391
1392 cleanup:
1393 if (signal_thread) {
1394 srun_shutdown = true;
1395 pthread_kill(signal_thread, SIGINT);
1396 pthread_join(signal_thread, NULL);
1397 }
1398
1399 if (!slurm_started)
1400 _run_srun_epilog(job);
1401
1402 slurm_step_ctx_destroy(job->step_ctx);
1403
1404 if (WIFEXITED(*global_rc))
1405 *global_rc = WEXITSTATUS(*global_rc);
1406 else if (WIFSIGNALED(*global_rc))
1407 *global_rc = 128 + WTERMSIG(*global_rc);
1408
1409 mpir_cleanup();
1410 }
1411
1412 void
1413 update_job_state(srun_job_t *job, srun_job_state_t state)
1414 {
1415 slurm_mutex_lock(&job->state_mutex);
1416 if (job->state < state) {
1417 job->state = state;
1418 slurm_cond_signal(&job->state_cond);
1419
1420 }
1421 slurm_mutex_unlock(&job->state_mutex);
1422 return;
1423 }
1424
1425 srun_job_state_t
1426 job_state(srun_job_t *job)
1427 {
1428 srun_job_state_t state;
1429 slurm_mutex_lock(&job->state_mutex);
1430 state = job->state;
1431 slurm_mutex_unlock(&job->state_mutex);
1432 return state;
1433 }
1434
1435
1436 void
1437 job_force_termination(srun_job_t *job)
1438 {
1439 static int kill_sent = 0;
1440 static time_t last_msg = 0;
1441
1442 if (kill_sent == 0) {
1443 info("forcing job termination");
1444 /* Sends SIGKILL to tasks directly */
1445 update_job_state(job, SRUN_JOB_FORCETERM);
1446 } else {
1447 time_t now = time(NULL);
1448 if (last_msg != now) {
1449 info("job abort in progress");
1450 last_msg = now;
1451 }
1452 if (kill_sent == 1) {
1453 /* Try sending SIGKILL through slurmctld */
1454 slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
1455 }
1456 }
1457 kill_sent++;
1458 }
1459
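/*
 * Derive opt_local->ntasks when it was not set explicitly, based on
 * --ntasks-per-node or the allocated CPU counts, with a minimum of one
 * task per allocated node.
 */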
1460 static void _set_ntasks(allocation_info_t *ai, slurm_opt_t *opt_local)
1461 {
1462 int cnt = 0;
1463
1464 if (opt_local->ntasks_set)
1465 return;
1466
1467 if (opt_local->ntasks_per_node != NO_VAL) {
1468 cnt = ai->nnodes * opt_local->ntasks_per_node;
1469 opt_local->ntasks_set = true; /* implicit */
1470 } else if (opt_local->cpus_set) {
1471 int i;
1472
1473 for (i = 0; i < ai->num_cpu_groups; i++)
1474 cnt += (ai->cpu_count_reps[i] *
1475 (ai->cpus_per_node[i] /
1476 opt_local->cpus_per_task));
1477 opt_local->ntasks_set = true; /* implicit */
1478 }
1479
1480 opt_local->ntasks = (cnt < ai->nnodes) ? ai->nnodes : cnt;
1481 }
1482
1483 /*
1484 * Create an srun job structure from a resource allocation response msg
1485 */
1486 static srun_job_t *_job_create_structure(allocation_info_t *ainfo,
1487 slurm_opt_t *opt_local)
1488 {
1489 srun_job_t *job = xmalloc(sizeof(srun_job_t));
1490 int i;
1491
1492 _set_ntasks(ainfo, opt_local);
1493 debug2("creating job with %d tasks", opt_local->ntasks);
1494
1495 slurm_mutex_init(&job->state_mutex);
1496 slurm_cond_init(&job->state_cond, NULL);
1497 job->state = SRUN_JOB_INIT;
1498
1499 job->alias_list = xstrdup(ainfo->alias_list);
1500 job->nodelist = xstrdup(ainfo->nodelist);
1501 job->partition = xstrdup(ainfo->partition);
1502 job->stepid = ainfo->stepid;
1503 job->het_job_id = NO_VAL;
1504 job->het_job_nnodes = NO_VAL;
1505 job->het_job_ntasks = NO_VAL;
1506 job->het_job_offset = NO_VAL;
1507 job->het_job_task_offset = NO_VAL;
1508 job->nhosts = ainfo->nnodes;
1509
1510 #if defined HAVE_FRONT_END
1511 /* Limited job step support */
1512 opt_local->overcommit = true;
1513 #else
1514 if (opt_local->min_nodes > job->nhosts) {
1515 error("Only allocated %d nodes asked for %d",
1516 job->nhosts, opt_local->min_nodes);
1517 if (opt_local->exclude) {
1518 /* When resources are pre-allocated and some nodes
1519 * are explicitly excluded, this error can occur. */
1520 error("Are required nodes explicitly excluded?");
1521 }
1522 xfree(job);
1523 return NULL;
1524 }
1525 if ((ainfo->cpus_per_node == NULL) ||
1526 (ainfo->cpu_count_reps == NULL)) {
1527 error("cpus_per_node array is not set");
1528 xfree(job);
1529 return NULL;
1530 }
1531 #endif
1532 job->select_jobinfo = ainfo->select_jobinfo;
1533 job->jobid = ainfo->jobid;
1534
1535 job->ntasks = opt_local->ntasks;
1536 job->ntasks_per_board = ainfo->ntasks_per_board;
1537 job->ntasks_per_core = ainfo->ntasks_per_core;
1538 job->ntasks_per_socket = ainfo->ntasks_per_socket;
1539
1540 /*
1541 * If cpus_per_task is set then get the exact count of cpus for the
1542 * requested step (we might very well use less, especially if
1543 * --exclusive is used). Else get the total for the allocation given.
1544 */
1545 if (opt_local->cpus_set)
1546 job->cpu_count = opt_local->ntasks * opt_local->cpus_per_task;
1547 else {
1548 for (i = 0; i < ainfo->num_cpu_groups; i++) {
1549 job->cpu_count += ainfo->cpus_per_node[i] *
1550 ainfo->cpu_count_reps[i];
1551 }
1552 }
1553
1554 job->rc = -1;
1555
1556 job_update_io_fnames(job, opt_local);
1557
1558 return (job);
1559 }
1560
1561 extern void job_update_io_fnames(srun_job_t *job, slurm_opt_t *opt_local)
1562 {
1563 job->ifname = fname_create(job, opt_local->ifname, opt_local->ntasks);
1564 job->ofname = fname_create(job, opt_local->ofname, opt_local->ntasks);
1565 job->efname = opt_local->efname ?
1566 fname_create(job, opt_local->efname, opt_local->ntasks) :
1567 job->ofname;
1568 }
1569
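/*
 * Return a compacted (ranged) copy of the host list string.
 * NOTE: Caller must xfree() the result.
 */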
1570 static char *
1571 _normalize_hostlist(const char *hostlist)
1572 {
1573 char *buf = NULL;
1574 hostlist_t hl = hostlist_create(hostlist);
1575
1576 if (hl) {
1577 buf = hostlist_ranged_string_xmalloc(hl);
1578 hostlist_destroy(hl);
1579 }
1580 if (!buf)
1581 return xstrdup(hostlist);
1582
1583 return buf;
1584 }
1585
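/*
 * If --uid requested a different user, change the group, supplementary
 * groups and uid of this process accordingly.
 */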
1586 static int _become_user (void)
1587 {
1588 char *user;
1589
1590 /* Already the user, so there's nothing to change. Return early. */
1591 if (opt.uid == getuid())
1592 return 0;
1593
1594 if (!(user = uid_to_string_or_null(opt.uid))) {
1595 xfree(user);
1596 return (error ("Invalid user id %u: %m", opt.uid));
1597 }
1598
1599 if ((opt.gid != getgid()) && (setgid(opt.gid) < 0)) {
1600 xfree(user);
1601 return (error ("setgid: %m"));
1602 }
1603
1604 if (initgroups(user, gid_from_uid(opt.uid)))
1605 return (error ("initgroups: %m"));
1606
1607 xfree(user);
1608
1609 if (setuid (opt.uid) < 0)
1610 return (error ("setuid: %m"));
1611
1612 return (0);
1613 }
1614
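/* Build the launcher job info and invoke the spank local-user callbacks. */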
1615 static int _call_spank_local_user(srun_job_t *job, slurm_opt_t *opt_local)
1616 {
1617 srun_opt_t *srun_opt = opt_local->srun_opt;
1618 struct spank_launcher_job_info info[1];
1619 xassert(srun_opt);
1620
1621 info->argc = srun_opt->argc;
1622 info->argv = srun_opt->argv;
1623 info->gid = opt_local->gid;
1624 info->jobid = job->jobid;
1625 info->stepid = job->stepid;
1626 info->step_layout = launch_common_get_slurm_step_layout(job);
1627 info->uid = opt_local->uid;
1628
1629 return spank_local_user(info);
1630 }
1631
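/* Restore the default handler for sig if it is currently being ignored. */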
1632 static void _default_sigaction(int sig)
1633 {
1634 struct sigaction act;
1635 if (sigaction(sig, NULL, &act)) {
1636 error("sigaction(%d): %m", sig);
1637 return;
1638 }
1639 if (act.sa_handler != SIG_IGN)
1640 return;
1641
1642 act.sa_handler = SIG_DFL;
1643 if (sigaction(sig, &act, NULL))
1644 error("sigaction(%d): %m", sig);
1645 }
1646
1647 /* Return the number of microseconds between tv1 and tv2 with
1648 * a maximum value of 10,000,000 to prevent overflows */
1649 static long _diff_tv_str(struct timeval *tv1, struct timeval *tv2)
1650 {
1651 long delta_t;
1652
1653 delta_t = MIN((tv2->tv_sec - tv1->tv_sec), 10);
1654 delta_t *= USEC_IN_SEC;
1655 delta_t += tv2->tv_usec - tv1->tv_usec;
1656 return delta_t;
1657 }
1658
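/*
 * Handle Ctrl-C (SIGINT): the first interrupt reports step status (or
 * forwards SIGINT when status reporting is disabled); a second interrupt
 * within one second cancels the job step.
 */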
1659 static void _handle_intr(srun_job_t *job)
1660 {
1661 static struct timeval last_intr = { 0, 0 };
1662 static struct timeval last_intr_sent = { 0, 0 };
1663 struct timeval now;
1664
1665 gettimeofday(&now, NULL);
1666 if (!sropt.quit_on_intr && (_diff_tv_str(&last_intr, &now) > 1000000)) {
1667 if (sropt.disable_status) {
1668 info("sending Ctrl-C to job %u.%u",
1669 job->jobid, job->stepid);
1670 launch_g_fwd_signal(SIGINT);
1671 } else if (job->state < SRUN_JOB_FORCETERM) {
1672 info("interrupt (one more within 1 sec to abort)");
1673 launch_g_print_status();
1674 } else {
1675 info("interrupt (abort already in progress)");
1676 launch_g_print_status();
1677 }
1678 last_intr = now;
1679 } else { /* second Ctrl-C in half as many seconds */
1680 update_job_state(job, SRUN_JOB_CANCELLED);
1681 /* terminate job */
1682 if (job->state < SRUN_JOB_FORCETERM) {
1683 if (_diff_tv_str(&last_intr_sent, &now) < 1000000) {
1684 job_force_termination(job);
1685 launch_g_fwd_signal(SIGKILL);
1686 return;
1687 }
1688
1689 info("sending Ctrl-C to job %u.%u",
1690 job->jobid, job->stepid);
1691 last_intr_sent = now;
1692 launch_g_fwd_signal(SIGINT);
1693 } else
1694 job_force_termination(job);
1695
1696 launch_g_fwd_signal(SIGKILL);
1697 }
1698 }
1699
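/* Handle a broken pipe: forward SIGKILL to the tasks, only once. */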
1700 static void _handle_pipe(void)
1701 {
1702 static int ending = 0;
1703
1704 if (ending)
1705 return;
1706 ending = 1;
1707 launch_g_fwd_signal(SIGKILL);
1708 }
1709
1710
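/*
 * When running verbosely, log the job id, node count, node list and
 * per-node CPU counts from the allocation response.
 */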
1711 static void _print_job_information(resource_allocation_response_msg_t *resp)
1712 {
1713 int i;
1714 char *str = NULL;
1715 char *sep = "";
1716
1717 if (!opt.verbose)
1718 return;
1719
1720 xstrfmtcat(str, "jobid %u: nodes(%u):`%s', cpu counts: ",
1721 resp->job_id, resp->node_cnt, resp->node_list);
1722
1723 for (i = 0; i < resp->num_cpu_groups; i++) {
1724 xstrfmtcat(str, "%s%u(x%u)",
1725 sep, resp->cpus_per_node[i],
1726 resp->cpu_count_reps[i]);
1727 sep = ",";
1728 }
1729 verbose("%s", str);
1730 xfree(str);
1731 }
1732
1733 /* NOTE: Executed once for entire hetjob */
1734 static void _run_srun_epilog (srun_job_t *job)
1735 {
1736 int rc;
1737
1738 if (sropt.epilog && xstrcasecmp(sropt.epilog, "none") != 0) {
1739 if (setenvf(NULL, "SLURM_SCRIPT_CONTEXT", "epilog_srun") < 0)
1740 error("unable to set SLURM_SCRIPT_CONTEXT in environment");
1741 rc = _run_srun_script(job, sropt.epilog);
1742 debug("srun epilog rc = %d", rc);
1743 }
1744 }
1745
1746 static void _run_srun_prolog (srun_job_t *job)
1747 {
1748 int rc;
1749
1750 if (sropt.prolog && xstrcasecmp(sropt.prolog, "none") != 0) {
1751 if (setenvf(NULL, "SLURM_SCRIPT_CONTEXT", "prolog_srun") < 0)
1752 error("unable to set SLURM_SCRIPT_CONTEXT in environment");
1753 rc = _run_srun_script(job, sropt.prolog);
1754 debug("srun prolog rc = %d", rc);
1755 }
1756 }
1757
1758 static int _run_srun_script (srun_job_t *job, char *script)
1759 {
1760 int status;
1761 pid_t cpid;
1762 int i;
1763 char **args = NULL;
1764
1765 if (script == NULL || script[0] == '\0')
1766 return 0;
1767
1768 if (access(script, R_OK | X_OK) < 0) {
1769 info("Access denied for %s: %m", script);
1770 return 0;
1771 }
1772
1773 if ((cpid = fork()) < 0) {
1774 error ("run_srun_script: fork: %m");
1775 return -1;
1776 }
1777 if (cpid == 0) {
1778 /*
1779 * set the prolog/epilog scripts command line arguments to the
1780 * application arguments (for last hetjob component), but
1781 * shifted one higher
1782 */
1783 args = xmalloc(sizeof(char *) * 1024);
1784 args[0] = script;
1785 for (i = 0; i < sropt.argc; i++) {
1786 args[i+1] = sropt.argv[i];
1787 }
1788 args[i+1] = NULL;
1789 execv(script, args);
1790 error("help! %m");
1791 _exit(127);
1792 }
1793
1794 do {
1795 if (waitpid(cpid, &status, 0) < 0) {
1796 if (errno == EINTR)
1797 continue;
1798 error("waitpid: %m");
1799 return 0;
1800 } else
1801 return status;
1802 } while(1);
1803
1804 /* NOTREACHED */
1805 }
1806
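/*
 * Build an environment variable name from base, appending a
 * "_PACK_GROUP_<offset>" suffix for hetjob components (-1 means no suffix).
 */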
1807 static char *_build_key(char *base, int het_job_offset)
1808 {
1809 char *key = NULL;
1810
1811 if (het_job_offset == -1)
1812 key = xstrdup(base);
1813 else
1814 xstrfmtcat(key, "%s_PACK_GROUP_%d", base, het_job_offset);
1815
1816 return key;
1817 }
1818
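/*
 * Export SLURM_JOB_CPUS_PER_NODE and SLURM_NODE_ALIASES (per het group),
 * plus any environment variables supplied in the allocation response
 * (e.g. burst buffer variables).
 */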
1819 static void _set_env_vars(resource_allocation_response_msg_t *resp,
1820 int het_job_offset)
1821 {
1822 char *key, *value, *tmp;
1823 int i;
1824
1825 key = _build_key("SLURM_JOB_CPUS_PER_NODE", het_job_offset);
1826 if (!getenv(key)) {
1827 tmp = uint32_compressed_to_str(resp->num_cpu_groups,
1828 resp->cpus_per_node,
1829 resp->cpu_count_reps);
1830 if (setenvf(NULL, key, "%s", tmp) < 0)
1831 error("unable to set %s in environment", key);
1832 xfree(tmp);
1833 }
1834 xfree(key);
1835
1836 key = _build_key("SLURM_NODE_ALIASES", het_job_offset);
1837 if (resp->alias_list) {
1838 if (setenv(key, resp->alias_list, 1) < 0)
1839 error("unable to set %s in environment", key);
1840 } else {
1841 unsetenv(key);
1842 }
1843 xfree(key);
1844
1845 if (resp->env_size) { /* Used to set Burst Buffer environment */
1846 for (i = 0; i < resp->env_size; i++) {
1847 tmp = xstrdup(resp->environment[i]);
1848 key = tmp;
1849 value = strchr(tmp, '=');
1850 if (value) {
1851 value[0] = '\0';
1852 value++;
1853 setenv(key, value, 0);
1854 }
1855 xfree(tmp);
1856 }
1857 }
1858
1859 return;
1860 }
1861
/*
 * Set some hetjob environment variables for combined job & step allocation
 */
static void _set_env_vars2(resource_allocation_response_msg_t *resp,
			   int het_job_offset)
{
	char *key;

	if (resp->account) {
		key = _build_key("SLURM_JOB_ACCOUNT", het_job_offset);
		if (!getenv(key) &&
		    (setenvf(NULL, key, "%s", resp->account) < 0)) {
			error("unable to set %s in environment", key);
		}
		xfree(key);
	}

	key = _build_key("SLURM_JOB_ID", het_job_offset);
	if (!getenv(key) &&
	    (setenvf(NULL, key, "%u", resp->job_id) < 0)) {
		error("unable to set %s in environment", key);
	}
	xfree(key);

	key = _build_key("SLURM_JOB_NODELIST", het_job_offset);
	if (!getenv(key) &&
	    (setenvf(NULL, key, "%s", resp->node_list) < 0)) {
		error("unable to set %s in environment", key);
	}
	xfree(key);

	key = _build_key("SLURM_JOB_PARTITION", het_job_offset);
	if (!getenv(key) &&
	    (setenvf(NULL, key, "%s", resp->partition) < 0)) {
		error("unable to set %s in environment", key);
	}
	xfree(key);

	if (resp->qos) {
		key = _build_key("SLURM_JOB_QOS", het_job_offset);
		if (!getenv(key) &&
		    (setenvf(NULL, key, "%s", resp->qos) < 0)) {
			error("unable to set %s in environment", key);
		}
		xfree(key);
	}

	if (resp->resv_name) {
		key = _build_key("SLURM_JOB_RESERVATION", het_job_offset);
		if (!getenv(key) &&
		    (setenvf(NULL, key, "%s", resp->resv_name) < 0)) {
			error("unable to set %s in environment", key);
		}
		xfree(key);
	}

	if (resp->alias_list) {
		key = _build_key("SLURM_NODE_ALIASES", het_job_offset);
		if (!getenv(key) &&
		    (setenvf(NULL, key, "%s", resp->alias_list) < 0)) {
			error("unable to set %s in environment", key);
		}
		xfree(key);
	}
}

/*
 * _set_prio_process_env
 *
 * Set the internal SLURM_PRIO_PROCESS environment variable to support
 * the propagation of the user's nice value and the "PropagatePrioProcess"
 * config keyword.
 */
static void _set_prio_process_env(void)
{
	int retval;

	errno = 0; /* needed to detect a real failure since prio can be -1 */

	if ((retval = getpriority (PRIO_PROCESS, 0)) == -1) {
		if (errno) {
			error ("getpriority(PRIO_PROCESS): %m");
			return;
		}
	}

	if (setenvf (NULL, "SLURM_PRIO_PROCESS", "%d", retval) < 0) {
		error ("unable to set SLURM_PRIO_PROCESS in environment");
		return;
	}

	debug ("propagating SLURM_PRIO_PROCESS=%d", retval);
}

/* Set SLURM_RLIMIT_* environment variables with current resource
 * limit values, reset RLIMIT_NOFILE to maximum possible value */
static int _set_rlimit_env(void)
{
	int rc = SLURM_SUCCESS;
	struct rlimit rlim[1];
	unsigned long cur;
	char name[64], *format;
	slurm_rlimits_info_t *rli;

	/* Modify limits with any command-line options */
	if (sropt.propagate
	    && parse_rlimits(sropt.propagate, PROPAGATE_RLIMITS)) {
		error( "--propagate=%s is not valid.", sropt.propagate );
		exit(error_exit);
	}

	for (rli = get_slurm_rlimits_info(); rli->name != NULL; rli++ ) {

		if (rli->propagate_flag != PROPAGATE_RLIMITS)
			continue;

		if (getrlimit (rli->resource, rlim) < 0) {
			error ("getrlimit (RLIMIT_%s): %m", rli->name);
			rc = SLURM_ERROR;
			continue;
		}

		cur = (unsigned long) rlim->rlim_cur;
		snprintf(name, sizeof(name), "SLURM_RLIMIT_%s", rli->name);
		if (sropt.propagate && (rli->propagate_flag == PROPAGATE_RLIMITS))
			/*
			 * Prepend 'U' to indicate user requested propagate
			 */
			format = "U%lu";
		else
			format = "%lu";

		if (setenvf (NULL, name, format, cur) < 0) {
			error ("unable to set %s in environment", name);
			rc = SLURM_ERROR;
			continue;
		}

		debug ("propagating RLIMIT_%s=%lu", rli->name, cur);
	}

	/*
	 * Now increase NOFILE to the max available for this srun
	 */
	rlimits_maximize_nofile();

	return rc;
}

/* Set SLURM_CLUSTER_NAME, SLURM_SUBMIT_DIR and SLURM_SUBMIT_HOST environment
 * variables with current state */
static void _set_submit_dir_env(void)
{
	char buf[MAXPATHLEN + 1], host[256];
	char *cluster_name;

	cluster_name = slurm_get_cluster_name();
	if (cluster_name) {
		if (setenvf(NULL, "SLURM_CLUSTER_NAME", "%s", cluster_name) < 0)
			error("unable to set SLURM_CLUSTER_NAME in environment");
		xfree(cluster_name);
	}

	if ((getcwd(buf, MAXPATHLEN)) == NULL)
		error("getcwd failed: %m");
	else if (setenvf(NULL, "SLURM_SUBMIT_DIR", "%s", buf) < 0)
		error("unable to set SLURM_SUBMIT_DIR in environment");

	if ((gethostname(host, sizeof(host))))
		error("gethostname failed: %m");
	else if (setenvf(NULL, "SLURM_SUBMIT_HOST", "%s", host) < 0)
		error("unable to set SLURM_SUBMIT_HOST in environment");
}

/* Set SRUN_DEBUG and SLURM_UMASK environment variables with current state */
static int _set_umask_env(void)
{
	if (!getenv("SRUN_DEBUG")) {	/* do not change current value */
		/* NOTE: Default debug level is 3 (info) */
		int log_level = LOG_LEVEL_INFO + opt.verbose - opt.quiet;

		if (setenvf(NULL, "SRUN_DEBUG", "%d", log_level) < 0)
			error ("unable to set SRUN_DEBUG in environment");
	}

	if (!getenv("SLURM_UMASK")) {	/* do not change current value */
		char mask_char[5];
		mode_t mask;

		mask = (int)umask(0);
		umask(mask);

		sprintf(mask_char, "0%d%d%d",
			((mask>>6)&07), ((mask>>3)&07), mask&07);
		if (setenvf(NULL, "SLURM_UMASK", "%s", mask_char) < 0) {
			error ("unable to set SLURM_UMASK in environment");
			return SLURM_ERROR;
		}
		debug ("propagating UMASK=%s", mask_char);
	}

	return SLURM_SUCCESS;
}

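/*
 * Notify the shepherd process that srun is exiting normally by writing a
 * single byte to its pipe, then close the descriptor.
 */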
static void _shepherd_notify(int shepherd_fd)
{
	int rc;

	while (1) {
		rc = write(shepherd_fd, "", 1);
		if (rc == -1) {
			if ((errno == EAGAIN) || (errno == EINTR))
				continue;
			error("write(shepherd): %m");
		}
		break;
	}
	close(shepherd_fd);
}

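/*
 * Spawn a shepherd process that blocks reading a pipe. If srun exits
 * cleanly it writes one byte and the shepherd simply exits; if the pipe is
 * closed without a write (srun died), the shepherd kills the step(s) with
 * SIGKILL and, when an allocation was obtained, completes the job(s).
 * Returns the write end of the pipe to the parent, or -1 on error.
 */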
static int _shepherd_spawn(srun_job_t *job, List srun_job_list, bool got_alloc)
{
	int shepherd_pipe[2], rc;
	pid_t shepherd_pid;
	char buf[1];

	if (pipe(shepherd_pipe)) {
		error("pipe: %m");
		return -1;
	}

	shepherd_pid = fork();
	if (shepherd_pid == -1) {
		error("fork: %m");
		return -1;
	}
	if (shepherd_pid != 0) {
		close(shepherd_pipe[0]);
		return shepherd_pipe[1];
	}

	/* Wait for parent to notify of completion or I/O error on abort */
	close(shepherd_pipe[1]);
	while (1) {
		rc = read(shepherd_pipe[0], buf, 1);
		if (rc == 1) {
			_exit(0);
		} else if (rc == 0) {
			break;	/* EOF */
		} else if (rc == -1) {
			if ((errno == EAGAIN) || (errno == EINTR))
				continue;
			break;
		}
	}

	if (srun_job_list) {
		ListIterator job_iter;
		job_iter = list_iterator_create(srun_job_list);
		while ((job = list_next(job_iter))) {
			(void) slurm_kill_job_step(job->jobid, job->stepid,
						   SIGKILL);
			if (got_alloc)
				slurm_complete_job(job->jobid, NO_VAL);
		}
		list_iterator_destroy(job_iter);
	} else {
		(void) slurm_kill_job_step(job->jobid, job->stepid, SIGKILL);
		if (got_alloc)
			slurm_complete_job(job->jobid, NO_VAL);
	}

	_exit(0);
	return -1;
}

/* _srun_signal_mgr - Process daemon-wide signals */
static void *_srun_signal_mgr(void *job_ptr)
{
	int sig;
	int i, rc;
	sigset_t set;
	srun_job_t *job = (srun_job_t *)job_ptr;

	/* Make sure no required signals are ignored (possibly inherited) */
	for (i = 0; sig_array[i]; i++)
		_default_sigaction(sig_array[i]);
	while (!srun_shutdown) {
		xsignal_sigset_create(sig_array, &set);
		rc = sigwait(&set, &sig);
		if (rc == EINTR)
			continue;
		switch (sig) {
		case SIGINT:
			if (!srun_shutdown)
				_handle_intr(job);
			break;
		case SIGQUIT:
			info("Quit");
			/* continue with slurm_step_launch_abort */
		case SIGTERM:
		case SIGHUP:
			/* No need to call job_force_termination here since we
			 * are ending the job now and we don't need to update
			 * the state. */
			info("forcing job termination");
			launch_g_fwd_signal(SIGKILL);
			break;
		case SIGCONT:
			info("got SIGCONT");
			break;
		case SIGPIPE:
			_handle_pipe();
			break;
		case SIGALRM:
			if (srun_max_timer) {
				info("First task exited %ds ago", sropt.max_wait);
				launch_g_print_status();
				launch_g_step_terminate();
			}
			break;
		default:
			launch_g_fwd_signal(sig);
			break;
		}
	}
	return NULL;
}

/* if srun_opt->exclusive is set, disable user task layout controls */
static void _step_opt_exclusive(slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);

	if (!opt_local->ntasks_set) {
		error("--ntasks must be set with --exclusive");
		exit(error_exit);
	}
	if (srun_opt->relative != NO_VAL) {
		error("--relative disabled, incompatible with --exclusive");
		exit(error_exit);
	}
	if (opt_local->exclude) {
		error("--exclude is incompatible with --exclusive");
		exit(error_exit);
	}
}

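/*
 * Verify that --relative plus the requested node count fits within the
 * allocation; on failure log an error and return SLURM_ERROR.
 */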
static int _validate_relative(resource_allocation_response_msg_t *resp,
			      slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);

	if ((srun_opt->relative != NO_VAL) &&
	    ((srun_opt->relative + opt_local->min_nodes)
	     > resp->node_cnt)) {
		if (slurm_option_set_by_cli(opt_local, 'N')) {
			/* -N command line option used */
			error("--relative and --nodes option incompatible "
			      "with count of allocated nodes (%d+%d>%d)",
			      srun_opt->relative,
			      opt_local->min_nodes,
			      resp->node_cnt);
		} else {	/* SLURM_JOB_NUM_NODES option used */
			error("--relative and SLURM_JOB_NUM_NODES option incompatible with count of allocated nodes (%d+%d>%d)",
			      srun_opt->relative,
			      opt_local->min_nodes,
			      resp->node_cnt);
		}
		return SLURM_ERROR;
	}
	return SLURM_SUCCESS;
}

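/* Run the SPANK fini hooks at exit; skipped when the shepherd pipe was
 * never created (shepherd_fd == -1) */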
static void _call_spank_fini(void)
{
	if (-1 != shepherd_fd)
		spank_fini(NULL);
}

/*
 * Run cli_filter_post_submit on all opt structures
 * Convenience function since this might need to run in two spots
 */
static void _srun_cli_filter_post_submit(uint32_t jobid, uint32_t stepid)
{
	static bool post_submit_ran = false;
	int idx = 0, components = 1;

	if (post_submit_ran)
		return;

	if (opt_list)
		components = list_count(opt_list);

	for (idx = 0; idx < components; idx++)
		cli_filter_plugin_post_submit(idx, jobid, stepid);

	post_submit_ran = true;
}
