1 /*****************************************************************************\
2  *  src/slurmd/slurmstepd/slurmstepd.c - Slurm job-step manager.
3  *****************************************************************************
4  *  Copyright (C) 2002-2007 The Regents of the University of California.
5  *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
6  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7  *  Written by Danny Auble <da@llnl.gov>
8  *  and Christopher Morrone <morrone2@llnl.gov>.
9  *  CODE-OCEC-09-009. All rights reserved.
10  *
11  *  This file is part of Slurm, a resource management program.
12  *  For details, see <https://slurm.schedmd.com/>.
13  *  Please also read the included file: DISCLAIMER.
14  *
15  *  Slurm is free software; you can redistribute it and/or modify it under
16  *  the terms of the GNU General Public License as published by the Free
17  *  Software Foundation; either version 2 of the License, or (at your option)
18  *  any later version.
19  *
20  *  In addition, as a special exception, the copyright holders give permission
21  *  to link the code of portions of this program with the OpenSSL library under
22  *  certain conditions as described in each individual source file, and
23  *  distribute linked combinations including the two. You must obey the GNU
24  *  General Public License in all respects for all of the code used other than
25  *  OpenSSL. If you modify file(s) with this exception, you may extend this
26  *  exception to your version of the file(s), but you are not obligated to do
27  *  so. If you do not wish to do so, delete this exception statement from your
28  *  version.  If you delete this exception statement from all source files in
29  *  the program, then also delete it here.
30  *
31  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
32  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
33  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
34  *  details.
35  *
36  *  You should have received a copy of the GNU General Public License along
37  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
38  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
39 \*****************************************************************************/
40 
41 #include "config.h"
42 
43 #include <signal.h>
44 #include <stdlib.h>
45 #include <sys/mman.h>
46 #include <unistd.h>
47 
48 #include "src/common/assoc_mgr.h"
49 #include "src/common/cpu_frequency.h"
50 #include "src/common/gres.h"
51 #include "src/common/node_select.h"
52 #include "src/common/plugstack.h"
53 #include "src/common/slurm_auth.h"
54 #include "src/common/slurm_jobacct_gather.h"
55 #include "src/common/slurm_acct_gather_profile.h"
56 #include "src/common/slurm_mpi.h"
57 #include "src/common/slurm_protocol_api.h"
58 #include "src/common/slurm_rlimits_info.h"
59 #include "src/common/stepd_api.h"
60 #include "src/common/switch.h"
61 #include "src/common/xcgroup_read_config.h"
62 #include "src/common/xmalloc.h"
63 #include "src/common/xsignal.h"
64 #include "src/common/xstring.h"
65 
66 #include "src/slurmd/common/core_spec_plugin.h"
67 #include "src/slurmd/common/slurmstepd_init.h"
68 #include "src/slurmd/common/setproctitle.h"
69 #include "src/common/slurm_acct_gather_energy.h"
70 #include "src/slurmd/common/proctrack.h"
71 #include "src/slurmd/common/xcpuinfo.h"
72 #include "src/slurmd/slurmd/slurmd.h"
73 #include "src/slurmd/slurmstepd/mgr.h"
74 #include "src/slurmd/slurmstepd/req.h"
75 #include "src/slurmd/slurmstepd/slurmstepd.h"
76 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
77 
78 static int _init_from_slurmd(int sock, char **argv, slurm_addr_t **_cli,
79 			     slurm_addr_t **_self, slurm_msg_t **_msg);
80 
81 static void _dump_user_env(void);
82 static void _send_ok_to_slurmd(int sock);
83 static void _send_fail_to_slurmd(int sock);
84 static void _got_ack_from_slurmd(int);
85 static stepd_step_rec_t *_step_setup(slurm_addr_t *cli, slurm_addr_t *self,
86 				     slurm_msg_t *msg);
87 #ifdef MEMORY_LEAK_DEBUG
88 static void _step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc);
89 #endif
90 static int _process_cmdline (int argc, char **argv);
91 
/*
 *  List of signals to block in this process
 *  (zero-terminated; passed to xsignal_block() from main())
 */
int slurmstepd_blocked_signals[] = {
	SIGINT,  SIGTERM, SIGTSTP,
	SIGQUIT, SIGPIPE, SIGUSR1,
	SIGUSR2, SIGALRM, SIGHUP, 0
};

/* global variable: lightweight slurmd configuration received over the
 * stdin pipe from slurmd (see read_slurmd_conf_lite()) */
slurmd_conf_t * conf;
/* process environment, dumped verbatim by the "getenv" command mode */
extern char  ** environ;
104 
/*
 * slurmstepd entry point.  Spawned by slurmd for each job step; the step
 * description arrives over the stdin pipe (fd 0) from slurmd.  After setup,
 * job_manager() launches the tasks and blocks until the step completes.
 * Returns the step's cleanup return code via stepd_cleanup().
 */
int
main (int argc, char **argv)
{
	log_options_t lopts = LOG_OPTS_INITIALIZER;
	slurm_addr_t *cli;		/* launching client address (unpacked from slurmd) */
	slurm_addr_t *self;		/* our listen address; may remain NULL */
	slurm_msg_t *msg;		/* launch request (batch or task launch) */
	stepd_step_rec_t *job;
	int rc = 0;
	char *launch_params;

	/* Special command-line modes ("getenv", "spank") exit() internally
	 * and never fall through here on success. */
	if (_process_cmdline (argc, argv) < 0)
		fatal ("Error in slurmstepd command line");

	xsignal_block(slurmstepd_blocked_signals);
	conf = xmalloc(sizeof(*conf));
	conf->argv = &argv;
	conf->argc = &argc;
	init_setproctitle(argc, argv);

	log_init(argv[0], lopts, LOG_DAEMON, NULL);

	if (slurm_select_init(1) != SLURM_SUCCESS )
		fatal( "failed to initialize node selection plugin" );
	if (slurm_auth_init(NULL) != SLURM_SUCCESS)
		fatal( "failed to initialize authentication plugin" );

	/* Receive job parameters from the slurmd */
	_init_from_slurmd(STDIN_FILENO, argv, &cli, &self, &msg);

	/* Create the stepd_step_rec_t, mostly from info in a
	 * launch_tasks_request_msg_t or a batch_job_launch_msg_t */
	if (!(job = _step_setup(cli, self, msg))) {
		_send_fail_to_slurmd(STDOUT_FILENO);
		rc = SLURM_ERROR;
		goto ending;
	}

	/* fork handlers cause mutexes on some global data structures
	 * to be re-initialized after the fork. */
	slurm_conf_install_fork_handlers();

	/* sets job->msg_handle and job->msgid */
	if (msg_thr_create(job) == SLURM_ERROR) {
		_send_fail_to_slurmd(STDOUT_FILENO);
		rc = SLURM_ERROR;
		goto ending;
	}

	/* All step types except the extern container acknowledge slurmd and
	 * detach from the stdio pipes now; the extern step keeps them open
	 * (presumably closed later elsewhere -- not visible in this file). */
	if (job->stepid != SLURM_EXTERN_CONT)
		close_slurmd_conn();

	/* slurmstepd is the only daemon that should survive upgrade. If it
	 * had been swapped out before upgrade happened it could easily lead
	 * to SIGBUS at any time after upgrade. Avoid that by locking it
	 * in-memory. */
	launch_params = slurm_get_launch_params();
	if (launch_params && strstr(launch_params, "slurmstepd_memlock")) {
#ifdef _POSIX_MEMLOCK
		int flags = MCL_CURRENT;
		if (strstr(launch_params, "slurmstepd_memlock_all"))
			flags |= MCL_FUTURE;
		if (mlockall(flags) < 0)
			info("failed to mlock() slurmstepd pages: %m");
		else
			debug("slurmstepd locked in memory");
#else
		info("mlockall() system call does not appear to be available");
#endif
	}
	xfree(launch_params);

	acct_gather_energy_g_set_data(ENERGY_DATA_STEP_PTR, job);

	/* This does most of the stdio setup, then launches all the tasks,
	 * and blocks until the step is complete */
	rc = job_manager(job);

	return stepd_cleanup(msg, job, cli, self, rc, 0);
ending:
	/* Early-error path: job may be NULL, and the message thread was
	 * never started, so request memory-only cleanup. */
	return stepd_cleanup(msg, job, cli, self, rc, 1);
}
187 
/*
 * Tear down a job step and return its result code.
 *
 * msg/job/cli/self - step state allocated by _init_from_slurmd()/_step_setup()
 * rc       - step result, returned to the caller unchanged
 * only_mem - when true, skip the batch-complete message and the message
 *            thread shutdown (used on early-error paths where job may be
 *            NULL and the message thread was never created)
 */
extern int stepd_cleanup(slurm_msg_t *msg, stepd_step_rec_t *job,
			 slurm_addr_t *cli, slurm_addr_t *self,
			 int rc, bool only_mem)
{
	if (!only_mem) {
		if (job->batch)
			batch_finish(job, rc); /* sends batch complete message */

		/* signal the message thread to shutdown, and wait for it */
		eio_signal_shutdown(job->msg_handle);
		pthread_join(job->msgid, NULL);
	}

	mpi_fini();	/* Remove stale PMI2 sockets */

	/* Remove this step's private hwloc topology file, if one was made */
	if (conf->hwloc_xml)
		(void)remove(conf->hwloc_xml);

#ifdef MEMORY_LEAK_DEBUG
	/* Free everything explicitly so leak checkers only report real
	 * leaks; in production builds the process exit reclaims it all. */
	acct_gather_conf_destroy();
	(void) core_spec_g_fini();
	_step_cleanup(job, msg, rc);

	fini_setproctitle();

	xcgroup_fini_slurm_cgroup_conf();

	xfree(cli);
	xfree(self);
	xfree(conf->block_map);
	xfree(conf->block_map_inv);
	xfree(conf->hostname);
	xfree(conf->hwloc_xml);
	xfree(conf->job_acct_gather_freq);
	xfree(conf->job_acct_gather_type);
	xfree(conf->logfile);
	xfree(conf->node_name);
	xfree(conf->node_topo_addr);
	xfree(conf->node_topo_pattern);
	xfree(conf->spooldir);
	xfree(conf->task_epilog);
	xfree(conf->task_prolog);
	xfree(conf);
#endif
	info("done with job");
	return rc;
}
235 
/*
 * Acknowledge slurmd over the launch pipes, then detach from them.
 * Instead of close()ing fds 0 and 1 (which would let them be reused by
 * any random open()), both are re-pointed at STDERR_FILENO, on which
 * slurmd already opened /dev/null for us.
 */
extern void close_slurmd_conn(void)
{
	_send_ok_to_slurmd(STDOUT_FILENO);
	_got_ack_from_slurmd(STDIN_FILENO);

	/* keep fds 0 and 1 occupied so later opens cannot land on them */
	dup2(STDERR_FILENO, STDIN_FILENO);
	dup2(STDERR_FILENO, STDOUT_FILENO);
}
251 
/*
 * Read the lightweight slurmd configuration that slurmd packs onto the
 * launch pipe: a length-prefixed packed slurmd_conf_t, followed by the
 * packed TRES list.  Re-initializes logging from the received options and
 * installs the TRES list into the assoc_mgr.
 *
 * fd - pipe fd from slurmd (normally STDIN_FILENO)
 * Returns the populated conf (the global 'conf' when already allocated,
 * otherwise a freshly xmalloc'd one), or NULL on read failure.
 * NOTE: safe_read() jumps to the rwfail label on any short read.
 */
static slurmd_conf_t *read_slurmd_conf_lite(int fd)
{
	int rc;
	int len;
	Buf buffer = NULL;
	slurmd_conf_t *confl, *local_conf = NULL;
	int tmp_int = 0;
	List tmp_list = NULL;
	assoc_mgr_lock_t locks = { .tres = WRITE_LOCK };

	/*  First check to see if we've already initialized the
	 *   global slurmd_conf_t in 'conf'. Allocate memory if not.
	 */
	if (conf) {
		confl = conf;
	} else {
		local_conf = xmalloc(sizeof(slurmd_conf_t));
		confl = local_conf;
	}

	/* length prefix, then the packed slurmd_conf_t payload */
	safe_read(fd, &len, sizeof(int));

	buffer = init_buf(len);
	safe_read(fd, buffer->head, len);

	rc = unpack_slurmd_conf_lite_no_alloc(confl, buffer);
	if (rc == SLURM_ERROR)
		fatal("slurmstepd: problem with unpack of slurmd_conf");

	/* the TRES list follows the conf in the same buffer */
	slurm_unpack_list(&tmp_list,
			  slurmdb_unpack_tres_rec,
			  slurmdb_destroy_tres_rec,
			  buffer, SLURM_PROTOCOL_VERSION);

	free_buf(buffer);

	confl->log_opts.prefix_level = 1;
	confl->log_opts.stderr_level = confl->debug_level;
	confl->log_opts.logfile_level = confl->debug_level;
	confl->log_opts.syslog_level = confl->debug_level;
	/*
	 * If daemonizing, turn off stderr logging -- also, if
	 * logging to a file, turn off syslog.
	 *
	 * Otherwise, if remaining in foreground, turn off logging
	 * to syslog (but keep logfile level)
	 */
	if (confl->daemonize) {
		confl->log_opts.stderr_level = LOG_LEVEL_QUIET;
		if (confl->logfile)
			confl->log_opts.syslog_level = LOG_LEVEL_QUIET;
	} else
		confl->log_opts.syslog_level  = LOG_LEVEL_QUIET;

	/*
	 * LOGGING BEFORE THIS WILL NOT WORK!  Only afterwards will it show
	 * up in the log.
	 */
	log_alter(confl->log_opts, 0, confl->logfile);
	log_set_timefmt(confl->log_fmt);
	debug2("debug level is %d.", confl->debug_level);

	/* derive the task accounting frequency from the received string;
	 * acct_gather_parse_freq() returns -1 when no value is set */
	confl->acct_freq_task = NO_VAL16;
	tmp_int = acct_gather_parse_freq(PROFILE_TASK,
				       confl->job_acct_gather_freq);
	if (tmp_int != -1)
		confl->acct_freq_task = tmp_int;

	xassert(tmp_list);

	assoc_mgr_lock(&locks);
	assoc_mgr_post_tres_list(tmp_list);
	debug2("%s: slurmd sent %u TRES.", __func__, g_tres_count);
	/* assoc_mgr_post_tres_list destroys tmp_list */
	tmp_list = NULL;
	assoc_mgr_unlock(&locks);

	return (confl);

rwfail:
	/* short read from slurmd: free anything allocated here and fail */
	FREE_NULL_BUFFER(buffer);
	xfree(local_conf);
	return (NULL);
}
336 
_get_jobid_uid_gid_from_env(uint32_t * jobid,uid_t * uid,gid_t * gid)337 static int _get_jobid_uid_gid_from_env(uint32_t *jobid, uid_t *uid, gid_t *gid)
338 {
339 	const char *val;
340 	char *p;
341 
342 	if (!(val = getenv("SLURM_JOBID")))
343 		return error("Unable to get SLURM_JOBID in env!");
344 
345 	*jobid = (uint32_t) strtoul(val, &p, 10);
346 	if (*p != '\0')
347 		return error("Invalid SLURM_JOBID=%s", val);
348 
349 	if (!(val = getenv("SLURM_UID")))
350 		return error("Unable to get SLURM_UID in env!");
351 
352 	*uid = (uid_t) strtoul(val, &p, 10);
353 	if (*p != '\0')
354 		return error("Invalid SLURM_UID=%s", val);
355 
356 	if (!(val = getenv("SLURM_JOB_GID")))
357 		return error("Unable to get SLURM_JOB_GID in env!");
358 
359 	*gid = (gid_t) strtoul(val, &p, 10);
360 	if (*p != '\0')
361 		return error("Invalid SLURM_JOB_GID=%s", val);
362 
363 	return SLURM_SUCCESS;
364 }
365 
/*
 * Run the spank job prolog or epilog ("slurmstepd spank <mode>").
 * argv[2] selects the mode ("prolog" or "epilog"); jobid/uid/gid come
 * from the environment (see _get_jobid_uid_gid_from_env()).
 * Returns 0 on success, -1 on any failure.
 */
static int _handle_spank_mode (int argc, char **argv)
{
	char *prefix = NULL;
	const char *mode = argv[2];
	uid_t uid = (uid_t) -1;
	gid_t gid = (gid_t) -1;
	uint32_t jobid = (uint32_t) -1;
	log_options_t lopts = LOG_OPTS_INITIALIZER;

	/*
	 *  Not necessary to log to syslog
	 */
	lopts.syslog_level = LOG_LEVEL_QUIET;

	/*
	 *  Make our log prefix into spank-prolog: or spank-epilog:
	 */
	xstrfmtcat(prefix, "spank-%s", mode);
	log_init(prefix, lopts, LOG_DAEMON, NULL);
	xfree(prefix);

	/*
	 *  When we are started from slurmd, a lightweight config is
	 *   sent over the stdin fd. If we are able to read this conf
	 *   use it to reinitialize the log.
	 *  It is not a fatal error if we fail to read the conf file.
	 *   This could happen if slurmstepd is run standalone for
	 *   testing.
	 */
	conf = read_slurmd_conf_lite (STDIN_FILENO);
	close (STDIN_FILENO);

	slurm_conf_init(NULL);

	if (_get_jobid_uid_gid_from_env(&jobid, &uid, &gid))
		return error("spank environment invalid");

	debug("Running spank/%s for jobid [%u] uid [%u] gid [%u]",
	      mode, jobid, uid, gid);

	/* dispatch to the requested spank hook */
	if (xstrcmp (mode, "prolog") == 0) {
		if (spank_job_prolog(jobid, uid, gid) < 0)
			return (-1);
	}
	else if (xstrcmp (mode, "epilog") == 0) {
		if (spank_job_epilog(jobid, uid, gid) < 0)
			return (-1);
	}
	else {
		error ("Invalid mode %s specified!", mode);
		return (-1);
	}
	return (0);
}
420 
/*
 *  Process special "modes" of slurmstepd passed as cmdline arguments.
 *  "getenv" dumps rlimits plus the user environment; "spank" runs the
 *  spank prolog/epilog.  Both modes exit() directly and never return.
 *  Returns 0 when no special mode was requested.
 */
static int _process_cmdline (int argc, char **argv)
{
	if ((argc == 2) && !xstrcmp(argv[1], "getenv")) {
		print_rlimits();
		_dump_user_env();
		exit(0);
	}

	if ((argc == 3) && !xstrcmp(argv[1], "spank"))
		exit((_handle_spank_mode(argc, argv) < 0) ? 1 : 0);

	return (0);
}
438 
439 
440 static void
_send_ok_to_slurmd(int sock)441 _send_ok_to_slurmd(int sock)
442 {
443 	/* If running under valgrind/memcheck, this pipe doesn't work correctly
444 	 * so just skip it. */
445 #if (SLURMSTEPD_MEMCHECK == 0)
446 	int ok = SLURM_SUCCESS;
447 	safe_write(sock, &ok, sizeof(int));
448 	return;
449 rwfail:
450 	error("Unable to send \"ok\" to slurmd");
451 #endif
452 }
453 
454 static void
_send_fail_to_slurmd(int sock)455 _send_fail_to_slurmd(int sock)
456 {
457 	/* If running under valgrind/memcheck, this pipe doesn't work correctly
458 	 * so just skip it. */
459 #if (SLURMSTEPD_MEMCHECK == 0)
460 	int fail = SLURM_ERROR;
461 
462 	if (errno)
463 		fail = errno;
464 
465 	safe_write(sock, &fail, sizeof(int));
466 	return;
467 rwfail:
468 	error("Unable to send \"fail\" to slurmd");
469 #endif
470 }
471 
/* Block until slurmd acknowledges our "ok" with a status int on the
 * launch pipe.  The value itself is discarded; only the read matters. */
static void
_got_ack_from_slurmd(int sock)
{
	/* If running under valgrind/memcheck, this pipe doesn't work correctly
	 * so just skip it. */
#if (SLURMSTEPD_MEMCHECK == 0)
	int ok;
	safe_read(sock, &ok, sizeof(int));
	return;
rwfail:
	/* Fixed message: we failed to RECEIVE from slurmd, not send to it
	 * (previously read "... \"ok ack\" to slurmd"). */
	error("Unable to receive \"ok ack\" from slurmd");
#endif
}
485 
_set_job_log_prefix(uint32_t jobid,uint32_t stepid)486 static void _set_job_log_prefix(uint32_t jobid, uint32_t stepid)
487 {
488 	char *buf;
489 
490 	if (stepid == SLURM_BATCH_SCRIPT)
491 		buf = xstrdup_printf("[%u.batch]", jobid);
492 	else if (stepid == SLURM_EXTERN_CONT)
493 		buf = xstrdup_printf("[%u.extern]", jobid);
494 	else
495 		buf = xstrdup_printf("[%u.%u]", jobid, stepid);
496 
497 	setproctitle("%s", buf);
498 
499 	/* note: will claim ownership of buf, do not free */
500 	xstrcat(buf, " ");
501 	log_set_fpfx(&buf);
502 }
503 
/*
 *  This function handles the initialization information from slurmd
 *  sent by _send_slurmstepd_init() in src/slurmd/slurmd/req.c.
 *
 *  The read order here must exactly mirror the write order on the slurmd
 *  side: conf, cgroup conf, acct_gather conf, step type, reverse-tree
 *  info, client address, (optional) self address, GRES, cpu_frequency,
 *  srun protocol version, and finally the launch request itself.
 *  Output pointers _cli/_self/_msg receive heap objects owned by the
 *  caller.  On any short read, safe_read() jumps to rwfail and the
 *  process dies via fatal().
 */
static int
_init_from_slurmd(int sock, char **argv,
		  slurm_addr_t **_cli, slurm_addr_t **_self, slurm_msg_t **_msg)
{
	char *incoming_buffer = NULL;
	Buf buffer;
	int step_type;
	int len;
	uint16_t proto;		/* protocol version of the launching srun */
	slurm_addr_t *cli = NULL;
	slurm_addr_t *self = NULL;
	slurm_msg_t *msg = NULL;
	uint16_t port;
	char buf[16];
	uint32_t jobid = 0, stepid = 0;

	/* receive conf from slurmd */
	if (!(conf = read_slurmd_conf_lite(sock)))
		fatal("Failed to read conf from slurmd");

	/* receive cgroup conf from slurmd */
	if (xcgroup_read_conf(sock) != SLURM_SUCCESS)
		fatal("Failed to read cgroup conf from slurmd");

	/* receive acct_gather conf from slurmd */
	if (acct_gather_read_conf(sock) != SLURM_SUCCESS)
		fatal("Failed to read acct_gather conf from slurmd");

	/* receive job type from slurmd */
	safe_read(sock, &step_type, sizeof(int));
	debug3("step_type = %d", step_type);

	/* receive reverse-tree info from slurmd */
	slurm_mutex_lock(&step_complete.lock);
	safe_read(sock, &step_complete.rank, sizeof(int));
	safe_read(sock, &step_complete.parent_rank, sizeof(int));
	safe_read(sock, &step_complete.children, sizeof(int));
	safe_read(sock, &step_complete.depth, sizeof(int));
	safe_read(sock, &step_complete.max_depth, sizeof(int));
	safe_read(sock, &step_complete.parent_addr, sizeof(slurm_addr_t));
	if (step_complete.children)
		step_complete.bits = bit_alloc(step_complete.children);
	step_complete.jobacct = jobacctinfo_create(NULL);
	slurm_mutex_unlock(&step_complete.lock);

	switch_g_slurmd_step_init();

	slurm_get_ip_str(&step_complete.parent_addr, &port, buf, 16);
	debug3("slurmstepd rank %d, parent address = %s, port = %u",
	       step_complete.rank, buf, port);

	/* receive cli from slurmd: length-prefixed packed slurm_addr_t */
	safe_read(sock, &len, sizeof(int));
	incoming_buffer = xmalloc(len);
	safe_read(sock, incoming_buffer, len);
	buffer = create_buf(incoming_buffer,len);
	cli = xmalloc(sizeof(slurm_addr_t));
	if (slurm_unpack_slurm_addr_no_alloc(cli, buffer) == SLURM_ERROR)
		fatal("slurmstepd: problem with unpack of slurmd_conf");
	free_buf(buffer);

	/* receive self from slurmd; len == 0 means no self address was
	 * sent and *_self stays NULL */
	safe_read(sock, &len, sizeof(int));
	if (len > 0) {
		/* receive packed self from main slurmd */
		incoming_buffer = xmalloc(len);
		safe_read(sock, incoming_buffer, len);
		buffer = create_buf(incoming_buffer,len);
		self = xmalloc(sizeof(slurm_addr_t));
		if (slurm_unpack_slurm_addr_no_alloc(self, buffer)
		    == SLURM_ERROR) {
			fatal("slurmstepd: problem with unpack of "
			      "slurmd_conf");
		}
		free_buf(buffer);
	}

	/* Receive GRES information from slurmd */
	gres_plugin_recv_stepd(sock);

	/* Grab the slurmd's spooldir. Has %n expanded. */
	cpu_freq_init(conf);

	/* Receive cpu_frequency info from slurmd */
	cpu_freq_recv_info(sock);

	/* get the protocol version of the srun */
	safe_read(sock, &proto, sizeof(uint16_t));

	/* receive req from slurmd: length-prefixed packed launch message */
	safe_read(sock, &len, sizeof(int));
	incoming_buffer = xmalloc(len);
	safe_read(sock, incoming_buffer, len);
	buffer = create_buf(incoming_buffer,len);

	msg = xmalloc(sizeof(slurm_msg_t));
	slurm_msg_t_init(msg);
	/* Always unpack as the current version. */
	msg->protocol_version = SLURM_PROTOCOL_VERSION;

	/* map the step type to the corresponding RPC message type */
	switch (step_type) {
	case LAUNCH_BATCH_JOB:
		msg->msg_type = REQUEST_BATCH_JOB_LAUNCH;
		break;
	case LAUNCH_TASKS:
		msg->msg_type = REQUEST_LAUNCH_TASKS;
		break;
	default:
		fatal("%s: Unrecognized launch RPC (%d)", __func__, step_type);
		break;
	}
	if (unpack_msg(msg, buffer) == SLURM_ERROR)
		fatal("slurmstepd: we didn't unpack the request correctly");
	free_buf(buffer);

	/* pull job/step ids out of the type-specific payload */
	switch (step_type) {
	case LAUNCH_BATCH_JOB:
		jobid = ((batch_job_launch_msg_t *)msg->data)->job_id;
		stepid = ((batch_job_launch_msg_t *)msg->data)->step_id;
		break;
	case LAUNCH_TASKS:
		jobid = ((launch_tasks_request_msg_t *)msg->data)->job_id;
		stepid = ((launch_tasks_request_msg_t *)msg->data)->job_step_id;
		break;
	default:
		fatal("%s: Unrecognized launch RPC (%d)", __func__, step_type);
		break;
	}

	_set_job_log_prefix(jobid, stepid);

	/* per-step hwloc topology cache path (removed in stepd_cleanup()) */
	if (!conf->hwloc_xml)
		conf->hwloc_xml = xstrdup_printf("%s/hwloc_topo_%u.%u.xml",
						 conf->spooldir,
						 jobid, stepid);

	/*
	 * Swap the field to the srun client version, which will eventually
	 * end up stored as protocol_version in srun_info_t. It's a hack to
	 * pass it in-band, while still using the correct version to unpack
	 * the launch request message above.
	 */
	msg->protocol_version = proto;

	*_cli = cli;
	*_self = self;
	*_msg = msg;

	return 1;

rwfail:
	fatal("Error reading initialization data from slurmd");
	exit(1);
}
662 
/*
 * Build the stepd_step_rec_t for the received launch request.
 * Dispatches on msg->msg_type to the batch or task setup path, then adds
 * accounting, GRES environment variables and node-topology env vars.
 * Returns NULL when the type-specific setup fails.
 */
static stepd_step_rec_t *
_step_setup(slurm_addr_t *cli, slurm_addr_t *self, slurm_msg_t *msg)
{
	stepd_step_rec_t *job = NULL;

	switch (msg->msg_type) {
	case REQUEST_BATCH_JOB_LAUNCH:
		debug2("setup for a batch_job");
		job = mgr_launch_batch_job_setup(msg->data, cli);
		break;
	case REQUEST_LAUNCH_TASKS:
		debug2("setup for a launch_task");
		job = mgr_launch_tasks_setup(msg->data, cli, self,
					     msg->protocol_version);
		break;
	default:
		fatal("handle_launch_message: Unrecognized launch RPC");
		break;
	}

	if (!job) {
		error("_step_setup: no job returned");
		return NULL;
	}

	job->jmgr_pid = getpid();
	job->jobacct = jobacctinfo_create(NULL);

	/* Establish GRES environment variables */
	if (conf->debug_flags & DEBUG_FLAG_GRES) {
		gres_plugin_job_state_log(job->job_gres_list, job->jobid);
		gres_plugin_step_state_log(job->step_gres_list, job->jobid,
					   job->stepid);
	}
	if (msg->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
		gres_plugin_job_set_env(&job->env, job->job_gres_list, 0);
	} else if (msg->msg_type == REQUEST_LAUNCH_TASKS) {
		gres_plugin_step_set_env(&job->env, job->step_gres_list, 0,
					 NULL, -1);
	}

	/*
	 * Add slurmd node topology informations to job env array
	 */
	env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR",
			    conf->node_topo_addr);
	env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR_PATTERN",
			    conf->node_topo_pattern);

	set_msg_node_id(job);

	return job;
}
716 
717 #ifdef MEMORY_LEAK_DEBUG
/*
 * MEMORY_LEAK_DEBUG-only teardown of the step record and launch message.
 * Both job and msg may be NULL (e.g. when _step_setup() failed).
 */
static void
_step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc)
{
	if (job) {
		jobacctinfo_destroy(job->jobacct);
		/* NOTE: batch step records are not destroyed here --
		 * presumably freed elsewhere; confirm before changing */
		if (!job->batch)
			stepd_step_rec_destroy(job);
	}

	if (msg) {
		/*
		 * The message cannot be freed until the jobstep is complete
		 * because the job struct has pointers into the msg, such
		 * as the switch jobinfo pointer.
		 */
		switch(msg->msg_type) {
		case REQUEST_BATCH_JOB_LAUNCH:
			slurm_free_job_launch_msg(msg->data);
			break;
		case REQUEST_LAUNCH_TASKS:
			slurm_free_launch_tasks_request_msg(msg->data);
			break;
		default:
			fatal("handle_launch_message: Unrecognized launch RPC");
			break;
		}
		xfree(msg);
	}
	jobacctinfo_destroy(step_complete.jobacct);
}
748 #endif
749 
_dump_user_env(void)750 static void _dump_user_env(void)
751 {
752 	int i;
753 
754 	for (i=0; environ[i]; i++)
755 		printf("%s\n",environ[i]);
756 }
757