1 /*****************************************************************************\
2  *  slurmd/slurmstepd/task.c - task launching functions for slurmstepd
3  *****************************************************************************
4  *  Copyright (C) 2002-2007 The Regents of the University of California.
5  *  Copyright (C) 2008-2009 Lawrence Livermore National Security.
6  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7  *  Written by Mark A. Grondona <mgrondona@llnl.gov>.
8  *  CODE-OCEC-09-009. All rights reserved.
9  *
10  *  This file is part of Slurm, a resource management program.
11  *  For details, see <https://slurm.schedmd.com/>.
12  *  Please also read the included file: DISCLAIMER.
13  *
14  *  Slurm is free software; you can redistribute it and/or modify it under
15  *  the terms of the GNU General Public License as published by the Free
16  *  Software Foundation; either version 2 of the License, or (at your option)
17  *  any later version.
18  *
19  *  In addition, as a special exception, the copyright holders give permission
20  *  to link the code of portions of this program with the OpenSSL library under
21  *  certain conditions as described in each individual source file, and
22  *  distribute linked combinations including the two. You must obey the GNU
23  *  General Public License in all respects for all of the code used other than
24  *  OpenSSL. If you modify file(s) with this exception, you may extend this
25  *  exception to your version of the file(s), but you are not obligated to do
26  *  so. If you do not wish to do so, delete this exception statement from your
27  *  version.  If you delete this exception statement from all source files in
28  *  the program, then also delete it here.
29  *
30  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
31  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
32  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
33  *  details.
34  *
35  *  You should have received a copy of the GNU General Public License along
36  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
37  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
38 \*****************************************************************************/
39 
40 #include "config.h"
41 
42 #define _GNU_SOURCE
43 
44 #include <assert.h>
45 #include <ctype.h>
46 #include <fcntl.h>
47 #include <grp.h>
48 #include <pwd.h>
49 #include <stdlib.h>
50 #include <string.h>
51 #include <sys/param.h>
52 #include <sys/resource.h>
53 #include <sys/stat.h>
54 #include <sys/types.h>
55 #include <sys/wait.h>
56 #include <unistd.h>
57 
58 /* FIXME: Come up with a real solution for EUID instead of substituting RUID */
59 #if defined(__NetBSD__)
60 #define eaccess(p,m) (access((p),(m)))
61 #define HAVE_EACCESS 1
62 #endif
63 
64 #include "slurm/slurm_errno.h"
65 
66 #include "src/common/env.h"
67 #include "src/common/gres.h"
68 #include "src/common/fd.h"
69 #include "src/common/log.h"
70 #include "src/common/plugstack.h"
71 #include "src/common/slurm_mpi.h"
72 #include "src/common/strlcpy.h"
73 #include "src/common/switch.h"
74 #include "src/common/xsignal.h"
75 #include "src/common/xstring.h"
76 #include "src/common/xmalloc.h"
77 #include "src/slurmd/common/proctrack.h"
78 #include "src/slurmd/common/task_plugin.h"
79 #include "src/slurmd/slurmd/slurmd.h"
80 #include "src/slurmd/slurmstepd/pdebug.h"
81 #include "src/slurmd/slurmstepd/task.h"
82 #include "src/slurmd/slurmstepd/ulimits.h"
83 
/*
 * Static prototype definitions.
 * Forward declarations of file-local helpers that are used before their
 * definitions appear further down in this file.
 */
/* Create/validate the directory named by TMPDIR in job->env (else use /tmp) */
static void  _make_tmpdir(stepd_step_rec_t *job);
/* Run a task prolog script and apply its stdout commands to the job */
static int   _run_script_and_set_env(const char *name, const char *path,
				     stepd_step_rec_t *job);
/* Interpret "export"/"unset"/"print" lines produced by a task prolog */
static void  _proc_stdout(char *buf, stepd_step_rec_t *job);
/* Format an array of uint32_t as a comma separated, xmalloc'ed string */
static char *_uint32_array_to_str(int array_len, const uint32_t *array);
92 
93 /*
94  * Process TaskProlog output
95  * "export NAME=value"	adds environment variables
96  * "unset  NAME"	clears an environment variable
97  * "print  <whatever>"	writes that to the job's stdout
98  */
_proc_stdout(char * buf,stepd_step_rec_t * job)99 static void _proc_stdout(char *buf, stepd_step_rec_t *job)
100 {
101 	bool end_buf = false;
102 	int len;
103 	char *buf_ptr, *name_ptr, *val_ptr;
104 	char *end_line, *equal_ptr;
105 	char ***env = &job->env;
106 
107 	buf_ptr = buf;
108 	while (buf_ptr[0]) {
109 		end_line = strchr(buf_ptr, '\n');
110 		if (!end_line) {
111 			end_line = buf_ptr + strlen(buf_ptr);
112 			end_buf = true;
113 		}
114 		if (!xstrncmp(buf_ptr, "print ", 6)) {
115 			buf_ptr += 6;
116 			while (isspace(buf_ptr[0]))
117 				buf_ptr++;
118 			len = end_line - buf_ptr + 1;
119 			safe_write(1, buf_ptr, len);
120 		} else if (!xstrncmp(buf_ptr, "export ",7)) {
121 			name_ptr = buf_ptr + 7;
122 			while (isspace(name_ptr[0]))
123 				name_ptr++;
124 			equal_ptr = strchr(name_ptr, '=');
125 			if (!equal_ptr || (equal_ptr > end_line))
126 				goto rwfail;
127 			val_ptr = equal_ptr + 1;
128 			while (isspace(equal_ptr[-1]))
129 				equal_ptr--;
130 			equal_ptr[0] = '\0';
131 			end_line[0] = '\0';
132 			if (!xstrcmp(name_ptr, "SLURM_PROLOG_CPU_MASK")) {
133 				job->cpu_bind_type = CPU_BIND_MASK;
134 				xfree(job->cpu_bind);
135 				job->cpu_bind = xstrdup(val_ptr);
136 				if (task_g_pre_launch(job)) {
137 					error("Failed SLURM_PROLOG_CPU_MASK "
138 					      "setup");
139 					exit(1);
140 				}
141 			}
142 			debug("export name:%s:val:%s:", name_ptr, val_ptr);
143 			if (setenvf(env, name_ptr, "%s", val_ptr)) {
144 				error("Unable to set %s environment variable",
145 				      buf_ptr);
146 			}
147 			equal_ptr[0] = '=';
148 			if (end_buf)
149 				end_line[0] = '\0';
150 			else
151 				end_line[0] = '\n';
152 		} else if (!xstrncmp(buf_ptr, "unset ", 6)) {
153 			name_ptr = buf_ptr + 6;
154 			while (isspace(name_ptr[0]))
155 				name_ptr++;
156 			if ((name_ptr[0] == '\n') || (name_ptr[0] == '\0'))
157 				goto rwfail;
158 			while (isspace(end_line[-1]))
159 				end_line--;
160 			end_line[0] = '\0';
161 			debug(" unset name:%s:", name_ptr);
162 			unsetenvp(*env, name_ptr);
163 			if (end_buf)
164 				end_line[0] = '\0';
165 			else
166 				end_line[0] = '\n';
167 		}
168 
169 rwfail:		 /* process rest of script output */
170 		if (end_buf)
171 			break;
172 		buf_ptr = end_line + 1;
173 	}
174 	return;
175 }
176 
177 /*
178  * Run a task prolog script.  Also read the stdout of the script and set
179  * 	environment variables in the task's environment as specified
180  *	in the script's standard output.
181  * name IN: class of program ("system prolog", "user prolog", etc.)
182  * path IN: pathname of program to run
183  * job IN/OUT: pointer to associated job, can update job->env
184  *	if prolog
185  * RET 0 on success, -1 on failure.
186  */
187 static int
_run_script_and_set_env(const char * name,const char * path,stepd_step_rec_t * job)188 _run_script_and_set_env(const char *name, const char *path,
189 			stepd_step_rec_t *job)
190 {
191 	int status, rc;
192 	pid_t cpid;
193 	int pfd[2];
194 	char buf[4096];
195 	FILE *f;
196 
197 	xassert(job->env);
198 	if (path == NULL || path[0] == '\0')
199 		return 0;
200 
201 	debug("[job %u] attempting to run %s [%s]", job->jobid, name, path);
202 
203 	if (access(path, R_OK | X_OK) < 0) {
204 		error("Could not run %s [%s]: %m", name, path);
205 		return -1;
206 	}
207 	if (pipe(pfd) < 0) {
208 		error("executing %s: pipe: %m", name);
209 		return -1;
210 	}
211 	if ((cpid = fork()) < 0) {
212 		error("executing %s: fork: %m", name);
213 		return -1;
214 	}
215 	if (cpid == 0) {
216 		char *argv[2];
217 
218 		setenvf(&job->env, "SLURM_SCRIPT_CONTEXT", "prolog_task");
219 
220 		argv[0] = xstrdup(path);
221 		argv[1] = NULL;
222 		if (dup2(pfd[1], 1) == -1)
223 			error("couldn't do the dup: %m");
224 		close(2);
225 		close(0);
226 		close(pfd[0]);
227 		close(pfd[1]);
228 		setpgid(0, 0);
229 		execve(path, argv, job->env);
230 		error("execve(%s): %m", path);
231 		_exit(127);
232 	}
233 
234 	close(pfd[1]);
235 	f = fdopen(pfd[0], "r");
236 	if (f == NULL) {
237 		error("Cannot open pipe device: %m");
238 		log_fini();
239 		exit(1);
240 	}
241 	while (feof(f) == 0) {
242 		if (fgets(buf, sizeof(buf) - 1, f) != NULL) {
243 			_proc_stdout(buf, job);
244 		}
245 	}
246 	fclose(f);
247 
248 	while (1) {
249 		rc = waitpid(cpid, &status, 0);
250 		if (rc < 0) {
251 			if (errno == EINTR)
252 				continue;
253 			error("waidpid: %m");
254 			return 0;
255 		} else  {
256 			killpg(cpid, SIGKILL);  /* kill children too */
257 			return status;
258 		}
259 	}
260 
261 	/* NOTREACHED */
262 }
263 
/*
 * Given a program name, translate it to a fully qualified pathname as needed
 * based upon the PATH environment variable and current working directory.
 *
 * fname IN: program name as given by the user
 * prog_env IN: environment array searched for PATH
 * RET NULL if fname is NULL, otherwise an xmalloc()'d string of PATH_MAX
 *	bytes that must be xfree()'d:
 *	- fname itself when it starts with '/',
 *	- "<cwd>/<fname>" when it starts with '.' (fname unchanged if the
 *	  current directory cannot be determined),
 *	- the first "<dir>/<fname>" from PATH that exists and is not a
 *	  directory, or fname unchanged when no such entry is found.
 */
static char *_build_path(char *fname, char **prog_env)
{
	char *path_env = NULL, *dir = NULL;
	char *file_name, *last = NULL;
	struct stat stat_buf;
	int len = PATH_MAX;

	if (!fname)
		return NULL;

	file_name = (char *) xmalloc(len);

	/* check if already absolute path */
	if (fname[0] == '/') {
		/* copy and ensure null termination */
		strlcpy(file_name, fname, len);
		return file_name;
	}

	/* relative to the current working directory */
	if (fname[0] == '.') {
		dir = xmalloc(len);
		if (!getcwd(dir, len)) {
			error("getcwd failed: %m");
			/*
			 * Without a cwd, keep the name as given rather than
			 * building a bogus path rooted at "/".
			 */
			strlcpy(file_name, fname, len);
		} else {
			snprintf(file_name, len, "%s/%s", dir, fname);
		}
		xfree(dir);
		return file_name;
	}

	/* search for the file using PATH environment variable */
	path_env = xstrdup(getenvp(prog_env, "PATH"));
	if (path_env)
		dir = strtok_r(path_env, ":", &last);
	while (dir) {
		snprintf(file_name, len, "%s/%s", dir, fname);
		/* first existing non-directory entry wins */
		if ((stat(file_name, &stat_buf) == 0)
		    && (! S_ISDIR(stat_buf.st_mode)))
			break;
		dir = strtok_r(NULL, ":", &last);
	}
	if (dir == NULL)	/* not found */
		strlcpy(file_name, fname, len);

	xfree(path_env);
	return file_name;
}
312 
313 static int
_setup_mpi(stepd_step_rec_t * job,int ltaskid)314 _setup_mpi(stepd_step_rec_t *job, int ltaskid)
315 {
316 	mpi_plugin_task_info_t info[1];
317 
318 	if (job->het_job_id && (job->het_job_id != NO_VAL)) {
319 		info->jobid   = job->het_job_id;
320 		info->stepid  = job->stepid;
321 		info->nnodes  = job->het_job_nnodes;
322 		info->nodeid  = job->het_job_node_offset + job->nodeid;
323 		info->ntasks  = job->het_job_ntasks;
324 		info->ltasks  = job->node_tasks;
325 		info->gtaskid = job->het_job_task_offset +
326 				job->task[ltaskid]->gtid;
327 		info->ltaskid = job->task[ltaskid]->id;
328 		info->self    = job->envtp->self;
329 		info->client  = job->envtp->cli;
330 	} else {
331 		info->jobid   = job->jobid;
332 		info->stepid  = job->stepid;
333 		info->nnodes  = job->nnodes;
334 		info->nodeid  = job->nodeid;
335 		info->ntasks  = job->ntasks;
336 		info->ltasks  = job->node_tasks;
337 		info->gtaskid = job->task[ltaskid]->gtid;
338 		info->ltaskid = job->task[ltaskid]->id;
339 		info->self    = job->envtp->self;
340 		info->client  = job->envtp->cli;
341 	}
342 
343 	return mpi_hook_slurmstepd_task(info, &job->env);
344 }
345 
346 /*
347  *  Current process is running as the user when this is called.
348  */
exec_task(stepd_step_rec_t * job,int local_proc_id)349 extern void exec_task(stepd_step_rec_t *job, int local_proc_id)
350 {
351 	uint32_t *gtids;		/* pointer to array of ranks */
352 	int fd, j;
353 	stepd_step_task_info_t *task = job->task[local_proc_id];
354 	char **tmp_env;
355 	int saved_errno;
356 	uint32_t node_offset = 0, task_offset = 0;
357 
358 	if (job->het_job_node_offset != NO_VAL)
359 		node_offset = job->het_job_node_offset;
360 	if (job->het_job_task_offset != NO_VAL)
361 		task_offset = job->het_job_task_offset;
362 
363 	gtids = xmalloc(job->node_tasks * sizeof(uint32_t));
364 	for (j = 0; j < job->node_tasks; j++)
365 		gtids[j] = job->task[j]->gtid + task_offset;
366 	job->envtp->sgtids = _uint32_array_to_str(job->node_tasks, gtids);
367 	xfree(gtids);
368 
369 	if (job->het_job_id != NO_VAL)
370 		job->envtp->jobid = job->het_job_id;
371 	else
372 		job->envtp->jobid = job->jobid;
373 	job->envtp->stepid = job->stepid;
374 	job->envtp->nodeid = job->nodeid + node_offset;
375 	job->envtp->cpus_on_node = job->cpus;
376 	job->envtp->procid = task->gtid + task_offset;
377 	job->envtp->localid = task->id;
378 	job->envtp->task_pid = getpid();
379 	job->envtp->distribution = job->task_dist;
380 	job->envtp->cpu_bind = xstrdup(job->cpu_bind);
381 	job->envtp->cpu_bind_type = job->cpu_bind_type;
382 	job->envtp->cpu_freq_min = job->cpu_freq_min;
383 	job->envtp->cpu_freq_max = job->cpu_freq_max;
384 	job->envtp->cpu_freq_gov = job->cpu_freq_gov;
385 	job->envtp->mem_bind = xstrdup(job->mem_bind);
386 	job->envtp->mem_bind_type = job->mem_bind_type;
387 	job->envtp->distribution = -1;
388 	job->envtp->batch_flag = job->batch;
389 	job->envtp->uid = job->uid;
390 	job->envtp->user_name = xstrdup(job->user_name);
391 
392 	/*
393 	 * Modify copy of job's environment. Do not alter in place or
394 	 * concurrent searches of the environment can generate invalid memory
395 	 * references.
396 	 */
397 	job->envtp->env = env_array_copy((const char **) job->env);
398 	setup_env(job->envtp, false);
399 	setenvf(&job->envtp->env, "SLURM_JOB_GID", "%d", job->gid);
400 	setenvf(&job->envtp->env, "SLURMD_NODENAME", "%s", conf->node_name);
401 	if (job->tres_bind) {
402 		setenvf(&job->envtp->env, "SLURMD_TRES_BIND", "%s",
403 			job->tres_bind);
404 	}
405 	if (job->tres_freq) {
406 		setenvf(&job->envtp->env, "SLURMD_TRES_FREQ", "%s",
407 			job->tres_freq);
408 	}
409 	tmp_env = job->env;
410 	job->env = job->envtp->env;
411 	env_array_free(tmp_env);
412 	job->envtp->env = NULL;
413 
414 	xfree(job->envtp->task_count);
415 
416 	if (!job->batch && (job->stepid != SLURM_EXTERN_CONT)) {
417 		if (switch_g_job_attach(job->switch_job, &job->env,
418 					job->nodeid, (uint32_t) local_proc_id,
419 					job->nnodes, job->ntasks,
420 					task->gtid) < 0) {
421 			error("Unable to attach to interconnect: %m");
422 			log_fini();
423 			_exit(1);
424 		}
425 
426 		if (_setup_mpi(job, local_proc_id) != SLURM_SUCCESS) {
427 			error("Unable to configure MPI plugin: %m");
428 			log_fini();
429 			_exit(1);
430 		}
431 	}
432 
433 	/* task-specific pre-launch activities */
434 
435 	/* task plugin hook */
436 	if (task_g_pre_launch(job)) {
437 		error("Failed to invoke task plugins: task_p_pre_launch error");
438 		_exit(1);
439 	}
440 	if (!job->batch && (job->accel_bind_type || job->tres_bind)) {
441 		/*
442 		 * Modify copy of job's environment as needed for GRES. Do not
443 		 * alter in place or concurrent searches of the environment can
444 		 * generate invalid memory references.
445 		 */
446 		job->envtp->env = env_array_copy((const char **) job->env);
447 		gres_plugin_step_set_env(&job->envtp->env, job->step_gres_list,
448 					 job->accel_bind_type, job->tres_bind,
449 					 local_proc_id);
450 		tmp_env = job->env;
451 		job->env = job->envtp->env;
452 		env_array_free(tmp_env);
453 	}
454 
455 	if (spank_user_task(job, local_proc_id) < 0) {
456 		error("Failed to invoke spank plugin stack");
457 		_exit(1);
458 	}
459 
460 	if (conf->task_prolog) {
461 		char *my_prolog;
462 		slurm_mutex_lock(&conf->config_mutex);
463 		my_prolog = xstrdup(conf->task_prolog);
464 		slurm_mutex_unlock(&conf->config_mutex);
465 		_run_script_and_set_env("slurm task_prolog",
466 					my_prolog, job);
467 		xfree(my_prolog);
468 	}
469 	if (job->task_prolog) {
470 		_run_script_and_set_env("user task_prolog",
471 					job->task_prolog, job);
472 	}
473 
474 	/*
475 	 * Set TMPDIR after running prolog scripts, since TMPDIR
476 	 * might be set or changed in one of the prolog scripts.
477 	 */
478 	if (local_proc_id == 0)
479 		_make_tmpdir(job);
480 
481 	if (!job->batch)
482 		pdebug_stop_current(job);
483 	if (job->env == NULL) {
484 		debug("job->env is NULL");
485 		job->env = (char **)xmalloc(sizeof(char *));
486 		job->env[0] = (char *)NULL;
487 	}
488 
489 	if (task->argv[0] == NULL) {
490 		error("No executable program specified for this task");
491 		_exit(2);
492 	}
493 
494 	if (*task->argv[0] != '/') {
495 		/*
496 		 * Handle PATH resolution for the command to launch.
497 		 * Need to handle this late so that SPANK and other plugins
498 		 * have a chance to manipulate the PATH and/or change the
499 		 * filesystem namespaces into the final arrangement, which
500 		 * may affect which executable we select.
501 		 */
502 		task->argv[0] = _build_path(task->argv[0], job->env);
503 	}
504 
505 
506 	/* Do this last so you don't worry too much about the users
507 	   limits including the slurmstepd in with it.
508 	*/
509 	if (set_user_limits(job) < 0) {
510 		debug("Unable to set user limits");
511 		log_fini();
512 		_exit(5);
513 	}
514 
515 	execve(task->argv[0], task->argv, job->env);
516 	saved_errno = errno;
517 
518 	/*
519 	 * print error message and clean up if execve() returns:
520 	 */
521 	if ((errno == ENOENT) &&
522 	    ((fd = open(task->argv[0], O_RDONLY)) >= 0)) {
523 		char buf[256], *eol;
524 		int sz;
525 		sz = read(fd, buf, sizeof(buf));
526 		if ((sz >= 3) && (xstrncmp(buf, "#!", 2) == 0)) {
527 			buf[sizeof(buf)-1] = '\0';
528 			eol = strchr(buf, '\n');
529 			if (eol)
530 				eol[0] = '\0';
531 			slurm_seterrno(saved_errno);
532 			error("execve(): bad interpreter(%s): %m", buf+2);
533 			_exit(errno);
534 		}
535 	}
536 	slurm_seterrno(saved_errno);
537 	error("execve(): %s: %m", task->argv[0]);
538 	_exit(errno);
539 }
540 
541 static void
_make_tmpdir(stepd_step_rec_t * job)542 _make_tmpdir(stepd_step_rec_t *job)
543 {
544 	char *tmpdir;
545 
546 	if (!(tmpdir = getenvp(job->env, "TMPDIR")))
547 		setenvf(&job->env, "TMPDIR", "/tmp"); /* task may want it set */
548 	else if (mkdir(tmpdir, 0700) < 0) {
549 		struct stat st;
550 		int mkdir_errno = errno;
551 
552 		if (stat(tmpdir, &st)) { /* does the file exist ? */
553 			/* show why we were not able to create it */
554 			error("Unable to create TMPDIR [%s]: %s",
555 			      tmpdir, strerror(mkdir_errno));
556 		} else if (!S_ISDIR(st.st_mode)) {  /* is it a directory? */
557 			error("TMPDIR [%s] is not a directory", tmpdir);
558 		}
559 
560 		/* Eaccess wasn't introduced until glibc 2.4 but euidaccess
561 		 * has been around for a while.  So to make sure we
562 		 * still work with older systems we include this check.
563 		 */
564 
565 #if defined(HAVE_FACCESSAT)
566 		else if (faccessat(AT_FDCWD, tmpdir, X_OK|W_OK, AT_EACCESS))
567 #elif defined(HAVE_EACCESS)
568 		else if (eaccess(tmpdir, X_OK|W_OK)) /* check permissions */
569 #else
570 		else if (euidaccess(tmpdir, X_OK|W_OK))
571 #endif
572 			error("TMPDIR [%s] is not writeable", tmpdir);
573 		else
574 			return;
575 
576 		error("Setting TMPDIR to /tmp");
577 		setenvf(&job->env, "TMPDIR", "/tmp");
578 	}
579 
580 	return;
581 }
582 
/*
 * Build a comma separated list from an array of uint32_t values, each
 * printed in decimal (e.g. {1, 2, 3} gives "1,2,3").
 *
 * array_len IN: number of elements in array
 * array IN: values to print, may be NULL
 * RET xmalloc'ed string (empty when array is NULL or has no elements).
 *	Free with xfree().
 */
static char *_uint32_array_to_str(int array_len, const uint32_t *array)
{
	char *out = xstrdup("");
	int idx;

	if (!array)
		return out;

	/* every element except the first is preceded by a separator */
	for (idx = 0; idx < array_len; idx++)
		xstrfmtcat(out, "%s%u", (idx > 0) ? "," : "", array[idx]);

	return out;
}
608