/*****************************************************************************\
 *  step_launch.c - launch a parallel job step
 *****************************************************************************
 *  Copyright (C) 2006-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Christopher J. Morrone <morrone2@llnl.gov>
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include "config.h"

#include <errno.h>
#include <fcntl.h>
#include <grp.h>
#include <limits.h>
#include <netdb.h>		/* for gethostbyname */
#include <netinet/in.h>
#include <pthread.h>
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/param.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include "slurm/slurm.h"

#include "src/common/cpu_frequency.h"
#include "src/common/eio.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/hostlist.h"
#include "src/common/macros.h"
#include "src/common/net.h"
#include "src/common/plugstack.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_cred.h"
#include "src/common/slurm_mpi.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_time.h"
#include "src/common/strlcpy.h"
#include "src/common/uid.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"

#include "src/api/step_launch.h"
#include "src/api/step_ctx.h"
#include "src/api/pmi_server.h"

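/* Extra time (seconds), beyond KillWait, to allow an aborted step to end
 * (see slurm_step_launch_wait_finish() below) */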
#define STEP_ABORT_TIME 2

extern char **environ;

/**********************************************************************
 * General declarations for step launch code
 **********************************************************************/
static int _launch_tasks(slurm_step_ctx_t *ctx,
			 launch_tasks_request_msg_t *launch_msg,
			 uint32_t timeout, char *nodelist, int start_nodeid);
static char *_lookup_cwd(void);
static void _print_launch_msg(launch_tasks_request_msg_t *msg,
			      char *hostname, int nodeid);

/**********************************************************************
 * Message handler declarations
 **********************************************************************/
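/* slurm_uid caches the configured SlurmUser uid so that _handle_msg()
 * can validate the sender of incoming messages */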
static uid_t  slurm_uid;
static bool   force_terminated_job = false;
static int    task_exit_signal = 0;

static void _exec_prog(slurm_msg_t *msg);
static int  _msg_thr_create(struct step_launch_state *sls, int num_nodes);
static void _handle_msg(void *arg, slurm_msg_t *msg);
static int  _cr_notify_step_launch(slurm_step_ctx_t *ctx);
static void *_check_io_timeout(void *_sls);

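/* eio callbacks for the message sockets: accept incoming connections
 * and dispatch each received message to _handle_msg() */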
static struct io_operations message_socket_ops = {
	.readable = &eio_message_socket_readable,
	.handle_read = &eio_message_socket_accept,
	.handle_msg = &_handle_msg
};


/**********************************************************************
 * API functions
 **********************************************************************/

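/*
 * Typical call sequence (an illustrative sketch only; a real caller
 * would also set argc/argv and other launch parameters, and check all
 * return codes):
 *
 *	slurm_step_launch_params_t params;
 *	slurm_step_launch_params_t_init(&params);
 *	if (slurm_step_launch(ctx, &params, NULL) == SLURM_SUCCESS) {
 *		slurm_step_launch_wait_start(ctx);
 *		slurm_step_launch_wait_finish(ctx);
 *	}
 *
 * where "ctx" is a step context previously obtained from
 * slurm_step_ctx_create() (see step_ctx.c).
 */
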
/*
 * slurm_step_launch_params_t_init - initialize a user-allocated
 *      slurm_step_launch_params_t structure with default values.
 *	This function will NOT allocate any new memory.
 * IN ptr - pointer to a structure allocated by the user.
 *      The structure will be initialized.
 */
extern void slurm_step_launch_params_t_init(slurm_step_launch_params_t *ptr)
{
	static slurm_step_io_fds_t fds = SLURM_STEP_IO_FDS_INITIALIZER;

	/* Initialize all values to zero ("NULL" for pointers) */
	memset(ptr, 0, sizeof(slurm_step_launch_params_t));

	ptr->buffered_stdio = true;
	memcpy(&ptr->local_fds, &fds, sizeof(fds));
	ptr->gid = getgid();
	ptr->cpu_freq_min = NO_VAL;
	ptr->cpu_freq_max = NO_VAL;
	ptr->cpu_freq_gov = NO_VAL;
	ptr->het_job_node_offset  = NO_VAL;
	ptr->het_job_id   = NO_VAL;
	ptr->het_job_nnodes  = NO_VAL;
	ptr->het_job_ntasks  = NO_VAL;
	ptr->het_job_offset  = NO_VAL;
	ptr->het_job_step_cnt = NO_VAL;
	ptr->het_job_task_offset = NO_VAL;
}

/*
 * Specify the plugin name to be used. This may be needed to specify the
 * non-default MPI plugin when using the Slurm API to launch tasks.
 * IN plugin name - "none", "pmi2", etc.
 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
 */
extern int slurm_mpi_plugin_init(char *plugin_name)
{
	return mpi_hook_client_init(plugin_name);
}

/*
 * For a hetjob step, rebuild the MPI data structure to show what is running
 * in a single MPI_COMM_WORLD
 */
static void _rebuild_mpi_layout(slurm_step_ctx_t *ctx,
				const slurm_step_launch_params_t *params)
{
	slurm_step_layout_t *new_step_layout, *orig_step_layout;

	ctx->launch_state->mpi_info->het_job_id = params->het_job_id;
	ctx->launch_state->mpi_info->het_job_task_offset =
		params->het_job_task_offset;
	new_step_layout = xmalloc(sizeof(slurm_step_layout_t));
	orig_step_layout = ctx->launch_state->mpi_info->step_layout;
	ctx->launch_state->mpi_info->step_layout = new_step_layout;
	if (orig_step_layout->front_end) {
		new_step_layout->front_end =
			xstrdup(orig_step_layout->front_end);
	}
	new_step_layout->node_cnt = params->het_job_nnodes;
	new_step_layout->node_list = xstrdup(params->het_job_node_list);
	new_step_layout->plane_size = orig_step_layout->plane_size;
	new_step_layout->start_protocol_ver =
		orig_step_layout->start_protocol_ver;
	new_step_layout->tasks = params->het_job_task_cnts;
	new_step_layout->task_cnt = params->het_job_ntasks;
	new_step_layout->task_dist = orig_step_layout->task_dist;
	new_step_layout->tids = params->het_job_tids;
}

/*
 * slurm_step_launch - launch a parallel job step
 * IN ctx - job step context generated by slurm_step_ctx_create
 * IN params - job step parameters
 * IN callbacks - Identify functions to be called when various events occur
 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
 */
extern int slurm_step_launch(slurm_step_ctx_t *ctx,
			     const slurm_step_launch_params_t *params,
			     const slurm_step_launch_callbacks_t *callbacks)
{
	launch_tasks_request_msg_t launch;
	char **env = NULL;
	char **mpi_env = NULL;
	int rc = SLURM_SUCCESS;
	bool preserve_env = params->preserve_env;

	debug("Entering %s", __func__);
	memset(&launch, 0, sizeof(launch));

	if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
		error("%s: Not a valid slurm_step_ctx_t", __func__);
		slurm_seterrno(EINVAL);
		return SLURM_ERROR;
	}

	/* Initialize the callback pointers */
	if (callbacks != NULL) {
		/* copy the user specified callback pointers */
		memcpy(&(ctx->launch_state->callback), callbacks,
		       sizeof(slurm_step_launch_callbacks_t));
	} else {
		/* set all callbacks to NULL */
		memset(&(ctx->launch_state->callback), 0,
		       sizeof(slurm_step_launch_callbacks_t));
	}

	if (mpi_hook_client_init(params->mpi_plugin_name) == SLURM_ERROR) {
		slurm_seterrno(SLURM_MPI_PLUGIN_NAME_INVALID);
		return SLURM_ERROR;
	}

	if (params->het_job_id && (params->het_job_id != NO_VAL))
		_rebuild_mpi_layout(ctx, params);

	mpi_env = xmalloc(sizeof(char *));  /* Needed for setenvf used by MPI */
	if ((ctx->launch_state->mpi_state =
	     mpi_hook_client_prelaunch(ctx->launch_state->mpi_info, &mpi_env))
	    == NULL) {
		slurm_seterrno(SLURM_MPI_PLUGIN_PRELAUNCH_SETUP_FAILED);
		return SLURM_ERROR;
	}

	/* Create message receiving sockets and handler thread */
	rc = _msg_thr_create(ctx->launch_state,
			     ctx->step_resp->step_layout->node_cnt);
	if (rc != SLURM_SUCCESS)
		return rc;

	/* Start tasks on compute nodes */
	launch.job_id = ctx->step_req->job_id;
	launch.uid = ctx->step_req->user_id;
	launch.gid = params->gid;
	launch.argc = params->argc;
	launch.argv = params->argv;
	launch.spank_job_env = params->spank_job_env;
	launch.spank_job_env_size = params->spank_job_env_size;
	launch.cred = ctx->step_resp->cred;
	launch.job_step_id = ctx->step_resp->job_step_id;
	launch.het_job_node_offset = params->het_job_node_offset;
	launch.het_job_step_cnt = params->het_job_step_cnt;
	launch.het_job_id = params->het_job_id;
	launch.het_job_nnodes = params->het_job_nnodes;
	launch.het_job_ntasks = params->het_job_ntasks;
	launch.het_job_offset = params->het_job_offset;
	launch.het_job_task_offset = params->het_job_task_offset;
	launch.het_job_task_cnts = params->het_job_task_cnts;
	launch.het_job_tids = params->het_job_tids;
	launch.het_job_tid_offsets = params->het_job_tid_offsets;
	launch.het_job_node_list = params->het_job_node_list;
	if (params->env == NULL) {
		/*
		 * If the user didn't specify an environment, then use the
		 * environment of the running process
		 */
		env_array_merge(&env, (const char **)environ);
	} else {
		env_array_merge(&env, (const char **)params->env);
	}
	if (params->het_job_ntasks != NO_VAL)
		preserve_env = true;
	env_array_for_step(&env, ctx->step_resp, &launch,
			   ctx->launch_state->resp_port[0], preserve_env);
	env_array_merge(&env, (const char **)mpi_env);
	env_array_free(mpi_env);

	launch.envc = envcount(env);
	launch.env = env;
	if (params->cwd)
		launch.cwd = xstrdup(params->cwd);
	else
		launch.cwd = _lookup_cwd();
	launch.alias_list	= params->alias_list;
	launch.nnodes		= ctx->step_resp->step_layout->node_cnt;
	launch.ntasks		= ctx->step_resp->step_layout->task_cnt;
	launch.slurmd_debug	= params->slurmd_debug;
	launch.switch_job	= ctx->step_resp->switch_job;
	launch.profile		= params->profile;
	launch.task_prolog	= params->task_prolog;
	launch.task_epilog	= params->task_epilog;
	launch.cpu_bind_type	= params->cpu_bind_type;
	launch.cpu_bind		= params->cpu_bind;
	launch.cpu_freq_min	= params->cpu_freq_min;
	launch.cpu_freq_max	= params->cpu_freq_max;
	launch.cpu_freq_gov	= params->cpu_freq_gov;
	launch.tres_bind	= params->tres_bind;
	launch.tres_freq	= params->tres_freq;
	launch.mem_bind_type	= params->mem_bind_type;
	launch.mem_bind		= params->mem_bind;
	launch.accel_bind_type	= params->accel_bind_type;
	launch.flags		= 0;
	if (params->multi_prog)
		launch.flags	|= LAUNCH_MULTI_PROG;
	launch.cpus_per_task	= params->cpus_per_task;
	launch.ntasks_per_board = params->ntasks_per_board;
	launch.ntasks_per_core  = params->ntasks_per_core;
	launch.ntasks_per_socket = params->ntasks_per_socket;

	if (params->no_alloc)
		launch.flags	|= LAUNCH_NO_ALLOC;

	launch.task_dist	= params->task_dist;
	launch.partition	= params->partition;
	if (params->pty)
		launch.flags |= LAUNCH_PTY;
	launch.acctg_freq	= params->acctg_freq;
	launch.open_mode        = params->open_mode;
	launch.options          = job_options_create();
	launch.complete_nodelist =
		xstrdup(ctx->step_resp->step_layout->node_list);
	spank_set_remote_options(launch.options);
	if (params->parallel_debug)
		launch.flags |= LAUNCH_PARALLEL_DEBUG;

	launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
	launch.global_task_ids = ctx->step_resp->step_layout->tids;

	launch.select_jobinfo  = ctx->step_resp->select_jobinfo;

	if (params->user_managed_io)
		launch.flags	|= LAUNCH_USER_MANAGED_IO;
	ctx->launch_state->user_managed_io = params->user_managed_io;

	if (!ctx->launch_state->user_managed_io) {
		launch.ofname = params->remote_output_filename;
		launch.efname = params->remote_error_filename;
		launch.ifname = params->remote_input_filename;
		if (params->buffered_stdio)
			launch.flags	|= LAUNCH_BUFFERED_IO;
		if (params->labelio)
			launch.flags	|= LAUNCH_LABEL_IO;
		ctx->launch_state->io.normal =
			client_io_handler_create(params->local_fds,
						 ctx->step_req->num_tasks,
						 launch.nnodes,
						 ctx->step_resp->cred,
						 params->labelio,
						 params->het_job_offset,
						 params->het_job_task_offset);
		if (ctx->launch_state->io.normal == NULL) {
			rc = SLURM_ERROR;
			goto fail1;
		}
		/*
		 * The client_io_t gets a pointer back to the slurm_launch_state
		 * to notify it of I/O errors.
		 */
		ctx->launch_state->io.normal->sls = ctx->launch_state;

		if (client_io_handler_start(ctx->launch_state->io.normal)
		    != SLURM_SUCCESS) {
			rc = SLURM_ERROR;
			goto fail1;
		}
		launch.num_io_port = ctx->launch_state->io.normal->num_listen;
		launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
		memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
		       (sizeof(uint16_t) * launch.num_io_port));
		/*
		 * If the io timeout is > 0, create a flag to ping the stepds
		 * if io_timeout seconds pass without stdio traffic to/from
		 * the node.
		 */
		ctx->launch_state->io_timeout = slurm_get_msg_timeout();
	} else { /* user_managed_io is true */
		/* initialize user_managed_io_t */
		ctx->launch_state->io.user =
			(user_managed_io_t *)xmalloc(sizeof(user_managed_io_t));
		ctx->launch_state->io.user->connected = 0;
		ctx->launch_state->io.user->sockets =
			(int *)xmalloc(sizeof(int) * ctx->step_req->num_tasks);
	}

	launch.num_resp_port = ctx->launch_state->num_resp_port;
	launch.resp_port = xmalloc(sizeof(uint16_t) * launch.num_resp_port);
	memcpy(launch.resp_port, ctx->launch_state->resp_port,
	       (sizeof(uint16_t) * launch.num_resp_port));
	rc = _launch_tasks(ctx, &launch, params->msg_timeout,
			   launch.complete_nodelist, 0);

	/* clean up */
	xfree(launch.resp_port);
	if (!ctx->launch_state->user_managed_io)
		xfree(launch.io_port);

fail1:
	xfree(launch.user_name);
	xfree(launch.complete_nodelist);
	xfree(launch.cwd);
	env_array_free(env);
	job_options_destroy(launch.options);
	return rc;
}

/*
 * slurm_step_launch_add - Add tasks to a step that was already started
 * IN ctx - job step context generated by slurm_step_ctx_create
 * IN first_ctx - job step context generated by slurm_step_ctx_create for
 *		first component of the job step
 * IN params - job step parameters
 * IN node_list - list of extra nodes to add
 * IN start_nodeid - in the global scheme which node id is the first
 *                   node in node_list.
 * RET SLURM_SUCCESS or SLURM_ERROR (with errno set)
 */
extern int slurm_step_launch_add(slurm_step_ctx_t *ctx,
				 slurm_step_ctx_t *first_ctx,
				 const slurm_step_launch_params_t *params,
				 char *node_list, int start_nodeid)
{
	launch_tasks_request_msg_t launch;
	char **env = NULL;
	char **mpi_env = NULL;
	int rc = SLURM_SUCCESS;
	uint16_t resp_port = 0;
	bool preserve_env = params->preserve_env;

	debug("Entering %s", __func__);

	if ((ctx == NULL) || (ctx->magic != STEP_CTX_MAGIC)) {
		error("%s: Not a valid slurm_step_ctx_t", __func__);
		slurm_seterrno(EINVAL);
		return SLURM_ERROR;
	}

	memset(&launch, 0, sizeof(launch));

	/* Start tasks on compute nodes */
	launch.job_id = ctx->step_req->job_id;
	launch.uid = ctx->step_req->user_id;
	launch.gid = params->gid;
	launch.argc = params->argc;
	launch.argv = params->argv;
	launch.spank_job_env = params->spank_job_env;
	launch.spank_job_env_size = params->spank_job_env_size;
	launch.cred = ctx->step_resp->cred;
	launch.job_step_id = ctx->step_resp->job_step_id;
	launch.het_job_step_cnt = params->het_job_step_cnt;
	launch.het_job_id = params->het_job_id;
	launch.het_job_nnodes = params->het_job_nnodes;
	launch.het_job_ntasks = params->het_job_ntasks;
	launch.het_job_offset = params->het_job_offset;
	launch.het_job_task_offset = params->het_job_task_offset;
	launch.het_job_task_cnts = params->het_job_task_cnts;
	launch.het_job_tids = params->het_job_tids;
	launch.het_job_tid_offsets = params->het_job_tid_offsets;
	launch.het_job_node_list = params->het_job_node_list;
	if (params->env == NULL) {
		/*
		 * if the user didn't specify an environment, grab the
		 * environment of the running process
		 */
		env_array_merge(&env, (const char **)environ);
	} else {
		env_array_merge(&env, (const char **)params->env);
	}
	if (first_ctx->launch_state->resp_port)
		resp_port = first_ctx->launch_state->resp_port[0];
	if (params->het_job_ntasks != NO_VAL)
		preserve_env = true;
	env_array_for_step(&env, ctx->step_resp, &launch, resp_port,
			   preserve_env);
	env_array_merge(&env, (const char **)mpi_env);
	env_array_free(mpi_env);

	launch.envc = envcount(env);
	launch.env = env;
	if (params->cwd)
		launch.cwd = xstrdup(params->cwd);
	else
		launch.cwd = _lookup_cwd();
	launch.alias_list	= params->alias_list;
	launch.nnodes		= ctx->step_resp->step_layout->node_cnt;
	launch.ntasks		= ctx->step_resp->step_layout->task_cnt;
	launch.slurmd_debug	= params->slurmd_debug;
	launch.switch_job	= ctx->step_resp->switch_job;
	launch.profile		= params->profile;
	launch.task_prolog	= params->task_prolog;
	launch.task_epilog	= params->task_epilog;
	launch.cpu_bind_type	= params->cpu_bind_type;
	launch.cpu_bind		= params->cpu_bind;
	launch.cpu_freq_min	= params->cpu_freq_min;
	launch.cpu_freq_max	= params->cpu_freq_max;
	launch.cpu_freq_gov	= params->cpu_freq_gov;
	launch.tres_bind	= params->tres_bind;
	launch.tres_freq	= params->tres_freq;
	launch.mem_bind_type	= params->mem_bind_type;
	launch.mem_bind		= params->mem_bind;
	launch.accel_bind_type	= params->accel_bind_type;
	launch.flags = 0;
	if (params->multi_prog)
		launch.flags |= LAUNCH_MULTI_PROG;
	launch.cpus_per_task	= params->cpus_per_task;
	launch.task_dist	= params->task_dist;
	launch.partition	= params->partition;
	if (params->pty)
		launch.flags |= LAUNCH_PTY;
	launch.acctg_freq	= params->acctg_freq;
	launch.open_mode        = params->open_mode;
	launch.options          = job_options_create();
	launch.complete_nodelist =
		xstrdup(ctx->step_resp->step_layout->node_list);

	spank_set_remote_options(launch.options);
	if (params->parallel_debug)
		launch.flags |= LAUNCH_PARALLEL_DEBUG;

	launch.tasks_to_launch = ctx->step_resp->step_layout->tasks;
	launch.global_task_ids = ctx->step_resp->step_layout->tids;

	launch.select_jobinfo  = ctx->step_resp->select_jobinfo;

	if (params->user_managed_io)
		launch.flags |= LAUNCH_USER_MANAGED_IO;

	/* Set up the normal I/O handler unless user-managed I/O is
	 * already in place for this step */
	if (!ctx->launch_state->io.user) {
		launch.ofname = params->remote_output_filename;
		launch.efname = params->remote_error_filename;
		launch.ifname = params->remote_input_filename;
		if (params->buffered_stdio)
			launch.flags	|= LAUNCH_BUFFERED_IO;
		if (params->labelio)
			launch.flags	|= LAUNCH_LABEL_IO;
		ctx->launch_state->io.normal =
			client_io_handler_create(params->local_fds,
						 ctx->step_req->num_tasks,
						 launch.nnodes,
						 ctx->step_resp->cred,
						 params->labelio,
						 params->het_job_offset,
						 params->het_job_task_offset);
		if (ctx->launch_state->io.normal == NULL) {
			rc = SLURM_ERROR;
			goto fail1;
		}
		/*
		 * The client_io_t gets a pointer back to the slurm_launch_state
		 * to notify it of I/O errors.
		 */
		ctx->launch_state->io.normal->sls = ctx->launch_state;

		if (client_io_handler_start(ctx->launch_state->io.normal)
		    != SLURM_SUCCESS) {
			rc = SLURM_ERROR;
			goto fail1;
		}
		launch.num_io_port = ctx->launch_state->io.normal->num_listen;
		launch.io_port = xmalloc(sizeof(uint16_t) * launch.num_io_port);
		memcpy(launch.io_port, ctx->launch_state->io.normal->listenport,
		       (sizeof(uint16_t) * launch.num_io_port));
		/*
		 * If the io timeout is > 0, create a flag to ping the stepds
		 * if io_timeout seconds pass without stdio traffic to/from
		 * the node.
		 */
		ctx->launch_state->io_timeout = slurm_get_msg_timeout();
	} else { /* user_managed_io is true */
		xrealloc(ctx->launch_state->io.user->sockets,
			 sizeof(int) * ctx->step_req->num_tasks);
	}

	if (first_ctx->launch_state->num_resp_port &&
	    first_ctx->launch_state->resp_port) {
		launch.num_resp_port = first_ctx->launch_state->num_resp_port;
		launch.resp_port = xmalloc(sizeof(uint16_t) *
					   launch.num_resp_port);
		memcpy(launch.resp_port, first_ctx->launch_state->resp_port,
		       (sizeof(uint16_t) * launch.num_resp_port));
	}

	rc = _launch_tasks(ctx, &launch, params->msg_timeout,
			   node_list, start_nodeid);

fail1:
	/* clean up */
	xfree(launch.user_name);
	xfree(launch.resp_port);
	if (!ctx->launch_state->user_managed_io)
		xfree(launch.io_port);

	xfree(launch.cwd);
	env_array_free(env);
	job_options_destroy(launch.options);

	return rc;
}

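/* Kill the job step the first time an abort is detected; subsequent
 * calls are no-ops once abort_action_taken is set */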
static void _step_abort(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls = ctx->launch_state;

	if (!sls->abort_action_taken) {
		slurm_kill_job_step(ctx->job_id, ctx->step_resp->job_step_id,
				    SIGKILL);
		sls->abort_action_taken = true;
	}
}

/*
 * Block until all tasks have started.
 */
int slurm_step_launch_wait_start(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls = ctx->launch_state;
	struct timespec ts;

	ts.tv_sec  = time(NULL);
	ts.tv_nsec = 0;
	ts.tv_sec += 600;	/* 10 min allowed for launch */

	/* Wait for all tasks to start */
	slurm_mutex_lock(&sls->lock);
	while (bit_set_count(sls->tasks_started) < sls->tasks_requested) {
		if (sls->abort) {
			_step_abort(ctx);
			slurm_mutex_unlock(&sls->lock);
			return SLURM_ERROR;
		}
		if (pthread_cond_timedwait(&sls->cond, &sls->lock, &ts) ==
		    ETIMEDOUT) {
			error("timeout waiting for task launch, "
			      "started %d of %d tasks",
			      bit_set_count(sls->tasks_started),
			      sls->tasks_requested);
			sls->abort = true;
			_step_abort(ctx);
			slurm_cond_broadcast(&sls->cond);
			slurm_mutex_unlock(&sls->lock);
			return SLURM_ERROR;
		}
	}

	if (sls->user_managed_io) {
		while (sls->io.user->connected < sls->tasks_requested) {
			if (sls->abort) {
				_step_abort(ctx);
				slurm_mutex_unlock(&sls->lock);
				return SLURM_ERROR;
			}
			if (pthread_cond_timedwait(&sls->cond, &sls->lock,
						   &ts) == ETIMEDOUT) {
				error("timeout waiting for I/O connect");
				sls->abort = true;
				_step_abort(ctx);
				slurm_cond_broadcast(&sls->cond);
				slurm_mutex_unlock(&sls->lock);
				return SLURM_ERROR;
			}
		}
	}

	_cr_notify_step_launch(ctx);

	slurm_mutex_unlock(&sls->lock);
	return SLURM_SUCCESS;
}

/*
 * Block until all tasks have finished (or failed to start altogether).
 */
void slurm_step_launch_wait_finish(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls;
	struct timespec ts = {0, 0};
	bool time_set = false;
	int errnum;

	if (!ctx || (ctx->magic != STEP_CTX_MAGIC))
		return;

	sls = ctx->launch_state;

	/* Wait for all tasks to complete */
	slurm_mutex_lock(&sls->lock);
	while (bit_set_count(sls->tasks_exited) < sls->tasks_requested) {
		if (!sls->abort) {
			slurm_cond_wait(&sls->cond, &sls->lock);
		} else {
			if (!sls->abort_action_taken) {
				slurm_kill_job_step(ctx->job_id,
						    ctx->step_resp->
						    job_step_id,
						    SIGKILL);
				sls->abort_action_taken = true;
			}
			if (!time_set) {
				uint16_t kill_wait;
				/* Only set the time once, because we only want
				 * to wait STEP_ABORT_TIME, no matter how many
				 * times the condition variable is signaled.
				 */
				kill_wait = slurm_get_kill_wait();
				ts.tv_sec = time(NULL) + STEP_ABORT_TIME
					+ kill_wait;
				time_set = true;
				/* FIXME - should this be a callback? */
				info("Job step aborted: Waiting up to "
				     "%d seconds for job step to finish.",
				     kill_wait + STEP_ABORT_TIME);
			}

			errnum = pthread_cond_timedwait(&sls->cond,
							&sls->lock, &ts);
			if (errnum == ETIMEDOUT) {
				error("Timed out waiting for job step to "
				      "complete");
				/*
				 * Send kill again, in case steps were still
				 * launching the first time.
				 * FIXME - eventually the slurmd should
				 *   be made smart enough to really ensure
				 *   that a killed step never starts.
				 */
				slurm_kill_job_step(ctx->job_id,
						    ctx->step_resp->job_step_id,
						    SIGKILL);
				if (!sls->user_managed_io) {
					client_io_handler_abort(sls->
								io.normal);
				}
				break;
			} else if (errnum != 0) {
				error("Error waiting on condition in"
				      " slurm_step_launch_wait_finish: %m");
				if (!sls->user_managed_io) {
					client_io_handler_abort(sls->
								io.normal);
				}
				break;
			}
		}
	}
	if (sls->abort && !time_set)
		info("Job step aborted");	/* no need to wait */

	if (!force_terminated_job && task_exit_signal)
		info("Force Terminated job step %u.%u",
		     ctx->job_id, ctx->step_resp->job_step_id);

	/*
	 * task_exit_signal != 0 when srun receives a message that a task
	 * exited with a SIGTERM or SIGKILL.  Without this test, a hang in srun
	 * might occur when a node gets a hard power failure, and TCP does not
	 * indicate that the I/O connection closed.  The I/O thread could
	 * block waiting for an EOF message, even though the remote process
	 * has died.  In this case, use client_io_handler_abort to force the
	 * I/O thread to stop listening for stdout or stderr and shutdown.
	 */
	if (task_exit_signal && !sls->user_managed_io) {
		client_io_handler_abort(sls->io.normal);
	}

	/* Then shutdown the message handler thread */
	if (sls->msg_handle)
		eio_signal_shutdown(sls->msg_handle);

	slurm_mutex_unlock(&sls->lock);
	if (sls->msg_thread)
		pthread_join(sls->msg_thread, NULL);
	slurm_mutex_lock(&sls->lock);
	pmi_kvs_free();

	if (sls->msg_handle) {
		eio_handle_destroy(sls->msg_handle);
		sls->msg_handle = NULL;
	}

	/* Shutdown the IO timeout thread, if one exists */
	if (sls->io_timeout_thread_created) {
		sls->halt_io_test = true;
		slurm_cond_broadcast(&sls->cond);

		slurm_mutex_unlock(&sls->lock);
		pthread_join(sls->io_timeout_thread, NULL);
		slurm_mutex_lock(&sls->lock);
	}

	/* Then wait for the IO thread to finish */
	if (!sls->user_managed_io) {
		slurm_mutex_unlock(&sls->lock);
		client_io_handler_finish(sls->io.normal);
		slurm_mutex_lock(&sls->lock);

		client_io_handler_destroy(sls->io.normal);
		sls->io.normal = NULL;
	}

	sls->mpi_rc = mpi_hook_client_fini(sls->mpi_state);
	slurm_mutex_unlock(&sls->lock);
}

/*
 * Abort an in-progress launch, or terminate the fully launched job step.
 *
 * Can be called from a signal handler.
 */
void slurm_step_launch_abort(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls;

	if (!ctx || ctx->magic != STEP_CTX_MAGIC)
		return;

	sls = ctx->launch_state;

	slurm_mutex_lock(&sls->lock);
	sls->abort = true;
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}

/*
 * Forward a signal to all those nodes with running tasks
 */
extern void slurm_step_launch_fwd_signal(slurm_step_ctx_t *ctx, int signo)
{
	int node_id, j, num_tasks;
	slurm_msg_t req;
	signal_tasks_msg_t msg;
	hostlist_t hl;
	char *name = NULL;
	List ret_list = NULL;
	ListIterator itr;
	ret_data_info_t *ret_data_info = NULL;
	int rc = SLURM_SUCCESS;
	struct step_launch_state *sls = ctx->launch_state;
	bool retry = false;
	int retry_cnt = 0;

	/* common to all tasks */
	memset(&msg, 0, sizeof(msg));
	msg.job_id      = ctx->job_id;
	msg.job_step_id = ctx->step_resp->job_step_id;
	msg.signal      = (uint16_t) signo;

	slurm_mutex_lock(&sls->lock);

	hl = hostlist_create(NULL);
	for (node_id = 0;
	     node_id < ctx->step_resp->step_layout->node_cnt;
	     node_id++) {
		bool active = false;
		num_tasks = sls->layout->tasks[node_id];
		for (j = 0; j < num_tasks; j++) {
			if (!bit_test(sls->tasks_exited,
				      sls->layout->tids[node_id][j])) {
				/* this one has active tasks */
				active = true;
				break;
			}
		}

		if (!active)
			continue;

		if (ctx->step_resp->step_layout->front_end) {
			hostlist_push_host(hl,
				      ctx->step_resp->step_layout->front_end);
			break;
		} else {
			name = nodelist_nth_host(sls->layout->node_list,
						 node_id);
			hostlist_push_host(hl, name);
			free(name);
		}
	}

	slurm_mutex_unlock(&sls->lock);

	if (!hostlist_count(hl)) {
		verbose("no active tasks in step %u.%u to send signal %d",
		        ctx->job_id, ctx->step_resp->job_step_id, signo);
		hostlist_destroy(hl);
		return;
	}
	name = hostlist_ranged_string_xmalloc(hl);
	hostlist_destroy(hl);

RESEND:	slurm_msg_t_init(&req);
	req.msg_type = REQUEST_SIGNAL_TASKS;
	req.data     = &msg;

	if (ctx->step_resp->use_protocol_ver)
		req.protocol_version = ctx->step_resp->use_protocol_ver;

	debug2("sending signal %d to step %u.%u on hosts %s",
	       signo, ctx->job_id, ctx->step_resp->job_step_id, name);

	if (!(ret_list = slurm_send_recv_msgs(name, &req, 0))) {
		error("fwd_signal: slurm_send_recv_msgs really failed badly");
		xfree(name);
		return;
	}

	itr = list_iterator_create(ret_list);
	while ((ret_data_info = list_next(itr))) {
		rc = slurm_get_return_code(ret_data_info->type,
					   ret_data_info->data);
		/*
		 * Report error unless it is "Invalid job id" which
		 * probably just means the tasks exited in the meanwhile.
		 */
		if ((rc != 0) && (rc != ESLURM_INVALID_JOB_ID) &&
		    (rc != ESLURMD_JOB_NOTRUNNING) && (rc != ESRCH) &&
		    (rc != EAGAIN) &&
		    (rc != ESLURM_TRANSITION_STATE_NO_UPDATE)) {
			error("Failure sending signal %d to step %u.%u on node %s: %s",
			      signo, ctx->job_id, ctx->step_resp->job_step_id,
			      ret_data_info->node_name, slurm_strerror(rc));
		}
		if ((rc == EAGAIN) || (rc == ESLURM_TRANSITION_STATE_NO_UPDATE))
			retry = true;
	}
	list_iterator_destroy(itr);
	FREE_NULL_LIST(ret_list);
	if (retry) {
		retry = false;
		if (retry_cnt++ < 4) {
			sleep(retry_cnt);
			goto RESEND;
		}
	}
	xfree(name);
}

/**********************************************************************
 * Functions used by step_ctx code, but not exported through the API
 **********************************************************************/
/*
 * Create a launch state structure for a specified step context, "ctx".
 */
struct step_launch_state *step_launch_state_create(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls;
	slurm_step_layout_t *layout = ctx->step_resp->step_layout;
	int ii;

	sls = xmalloc(sizeof(struct step_launch_state));
	sls->slurmctld_socket_fd = -1;
	sls->tasks_requested = layout->task_cnt;
	sls->tasks_started = bit_alloc(layout->task_cnt);
	sls->tasks_exited = bit_alloc(layout->task_cnt);
	sls->node_io_error = bit_alloc(layout->node_cnt);
	sls->io_deadline = (time_t *)xmalloc(sizeof(time_t) * layout->node_cnt);
	sls->io_timeout_thread_created = false;
	sls->io_timeout = 0;
	sls->halt_io_test = false;
	sls->layout = layout;
	sls->resp_port = NULL;
	sls->abort = false;
	sls->abort_action_taken = false;
	/* NOTE: No malloc() of sls->mpi_info required */
	sls->mpi_info->jobid = ctx->step_req->job_id;
	sls->mpi_info->het_job_id = NO_VAL;
	sls->mpi_info->het_job_task_offset = NO_VAL;
	sls->mpi_info->stepid = ctx->step_resp->job_step_id;
	sls->mpi_info->step_layout = layout;
	sls->mpi_state = NULL;
	slurm_mutex_init(&sls->lock);
	slurm_cond_init(&sls->cond, NULL);

	for (ii = 0; ii < layout->node_cnt; ii++) {
		sls->io_deadline[ii] = (time_t)NO_VAL;
	}
	return sls;
}

/*
 * If a step's size has changed, update the launch_state structure for a
 * specified step context, "ctx".
 */
void step_launch_state_alter(slurm_step_ctx_t *ctx)
{
	struct step_launch_state *sls = ctx->launch_state;
	slurm_step_layout_t *layout = ctx->step_resp->step_layout;
	int ii;

	xassert(sls);
	sls->tasks_requested = layout->task_cnt;
	sls->tasks_started = bit_realloc(sls->tasks_started, layout->task_cnt);
	sls->tasks_exited = bit_realloc(sls->tasks_exited, layout->task_cnt);
	sls->node_io_error = bit_realloc(sls->node_io_error, layout->node_cnt);
	xrealloc(sls->io_deadline, sizeof(time_t) * layout->node_cnt);
	sls->layout = sls->mpi_info->step_layout = layout;

	for (ii = 0; ii < layout->node_cnt; ii++) {
		sls->io_deadline[ii] = (time_t)NO_VAL;
	}
}

/*
 * Free the memory associated with a launch state structure.
 */
void step_launch_state_destroy(struct step_launch_state *sls)
{
	/* First undo anything created in step_launch_state_create() */
	slurm_mutex_destroy(&sls->lock);
	slurm_cond_destroy(&sls->cond);
	FREE_NULL_BITMAP(sls->tasks_started);
	FREE_NULL_BITMAP(sls->tasks_exited);
	FREE_NULL_BITMAP(sls->node_io_error);
	xfree(sls->io_deadline);

	/* Now clean up anything created by slurm_step_launch() */
	if (sls->resp_port != NULL) {
		xfree(sls->resp_port);
	}
}

/**********************************************************************
 * CR functions
 **********************************************************************/

/* connect to srun_cr */
static int _connect_srun_cr(char *addr)
{
	struct sockaddr_un sa;
	unsigned int sa_len;
	int fd, rc;

	if (!addr) {
		error("%s: socket path name is NULL", __func__);
		return -1;
	}
	if (strlen(addr) >= sizeof(sa.sun_path)) {
		error("%s: socket path name too long (%s)", __func__, addr);
		return -1;
	}

	fd = socket(AF_UNIX, SOCK_STREAM, 0);
	if (fd < 0) {
		error("failed creating cr socket: %m");
		return -1;
	}
	memset(&sa, 0, sizeof(sa));

	sa.sun_family = AF_UNIX;
	strlcpy(sa.sun_path, addr, sizeof(sa.sun_path));
	sa_len = strlen(sa.sun_path) + sizeof(sa.sun_family);

	while (((rc = connect(fd, (struct sockaddr *)&sa, sa_len)) < 0) &&
	       (errno == EINTR));

	if (rc < 0) {
		debug2("failed connecting cr socket: %m");
		close(fd);
		return -1;
	}
	return fd;
}

/* send job_id, step_id, node_list to srun_cr */
static int _cr_notify_step_launch(slurm_step_ctx_t *ctx)
{
	int fd, len, rc = 0;
	char *cr_sock_addr = NULL;

	cr_sock_addr = getenv("SLURM_SRUN_CR_SOCKET");
	if (cr_sock_addr == NULL) { /* not run under srun_cr */
		return 0;
	}

	if ((fd = _connect_srun_cr(cr_sock_addr)) < 0) {
		debug2("failed connecting to srun_cr; assuming we are not "
		       "running under srun_cr");
		return 0;
	}
	if (write(fd, &ctx->job_id, sizeof(uint32_t)) != sizeof(uint32_t)) {
		error("failed writing job_id to srun_cr: %m");
		rc = -1;
		goto out;
	}
	if (write(fd, &ctx->step_resp->job_step_id, sizeof(uint32_t)) !=
	    sizeof(uint32_t)) {
		error("failed writing job_step_id to srun_cr: %m");
		rc = -1;
		goto out;
	}
	len = strlen(ctx->step_resp->step_layout->node_list);
	if (write(fd, &len, sizeof(int)) != sizeof(int)) {
		error("failed writing nodelist length to srun_cr: %m");
		rc = -1;
		goto out;
	}
	if (write(fd, ctx->step_resp->step_layout->node_list, len + 1) !=
	    (len + 1)) {
		error("failed writing nodelist to srun_cr: %m");
		rc = -1;
	}
out:
	close(fd);
	return rc;
}

/**********************************************************************
 * Message handler functions
 **********************************************************************/
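/* Message handler thread: run the eio event loop until shutdown */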
static void *_msg_thr_internal(void *arg)
{
	struct step_launch_state *sls = (struct step_launch_state *)arg;

	eio_handle_mainloop(sls->msg_handle);

	return NULL;
}

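/* Return the number of listening ports needed so that no port serves
 * more than cli_per_port clients: ceil(nclients / cli_per_port) */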
static inline int
_estimate_nports(int nclients, int cli_per_port)
{
	div_t d;
	d = div(nclients, cli_per_port);
	return d.rem > 0 ? d.quot + 1 : d.quot;
}

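/* Open the response-listening sockets (roughly one per 48 nodes),
 * register them with a new eio handle, and start the message handler
 * thread */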
static int _msg_thr_create(struct step_launch_state *sls, int num_nodes)
{
	int sock = -1;
	uint16_t port;
	eio_obj_t *obj;
	int i, rc = SLURM_SUCCESS;
	uint16_t *ports;
	uint16_t eio_timeout;

	debug("Entering _msg_thr_create()");
	slurm_uid = (uid_t) slurm_get_slurm_user_id();

	eio_timeout = slurm_get_srun_eio_timeout();
	sls->msg_handle = eio_handle_create(eio_timeout);
	sls->num_resp_port = _estimate_nports(num_nodes, 48);
	sls->resp_port = xmalloc(sizeof(uint16_t) * sls->num_resp_port);

	/* multiple jobs (easily induced via no_alloc) and highly
	 * parallel jobs using PMI sometimes result in slow message
	 * responses and timeouts. Raise the default timeout for srun. */
	if (!message_socket_ops.timeout)
		message_socket_ops.timeout = slurm_get_msg_timeout() * 8000;

	ports = slurm_get_srun_port_range();
	for (i = 0; i < sls->num_resp_port; i++) {
		int cc;

		if (ports)
			cc = net_stream_listen_ports(&sock, &port, ports, false);
		else
			cc = net_stream_listen(&sock, &port);
		if (cc < 0) {
			error("unable to initialize step launch listening "
			      "socket: %m");
			return SLURM_ERROR;
		}
		sls->resp_port[i] = port;
		obj = eio_obj_create(sock, &message_socket_ops, (void *)sls);
		eio_new_initial_obj(sls->msg_handle, obj);
	}
	/* finally, add the listening port that we told the slurmctld about
	 * earlier in the step context creation phase */
	if (sls->slurmctld_socket_fd > -1) {
		obj = eio_obj_create(sls->slurmctld_socket_fd,
				     &message_socket_ops, (void *)sls);
		eio_new_initial_obj(sls->msg_handle, obj);
	}

	slurm_thread_create(&sls->msg_thread, _msg_thr_internal, sls);
	return rc;
}

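/* Handle a RESPONSE_LAUNCH_TASKS message: mark the reported tasks as
 * started (and as exited too, on launch failure), then invoke the
 * task_start callback if one was registered */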
static void
_launch_handler(struct step_launch_state *sls, slurm_msg_t *resp)
{
	launch_tasks_response_msg_t *msg = resp->data;
	int i;

	slurm_mutex_lock(&sls->lock);
	if ((msg->count_of_pids > 0) &&
	    bit_test(sls->tasks_started, msg->task_ids[0])) {
		debug("%s: duplicate launch response received from node %s",
		      __func__, msg->node_name);
		slurm_mutex_unlock(&sls->lock);
		return;
	}

	if (msg->return_code) {
		for (i = 0; i < msg->count_of_pids; i++) {
			error("task %u launch failed: %s",
			      msg->task_ids[i],
			      slurm_strerror(msg->return_code));
			bit_set(sls->tasks_started, msg->task_ids[i]);
			bit_set(sls->tasks_exited, msg->task_ids[i]);
		}
	} else {
		for (i = 0; i < msg->count_of_pids; i++)
			bit_set(sls->tasks_started, msg->task_ids[i]);
	}
	if (sls->callback.task_start != NULL)
		(sls->callback.task_start)(msg);

	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}

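/* Handle a MESSAGE_TASK_EXIT message: remember SIGTERM/SIGKILL
 * terminations, invoke the task_finish callback outside of the lock,
 * then mark the reported tasks as exited */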
static void
_exit_handler(struct step_launch_state *sls, slurm_msg_t *exit_msg)
{
	task_exit_msg_t *msg = (task_exit_msg_t *) exit_msg->data;
	void (*task_finish)(task_exit_msg_t *);
	int i;

	if ((msg->job_id != sls->mpi_info->jobid) ||
	    (msg->step_id != sls->mpi_info->stepid)) {
		debug("Received MESSAGE_TASK_EXIT from wrong job: %u.%u",
		      msg->job_id, msg->step_id);
		return;
	}

	/* Record SIGTERM and SIGKILL termination codes to
	 * recognize abnormal termination */
	if (WIFSIGNALED(msg->return_code)) {
		i = WTERMSIG(msg->return_code);
		if ((i == SIGKILL) || (i == SIGTERM))
			task_exit_signal = i;
	}

	slurm_mutex_lock(&sls->lock);
	task_finish = sls->callback.task_finish;
	slurm_mutex_unlock(&sls->lock);
	if (task_finish != NULL)
		(task_finish)(msg);	/* Outside of lock for performance */

	slurm_mutex_lock(&sls->lock);
	for (i = 0; i < msg->num_tasks; i++) {
		debug("task %u done", msg->task_id_list[i]);
		bit_set(sls->tasks_exited, msg->task_id_list[i]);
	}

	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}

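/* Handle an SRUN_JOB_COMPLETE message: invoke the step_complete
 * callback and flag the launch state as aborted */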
static void
_job_complete_handler(struct step_launch_state *sls, slurm_msg_t *complete_msg)
{
	srun_job_complete_msg_t *step_msg =
		(srun_job_complete_msg_t *) complete_msg->data;

	if (step_msg->step_id == NO_VAL) {
		verbose("Complete job %u received",
			step_msg->job_id);
	} else {
		verbose("Complete job step %u.%u received",
			step_msg->job_id, step_msg->step_id);
	}

	if (sls->callback.step_complete)
		(sls->callback.step_complete)(step_msg);

	force_terminated_job = true;
	slurm_mutex_lock(&sls->lock);
	sls->abort = true;
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}

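/* Handle an SRUN_TIMEOUT message: invoke the step_timeout callback and
 * wake any threads waiting on the launch state condition variable */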
static void
_timeout_handler(struct step_launch_state *sls, slurm_msg_t *timeout_msg)
{
	srun_timeout_msg_t *step_msg =
		(srun_timeout_msg_t *) timeout_msg->data;

	if (sls->callback.step_timeout)
		(sls->callback.step_timeout)(step_msg);

	slurm_mutex_lock(&sls->lock);
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}

/*
 * Take the list of node names of down nodes and convert into an
 * array of nodeids for the step.  The nodeid array is passed to
 * client_io_handler_downnodes to notify the IO handler to expect no
 * further IO from that node.
 */
static void
_node_fail_handler(struct step_launch_state *sls, slurm_msg_t *fail_msg)
{
	srun_node_fail_msg_t *nf = fail_msg->data;
	hostset_t fail_nodes, all_nodes;
	hostlist_iterator_t fail_itr;
	int num_node_ids;
	int *node_ids;
	int i, j;
	int node_id, num_tasks;

	error("Node failure on %s", nf->nodelist);

	fail_nodes = hostset_create(nf->nodelist);
	fail_itr = hostset_iterator_create(fail_nodes);
	num_node_ids = hostset_count(fail_nodes);
	node_ids = xmalloc(sizeof(int) * num_node_ids);

	slurm_mutex_lock(&sls->lock);
	all_nodes = hostset_create(sls->layout->node_list);
	/* find the index number of each down node */
	for (i = 0; i < num_node_ids; i++) {
#ifdef HAVE_FRONT_END
		node_id = 0;
#else
		char *node = hostlist_next(fail_itr);
		node_id = node_ids[i] = hostset_find(all_nodes, node);
		if (node_id < 0) {
			error("Internal error: bad SRUN_NODE_FAIL message. "
			      "Node %s not part of this job step", node);
			free(node);
			continue;
		}
		free(node);
#endif

		/* find all of the tasks that should run on this node and
		 * mark them as having started and exited.  If they haven't
		 * started yet, they never will, and likewise for exiting.
		 */
		num_tasks = sls->layout->tasks[node_id];
		for (j = 0; j < num_tasks; j++) {
			debug2("marking task %d done on failed node %d",
			       sls->layout->tids[node_id][j], node_id);
			bit_set(sls->tasks_started,
				sls->layout->tids[node_id][j]);
			bit_set(sls->tasks_exited,
				sls->layout->tids[node_id][j]);
		}
	}

	if (!sls->user_managed_io) {
		client_io_handler_downnodes(sls->io.normal, node_ids,
					    num_node_ids);
	}
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);

	xfree(node_ids);
	hostlist_iterator_destroy(fail_itr);
	hostset_destroy(fail_nodes);
	hostset_destroy(all_nodes);
}

/*
 * Receive a message when a slurmd cold starts, that the step on that node
 * may have died.  Verify that tasks on the node(s) are still alive,
 * and abort the job step if they are not.
 * This message could be the result of the slurmd daemon cold-starting
 * or a race condition when tasks are starting or terminating.
 */
static void
_step_missing_handler(struct step_launch_state *sls, slurm_msg_t *missing_msg)
{
	srun_step_missing_msg_t *step_missing = missing_msg->data;
	hostset_t fail_nodes, all_nodes;
	hostlist_iterator_t fail_itr;
	char *node;
	int num_node_ids;
	int i, j;
	int node_id;
	client_io_t *cio = sls->io.normal;
	bool  test_message_sent;
	int   num_tasks;
	bool  active;

	debug("Step %u.%u missing from node(s) %s",
	      step_missing->job_id, step_missing->step_id,
	      step_missing->nodelist);

	/* Ignore this message in the unusual "user_managed_io" case.  No way
	   to confirm a bad connection, since a test message goes straight to
	   the task.  Aborting without checking may be too dangerous.  This
	   choice may cause srun to not exit even though the job step has
	   ended. */
	if (sls->user_managed_io)
		return;

	slurm_mutex_lock(&sls->lock);

	if (!sls->io_timeout_thread_created) {
		sls->io_timeout_thread_created = true;
		slurm_thread_create(&sls->io_timeout_thread,
				    _check_io_timeout, sls);
	}

	fail_nodes = hostset_create(step_missing->nodelist);
	fail_itr = hostset_iterator_create(fail_nodes);
	num_node_ids = hostset_count(fail_nodes);

	all_nodes = hostset_create(sls->layout->node_list);

	for (i = 0; i < num_node_ids; i++) {
		node = hostlist_next(fail_itr);
		node_id = hostset_find(all_nodes, node);
		if (node_id < 0) {
			error("Internal error: bad SRUN_STEP_MISSING message. "
			      "Node %s not part of this job step", node);
			free(node);
			continue;
		}
		free(node);

		/*
		 * If all tasks for this node have either not started or already
		 * exited, ignore the missing step message for this node.
		 */
		num_tasks = sls->layout->tasks[node_id];
		active = false;
		for (j = 0; j < num_tasks; j++) {
			if (bit_test(sls->tasks_started,
				     sls->layout->tids[node_id][j]) &&
			    !bit_test(sls->tasks_exited,
				      sls->layout->tids[node_id][j])) {
				active = true;
				break;
			}
		}
		if (!active)
			continue;

		/* If this is true, an I/O error has already occurred on the
		 * stepd for the current node, and the job should abort */
		if (bit_test(sls->node_io_error, node_id)) {
			error("Aborting, step missing and io error on node %d",
			      node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
			break;
		}

		/*
		 * A test is already in progress. Ignore message for this node.
		 */
		if (sls->io_deadline[node_id] != NO_VAL) {
			debug("Test in progress for node %d, ignoring message",
			      node_id);
			continue;
		}

		sls->io_deadline[node_id] = time(NULL) + sls->io_timeout;

		debug("Testing connection to node %d", node_id);
		if (client_io_handler_send_test_message(cio, node_id,
							&test_message_sent)) {
			/*
			 * If unable to test a connection, assume the step
			 * is having problems and abort.  If unable to test,
			 * the system is probably having serious problems, so
			 * aborting the step seems reasonable.
			 */
			error("Aborting, can not test connection to node %d.",
			      node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
			break;
		}

		/*
		 * test_message_sent should be true unless this node either
		 * hasn't started or already finished.  Poke the io_timeout
		 * thread to make sure it will abort the job if the deadline
		 * for receiving a response passes.
		 */
		if (test_message_sent) {
			slurm_cond_broadcast(&sls->cond);
		} else {
			sls->io_deadline[node_id] = (time_t)NO_VAL;
		}
	}
	slurm_mutex_unlock(&sls->lock);

	hostlist_iterator_destroy(fail_itr);
	hostset_destroy(fail_nodes);
	hostset_destroy(all_nodes);
}

/* This RPC is typically used to send a signal to an external program
 * that is usually wrapped by srun.
 */
static void
_step_step_signal(struct step_launch_state *sls, slurm_msg_t *signal_msg)
{
	job_step_kill_msg_t *step_signal = signal_msg->data;
	debug2("Signal %u requested for step %u.%u", step_signal->signal,
	       step_signal->job_id, step_signal->job_step_id);
	if (sls->callback.step_signal)
		(sls->callback.step_signal)(step_signal->signal);
}

/*
 * The TCP connection that was used to send the task_spawn_io_msg_t message
 * will be used as the user managed IO stream.  The remote end of the TCP stream
 * will be connected to the stdin, stdout, and stderr of the task.  The
 * local end of the stream is stored in the user_managed_io_t structure, and
 * is left to the user to manage (the user can retrieve the array of
 * socket descriptors using slurm_step_ctx_get()).
 *
 * To allow the message TCP stream to be reused for spawn IO traffic we
 * set the slurm_msg_t's conn_fd to -1 to avoid having the caller close the
 * TCP stream.
 */
static void
_task_user_managed_io_handler(struct step_launch_state *sls,
			      slurm_msg_t *user_io_msg)
{
	task_user_managed_io_msg_t *msg =
		(task_user_managed_io_msg_t *) user_io_msg->data;

	slurm_mutex_lock(&sls->lock);

	debug("task %d user managed io stream established", msg->task_id);
	/* sanity check */
	if (msg->task_id >= sls->tasks_requested) {
		error("_task_user_managed_io_handler:"
		      " bad task ID %u (of %d tasks)",
		      msg->task_id, sls->tasks_requested);
	}

	sls->io.user->connected++;
	fd_set_blocking(user_io_msg->conn_fd);
	sls->io.user->sockets[msg->task_id] = user_io_msg->conn_fd;

	/* prevent the caller from closing the user managed IO stream */
	user_io_msg->conn_fd = -1;

	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);
}
1557 
1558 /*
1559  * Identify the incoming message and call the appropriate handler function.
1560  */
1561 static void
_handle_msg(void * arg,slurm_msg_t * msg)1562 _handle_msg(void *arg, slurm_msg_t *msg)
{
	struct step_launch_state *sls = (struct step_launch_state *)arg;
	uid_t req_uid;
	uid_t uid = getuid();
	srun_user_msg_t *um;
	int rc;

	req_uid = g_slurm_auth_get_uid(msg->auth_cred);

	if ((req_uid != slurm_uid) && (req_uid != 0) && (req_uid != uid)) {
		error("Security violation, slurm message from uid %u",
		      (unsigned int) req_uid);
		return;
	}

	switch (msg->msg_type) {
	case RESPONSE_LAUNCH_TASKS:
		debug2("received task launch");
		_launch_handler(sls, msg);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case MESSAGE_TASK_EXIT:
		debug2("received task exit");
		_exit_handler(sls, msg);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case SRUN_PING:
		debug3("slurmctld ping received");
		slurm_send_rc_msg(msg, SLURM_SUCCESS);
		break;
	case SRUN_EXEC:
		_exec_prog(msg);
		break;
	case SRUN_JOB_COMPLETE:
		debug2("received job step complete message");
		_job_complete_handler(sls, msg);
		break;
	case SRUN_TIMEOUT:
		debug2("received job step timeout message");
		_timeout_handler(sls, msg);
		break;
	case SRUN_USER_MSG:
		um = msg->data;
		info("%s", um->msg);
		break;
	case SRUN_NODE_FAIL:
		debug2("received srun node fail");
		_node_fail_handler(sls, msg);
		break;
	case SRUN_STEP_MISSING:
		debug2("received notice of missing job step");
		_step_missing_handler(sls, msg);
		break;
	case SRUN_STEP_SIGNAL:
		debug2("received step signal RPC");
		_step_step_signal(sls, msg);
		break;
	case PMI_KVS_PUT_REQ:
		debug2("PMI_KVS_PUT_REQ received");
		rc = pmi_kvs_put((kvs_comm_set_t *) msg->data);
		slurm_send_rc_msg(msg, rc);
		break;
	case PMI_KVS_GET_REQ:
		debug2("PMI_KVS_GET_REQ received");
		rc = pmi_kvs_get((kvs_get_msg_t *) msg->data);
		slurm_send_rc_msg(msg, rc);
		break;
	case TASK_USER_MANAGED_IO_STREAM:
		debug2("TASK_USER_MANAGED_IO_STREAM");
		_task_user_managed_io_handler(sls, msg);
		break;
	default:
		error("%s: received spurious message type: %u",
		      __func__, msg->msg_type);
		break;
	}
	return;
}
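
/*
 * Sketch: the shape a new dispatch case in _handle_msg() above would take.
 * SRUN_NEW_RPC and _new_rpc_handler are hypothetical names; only message
 * types whose sender expects a return code get the slurm_send_rc_msg() ack.
 */
#if 0
	case SRUN_NEW_RPC:
		debug2("received new RPC");
		_new_rpc_handler(sls, msg);
		slurm_send_rc_msg(msg, SLURM_SUCCESS);	/* ack the sender */
		break;
#endif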

/**********************************************************************
 * Task launch functions
 **********************************************************************/

/* The slurmd normally reports task completion to the controller.  If a
 * task launch request to a slurmd fails, that report will never arrive,
 * so the step completion must be sent to the controller from here;
 * no other code path cleans this up.
 */
static int _fail_step_tasks(slurm_step_ctx_t *ctx, char *node, int ret_code)
{
	slurm_msg_t req;
	step_complete_msg_t msg;
	int rc = -1;
	int nodeid = 0;
	struct step_launch_state *sls = ctx->launch_state;

#ifndef HAVE_FRONT_END
	/* It is always 0 for front end systems */
	nodeid = nodelist_find(ctx->step_resp->step_layout->node_list, node);
#endif

	slurm_mutex_lock(&sls->lock);
	sls->abort = true;
	slurm_cond_broadcast(&sls->cond);
	slurm_mutex_unlock(&sls->lock);

	memset(&msg, 0, sizeof(msg));
	msg.job_id = ctx->job_id;
	msg.job_step_id = ctx->step_resp->job_step_id;

	msg.range_first = msg.range_last = nodeid;
	msg.step_rc = ret_code;

	slurm_msg_t_init(&req);
	req.msg_type = REQUEST_STEP_COMPLETE;
	req.data = &msg;

	if (ctx->step_resp->use_protocol_ver)
		req.protocol_version = ctx->step_resp->use_protocol_ver;

	if (slurm_send_recv_controller_rc_msg(&req, &rc,
					      working_cluster_rec) < 0)
		return SLURM_ERROR;

	return SLURM_SUCCESS;
}
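
/*
 * Sketch: REQUEST_STEP_COMPLETE marks a contiguous, inclusive *range* of
 * step node indices complete, so a caller that knew several consecutive
 * nodes failed could report them in one RPC.  This mirrors the message
 * setup in _fail_step_tasks() above; variable names are hypothetical.
 */
#if 0
	memset(&msg, 0, sizeof(msg));
	msg.job_id = job_id;
	msg.job_step_id = step_id;
	msg.range_first = first_failed_nodeid;
	msg.range_last = last_failed_nodeid;	/* inclusive */
	msg.step_rc = ret_code;
#endif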

static int _launch_tasks(slurm_step_ctx_t *ctx,
			 launch_tasks_request_msg_t *launch_msg,
			 uint32_t timeout, char *nodelist, int start_nodeid)
{
#ifdef HAVE_FRONT_END
	slurm_cred_arg_t cred_args;
#endif
	slurm_msg_t msg;
	List ret_list = NULL;
	ListIterator ret_itr;
	ret_data_info_t *ret_data = NULL;
	int rc = SLURM_SUCCESS;
	int tot_rc = SLURM_SUCCESS;

	debug("Entering _launch_tasks");
	if (ctx->verbose_level) {
		char *name = NULL;
		hostlist_t hl = hostlist_create(nodelist);
		int i = start_nodeid;
		while ((name = hostlist_shift(hl))) {
			_print_launch_msg(launch_msg, name, i++);
			free(name);
		}
		hostlist_destroy(hl);
	}

	/*
	 * Extend the timeout based upon BatchStartTimeout to allow for a
	 * long-running Prolog
	 */
	if (timeout <= 0) {
		timeout = (slurm_get_msg_timeout() +
			   slurm_get_batch_start_timeout()) * 1000;
	}

	slurm_msg_t_init(&msg);
	msg.msg_type = REQUEST_LAUNCH_TASKS;
	msg.data = launch_msg;

	if (ctx->step_resp->use_protocol_ver)
		msg.protocol_version = ctx->step_resp->use_protocol_ver;

#ifdef HAVE_FRONT_END
	slurm_cred_get_args(ctx->step_resp->cred, &cred_args);
	//info("hostlist=%s", cred_args.step_hostlist);
	ret_list = slurm_send_recv_msgs(cred_args.step_hostlist, &msg, timeout);
	slurm_cred_free_args(&cred_args);
#else
	ret_list = slurm_send_recv_msgs(nodelist, &msg, timeout);
#endif
	if (ret_list == NULL) {
		error("slurm_send_recv_msgs failed miserably: %m");
		return SLURM_ERROR;
	}
	ret_itr = list_iterator_create(ret_list);
	while ((ret_data = list_next(ret_itr))) {
		rc = slurm_get_return_code(ret_data->type,
					   ret_data->data);
		debug("launch returned msg_rc=%d err=%d type=%d",
		      rc, ret_data->err, ret_data->type);
		if (rc != SLURM_SUCCESS) {
			if (ret_data->err)
				tot_rc = ret_data->err;
			else
				tot_rc = rc;

			_fail_step_tasks(ctx, ret_data->node_name, tot_rc);

			errno = tot_rc;
			tot_rc = SLURM_ERROR;
			error("Task launch for %u.%u failed on node %s: %m",
			      ctx->job_id, ctx->step_resp->job_step_id,
			      ret_data->node_name);
		} else {
#if 0 /* only for debugging, might want to make this a callback */
			errno = ret_data->err;
			info("Launch success on node %s",
			     ret_data->node_name);
#endif
		}
	}
	list_iterator_destroy(ret_itr);
	FREE_NULL_LIST(ret_list);

	if (tot_rc != SLURM_SUCCESS)
		return tot_rc;
	return rc;
}

/* Returns an xmalloc'd cwd string, or NULL if the lookup failed. */
static char *_lookup_cwd(void)
{
	char buf[PATH_MAX];

	if (getcwd(buf, PATH_MAX) != NULL) {
		return xstrdup(buf);
	} else {
		return NULL;
	}
}
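
/*
 * Usage sketch: strings returned by _lookup_cwd() come from the xmalloc
 * family (via xstrdup) and must be released with xfree().
 */
#if 0
	char *cwd = _lookup_cwd();
	if (cwd) {
		debug("cwd = %s", cwd);
		xfree(cwd);
	}
#endif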

static void _print_launch_msg(launch_tasks_request_msg_t *msg,
			      char *hostname, int nodeid)
{
	int i;
	char *tmp_str = NULL, *task_list = NULL;
	hostlist_t hl = hostlist_create(NULL);

	for (i = 0; i < msg->tasks_to_launch[nodeid]; i++) {
		xstrfmtcat(tmp_str, "%u", msg->global_task_ids[nodeid][i]);
		hostlist_push_host(hl, tmp_str);
		xfree(tmp_str);
	}
	task_list = hostlist_ranged_string_xmalloc(hl);
	hostlist_destroy(hl);

	info("launching %u.%u on host %s, %u tasks: %s",
	     msg->job_id, msg->job_step_id, hostname,
	     msg->tasks_to_launch[nodeid], task_list);
	xfree(task_list);

	debug3("uid:%ld gid:%ld cwd:%s %d", (long) msg->uid,
	       (long) msg->gid, msg->cwd, nodeid);
}
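
/*
 * Sketch: the hostlist calls above are used purely for their range
 * compression; pushing numeric task IDs yields a compact ranged string.
 * A minimal, self-contained illustration (expected output shown in the
 * comment):
 */
#if 0
	hostlist_t hl = hostlist_create(NULL);
	char *ranged;

	hostlist_push_host(hl, "0");
	hostlist_push_host(hl, "1");
	hostlist_push_host(hl, "2");
	hostlist_push_host(hl, "5");
	ranged = hostlist_ranged_string_xmalloc(hl);	/* "[0-2,5]" */
	hostlist_destroy(hl);
	xfree(ranged);
#endif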

/* This is used to initiate an OpenMPI checkpoint program,
 * but is written to be general purpose */
static void
_exec_prog(slurm_msg_t *msg)
{
	pid_t child;
	int pfd[2], status;
	ssize_t len;
	char buf[256] = "";
	srun_exec_msg_t *exec_msg = msg->data;

	if ((exec_msg->argc < 1) || (exec_msg->argv == NULL) ||
	    (exec_msg->argv[0] == NULL)) {
		error("%s: called with no command to execute", __func__);
		return;
	} else if (exec_msg->argc > 2) {
		verbose("Exec '%s %s' for %u.%u",
			exec_msg->argv[0], exec_msg->argv[1],
			exec_msg->job_id, exec_msg->step_id);
	} else {
		verbose("Exec '%s' for %u.%u",
			exec_msg->argv[0],
			exec_msg->job_id, exec_msg->step_id);
	}

	if (pipe(pfd) == -1) {
		snprintf(buf, sizeof(buf), "pipe: %s", strerror(errno));
		error("%s", buf);
		return;
	}

	child = fork();
	if (child == 0) {
		int fd = open("/dev/null", O_RDONLY);
		if (fd < 0) {
			error("%s: cannot open /dev/null", __func__);
			_exit(1);	/* _exit(): skip parent's atexit/stdio */
		}
		dup2(fd, 0);		/* stdin from /dev/null */
		dup2(pfd[1], 1);	/* stdout to pipe */
		dup2(pfd[1], 2);	/* stderr to pipe */
		close(pfd[0]);
		close(pfd[1]);
		execvp(exec_msg->argv[0], exec_msg->argv);
		error("execvp(%s): %m", exec_msg->argv[0]);
		/* do not fall through into srun's logic on exec failure */
		_exit(127);
	} else if (child < 0) {
		snprintf(buf, sizeof(buf), "fork: %s", strerror(errno));
		error("%s", buf);
		return;
	} else {
		close(pfd[1]);
		len = read(pfd[0], buf, sizeof(buf) - 1);
		if (len >= 1)
			buf[len] = '\0';	/* captured child output */
		close(pfd[0]);	/* close unconditionally; avoid an fd leak */
		waitpid(child, &status, 0);
	}
}


/*
 * Notify the step_launch_state that an I/O connection went bad.
 * If the node is suspected to be down, abort the job.
 */
int
step_launch_notify_io_failure(step_launch_state_t *sls, int node_id)
{
	slurm_mutex_lock(&sls->lock);

	bit_set(sls->node_io_error, node_id);
	debug("IO error on node %d", node_id);

	/*
	 * sls->io_deadline[node_id] != (time_t)NO_VAL  means that
	 * the _step_missing_handler was called on this node.
	 */
	if (sls->io_deadline[node_id] != (time_t)NO_VAL) {
		error("Aborting, io error and missing step on node %d",
		      node_id);
		sls->abort = true;
		slurm_cond_broadcast(&sls->cond);
	} else {
		/*
		 * FIXME: if the stepd dies or we see an I/O error with the
		 * stepd, do not abort the whole job; instead, collect all
		 * tasks on that node just as if they had exited.
		 *
		 * Keep supporting 'srun -N x --pty bash'.
		 */
		if (getenv("SLURM_PTY_PORT") == NULL) {
			error("%s: aborting, io error with slurmstepd on node %d",
			      __func__, node_id);
			sls->abort = true;
			slurm_cond_broadcast(&sls->cond);
		}
	}

	slurm_mutex_unlock(&sls->lock);

	return SLURM_SUCCESS;
}
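
/*
 * Sketch: the node_io_error bitmap set above can later be consulted with
 * the bitstring API, e.g. to decide whether a node's silence was already
 * explained by an I/O failure.  The surrounding code is hypothetical.
 */
#if 0
	if (bit_test(sls->node_io_error, node_id))
		debug("node %d already had an IO error", node_id);
#endif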


/*
 * This is called 1) after a node connects for the first time and 2) when
 * a message comes in confirming that a connection is okay.
 *
 * Just in case the node was marked questionable very early in the
 * job step setup, clear this flag if/when the node makes its initial
 * connection.
 */
int
step_launch_clear_questionable_state(step_launch_state_t *sls, int node_id)
{
	slurm_mutex_lock(&sls->lock);
	sls->io_deadline[node_id] = (time_t)NO_VAL;
	slurm_mutex_unlock(&sls->lock);
	return SLURM_SUCCESS;
}

static void *
_check_io_timeout(void *_sls)
{
	int ii;
	time_t now, next_deadline;
	struct timespec ts = {0, 0};
	step_launch_state_t *sls = (step_launch_state_t *)_sls;

	slurm_mutex_lock(&sls->lock);

	while (1) {
		if (sls->halt_io_test || sls->abort)
			break;

		now = time(NULL);
		next_deadline = (time_t)NO_VAL;

		for (ii = 0; ii < sls->layout->node_cnt; ii++) {
			if (sls->io_deadline[ii] == (time_t)NO_VAL)
				continue;

			if (sls->io_deadline[ii] <= now) {
				sls->abort = true;
				slurm_cond_broadcast(&sls->cond);
				error("Cannot communicate with node %d.  "
				      "Aborting job.", ii);
				break;
			} else if (next_deadline == (time_t)NO_VAL ||
				   sls->io_deadline[ii] < next_deadline) {
				next_deadline = sls->io_deadline[ii];
			}
		}
		if (sls->abort)
			break;

		if (next_deadline == (time_t)NO_VAL) {
			debug("io timeout thread: no pending deadlines, "
			      "sleeping indefinitely");
			slurm_cond_wait(&sls->cond, &sls->lock);
		} else {
			debug("io timeout thread: sleeping %lds until deadline",
			      (long)(next_deadline - time(NULL)));
			ts.tv_sec = next_deadline;
			slurm_cond_timedwait(&sls->cond, &sls->lock, &ts);
		}
	}
	slurm_mutex_unlock(&sls->lock);
	return NULL;
}
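
/*
 * Sketch: slurm_cond_timedwait() wraps pthread_cond_timedwait(), which
 * takes an *absolute* wall-clock deadline; that is why ts.tv_sec above is
 * assigned next_deadline directly rather than a relative delay.  A
 * standalone illustration with hypothetical lock/cond/flag names:
 */
#if 0
	struct timespec ts = { time(NULL) + 10, 0 };	/* ~10s from now */

	slurm_mutex_lock(&lock);
	while (!condition_met && (time(NULL) < ts.tv_sec))
		slurm_cond_timedwait(&cond, &lock, &ts);
	slurm_mutex_unlock(&lock);
#endif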