1 /*****************************************************************************\
2 * src/slurmd/slurmstepd/slurmstepd.c - Slurm job-step manager.
3 *****************************************************************************
4 * Copyright (C) 2002-2007 The Regents of the University of California.
5 * Copyright (C) 2008-2009 Lawrence Livermore National Security.
6 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7 * Written by Danny Auble <da@llnl.gov>
8 * and Christopher Morrone <morrone2@llnl.gov>.
9 * CODE-OCEC-09-009. All rights reserved.
10 *
11 * This file is part of Slurm, a resource management program.
12 * For details, see <https://slurm.schedmd.com/>.
13 * Please also read the included file: DISCLAIMER.
14 *
15 * Slurm is free software; you can redistribute it and/or modify it under
16 * the terms of the GNU General Public License as published by the Free
17 * Software Foundation; either version 2 of the License, or (at your option)
18 * any later version.
19 *
20 * In addition, as a special exception, the copyright holders give permission
21 * to link the code of portions of this program with the OpenSSL library under
22 * certain conditions as described in each individual source file, and
23 * distribute linked combinations including the two. You must obey the GNU
24 * General Public License in all respects for all of the code used other than
25 * OpenSSL. If you modify file(s) with this exception, you may extend this
26 * exception to your version of the file(s), but you are not obligated to do
27 * so. If you do not wish to do so, delete this exception statement from your
28 * version. If you delete this exception statement from all source files in
29 * the program, then also delete it here.
30 *
31 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
32 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
33 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
34 * details.
35 *
36 * You should have received a copy of the GNU General Public License along
37 * with Slurm; if not, write to the Free Software Foundation, Inc.,
38 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
39 \*****************************************************************************/
40
41 #include "config.h"
42
43 #include <signal.h>
44 #include <stdlib.h>
45 #include <sys/mman.h>
46 #include <unistd.h>
47
48 #include "src/common/assoc_mgr.h"
49 #include "src/common/cpu_frequency.h"
50 #include "src/common/gres.h"
51 #include "src/common/node_select.h"
52 #include "src/common/plugstack.h"
53 #include "src/common/slurm_auth.h"
54 #include "src/common/slurm_jobacct_gather.h"
55 #include "src/common/slurm_acct_gather_profile.h"
56 #include "src/common/slurm_mpi.h"
57 #include "src/common/slurm_protocol_api.h"
58 #include "src/common/slurm_rlimits_info.h"
59 #include "src/common/stepd_api.h"
60 #include "src/common/switch.h"
61 #include "src/common/xcgroup_read_config.h"
62 #include "src/common/xmalloc.h"
63 #include "src/common/xsignal.h"
64 #include "src/common/xstring.h"
65
66 #include "src/slurmd/common/core_spec_plugin.h"
67 #include "src/slurmd/common/slurmstepd_init.h"
68 #include "src/slurmd/common/setproctitle.h"
69 #include "src/common/slurm_acct_gather_energy.h"
70 #include "src/slurmd/common/proctrack.h"
71 #include "src/slurmd/common/xcpuinfo.h"
72 #include "src/slurmd/slurmd/slurmd.h"
73 #include "src/slurmd/slurmstepd/mgr.h"
74 #include "src/slurmd/slurmstepd/req.h"
75 #include "src/slurmd/slurmstepd/slurmstepd.h"
76 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
77
78 static int _init_from_slurmd(int sock, char **argv, slurm_addr_t **_cli,
79 slurm_addr_t **_self, slurm_msg_t **_msg);
80
81 static void _dump_user_env(void);
82 static void _send_ok_to_slurmd(int sock);
83 static void _send_fail_to_slurmd(int sock);
84 static void _got_ack_from_slurmd(int);
85 static stepd_step_rec_t *_step_setup(slurm_addr_t *cli, slurm_addr_t *self,
86 slurm_msg_t *msg);
87 #ifdef MEMORY_LEAK_DEBUG
88 static void _step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc);
89 #endif
90 static int _process_cmdline (int argc, char **argv);
91
92 /*
93 * List of signals to block in this process
94 */
95 int slurmstepd_blocked_signals[] = {
96 SIGINT, SIGTERM, SIGTSTP,
97 SIGQUIT, SIGPIPE, SIGUSR1,
98 SIGUSR2, SIGALRM, SIGHUP, 0
99 };
100
101 /* global variable */
102 slurmd_conf_t * conf;
103 extern char ** environ;
104
/*
 * slurmstepd entry point.  After handling the special "getenv"/"spank"
 * command-line modes, it reads the launch request from the parent
 * slurmd over stdin, builds the step record, and runs the step to
 * completion via job_manager().
 */
int
main (int argc, char **argv)
{
	log_options_t lopts = LOG_OPTS_INITIALIZER;
	slurm_addr_t *cli;
	slurm_addr_t *self;
	slurm_msg_t *msg;
	stepd_step_rec_t *job;
	int rc = 0;
	char *launch_params;

	/* Special modes ("getenv", "spank") exit() from inside here. */
	if (_process_cmdline (argc, argv) < 0)
		fatal ("Error in slurmstepd command line");

	xsignal_block(slurmstepd_blocked_signals);
	conf = xmalloc(sizeof(*conf));
	/* Store the addresses of main()'s own parameters; valid for the
	 * life of the process since main() never returns before use. */
	conf->argv = &argv;
	conf->argc = &argc;
	init_setproctitle(argc, argv);

	log_init(argv[0], lopts, LOG_DAEMON, NULL);

	if (slurm_select_init(1) != SLURM_SUCCESS )
		fatal( "failed to initialize node selection plugin" );
	if (slurm_auth_init(NULL) != SLURM_SUCCESS)
		fatal( "failed to initialize authentication plugin" );

	/* Receive job parameters from the slurmd over the stdin pipe */
	_init_from_slurmd(STDIN_FILENO, argv, &cli, &self, &msg);

	/* Create the stepd_step_rec_t, mostly from info in a
	 * launch_tasks_request_msg_t or a batch_job_launch_msg_t */
	if (!(job = _step_setup(cli, self, msg))) {
		_send_fail_to_slurmd(STDOUT_FILENO);
		rc = SLURM_ERROR;
		/* NOTE: job is NULL here; cleanup below runs with
		 * only_mem = true, which skips the job dereferences. */
		goto ending;
	}

	/* fork handlers cause mutexes on some global data structures
	 * to be re-initialized after the fork. */
	slurm_conf_install_fork_handlers();

	/* sets job->msg_handle and job->msgid */
	if (msg_thr_create(job) == SLURM_ERROR) {
		_send_fail_to_slurmd(STDOUT_FILENO);
		rc = SLURM_ERROR;
		goto ending;
	}

	/* The extern step keeps its slurmd connection open; all other
	 * step types acknowledge and close it now. */
	if (job->stepid != SLURM_EXTERN_CONT)
		close_slurmd_conn();

	/* slurmstepd is the only daemon that should survive upgrade. If it
	 * had been swapped out before upgrade happened it could easily lead
	 * to SIGBUS at any time after upgrade. Avoid that by locking it
	 * in-memory. */
	launch_params = slurm_get_launch_params();
	if (launch_params && strstr(launch_params, "slurmstepd_memlock")) {
#ifdef _POSIX_MEMLOCK
		int flags = MCL_CURRENT;
		if (strstr(launch_params, "slurmstepd_memlock_all"))
			flags |= MCL_FUTURE;
		if (mlockall(flags) < 0)
			info("failed to mlock() slurmstepd pages: %m");
		else
			debug("slurmstepd locked in memory");
#else
		info("mlockall() system call does not appear to be available");
#endif
	}
	xfree(launch_params);

	acct_gather_energy_g_set_data(ENERGY_DATA_STEP_PTR, job);

	/* This does most of the stdio setup, then launches all the tasks,
	 * and blocks until the step is complete */
	rc = job_manager(job);

	return stepd_cleanup(msg, job, cli, self, rc, 0);
ending:
	return stepd_cleanup(msg, job, cli, self, rc, 1);
}
187
/*
 * Tear down the step and return rc as the process exit code.
 *
 * msg/job/cli/self - state built in main().  job may be NULL only when
 *	only_mem is true (step setup failed before a job was created).
 * rc       - step return code, passed through as this function's result
 * only_mem - when true, skip the batch-completion message and message
 *	thread shutdown (used on early-failure paths) and free memory only.
 *
 * Most of the explicit frees are compiled in only for MEMORY_LEAK_DEBUG
 * builds, since the process exits immediately afterwards.
 */
extern int stepd_cleanup(slurm_msg_t *msg, stepd_step_rec_t *job,
			 slurm_addr_t *cli, slurm_addr_t *self,
			 int rc, bool only_mem)
{
	if (!only_mem) {
		if (job->batch)
			batch_finish(job, rc); /* sends batch complete message */

		/* signal the message thread to shutdown, and wait for it */
		eio_signal_shutdown(job->msg_handle);
		pthread_join(job->msgid, NULL);
	}

	mpi_fini();	/* Remove stale PMI2 sockets */

	/* Remove the per-step hwloc topology cache file, if one was made */
	if (conf->hwloc_xml)
		(void)remove(conf->hwloc_xml);

#ifdef MEMORY_LEAK_DEBUG
	acct_gather_conf_destroy();
	(void) core_spec_g_fini();
	_step_cleanup(job, msg, rc);

	fini_setproctitle();

	xcgroup_fini_slurm_cgroup_conf();

	xfree(cli);
	xfree(self);
	xfree(conf->block_map);
	xfree(conf->block_map_inv);
	xfree(conf->hostname);
	xfree(conf->hwloc_xml);
	xfree(conf->job_acct_gather_freq);
	xfree(conf->job_acct_gather_type);
	xfree(conf->logfile);
	xfree(conf->node_name);
	xfree(conf->node_topo_addr);
	xfree(conf->node_topo_pattern);
	xfree(conf->spooldir);
	xfree(conf->task_epilog);
	xfree(conf->task_prolog);
	xfree(conf);
#endif
	info("done with job");
	return rc;
}
235
/*
 * Complete the startup handshake with the parent slurmd and then
 * detach from its pipe.
 */
extern void close_slurmd_conn(void)
{
	_send_ok_to_slurmd(STDOUT_FILENO);
	_got_ack_from_slurmd(STDIN_FILENO);

	/*
	 * "Close" stdin and stdout without releasing their descriptors:
	 * re-point each at STDERR_FILENO (the slurmd already opened
	 * /dev/null there for us), so that neither fd number can later
	 * be handed out to some random file by an unrelated open().
	 */
	dup2(STDERR_FILENO, STDIN_FILENO);
	dup2(STDERR_FILENO, STDOUT_FILENO);
}
251
/*
 * Read the "lite" slurmd configuration packed by the parent slurmd
 * from fd, followed by the packed TRES record list.
 *
 * Re-uses the global 'conf' if it was already allocated; otherwise
 * allocates a fresh slurmd_conf_t.  Also re-initializes logging from
 * the received settings and installs the TRES list into the assoc_mgr.
 *
 * Returns the populated configuration, or NULL if a read fails (the
 * safe_read() macros jump to the rwfail label on error).
 */
static slurmd_conf_t *read_slurmd_conf_lite(int fd)
{
	int rc;
	int len;
	Buf buffer = NULL;
	slurmd_conf_t *confl, *local_conf = NULL;
	int tmp_int = 0;
	List tmp_list = NULL;
	assoc_mgr_lock_t locks = { .tres = WRITE_LOCK };

	/* First check to see if we've already initialized the
	 * global slurmd_conf_t in 'conf'. Allocate memory if not.
	 */
	if (conf) {
		confl = conf;
	} else {
		local_conf = xmalloc(sizeof(slurmd_conf_t));
		confl = local_conf;
	}

	/* Length-prefixed read of the packed configuration */
	safe_read(fd, &len, sizeof(int));

	buffer = init_buf(len);
	safe_read(fd, buffer->head, len);

	rc = unpack_slurmd_conf_lite_no_alloc(confl, buffer);
	if (rc == SLURM_ERROR)
		fatal("slurmstepd: problem with unpack of slurmd_conf");

	/* The TRES records follow the conf in the same buffer */
	slurm_unpack_list(&tmp_list,
			  slurmdb_unpack_tres_rec,
			  slurmdb_destroy_tres_rec,
			  buffer, SLURM_PROTOCOL_VERSION);

	free_buf(buffer);

	confl->log_opts.prefix_level = 1;
	confl->log_opts.stderr_level = confl->debug_level;
	confl->log_opts.logfile_level = confl->debug_level;
	confl->log_opts.syslog_level = confl->debug_level;
	/*
	 * If daemonizing, turn off stderr logging -- also, if
	 * logging to a file, turn off syslog.
	 *
	 * Otherwise, if remaining in foreground, turn off logging
	 * to syslog (but keep logfile level)
	 */
	if (confl->daemonize) {
		confl->log_opts.stderr_level = LOG_LEVEL_QUIET;
		if (confl->logfile)
			confl->log_opts.syslog_level = LOG_LEVEL_QUIET;
	} else
		confl->log_opts.syslog_level = LOG_LEVEL_QUIET;

	/*
	 * LOGGING BEFORE THIS WILL NOT WORK!  Only afterwards will it show
	 * up in the log.
	 */
	log_alter(confl->log_opts, 0, confl->logfile);
	log_set_timefmt(confl->log_fmt);
	debug2("debug level is %d.", confl->debug_level);

	/* Derive the task accounting frequency from the gather-freq spec */
	confl->acct_freq_task = NO_VAL16;
	tmp_int = acct_gather_parse_freq(PROFILE_TASK,
					 confl->job_acct_gather_freq);
	if (tmp_int != -1)
		confl->acct_freq_task = tmp_int;

	xassert(tmp_list);

	assoc_mgr_lock(&locks);
	assoc_mgr_post_tres_list(tmp_list);
	debug2("%s: slurmd sent %u TRES.", __func__, g_tres_count);
	/* assoc_mgr_post_tres_list destroys tmp_list */
	tmp_list = NULL;
	assoc_mgr_unlock(&locks);

	return (confl);

rwfail:
	FREE_NULL_BUFFER(buffer);
	xfree(local_conf);
	return (NULL);
}
336
_get_jobid_uid_gid_from_env(uint32_t * jobid,uid_t * uid,gid_t * gid)337 static int _get_jobid_uid_gid_from_env(uint32_t *jobid, uid_t *uid, gid_t *gid)
338 {
339 const char *val;
340 char *p;
341
342 if (!(val = getenv("SLURM_JOBID")))
343 return error("Unable to get SLURM_JOBID in env!");
344
345 *jobid = (uint32_t) strtoul(val, &p, 10);
346 if (*p != '\0')
347 return error("Invalid SLURM_JOBID=%s", val);
348
349 if (!(val = getenv("SLURM_UID")))
350 return error("Unable to get SLURM_UID in env!");
351
352 *uid = (uid_t) strtoul(val, &p, 10);
353 if (*p != '\0')
354 return error("Invalid SLURM_UID=%s", val);
355
356 if (!(val = getenv("SLURM_JOB_GID")))
357 return error("Unable to get SLURM_JOB_GID in env!");
358
359 *gid = (gid_t) strtoul(val, &p, 10);
360 if (*p != '\0')
361 return error("Invalid SLURM_JOB_GID=%s", val);
362
363 return SLURM_SUCCESS;
364 }
365
/*
 * Run the spank job prolog or epilog on behalf of slurmd.
 * argv[2] selects the mode ("prolog" or "epilog"); job id, uid, and
 * gid come from the environment (see _get_jobid_uid_gid_from_env()).
 * Returns 0 on success, -1 on any failure.
 */
static int _handle_spank_mode (int argc, char **argv)
{
	char *prefix = NULL;
	const char *mode = argv[2];
	uid_t uid = (uid_t) -1;
	gid_t gid = (gid_t) -1;
	uint32_t jobid = (uint32_t) -1;
	log_options_t lopts = LOG_OPTS_INITIALIZER;

	/*
	 * Not necessary to log to syslog
	 */
	lopts.syslog_level = LOG_LEVEL_QUIET;

	/*
	 * Make our log prefix into spank-prolog: or spank-epilog:
	 */
	xstrfmtcat(prefix, "spank-%s", mode);
	log_init(prefix, lopts, LOG_DAEMON, NULL);
	xfree(prefix);

	/*
	 * When we are started from slurmd, a lightweight config is
	 * sent over the stdin fd.  If we are able to read this conf
	 * use it to reinitialize the log.
	 * It is not a fatal error if we fail to read the conf file.
	 * This could happen if slurmstepd is run standalone for
	 * testing.
	 */
	conf = read_slurmd_conf_lite (STDIN_FILENO);
	close (STDIN_FILENO);

	slurm_conf_init(NULL);

	if (_get_jobid_uid_gid_from_env(&jobid, &uid, &gid))
		return error("spank environment invalid");

	debug("Running spank/%s for jobid [%u] uid [%u] gid [%u]",
	      mode, jobid, uid, gid);

	if (xstrcmp (mode, "prolog") == 0) {
		if (spank_job_prolog(jobid, uid, gid) < 0)
			return (-1);
	}
	else if (xstrcmp (mode, "epilog") == 0) {
		if (spank_job_epilog(jobid, uid, gid) < 0)
			return (-1);
	}
	else {
		error ("Invalid mode %s specified!", mode);
		return (-1);
	}
	return (0);
}
420
421 /*
422 * Process special "modes" of slurmstepd passed as cmdline arguments.
423 */
_process_cmdline(int argc,char ** argv)424 static int _process_cmdline (int argc, char **argv)
425 {
426 if ((argc == 2) && (xstrcmp(argv[1], "getenv") == 0)) {
427 print_rlimits();
428 _dump_user_env();
429 exit(0);
430 }
431 if ((argc == 3) && (xstrcmp(argv[1], "spank") == 0)) {
432 if (_handle_spank_mode(argc, argv) < 0)
433 exit (1);
434 exit (0);
435 }
436 return (0);
437 }
438
439
440 static void
_send_ok_to_slurmd(int sock)441 _send_ok_to_slurmd(int sock)
442 {
443 /* If running under valgrind/memcheck, this pipe doesn't work correctly
444 * so just skip it. */
445 #if (SLURMSTEPD_MEMCHECK == 0)
446 int ok = SLURM_SUCCESS;
447 safe_write(sock, &ok, sizeof(int));
448 return;
449 rwfail:
450 error("Unable to send \"ok\" to slurmd");
451 #endif
452 }
453
454 static void
_send_fail_to_slurmd(int sock)455 _send_fail_to_slurmd(int sock)
456 {
457 /* If running under valgrind/memcheck, this pipe doesn't work correctly
458 * so just skip it. */
459 #if (SLURMSTEPD_MEMCHECK == 0)
460 int fail = SLURM_ERROR;
461
462 if (errno)
463 fail = errno;
464
465 safe_write(sock, &fail, sizeof(int));
466 return;
467 rwfail:
468 error("Unable to send \"fail\" to slurmd");
469 #endif
470 }
471
/*
 * Wait for the parent slurmd to acknowledge our "ok" message by
 * reading a single int back from the pipe.  The value itself is
 * discarded; only the successful read matters.
 */
static void
_got_ack_from_slurmd(int sock)
{
	/* If running under valgrind/memcheck, this pipe doesn't work correctly
	 * so just skip it. */
#if (SLURMSTEPD_MEMCHECK == 0)
	int ok;
	safe_read(sock, &ok, sizeof(int));
	return;
rwfail:
	/* Fixed wording: this is a failed read FROM slurmd, the old
	 * message said "to slurmd". */
	error("Unable to receive \"ok ack\" from slurmd");
#endif
}
485
_set_job_log_prefix(uint32_t jobid,uint32_t stepid)486 static void _set_job_log_prefix(uint32_t jobid, uint32_t stepid)
487 {
488 char *buf;
489
490 if (stepid == SLURM_BATCH_SCRIPT)
491 buf = xstrdup_printf("[%u.batch]", jobid);
492 else if (stepid == SLURM_EXTERN_CONT)
493 buf = xstrdup_printf("[%u.extern]", jobid);
494 else
495 buf = xstrdup_printf("[%u.%u]", jobid, stepid);
496
497 setproctitle("%s", buf);
498
499 /* note: will claim ownership of buf, do not free */
500 xstrcat(buf, " ");
501 log_set_fpfx(&buf);
502 }
503
504 /*
505 * This function handles the initialization information from slurmd
506 * sent by _send_slurmstepd_init() in src/slurmd/slurmd/req.c.
507 */
508 static int
_init_from_slurmd(int sock,char ** argv,slurm_addr_t ** _cli,slurm_addr_t ** _self,slurm_msg_t ** _msg)509 _init_from_slurmd(int sock, char **argv,
510 slurm_addr_t **_cli, slurm_addr_t **_self, slurm_msg_t **_msg)
511 {
512 char *incoming_buffer = NULL;
513 Buf buffer;
514 int step_type;
515 int len;
516 uint16_t proto;
517 slurm_addr_t *cli = NULL;
518 slurm_addr_t *self = NULL;
519 slurm_msg_t *msg = NULL;
520 uint16_t port;
521 char buf[16];
522 uint32_t jobid = 0, stepid = 0;
523
524 /* receive conf from slurmd */
525 if (!(conf = read_slurmd_conf_lite(sock)))
526 fatal("Failed to read conf from slurmd");
527
528 /* receive cgroup conf from slurmd */
529 if (xcgroup_read_conf(sock) != SLURM_SUCCESS)
530 fatal("Failed to read cgroup conf from slurmd");
531
532 /* receive acct_gather conf from slurmd */
533 if (acct_gather_read_conf(sock) != SLURM_SUCCESS)
534 fatal("Failed to read acct_gather conf from slurmd");
535
536 /* receive job type from slurmd */
537 safe_read(sock, &step_type, sizeof(int));
538 debug3("step_type = %d", step_type);
539
540 /* receive reverse-tree info from slurmd */
541 slurm_mutex_lock(&step_complete.lock);
542 safe_read(sock, &step_complete.rank, sizeof(int));
543 safe_read(sock, &step_complete.parent_rank, sizeof(int));
544 safe_read(sock, &step_complete.children, sizeof(int));
545 safe_read(sock, &step_complete.depth, sizeof(int));
546 safe_read(sock, &step_complete.max_depth, sizeof(int));
547 safe_read(sock, &step_complete.parent_addr, sizeof(slurm_addr_t));
548 if (step_complete.children)
549 step_complete.bits = bit_alloc(step_complete.children);
550 step_complete.jobacct = jobacctinfo_create(NULL);
551 slurm_mutex_unlock(&step_complete.lock);
552
553 switch_g_slurmd_step_init();
554
555 slurm_get_ip_str(&step_complete.parent_addr, &port, buf, 16);
556 debug3("slurmstepd rank %d, parent address = %s, port = %u",
557 step_complete.rank, buf, port);
558
559 /* receive cli from slurmd */
560 safe_read(sock, &len, sizeof(int));
561 incoming_buffer = xmalloc(len);
562 safe_read(sock, incoming_buffer, len);
563 buffer = create_buf(incoming_buffer,len);
564 cli = xmalloc(sizeof(slurm_addr_t));
565 if (slurm_unpack_slurm_addr_no_alloc(cli, buffer) == SLURM_ERROR)
566 fatal("slurmstepd: problem with unpack of slurmd_conf");
567 free_buf(buffer);
568
569 /* receive self from slurmd */
570 safe_read(sock, &len, sizeof(int));
571 if (len > 0) {
572 /* receive packed self from main slurmd */
573 incoming_buffer = xmalloc(len);
574 safe_read(sock, incoming_buffer, len);
575 buffer = create_buf(incoming_buffer,len);
576 self = xmalloc(sizeof(slurm_addr_t));
577 if (slurm_unpack_slurm_addr_no_alloc(self, buffer)
578 == SLURM_ERROR) {
579 fatal("slurmstepd: problem with unpack of "
580 "slurmd_conf");
581 }
582 free_buf(buffer);
583 }
584
585 /* Receive GRES information from slurmd */
586 gres_plugin_recv_stepd(sock);
587
588 /* Grab the slurmd's spooldir. Has %n expanded. */
589 cpu_freq_init(conf);
590
591 /* Receive cpu_frequency info from slurmd */
592 cpu_freq_recv_info(sock);
593
594 /* get the protocol version of the srun */
595 safe_read(sock, &proto, sizeof(uint16_t));
596
597 /* receive req from slurmd */
598 safe_read(sock, &len, sizeof(int));
599 incoming_buffer = xmalloc(len);
600 safe_read(sock, incoming_buffer, len);
601 buffer = create_buf(incoming_buffer,len);
602
603 msg = xmalloc(sizeof(slurm_msg_t));
604 slurm_msg_t_init(msg);
605 /* Always unpack as the current version. */
606 msg->protocol_version = SLURM_PROTOCOL_VERSION;
607
608 switch (step_type) {
609 case LAUNCH_BATCH_JOB:
610 msg->msg_type = REQUEST_BATCH_JOB_LAUNCH;
611 break;
612 case LAUNCH_TASKS:
613 msg->msg_type = REQUEST_LAUNCH_TASKS;
614 break;
615 default:
616 fatal("%s: Unrecognized launch RPC (%d)", __func__, step_type);
617 break;
618 }
619 if (unpack_msg(msg, buffer) == SLURM_ERROR)
620 fatal("slurmstepd: we didn't unpack the request correctly");
621 free_buf(buffer);
622
623 switch (step_type) {
624 case LAUNCH_BATCH_JOB:
625 jobid = ((batch_job_launch_msg_t *)msg->data)->job_id;
626 stepid = ((batch_job_launch_msg_t *)msg->data)->step_id;
627 break;
628 case LAUNCH_TASKS:
629 jobid = ((launch_tasks_request_msg_t *)msg->data)->job_id;
630 stepid = ((launch_tasks_request_msg_t *)msg->data)->job_step_id;
631 break;
632 default:
633 fatal("%s: Unrecognized launch RPC (%d)", __func__, step_type);
634 break;
635 }
636
637 _set_job_log_prefix(jobid, stepid);
638
639 if (!conf->hwloc_xml)
640 conf->hwloc_xml = xstrdup_printf("%s/hwloc_topo_%u.%u.xml",
641 conf->spooldir,
642 jobid, stepid);
643
644 /*
645 * Swap the field to the srun client version, which will eventually
646 * end up stored as protocol_version in srun_info_t. It's a hack to
647 * pass it in-band, while still using the correct version to unpack
648 * the launch request message above.
649 */
650 msg->protocol_version = proto;
651
652 *_cli = cli;
653 *_self = self;
654 *_msg = msg;
655
656 return 1;
657
658 rwfail:
659 fatal("Error reading initialization data from slurmd");
660 exit(1);
661 }
662
/*
 * Build the stepd_step_rec_t for this step from the launch request.
 * Dispatches on msg->msg_type to the batch or task setup routine,
 * then fills in accounting, GRES environment, and topology env vars.
 * Returns NULL if the setup routine failed.
 */
static stepd_step_rec_t *
_step_setup(slurm_addr_t *cli, slurm_addr_t *self, slurm_msg_t *msg)
{
	stepd_step_rec_t *job = NULL;

	switch (msg->msg_type) {
	case REQUEST_BATCH_JOB_LAUNCH:
		debug2("setup for a batch_job");
		job = mgr_launch_batch_job_setup(msg->data, cli);
		break;
	case REQUEST_LAUNCH_TASKS:
		debug2("setup for a launch_task");
		job = mgr_launch_tasks_setup(msg->data, cli, self,
					     msg->protocol_version);
		break;
	default:
		fatal("handle_launch_message: Unrecognized launch RPC");
		break;
	}

	if (!job) {
		error("_step_setup: no job returned");
		return NULL;
	}

	job->jmgr_pid = getpid();
	job->jobacct = jobacctinfo_create(NULL);

	/* Establish GRES environment variables */
	if (conf->debug_flags & DEBUG_FLAG_GRES) {
		gres_plugin_job_state_log(job->job_gres_list, job->jobid);
		gres_plugin_step_state_log(job->step_gres_list, job->jobid,
					   job->stepid);
	}
	if (msg->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
		gres_plugin_job_set_env(&job->env, job->job_gres_list, 0);
	} else if (msg->msg_type == REQUEST_LAUNCH_TASKS) {
		gres_plugin_step_set_env(&job->env, job->step_gres_list, 0,
					 NULL, -1);
	}

	/*
	 * Add slurmd node topology informations to job env array
	 */
	env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR",
			    conf->node_topo_addr);
	env_array_overwrite(&job->env,"SLURM_TOPOLOGY_ADDR_PATTERN",
			    conf->node_topo_pattern);

	set_msg_node_id(job);

	return job;
}
716
#ifdef MEMORY_LEAK_DEBUG
/*
 * Free the step record and launch message (MEMORY_LEAK_DEBUG builds
 * only).  Tolerates job == NULL and msg == NULL, since setup may have
 * failed before either was created.
 */
static void
_step_cleanup(stepd_step_rec_t *job, slurm_msg_t *msg, int rc)
{
	if (job) {
		jobacctinfo_destroy(job->jobacct);
		/* NOTE(review): batch step records are intentionally not
		 * destroyed here -- presumably freed elsewhere; confirm. */
		if (!job->batch)
			stepd_step_rec_destroy(job);
	}

	if (msg) {
		/*
		 * The message cannot be freed until the jobstep is complete
		 * because the job struct has pointers into the msg, such
		 * as the switch jobinfo pointer.
		 */
		switch(msg->msg_type) {
		case REQUEST_BATCH_JOB_LAUNCH:
			slurm_free_job_launch_msg(msg->data);
			break;
		case REQUEST_LAUNCH_TASKS:
			slurm_free_launch_tasks_request_msg(msg->data);
			break;
		default:
			fatal("handle_launch_message: Unrecognized launch RPC");
			break;
		}
		xfree(msg);
	}
	jobacctinfo_destroy(step_complete.jobacct);
}
#endif
749
_dump_user_env(void)750 static void _dump_user_env(void)
751 {
752 int i;
753
754 for (i=0; environ[i]; i++)
755 printf("%s\n",environ[i]);
756 }
757