1 /*****************************************************************************\
2 * step_launch.c - launch a parallel job step
3 *****************************************************************************
4 * Copyright (C) 2006-2007 The Regents of the University of California.
5 * Copyright (C) 2008-2009 Lawrence Livermore National Security.
6 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7 * Written by Christopher J. Morrone <morrone2@llnl.gov>
8 * CODE-OCEC-09-009. All rights reserved.
9 *
10 * This file is part of Slurm, a resource management program.
11 * For details, see <https://slurm.schedmd.com/>.
12 * Please also read the included file: DISCLAIMER.
13 *
14 * Slurm is free software; you can redistribute it and/or modify it under
15 * the terms of the GNU General Public License as published by the Free
16 * Software Foundation; either version 2 of the License, or (at your option)
17 * any later version.
18 *
19 * In addition, as a special exception, the copyright holders give permission
20 * to link the code of portions of this program with the OpenSSL library under
21 * certain conditions as described in each individual source file, and
22 * distribute linked combinations including the two. You must obey the GNU
23 * General Public License in all respects for all of the code used other than
24 * OpenSSL. If you modify file(s) with this exception, you may extend this
25 * exception to your version of the file(s), but you are not obligated to do
26 * so. If you do not wish to do so, delete this exception statement from your
27 * version. If you delete this exception statement from all source files in
28 * the program, then also delete it here.
29 *
30 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
31 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
32 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
33 * details.
34 *
35 * You should have received a copy of the GNU General Public License along
36 * with Slurm; if not, write to the Free Software Foundation, Inc.,
37 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
38 \*****************************************************************************/
39
40 #include "config.h"
41
42 #include <errno.h>
43 #include <fcntl.h>
44 #include <grp.h>
45 #include <limits.h>
46 #include <netdb.h> /* for gethostbyname */
47 #include <netinet/in.h>
48 #include <pthread.h>
49 #include <stdarg.h>
50 #include <stdlib.h>
51 #include <stdio.h>
52 #include <string.h>
53 #include <sys/param.h>
54 #include <sys/socket.h>
55 #include <sys/stat.h>
56 #include <sys/types.h>
57 #include <sys/un.h>
58 #include <unistd.h>
59
60 #include "slurm/slurm.h"
61
62 #include "src/common/cpu_frequency.h"
63 #include "src/common/eio.h"
64 #include "src/common/fd.h"
65 #include "src/common/forward.h"
66 #include "src/common/hostlist.h"
67 #include "src/common/macros.h"
68 #include "src/common/net.h"
69 #include "src/common/plugstack.h"
70 #include "src/common/slurm_auth.h"
71 #include "src/common/slurm_cred.h"
72 #include "src/common/slurm_mpi.h"
73 #include "src/common/slurm_protocol_api.h"
74 #include "src/common/slurm_protocol_defs.h"
75 #include "src/common/slurm_time.h"
76 #include "src/common/strlcpy.h"
77 #include "src/common/uid.h"
78 #include "src/common/xmalloc.h"
79 #include "src/common/xstring.h"
80
81 #include "src/api/step_launch.h"
82 #include "src/api/step_ctx.h"
83 #include "src/api/pmi_server.h"
84
85 #define STEP_ABORT_TIME 2
86
87 extern char **environ;
88
89 /**********************************************************************
90 * General declarations for step launch code
91 **********************************************************************/
92 static int _launch_tasks(slurm_step_ctx_t *ctx,
93 launch_tasks_request_msg_t *launch_msg,
94 uint32_t timeout, char *nodelist, int start_nodeid);
95 static char *_lookup_cwd(void);
96 static void _print_launch_msg(launch_tasks_request_msg_t *msg,
97 char *hostname, int nodeid);
98
99 /**********************************************************************
100 * Message handler declarations
101 **********************************************************************/
102 static uid_t slurm_uid;
103 static bool force_terminated_job = false;
104 static int task_exit_signal = 0;
105
106 static void _exec_prog(slurm_msg_t *msg);
107 static int _msg_thr_create(struct step_launch_state *sls, int num_nodes);
108 static void _handle_msg(void *arg, slurm_msg_t *msg);
109 static int _cr_notify_step_launch(slurm_step_ctx_t *ctx);
110 static void *_check_io_timeout(void *_sls);
111
112 static struct io_operations message_socket_ops = {
113 .readable = &eio_message_socket_readable,
114 .handle_read = &eio_message_socket_accept,
115 .handle_msg = &_handle_msg
116 };
117
118
119 /**********************************************************************
120 * API functions
121 **********************************************************************/
122
123 /*
124 * slurm_step_launch_params_t_init - initialize a user-allocated
125 * slurm_step_launch_params_t structure with default values.
126 * This function will NOT allocate any new memory.
127 * IN ptr - pointer to a structure allocated by the user.
128 * The structure will be initialized.
129 */
slurm_step_launch_params_t_init(slurm_step_launch_params_t * ptr)130 extern void slurm_step_launch_params_t_init(slurm_step_launch_params_t *ptr)
131 {
132 static slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER;
133
134 /* Initialize all values to zero ("NULL" for pointers) */
135 memset(ptr, 0, sizeof(slurm_step_launch_params_t));
136
137 ptr->buffered_stdio = true;
138 memcpy(&ptr->local_fds, &fds, sizeof(fds));
139 ptr->gid = getgid();
140 ptr->cpu_freq_min = NO_VAL;
141 ptr->cpu_freq_max = NO_VAL;
142 ptr->cpu_freq_gov = NO_VAL;
143 ptr->het_job_node_offset = NO_VAL;
144 ptr->het_job_id = NO_VAL;
145 ptr->het_job_nnodes = NO_VAL;
146 ptr->het_job_ntasks = NO_VAL;
147 ptr->het_job_offset = NO_VAL;
148 ptr->het_job_step_cnt = NO_VAL;
149 ptr->het_job_task_offset = NO_VAL;
150 }
151
152 /*
153 * Specify the plugin name to be used. This may be needed to specify the
154 * non-default MPI plugin when using Slurm API to launch tasks.
155 * IN plugin name - "none", "pmi2", etc.
156 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
157 */
slurm_mpi_plugin_init(char * plugin_name)158 extern int slurm_mpi_plugin_init(char *plugin_name)
159 {
160 return mpi_hook_client_init(plugin_name);
161 }
162
163 /*
164 * For a hetjob step, rebuild the MPI data structure to show what is running
165 * in a single MPI_COMM_WORLD
166 */
_rebuild_mpi_layout(slurm_step_ctx_t * ctx,const slurm_step_launch_params_t * params)167 static void _rebuild_mpi_layout(slurm_step_ctx_t *ctx,
168 const slurm_step_launch_params_t *params)
169 {
170 slurm_step_layout_t *new_step_layout, *orig_step_layout;
171
172 ctx->launch_state->mpi_info->het_job_id = params->het_job_id;
173 ctx->launch_state->mpi_info->het_job_task_offset =
174 params->het_job_task_offset;
175 new_step_layout = xmalloc(sizeof(slurm_step_layout_t));
176 orig_step_layout = ctx->launch_state->mpi_info->step_layout;
177 ctx->launch_state->mpi_info->step_layout = new_step_layout;
178 if (orig_step_layout->front_end) {
179 new_step_layout->front_end =
180 xstrdup(orig_step_layout->front_end);
181 }
182 new_step_layout->node_cnt = params->het_job_nnodes;
183 new_step_layout->node_list = xstrdup(params->het_job_node_list);
184 new_step_layout->plane_size = orig_step_layout->plane_size;
185 new_step_layout->start_protocol_ver =
186 orig_step_layout->start_protocol_ver;
187 new_step_layout->tasks = params->het_job_task_cnts;
188 new_step_layout->task_cnt = params->het_job_ntasks;
189 new_step_layout->task_dist = orig_step_layout->task_dist;
190 new_step_layout->tids = params->het_job_tids;
191 }
192
193 /*
194 * slurm_step_launch - launch a parallel job step
195 * IN ctx - job step context generated by slurm_step_ctx_create
196 * IN params - job step parameters
197 * IN callbacks - Identify functions to be called when various events occur
198 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
199 */
slurm_step_launch(slurm_step_ctx_t * ctx,const slurm_step_launch_params_t * params,const slurm_step_launch_callbacks_t * callbacks)200 extern int slurm_step_launch(slurm_step_ctx_t *ctx,
201 const slurm_step_launch_params_t *params,
202 const slurm_step_launch_callbacks_t *callbacks)
203 {
204 launch_tasks_request_msg_t launch;
205 char **env = NULL;
206 char **mpi_env = NULL;
207 int rc = SLURM_SUCCESS;
208 bool preserve_env = params->preserve_env;
209
210 debug("Entering %s", __func__);
211 memset(&launch, 0, sizeof(launch));
212
213 if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
214 error("%s: Not a valid slurm_step_ctx_t", __func__);
215 slurm_seterrno(EINVAL);
216 return SLURM_ERROR;
217 }
218
219 /* Initialize the callback pointers */
220 if (callbacks != NULL) {
221 /* copy the user specified callback pointers */
222 memcpy(&(ctx->launch_state->callback), callbacks,
223 sizeof(slurm_step_launch_callbacks_t));
224 } else {
225 /* set all callbacks to NULL */
226 memset(&(ctx->launch_state->callback), 0,
227 sizeof(slurm_step_launch_callbacks_t));
228 }
229
230 if (mpi_hook_client_init(params->mpi_plugin_name) == SLURM_ERROR) {
231 slurm_seterrno(SLURM_MPI_PLUGIN_NAME_INVALID);
232 return SLURM_ERROR;
233 }
234
235 if (params->het_job_id && (params->het_job_id != NO_VAL))
236 _rebuild_mpi_layout(ctx, params);
237
238 mpi_env = xmalloc(sizeof(char *)); /* Needed for setenvf used by MPI */
239 if ((ctx->launch_state->mpi_state =
240 mpi_hook_client_prelaunch(ctx->launch_state->mpi_info, &mpi_env))
241 == NULL) {
242 slurm_seterrno(SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED);
243 return SLURM_ERROR;
244 }
245
246 /* Create message receiving sockets and handler thread */
247 rc = _msg_thr_create(ctx->launch_state,
248 ctx->step_resp->step_layout->node_cnt);
249 if (rc != SLURM_SUCCESS)
250 return rc;
251
252 /* Start tasks on compute nodes */
253 launch.job_id = ctx->step_req->job_id;
254 launch.uid = ctx->step_req->user_id;
255 launch.gid = params->gid;
256 launch.argc = params->argc;
257 launch.argv = params->argv;
258 launch.spank_job_env = params->spank_job_env;
259 launch.spank_job_env_size = params->spank_job_env_size;
260 launch.cred = ctx->step_resp->cred;
261 launch.job_step_id = ctx->step_resp->job_step_id;
262 launch.het_job_node_offset = params->het_job_node_offset;
263 launch.het_job_step_cnt = params->het_job_step_cnt;
264 launch.het_job_id = params->het_job_id;
265 launch.het_job_nnodes = params->het_job_nnodes;
266 launch.het_job_ntasks = params->het_job_ntasks;
267 launch.het_job_offset = params->het_job_offset;
268 launch.het_job_task_offset = params->het_job_task_offset;
269 launch.het_job_task_cnts = params->het_job_task_cnts;
270 launch.het_job_tids = params->het_job_tids;
271 launch.het_job_tid_offsets = params->het_job_tid_offsets;
272 launch.het_job_node_list = params->het_job_node_list;
273 if (params->env == NULL) {
274 /*
275 * If the user didn't specify an environment, then use the
276 * environment of the running process
277 */
278 env_array_merge(&env, (const char **)environ);
279 } else {
280 env_array_merge(&env, (const char **)params->env);
281 }
282 if (params->het_job_ntasks != NO_VAL)
283 preserve_env = true;
284 env_array_for_step(&env, ctx->step_resp, &launch,
285 ctx->launch_state->resp_port[0], preserve_env);
286 env_array_merge(&env, (const char **)mpi_env);
287 env_array_free(mpi_env);
288
289 launch.envc = envcount(env);
290 launch.env = env;
291 if (params->cwd)
292 launch.cwd = xstrdup(params->cwd);
293 else
294 launch.cwd = _lookup_cwd();
295 launch.alias_list = params->alias_list;
296 launch.nnodes = ctx->step_resp->step_layout->node_cnt;
297 launch.ntasks = ctx->step_resp->step_layout->task_cnt;
298 launch.slurmd_debug = params->slurmd_debug;
299 launch.switch_job = ctx->step_resp->switch_job;
300 launch.profile = params->profile;
301 launch.task_prolog = params->task_prolog;
302 launch.task_epilog = params->task_epilog;
303 launch.cpu_bind_type = params->cpu_bind_type;
304 launch.cpu_bind = params->cpu_bind;
305 launch.cpu_freq_min = params->cpu_freq_min;
306 launch.cpu_freq_max = params->cpu_freq_max;
307 launch.cpu_freq_gov = params->cpu_freq_gov;
308 launch.tres_bind = params->tres_bind;
309 launch.tres_freq = params->tres_freq;
310 launch.mem_bind_type = params->mem_bind_type;
311 launch.mem_bind = params->mem_bind;
312 launch.accel_bind_type = params->accel_bind_type;
313 launch.flags = 0;
314 if (params->multi_prog)
315 launch.flags |= LAUNCH_MULTI_PROG;
316 launch.cpus_per_task = params->cpus_per_task;
317 launch.ntasks_per_board = params->ntasks_per_board;
318 launch.ntasks_per_core = params->ntasks_per_core;
319 launch.ntasks_per_socket= params->ntasks_per_socket;
320
321 if (params->no_alloc)
322 launch.flags |= LAUNCH_NO_ALLOC;
323
324 launch.task_dist = params->task_dist;
325 launch.partition = params->partition;
326 if (params->pty)
327 launch.flags |= LAUNCH_PTY;
328 launch.acctg_freq = params->acctg_freq;
329 launch.open_mode = params->open_mode;
330 launch.options = job_options_create();
331 launch.complete_nodelist =
332 xstrdup(ctx->step_resp->step_layout->node_list);
333 spank_set_remote_options (launch.options);
334 if (params->parallel_debug)
335 launch.flags |= LAUNCH_PARALLEL_DEBUG;
336
337 launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
338 launch.global_task_ids = ctx->step_resp->step_layout->tids;
339
340 launch.select_jobinfo = ctx->step_resp->select_jobinfo;
341
342 if (params->user_managed_io)
343 launch.flags |= LAUNCH_USER_MANAGED_IO;
344 ctx->launch_state->user_managed_io = params->user_managed_io;
345
346 if (!ctx->launch_state->user_managed_io) {
347 launch.ofname = params->remote_output_filename;
348 launch.efname = params->remote_error_filename;
349 launch.ifname = params->remote_input_filename;
350 if (params->buffered_stdio)
351 launch.flags |= LAUNCH_BUFFERED_IO;
352 if (params->labelio)
353 launch.flags |= LAUNCH_LABEL_IO;
354 ctx->launch_state->io.normal =
355 client_io_handler_create(params->local_fds,
356 ctx->step_req->num_tasks,
357 launch.nnodes,
358 ctx->step_resp->cred,
359 params->labelio,
360 params->het_job_offset,
361 params->het_job_task_offset);
362 if (ctx->launch_state->io.normal == NULL) {
363 rc = SLURM_ERROR;
364 goto fail1;
365 }
366 /*
367 * The client_io_t gets a pointer back to the slurm_launch_state
368 * to notify it of I/O errors.
369 */
370 ctx->launch_state->io.normal->sls = ctx->launch_state;
371
372 if (client_io_handler_start(ctx->launch_state->io.normal)
373 != SLURM_SUCCESS) {
374 rc = SLURM_ERROR;
375 goto fail1;
376 }
377 launch.num_io_port = ctx->launch_state->io.normal->num_listen;
378 launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
379 memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
380 (sizeof(uint16_t) * launch.num_io_port));
381 /*
382 * If the io timeout is > 0, create a flag to ping the stepds
383 * if io_timeout seconds pass without stdio traffic to/from
384 * the node.
385 */
386 ctx->launch_state->io_timeout = slurm_get_msg_timeout();
387 } else { /* user_managed_io is true */
388 /* initialize user_managed_io_t */
389 ctx->launch_state->io.user =
390 (user_managed_io_t *)xmalloc(sizeof(user_managed_io_t));
391 ctx->launch_state->io.user->connected = 0;
392 ctx->launch_state->io.user->sockets =
393 (int *)xmalloc(sizeof(int) * ctx->step_req->num_tasks);
394 }
395
396 launch.num_resp_port = ctx->launch_state->num_resp_port;
397 launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port);
398 memcpy(launch.resp_port, ctx->launch_state->resp_port,
399 (sizeof(uint16_t) * launch.num_resp_port));
400 rc = _launch_tasks(ctx, &launch, params->msg_timeout,
401 launch.complete_nodelist, 0);
402
403 /* clean up */
404 xfree(launch.resp_port);
405 if (!ctx->launch_state->user_managed_io)
406 xfree(launch.io_port);
407
408 fail1:
409 xfree(launch.user_name);
410 xfree(launch.complete_nodelist);
411 xfree(launch.cwd);
412 env_array_free(env);
413 job_options_destroy(launch.options);
414 return rc;
415 }
416
417 /*
418 * slurm_step_launch_add - Add tasks to a step that was already started
419 * IN ctx - job step context generated by slurm_step_ctx_create
420 * IN first_ctx - job step context generated by slurm_step_ctx_create for
421 * first component of the job step
422 * IN params - job step parameters
423 * IN node_list - list of extra nodes to add
424 * IN start_nodeid - in the global scheme which node id is the first
425 * node in node_list.
426 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
427 */
slurm_step_launch_add(slurm_step_ctx_t * ctx,slurm_step_ctx_t * first_ctx,const slurm_step_launch_params_t * params,char * node_list,int start_nodeid)428 extern int slurm_step_launch_add(slurm_step_ctx_t *ctx,
429 slurm_step_ctx_t *first_ctx,
430 const slurm_step_launch_params_t *params,
431 char *node_list, int start_nodeid)
432 {
433 launch_tasks_request_msg_t launch;
434 char **env = NULL;
435 char **mpi_env = NULL;
436 int rc = SLURM_SUCCESS;
437 uint16_t resp_port = 0;
438 bool preserve_env = params->preserve_env;
439
440 debug("Entering %s", __func__);
441
442 if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
443 error("%s: Not a valid slurm_step_ctx_t", __func__);
444 slurm_seterrno(EINVAL);
445 return SLURM_ERROR;
446 }
447
448 memset(&launch, 0, sizeof(launch));
449
450 /* Start tasks on compute nodes */
451 launch.job_id = ctx->step_req->job_id;
452 launch.uid = ctx->step_req->user_id;
453 launch.gid = params->gid;
454 launch.argc = params->argc;
455 launch.argv = params->argv;
456 launch.spank_job_env = params->spank_job_env;
457 launch.spank_job_env_size = params->spank_job_env_size;
458 launch.cred = ctx->step_resp->cred;
459 launch.job_step_id = ctx->step_resp->job_step_id;
460 launch.het_job_step_cnt = params->het_job_step_cnt;
461 launch.het_job_id = params->het_job_id;
462 launch.het_job_nnodes = params->het_job_nnodes;
463 launch.het_job_ntasks = params->het_job_ntasks;
464 launch.het_job_offset = params->het_job_offset;
465 launch.het_job_task_offset = params->het_job_task_offset;
466 launch.het_job_task_cnts = params->het_job_task_cnts;
467 launch.het_job_tids = params->het_job_tids;
468 launch.het_job_tid_offsets = params->het_job_tid_offsets;
469 launch.het_job_node_list = params->het_job_node_list;
470 if (params->env == NULL) {
471 /*
472 * if the user didn't specify an environment, grab the
473 * environment of the running process
474 */
475 env_array_merge(&env, (const char **)environ);
476 } else {
477 env_array_merge(&env, (const char **)params->env);
478 }
479 if (first_ctx->launch_state->resp_port)
480 resp_port = first_ctx->launch_state->resp_port[0];
481 if (params->het_job_ntasks != NO_VAL)
482 preserve_env = true;
483 env_array_for_step(&env, ctx->step_resp, &launch, resp_port,
484 preserve_env);
485 env_array_merge(&env, (const char **)mpi_env);
486 env_array_free(mpi_env);
487
488 launch.envc = envcount(env);
489 launch.env = env;
490 if (params->cwd)
491 launch.cwd = xstrdup(params->cwd);
492 else
493 launch.cwd = _lookup_cwd();
494 launch.alias_list = params->alias_list;
495 launch.nnodes = ctx->step_resp->step_layout->node_cnt;
496 launch.ntasks = ctx->step_resp->step_layout->task_cnt;
497 launch.slurmd_debug = params->slurmd_debug;
498 launch.switch_job = ctx->step_resp->switch_job;
499 launch.profile = params->profile;
500 launch.task_prolog = params->task_prolog;
501 launch.task_epilog = params->task_epilog;
502 launch.cpu_bind_type = params->cpu_bind_type;
503 launch.cpu_bind = params->cpu_bind;
504 launch.cpu_freq_min = params->cpu_freq_min;
505 launch.cpu_freq_max = params->cpu_freq_max;
506 launch.cpu_freq_gov = params->cpu_freq_gov;
507 launch.tres_bind = params->tres_bind;
508 launch.tres_freq = params->tres_freq;
509 launch.mem_bind_type = params->mem_bind_type;
510 launch.mem_bind = params->mem_bind;
511 launch.accel_bind_type = params->accel_bind_type;
512 launch.flags = 0;
513 if (params->multi_prog)
514 launch.flags |= LAUNCH_MULTI_PROG;
515 launch.cpus_per_task = params->cpus_per_task;
516 launch.task_dist = params->task_dist;
517 launch.partition = params->partition;
518 if (params->pty)
519 launch.flags |= LAUNCH_PTY;
520 launch.acctg_freq = params->acctg_freq;
521 launch.open_mode = params->open_mode;
522 launch.options = job_options_create();
523 launch.complete_nodelist =
524 xstrdup(ctx->step_resp->step_layout->node_list);
525
526 spank_set_remote_options (launch.options);
527 if (params->parallel_debug)
528 launch.flags |= LAUNCH_PARALLEL_DEBUG;
529
530 launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
531 launch.global_task_ids = ctx->step_resp->step_layout->tids;
532
533 launch.select_jobinfo = ctx->step_resp->select_jobinfo;
534
535 if (params->user_managed_io)
536 launch.flags |= LAUNCH_USER_MANAGED_IO;
537
538 /* user_managed_io is true */
539 if (!ctx->launch_state->io.user) {
540 launch.ofname = params->remote_output_filename;
541 launch.efname = params->remote_error_filename;
542 launch.ifname = params->remote_input_filename;
543 if (params->buffered_stdio)
544 launch.flags |= LAUNCH_BUFFERED_IO;
545 if (params->labelio)
546 launch.flags |= LAUNCH_LABEL_IO;
547 ctx->launch_state->io.normal =
548 client_io_handler_create(params->local_fds,
549 ctx->step_req->num_tasks,
550 launch.nnodes,
551 ctx->step_resp->cred,
552 params->labelio,
553 params->het_job_offset,
554 params->het_job_task_offset);
555 if (ctx->launch_state->io.normal == NULL) {
556 rc = SLURM_ERROR;
557 goto fail1;
558 }
559 /*
560 * The client_io_t gets a pointer back to the slurm_launch_state
561 * to notify it of I/O errors.
562 */
563 ctx->launch_state->io.normal->sls = ctx->launch_state;
564
565 if (client_io_handler_start(ctx->launch_state->io.normal)
566 != SLURM_SUCCESS) {
567 rc = SLURM_ERROR;
568 goto fail1;
569 }
570 launch.num_io_port = ctx->launch_state->io.normal->num_listen;
571 launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
572 memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
573 (sizeof(uint16_t) * launch.num_io_port));
574 /*
575 * If the io timeout is > 0, create a flag to ping the stepds
576 * if io_timeout seconds pass without stdio traffic to/from
577 * the node.
578 */
579 ctx->launch_state->io_timeout = slurm_get_msg_timeout();
580 } else { /* user_managed_io is true */
581 xrealloc(ctx->launch_state->io.user->sockets,
582 sizeof(int) * ctx->step_req->num_tasks);
583 }
584
585 if (first_ctx->launch_state->num_resp_port &&
586 first_ctx->launch_state->resp_port) {
587 launch.num_resp_port = first_ctx->launch_state->num_resp_port;
588 launch.resp_port = xmalloc(sizeof(uint16_t) *
589 launch.num_resp_port);
590 memcpy(launch.resp_port, first_ctx->launch_state->resp_port,
591 (sizeof(uint16_t) * launch.num_resp_port));
592 }
593
594 rc = _launch_tasks(ctx, &launch, params->msg_timeout,
595 node_list, start_nodeid);
596
597 fail1:
598 /* clean up */
599 xfree(launch.user_name);
600 xfree(launch.resp_port);
601 if (!ctx->launch_state->user_managed_io)
602 xfree(launch.io_port);
603
604 xfree(launch.cwd);
605 env_array_free(env);
606 job_options_destroy(launch.options);
607
608 return rc;
609 }
610
/*
 * Kill the job step with SIGKILL, but only the first time an abort is
 * processed; abort_action_taken makes subsequent calls no-ops.
 */
static void _step_abort(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *state = ctx->launch_state;

	if (state->abort_action_taken)
		return;

	slurm_kill_job_step(ctx->job_id, ctx->step_resp->job_step_id,
			    SIGKILL);
	state->abort_action_taken = true;
}
621
622 /*
623 * Block until all tasks have started.
624 */
slurm_step_launch_wait_start(slurm_step_ctx_t * ctx)625 int slurm_step_launch_wait_start(slurm_step_ctx_t *ctx)
626 {
627 struct step_launch_state *sls = ctx->launch_state;
628 struct timespec ts;
629
630 ts.tv_sec = time(NULL);
631 ts.tv_nsec = 0;
632 ts.tv_sec += 600; /* 10 min allowed for launch */
633
634 /* Wait for all tasks to start */
635 slurm_mutex_lock(&sls->lock);
636 while (bit_set_count(sls->tasks_started) < sls->tasks_requested) {
637 if (sls->abort) {
638 _step_abort(ctx);
639 slurm_mutex_unlock(&sls->lock);
640 return SLURM_ERROR;
641 }
642 if (pthread_cond_timedwait(&sls->cond, &sls->lock, &ts) ==
643 ETIMEDOUT) {
644 error("timeout waiting for task launch, "
645 "started %d of %d tasks",
646 bit_set_count(sls->tasks_started),
647 sls->tasks_requested);
648 sls->abort = true;
649 _step_abort(ctx);
650 slurm_cond_broadcast(&sls->cond);
651 slurm_mutex_unlock(&sls->lock);
652 return SLURM_ERROR;
653 }
654 }
655
656 if (sls->user_managed_io) {
657 while (sls->io.user->connected < sls->tasks_requested) {
658 if (sls->abort) {
659 _step_abort(ctx);
660 slurm_mutex_unlock(&sls->lock);
661 return SLURM_ERROR;
662 }
663 if (pthread_cond_timedwait(&sls->cond, &sls->lock,
664 &ts) == ETIMEDOUT) {
665 error("timeout waiting for I/O connect");
666 sls->abort = true;
667 _step_abort(ctx);
668 slurm_cond_broadcast(&sls->cond);
669 slurm_mutex_unlock(&sls->lock);
670 return SLURM_ERROR;
671 }
672 }
673 }
674
675 _cr_notify_step_launch(ctx);
676
677 slurm_mutex_unlock(&sls->lock);
678 return SLURM_SUCCESS;
679 }
680
681 /*
682 * Block until all tasks have finished (or failed to start altogether).
683 */
slurm_step_launch_wait_finish(slurm_step_ctx_t * ctx)684 void slurm_step_launch_wait_finish(slurm_step_ctx_t *ctx)
685 {
686 struct step_launch_state *sls;
687 struct timespec ts = {0, 0};
688 bool time_set = false;
689 int errnum;
690
691 if (!ctx || (ctx->magic != STEP_CTX_MAGIC))
692 return;
693
694 sls = ctx->launch_state;
695
696 /* Wait for all tasks to complete */
697 slurm_mutex_lock(&sls->lock);
698 while (bit_set_count(sls->tasks_exited) < sls->tasks_requested) {
699 if (!sls->abort) {
700 slurm_cond_wait(&sls->cond, &sls->lock);
701 } else {
702 if (!sls->abort_action_taken) {
703 slurm_kill_job_step(ctx->job_id,
704 ctx->step_resp->
705 job_step_id,
706 SIGKILL);
707 sls->abort_action_taken = true;
708 }
709 if (!time_set) {
710 uint16_t kill_wait;
711 /* Only set the time once, because we only want
712 * to wait STEP_ABORT_TIME, no matter how many
713 * times the condition variable is signaled.
714 */
715 kill_wait = slurm_get_kill_wait();
716 ts.tv_sec = time(NULL) + STEP_ABORT_TIME
717 + kill_wait;
718 time_set = true;
719 /* FIXME - should this be a callback? */
720 info("Job step aborted: Waiting up to "
721 "%d seconds for job step to finish.",
722 kill_wait + STEP_ABORT_TIME);
723 }
724
725 errnum = pthread_cond_timedwait(&sls->cond,
726 &sls->lock, &ts);
727 if (errnum == ETIMEDOUT) {
728 error("Timed out waiting for job step to "
729 "complete");
730 /*
731 * Send kill again, in case steps were still
732 * launching the first time.
733 * FIXME - eventually the slurmd should
734 * be made smart enough to really ensure
735 * that a killed step never starts.
736 */
737 slurm_kill_job_step(ctx->job_id,
738 ctx->step_resp->job_step_id,
739 SIGKILL);
740 if (!sls->user_managed_io) {
741 client_io_handler_abort(sls->
742 io.normal);
743 }
744 break;
745 } else if (errnum != 0) {
746 error("Error waiting on condition in"
747 " slurm_step_launch_wait_finish: %m");
748 if (!sls->user_managed_io) {
749 client_io_handler_abort(sls->
750 io.normal);
751 }
752 break;
753 }
754 }
755 }
756 if (sls->abort && !time_set)
757 info("Job step aborted"); /* no need to wait */
758
759 if (!force_terminated_job && task_exit_signal)
760 info("Force Terminated job step %u.%u",
761 ctx->job_id, ctx->step_resp->job_step_id);
762
763 /*
764 * task_exit_signal != 0 when srun receives a message that a task
765 * exited with a SIGTERM or SIGKILL. Without this test, a hang in srun
766 * might occur when a node gets a hard power failure, and TCP does not
767 * indicate that the I/O connection closed. The I/O thread could
768 * block waiting for an EOF message, even though the remote process
769 * has died. In this case, use client_io_handler_abort to force the
770 * I/O thread to stop listening for stdout or stderr and shutdown.
771 */
772 if (task_exit_signal && !sls->user_managed_io) {
773 client_io_handler_abort(sls->io.normal);
774 }
775
776 /* Then shutdown the message handler thread */
777 if (sls->msg_handle)
778 eio_signal_shutdown(sls->msg_handle);
779
780 slurm_mutex_unlock(&sls->lock);
781 if (sls->msg_thread)
782 pthread_join(sls->msg_thread, NULL);
783 slurm_mutex_lock(&sls->lock);
784 pmi_kvs_free();
785
786 if (sls->msg_handle) {
787 eio_handle_destroy(sls->msg_handle);
788 sls->msg_handle = NULL;
789 }
790
791 /* Shutdown the IO timeout thread, if one exists */
792 if (sls->io_timeout_thread_created) {
793 sls->halt_io_test = true;
794 slurm_cond_broadcast(&sls->cond);
795
796 slurm_mutex_unlock(&sls->lock);
797 pthread_join(sls->io_timeout_thread, NULL);
798 slurm_mutex_lock(&sls->lock);
799 }
800
801 /* Then wait for the IO thread to finish */
802 if (!sls->user_managed_io) {
803 slurm_mutex_unlock(&sls->lock);
804 client_io_handler_finish(sls->io.normal);
805 slurm_mutex_lock(&sls->lock);
806
807 client_io_handler_destroy(sls->io.normal);
808 sls->io.normal = NULL;
809 }
810
811 sls->mpi_rc = mpi_hook_client_fini(sls->mpi_state);
812 slurm_mutex_unlock(&sls->lock);
813 }
814
815 /*
816 * Abort an in-progress launch, or terminate the fully launched job step.
817 *
818 * Can be called from a signal handler.
819 */
slurm_step_launch_abort(slurm_step_ctx_t * ctx)820 void slurm_step_launch_abort(slurm_step_ctx_t *ctx)
821 {
822 struct step_launch_state *sls;
823
824 if (!ctx || ctx->magic != STEP_CTX_MAGIC)
825 return;
826
827 sls = ctx->launch_state;
828
829 slurm_mutex_lock(&sls->lock);
830 sls->abort = true;
831 slurm_cond_broadcast(&sls->cond);
832 slurm_mutex_unlock(&sls->lock);
833 }
834
835 /*
836 * Forward a signal to all those nodes with running tasks
837 */
slurm_step_launch_fwd_signal(slurm_step_ctx_t * ctx,int signo)838 extern void slurm_step_launch_fwd_signal(slurm_step_ctx_t *ctx, int signo)
839 {
840 int node_id, j, num_tasks;
841 slurm_msg_t req;
842 signal_tasks_msg_t msg;
843 hostlist_t hl;
844 char *name = NULL;
845 List ret_list = NULL;
846 ListIterator itr;
847 ret_data_info_t *ret_data_info = NULL;
848 int rc = SLURM_SUCCESS;
849 struct step_launch_state *sls = ctx->launch_state;
850 bool retry = false;
851 int retry_cnt = 0;
852
853 /* common to all tasks */
854 memset(&msg, 0, sizeof(msg));
855 msg.job_id = ctx->job_id;
856 msg.job_step_id = ctx->step_resp->job_step_id;
857 msg.signal = (uint16_t) signo;
858
859 slurm_mutex_lock(&sls->lock);
860
861 hl = hostlist_create(NULL);
862 for (node_id = 0;
863 node_id < ctx->step_resp->step_layout->node_cnt;
864 node_id++) {
865 bool active = false;
866 num_tasks = sls->layout->tasks[node_id];
867 for (j = 0; j < num_tasks; j++) {
868 if (!bit_test(sls->tasks_exited,
869 sls->layout->tids[node_id][j])) {
870 /* this one has active tasks */
871 active = true;
872 break;
873 }
874 }
875
876 if (!active)
877 continue;
878
879 if (ctx->step_resp->step_layout->front_end) {
880 hostlist_push_host(hl,
881 ctx->step_resp->step_layout->front_end);
882 break;
883 } else {
884 name = nodelist_nth_host(sls->layout->node_list,
885 node_id);
886 hostlist_push_host(hl, name);
887 free(name);
888 }
889 }
890
891 slurm_mutex_unlock(&sls->lock);
892
893 if (!hostlist_count(hl)) {
894 verbose("no active tasks in step %u.%u to send signal %d",
895 ctx->job_id, ctx->step_resp->job_step_id, signo);
896 hostlist_destroy(hl);
897 return;
898 }
899 name = hostlist_ranged_string_xmalloc(hl);
900 hostlist_destroy(hl);
901
902 RESEND: slurm_msg_t_init(&req);
903 req.msg_type = REQUEST_SIGNAL_TASKS;
904 req.data = &msg;
905
906 if (ctx->step_resp->use_protocol_ver)
907 req.protocol_version = ctx->step_resp->use_protocol_ver;
908
909 debug2("sending signal %d to step %u.%u on hosts %s",
910 signo, ctx->job_id, ctx->step_resp->job_step_id, name);
911
912 if (!(ret_list = slurm_send_recv_msgs(name, &req, 0))) {
913 error("fwd_signal: slurm_send_recv_msgs really failed badly");
914 xfree(name);
915 return;
916 }
917
918 itr = list_iterator_create(ret_list);
919 while ((ret_data_info = list_next(itr))) {
920 rc = slurm_get_return_code(ret_data_info->type,
921 ret_data_info->data);
922 /*
923 * Report error unless it is "Invalid job id" which
924 * probably just means the tasks exited in the meanwhile.
925 */
926 if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) &&
927 (rc != ESLURMD_JOB_NOTRUNNING) && (rc != ESRCH) &&
928 (rc != EAGAIN) &&
929 (rc != ESLURM_TRANSITION_STATE_NO_UPDATE)) {
930 error("Failure sending signal %d to step %u.%u on node %s: %s",
931 signo, ctx->job_id, ctx->step_resp->job_step_id,
932 ret_data_info->node_name, slurm_strerror(rc));
933 }
934 if ((rc == EAGAIN) || (rc == ESLURM_TRANSITION_STATE_NO_UPDATE))
935 retry = true;
936 }
937 list_iterator_destroy(itr);
938 FREE_NULL_LIST(ret_list);
939 if (retry) {
940 retry = false;
941 if (retry_cnt++ < 4) {
942 sleep(retry_cnt);
943 goto RESEND;
944 }
945 }
946 xfree(name);
947 }
948
/**********************************************************************
 * Functions used by step_ctx code, but not exported through the API
 **********************************************************************/
/*
 * Create a launch state structure for a specified step context, "ctx".
 *
 * The returned structure tracks per-task start/exit state and per-node
 * I/O health for the step described by ctx->step_resp->step_layout.
 * The caller owns the result and releases it with
 * step_launch_state_destroy().
 */
struct step_launch_state *step_launch_state_create(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls;
	slurm_step_layout_t *layout = ctx->step_resp->step_layout;
	int ii;

	sls = xmalloc(sizeof(struct step_launch_state));
	sls->slurmctld_socket_fd = -1;
	sls->tasks_requested = layout->task_cnt;
	/* One bit per task, set as tasks start/exit */
	sls->tasks_started = bit_alloc(layout->task_cnt);
	sls->tasks_exited = bit_alloc(layout->task_cnt);
	sls->node_io_error = bit_alloc(layout->node_cnt);
	/* Per-node I/O test deadline; NO_VAL means no test in progress */
	sls->io_deadline = (time_t *)xmalloc(sizeof(time_t) * layout->node_cnt);
	sls->io_timeout_thread_created = false;
	sls->io_timeout = 0;
	sls->halt_io_test = false;
	sls->layout = layout;
	sls->resp_port = NULL;
	sls->abort = false;
	sls->abort_action_taken = false;
	/* NOTE: No malloc() of sls->mpi_info required */
	sls->mpi_info->jobid = ctx->step_req->job_id;
	sls->mpi_info->het_job_id = NO_VAL;
	sls->mpi_info->het_job_task_offset = NO_VAL;
	sls->mpi_info->stepid = ctx->step_resp->job_step_id;
	sls->mpi_info->step_layout = layout;
	sls->mpi_state = NULL;
	slurm_mutex_init(&sls->lock);
	slurm_cond_init(&sls->cond, NULL);

	for (ii = 0; ii < layout->node_cnt; ii++) {
		sls->io_deadline[ii] = (time_t)NO_VAL;
	}
	return sls;
}
990
/*
 * If a step's size has changed, update the launch_state structure for a
 * specified step context, "ctx".
 */
void step_launch_state_alter(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls = ctx->launch_state;
	slurm_step_layout_t *layout = ctx->step_resp->step_layout;
	int ii;

	xassert(sls);
	sls->tasks_requested = layout->task_cnt;
	/* Resize the per-task and per-node tracking structures */
	sls->tasks_started = bit_realloc(sls->tasks_started, layout->task_cnt);
	sls->tasks_exited = bit_realloc(sls->tasks_exited, layout->task_cnt);
	sls->node_io_error = bit_realloc(sls->node_io_error, layout->node_cnt);
	/* NOTE: Slurm's xrealloc() is a macro that updates sls->io_deadline
	 * in place -- unlike stdlib realloc(), no assignment is needed */
	xrealloc(sls->io_deadline, sizeof(time_t) * layout->node_cnt);
	sls->layout = sls->mpi_info->step_layout = layout;

	/* Reset all I/O test deadlines; NO_VAL means no test in progress */
	for (ii = 0; ii < layout->node_cnt; ii++) {
		sls->io_deadline[ii] = (time_t)NO_VAL;
	}
}
1013
1014 /*
1015 * Free the memory associated with the a launch state structure.
1016 */
step_launch_state_destroy(struct step_launch_state * sls)1017 void step_launch_state_destroy(struct step_launch_state *sls)
1018 {
1019 /* First undo anything created in step_launch_state_create() */
1020 slurm_mutex_destroy(&sls->lock);
1021 slurm_cond_destroy(&sls->cond);
1022 FREE_NULL_BITMAP(sls->tasks_started);
1023 FREE_NULL_BITMAP(sls->tasks_exited);
1024 FREE_NULL_BITMAP(sls->node_io_error);
1025 xfree(sls->io_deadline);
1026
1027 /* Now clean up anything created by slurm_step_launch() */
1028 if (sls->resp_port != NULL) {
1029 xfree(sls->resp_port);
1030 }
1031 }
1032
1033 /**********************************************************************
1034 * CR functions
1035 **********************************************************************/
1036
/*
 * Connect to the srun_cr control socket at unix-domain path "addr".
 * Returns the connected descriptor, or -1 on any failure.
 */
static int _connect_srun_cr(char *addr)
{
	struct sockaddr_un sa;
	unsigned int sa_len;
	int sock_fd, ret;

	if (!addr) {
		error("%s: socket path name is NULL", __func__);
		return -1;
	}
	if (strlen(addr) >= sizeof(sa.sun_path)) {
		error("%s: socket path name too long (%s)", __func__, addr);
		return -1;
	}

	sock_fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (sock_fd < 0) {
		error("failed creating cr socket: %m");
		return -1;
	}

	memset(&sa, 0, sizeof(sa));
	sa.sun_family = AF_UNIX;
	strlcpy(sa.sun_path, addr, sizeof(sa.sun_path));
	sa_len = strlen(sa.sun_path) + sizeof(sa.sun_family);

	/* Restart connect() if interrupted by a signal */
	do {
		ret = connect(sock_fd, (struct sockaddr *)&sa, sa_len);
	} while ((ret < 0) && (errno == EINTR));

	if (ret < 0) {
		debug2("failed connecting cr socket: %m");
		close(sock_fd);
		return -1;
	}
	return sock_fd;
}
1074
1075 /* send job_id, step_id, node_list to srun_cr */
_cr_notify_step_launch(slurm_step_ctx_t * ctx)1076 static int _cr_notify_step_launch(slurm_step_ctx_t *ctx)
1077 {
1078 int fd, len, rc = 0;
1079 char *cr_sock_addr = NULL;
1080
1081 cr_sock_addr = getenv("SLURM_SRUN_CR_SOCKET");
1082 if (cr_sock_addr == NULL) { /* not run under srun_cr */
1083 return 0;
1084 }
1085
1086 if ((fd = _connect_srun_cr(cr_sock_addr)) < 0) {
1087 debug2("failed connecting srun_cr. take it not running under "
1088 "srun_cr.");
1089 return 0;
1090 }
1091 if (write(fd, &ctx->job_id, sizeof(uint32_t)) != sizeof(uint32_t)) {
1092 error("failed writing job_id to srun_cr: %m");
1093 rc = -1;
1094 goto out;
1095 }
1096 if (write(fd, &ctx->step_resp->job_step_id, sizeof(uint32_t)) !=
1097 sizeof(uint32_t)) {
1098 error("failed writing job_step_id to srun_cr: %m");
1099 rc = -1;
1100 goto out;
1101 }
1102 len = strlen(ctx->step_resp->step_layout->node_list);
1103 if (write(fd, &len, sizeof(int)) != sizeof(int)) {
1104 error("failed writing nodelist length to srun_cr: %m");
1105 rc = -1;
1106 goto out;
1107 }
1108 if (write(fd, ctx->step_resp->step_layout->node_list, len + 1) !=
1109 (len + 1)) {
1110 error("failed writing nodelist to srun_cr: %m");
1111 rc = -1;
1112 }
1113 out:
1114 close (fd);
1115 return rc;
1116 }
1117
1118 /**********************************************************************
1119 * Message handler functions
1120 **********************************************************************/
_msg_thr_internal(void * arg)1121 static void *_msg_thr_internal(void *arg)
1122 {
1123 struct step_launch_state *sls = (struct step_launch_state *)arg;
1124
1125 eio_handle_mainloop(sls->msg_handle);
1126
1127 return NULL;
1128 }
1129
/*
 * Number of listening ports needed so that no port serves more than
 * cli_per_port clients -- i.e. the ceiling of nclients / cli_per_port.
 */
static inline int
_estimate_nports(int nclients, int cli_per_port)
{
	return (nclients / cli_per_port) +
	       ((nclients % cli_per_port > 0) ? 1 : 0);
}
1137
/*
 * Set up the message-handling engine for the step: create listening
 * sockets (roughly one per 48 nodes), register them with a new eio
 * handle, and start the eio message thread.
 * Returns SLURM_SUCCESS, or SLURM_ERROR if a listening socket cannot
 * be created.
 */
static int _msg_thr_create(struct step_launch_state *sls, int num_nodes)
{
	int sock = -1;
	uint16_t port;
	eio_obj_t *obj;
	int i, rc = SLURM_SUCCESS;
	uint16_t *ports;
	uint16_t eio_timeout;

	debug("Entering _msg_thr_create()");
	slurm_uid = (uid_t) slurm_get_slurm_user_id();

	eio_timeout = slurm_get_srun_eio_timeout();
	sls->msg_handle = eio_handle_create(eio_timeout);
	sls->num_resp_port = _estimate_nports(num_nodes, 48);
	sls->resp_port = xmalloc(sizeof(uint16_t) * sls->num_resp_port);

	/* multiple jobs (easily induced via no_alloc) and highly
	 * parallel jobs using PMI sometimes result in slow message
	 * responses and timeouts. Raise the default timeout for srun. */
	if (!message_socket_ops.timeout)
		message_socket_ops.timeout = slurm_get_msg_timeout() * 8000;

	/* Honor the configured SrunPortRange if there is one, otherwise
	 * listen on any free port */
	ports = slurm_get_srun_port_range();
	for (i = 0; i < sls->num_resp_port; i++) {
		int cc;

		if (ports)
			cc = net_stream_listen_ports(&sock, &port, ports, false);
		else
			cc = net_stream_listen(&sock, &port);
		if (cc < 0) {
			error("unable to initialize step launch listening "
			      "socket: %m");
			return SLURM_ERROR;
		}
		sls->resp_port[i] = port;
		obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);
		eio_new_initial_obj(sls->msg_handle, obj);
	}
	/* finally, add the listening port that we told the slurmctld about
	 * earlier in the step context creation phase */
	if (sls->slurmctld_socket_fd > -1) {
		obj = eio_obj_create(sls->slurmctld_socket_fd,
				     &message_socket_ops, (void *)sls);
		eio_new_initial_obj(sls->msg_handle, obj);
	}

	slurm_thread_create(&sls->msg_thread, _msg_thr_internal, sls);
	return rc;
}
1189
/*
 * Handle a RESPONSE_LAUNCH_TASKS message: record which tasks started
 * (or failed to start) on one node and invoke the user's task_start
 * callback.  Duplicate responses from the same node are dropped.
 */
static void
_launch_handler(struct step_launch_state *sls, slurm_msg_t *resp)
{
	launch_tasks_response_msg_t *msg = resp->data;
	int i;

	slurm_mutex_lock(&sls->lock);
	/* If the node's first task is already marked started, this is a
	 * resent response; ignore it. */
	if ((msg->count_of_pids > 0) &&
	    bit_test(sls->tasks_started, msg->task_ids[0])) {
		debug("%s: duplicate launch response received from node %s",
		      __func__, msg->node_name);
		slurm_mutex_unlock(&sls->lock);
		return;
	}

	if (msg->return_code) {
		/* Launch failed: mark the tasks both started and exited so
		 * that waiters do not block on them forever. */
		for (i = 0; i < msg->count_of_pids; i++) {
			error("task %u launch failed: %s",
			      msg->task_ids[i],
			      slurm_strerror(msg->return_code));
			bit_set(sls->tasks_started, msg->task_ids[i]);
			bit_set(sls->tasks_exited, msg->task_ids[i]);
		}
	} else {
		for (i = 0; i < msg->count_of_pids; i++)
			bit_set(sls->tasks_started, msg->task_ids[i]);
	}
	if (sls->callback.task_start != NULL)
		(sls->callback.task_start)(msg);

	/* Wake any thread waiting on launch-state changes */
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);

}
1224
/*
 * Handle a MESSAGE_TASK_EXIT message: record the exit of one or more
 * tasks and invoke the user's task_finish callback.  Messages that
 * belong to a different job step are ignored.
 */
static void
_exit_handler(struct step_launch_state *sls, slurm_msg_t *exit_msg)
{
	task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data;
	void (*task_finish)(task_exit_msg_t *);
	int i;

	if ((msg->job_id != sls->mpi_info->jobid) ||
	    (msg->step_id != sls->mpi_info->stepid)) {
		debug("Received MESSAGE_TASK_EXIT from wrong job: %u.%u",
		      msg->job_id, msg->step_id);
		return;
	}

	/* Record SIGTERM and SIGKILL termination codes to
	 * recognize abnormal termination */
	if (WIFSIGNALED(msg->return_code)) {
		i = WTERMSIG(msg->return_code);
		if ((i == SIGKILL) || (i == SIGTERM))
			task_exit_signal = i;
	}

	/* Snapshot the callback pointer under the lock, then invoke it
	 * unlocked so a slow callback cannot stall other handlers */
	slurm_mutex_lock(&sls->lock);
	task_finish = sls->callback.task_finish;
	slurm_mutex_unlock(&sls->lock);
	if (task_finish != NULL)
		(task_finish)(msg);	/* Outside of lock for performance */

	slurm_mutex_lock(&sls->lock);
	for (i = 0; i < msg->num_tasks; i++) {
		debug("task %u done", msg->task_id_list[i]);
		bit_set(sls->tasks_exited, msg->task_id_list[i]);
	}

	/* Wake any thread waiting on task-exit state */
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}
1262
1263 static void
_job_complete_handler(struct step_launch_state * sls,slurm_msg_t * complete_msg)1264 _job_complete_handler(struct step_launch_state *sls, slurm_msg_t *complete_msg)
1265 {
1266 srun_job_complete_msg_t *step_msg =
1267 (srun_job_complete_msg_t *) complete_msg->data;
1268
1269 if (step_msg->step_id == NO_VAL) {
1270 verbose("Complete job %u received",
1271 step_msg->job_id);
1272 } else {
1273 verbose("Complete job step %u.%u received",
1274 step_msg->job_id, step_msg->step_id);
1275 }
1276
1277 if (sls->callback.step_complete)
1278 (sls->callback.step_complete)(step_msg);
1279
1280 force_terminated_job = true;
1281 slurm_mutex_lock(&sls->lock);
1282 sls->abort = true;
1283 slurm_cond_broadcast(&sls->cond);
1284 slurm_mutex_unlock(&sls->lock);
1285 }
1286
1287 static void
_timeout_handler(struct step_launch_state * sls,slurm_msg_t * timeout_msg)1288 _timeout_handler(struct step_launch_state *sls, slurm_msg_t *timeout_msg)
1289 {
1290 srun_timeout_msg_t *step_msg =
1291 (srun_timeout_msg_t *) timeout_msg->data;
1292
1293 if (sls->callback.step_timeout)
1294 (sls->callback.step_timeout)(step_msg);
1295
1296 slurm_mutex_lock(&sls->lock);
1297 slurm_cond_broadcast(&sls->cond);
1298 slurm_mutex_unlock(&sls->lock);
1299 }
1300
/*
 * Take the list of node names of down nodes and convert into an
 * array of nodeids for the step. The nodeid array is passed to
 * client_io_handler_downnodes to notify the IO handler to expect no
 * further IO from that node.
 */
static void
_node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg)
{
	srun_node_fail_msg_t *nf = fail_msg->data;
	hostset_t fail_nodes, all_nodes;
	hostlist_iterator_t fail_itr;
	int num_node_ids;
	int *node_ids;
	int i, j;
	int node_id, num_tasks;

	error("Node failure on %s", nf->nodelist);

	fail_nodes = hostset_create(nf->nodelist);
	fail_itr = hostset_iterator_create(fail_nodes);
	num_node_ids = hostset_count(fail_nodes);
	/* NOTE: xmalloc() returns zeroed memory, so on HAVE_FRONT_END
	 * builds (where the loop below never stores into node_ids) the
	 * array passed to client_io_handler_downnodes() is all zeros */
	node_ids = xmalloc(sizeof(int) * num_node_ids);

	slurm_mutex_lock(&sls->lock);
	all_nodes = hostset_create(sls->layout->node_list);
	/* find the index number of each down node */
	for (i = 0; i < num_node_ids; i++) {
#ifdef HAVE_FRONT_END
		/* Only one (front-end) node, so its id is always 0 */
		node_id = 0;
#else
		char *node = hostlist_next(fail_itr);
		node_id = node_ids[i] = hostset_find(all_nodes, node);
		if (node_id < 0) {
			error( "Internal error: bad SRUN_NODE_FAIL message. "
			       "Node %s not part of this job step", node);
			free(node);
			continue;
		}
		free(node);
#endif

		/* find all of the tasks that should run on this node and
		 * mark them as having started and exited. If they haven't
		 * started yet, they never will, and likewise for exiting.
		 */
		num_tasks = sls->layout->tasks[node_id];
		for (j = 0; j < num_tasks; j++) {
			debug2("marking task %d done on failed node %d",
			       sls->layout->tids[node_id][j], node_id);
			bit_set(sls->tasks_started,
				sls->layout->tids[node_id][j]);
			bit_set(sls->tasks_exited,
				sls->layout->tids[node_id][j]);
		}
	}

	if (!sls->user_managed_io) {
		client_io_handler_downnodes(sls->io.normal, node_ids,
					    num_node_ids);
	}
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);

	xfree(node_ids);
	hostlist_iterator_destroy(fail_itr);
	hostset_destroy(fail_nodes);
	hostset_destroy(all_nodes);
}
1370
/*
 * Receive a message when a slurmd cold starts, that the step on that node
 * may have died. Verify that tasks on these node(s) are still alive,
 * and abort the job step if they are not.
 * This message could be the result of the slurmd daemon cold-starting
 * or a race condition when tasks are starting or terminating.
 */
static void
_step_missing_handler(struct step_launch_state *sls, slurm_msg_t *missing_msg)
{
	srun_step_missing_msg_t *step_missing = missing_msg->data;
	hostset_t fail_nodes, all_nodes;
	hostlist_iterator_t fail_itr;
	char *node;
	int num_node_ids;
	int i, j;
	int node_id;
	client_io_t *cio = sls->io.normal;
	bool test_message_sent;
	int num_tasks;
	bool active;

	debug("Step %u.%u missing from node(s) %s",
	      step_missing->job_id, step_missing->step_id,
	      step_missing->nodelist);

	/* Ignore this message in the unusual "user_managed_io" case. No way
	   to confirm a bad connection, since a test message goes straight to
	   the task. Aborting without checking may be too dangerous. This
	   choice may cause srun to not exit even though the job step has
	   ended. */
	if (sls->user_managed_io)
		return;

	slurm_mutex_lock(&sls->lock);

	/* Lazily start the thread that enforces the I/O test deadlines */
	if (!sls->io_timeout_thread_created) {
		sls->io_timeout_thread_created = true;
		slurm_thread_create(&sls->io_timeout_thread,
				    _check_io_timeout, sls);
	}

	fail_nodes = hostset_create(step_missing->nodelist);
	fail_itr = hostset_iterator_create(fail_nodes);
	num_node_ids = hostset_count(fail_nodes);

	all_nodes = hostset_create(sls->layout->node_list);

	for (i = 0; i < num_node_ids; i++) {
		node = hostlist_next(fail_itr);
		node_id = hostset_find(all_nodes, node);
		if (node_id < 0) {
			error("Internal error: bad SRUN_STEP_MISSING message. "
			      "Node %s not part of this job step", node);
			free(node);
			continue;
		}
		free(node);

		/*
		 * If all tasks for this node have either not started or already
		 * exited, ignore the missing step message for this node.
		 */
		num_tasks = sls->layout->tasks[node_id];
		active = false;
		for (j = 0; j < num_tasks; j++) {
			if (bit_test(sls->tasks_started,
				     sls->layout->tids[node_id][j]) &&
			    !bit_test(sls->tasks_exited,
				      sls->layout->tids[node_id][j])) {
				active = true;
				break;
			}
		}
		if (!active)
			continue;

		/* If this is true, an I/O error has already occurred on the
		 * stepd for the current node, and the job should abort */
		if (bit_test(sls->node_io_error, node_id)) {
			error("Aborting, step missing and io error on node %d",
			      node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
			break;
		}

		/*
		 * A test is already in progress. Ignore message for this node.
		 */
		if (sls->io_deadline[node_id] != NO_VAL) {
			debug("Test in progress for node %d, ignoring message",
			      node_id);
			continue;
		}

		/* Arm the deadline before sending the test so the timeout
		 * thread can enforce it */
		sls->io_deadline[node_id] = time(NULL) + sls->io_timeout;

		debug("Testing connection to node %d", node_id);
		if (client_io_handler_send_test_message(cio, node_id,
							&test_message_sent)) {
			/*
			 * If unable to test a connection, assume the step
			 * is having problems and abort. If unable to test,
			 * the system is probably having serious problems, so
			 * aborting the step seems reasonable.
			 */
			error("Aborting, can not test connection to node %d.",
			      node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
			break;
		}

		/*
		 * test_message_sent should be true unless this node either
		 * hasn't started or already finished. Poke the io_timeout
		 * thread to make sure it will abort the job if the deadline
		 * for receiving a response passes.
		 */
		if (test_message_sent) {
			slurm_cond_broadcast(&sls->cond);
		} else {
			/* Nothing to wait for; disarm the deadline */
			sls->io_deadline[node_id] = (time_t)NO_VAL;
		}
	}
	slurm_mutex_unlock(&sls->lock);

	hostlist_iterator_destroy(fail_itr);
	hostset_destroy(fail_nodes);
	hostset_destroy(all_nodes);
}
1503
1504 /* This RPC typically used to send a signal an external program that
1505 * is usually wrapped by srun.
1506 */
1507 static void
_step_step_signal(struct step_launch_state * sls,slurm_msg_t * signal_msg)1508 _step_step_signal(struct step_launch_state *sls, slurm_msg_t *signal_msg)
1509 {
1510 job_step_kill_msg_t *step_signal = signal_msg->data;
1511 debug2("Signal %u requested for step %u.%u", step_signal->signal,
1512 step_signal->job_id, step_signal->job_step_id);
1513 if (sls->callback.step_signal)
1514 (sls->callback.step_signal)(step_signal->signal);
1515
1516 }
1517
1518 /*
1519 * The TCP connection that was used to send the task_spawn_io_msg_t message
1520 * will be used as the user managed IO stream. The remote end of the TCP stream
1521 * will be connected to the stdin, stdout, and stderr of the task. The
1522 * local end of the stream is stored in the user_managed_io_t structure, and
1523 * is left to the user to manage (the user can retrieve the array of
1524 * socket descriptors using slurm_step_ctx_get()).
1525 *
1526 * To allow the message TCP stream to be reused for spawn IO traffic we
1527 * set the slurm_msg_t's conn_fd to -1 to avoid having the caller close the
1528 * TCP stream.
1529 */
1530 static void
_task_user_managed_io_handler(struct step_launch_state * sls,slurm_msg_t * user_io_msg)1531 _task_user_managed_io_handler(struct step_launch_state *sls,
1532 slurm_msg_t *user_io_msg)
1533 {
1534 task_user_managed_io_msg_t *msg =
1535 (task_user_managed_io_msg_t *) user_io_msg->data;
1536
1537 slurm_mutex_lock(&sls->lock);
1538
1539 debug("task %d user managed io stream established", msg->task_id);
1540 /* sanity check */
1541 if (msg->task_id >= sls->tasks_requested) {
1542 error("_task_user_managed_io_handler:"
1543 " bad task ID %u (of %d tasks)",
1544 msg->task_id, sls->tasks_requested);
1545 }
1546
1547 sls->io.user->connected++;
1548 fd_set_blocking(user_io_msg->conn_fd);
1549 sls->io.user->sockets[msg->task_id] = user_io_msg->conn_fd;
1550
1551 /* prevent the caller from closing the user managed IO stream */
1552 user_io_msg->conn_fd = -1;
1553
1554 slurm_cond_broadcast(&sls->cond);
1555 slurm_mutex_unlock(&sls->lock);
1556 }
1557
/*
 * Identify the incoming message and call the appropriate handler function.
 */
static void
_handle_msg(void *arg, slurm_msg_t *msg)
{
	struct step_launch_state *sls = (struct step_launch_state *)arg;
	uid_t req_uid;
	uid_t uid = getuid();
	srun_user_msg_t *um;
	int rc;

	req_uid = g_slurm_auth_get_uid(msg->auth_cred);

	/* Only accept messages from the SlurmUser, root, or ourself */
	if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) {
		error ("Security violation, slurm message from uid %u",
		       (unsigned int) req_uid);
		return;
	}

	switch (msg->msg_type) {
	case RESPONSE_LAUNCH_TASKS:
		debug2("received task launch");
		_launch_handler(sls, msg);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case MESSAGE_TASK_EXIT:
		debug2("received task exit");
		_exit_handler(sls, msg);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case SRUN_PING:
		debug3("slurmctld ping received");
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case SRUN_EXEC:
		_exec_prog(msg);
		break;
	case SRUN_JOB_COMPLETE:
		debug2("received job step complete message");
		_job_complete_handler(sls, msg);
		break;
	case SRUN_TIMEOUT:
		debug2("received job step timeout message");
		_timeout_handler(sls, msg);
		break;
	case SRUN_USER_MSG:
		/* Free-form text from the controller; just log it */
		um = msg->data;
		info("%s", um->msg);
		break;
	case SRUN_NODE_FAIL:
		debug2("received srun node fail");
		_node_fail_handler(sls, msg);
		break;
	case SRUN_STEP_MISSING:
		debug2("received notice of missing job step");
		_step_missing_handler(sls, msg);
		break;
	case SRUN_STEP_SIGNAL:
		debug2("received step signal RPC");
		_step_step_signal(sls, msg);
		break;
	case PMI_KVS_PUT_REQ:
		debug2("PMI_KVS_PUT_REQ received");
		rc = pmi_kvs_put((kvs_comm_set_t *) msg->data);
		slurm_send_rc_msg(msg, rc);
		break;
	case PMI_KVS_GET_REQ:
		debug2("PMI_KVS_GET_REQ received");
		rc = pmi_kvs_get((kvs_get_msg_t *) msg->data);
		slurm_send_rc_msg(msg, rc);
		break;
	case TASK_USER_MANAGED_IO_STREAM:
		debug2("TASK_USER_MANAGED_IO_STREAM");
		_task_user_managed_io_handler(sls, msg);
		break;
	default:
		error("%s: received spurious message type: %u",
		      __func__, msg->msg_type);
		break;
	}
	return;
}
1641
1642 /**********************************************************************
1643 * Task launch functions
1644 **********************************************************************/
1645
/* Since the slurmd usually controls the finishing of tasks to the
 * controller this needs to happen here if there was a problem with a
 * task launch to the slurmd since there will not be cleanup of this
 * anywhere else.
 */
static int _fail_step_tasks(slurm_step_ctx_t *ctx, char *node, int ret_code)
{
	slurm_msg_t req;
	step_complete_msg_t msg;
	int rc = -1;
	int nodeid = 0;
	struct step_launch_state *sls = ctx->launch_state;

#ifndef HAVE_FRONT_END
	/* It is always 0 for front end systems */
	nodeid = nodelist_find(ctx->step_resp->step_layout->node_list, node);
#endif

	/* Flag the whole launch as aborted and wake any waiters */
	slurm_mutex_lock(&sls->lock);
	sls->abort = true;
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);

	/* Tell the controller this node's tasks completed (failed) with
	 * ret_code */
	memset(&msg, 0, sizeof(msg));
	msg.job_id = ctx->job_id;
	msg.job_step_id = ctx->step_resp->job_step_id;

	msg.range_first = msg.range_last = nodeid;
	msg.step_rc = ret_code;

	slurm_msg_t_init(&req);
	req.msg_type = REQUEST_STEP_COMPLETE;
	req.data = &msg;

	if (ctx->step_resp->use_protocol_ver)
		req.protocol_version = ctx->step_resp->use_protocol_ver;

	if (slurm_send_recv_controller_rc_msg(&req, &rc,
					      working_cluster_rec) < 0)
		return SLURM_ERROR;

	return SLURM_SUCCESS;
}
1689
/*
 * Send the REQUEST_LAUNCH_TASKS RPC for this step to the nodes in
 * "nodelist" (or to the front-end node on HAVE_FRONT_END builds) and
 * scan the per-node responses.  Any node that fails to launch marks
 * the whole step failed via _fail_step_tasks().
 * Returns SLURM_SUCCESS, or an error code from the first failure.
 */
static int _launch_tasks(slurm_step_ctx_t *ctx,
			 launch_tasks_request_msg_t *launch_msg,
			 uint32_t timeout, char *nodelist, int start_nodeid)
{
#ifdef HAVE_FRONT_END
	slurm_cred_arg_t cred_args;
#endif
	slurm_msg_t msg;
	List ret_list = NULL;
	ListIterator ret_itr;
	ret_data_info_t *ret_data = NULL;
	int rc = SLURM_SUCCESS;
	int tot_rc = SLURM_SUCCESS;

	debug("Entering _launch_tasks");
	if (ctx->verbose_level) {
		/* Log the task placement for each target node */
		char *name = NULL;
		hostlist_t hl = hostlist_create(nodelist);
		int i = start_nodeid;
		while ((name = hostlist_shift(hl))) {
			_print_launch_msg(launch_msg, name, i++);
			free(name);
		}
		hostlist_destroy(hl);
	}

	/*
	 * Extend timeout based upon BatchStartTime to permit for a long
	 * running Prolog
	 */
	if (timeout <= 0) {
		timeout = (slurm_get_msg_timeout() +
			   slurm_get_batch_start_timeout()) * 1000;
	}

	slurm_msg_t_init(&msg);
	msg.msg_type = REQUEST_LAUNCH_TASKS;
	msg.data = launch_msg;

	if (ctx->step_resp->use_protocol_ver)
		msg.protocol_version = ctx->step_resp->use_protocol_ver;

#ifdef HAVE_FRONT_END
	slurm_cred_get_args(ctx->step_resp->cred, &cred_args);
	//info("hostlist=%s", cred_args.step_hostlist);
	ret_list = slurm_send_recv_msgs(cred_args.step_hostlist, &msg, timeout);
	slurm_cred_free_args(&cred_args);
#else
	ret_list = slurm_send_recv_msgs(nodelist, &msg, timeout);
#endif
	if (ret_list == NULL) {
		error("slurm_send_recv_msgs failed miserably: %m");
		return SLURM_ERROR;
	}
	ret_itr = list_iterator_create(ret_list);
	while ((ret_data = list_next(ret_itr))) {
		rc = slurm_get_return_code(ret_data->type,
					   ret_data->data);
		debug("launch returned msg_rc=%d err=%d type=%d",
		      rc, ret_data->err, ret_data->type);
		if (rc != SLURM_SUCCESS) {
			if (ret_data->err)
				tot_rc = ret_data->err;
			else
				tot_rc = rc;

			_fail_step_tasks(ctx, ret_data->node_name, tot_rc);

			/* Set errno so "%m" below reports the real cause */
			errno = tot_rc;
			tot_rc = SLURM_ERROR;
			error("Task launch for %u.%u failed on "
			      "node %s: %m",
			      ctx->job_id, ctx->step_resp->job_step_id,
			      ret_data->node_name);
		} else {
#if 0 /* only for debugging, might want to make this a callback */
			errno = ret_data->err;
			info("Launch success on node %s",
			     ret_data->node_name);
#endif
		}
	}
	list_iterator_destroy(ret_itr);
	FREE_NULL_LIST(ret_list);

	if (tot_rc != SLURM_SUCCESS)
		return tot_rc;
	return rc;
}
1779
1780 /* returns an xmalloc cwd string, or NULL if lookup failed. */
_lookup_cwd(void)1781 static char *_lookup_cwd(void)
1782 {
1783 char buf[PATH_MAX];
1784
1785 if (getcwd(buf, PATH_MAX) != NULL) {
1786 return xstrdup(buf);
1787 } else {
1788 return NULL;
1789 }
1790 }
1791
/*
 * Log which global task ids are being launched on "hostname" (node
 * index "nodeid" of the launch request).
 */
static void _print_launch_msg(launch_tasks_request_msg_t *msg,
			      char *hostname, int nodeid)
{
	int task;
	char *id_str = NULL, *task_list = NULL;
	hostlist_t hl = hostlist_create(NULL);

	/* Collect the node's global task ids and let the hostlist code
	 * render them as a compact ranged string. */
	for (task = 0; task < msg->tasks_to_launch[nodeid]; task++) {
		xstrfmtcat(id_str, "%u", msg->global_task_ids[nodeid][task]);
		hostlist_push_host(hl, id_str);
		xfree(id_str);
	}
	task_list = hostlist_ranged_string_xmalloc(hl);
	hostlist_destroy(hl);

	info("launching %u.%u on host %s, %u tasks: %s",
	     msg->job_id, msg->job_step_id, hostname,
	     msg->tasks_to_launch[nodeid], task_list);
	xfree(task_list);

	debug3("uid:%ld gid:%ld cwd:%s %d", (long) msg->uid,
	       (long) msg->gid, msg->cwd, nodeid);
}
1815
1816 /* This is used to initiate an OpenMPI checkpoint program,
1817 * but is written to be general purpose */
1818 static void
_exec_prog(slurm_msg_t * msg)1819 _exec_prog(slurm_msg_t *msg)
1820 {
1821 pid_t child;
1822 int pfd[2], status;
1823 ssize_t len;
1824 char buf[256] = "";
1825 srun_exec_msg_t *exec_msg = msg->data;
1826
1827 if ((exec_msg->argc < 1) || (exec_msg->argv == NULL) ||
1828 (exec_msg->argv[0] == NULL)) {
1829 error("%s: called with no command to execute", __func__);
1830 return;
1831 } else if (exec_msg->argc > 2) {
1832 verbose("Exec '%s %s' for %u.%u",
1833 exec_msg->argv[0], exec_msg->argv[1],
1834 exec_msg->job_id, exec_msg->step_id);
1835 } else {
1836 verbose("Exec '%s' for %u.%u",
1837 exec_msg->argv[0],
1838 exec_msg->job_id, exec_msg->step_id);
1839 }
1840
1841 if (pipe(pfd) == -1) {
1842 snprintf(buf, sizeof(buf), "pipe: %s", strerror(errno));
1843 error("%s", buf);
1844 return;
1845 }
1846
1847 child = fork();
1848 if (child == 0) {
1849 int fd = open("/dev/null", O_RDONLY);
1850 if (fd < 0) {
1851 error("%s: can not open /dev/null", __func__);
1852 exit(1);
1853 }
1854 dup2(fd, 0); /* stdin from /dev/null */
1855 dup2(pfd[1], 1); /* stdout to pipe */
1856 dup2(pfd[1], 2); /* stderr to pipe */
1857 close(pfd[0]);
1858 close(pfd[1]);
1859 execvp(exec_msg->argv[0], exec_msg->argv);
1860 error("execvp(%s): %m", exec_msg->argv[0]);
1861 } else if (child < 0) {
1862 snprintf(buf, sizeof(buf), "fork: %s", strerror(errno));
1863 error("%s", buf);
1864 return;
1865 } else {
1866 close(pfd[1]);
1867 len = read(pfd[0], buf, sizeof(buf));
1868 if (len >= 1)
1869 close(pfd[0]);
1870 waitpid(child, &status, 0);
1871 }
1872 }
1873
1874
/*
 * Notify the step_launch_state that an I/O connection went bad.
 * If the node is suspected to be down, abort the job.
 */
int
step_launch_notify_io_failure(step_launch_state_t *sls, int node_id)
{
	slurm_mutex_lock(&sls->lock);

	bit_set(sls->node_io_error, node_id);
	debug("IO error on node %d", node_id);

	/*
	 * sls->io_deadline[node_id] != (time_t)NO_VAL means that
	 * the _step_missing_handler was called on this node.
	 */
	if (sls->io_deadline[node_id] != (time_t)NO_VAL) {
		/* Step already reported missing AND I/O failed: abort */
		error("Aborting, io error and missing step on node %d",
		      node_id);
		sls->abort = true;
		slurm_cond_broadcast(&sls->cond);
	} else {

		/* FIXME
		 * If stepd dies or we see I/O error with stepd.
		 * Do not abort the whole job but collect all
		 * tasks on the node just like if they exited.
		 *
		 * Keep supporting 'srun -N x --pty bash'
		 */
		if (getenv("SLURM_PTY_PORT") == NULL) {
			error("%s: aborting, io error with slurmstepd on node %d",
			      __func__, node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
		}
	}

	slurm_mutex_unlock(&sls->lock);

	return SLURM_SUCCESS;
}
1917
1918
1919 /*
1920 * This is called 1) after a node connects for the first time and 2) when
1921 * a message comes in confirming that a connection is okay.
1922 *
1923 * Just in case the node was marked questionable very early in the
1924 * job step setup, clear this flag if/when the node makes its initial
1925 * connection.
1926 */
1927 int
step_launch_clear_questionable_state(step_launch_state_t * sls,int node_id)1928 step_launch_clear_questionable_state(step_launch_state_t *sls, int node_id)
1929 {
1930 slurm_mutex_lock(&sls->lock);
1931 sls->io_deadline[node_id] = (time_t)NO_VAL;
1932 slurm_mutex_unlock(&sls->lock);
1933 return SLURM_SUCCESS;
1934 }
1935
1936 static void *
_check_io_timeout(void * _sls)1937 _check_io_timeout(void *_sls)
1938 {
1939 int ii;
1940 time_t now, next_deadline;
1941 struct timespec ts = {0, 0};
1942 step_launch_state_t *sls = (step_launch_state_t *)_sls;
1943
1944 slurm_mutex_lock(&sls->lock);
1945
1946 while (1) {
1947 if (sls->halt_io_test || sls->abort)
1948 break;
1949
1950 now = time(NULL);
1951 next_deadline = (time_t)NO_VAL;
1952
1953 for (ii = 0; ii < sls->layout->node_cnt; ii++) {
1954 if (sls->io_deadline[ii] == (time_t)NO_VAL)
1955 continue;
1956
1957 if (sls->io_deadline[ii] <= now) {
1958 sls->abort = true;
1959 slurm_cond_broadcast(&sls->cond);
1960 error( "Cannot communicate with node %d. "
1961 "Aborting job.", ii);
1962 break;
1963 } else if (next_deadline == (time_t)NO_VAL ||
1964 sls->io_deadline[ii] < next_deadline) {
1965 next_deadline = sls->io_deadline[ii];
1966 }
1967 }
1968 if (sls->abort)
1969 break;
1970
1971 if (next_deadline == (time_t)NO_VAL) {
1972 debug("io timeout thread: no pending deadlines, "
1973 "sleeping indefinitely");
1974 slurm_cond_wait(&sls->cond, &sls->lock);
1975 } else {
1976 debug("io timeout thread: sleeping %lds until deadline",
1977 (long)(next_deadline - time(NULL)));
1978 ts.tv_sec = next_deadline;
1979 slurm_cond_timedwait(&sls->cond, &sls->lock, &ts);
1980 }
1981 }
1982 slurm_mutex_unlock(&sls->lock);
1983 return NULL;
1984 }
1985