/*****************************************************************************\
 * launch_slurm.c - Define job launch using slurm.
 *****************************************************************************
 * Copyright (C) 2012-2017 SchedMD LLC
 * Written by Danny Auble <da@schedmd.com>
 *
 * This file is part of Slurm, a resource management program.
 * For details, see <https://slurm.schedmd.com/>.
 * Please also read the included file: DISCLAIMER.
 *
 * Slurm is free software; you can redistribute it and/or modify it under
 * the terms of the GNU General Public License as published by the Free
 * Software Foundation; either version 2 of the License, or (at your option)
 * any later version.
 *
 * In addition, as a special exception, the copyright holders give permission
 * to link the code of portions of this program with the OpenSSL library under
 * certain conditions as described in each individual source file, and
 * distribute linked combinations including the two. You must obey the GNU
 * General Public License in all respects for all of the code used other than
 * OpenSSL. If you modify file(s) with this exception, you may extend this
 * exception to your version of the file(s), but you are not obligated to do
 * so. If you do not wish to do so, delete this exception statement from your
 * version. If you delete this exception statement from all source files in
 * the program, then also delete it here.
 *
 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 * details.
 *
 * You should have received a copy of the GNU General Public License along
 * with Slurm; if not, write to the Free Software Foundation, Inc.,
 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/

#include "config.h"

#define _GNU_SOURCE

#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <unistd.h>		/* read(), close(), alarm() */

#include "src/common/slurm_opt.h"
#include "src/common/slurm_xlator.h"
#include "src/common/slurm_resource_info.h"
#include "src/api/pmi_server.h"
#include "src/srun/libsrun/allocate.h"
#include "src/srun/libsrun/fname.h"
#include "src/srun/libsrun/launch.h"
#include "src/srun/libsrun/multi_prog.h"

#include "src/plugins/launch/slurm/task_state.h"
#ifndef OPEN_MPI_PORT_ERROR
/* This exit code indicates the launched Open MPI tasks could
 * not open the reserved port. It was already open by some
 * other process. */
#define OPEN_MPI_PORT_ERROR 108
#endif

#define MAX_STEP_RETRIES 4

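/*
 * When tasks exit with OPEN_MPI_PORT_ERROR shortly after launch and
 * reserved ports are in use (srun --resv-ports), the step is killed and
 * relaunched, up to MAX_STEP_RETRIES attempts. See
 * _handle_openmpi_port_error() and launch_p_step_wait() below.
 */
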
/*
 * These variables are required by the generic plugin interface. If they
 * are not found in the plugin, the plugin loader will ignore the plugin.
 *
 * plugin_name - a string giving a human-readable description of the
 * plugin. There is no maximum length, but the symbol must refer to
 * a valid string.
 *
 * plugin_type - a string suggesting the type of the plugin or its
 * applicability to a particular form of data or method of data handling.
 * If the low-level plugin API is used, the contents of this string are
 * unimportant and may be anything. Slurm uses the higher-level plugin
 * interface which requires this string to be of the form
 *
 *	<application>/<method>
 *
 * where <application> is a description of the intended application of
 * the plugin (e.g., "launch" for task launch control) and <method> is a
 * description of how this plugin satisfies that application. Slurm will
 * only load a launch plugin if the plugin_type string has a prefix of
 * "launch/".
 *
 * plugin_version - an unsigned 32-bit integer containing the Slurm version
 * (major.minor.micro combined into a single number).
 */
const char plugin_name[] = "launch Slurm plugin";
const char plugin_type[] = "launch/slurm";
const uint32_t plugin_version = SLURM_VERSION_NUMBER;
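
/*
 * For reference, SLURM_VERSION_NUMBER is produced by the SLURM_VERSION_NUM()
 * macro in slurm.h, which packs major.minor.micro into a single integer:
 *
 *	SLURM_VERSION_NUM(a, b, c) => ((a) << 16) + ((b) << 8) + (c)
 *
 * so, for example, version 20.11.3 packs to 0x140b03.
 */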

static List local_job_list = NULL;
static uint32_t *local_global_rc = NULL;
static pthread_mutex_t launch_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t het_job_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t start_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t start_mutex = PTHREAD_MUTEX_INITIALIZER;
static slurm_opt_t *opt_save = NULL;

static List task_state_list = NULL;
static time_t launch_start_time;
static bool retry_step_begin = false;
static int retry_step_cnt = 0;

static int _step_signal(int signal);
extern char **environ;

static char *_hostset_to_string(hostset_t hs)
{
	size_t n = 1024;
	size_t maxsize = 1024 * 64;
	char *str = NULL;

	/*
	 * Grow the buffer until the ranged string fits; always pass
	 * hostset_ranged_string() the size actually allocated.
	 */
	do {
		str = xrealloc(str, n);
	} while ((hostset_ranged_string(hs, n, str) < 0) &&
		 ((n *= 2) <= maxsize));

	/*
	 * If the string was truncated, indicate this with a '+' suffix.
	 */
	if (n > maxsize)
		strcpy(str + (maxsize - 2), "+");

	return str;
}

/*
 * Convert an array of task IDs into a list of host names
 * RET: the string, caller must xfree() this value
 */
static char *_task_ids_to_host_list(int ntasks, uint32_t *taskids,
				    srun_job_t *my_srun_job)
{
	int i, task_cnt = 0;
	hostset_t hs;
	char *hosts;
	slurm_step_layout_t *sl;

	if ((sl = launch_common_get_slurm_step_layout(my_srun_job)) == NULL)
		return (xstrdup("Unknown"));

	/*
	 * If the overhead of determining the hostlist is too high then srun
	 * communications will time out and fail, so return "Unknown" instead.
	 *
	 * See slurm_step_layout_host_id() in src/common/slurm_step_layout.c
	 * for details.
	 */
	for (i = 0; i < sl->node_cnt; i++) {
		task_cnt += sl->tasks[i];
	}
	if (task_cnt > 100000)
		return (xstrdup("Unknown"));

	hs = hostset_create(NULL);
	for (i = 0; i < ntasks; i++) {
		char *host = slurm_step_layout_host_name(sl, taskids[i]);
		if (host) {
			hostset_insert(hs, host);
			free(host);
		} else {
			error("Could not identify host name for task %u",
			      taskids[i]);
		}
	}

	hosts = _hostset_to_string(hs);
	hostset_destroy(hs);

	return (hosts);
}

/*
 * Convert an array of task IDs into a string.
 * RET: the string, caller must xfree() this value
 * NOTE: the taskids array is not necessarily in numeric order,
 *       so we use existing bitmap functions to format
 */
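/*
 * For example, taskids {5, 0, 2, 1} sets bits 0, 1, 2 and 5, and bit_fmt()
 * renders the bitmap as "0-2,5", a compact and ordered range string
 * regardless of the order of the input array.
 */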
static char *_task_array_to_string(int ntasks, uint32_t *taskids,
				   srun_job_t *my_srun_job)
{
	bitstr_t *tasks_bitmap = NULL;
	char *str;
	int i;

	tasks_bitmap = bit_alloc(my_srun_job->ntasks);
	if (!tasks_bitmap) {
		error("bit_alloc: memory allocation failure");
		exit(error_exit);
	}
	for (i = 0; i < ntasks; i++)
		bit_set(tasks_bitmap, taskids[i]);
	str = xmalloc(2048);
	bit_fmt(str, 2048, tasks_bitmap);
	FREE_NULL_BITMAP(tasks_bitmap);

	return str;
}

static void _update_task_exit_state(task_state_t task_state, uint32_t ntasks,
				    uint32_t *taskids, int abnormal)
{
	int i;
	task_state_type_t t = abnormal ? TS_ABNORMAL_EXIT : TS_NORMAL_EXIT;

	for (i = 0; i < ntasks; i++)
		task_state_update(task_state, taskids[i], t);
}

static int _kill_on_bad_exit(void)
{
	/* Check opt_save for NULL before dereferencing it. */
	if (!opt_save)
		return slurm_get_kill_on_bad_exit();
	xassert(opt_save->srun_opt);
	if (opt_save->srun_opt->kill_bad_exit == NO_VAL)
		return slurm_get_kill_on_bad_exit();
	return opt_save->srun_opt->kill_bad_exit;
}

static void _setup_max_wait_timer(void)
{
	xassert(opt_save->srun_opt);
	/*
	 * If these are the first tasks to finish we need to
	 * start a timer to kill off the job step if the other
	 * tasks don't finish within opt_save->srun_opt->max_wait seconds.
	 */
	verbose("First task exited. Terminating job in %ds",
		opt_save->srun_opt->max_wait);
	srun_max_timer = true;
	alarm(opt_save->srun_opt->max_wait);
}

static const char *_taskstr(int n)
{
	if (n == 1)
		return "task";
	else
		return "tasks";
}

static int _is_openmpi_port_error(int errcode)
{
	if (errcode != OPEN_MPI_PORT_ERROR)
		return 0;
	if (opt_save && (opt_save->srun_opt->resv_port_cnt == NO_VAL))
		return 0;
	if (difftime(time(NULL), launch_start_time) > slurm_get_msg_timeout())
		return 0;
	return 1;
}

static void
_handle_openmpi_port_error(const char *tasks, const char *hosts,
			   slurm_step_ctx_t *step_ctx)
{
	uint32_t job_id, step_id;
	char *msg = "retrying";

	if (!retry_step_begin) {
		retry_step_begin = true;
		retry_step_cnt++;
	}

	if (retry_step_cnt >= MAX_STEP_RETRIES)
		msg = "aborting";
	error("%s: tasks %s unable to claim reserved port, %s.",
	      hosts, tasks, msg);

	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_JOBID, &job_id);
	slurm_step_ctx_get(step_ctx, SLURM_STEP_CTX_STEPID, &step_id);
	info("Terminating job step %u.%u", job_id, step_id);
	slurm_kill_job_step(job_id, step_id, SIGKILL);
}

static void _task_start(launch_tasks_response_msg_t *msg)
{
	MPIR_PROCDESC *table;
	uint32_t local_task_id, global_task_id;
	int i;
	task_state_t task_state;

	if (msg->count_of_pids) {
		verbose("Node %s, %d tasks started",
			msg->node_name, msg->count_of_pids);
	} else {
		/*
		 * This message should be displayed through the API,
		 * hence it is a debug2() instead of error().
		 */
		debug2("No tasks started on node %s: %s",
		       msg->node_name, slurm_strerror(msg->return_code));
	}

	task_state = task_state_find(msg->job_id, msg->step_id, NO_VAL,
				     task_state_list);
	if (!task_state) {
		error("%s: Could not locate task state for step %u.%u",
		      __func__, msg->job_id, msg->step_id);
	}
	for (i = 0; i < msg->count_of_pids; i++) {
		local_task_id = msg->task_ids[i];
		global_task_id = task_state_global_id(task_state,
						      local_task_id);
		if (global_task_id >= MPIR_proctable_size) {
			error("%s: task_id too large (%u >= %d)", __func__,
			      global_task_id, MPIR_proctable_size);
			continue;
		}
		table = &MPIR_proctable[global_task_id];
		table->host_name = xstrdup(msg->node_name);
		/* table->executable_name set in mpir_set_executable_names() */
		table->pid = msg->local_pids[i];
		if (!task_state) {
			error("%s: Could not update task state for task ID %u",
			      __func__, global_task_id);
		} else if (msg->return_code == 0) {
			task_state_update(task_state, local_task_id,
					  TS_START_SUCCESS);
		} else {
			task_state_update(task_state, local_task_id,
					  TS_START_FAILURE);
		}
	}
}

static void _task_finish(task_exit_msg_t *msg)
{
	char *tasks = NULL, *hosts = NULL;
	bool build_task_string = false;
	uint32_t rc = 0;
	int normal_exit = 0;
	static int reduce_task_exit_msg = -1;
	static int msg_printed = 0, oom_printed = 0, last_task_exit_rc;
	task_state_t task_state;
	const char *task_str = _taskstr(msg->num_tasks);
	srun_job_t *my_srun_job;
	ListIterator iter;

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		if ((my_srun_job->jobid == msg->job_id) &&
		    (my_srun_job->stepid == msg->step_id))
			break;
	}
	list_iterator_destroy(iter);
	if (!my_srun_job) {
		error("Ignoring exit message from unrecognized step %u.%u",
		      msg->job_id, msg->step_id);
		return;
	}

	if (reduce_task_exit_msg == -1) {
		char *ptr = getenv("SLURM_SRUN_REDUCE_TASK_EXIT_MSG");
		if (ptr && atoi(ptr) != 0)
			reduce_task_exit_msg = 1;
		else
			reduce_task_exit_msg = 0;
	}
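
	/*
	 * Example: with SLURM_SRUN_REDUCE_TASK_EXIT_MSG=1 set in the
	 * environment of the srun command, e.g.
	 *
	 *	SLURM_SRUN_REDUCE_TASK_EXIT_MSG=1 srun -n512 ./a.out
	 *
	 * repeated exit messages carrying the same return code are only
	 * printed once (see the last_task_exit_rc checks below).
	 */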

	verbose("Received task exit notification for %d %s of step %u.%u (status=0x%04x).",
		msg->num_tasks, task_str, msg->job_id, msg->step_id,
		msg->return_code);

	/*
	 * Only build the "tasks" and "hosts" strings as needed.
	 * Building them can take multiple milliseconds.
	 */
	if (((msg->return_code & 0xff) == SIG_OOM) && !oom_printed) {
		build_task_string = true;
	} else if (WIFEXITED(msg->return_code)) {
		if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
			if (get_log_level() >= LOG_LEVEL_VERBOSE)
				build_task_string = true;
		} else {
			build_task_string = true;
		}
	} else if (WIFSIGNALED(msg->return_code)) {
		if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
			if (get_log_level() >= LOG_LEVEL_VERBOSE)
				build_task_string = true;
		} else {
			build_task_string = true;
		}
	}
	if (build_task_string) {
		tasks = _task_array_to_string(msg->num_tasks,
					      msg->task_id_list, my_srun_job);
		hosts = _task_ids_to_host_list(msg->num_tasks,
					       msg->task_id_list, my_srun_job);
	}

	slurm_mutex_lock(&launch_lock);
	if ((msg->return_code & 0xff) == SIG_OOM) {
		if (!oom_printed)
			error("%s: %s %s: Out Of Memory", hosts, task_str,
			      tasks);
		oom_printed = 1;
		*local_global_rc = msg->return_code;
	} else if (WIFEXITED(msg->return_code)) {
		if ((rc = WEXITSTATUS(msg->return_code)) == 0) {
			verbose("%s: %s %s: Completed", hosts, task_str,
				tasks);
			normal_exit = 1;
		} else if (_is_openmpi_port_error(rc)) {
			_handle_openmpi_port_error(tasks, hosts,
						   my_srun_job->step_ctx);
		} else if ((reduce_task_exit_msg == 0) ||
			   (msg_printed == 0) ||
			   (msg->return_code != last_task_exit_rc)) {
			error("%s: %s %s: Exited with exit code %d",
			      hosts, task_str, tasks, rc);
			msg_printed = 1;
		}
		if (((*local_global_rc & 0xff) != SIG_OOM) &&
		    (!WIFSIGNALED(*local_global_rc)) &&
		    (!WIFEXITED(*local_global_rc) ||
		     (rc > WEXITSTATUS(*local_global_rc))))
			*local_global_rc = msg->return_code;
	} else if (WIFSIGNALED(msg->return_code)) {
		const char *signal_str = strsignal(WTERMSIG(msg->return_code));
		char *core_str = "";
#ifdef WCOREDUMP
		if (WCOREDUMP(msg->return_code))
			core_str = " (core dumped)";
#endif
		if (my_srun_job->state >= SRUN_JOB_CANCELLED) {
			verbose("%s: %s %s: %s%s",
				hosts, task_str, tasks, signal_str, core_str);
		} else if ((reduce_task_exit_msg == 0) ||
			   (msg_printed == 0) ||
			   (msg->return_code != last_task_exit_rc)) {
			error("%s: %s %s: %s%s",
			      hosts, task_str, tasks, signal_str, core_str);
			msg_printed = 1;
		}
		/*
		 * Even though lower numbered signals can be stronger than
		 * higher numbered signals, keep the highest signal so that
		 * it's predictable to the user.
		 */
		rc = WTERMSIG(msg->return_code);
		if (((*local_global_rc & 0xff) != SIG_OOM) &&
		    (!WIFSIGNALED(*local_global_rc) ||
		     (rc > WTERMSIG(*local_global_rc))))
			*local_global_rc = msg->return_code;
	}
	xfree(tasks);
	xfree(hosts);

	task_state = task_state_find(msg->job_id, msg->step_id, NO_VAL,
				     task_state_list);
	if (task_state) {
		_update_task_exit_state(task_state, msg->num_tasks,
					msg->task_id_list, !normal_exit);
	} else {
		error("%s: Could not find task state for step %u.%u", __func__,
		      msg->job_id, msg->step_id);
	}

	if (task_state_first_abnormal_exit(task_state_list) &&
	    _kill_on_bad_exit())
		(void) _step_signal(SIG_TERM_KILL);

	if (task_state_first_exit(task_state_list) && opt_save &&
	    (opt_save->srun_opt->max_wait > 0))
		_setup_max_wait_timer();

	last_task_exit_rc = msg->return_code;
	slurm_mutex_unlock(&launch_lock);
}

/*
 * Load the multi_prog config file into argv, passing the entire file
 * contents in order to avoid having to read the file on every node.
 * We could also parse the information here for loading the MPIR records
 * for TotalView.
 */
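/*
 * A multi_prog configuration file maps task ranks to commands, for
 * example:
 *
 *	0	./master
 *	1-4	./worker --id=%t
 *
 * where "%t" expands to the task's rank. See the srun --multi-prog
 * documentation for the full syntax.
 */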
static void _load_multi(int *argc, char **argv)
{
	int config_fd, data_read = 0, i;
	struct stat stat_buf;
	char *data_buf;

	if ((config_fd = open(argv[0], O_RDONLY)) == -1) {
		error("Could not open multi_prog config file %s",
		      argv[0]);
		exit(error_exit);
	}
	if (fstat(config_fd, &stat_buf) == -1) {
		error("Could not stat multi_prog config file %s",
		      argv[0]);
		exit(error_exit);
	}
	if (stat_buf.st_size > 60000) {
		error("Multi_prog config file %s is too large",
		      argv[0]);
		exit(error_exit);
	}
	data_buf = xmalloc(stat_buf.st_size + 1);
	while ((i = read(config_fd, &data_buf[data_read],
			 stat_buf.st_size - data_read)) != 0) {
		if (i < 0) {
			error("Error reading multi_prog config file %s",
			      argv[0]);
			exit(error_exit);
		} else
			data_read += i;
	}
	close(config_fd);

	for (i = *argc + 1; i > 1; i--)
		argv[i] = argv[i-1];
	argv[1] = data_buf;
	*argc += 1;
}

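/*
 * After _load_multi(), argv[] holds the config file name, then the full
 * file contents, then the original arguments. Hypothetical example:
 *
 *	before:	argv = { "my.conf", "arg1", NULL },	*argc = 2
 *	after:	argv = { "my.conf", "<contents of my.conf>", "arg1", NULL },
 *		*argc = 3
 *
 * launch_p_setup_srun_opt() below allocates argc + 2 slots to leave room
 * for this extra entry plus the NULL terminator.
 */
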
/*
 * init() is called when the plugin is loaded, before any other functions
 * are called. Put global initialization here.
 */
extern int init(void)
{
	verbose("%s loaded", plugin_name);
	return SLURM_SUCCESS;
}

/*
 * fini() is called when the plugin is removed. Clear any allocated
 * storage here.
 */
extern int fini(void)
{
	FREE_NULL_LIST(task_state_list);

	return SLURM_SUCCESS;
}

extern int launch_p_setup_srun_opt(char **rest, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);
	if (srun_opt->debugger_test)
		MPIR_being_debugged = 1;

	/*
	 * Allocate argc + 2 slots: one extra in case multi-prog inserts the
	 * config file contents as an additional argv entry (see
	 * _load_multi()), plus the terminating NULL.
	 */
	srun_opt->argv = xmalloc((srun_opt->argc + 2) * sizeof(char *));

	return 0;
}

extern int launch_p_handle_multi_prog_verify(int command_pos,
					     slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	xassert(srun_opt);

	if (srun_opt->multi_prog) {
		if (srun_opt->argc < 1) {
			error("configuration file not specified");
			exit(error_exit);
		}
		_load_multi(&srun_opt->argc, srun_opt->argv);
		if (verify_multi_name(srun_opt->argv[command_pos], opt_local))
			exit(error_exit);
		return 1;
	} else
		return 0;
}

extern int launch_p_create_job_step(srun_job_t *job, bool use_all_cpus,
				    void (*signal_function)(int),
				    sig_atomic_t *destroy_job,
				    slurm_opt_t *opt_local)
{
	if (launch_common_create_job_step(job, use_all_cpus,
					  signal_function, destroy_job,
					  opt_local) != SLURM_SUCCESS)
		return SLURM_ERROR;

	/* Set the job ID for TotalView */
	if (!totalview_jobid) {
		xstrfmtcat(totalview_jobid, "%u", job->jobid);
		xstrfmtcat(totalview_stepid, "%u", job->stepid);
	}

	return SLURM_SUCCESS;
}

static char **_build_user_env(srun_job_t *job, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	char **dest_array = NULL;
	char *tmp_env, *tok, *save_ptr = NULL, *eq_ptr, *value;
	bool all;
	xassert(srun_opt);

	if (!srun_opt->export_env) {
		all = true;
	} else {
		all = false;
		tmp_env = xstrdup(srun_opt->export_env);
		tok = find_quote_token(tmp_env, ",", &save_ptr);
		while (tok) {
			if (xstrcasecmp(tok, "ALL") == 0)
				all = true;

			if (!xstrcasecmp(tok, "NONE"))
				break;
			eq_ptr = strchr(tok, '=');
			if (eq_ptr) {
				eq_ptr[0] = '\0';
				value = eq_ptr + 1;
				env_array_overwrite(&dest_array, tok, value);
			} else {
				value = getenv(tok);
				if (value) {
					env_array_overwrite(&dest_array, tok,
							    value);
				}
			}
			tok = find_quote_token(NULL, ",", &save_ptr);
		}
		xfree(tmp_env);
	}

	if (!job->env)
		fatal("%s: job env is NULL", __func__);
	else if (all)
		env_array_merge(&dest_array, (const char **) job->env);
	else
		env_array_merge_slurm(&dest_array, (const char **) job->env);

	return dest_array;
}

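/*
 * Examples of the --export handling in _build_user_env() (values are
 * illustrative); an unset --export behaves like ALL:
 *
 *	--export=ALL		merge the full job environment
 *	--export=NONE		propagate only the SLURM_* variables
 *	--export=PATH,FOO=bar	copy PATH from the caller, set FOO=bar,
 *				and merge in the SLURM_* variables
 */
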
static void _task_state_del(void *x)
{
	task_state_t task_state = (task_state_t) x;

	task_state_destroy(task_state);
}

/*
 * Return only after all hetjob components reach this point (or timeout)
 */
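/*
 * Example with three components: the first two callers block on start_cond;
 * when the third arrives (start_cnt == total_cnt) it skips the wait and the
 * broadcast releases the others. A caller whose timed wait expires (about
 * 10 seconds per wait) gives up and proceeds anyway.
 */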
static void _wait_all_het_job_comps_started(slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	static int start_cnt = 0;
	static int total_cnt = -1;
	struct timeval now;
	struct timespec timeout;
	int rc;
	xassert(srun_opt);

	slurm_mutex_lock(&start_mutex);
	if (total_cnt == -1)
		total_cnt = srun_opt->het_step_cnt;
	start_cnt++;
	while (start_cnt < total_cnt) {
		gettimeofday(&now, NULL);
		timeout.tv_sec = now.tv_sec + 10;	/* 10 sec delay max */
		timeout.tv_nsec = now.tv_usec * 1000;
		rc = pthread_cond_timedwait(&start_cond, &start_mutex,
					    &timeout);
		if (rc == ETIMEDOUT)
			break;
	}
	slurm_cond_broadcast(&start_cond);
	slurm_mutex_unlock(&start_mutex);
}

extern int launch_p_step_launch(srun_job_t *job, slurm_step_io_fds_t *cio_fds,
				uint32_t *global_rc,
				slurm_step_launch_callbacks_t *step_callbacks,
				slurm_opt_t *opt_local)
{
	srun_job_t *local_srun_job;
	srun_opt_t *srun_opt = opt_local->srun_opt;
	slurm_step_launch_params_t launch_params;
	slurm_step_launch_callbacks_t callbacks;
	int rc = SLURM_SUCCESS;
	task_state_t task_state;
	bool first_launch = false;
	uint32_t def_cpu_bind_type = 0;
	char tmp_str[128];
	xassert(srun_opt);

	slurm_step_launch_params_t_init(&launch_params);
	memcpy(&callbacks, step_callbacks, sizeof(callbacks));

	task_state = task_state_find(job->jobid, job->stepid,
				     job->het_job_offset, task_state_list);
	if (!task_state) {
		task_state = task_state_create(job->jobid, job->stepid,
					       job->het_job_offset,
					       job->ntasks,
					       job->het_job_task_offset);
		slurm_mutex_lock(&het_job_lock);
		if (!local_job_list)
			local_job_list = list_create(NULL);
		if (!task_state_list)
			task_state_list = list_create(_task_state_del);
		slurm_mutex_unlock(&het_job_lock);
		local_srun_job = job;
		local_global_rc = global_rc;
		list_append(local_job_list, local_srun_job);
		list_append(task_state_list, task_state);
		first_launch = true;
	} else {
		/* Launching extra POE tasks */
		task_state_alter(task_state, job->ntasks);
	}

	launch_params.gid = opt_local->gid;
	launch_params.alias_list = job->alias_list;
	launch_params.argc = srun_opt->argc;
	launch_params.argv = srun_opt->argv;
	launch_params.multi_prog = srun_opt->multi_prog ? true : false;
	launch_params.cwd = opt_local->chdir;
	launch_params.slurmd_debug = srun_opt->slurmd_debug;
	launch_params.buffered_stdio = !srun_opt->unbuffered;
	launch_params.labelio = srun_opt->labelio ? true : false;
	launch_params.remote_output_filename = fname_remote_string(job->ofname);
	launch_params.remote_input_filename = fname_remote_string(job->ifname);
	launch_params.remote_error_filename = fname_remote_string(job->efname);
	launch_params.het_job_node_offset = job->het_job_node_offset;
	launch_params.het_job_id = job->het_job_id;
	launch_params.het_job_nnodes = job->het_job_nnodes;
	launch_params.het_job_ntasks = job->het_job_ntasks;
	launch_params.het_job_offset = job->het_job_offset;
	launch_params.het_job_step_cnt = srun_opt->het_step_cnt;
	launch_params.het_job_task_offset = job->het_job_task_offset;
	launch_params.het_job_task_cnts = job->het_job_task_cnts;
	launch_params.het_job_tids = job->het_job_tids;
	launch_params.het_job_tid_offsets = job->het_job_tid_offsets;
	launch_params.het_job_node_list = job->het_job_node_list;
	launch_params.partition = job->partition;
	launch_params.profile = opt_local->profile;
	launch_params.task_prolog = srun_opt->task_prolog;
	launch_params.task_epilog = srun_opt->task_epilog;

	slurm_step_ctx_get(job->step_ctx, SLURM_STEP_CTX_DEF_CPU_BIND_TYPE,
			   &def_cpu_bind_type);
	if (slurm_verify_cpu_bind(NULL, &srun_opt->cpu_bind,
				  &srun_opt->cpu_bind_type,
				  def_cpu_bind_type)) {
		return SLURM_ERROR;
	}
	slurm_sprint_cpu_bind_type(tmp_str, srun_opt->cpu_bind_type);
	verbose("CpuBindType=%s", tmp_str);
	launch_params.cpu_bind = srun_opt->cpu_bind;
	launch_params.cpu_bind_type = srun_opt->cpu_bind_type;

	launch_params.mem_bind = opt_local->mem_bind;
	launch_params.mem_bind_type = opt_local->mem_bind_type;
	launch_params.accel_bind_type = srun_opt->accel_bind_type;
	launch_params.open_mode = srun_opt->open_mode;
	if (opt_local->acctg_freq)
		launch_params.acctg_freq = opt_local->acctg_freq;
	launch_params.pty = srun_opt->pty;
	if (opt_local->cpus_set)
		launch_params.cpus_per_task = opt_local->cpus_per_task;
	else
		launch_params.cpus_per_task = 1;
	launch_params.cpu_freq_min = opt_local->cpu_freq_min;
	launch_params.cpu_freq_max = opt_local->cpu_freq_max;
	launch_params.cpu_freq_gov = opt_local->cpu_freq_gov;
	launch_params.tres_bind = opt_local->tres_bind;
	launch_params.tres_freq = opt_local->tres_freq;
	launch_params.task_dist = opt_local->distribution;
	launch_params.preserve_env = srun_opt->preserve_env;
	launch_params.spank_job_env = opt_local->spank_job_env;
	launch_params.spank_job_env_size = opt_local->spank_job_env_size;
	launch_params.ntasks_per_board = job->ntasks_per_board;
	launch_params.ntasks_per_core = job->ntasks_per_core;
	launch_params.ntasks_per_socket = job->ntasks_per_socket;
	launch_params.no_alloc = srun_opt->no_alloc;
	launch_params.env = _build_user_env(job, opt_local);

	memcpy(&launch_params.local_fds, cio_fds, sizeof(slurm_step_io_fds_t));

	if (MPIR_being_debugged) {
		launch_params.parallel_debug = true;
		pmi_server_max_threads(1);
	} else {
		launch_params.parallel_debug = false;
	}
	/*
	 * Normally this isn't used, but if an outside process other than
	 * srun (e.g., poe) is using this logic to launch tasks, then we
	 * can use this to signal the step.
	 */
	callbacks.task_start = _task_start;
	/*
	 * If poe is using this code with multi-prog it always returns
	 * 1 for each task, which could be confusing since no real
	 * error happened.
	 */
	if (!launch_params.multi_prog
	    || (!callbacks.step_signal
		|| (callbacks.step_signal == launch_g_fwd_signal))) {
		callbacks.task_finish = _task_finish;
		slurm_mutex_lock(&launch_lock);
		if (!opt_save) {
			/*
			 * Save the opt_local parameters since _task_finish()
			 * will lack the values.
			 */
			opt_save = xmalloc(sizeof(slurm_opt_t));
			memcpy(opt_save, opt_local, sizeof(slurm_opt_t));
			opt_save->srun_opt = xmalloc(sizeof(srun_opt_t));
			memcpy(opt_save->srun_opt, srun_opt,
			       sizeof(srun_opt_t));
		}
		slurm_mutex_unlock(&launch_lock);
	}

	update_job_state(job, SRUN_JOB_LAUNCHING);
	launch_start_time = time(NULL);
	if (first_launch) {
		if (slurm_step_launch(job->step_ctx, &launch_params,
				      &callbacks) != SLURM_SUCCESS) {
			rc = errno;
			*local_global_rc = errno;
			error("Application launch failed: %m");
			slurm_step_launch_abort(job->step_ctx);
			slurm_step_launch_wait_finish(job->step_ctx);
			goto cleanup;
		}
	} else {
		if (slurm_step_launch_add(job->step_ctx, job->step_ctx,
					  &launch_params, job->nodelist,
					  job->fir_nodeid) != SLURM_SUCCESS) {
			rc = errno;
			*local_global_rc = errno;
			error("Application launch add failed: %m");
			slurm_step_launch_abort(job->step_ctx);
			slurm_step_launch_wait_finish(job->step_ctx);
			goto cleanup;
		}
	}

	update_job_state(job, SRUN_JOB_STARTING);
	if (slurm_step_launch_wait_start(job->step_ctx) == SLURM_SUCCESS) {
		update_job_state(job, SRUN_JOB_RUNNING);
		/*
		 * Only set up MPIR structures if the step launched correctly.
		 */
		if (srun_opt->multi_prog) {
			mpir_set_multi_name(job->ntasks,
					    launch_params.argv[0]);
		} else {
			mpir_set_executable_names(launch_params.argv[0],
						  job->het_job_task_offset,
						  job->ntasks);
		}

		_wait_all_het_job_comps_started(opt_local);
		MPIR_debug_state = MPIR_DEBUG_SPAWNED;
		if (srun_opt->debugger_test)
			mpir_dump_proctable();
		else if (srun_opt->parallel_debug)
			MPIR_Breakpoint(job);
	} else {
		info("Job step %u.%u aborted before step completely launched.",
		     job->jobid, job->stepid);
	}

cleanup:
	return rc;
}

extern int launch_p_step_wait(srun_job_t *job, bool got_alloc,
			      slurm_opt_t *opt_local)
{
	int rc = 0;

	slurm_step_launch_wait_finish(job->step_ctx);
	if ((MPIR_being_debugged == 0) && retry_step_begin &&
	    (retry_step_cnt < MAX_STEP_RETRIES) &&
	    (job->het_job_id == NO_VAL)) {	/* Not a hetjob step */
		retry_step_begin = false;
		slurm_step_ctx_destroy(job->step_ctx);
		if (got_alloc)
			rc = create_job_step(job, true, opt_local);
		else
			rc = create_job_step(job, false, opt_local);
		if (rc < 0)
			exit(error_exit);
		rc = -1;
	}
	return rc;
}

static int _step_signal(int signal)
{
	srun_job_t *my_srun_job;
	ListIterator iter;
	int rc = SLURM_SUCCESS, rc2;

	if (!local_job_list) {
		debug("%s: local_job_list does not exist yet", __func__);
		return SLURM_ERROR;
	}

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		info("Terminating job step %u.%u",
		     my_srun_job->jobid, my_srun_job->stepid);
		rc2 = slurm_kill_job_step(my_srun_job->jobid,
					  my_srun_job->stepid, signal);
		if (rc2)
			rc = rc2;
	}
	list_iterator_destroy(iter);
	return rc;
}

extern int launch_p_step_terminate(void)
{
	return _step_signal(SIGKILL);
}

extern void launch_p_print_status(void)
{
	task_state_print(task_state_list, (log_f) info);
}

extern void launch_p_fwd_signal(int signal)
{
	srun_job_t *my_srun_job;
	ListIterator iter;

	if (!local_job_list) {
		debug("%s: local_job_list does not exist yet", __func__);
		return;
	}

	iter = list_iterator_create(local_job_list);
	while ((my_srun_job = (srun_job_t *) list_next(iter))) {
		switch (signal) {
		case SIGKILL:
			slurm_step_launch_abort(my_srun_job->step_ctx);
			break;
		default:
			slurm_step_launch_fwd_signal(my_srun_job->step_ctx,
						     signal);
			break;
		}
	}
	list_iterator_destroy(iter);
}
958