1 /*****************************************************************************\
2  *  src/slurmd/slurmd/req.c - slurmd request handling
3  *****************************************************************************
4  *  Copyright (C) 2002-2007 The Regents of the University of California.
5  *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
6  *  Portions Copyright (C) 2010-2016 SchedMD LLC.
7  *  Portions copyright (C) 2015 Mellanox Technologies Inc.
8  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
9  *  Written by Mark Grondona <mgrondona@llnl.gov>.
10  *  CODE-OCEC-09-009. All rights reserved.
11  *
12  *  This file is part of Slurm, a resource management program.
13  *  For details, see <https://slurm.schedmd.com/>.
14  *  Please also read the included file: DISCLAIMER.
15  *
16  *  Slurm is free software; you can redistribute it and/or modify it under
17  *  the terms of the GNU General Public License as published by the Free
18  *  Software Foundation; either version 2 of the License, or (at your option)
19  *  any later version.
20  *
21  *  In addition, as a special exception, the copyright holders give permission
22  *  to link the code of portions of this program with the OpenSSL library under
23  *  certain conditions as described in each individual source file, and
24  *  distribute linked combinations including the two. You must obey the GNU
25  *  General Public License in all respects for all of the code used other than
26  *  OpenSSL. If you modify file(s) with this exception, you may extend this
27  *  exception to your version of the file(s), but you are not obligated to do
28  *  so. If you do not wish to do so, delete this exception statement from your
29  *  version.  If you delete this exception statement from all source files in
30  *  the program, then also delete it here.
31  *
32  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
33  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
34  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
35  *  details.
36  *
37  *  You should have received a copy of the GNU General Public License along
38  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
39  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
40 \*****************************************************************************/
41 
42 #include "config.h"
43 
44 #include <ctype.h>
45 #include <fcntl.h>
46 #include <grp.h>
47 #ifdef HAVE_NUMA
48 #undef NUMA_VERSION1_COMPATIBILITY
49 #include <numa.h>
50 #endif
51 #include <poll.h>
52 #include <pthread.h>
53 #include <sched.h>
54 #include <signal.h>
55 #include <stdlib.h>
56 #include <string.h>
57 #include <sys/param.h>
58 #include <sys/stat.h>
59 #include <sys/types.h>
60 #include <sys/un.h>
61 #include <sys/wait.h>
62 #include <time.h>
63 #include <unistd.h>
64 #include <utime.h>
65 
66 #include "src/common/assoc_mgr.h"
67 #include "src/common/callerid.h"
68 #include "src/common/cpu_frequency.h"
69 #include "src/common/env.h"
70 #include "src/common/fd.h"
71 #include "src/common/fetch_config.h"
72 #include "src/common/forward.h"
73 #include "src/common/gres.h"
74 #include "src/common/group_cache.h"
75 #include "src/common/hostlist.h"
76 #include "src/common/list.h"
77 #include "src/common/log.h"
78 #include "src/common/macros.h"
79 #include "src/common/msg_aggr.h"
80 #include "src/common/node_features.h"
81 #include "src/common/node_select.h"
82 #include "src/common/plugstack.h"
83 #include "src/common/prep.h"
84 #include "src/common/read_config.h"
85 #include "src/common/slurm_auth.h"
86 #include "src/common/slurm_cred.h"
87 #include "src/common/slurm_acct_gather_energy.h"
88 #include "src/common/slurm_jobacct_gather.h"
89 #include "src/common/slurm_protocol_defs.h"
90 #include "src/common/slurm_protocol_api.h"
91 #include "src/common/slurm_protocol_interface.h"
92 #include "src/common/stepd_api.h"
93 #include "src/common/uid.h"
94 #include "src/common/util-net.h"
95 #include "src/common/xcgroup_read_config.h"
96 #include "src/common/xstring.h"
97 #include "src/common/xmalloc.h"
98 
99 #include "src/bcast/file_bcast.h"
100 
101 #include "src/slurmd/slurmd/get_mach_stat.h"
102 #include "src/slurmd/slurmd/slurmd.h"
103 
104 #include "src/slurmd/common/fname.h"
105 #include "src/slurmd/common/job_container_plugin.h"
106 #include "src/slurmd/common/proctrack.h"
107 #include "src/slurmd/common/run_script.h"
108 #include "src/slurmd/common/reverse_tree_math.h"
109 #include "src/slurmd/common/slurmstepd_init.h"
110 #include "src/slurmd/common/task_plugin.h"
111 
112 #define _LIMIT_INFO 0
113 
114 #define RETRY_DELAY 15		/* retry every 15 seconds */
115 #define MAX_RETRY   240		/* retry 240 times (one hour max) */
116 
117 #ifndef MAXHOSTNAMELEN
118 #define MAXHOSTNAMELEN	64
119 #endif
120 
121 #define MAX_CPU_CNT 1024
122 #define MAX_NUMA_CNT 128
123 
124 typedef struct {
125 	uint32_t job_id;
126 	uint32_t step_id;
127 	uint64_t job_mem;
128 	uint64_t step_mem;
129 } job_mem_limits_t;
130 
131 typedef struct {
132 	uint32_t job_id;
133 	uint32_t step_id;
134 } starting_step_t;
135 
136 typedef struct {
137 	uint32_t job_id;
138 	uint16_t msg_timeout;
139 	bool *prolog_fini;
140 	pthread_cond_t *timer_cond;
141 	pthread_mutex_t *timer_mutex;
142 } timer_struct_t;
143 
144 static int  _abort_step(uint32_t job_id, uint32_t step_id);
145 static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
146 static void _free_job_env(job_env_t *env_ptr);
147 static bool _is_batch_job_finished(uint32_t job_id);
148 static int  _job_limits_match(void *x, void *key);
149 static bool _job_still_running(uint32_t job_id);
150 static int  _kill_all_active_steps(uint32_t jobid, int sig,
151 				   int flags, bool batch, uid_t req_uid);
152 static void _launch_complete_add(uint32_t job_id);
153 static void _launch_complete_log(char *type, uint32_t job_id);
154 static void _launch_complete_rm(uint32_t job_id);
155 static void _launch_complete_wait(uint32_t job_id);
156 static int  _launch_job_fail(uint32_t job_id, uint32_t slurm_rc);
157 static bool _launch_job_test(uint32_t job_id);
158 static void _note_batch_job_finished(uint32_t job_id);
159 static int  _prolog_is_running (uint32_t jobid);
160 static int  _step_limits_match(void *x, void *key);
161 static int  _terminate_all_steps(uint32_t jobid, bool batch);
162 static void _rpc_launch_tasks(slurm_msg_t *);
163 static void _rpc_abort_job(slurm_msg_t *);
164 static void _rpc_batch_job(slurm_msg_t *msg, bool new_msg);
165 static void _rpc_prolog(slurm_msg_t *msg);
166 static void _rpc_job_notify(slurm_msg_t *);
167 static void _rpc_signal_tasks(slurm_msg_t *);
168 static void _rpc_complete_batch(slurm_msg_t *);
169 static void _rpc_terminate_tasks(slurm_msg_t *);
170 static void _rpc_timelimit(slurm_msg_t *);
171 static void _rpc_reattach_tasks(slurm_msg_t *);
172 static void _rpc_suspend_job(slurm_msg_t *msg);
173 static void _rpc_terminate_job(slurm_msg_t *);
174 static void _rpc_update_time(slurm_msg_t *);
175 static void _rpc_shutdown(slurm_msg_t *msg);
176 static void _rpc_reconfig(slurm_msg_t *msg);
177 static void _rpc_reconfig_with_config(slurm_msg_t *msg);
178 static void _rpc_reboot(slurm_msg_t *msg);
179 static void _rpc_pid2jid(slurm_msg_t *msg);
180 static int  _rpc_file_bcast(slurm_msg_t *msg);
181 static void _file_bcast_cleanup(void);
182 static int  _file_bcast_register_file(slurm_msg_t *msg,
183 				      sbcast_cred_arg_t *cred_arg,
184 				      file_bcast_info_t *key);
185 static int  _rpc_ping(slurm_msg_t *);
186 static int  _rpc_health_check(slurm_msg_t *);
187 static int  _rpc_acct_gather_update(slurm_msg_t *);
188 static int  _rpc_acct_gather_energy(slurm_msg_t *);
189 static int  _rpc_step_complete(slurm_msg_t *msg);
190 static int  _rpc_step_complete_aggr(slurm_msg_t *msg);
191 static int  _rpc_stat_jobacct(slurm_msg_t *msg);
192 static int  _rpc_list_pids(slurm_msg_t *msg);
193 static int  _rpc_daemon_status(slurm_msg_t *msg);
194 static int  _run_epilog(job_env_t *job_env);
195 static int  _run_prolog(job_env_t *job_env, slurm_cred_t *cred,
196 			bool remove_running);
197 static void _rpc_forward_data(slurm_msg_t *msg);
198 static int  _rpc_network_callerid(slurm_msg_t *msg);
199 
200 static bool _pause_for_job_completion(uint32_t jobid, char *nodes,
201 				      int maxtime);
202 static bool _slurm_authorized_user(uid_t uid);
203 static void _sync_messages_kill(kill_job_msg_t *req);
204 static int  _waiter_init (uint32_t jobid);
205 static int  _waiter_complete (uint32_t jobid);
206 
207 static bool _steps_completed_now(uint32_t jobid);
208 static sbcast_cred_arg_t *_valid_sbcast_cred(file_bcast_msg_t *req,
209 					     uid_t req_uid,
210 					     gid_t req_gid,
211 					     uint16_t protocol_version);
212 static void _wait_state_completed(uint32_t jobid, int max_delay);
213 static uid_t _get_job_uid(uint32_t jobid);
214 
215 static int  _add_starting_step(uint16_t type, void *req);
216 static int  _remove_starting_step(uint16_t type, void *req);
217 static int  _compare_starting_steps(void *s0, void *s1);
218 static int  _wait_for_starting_step(uint32_t job_id, uint32_t step_id);
219 static bool _step_is_starting(uint32_t job_id, uint32_t step_id);
220 
221 static void _add_job_running_prolog(uint32_t job_id);
222 static void _remove_job_running_prolog(uint32_t job_id);
223 static int  _match_jobid(void *s0, void *s1);
224 static void _wait_for_job_running_prolog(uint32_t job_id);
225 static bool _requeue_setup_env_fail(void);
226 
227 /*
228  *  List of threads waiting for jobs to complete
229  */
230 static List waiters;
231 
232 static pthread_mutex_t launch_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static time_t startup = 0;		/* daemon startup time */
234 static time_t last_slurmctld_msg = 0;
235 
236 static pthread_mutex_t job_limits_mutex = PTHREAD_MUTEX_INITIALIZER;
237 static List job_limits_list = NULL;
238 static bool job_limits_loaded = false;
239 
240 static int next_fini_job_inx = 0;
241 
242 /* NUM_PARALLEL_SUSP_JOBS controls the number of jobs that can be suspended or
243  * resumed at one time. */
244 #define NUM_PARALLEL_SUSP_JOBS 64
245 /* NUM_PARALLEL_SUSP_STEPS controls the number of steps per job that can be
246  * suspended at one time. */
247 #define NUM_PARALLEL_SUSP_STEPS 8
248 static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER;
249 static uint32_t job_suspend_array[NUM_PARALLEL_SUSP_JOBS] = {0};
250 static int job_suspend_size = 0;
251 
252 #define JOB_STATE_CNT 64
253 static pthread_mutex_t job_state_mutex   = PTHREAD_MUTEX_INITIALIZER;
254 static pthread_cond_t  job_state_cond    = PTHREAD_COND_INITIALIZER;
255 static uint32_t active_job_id[JOB_STATE_CNT] = {0};
256 
257 static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER;
258 static pthread_mutex_t prolog_serial_mutex = PTHREAD_MUTEX_INITIALIZER;
259 
260 #define FILE_BCAST_TIMEOUT 300
261 static pthread_mutex_t file_bcast_mutex = PTHREAD_MUTEX_INITIALIZER;
262 static pthread_cond_t  file_bcast_cond  = PTHREAD_COND_INITIALIZER;
263 static int fb_read_lock = 0, fb_write_wait_lock = 0, fb_write_lock = 0;
264 static List file_bcast_list = NULL;
265 
266 static pthread_mutex_t waiter_mutex = PTHREAD_MUTEX_INITIALIZER;
267 
268 void
269 slurmd_req(slurm_msg_t *msg)
270 {
271 	int rc;
272 
273 	if (msg == NULL) {
274 		if (startup == 0)
275 			startup = time(NULL);
276 		slurm_mutex_lock(&waiter_mutex);
277 		FREE_NULL_LIST(waiters);
278 		slurm_mutex_unlock(&waiter_mutex);
279 		slurm_mutex_lock(&job_limits_mutex);
280 		if (job_limits_list) {
281 			FREE_NULL_LIST(job_limits_list);
282 			job_limits_loaded = false;
283 		}
284 		slurm_mutex_unlock(&job_limits_mutex);
285 		return;
286 	}
287 
288 	switch (msg->msg_type) {
289 	case REQUEST_LAUNCH_PROLOG:
290 		debug2("Processing RPC: REQUEST_LAUNCH_PROLOG");
291 		_rpc_prolog(msg);
292 		last_slurmctld_msg = time(NULL);
293 		break;
294 	case REQUEST_BATCH_JOB_LAUNCH:
295 		debug2("Processing RPC: REQUEST_BATCH_JOB_LAUNCH");
296 		/* Mutex locking moved into _rpc_batch_job() due to
297 		 * very slow prolog on Blue Gene system. Only batch
298 		 * jobs are supported on Blue Gene (no job steps). */
299 		_rpc_batch_job(msg, true);
300 		last_slurmctld_msg = time(NULL);
301 		break;
302 	case REQUEST_LAUNCH_TASKS:
303 		debug2("Processing RPC: REQUEST_LAUNCH_TASKS");
304 		slurm_mutex_lock(&launch_mutex);
305 		_rpc_launch_tasks(msg);
306 		slurm_mutex_unlock(&launch_mutex);
307 		break;
308 	case REQUEST_SIGNAL_TASKS:
309 		debug2("Processing RPC: REQUEST_SIGNAL_TASKS");
310 		_rpc_signal_tasks(msg);
311 		break;
312 	case REQUEST_TERMINATE_TASKS:
313 		debug2("Processing RPC: REQUEST_TERMINATE_TASKS");
314 		_rpc_terminate_tasks(msg);
315 		break;
316 	case REQUEST_KILL_PREEMPTED:
317 		debug2("Processing RPC: REQUEST_KILL_PREEMPTED");
318 		last_slurmctld_msg = time(NULL);
319 		_rpc_timelimit(msg);
320 		break;
321 	case REQUEST_KILL_TIMELIMIT:
322 		debug2("Processing RPC: REQUEST_KILL_TIMELIMIT");
323 		last_slurmctld_msg = time(NULL);
324 		_rpc_timelimit(msg);
325 		break;
326 	case REQUEST_REATTACH_TASKS:
327 		debug2("Processing RPC: REQUEST_REATTACH_TASKS");
328 		_rpc_reattach_tasks(msg);
329 		break;
330 	case REQUEST_SUSPEND_INT:
331 		debug2("Processing RPC: REQUEST_SUSPEND_INT");
332 		_rpc_suspend_job(msg);
333 		last_slurmctld_msg = time(NULL);
334 		break;
335 	case REQUEST_ABORT_JOB:
336 		debug2("Processing RPC: REQUEST_ABORT_JOB");
337 		last_slurmctld_msg = time(NULL);
338 		_rpc_abort_job(msg);
339 		break;
340 	case REQUEST_TERMINATE_JOB:
341 		debug2("Processing RPC: REQUEST_TERMINATE_JOB");
342 		last_slurmctld_msg = time(NULL);
343 		_rpc_terminate_job(msg);
344 		break;
345 	case REQUEST_COMPLETE_BATCH_SCRIPT:
346 		debug2("Processing RPC: REQUEST_COMPLETE_BATCH_SCRIPT");
347 		_rpc_complete_batch(msg);
348 		break;
349 	case REQUEST_UPDATE_JOB_TIME:
350 		debug2("Processing RPC: REQUEST_UPDATE_JOB_TIME");
351 		_rpc_update_time(msg);
352 		last_slurmctld_msg = time(NULL);
353 		break;
354 	case REQUEST_SHUTDOWN:
355 		debug2("Processing RPC: REQUEST_SHUTDOWN");
356 		_rpc_shutdown(msg);
357 		break;
358 	case REQUEST_RECONFIGURE:
359 		debug2("Processing RPC: REQUEST_RECONFIGURE");
360 		_rpc_reconfig(msg);
361 		last_slurmctld_msg = time(NULL);
362 		break;
363 	case REQUEST_RECONFIGURE_WITH_CONFIG:
364 		debug2("Processing RPC: REQUEST_RECONFIGURE_WITH_CONFIG");
365 		_rpc_reconfig_with_config(msg);
366 		last_slurmctld_msg = time(NULL);
367 		break;
368 	case REQUEST_REBOOT_NODES:
369 		debug2("Processing RPC: REQUEST_REBOOT_NODES");
370 		_rpc_reboot(msg);
371 		break;
372 	case REQUEST_NODE_REGISTRATION_STATUS:
373 		debug2("Processing RPC: REQUEST_NODE_REGISTRATION_STATUS");
374 		get_reg_resp = 1;
375 		/* Treat as ping (for slurmctld agent, just return SUCCESS) */
376 		rc = _rpc_ping(msg);
377 		last_slurmctld_msg = time(NULL);
378 		/* Then initiate a separate node registration */
379 		if (rc == SLURM_SUCCESS)
380 			send_registration_msg(SLURM_SUCCESS, true);
381 		break;
382 	case REQUEST_PING:
383 		_rpc_ping(msg);
384 		last_slurmctld_msg = time(NULL);
385 		break;
386 	case REQUEST_HEALTH_CHECK:
387 		debug2("Processing RPC: REQUEST_HEALTH_CHECK");
388 		_rpc_health_check(msg);
389 		last_slurmctld_msg = time(NULL);
390 		break;
391 	case REQUEST_ACCT_GATHER_UPDATE:
392 		debug2("Processing RPC: REQUEST_ACCT_GATHER_UPDATE");
393 		_rpc_acct_gather_update(msg);
394 		last_slurmctld_msg = time(NULL);
395 		break;
396 	case REQUEST_ACCT_GATHER_ENERGY:
397 		debug2("Processing RPC: REQUEST_ACCT_GATHER_ENERGY");
398 		_rpc_acct_gather_energy(msg);
399 		break;
400 	case REQUEST_JOB_ID:
401 		_rpc_pid2jid(msg);
402 		break;
403 	case REQUEST_FILE_BCAST:
404 		rc = _rpc_file_bcast(msg);
405 		slurm_send_rc_msg(msg, rc);
406 		break;
407 	case REQUEST_STEP_COMPLETE:
408 		(void) _rpc_step_complete(msg);
409 		break;
410 	case REQUEST_STEP_COMPLETE_AGGR:
411 		(void) _rpc_step_complete_aggr(msg);
412 		break;
413 	case REQUEST_JOB_STEP_STAT:
414 		(void) _rpc_stat_jobacct(msg);
415 		break;
416 	case REQUEST_JOB_STEP_PIDS:
417 		(void) _rpc_list_pids(msg);
418 		break;
419 	case REQUEST_DAEMON_STATUS:
420 		_rpc_daemon_status(msg);
421 		break;
422 	case REQUEST_JOB_NOTIFY:
423 		_rpc_job_notify(msg);
424 		break;
425 	case REQUEST_FORWARD_DATA:
426 		_rpc_forward_data(msg);
427 		break;
428 	case REQUEST_NETWORK_CALLERID:
429 		debug2("Processing RPC: REQUEST_NETWORK_CALLERID");
430 		_rpc_network_callerid(msg);
431 		break;
432 	case MESSAGE_COMPOSITE:
433 		error("Processing RPC: MESSAGE_COMPOSITE: "
434 		      "This should never happen");
435 		msg_aggr_add_msg(msg, 0, NULL);
436 		break;
437 	case RESPONSE_MESSAGE_COMPOSITE:
438 		debug2("Processing RPC: RESPONSE_MESSAGE_COMPOSITE");
439 		msg_aggr_resp(msg);
440 		break;
441 	default:
442 		error("slurmd_req: invalid request msg type %d",
443 		      msg->msg_type);
444 		slurm_send_rc_msg(msg, EINVAL);
445 		break;
446 	}
447 	return;
448 }
449 
450 extern int send_slurmd_conf_lite(int fd, slurmd_conf_t *cf)
451 {
452 	int len;
453 
454 	/*
455 	 * Wait for the registration to come back from the slurmctld so we have
456 	 * a TRES list to work with.
457 	 */
458 	if (!assoc_mgr_tres_list) {
459 		slurm_mutex_lock(&tres_mutex);
460 		slurm_cond_wait(&tres_cond, &tres_mutex);
461 		slurm_mutex_unlock(&tres_mutex);
462 	}
463 
464 	slurm_mutex_lock(&cf->config_mutex);
465 
466 	xassert(cf->buf);
467 	if (!tres_packed) {
468 		assoc_mgr_lock_t locks = { .tres = READ_LOCK };
469 		assoc_mgr_lock(&locks);
470 		if (assoc_mgr_tres_list) {
471 			slurm_pack_list(assoc_mgr_tres_list,
472 					slurmdb_pack_tres_rec, cf->buf,
473 					SLURM_PROTOCOL_VERSION);
474 		} else {
475 			fatal("%s: assoc_mgr_tres_list is NULL when trying to start a slurmstepd. This should never happen.",
476 			      __func__);
477 		}
478 		assoc_mgr_unlock(&locks);
479 		tres_packed = true;
480 	}
481 
482 	len = get_buf_offset(cf->buf);
483 	safe_write(fd, &len, sizeof(int));
484 	safe_write(fd, get_buf_data(cf->buf), len);
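	/*
	 * Note: this is simple length-prefixed framing; the receiving
	 * slurmstepd is expected to read the int byte count first and then
	 * that many bytes of packed configuration.
	 */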
485 
486 	slurm_mutex_unlock(&cf->config_mutex);
487 
488 	return (0);
489 
490 rwfail:
491 	slurm_mutex_unlock(&cf->config_mutex);
492 	return (-1);
493 }
494 
495 static int
496 _send_slurmstepd_init(int fd, int type, void *req,
497 		      slurm_addr_t *cli, slurm_addr_t *self,
498 		      hostset_t step_hset, uint16_t protocol_version)
499 {
500 	int len = 0;
501 	Buf buffer = NULL;
502 	slurm_msg_t msg;
503 
504 	int rank;
505 	int parent_rank, children, depth, max_depth;
506 	char *parent_alias = NULL;
507 	slurm_addr_t parent_addr = {0};
508 
509 	slurm_msg_t_init(&msg);
510 
511 	/* send conf over to slurmstepd */
512 	if (send_slurmd_conf_lite(fd, conf) < 0)
513 		goto rwfail;
514 
515 	/* send cgroup conf over to slurmstepd */
516 	if (xcgroup_write_conf(fd) < 0)
517 		goto rwfail;
518 
519 	/* send acct_gather.conf over to slurmstepd */
520 	if (acct_gather_write_conf(fd) < 0)
521 		goto rwfail;
522 
523 	/* send type over to slurmstepd */
524 	safe_write(fd, &type, sizeof(int));
525 
526 	/* step_hset can be NULL for batch scripts OR if the job was submitted
527 	 * by SlurmUser or root using the --no-allocate/-Z option and the
528 	 * job credential validation by _check_job_credential() failed. If the
529 	 * job credential did not validate, then it did not come from slurmctld
530 	 * and there is no reason to send step completion messages to slurmctld.
531 	 */
532 	if (step_hset == NULL) {
533 		bool send_error = false;
534 		if (type == LAUNCH_TASKS) {
535 			launch_tasks_request_msg_t *launch_req;
536 			launch_req = (launch_tasks_request_msg_t *) req;
537 			if (launch_req->job_step_id != SLURM_EXTERN_CONT)
538 				send_error = true;
539 		}
540 		if (send_error) {
541 			info("task rank unavailable due to invalid job "
542 			     "credential, step completion RPC impossible");
543 		}
544 		rank = -1;
545 		parent_rank = -1;
546 		children = 0;
547 		depth = 0;
548 		max_depth = 0;
549 	} else if ((type == LAUNCH_TASKS) &&
550 		   (((launch_tasks_request_msg_t *)req)->alias_list)) {
551 		/*
552 		 * In the cloud, each task talks directly to the slurmctld
553 		 * since node addressing is abnormal. Setting parent_rank = -1
554 		 * is sufficient to force slurmstepd to talk directly to the
555 		 * slurmctld - see _one_step_complete_msg. We need to make sure
556 		 * to set rank to the actual rank so that the slurmctld will
557 		 * properly clean up all nodes.
558 		 */
559 		rank = hostset_find(step_hset, conf->node_name);
560 		parent_rank = -1;
561 		children = 0;
562 		depth = 0;
563 		max_depth = 0;
564 	} else {
565 #ifndef HAVE_FRONT_END
566 		int count;
567 		count = hostset_count(step_hset);
568 		rank = hostset_find(step_hset, conf->node_name);
569 		reverse_tree_info(rank, count, REVERSE_TREE_WIDTH,
570 				  &parent_rank, &children,
571 				  &depth, &max_depth);
572 
573 		if (children == -1) {
574 			error("reverse_tree_info: Sanity check fail, can't start job");
575 			goto rwfail;
576 		}
577 		if (rank > 0) { /* rank 0 talks directly to the slurmctld */
578 			int rc;
579 			/* Find the slurm_addr_t of this node's parent slurmd
580 			 * in the step host list */
581 			parent_alias = hostset_nth(step_hset, parent_rank);
582 			rc = slurm_conf_get_addr(parent_alias, &parent_addr, 0);
583 			if (rc != SLURM_SUCCESS) {
584 				error("Failed looking up address for "
585 				      "NodeName %s", parent_alias);
586 				/* parent_rank = -1; */
587 			}
588 		}
589 #else
590 		/* In FRONT_END mode, one slurmd pretends to be all
591 		 * NodeNames, so we can't compare conf->node_name
592 		 * to the NodeNames in step_hset.  Just send step complete
593 		 * RPC directly to the controller.
594 		 */
595 		rank = 0;
596 		parent_rank = -1;
597 		children = 0;
598 		depth = 0;
599 		max_depth = 0;
600 #endif
601 	}
602 	debug3("slurmstepd rank %d (%s), parent rank %d (%s), "
603 	       "children %d, depth %d, max_depth %d",
604 	       rank, conf->node_name,
605 	       parent_rank, parent_alias ? parent_alias : "NONE",
606 	       children, depth, max_depth);
607 	if (parent_alias)
608 		free(parent_alias);
609 
610 	/* send reverse-tree info to the slurmstepd */
611 	safe_write(fd, &rank, sizeof(int));
612 	safe_write(fd, &parent_rank, sizeof(int));
613 	safe_write(fd, &children, sizeof(int));
614 	safe_write(fd, &depth, sizeof(int));
615 	safe_write(fd, &max_depth, sizeof(int));
616 	safe_write(fd, &parent_addr, sizeof(slurm_addr_t));
617 
618 	/* send cli address over to slurmstepd */
619 	buffer = init_buf(0);
620 	slurm_pack_slurm_addr(cli, buffer);
621 	len = get_buf_offset(buffer);
622 	safe_write(fd, &len, sizeof(int));
623 	safe_write(fd, get_buf_data(buffer), len);
624 	free_buf(buffer);
625 	buffer = NULL;
626 
627 	/* send self address over to slurmstepd */
628 	if (self) {
629 		buffer = init_buf(0);
630 		slurm_pack_slurm_addr(self, buffer);
631 		len = get_buf_offset(buffer);
632 		safe_write(fd, &len, sizeof(int));
633 		safe_write(fd, get_buf_data(buffer), len);
634 		free_buf(buffer);
635 		buffer = NULL;
636 
637 	} else {
638 		len = 0;
639 		safe_write(fd, &len, sizeof(int));
640 	}
641 
642 	/* Send GRES information to slurmstepd */
643 	gres_plugin_send_stepd(fd);
644 
645 	/* send cpu_frequency info to slurmstepd */
646 	cpu_freq_send_info(fd);
647 
648 	/* send req over to slurmstepd */
649 	switch (type) {
650 	case LAUNCH_BATCH_JOB:
651 		msg.msg_type = REQUEST_BATCH_JOB_LAUNCH;
652 		break;
653 	case LAUNCH_TASKS:
654 		msg.msg_type = REQUEST_LAUNCH_TASKS;
655 		break;
656 	default:
657 		error("Was sent a task I didn't understand");
658 		break;
659 	}
660 	buffer = init_buf(0);
661 	msg.data = req;
662 
663 	/* always force the RPC format to the latest */
664 	msg.protocol_version = SLURM_PROTOCOL_VERSION;
665 	pack_msg(&msg, buffer);
666 	len = get_buf_offset(buffer);
667 
668 	/* send the srun protocol_version over, which may be older */
669 	safe_write(fd, &protocol_version, sizeof(uint16_t));
670 
671 	safe_write(fd, &len, sizeof(int));
672 	safe_write(fd, get_buf_data(buffer), len);
673 	free_buf(buffer);
674 	buffer = NULL;
675 
676 	return 0;
677 
678 rwfail:
679 	if (buffer)
680 		free_buf(buffer);
681 	error("_send_slurmstepd_init failed");
682 	return errno;
683 }
684 
685 
686 /*
687  * Fork and exec the slurmstepd, then send the slurmstepd its
688  * initialization data.  Then wait for slurmstepd to send an "ok"
689  * message before returning.  When the "ok" message is received,
690  * the slurmstepd has created and begun listening on its unix
691  * domain socket.
692  *
693  * Note that this code forks twice and it is the grandchild that
694  * becomes the slurmstepd process, so the slurmstepd's parent process
695  * will be init, not slurmd.
696  */
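/*
 * Illustrative sketch of the process tree created below (not literal code):
 *
 *   slurmd --fork--> child --setsid()+fork--> grandchild --execvp--> slurmstepd
 *                      |
 *                      +--> _exit(0), so the grandchild is reparented to init
 */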
697 static int
698 _forkexec_slurmstepd(uint16_t type, void *req,
699 		     slurm_addr_t *cli, slurm_addr_t *self,
700 		     const hostset_t step_hset, uint16_t protocol_version)
701 {
702 	pid_t pid;
703 	int to_stepd[2] = {-1, -1};
704 	int to_slurmd[2] = {-1, -1};
705 
706 	if (pipe(to_stepd) < 0 || pipe(to_slurmd) < 0) {
707 		error("%s: pipe failed: %m", __func__);
708 		return SLURM_ERROR;
709 	}
710 
711 	if (_add_starting_step(type, req)) {
712 		error("%s: failed in _add_starting_step: %m", __func__);
713 		return SLURM_ERROR;
714 	}
715 
716 	if ((pid = fork()) < 0) {
717 		error("%s: fork: %m", __func__);
718 		close(to_stepd[0]);
719 		close(to_stepd[1]);
720 		close(to_slurmd[0]);
721 		close(to_slurmd[1]);
722 		_remove_starting_step(type, req);
723 		return SLURM_ERROR;
724 	} else if (pid > 0) {
725 		int rc = SLURM_SUCCESS;
726 #if (SLURMSTEPD_MEMCHECK == 0)
727 		int i;
728 		time_t start_time = time(NULL);
729 #endif
730 		/*
731 		 * Parent sends initialization data to the slurmstepd
732 		 * over the to_stepd pipe, and waits for the return code
733 		 * reply on the to_slurmd pipe.
734 		 */
735 		if (close(to_stepd[0]) < 0)
736 			error("Unable to close read to_stepd in parent: %m");
737 		if (close(to_slurmd[1]) < 0)
738 			error("Unable to close write to_slurmd in parent: %m");
739 
740 		if ((rc = _send_slurmstepd_init(to_stepd[1], type,
741 						req, cli, self,
742 						step_hset,
743 						protocol_version)) != 0) {
744 			error("Unable to init slurmstepd");
745 			goto done;
746 		}
747 
748 		/* If running under valgrind/memcheck, this pipe doesn't work
749 		 * correctly so just skip it. */
750 #if (SLURMSTEPD_MEMCHECK == 0)
751 		i = read(to_slurmd[0], &rc, sizeof(int));
752 		if (i < 0) {
753 			error("%s: Can not read return code from slurmstepd "
754 			      "got %d: %m", __func__, i);
755 			rc = SLURM_ERROR;
756 		} else if (i != sizeof(int)) {
757 			error("%s: slurmstepd failed to send return code "
758 			      "got %d: %m", __func__, i);
759 			rc = SLURM_ERROR;
760 		} else {
761 			int delta_time = time(NULL) - start_time;
762 			int cc;
763 			if (delta_time > 5) {
764 				info("Warning: slurmstepd startup took %d sec, "
765 				     "possible file system problem or full "
766 				     "memory", delta_time);
767 			}
768 			if (rc != SLURM_SUCCESS)
769 				error("slurmstepd return code %d", rc);
770 
771 			cc = SLURM_SUCCESS;
772 			cc = write(to_stepd[1], &cc, sizeof(int));
773 			if (cc != sizeof(int)) {
774 				error("%s: failed to send ack to stepd %d: %m",
775 				      __func__, cc);
776 			}
777 		}
778 #endif
779 	done:
780 		if (_remove_starting_step(type, req))
781 			error("Error cleaning up starting_step list");
782 
783 		/* Reap child */
784 		if (waitpid(pid, NULL, 0) < 0)
785 			error("Unable to reap slurmd child process");
786 		if (close(to_stepd[1]) < 0)
787 			error("close write to_stepd in parent: %m");
788 		if (close(to_slurmd[0]) < 0)
789 			error("close read to_slurmd in parent: %m");
790 		return rc;
791 	} else {
792 #if (SLURMSTEPD_MEMCHECK == 1)
793 		/* memcheck test of slurmstepd, option #1 */
794 		char *const argv[3] = {"memcheck",
795 				       (char *)conf->stepd_loc, NULL};
796 #elif (SLURMSTEPD_MEMCHECK == 2)
797 		/* valgrind test of slurmstepd, option #2 */
798 		uint32_t job_id = 0, step_id = 0;
799 		char log_file[256];
800 		char *const argv[13] = {"valgrind", "--tool=memcheck",
801 					"--error-limit=no",
802 					"--leak-check=summary",
803 					"--show-reachable=yes",
804 					"--max-stackframe=16777216",
805 					"--num-callers=20",
806 					"--child-silent-after-fork=yes",
807 					"--track-origins=yes",
808 					log_file, (char *)conf->stepd_loc,
809 					NULL};
810 		if (type == LAUNCH_BATCH_JOB) {
811 			job_id = ((batch_job_launch_msg_t *)req)->job_id;
812 			step_id = ((batch_job_launch_msg_t *)req)->step_id;
813 		} else if (type == LAUNCH_TASKS) {
814 			job_id = ((launch_tasks_request_msg_t *)req)->job_id;
815 			step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
816 		}
817 		snprintf(log_file, sizeof(log_file),
818 			 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
819 			 job_id, step_id);
820 #elif (SLURMSTEPD_MEMCHECK == 3)
821 		/* valgrind/drd test of slurmstepd, option #3 */
822 		uint32_t job_id = 0, step_id = 0;
823 		char log_file[256];
824 		char *const argv[10] = {"valgrind", "--tool=drd",
825 					"--error-limit=no",
826 					"--max-stackframe=16777216",
827 					"--num-callers=20",
828 					"--child-silent-after-fork=yes",
829 					log_file, (char *)conf->stepd_loc,
830 					NULL};
831 		if (type == LAUNCH_BATCH_JOB) {
832 			job_id = ((batch_job_launch_msg_t *)req)->job_id;
833 			step_id = ((batch_job_launch_msg_t *)req)->step_id;
834 		} else if (type == LAUNCH_TASKS) {
835 			job_id = ((launch_tasks_request_msg_t *)req)->job_id;
836 			step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
837 		}
838 		snprintf(log_file, sizeof(log_file),
839 			 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
840 			 job_id, step_id);
841 #elif (SLURMSTEPD_MEMCHECK == 4)
842 		/* valgrind/helgrind test of slurmstepd, option #4 */
843 		uint32_t job_id = 0, step_id = 0;
844 		char log_file[256];
845 		char *const argv[10] = {"valgrind", "--tool=helgrind",
846 					"--error-limit=no",
847 					"--max-stackframe=16777216",
848 					"--num-callers=20",
849 					"--child-silent-after-fork=yes",
850 					log_file, (char *)conf->stepd_loc,
851 					NULL};
852 		if (type == LAUNCH_BATCH_JOB) {
853 			job_id = ((batch_job_launch_msg_t *)req)->job_id;
854 			step_id = ((batch_job_launch_msg_t *)req)->step_id;
855 		} else if (type == LAUNCH_TASKS) {
856 			job_id = ((launch_tasks_request_msg_t *)req)->job_id;
857 			step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
858 		}
859 		snprintf(log_file, sizeof(log_file),
860 			 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
861 			 job_id, step_id);
862 #else
863 		/* no memory checking, default */
864 		char *const argv[2] = { (char *)conf->stepd_loc, NULL};
865 #endif
866 		int i;
867 		int failed = 0;
868 
869 		/*
870 		 * Child forks and exits
871 		 */
872 		if (setsid() < 0) {
873 			error("%s: setsid: %m", __func__);
874 			failed = 1;
875 		}
876 		if ((pid = fork()) < 0) {
877 			error("%s: Unable to fork grandchild: %m", __func__);
878 			failed = 2;
879 		} else if (pid > 0) { /* child */
880 			_exit(0);
881 		}
882 
883 		/*
884 		 * Just in case we (or someone we are linking to)
885 		 * opened a file and didn't do a close on exec.  This
886 		 * is needed mostly to protect us against libs we link
887 		 * to that don't set the flag as we should already be
888 		 * setting it for those that we open.  The number 256
889 		 * is an arbitrary number based off test7.9.
890 		 */
891 		for (i=3; i<256; i++) {
892 			(void) fcntl(i, F_SETFD, FD_CLOEXEC);
893 		}
894 
895 		/*
896 		 * Grandchild exec's the slurmstepd
897 		 *
898 		 * If the slurmd is being shut down/restarted before
899 		 * the pipe happens the old conf->lfd could be reused
900 		 * and if we close it the dup2 below will fail.
901 		 */
902 		if ((to_stepd[0] != conf->lfd)
903 		    && (to_slurmd[1] != conf->lfd))
904 			close(conf->lfd);
905 
906 		if (close(to_stepd[1]) < 0)
907 			error("close write to_stepd in grandchild: %m");
908 		if (close(to_slurmd[0]) < 0)
909 			error("close read to_slurmd in grandchild: %m");
910 
911 		(void) close(STDIN_FILENO); /* ignore return */
912 		if (dup2(to_stepd[0], STDIN_FILENO) == -1) {
913 			error("dup2 over STDIN_FILENO: %m");
914 			_exit(1);
915 		}
916 		fd_set_close_on_exec(to_stepd[0]);
917 		(void) close(STDOUT_FILENO); /* ignore return */
918 		if (dup2(to_slurmd[1], STDOUT_FILENO) == -1) {
919 			error("dup2 over STDOUT_FILENO: %m");
920 			_exit(1);
921 		}
922 		fd_set_close_on_exec(to_slurmd[1]);
923 		(void) close(STDERR_FILENO); /* ignore return */
924 		if (dup2(devnull, STDERR_FILENO) == -1) {
925 			error("dup2 /dev/null to STDERR_FILENO: %m");
926 			_exit(1);
927 		}
928 		fd_set_noclose_on_exec(STDERR_FILENO);
929 		log_fini();
930 		if (!failed) {
931 			execvp(argv[0], argv);
932 			error("exec of slurmstepd failed: %m");
933 		}
934 		_exit(2);
935 	}
936 }
937 
938 static void _setup_x11_display(uint32_t job_id, uint32_t step_id,
939 			       char ***env, uint32_t *envc)
940 {
941 	int display = 0, fd;
942 	char *xauthority = NULL;
943 	uint16_t protocol_version;
944 
945 	fd = stepd_connect(conf->spooldir, conf->node_name,
946 			   job_id, SLURM_EXTERN_CONT,
947 			   &protocol_version);
948 
949 	if (fd == -1) {
950 		error("could not get x11 forwarding display for job %u step %u,"
951 		      " x11 forwarding disabled", job_id, step_id);
952 		return;
953 	}
954 
955 	display = stepd_get_x11_display(fd, protocol_version, &xauthority);
956 	close(fd);
957 
958 	if (!display) {
959 		error("could not get x11 forwarding display for job %u step %u,"
960 		      " x11 forwarding disabled", job_id, step_id);
961 		env_array_overwrite(env, "DISPLAY", "SLURM_X11_SETUP_FAILED");
962 		*envc = envcount(*env);
963 		return;
964 	}
965 
966 	debug2("%s: setting DISPLAY=localhost:%d.0 for job %u step %u",
967 	       __func__, display, job_id, step_id);
968 	env_array_overwrite_fmt(env, "DISPLAY", "localhost:%d.0", display);
969 
970 	if (xauthority) {
971 		env_array_overwrite(env, "XAUTHORITY", xauthority);
972 		xfree(xauthority);
973 	}
974 
975 	*envc = envcount(*env);
976 }
977 
978 /*
979  * The job(step) credential is the only place to get a definitive
980  * list of the nodes allocated to a job step.  We need to return
981  * a hostset_t of the nodes. Validate the incoming RPC, updating
982  * job_mem needed.
983  */
984 static int _check_job_credential(launch_tasks_request_msg_t *req,
985 				 uid_t auth_uid, gid_t auth_gid,
986 				 int node_id, hostset_t *step_hset,
987 				 uint16_t protocol_version)
988 {
989 	slurm_cred_arg_t arg;
990 	hostset_t	s_hset = NULL;
991 	bool		user_ok = _slurm_authorized_user(auth_uid);
992 	int		host_index = -1;
993 	slurm_cred_t    *cred = req->cred;
994 	uint32_t	jobid = req->job_id;
995 	uint32_t	stepid = req->job_step_id;
996 	int		tasks_to_launch = req->tasks_to_launch[node_id];
997 	uint32_t	job_cpus = 0, step_cpus = 0;
998 
999 	if (user_ok && (req->flags & LAUNCH_NO_ALLOC)) {
1000 		/* If we didn't allocate then the cred isn't valid, just skip
1001 		 * checking.  This is only permitted for root or SlurmUser. */
1002 		debug("%s: FYI, user %d is an authorized user running outside of an allocation.",
1003 		      __func__, auth_uid);
1004 		return SLURM_SUCCESS;
1005 	}
1006 
1007 	/*
1008 	 * First call slurm_cred_verify() so that all credentials are checked
1009 	 */
1010 	if (slurm_cred_verify(conf->vctx, cred, &arg, protocol_version) < 0)
1011 		return SLURM_ERROR;
1012 
1013 	if ((arg.jobid != jobid) || (arg.stepid != stepid)) {
1014 		error("job credential for %u.%u, expected %u.%u",
1015 		      arg.jobid, arg.stepid, jobid, stepid);
1016 		goto fail;
1017 	}
1018 
1019 	if (arg.uid != req->uid) {
1020 		error("job %u credential created for uid %u, expected %u",
1021 		      arg.jobid, arg.uid, req->uid);
1022 		goto fail;
1023 	}
1024 
1025 	if (arg.gid != req->gid) {
1026 		error("job %u credential created for gid %u, expected %u",
1027 		      arg.jobid, arg.gid, req->gid);
1028 		goto fail;
1029 	}
1030 
1031 	xfree(req->user_name);
1032 	if (arg.pw_name)
1033 		req->user_name = xstrdup(arg.pw_name);
1034 	else
1035 		req->user_name = uid_to_string(req->uid);
1036 
1037 	xfree(req->gids);
1038 	if (arg.ngids) {
1039 		req->ngids = arg.ngids;
1040 		req->gids = copy_gids(arg.ngids, arg.gids);
1041 	} else {
1042 		/*
1043 		 * The gids were not sent in the cred, or dealing with an older
1044 		 * RPC format, so retrieve from cache instead.
1045 		 */
1046 		req->ngids = group_cache_lookup(req->uid, req->gid,
1047 						req->user_name, &req->gids);
1048 	}
1049 
1050 	/*
1051 	 * Check that credential is valid for this host
1052 	 */
1053 	if (!(s_hset = hostset_create(arg.step_hostlist))) {
1054 		error("Unable to parse credential hostlist: `%s'",
1055 		      arg.step_hostlist);
1056 		goto fail;
1057 	}
1058 
1059 	if (!hostset_within(s_hset, conf->node_name)) {
1060 		error("Invalid job %u.%u credential for user %u: "
1061 		      "host %s not in hostset %s",
1062 		      arg.jobid, arg.stepid, arg.uid,
1063 		      conf->node_name, arg.step_hostlist);
1064 		goto fail;
1065 	}
1066 
1067 	if ((arg.job_nhosts > 0) && (tasks_to_launch > 0)) {
1068 		uint32_t hi, i, i_first_bit=0, i_last_bit=0, j;
1069 		bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND;
1070 		bool setup_x11 = false;
1071 
1072 #ifdef HAVE_FRONT_END
1073 		host_index = 0;	/* It is always 0 for front end systems */
1074 #else
1075 		hostset_t j_hset;
1076 		/* Determine the CPU count based upon this node's index into
1077 		 * the _job's_ allocation (job's hostlist and core_bitmap) */
1078 		if (!(j_hset = hostset_create(arg.job_hostlist))) {
1079 			error("Unable to parse credential hostlist: `%s'",
1080 			      arg.job_hostlist);
1081 			goto fail;
1082 		}
1083 		host_index = hostset_find(j_hset, conf->node_name);
1084 		hostset_destroy(j_hset);
1085 
1086 		if ((host_index < 0) || (host_index >= arg.job_nhosts)) {
1087 			error("job cr credential invalid host_index %d for "
1088 			      "job %u", host_index, arg.jobid);
1089 			goto fail;
1090 		}
1091 #endif
1092 
1093 		/*
1094 		 * handle the x11 flag bit here since we have access to the
1095 		 * host_index already.
1096 		 *
1097 		 */
1098 		if (!arg.x11)
1099 			setup_x11 = false;
1100 		else if (arg.x11 & X11_FORWARD_ALL)
1101 			setup_x11 = true;
1102 		/* assumes that the first node is the batch host */
1103 		else if (((arg.x11 & X11_FORWARD_FIRST) ||
1104 			  (arg.x11 & X11_FORWARD_BATCH))
1105 			 && (host_index == 0))
1106 			setup_x11 = true;
1107 		else if ((arg.x11 & X11_FORWARD_LAST)
1108 			 && (host_index == (req->nnodes - 1)))
1109 			setup_x11 = true;
1110 
1111 		if (setup_x11)
1112 			_setup_x11_display(req->job_id, req->job_step_id,
1113 					   &req->env, &req->envc);
1114 
1115 		if (cpu_log) {
1116 			char *per_job = "", *per_step = "";
1117 			uint64_t job_mem  = arg.job_mem_limit;
1118 			uint64_t step_mem = arg.step_mem_limit;
1119 			if (job_mem & MEM_PER_CPU) {
1120 				job_mem &= (~MEM_PER_CPU);
1121 				per_job = "_per_CPU";
1122 			}
1123 			if (step_mem & MEM_PER_CPU) {
1124 				step_mem &= (~MEM_PER_CPU);
1125 				per_step = "_per_CPU";
1126 			}
1127 			info("====================");
1128 			info("step_id:%u.%u job_mem:%"PRIu64"MB%s "
1129 			     "step_mem:%"PRIu64"MB%s",
1130 			     arg.jobid, arg.stepid, job_mem, per_job,
1131 			     step_mem, per_step);
1132 		}
1133 
1134 		hi = host_index + 1;	/* change from 0-origin to 1-origin */
1135 		for (i=0; hi; i++) {
1136 			if (hi > arg.sock_core_rep_count[i]) {
1137 				i_first_bit += arg.sockets_per_node[i] *
1138 					arg.cores_per_socket[i] *
1139 					arg.sock_core_rep_count[i];
1140 				hi -= arg.sock_core_rep_count[i];
1141 			} else {
1142 				i_first_bit += arg.sockets_per_node[i] *
1143 					arg.cores_per_socket[i] *
1144 					(hi - 1);
1145 				i_last_bit = i_first_bit +
1146 					arg.sockets_per_node[i] *
1147 					arg.cores_per_socket[i];
1148 				break;
1149 			}
1150 		}
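		/*
		 * Worked example with a hypothetical layout: if
		 * sock_core_rep_count = {2, 3}, sockets_per_node = {2, 2} and
		 * cores_per_socket = {4, 6}, then host_index 3 gives hi = 4;
		 * the first group contributes 2*4*2 = 16 bits, then with hi
		 * reduced to 2 the second group adds 2*6*(2-1) = 12 more, so
		 * this node's cores map to bits [28, 40) of the core bitmaps.
		 */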
1151 		/* Now count the allocated processors */
1152 		for (i=i_first_bit, j=0; i<i_last_bit; i++, j++) {
1153 			char *who_has = NULL;
1154 			if (bit_test(arg.job_core_bitmap, i)) {
1155 				job_cpus++;
1156 				who_has = "Job";
1157 			}
1158 			if (bit_test(arg.step_core_bitmap, i)) {
1159 				step_cpus++;
1160 				who_has = "Step";
1161 			}
1162 			if (cpu_log && who_has) {
1163 				info("JobNode[%u] CPU[%u] %s alloc",
1164 				     host_index, j, who_has);
1165 			}
1166 		}
1167 		if (cpu_log)
1168 			info("====================");
1169 		if (step_cpus == 0) {
1170 			error("cons_res: zero processors allocated to step");
1171 			step_cpus = 1;
1172 		}
1173 		/* NOTE: step_cpus is the count of allocated resources
1174 		 * (typically cores). Convert to CPU count as needed */
1175 		if (i_last_bit <= i_first_bit)
1176 			error("step credential has no CPUs selected");
1177 		else {
1178 			i = conf->cpus / (i_last_bit - i_first_bit);
1179 			if (i > 1) {
1180 				if (cpu_log)
1181 					info("Scaling CPU count by factor of "
1182 					     "%d (%u/(%u-%u))",
1183 					     i, conf->cpus,
1184 					     i_last_bit, i_first_bit);
1185 				step_cpus *= i;
1186 				job_cpus *= i;
1187 			}
1188 		}
1189 		if (tasks_to_launch > step_cpus) {
1190 			/* This is expected with the --overcommit option
1191 			 * or hyperthreads */
1192 			debug("cons_res: More than one task per logical "
1193 			      "processor (%d > %u) on host [%u.%u %ld %s] ",
1194 			      tasks_to_launch, step_cpus, arg.jobid,
1195 			      arg.stepid, (long) arg.uid, arg.step_hostlist);
1196 		}
1197 	} else {
1198 		step_cpus = 1;
1199 		job_cpus  = 1;
1200 	}
1201 
1202 	/* Overwrite any memory limits in the RPC with contents of the
1203 	 * memory limit within the credential.
1204 	 * Reset the CPU count on this node to correct value. */
1205 	if (arg.step_mem_limit) {
1206 		if (arg.step_mem_limit & MEM_PER_CPU) {
1207 			req->step_mem_lim  = arg.step_mem_limit &
1208 				(~MEM_PER_CPU);
1209 			req->step_mem_lim *= step_cpus;
1210 		} else
1211 			req->step_mem_lim  = arg.step_mem_limit;
1212 	} else {
1213 		if (arg.job_mem_limit & MEM_PER_CPU) {
1214 			req->step_mem_lim  = arg.job_mem_limit &
1215 				(~MEM_PER_CPU);
1216 			req->step_mem_lim *= job_cpus;
1217 		} else
1218 			req->step_mem_lim  = arg.job_mem_limit;
1219 	}
1220 	if (arg.job_mem_limit & MEM_PER_CPU) {
1221 		req->job_mem_lim  = arg.job_mem_limit & (~MEM_PER_CPU);
1222 		req->job_mem_lim *= job_cpus;
1223 	} else
1224 		req->job_mem_lim  = arg.job_mem_limit;
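	/*
	 * Example of the MEM_PER_CPU handling above (illustrative numbers):
	 * a credential job_mem_limit of (MEM_PER_CPU | 1024) with job_cpus = 4
	 * results in req->job_mem_lim = 4096 MB, while a limit without the
	 * MEM_PER_CPU flag is treated as a per-node value and used as-is.
	 */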
1225 	req->job_core_spec = arg.job_core_spec;
1226 	req->node_cpus = step_cpus;
1227 #if 0
1228 	info("%u.%u node_id:%d mem orig:%"PRIu64" cpus:%u limit:%"PRIu64"",
1229 	     jobid, stepid, node_id, arg.job_mem_limit,
1230 	     step_cpus, req->job_mem_lim);
1231 #endif
1232 
1233 	*step_hset = s_hset;
1234 	slurm_cred_free_args(&arg);
1235 	return SLURM_SUCCESS;
1236 
1237 fail:
1238 	if (s_hset)
1239 		hostset_destroy(s_hset);
1240 	*step_hset = NULL;
1241 	slurm_cred_free_args(&arg);
1242 	slurm_seterrno_ret(ESLURMD_INVALID_JOB_CREDENTIAL);
1243 }
1244 
1245 static int _str_to_memset(bitstr_t *mask, char *str)
1246 {
1247 	int len = strlen(str);
1248 	const char *ptr = str + len - 1;
1249 	int base = 0;
1250 
1251 	while (ptr >= str) {
1252 		char val = slurm_char_to_hex(*ptr);
1253 		if (val == (char) -1)
1254 			return -1;
1255 		if ((val & 1) && (base < MAX_NUMA_CNT))
1256 			bit_set(mask, base);
1257 		base++;
1258 		if ((val & 2) && (base < MAX_NUMA_CNT))
1259 			bit_set(mask, base);
1260 		base++;
1261 		if ((val & 4) && (base < MAX_NUMA_CNT))
1262 			bit_set(mask, base);
1263 		base++;
1264 		if ((val & 8) && (base < MAX_NUMA_CNT))
1265 			bit_set(mask, base);
1266 		base++;
1267 		len--;
1268 		ptr--;
1269 	}
1270 
1271 	return 0;
1272 }
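/*
 * Example: _str_to_memset(mask, "5") sets bits 0 and 2 (hex 0x5), and
 * _str_to_memset(mask, "f3") sets bits 0-1 and 4-7; the string is scanned
 * starting from its rightmost (least significant) hex digit.
 */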
1273 
1274 static bitstr_t *_build_cpu_bitmap(uint16_t cpu_bind_type, char *cpu_bind,
1275 				   int task_cnt_on_node)
1276 {
1277 	bitstr_t *cpu_bitmap = NULL;
1278 	char *tmp_str, *tok, *save_ptr = NULL;
1279 	int cpu_id;
1280 
1281 	if (cpu_bind_type & CPU_BIND_NONE) {
1282 		/* Return NULL bitmap, sort all NUMA */
1283 	} else if ((cpu_bind_type & CPU_BIND_RANK) &&
1284 		   (task_cnt_on_node > 0)) {
1285 		cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1286 		if (task_cnt_on_node >= MAX_CPU_CNT)
1287 			task_cnt_on_node = MAX_CPU_CNT;
1288 		for (cpu_id = 0; cpu_id < task_cnt_on_node; cpu_id++) {
1289 			bit_set(cpu_bitmap, cpu_id);
1290 		}
1291 	} else if (cpu_bind_type & CPU_BIND_MAP) {
1292 		cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1293 		tmp_str = xstrdup(cpu_bind);
1294 		tok = strtok_r(tmp_str, ",", &save_ptr);
1295 		while (tok) {
1296 			if (!xstrncmp(tok, "0x", 2))
1297 				cpu_id = strtoul(tok + 2, NULL, 16);
1298 			else
1299 				cpu_id = strtoul(tok, NULL, 10);
1300 			if (cpu_id < MAX_CPU_CNT)
1301 				bit_set(cpu_bitmap, cpu_id);
1302 			tok = strtok_r(NULL, ",", &save_ptr);
1303 		}
1304 		xfree(tmp_str);
1305 	} else if (cpu_bind_type & CPU_BIND_MASK) {
1306 		cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1307 		tmp_str = xstrdup(cpu_bind);
1308 		tok = strtok_r(tmp_str, ",", &save_ptr);
1309 		while (tok) {
1310 			if (!xstrncmp(tok, "0x", 2))
1311 				tok += 2;	/* Skip "0x", always hex */
1312 			(void) _str_to_memset(cpu_bitmap, tok);
1313 			tok = strtok_r(NULL, ",", &save_ptr);
1314 		}
1315 		xfree(tmp_str);
1316 	}
1317 	return cpu_bitmap;
1318 }
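/*
 * Example: with CPU_BIND_MAP and cpu_bind = "0,2,0x4" the bitmap gets CPUs
 * 0, 2 and 4 set; with CPU_BIND_MASK and cpu_bind = "0x5", _str_to_memset()
 * marks CPUs 0 and 2.
 */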
1319 
1320 static bitstr_t *_xlate_cpu_to_numa_bitmap(bitstr_t *cpu_bitmap)
1321 {
1322 	bitstr_t *numa_bitmap = NULL;
1323 #ifdef HAVE_NUMA
1324 	struct bitmask *numa_bitmask = NULL;
1325 	char cpu_str[10240];
1326 	int i, max_numa;
1327 
1328 	if (numa_available() != -1) {
1329 		bit_fmt(cpu_str, sizeof(cpu_str), cpu_bitmap);
1330 		numa_bitmask = numa_parse_cpustring(cpu_str);
1331 		if (numa_bitmask) {
1332 			max_numa = numa_max_node();
1333 			numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1334 			for (i = 0; i <= max_numa; i++) {
1335 				if (numa_bitmask_isbitset(numa_bitmask, i))
1336 					bit_set(numa_bitmap, i);
1337 			}
1338 			numa_bitmask_free(numa_bitmask);
1339 		}
1340 	}
1341 #endif
1342 	return numa_bitmap;
1343 
1344 }
1345 
1346 static bitstr_t *_build_numa_bitmap(uint16_t mem_bind_type, char *mem_bind,
1347 				    uint16_t cpu_bind_type, char *cpu_bind,
1348 				    int task_cnt_on_node)
1349 {
1350 	bitstr_t *cpu_bitmap = NULL, *numa_bitmap = NULL;
1351 	char *tmp_str, *tok, *save_ptr = NULL;
1352 	int numa_id;
1353 
1354 	if (mem_bind_type & MEM_BIND_NONE) {
1355 		/* Return NULL bitmap, sort all NUMA */
1356 	} else if ((mem_bind_type & MEM_BIND_RANK) &&
1357 		   (task_cnt_on_node > 0)) {
1358 		numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1359 		if (task_cnt_on_node >= MAX_NUMA_CNT)
1360 			task_cnt_on_node = MAX_NUMA_CNT;
1361 		for (numa_id = 0; numa_id < task_cnt_on_node; numa_id++) {
1362 			bit_set(numa_bitmap, numa_id);
1363 		}
1364 	} else if (mem_bind_type & MEM_BIND_MAP) {
1365 		numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1366 		tmp_str = xstrdup(mem_bind);
1367 		tok = strtok_r(tmp_str, ",", &save_ptr);
1368 		while (tok) {
1369 			if (!xstrncmp(tok, "0x", 2))
1370 				numa_id = strtoul(tok + 2, NULL, 16);
1371 			else
1372 				numa_id = strtoul(tok, NULL, 10);
1373 			if (numa_id < MAX_NUMA_CNT)
1374 				bit_set(numa_bitmap, numa_id);
1375 			tok = strtok_r(NULL, ",", &save_ptr);
1376 		}
1377 		xfree(tmp_str);
1378 	} else if (mem_bind_type & MEM_BIND_MASK) {
1379 		numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1380 		tmp_str = xstrdup(mem_bind);
1381 		tok = strtok_r(tmp_str, ",", &save_ptr);
1382 		while (tok) {
1383 			if (!xstrncmp(tok, "0x", 2))
1384 				tok += 2;	/* Skip "0x", always hex */
1385 			(void) _str_to_memset(numa_bitmap, tok);
1386 			tok = strtok_r(NULL, ",", &save_ptr);
1387 		}
1388 		xfree(tmp_str);
1389 	} else if (mem_bind_type & MEM_BIND_LOCAL) {
1390 		cpu_bitmap = _build_cpu_bitmap(cpu_bind_type, cpu_bind,
1391 					       task_cnt_on_node);
1392 		if (cpu_bitmap) {
1393 			numa_bitmap = _xlate_cpu_to_numa_bitmap(cpu_bitmap);
1394 			FREE_NULL_BITMAP(cpu_bitmap);
1395 		}
1396 	}
1397 
1398 	return numa_bitmap;
1399 }
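/*
 * Example: with MEM_BIND_MAP and mem_bind = "0,1" the result has NUMA nodes
 * 0 and 1 set; with MEM_BIND_LOCAL the CPU binding is first expanded via
 * _build_cpu_bitmap() and then translated to NUMA nodes using libnuma.
 */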
1400 
1401 static void
1402 _rpc_launch_tasks(slurm_msg_t *msg)
1403 {
1404 	int      errnum = SLURM_SUCCESS;
1405 	uint16_t port;
1406 	char     host[MAXHOSTNAMELEN];
1407 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
1408 	gid_t req_gid = g_slurm_auth_get_gid(msg->auth_cred);
1409 	launch_tasks_request_msg_t *req = msg->data;
1410 	bool     super_user = false;
1411 	bool     mem_sort = false;
1412 #ifndef HAVE_FRONT_END
1413 	bool     first_job_run;
1414 #endif
1415 	slurm_addr_t self;
1416 	slurm_addr_t *cli = &msg->orig_addr;
1417 	hostset_t step_hset = NULL;
1418 	job_mem_limits_t *job_limits_ptr;
1419 	int node_id = 0;
1420 	bitstr_t *numa_bitmap = NULL;
1421 
1422 #ifndef HAVE_FRONT_END
1423 	/* It is always 0 for front end systems */
1424 	node_id = nodelist_find(req->complete_nodelist, conf->node_name);
1425 #endif
1426 	memcpy(&req->orig_addr, &msg->orig_addr, sizeof(slurm_addr_t));
1427 
1428 	super_user = _slurm_authorized_user(req_uid);
1429 
1430 	if ((super_user == false) && (req_uid != req->uid)) {
1431 		error("launch task request from uid %u",
1432 		      (unsigned int) req_uid);
1433 		errnum = ESLURM_USER_ID_MISSING;	/* or invalid user */
1434 		goto done;
1435 	}
1436 	if (node_id < 0) {
1437 		info("%s: Invalid node list (%s not in %s)", __func__,
1438 		     conf->node_name, req->complete_nodelist);
1439 		errnum = ESLURM_INVALID_NODE_NAME;
1440 		goto done;
1441 	}
1442 
1443 	slurm_get_ip_str(cli, &port, host, sizeof(host));
1444 	if (req->het_job_id && (req->het_job_id != NO_VAL)) {
1445 		info("launch task %u+%u.%u (%u.%u) request from UID:%u GID:%u HOST:%s PORT:%hu",
1446 		     req->het_job_id, req->het_job_offset, req->job_step_id,
1447 		     req->job_id, req->job_step_id, req->uid, req->gid,
1448 		     host, port);
1449 	} else {
1450 		info("launch task %u.%u request from UID:%u GID:%u HOST:%s PORT:%hu",
1451 		     req->job_id, req->job_step_id, req->uid, req->gid,
1452 		     host, port);
1453 	}
1454 
1455 	/* This may have been set by a previous launch and must be
1456 	 * overwritten here so messages for this launch reach the right srun */
1457 	env_array_overwrite(&req->env, "SLURM_SRUN_COMM_HOST", host);
1458 	req->envc = envcount(req->env);
1459 
1460 #ifndef HAVE_FRONT_END
1461 	slurm_mutex_lock(&prolog_mutex);
1462 	first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
1463 #endif
1464 	if (_check_job_credential(req, req_uid, req_gid, node_id, &step_hset,
1465 				  msg->protocol_version) < 0) {
1466 		errnum = errno;
1467 		error("Invalid job credential from %ld@%s: %m",
1468 		      (long) req_uid, host);
1469 #ifndef HAVE_FRONT_END
1470 		slurm_mutex_unlock(&prolog_mutex);
1471 #endif
1472 		goto done;
1473 	}
1474 
1475 	/* Must follow _check_job_credential(), which sets some req fields */
1476 	task_g_slurmd_launch_request(req, node_id);
1477 
1478 #ifndef HAVE_FRONT_END
1479 	if (first_job_run) {
1480 		int rc;
1481 		job_env_t job_env;
1482 		List job_gres_list, epi_env_gres_list;
1483 		uint32_t jobid;
1484 
1485 		slurm_cred_insert_jobid(conf->vctx, req->job_id);
1486 		_add_job_running_prolog(req->job_id);
1487 		slurm_mutex_unlock(&prolog_mutex);
1488 
1489 #ifdef HAVE_NATIVE_CRAY
1490 		if (req->het_job_id && (req->het_job_id != NO_VAL))
1491 			jobid = req->het_job_id;
1492 		else
1493 			jobid = req->job_id;
1494 #else
1495 		jobid = req->job_id;
1496 #endif
1497 		if (container_g_create(jobid))
1498 			error("container_g_create(%u): %m", req->job_id);
1499 
1500 		memset(&job_env, 0, sizeof(job_env));
1501 		job_gres_list = (List) slurm_cred_get_arg(req->cred,
1502 							CRED_ARG_JOB_GRES_LIST);
1503 		epi_env_gres_list = gres_plugin_epilog_build_env(job_gres_list,
1504 							req->complete_nodelist);
1505 		gres_plugin_epilog_set_env(&job_env.gres_job_env,
1506 					   epi_env_gres_list, node_id);
1507 		FREE_NULL_LIST(epi_env_gres_list);
1508 
1509 		job_env.jobid = req->job_id;
1510 		job_env.step_id = req->job_step_id;
1511 		job_env.node_list = req->complete_nodelist;
1512 		job_env.het_job_id = req->het_job_id;
1513 		job_env.partition = req->partition;
1514 		job_env.spank_job_env = req->spank_job_env;
1515 		job_env.spank_job_env_size = req->spank_job_env_size;
1516 		job_env.uid = req->uid;
1517 		job_env.gid = req->gid;
1518 		job_env.user_name = req->user_name;
1519 		rc =  _run_prolog(&job_env, req->cred, true);
1520 		_free_job_env(&job_env);
1521 		if (rc) {
1522 			int term_sig = 0, exit_status = 0;
1523 			if (WIFSIGNALED(rc))
1524 				term_sig    = WTERMSIG(rc);
1525 			else if (WIFEXITED(rc))
1526 				exit_status = WEXITSTATUS(rc);
1527 			error("[job %u] prolog failed status=%d:%d",
1528 			      req->job_id, exit_status, term_sig);
1529 			errnum = ESLURMD_PROLOG_FAILED;
1530 			goto done;
1531 		}
1532 	} else {
1533 		slurm_mutex_unlock(&prolog_mutex);
1534 		_wait_for_job_running_prolog(req->job_id);
1535 	}
1536 
1537 	/*
1538 	 * Since the job could have been killed while the prolog was running,
1539 	 * test if the credential has since been revoked and exit as needed.
1540 	 */
1541 	if (slurm_cred_revoked(conf->vctx, req->cred)) {
1542 		info("Job %u already killed, do not launch step %u.%u",
1543 		     req->job_id, req->job_id, req->job_step_id);
1544 		errnum = ESLURMD_CREDENTIAL_REVOKED;
1545 		goto done;
1546 	}
1547 #endif
1548 
1549 	if (req->mem_bind_type & MEM_BIND_SORT) {
1550 		int task_cnt = -1;
1551 		if (req->tasks_to_launch)
1552 			task_cnt = (int) req->tasks_to_launch[node_id];
1553 		mem_sort = true;
1554 		numa_bitmap = _build_numa_bitmap(req->mem_bind_type,
1555 						 req->mem_bind,
1556 						 req->cpu_bind_type,
1557 						 req->cpu_bind, task_cnt);
1558 	}
1559 	node_features_g_step_config(mem_sort, numa_bitmap);
1560 	FREE_NULL_BITMAP(numa_bitmap);
1561 
1562 	if (req->job_mem_lim || req->step_mem_lim) {
1563 		step_loc_t step_info;
1564 		slurm_mutex_lock(&job_limits_mutex);
1565 		if (!job_limits_list)
1566 			job_limits_list = list_create(xfree_ptr);
1567 		step_info.jobid  = req->job_id;
1568 		step_info.stepid = req->job_step_id;
1569 		job_limits_ptr = list_find_first(job_limits_list,
1570 						 _step_limits_match,
1571 						 &step_info);
1572 		if (!job_limits_ptr) {
1573 			job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
1574 			job_limits_ptr->job_id   = req->job_id;
1575 			job_limits_ptr->job_mem  = req->job_mem_lim;
1576 			job_limits_ptr->step_id  = req->job_step_id;
1577 			job_limits_ptr->step_mem = req->step_mem_lim;
1578 #if _LIMIT_INFO
1579 			info("AddLim step:%u.%u job_mem:%"PRIu64" "
1580 			     "step_mem:%"PRIu64"",
1581 			     job_limits_ptr->job_id, job_limits_ptr->step_id,
1582 			     job_limits_ptr->job_mem,
1583 			     job_limits_ptr->step_mem);
1584 #endif
1585 			list_append(job_limits_list, job_limits_ptr);
1586 		}
1587 		slurm_mutex_unlock(&job_limits_mutex);
1588 	}
1589 
1590 	if (slurm_get_stream_addr(msg->conn_fd, &self)) {
1591 		error("%s: slurm_get_stream_addr(): %m", __func__);
1592 		errnum = errno;
1593 		goto done;
1594 	}
1595 
1596 	debug3("%s: call to _forkexec_slurmstepd", __func__);
1597 	errnum = _forkexec_slurmstepd(LAUNCH_TASKS, (void *)req, cli, &self,
1598 				      step_hset, msg->protocol_version);
1599 	debug3("%s: return from _forkexec_slurmstepd", __func__);
1600 	_launch_complete_add(req->job_id);
1601 
1602 done:
1603 	if (step_hset)
1604 		hostset_destroy(step_hset);
1605 
1606 	if (slurm_send_rc_msg(msg, errnum) < 0) {
1607 		char addr_str[32];
1608 		slurm_print_slurm_addr(&msg->address, addr_str,
1609 				       sizeof(addr_str));
1610 		error("%s: unable to send return code to address:port=%s msg_type=%u: %m",
1611 		      __func__, addr_str, msg->msg_type);
1612 
1613 		/*
1614 		 * Rewind credential so that srun may perform retry
1615 		 */
1616 		slurm_cred_rewind(conf->vctx, req->cred); /* ignore errors */
1617 
1618 	} else if (errnum == SLURM_SUCCESS) {
1619 		save_cred_state(conf->vctx);
1620 		task_g_slurmd_reserve_resources(req, node_id);
1621 	}
1622 
1623 	/*
1624 	 *  If job prolog failed, indicate failure to slurmctld
1625 	 */
1626 	if (errnum == ESLURMD_PROLOG_FAILED)
1627 		send_registration_msg(errnum, false);
1628 }
1629 
1630 /*
1631  * Open file based upon permissions of a different user
1632  * IN path_name - name of file to open
1633  * IN flags - flags to open() call
1634  * IN mode - mode to open() call
1635  * IN jobid - (optional) job id
1636  * IN uid - User ID to use for file access check
1637  * IN gid - Group ID to use for file access check
1638  * RET -1 on error, file descriptor otherwise
1639  */
1640 static int _open_as_other(char *path_name, int flags, int mode,
1641 			  uint32_t jobid, uid_t uid, gid_t gid,
1642 			  int ngids, gid_t *gids)
1643 {
1644 	pid_t child;
1645 	int pipe[2];
1646 	int fd = -1, rc = 0;
1647 
1648 	if ((rc = container_g_create(jobid))) {
1649 		error("%s: container_g_create(%u): %m", __func__, jobid);
1650 		return -1;
1651 	}
1652 
1653 	/* child process will setuid to the user, register the process
1654 	 * with the container, and open the file for us. */
1655 	if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pipe) != 0) {
1656 		error("%s: Failed to open pipe: %m", __func__);
1657 		return -1;
1658 	}
1659 
1660 	child = fork();
1661 	if (child == -1) {
1662 		error("%s: fork failure", __func__);
1663 		close(pipe[0]);
1664 		close(pipe[1]);
1665 		return -1;
1666 	} else if (child > 0) {
1667 		close(pipe[0]);
1668 		(void) waitpid(child, &rc, 0);
1669 		if (WIFEXITED(rc) && (WEXITSTATUS(rc) == 0))
1670 			fd = receive_fd_over_pipe(pipe[1]);
1671 		close(pipe[1]);
1672 		return fd;
1673 	}
1674 
1675 	/* child process below here */
1676 
1677 	close(pipe[1]);
1678 
1679 	/* container_g_join() needs to be called in the
1680 	 * forked (child) side of the fork to avoid a race
1681 	 * condition where this process could create a file or
1682 	 * detach itself from a child before the parent of the
1683 	 * fork has added the pid to the container. */
1684 	if (container_g_join(jobid, uid)) {
1685 		error("%s container_g_join(%u): %m", __func__, jobid);
1686 		_exit(SLURM_ERROR);
1687 	}
1688 
1689 	/* The child actually performs the I/O and exits with
1690 	 * a return code, do not return! */
1691 
1692 	/*********************************************************************\
1693 	 * NOTE: It would be best to do an exec() immediately after the fork()
1694 	 * in order to help prevent a possible deadlock in the child process
1695 	 * due to locks being set at the time of the fork and being freed by
1696 	 * the parent process, but not freed by the child process. Performing
1697 	 * the work inline is done for simplicity. Note that the logging
1698 	 * performed by error() should be safe due to the use of
1699 	 * atfork_install_handlers() as defined in src/common/log.c.
1700 	 * Change the code below with caution.
1701 	\*********************************************************************/
1702 
1703 	if (setgroups(ngids, gids) < 0) {
1704 		error("%s: uid: %u setgroups failed: %m", __func__, uid);
1705 		_exit(errno);
1706 	}
1707 
1708 	if (setgid(gid) < 0) {
1709 		error("%s: uid:%u setgid(%u): %m", __func__, uid, gid);
1710 		_exit(errno);
1711 	}
1712 	if (setuid(uid) < 0) {
1713 		error("%s: setuid(%u): %m", __func__, uid);
1714 		_exit(errno);
1715 	}
1716 
1717 	fd = open(path_name, flags, mode);
1718 	if (fd == -1) {
1719 		 error("%s: uid:%u can't open `%s`: %m",
1720 			__func__, uid, path_name);
1721 		_exit(errno);
1722 	}
1723 	send_fd_over_pipe(pipe[0], fd);
1724 	close(fd);
1725 	_exit(SLURM_SUCCESS);
1726 }
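
/*
 * Illustrative sketch only (compiled out): the parent and child above hand
 * an open file descriptor across their AF_UNIX socketpair with
 * send_fd_over_pipe()/receive_fd_over_pipe().  The helpers below show one
 * common way such fd passing is implemented, using SCM_RIGHTS ancillary
 * data.  They are hypothetical stand-ins, not the actual src/common code,
 * and assume <sys/socket.h> is available (socketpair() above implies it is).
 */
#if 0
static int _example_send_fd(int sock, int fd)
{
	struct msghdr msg = {0};
	struct cmsghdr *cmsg;
	union {
		char buf[CMSG_SPACE(sizeof(int))];
		struct cmsghdr align;
	} u;
	char byte = 0;
	struct iovec iov = { .iov_base = &byte, .iov_len = 1 };

	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = u.buf;
	msg.msg_controllen = sizeof(u.buf);

	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_SOCKET;
	cmsg->cmsg_type = SCM_RIGHTS;
	cmsg->cmsg_len = CMSG_LEN(sizeof(int));
	memcpy(CMSG_DATA(cmsg), &fd, sizeof(int));

	return (sendmsg(sock, &msg, 0) == -1) ? -1 : 0;
}

static int _example_recv_fd(int sock)
{
	struct msghdr msg = {0};
	struct cmsghdr *cmsg;
	union {
		char buf[CMSG_SPACE(sizeof(int))];
		struct cmsghdr align;
	} u;
	char byte;
	struct iovec iov = { .iov_base = &byte, .iov_len = 1 };
	int fd = -1;

	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;
	msg.msg_control = u.buf;
	msg.msg_controllen = sizeof(u.buf);

	if (recvmsg(sock, &msg, 0) == -1)
		return -1;
	cmsg = CMSG_FIRSTHDR(&msg);
	if (cmsg && (cmsg->cmsg_type == SCM_RIGHTS))
		memcpy(&fd, CMSG_DATA(cmsg), sizeof(int));
	return fd;
}
#endif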
1727 
1728 
1729 static void
1730 _prolog_error(batch_job_launch_msg_t *req, int rc)
1731 {
1732 	char *err_name = NULL, *path_name = NULL;
1733 	int fd;
1734 	int flags = (O_CREAT|O_APPEND|O_WRONLY);
1735 	uint32_t jobid;
1736 
1737 #ifdef HAVE_NATIVE_CRAY
1738 	if (req->het_job_id && (req->het_job_id != NO_VAL))
1739 		jobid = req->het_job_id;
1740 	else
1741 		jobid = req->job_id;
1742 #else
1743 	jobid = req->job_id;
1744 #endif
1745 
1746 	path_name = fname_create2(req);
1747 	if ((fd = _open_as_other(path_name, flags, 0644,
1748 				 jobid, req->uid, req->gid,
1749 				 req->ngids, req->gids)) == -1) {
1750 		error("Unable to open %s: Permission denied", path_name);
1751 		xfree(path_name);
1752 		return;
1753 	}
1754 	xfree(path_name);
1755 
1756 	xstrfmtcat(err_name, "Error running slurm prolog: %d\n",
1757 		   WEXITSTATUS(rc));
1758 	safe_write(fd, err_name, strlen(err_name));
1759 
1760 rwfail:
1761 	xfree(err_name);
1762 	close(fd);
1763 }
1764 
1765 /* load the user's environment on this machine if requested
1766  * SLURM_GET_USER_ENV environment variable is set */
1767 static int
1768 _get_user_env(batch_job_launch_msg_t *req)
1769 {
1770 	char **new_env;
1771 	int i;
1772 	static time_t config_update = 0;
1773 	static bool no_env_cache = false;
1774 
1775 	if (config_update != conf->last_update) {
1776 		char *sched_params = slurm_get_sched_params();
1777 		no_env_cache = (xstrcasestr(sched_params, "no_env_cache"));
1778 		xfree(sched_params);
1779 		config_update = conf->last_update;
1780 	}
1781 
1782 	for (i=0; i<req->envc; i++) {
1783 		if (xstrcmp(req->environment[i], "SLURM_GET_USER_ENV=1") == 0)
1784 			break;
1785 	}
1786 	if (i >= req->envc)
1787 		return 0;		/* don't need to load env */
1788 
1789 	verbose("%s: get env for user %s here", __func__, req->user_name);
1790 
1791 	/* Permit up to 120 second delay before using cache file */
1792 	new_env = env_array_user_default(req->user_name, 120, 0, no_env_cache);
1793 	if (! new_env) {
1794 		error("%s: Unable to get user's local environment%s",
1795 		      __func__, no_env_cache ?
1796 		      "" : ", running only with passed environment");
1797 		return -1;
1798 	}
1799 
1800 	env_array_merge(&new_env,
1801 			(const char **) req->environment);
1802 	env_array_free(req->environment);
1803 	req->environment = new_env;
1804 	req->envc = envcount(new_env);
1805 
1806 	return 0;
1807 }
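
/*
 * Illustrative sketch only (compiled out): _get_user_env() above is gated on
 * the batch request carrying "SLURM_GET_USER_ENV=1" in its environment.  The
 * hypothetical helper below shows that scan in isolation.
 */
#if 0
static bool _example_wants_user_env(char **env, uint32_t envc)
{
	uint32_t i;

	for (i = 0; i < envc; i++) {
		if (!strcmp(env[i], "SLURM_GET_USER_ENV=1"))
			return true;	/* user env must be loaded */
	}
	return false;			/* use passed environment as-is */
}
#endif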
1808 
1809 /* The RPC currently contains a memory size limit, but we load the
1810  * value from the job credential to be certain it has not been
1811  * altered by the user */
1812 static void
1813 _set_batch_job_limits(slurm_msg_t *msg)
1814 {
1815 	int i;
1816 	uint32_t alloc_lps = 0, last_bit = 0;
1817 	bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND;
1818 	slurm_cred_arg_t arg;
1819 	batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data;
1820 
1821 	if (slurm_cred_get_args(req->cred, &arg) != SLURM_SUCCESS)
1822 		return;
1823 	req->job_core_spec = arg.job_core_spec;	/* Prevent user reset */
1824 
1825 	if (cpu_log) {
1826 		char *per_job = "";
1827 		uint64_t job_mem  = arg.job_mem_limit;
1828 		if (job_mem & MEM_PER_CPU) {
1829 			job_mem &= (~MEM_PER_CPU);
1830 			per_job = "_per_CPU";
1831 		}
1832 		info("====================");
1833 		info("batch_job:%u job_mem:%"PRIu64"MB%s", req->job_id,
1834 		     job_mem, per_job);
1835 	}
1836 	if (cpu_log || (arg.job_mem_limit & MEM_PER_CPU)) {
1837 		if (arg.job_nhosts > 0) {
1838 			last_bit = arg.sockets_per_node[0] *
1839 				arg.cores_per_socket[0];
1840 			for (i=0; i<last_bit; i++) {
1841 				if (!bit_test(arg.job_core_bitmap, i))
1842 					continue;
1843 				if (cpu_log)
1844 					info("JobNode[0] CPU[%u] Job alloc",i);
1845 				alloc_lps++;
1846 			}
1847 		}
1848 		if (cpu_log)
1849 			info("====================");
1850 		if (alloc_lps == 0) {
1851 			error("_set_batch_job_limits: alloc_lps is zero");
1852 			alloc_lps = 1;
1853 		}
1854 
1855 		/* NOTE: alloc_lps is the count of allocated resources
1856 		 * (typically cores). Convert to CPU count as needed */
1857 		if (last_bit < 1)
1858 			error("Batch job credential allocates no CPUs");
1859 		else {
1860 			i = conf->cpus / last_bit;
1861 			if (i > 1)
1862 				alloc_lps *= i;
1863 		}
1864 	}
1865 
1866 	if (arg.job_mem_limit & MEM_PER_CPU) {
1867 		req->job_mem = arg.job_mem_limit & (~MEM_PER_CPU);
1868 		req->job_mem *= alloc_lps;
1869 	} else
1870 		req->job_mem = arg.job_mem_limit;
1871 
1872 	/*
1873 	 * handle x11 settings here since this is the only access to the cred
1874 	 * on the batch step.
1875 	 */
1876 	if ((arg.x11 & X11_FORWARD_ALL) || (arg.x11 & X11_FORWARD_BATCH))
1877 		_setup_x11_display(req->job_id, SLURM_BATCH_SCRIPT,
1878 				   &req->environment, &req->envc);
1879 
1880 	slurm_cred_free_args(&arg);
1881 }
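
/*
 * Illustrative sketch only (compiled out): the conversion performed above
 * when the credential's memory limit is flagged per-CPU.  EXAMPLE_MEM_PER_CPU
 * is a hypothetical stand-in for the real MEM_PER_CPU flag bit from slurm.h;
 * its value here is assumed, not authoritative.
 */
#if 0
#define EXAMPLE_MEM_PER_CPU 0x8000000000000000ULL

static uint64_t _example_node_mem_limit(uint64_t cred_mem_limit,
					uint32_t alloc_cpus)
{
	if (cred_mem_limit & EXAMPLE_MEM_PER_CPU) {
		/* per-CPU limit: strip the flag and scale by CPU count */
		return (cred_mem_limit & ~EXAMPLE_MEM_PER_CPU) * alloc_cpus;
	}
	return cred_mem_limit;	/* already a per-node limit */
}
#endif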
1882 
1883 /* These functions prevent a possible race condition if the batch script's
1884  * complete RPC is processed before its launch_successful response. A small
1885  * circular buffer records the ids of recently finished batch jobs. */
1886 static bool _is_batch_job_finished(uint32_t job_id)
1887 {
1888 	bool found_job = false;
1889 	int i;
1890 
1891 	slurm_mutex_lock(&fini_job_mutex);
1892 	for (i = 0; i < fini_job_cnt; i++) {
1893 		if (fini_job_id[i] == job_id) {
1894 			found_job = true;
1895 			break;
1896 		}
1897 	}
1898 	slurm_mutex_unlock(&fini_job_mutex);
1899 
1900 	return found_job;
1901 }
1902 static void _note_batch_job_finished(uint32_t job_id)
1903 {
1904 	slurm_mutex_lock(&fini_job_mutex);
1905 	fini_job_id[next_fini_job_inx] = job_id;
1906 	if (++next_fini_job_inx >= fini_job_cnt)
1907 		next_fini_job_inx = 0;
1908 	slurm_mutex_unlock(&fini_job_mutex);
1909 }
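
/*
 * Illustrative sketch only (compiled out): the two functions above keep a
 * small circular buffer of recently finished batch job ids, protected by
 * fini_job_mutex.  The self-contained version below (with its own
 * hypothetical storage) shows the same pattern.
 */
#if 0
#define EXAMPLE_FINI_CNT 32
static uint32_t example_fini_ids[EXAMPLE_FINI_CNT];
static int example_fini_inx = 0;
static pthread_mutex_t example_fini_mutex = PTHREAD_MUTEX_INITIALIZER;

static void _example_note_finished(uint32_t job_id)
{
	pthread_mutex_lock(&example_fini_mutex);
	example_fini_ids[example_fini_inx] = job_id;
	if (++example_fini_inx >= EXAMPLE_FINI_CNT)
		example_fini_inx = 0;	/* wrap, overwriting oldest entry */
	pthread_mutex_unlock(&example_fini_mutex);
}

static bool _example_is_finished(uint32_t job_id)
{
	bool found = false;
	int i;

	pthread_mutex_lock(&example_fini_mutex);
	for (i = 0; i < EXAMPLE_FINI_CNT; i++) {
		if (example_fini_ids[i] == job_id) {
			found = true;
			break;
		}
	}
	pthread_mutex_unlock(&example_fini_mutex);
	return found;
}
#endif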
1910 
1911 /* Send notification to slurmctld that we have finished running the prolog.
1912  * This is needed on systems that don't use srun to launch their tasks.
1913  */
1914 static int _notify_slurmctld_prolog_fini(
1915 	uint32_t job_id, uint32_t prolog_return_code)
1916 {
1917 	int rc, ret_c;
1918 	slurm_msg_t req_msg;
1919 	complete_prolog_msg_t req;
1920 
1921 	slurm_msg_t_init(&req_msg);
1922 	memset(&req, 0, sizeof(req));
1923 	req.job_id	= job_id;
1924 	req.prolog_rc	= prolog_return_code;
1925 
1926 	req_msg.msg_type= REQUEST_COMPLETE_PROLOG;
1927 	req_msg.data	= &req;
1928 
1929 	/*
1930 	 * Here we only care about the return code of
1931 	 * slurm_send_recv_controller_rc_msg since it means there was a
1932 	 * communication failure and we may need to try again.
1933 	 */
1934 	if ((ret_c = slurm_send_recv_controller_rc_msg(
1935 		     &req_msg, &rc, working_cluster_rec)))
1936 		error("Error sending prolog completion notification: %m");
1937 
1938 	return ret_c;
1939 }
1940 
1941 /* Convert memory limits from per-CPU to per-node */
1942 static void _convert_job_mem(slurm_msg_t *msg)
1943 {
1944 	prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
1945 	slurm_cred_arg_t arg;
1946 	hostset_t j_hset = NULL;
1947 	int rc, hi, host_index, job_cpus;
1948 	int i, i_first_bit = 0, i_last_bit = 0;
1949 
1950 	rc = slurm_cred_verify(conf->vctx, req->cred, &arg,
1951 			       msg->protocol_version);
1952 	if (rc < 0) {
1953 		error("%s: slurm_cred_verify failed: %m", __func__);
1954 		req->nnodes = 1;	/* best guess */
1955 		return;
1956 	}
1957 
1958 	req->nnodes = arg.job_nhosts;
1959 
1960 	if (arg.job_mem_limit == 0)
1961 		goto fini;
1962 	if ((arg.job_mem_limit & MEM_PER_CPU) == 0) {
1963 		req->job_mem_limit = arg.job_mem_limit;
1964 		goto fini;
1965 	}
1966 
1967 	/* Assume 1 CPU on error */
1968 	req->job_mem_limit = arg.job_mem_limit & (~MEM_PER_CPU);
1969 
1970 	if (!(j_hset = hostset_create(arg.job_hostlist))) {
1971 		error("%s: Unable to parse credential hostlist: `%s'",
1972 		      __func__, arg.job_hostlist);
1973 		goto fini;
1974 	}
1975 	host_index = hostset_find(j_hset, conf->node_name);
1976 	hostset_destroy(j_hset);
1977 
1978 	hi = host_index + 1;	/* change from 0-origin to 1-origin */
1979 	for (i = 0; hi; i++) {
1980 		if (hi > arg.sock_core_rep_count[i]) {
1981 			i_first_bit += arg.sockets_per_node[i] *
1982 				arg.cores_per_socket[i] *
1983 				arg.sock_core_rep_count[i];
1984 			i_last_bit = i_first_bit +
1985 				arg.sockets_per_node[i] *
1986 				arg.cores_per_socket[i] *
1987 				arg.sock_core_rep_count[i];
1988 			hi -= arg.sock_core_rep_count[i];
1989 		} else {
1990 			i_first_bit += arg.sockets_per_node[i] *
1991 				arg.cores_per_socket[i] * (hi - 1);
1992 			i_last_bit = i_first_bit +
1993 				arg.sockets_per_node[i] *
1994 				arg.cores_per_socket[i];
1995 			break;
1996 		}
1997 	}
1998 
1999 	/* Now count the allocated processors on this node */
2000 	job_cpus = 0;
2001 	for (i = i_first_bit; i < i_last_bit; i++) {
2002 		if (bit_test(arg.job_core_bitmap, i))
2003 			job_cpus++;
2004 	}
2005 
2006 	/* NOTE: alloc_lps is the count of allocated resources
2007 	 * (typically cores). Convert to CPU count as needed */
2008 	if (i_last_bit > i_first_bit) {
2009 		i = conf->cpus / (i_last_bit - i_first_bit);
2010 		if (i > 1)
2011 			job_cpus *= i;
2012 	}
2013 
2014 	req->job_mem_limit *= job_cpus;
2015 
2016 fini:	slurm_cred_free_args(&arg);
2017 }
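
/*
 * Illustrative sketch only (compiled out): _convert_job_mem() above walks the
 * compressed (sockets_per_node, cores_per_socket, sock_core_rep_count) arrays
 * from the credential to find this node's slice of job_core_bitmap.  The
 * hypothetical helper below performs the same walk for a node with 0-origin
 * index "node_inx", returning the [first, last) bit range.
 */
#if 0
static void _example_core_bitmap_range(int node_inx,
				       const uint16_t *sockets_per_node,
				       const uint16_t *cores_per_socket,
				       const uint32_t *sock_core_rep_count,
				       int *first_bit, int *last_bit)
{
	int i, hi = node_inx + 1;	/* convert to 1-origin */

	*first_bit = 0;
	*last_bit = 0;
	for (i = 0; hi; i++) {
		int cores_per_node = sockets_per_node[i] *
				     cores_per_socket[i];
		if (hi > (int) sock_core_rep_count[i]) {
			/* target node lies beyond this group of
			 * identically-shaped nodes; skip all of them */
			*first_bit += cores_per_node *
				      (int) sock_core_rep_count[i];
			hi -= (int) sock_core_rep_count[i];
		} else {
			/* target node is inside this group */
			*first_bit += cores_per_node * (hi - 1);
			*last_bit = *first_bit + cores_per_node;
			break;
		}
	}
}
#endif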
2018 
2019 static void _make_prolog_mem_container(slurm_msg_t *msg)
2020 {
2021 	prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2022 	job_mem_limits_t *job_limits_ptr;
2023 	step_loc_t step_info;
2024 
2025 	_convert_job_mem(msg);	/* Convert per-CPU mem limit */
2026 	if (req->job_mem_limit) {
2027 		slurm_mutex_lock(&job_limits_mutex);
2028 		if (!job_limits_list)
2029 			job_limits_list = list_create(xfree_ptr);
2030 		step_info.jobid  = req->job_id;
2031 		step_info.stepid = SLURM_EXTERN_CONT;
2032 		job_limits_ptr = list_find_first(job_limits_list,
2033 						 _step_limits_match,
2034 						 &step_info);
2035 		if (!job_limits_ptr) {
2036 			job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
2037 			job_limits_ptr->job_id   = req->job_id;
2038 			job_limits_ptr->job_mem  = req->job_mem_limit;
2039 			job_limits_ptr->step_id  = SLURM_EXTERN_CONT;
2040 			job_limits_ptr->step_mem = req->job_mem_limit;
2041 #if _LIMIT_INFO
2042 			info("AddLim step:%u.%u job_mem:%"PRIu64""
2043 			     " step_mem:%"PRIu64"",
2044 			     job_limits_ptr->job_id, job_limits_ptr->step_id,
2045 			     job_limits_ptr->job_mem,
2046 			     job_limits_ptr->step_mem);
2047 #endif
2048 			list_append(job_limits_list, job_limits_ptr);
2049 		}
2050 		slurm_mutex_unlock(&job_limits_mutex);
2051 	}
2052 }
2053 
2054 static int _spawn_prolog_stepd(slurm_msg_t *msg)
2055 {
2056 	prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2057 	launch_tasks_request_msg_t *launch_req;
2058 	slurm_addr_t self;
2059 	slurm_addr_t *cli = &msg->orig_addr;
2060 	int rc = SLURM_SUCCESS;
2061 	int i;
2062 
2063 	launch_req = xmalloc(sizeof(launch_tasks_request_msg_t));
2064 	launch_req->alias_list		= req->alias_list;
2065 	launch_req->complete_nodelist	= req->nodes;
2066 	launch_req->cpus_per_task	= 1;
2067 	launch_req->cred		= req->cred;
2068 	launch_req->cwd			= req->work_dir;
2069 	launch_req->efname		= "/dev/null";
2070 	launch_req->gid			= req->gid;
2071 	launch_req->global_task_ids	= xmalloc(sizeof(uint32_t *)
2072 						  * req->nnodes);
2073 	launch_req->ifname		= "/dev/null";
2074 	launch_req->job_id		= req->job_id;
2075 	launch_req->job_mem_lim		= req->job_mem_limit;
2076 	launch_req->job_step_id		= SLURM_EXTERN_CONT;
2077 	launch_req->nnodes		= req->nnodes;
2078 	launch_req->ntasks		= req->nnodes;
2079 	launch_req->ofname		= "/dev/null";
2080 
2081 	launch_req->het_job_id		= req->het_job_id;
2082 	launch_req->het_job_nnodes	= NO_VAL;
2083 
2084 	launch_req->partition		= req->partition;
2085 	launch_req->spank_job_env_size	= req->spank_job_env_size;
2086 	launch_req->spank_job_env	= req->spank_job_env;
2087 	launch_req->step_mem_lim	= req->job_mem_limit;
2088 	launch_req->tasks_to_launch	= xmalloc(sizeof(uint16_t)
2089 						  * req->nnodes);
2090 	launch_req->uid			= req->uid;
2091 	launch_req->user_name		= req->user_name;
2092 
2093 	/*
2094 	 * determine which node this is in the allocation and if
2095 	 * it should setup the x11 forwarding or not
2096 	 */
2097 	if (req->x11) {
2098 		bool setup_x11 = false;
2099 		int host_index = -1;
2100 #ifdef HAVE_FRONT_END
2101 		host_index = 0;	/* It is always 0 for front end systems */
2102 #else
2103 		hostset_t j_hset;
2104 		/*
2105 		 * Determine need to setup X11 based upon this node's index into
2106 		 * the _job's_ allocation
2107 		 */
2108 		if (req->x11 & X11_FORWARD_ALL) {
2109 			;	/* Don't need host_index */
2110 		} else if (!(j_hset = hostset_create(req->nodes))) {
2111 			error("Unable to parse hostlist: `%s'", req->nodes);
2112 		} else {
2113 			host_index = hostset_find(j_hset, conf->node_name);
2114 			hostset_destroy(j_hset);
2115 		}
2116 #endif
2117 
2118 		if (req->x11 & X11_FORWARD_ALL)
2119 			setup_x11 = true;
2120 		/* assumes that the first node is the batch host */
2121 		else if (((req->x11 & X11_FORWARD_FIRST) ||
2122 			  (req->x11 & X11_FORWARD_BATCH))
2123 			 && (host_index == 0))
2124 			setup_x11 = true;
2125 		else if ((req->x11 & X11_FORWARD_LAST)
2126 			 && (host_index == (req->nnodes - 1)))
2127 			setup_x11 = true;
2128 
2129 		if (setup_x11) {
2130 			launch_req->x11 = req->x11;
2131 			launch_req->x11_alloc_host = req->x11_alloc_host;
2132 			launch_req->x11_alloc_port = req->x11_alloc_port;
2133 			launch_req->x11_magic_cookie = req->x11_magic_cookie;
2134 			launch_req->x11_target = req->x11_target;
2135 			launch_req->x11_target_port = req->x11_target_port;
2136 		}
2137 	}
2138 
2139 	for (i = 0; i < req->nnodes; i++) {
2140 		uint32_t *tmp32 = xmalloc(sizeof(uint32_t));
2141 		*tmp32 = i;
2142 		launch_req->global_task_ids[i] = tmp32;
2143 		launch_req->tasks_to_launch[i] = 1;
2144 	}
2145 
2146 	/*
2147 	 * The job could have been killed while the prolog was
2148 	 * running (especially on BlueGene, where partition booting
2149 	 * can take minutes), so test whether the credential has
2150 	 * since been revoked and exit as needed.
2151 	 */
2152 	if (slurm_get_stream_addr(msg->conn_fd, &self)) {
2153 		error("%s: slurm_get_stream_addr(): %m", __func__);
2154 		rc = SLURM_ERROR;
2155 	} else if (slurm_cred_revoked(conf->vctx, req->cred)) {
2156 		info("Job %u already killed, do not launch extern step",
2157 		     req->job_id);
2158 		/*
2159 		 * Don't set the rc to SLURM_ERROR at this point.
2160 		 * The job's already been killed, and returning a prolog
2161 		 * failure will just add more confusion. Better to just
2162 		 * silently terminate.
2163 		 */
2164 	} else {
2165 		hostset_t step_hset = hostset_create(req->nodes);
2166 		int rc;
2167 
2168 		debug3("%s: call to _forkexec_slurmstepd", __func__);
2169 		rc =  _forkexec_slurmstepd(LAUNCH_TASKS, (void *)launch_req,
2170 					   cli, &self, step_hset,
2171 					   msg->protocol_version);
2172 		debug3("%s: return from _forkexec_slurmstepd %d",
2173 		       __func__, rc);
2174 
2175 		if (rc != SLURM_SUCCESS)
2176 			_launch_job_fail(req->job_id, rc);
2177 
2178 		if (step_hset)
2179 			hostset_destroy(step_hset);
2180 	}
2181 
2182 	for (i = 0; i < req->nnodes; i++)
2183 		xfree(launch_req->global_task_ids[i]);
2184 	xfree(launch_req->global_task_ids);
2185 	xfree(launch_req->tasks_to_launch);
2186 	xfree(launch_req);
2187 
2188 	return rc;
2189 }
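
/*
 * Illustrative sketch only (compiled out): the X11 decision made inside
 * _spawn_prolog_stepd() above, reduced to a pure function of the request's
 * x11 flag bits and this node's index within the allocation (index 0 is
 * assumed to be the batch host, as the code above notes).
 */
#if 0
static bool _example_setup_x11(uint16_t x11_flags, int host_index,
			       uint32_t nnodes)
{
	if (x11_flags & X11_FORWARD_ALL)
		return true;
	if ((x11_flags & (X11_FORWARD_FIRST | X11_FORWARD_BATCH)) &&
	    (host_index == 0))
		return true;
	if ((x11_flags & X11_FORWARD_LAST) &&
	    (host_index == (int) (nnodes - 1)))
		return true;
	return false;
}
#endif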
2190 
2191 static void _rpc_prolog(slurm_msg_t *msg)
2192 {
2193 	int rc = SLURM_SUCCESS, alt_rc = SLURM_ERROR, node_id = 0;
2194 	prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2195 	job_env_t job_env;
2196 	bool     first_job_run;
2197 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2198 	uint32_t jobid;
2199 
2200 	if (req == NULL)
2201 		return;
2202 
2203 	if (!_slurm_authorized_user(req_uid)) {
2204 		error("REQUEST_LAUNCH_PROLOG request from uid %u",
2205 		      (unsigned int) req_uid);
2206 		return;
2207 	}
2208 
2209 	if (!req->user_name)
2210 		req->user_name = uid_to_string(req->uid);
2211 	/*
2212 	 * Send a message back to slurmctld so it knows we got the RPC. A
2213 	 * prolog could easily run longer than MessageTimeout, so reply now
2214 	 * rather than making slurmctld wait for the prolog to finish.
2215 	 */
2216 	if (slurm_send_rc_msg(msg, rc) < 0) {
2217 		error("%s: Error talking to slurmctld: %m", __func__);
2218 	}
2219 
2220 	slurm_mutex_lock(&prolog_mutex);
2221 	first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
2222 	if (first_job_run) {
2223 #ifndef HAVE_FRONT_END
2224 		/* It is always 0 for front end systems */
2225 		node_id = nodelist_find(req->nodes, conf->node_name);
2226 #endif
2227 		if (slurmctld_conf.prolog_flags & PROLOG_FLAG_CONTAIN)
2228 			_make_prolog_mem_container(msg);
2229 
2230 		slurm_cred_insert_jobid(conf->vctx, req->job_id);
2231 		_add_job_running_prolog(req->job_id);
2232 		/* signal just in case the batch rpc got here before we did */
2233 		slurm_cond_broadcast(&conf->prolog_running_cond);
2234 		slurm_mutex_unlock(&prolog_mutex);
2235 		memset(&job_env, 0, sizeof(job_env));
2236 		gres_plugin_epilog_set_env(&job_env.gres_job_env,
2237 					   req->job_gres_info, node_id);
2238 
2239 		job_env.jobid = req->job_id;
2240 		job_env.step_id = 0;	/* not available */
2241 		job_env.node_list = req->nodes;
2242 		job_env.het_job_id = req->het_job_id;
2243 		job_env.partition = req->partition;
2244 		job_env.spank_job_env = req->spank_job_env;
2245 		job_env.spank_job_env_size = req->spank_job_env_size;
2246 		job_env.uid = req->uid;
2247 		job_env.gid = req->gid;
2248 		job_env.user_name = req->user_name;
2249 
2250 #ifdef HAVE_NATIVE_CRAY
2251 		if (req->het_job_id && (req->het_job_id != NO_VAL))
2252 			jobid = req->het_job_id;
2253 		else
2254 			jobid = req->job_id;
2255 #else
2256 		jobid = req->job_id;
2257 #endif
2258 
2259 		if ((rc = container_g_create(jobid)))
2260 			error("container_g_create(%u): %m", req->job_id);
2261 		else
2262 			rc = _run_prolog(&job_env, req->cred, false);
2263 		_free_job_env(&job_env);
2264 		if (rc) {
2265 			int term_sig = 0, exit_status = 0;
2266 			if (WIFSIGNALED(rc))
2267 				term_sig    = WTERMSIG(rc);
2268 			else if (WIFEXITED(rc))
2269 				exit_status = WEXITSTATUS(rc);
2270 			error("[job %u] prolog failed status=%d:%d",
2271 			      req->job_id, exit_status, term_sig);
2272 			rc = ESLURMD_PROLOG_FAILED;
2273 		}
2274 
2275 		if ((rc == SLURM_SUCCESS) &&
2276 		    (slurmctld_conf.prolog_flags & PROLOG_FLAG_CONTAIN))
2277 			rc = _spawn_prolog_stepd(msg);
2278 
2279 		/*
2280 		 * Revoke cred so that the slurmd won't launch tasks if the
2281 		 * prolog failed. The slurmd waits for the prolog to finish but
2282 		 * can't check the return code.
2283 		 */
2284 		if (rc)
2285 			slurm_cred_revoke(conf->vctx, req->job_id, time(NULL),
2286 					  time(NULL));
2287 
2288 		_remove_job_running_prolog(req->job_id);
2289 	} else
2290 		slurm_mutex_unlock(&prolog_mutex);
2291 
2292 	/*
2293 	 * We need the slurmctld to know we are done or we can get into a
2294 	 * situation where nothing from the job will ever launch because the
2295 	 * prolog will never appear to stop running.
2296 	 */
2297 	while (alt_rc != SLURM_SUCCESS) {
2298 		if (!(slurmctld_conf.prolog_flags & PROLOG_FLAG_NOHOLD))
2299 			alt_rc = _notify_slurmctld_prolog_fini(
2300 				req->job_id, rc);
2301 		else
2302 			alt_rc = SLURM_SUCCESS;
2303 
2304 		if (rc != SLURM_SUCCESS) {
2305 			alt_rc = _launch_job_fail(req->job_id, rc);
2306 			send_registration_msg(rc, false);
2307 		}
2308 
2309 		if (alt_rc != SLURM_SUCCESS) {
2310 			info("%s: Retrying prolog complete RPC for JobId=%u [sleeping %us]",
2311 			     __func__, req->job_id, RETRY_DELAY);
2312 			sleep(RETRY_DELAY);
2313 		}
2314 	}
2315 }
2316 
2317 static void
2318 _rpc_batch_job(slurm_msg_t *msg, bool new_msg)
2319 {
2320 	batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data;
2321 	bool     first_job_run;
2322 	int      rc = SLURM_SUCCESS, node_id = 0;
2323 	bool	 replied = false, revoked;
2324 	slurm_addr_t *cli = &msg->orig_addr;
2325 
2326 	if (new_msg) {
2327 		uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2328 		if (!_slurm_authorized_user(req_uid)) {
2329 			error("Security violation, batch launch RPC from uid %d",
2330 			      req_uid);
2331 			rc = ESLURM_USER_ID_MISSING;  /* or bad in this case */
2332 			goto done;
2333 		}
2334 	}
2335 
2336 	if (_launch_job_test(req->job_id)) {
2337 		error("Job %u already running, do not launch second copy",
2338 		      req->job_id);
2339 		rc = ESLURM_DUPLICATE_JOB_ID;	/* job already running */
2340 		_launch_job_fail(req->job_id, rc);
2341 		goto done;
2342 	}
2343 
2344 	slurm_cred_handle_reissue(conf->vctx, req->cred, false);
2345 	if (slurm_cred_revoked(conf->vctx, req->cred)) {
2346 		error("Job %u already killed, do not launch batch job",
2347 		      req->job_id);
2348 		rc = ESLURMD_CREDENTIAL_REVOKED;	/* job already ran */
2349 		goto done;
2350 	}
2351 
2352 	/* lookup user_name if not provided by slurmctld */
2353 	if (!req->user_name)
2354 		req->user_name = uid_to_string(req->uid);
2355 
2356 	/* lookup gids if they weren't sent by slurmctld */
2357 	if (!req->ngids)
2358 		req->ngids = group_cache_lookup(req->uid, req->gid,
2359 						req->user_name, &req->gids);
2360 
2361 	task_g_slurmd_batch_request(req);	/* determine task affinity */
2362 
2363 	slurm_mutex_lock(&prolog_mutex);
2364 	first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
2365 
2366 	/* BlueGene prolog waits for partition boot and is very slow.
2367 	 * On any system we might need to load environment variables
2368 	 * for Moab (see --get-user-env), which could also be slow.
2369 	 * Just reply now and send a separate kill job request if the
2370 	 * prolog or launch fail. */
2371 	replied = true;
2372 	if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) {
2373 		/* The slurmctld is no longer waiting for a reply.
2374 		 * This typically indicates that the slurmd was
2375 		 * blocked from memory and/or CPUs and the slurmctld
2376 		 * has requeued the batch job request. */
2377 		error("Could not confirm batch launch for job %u, "
2378 		      "aborting request", req->job_id);
2379 		rc = SLURM_COMMUNICATIONS_SEND_ERROR;
2380 		slurm_mutex_unlock(&prolog_mutex);
2381 		goto done;
2382 	}
2383 
2384 	if (slurmctld_conf.prolog_flags & PROLOG_FLAG_ALLOC) {
2385 		struct timespec ts = {0, 0};
2386 		struct timeval now;
2387 		int retry_cnt = 0;
2388 		/*
2389 		 * Wait until _rpc_prolog() has run before continuing (see the
2390 		 * illustrative timed-wait sketch after this function). Since
2391 		 * we already hold prolog_mutex here, we don't have to unlock
2392 		 * it to wait on conf->prolog_running_cond.
2393 		 */
2394 		while (first_job_run) {
2395 			retry_cnt++;
2396 			/*
2397 			 * This race should only last a second or so, since we
2398 			 * are just waiting for the other RPC to arrive. Wait
2399 			 * here for up to msg_timeout * 2: if the
2400 			 * REQUEST_LAUNCH_PROLOG was lost in the forwarding
2401 			 * tree, the direct retry from slurmctld will happen
2402 			 * after MessageTimeout.
2403 			 */
2404 			if (retry_cnt > (slurmctld_conf.msg_timeout * 2)) {
2405 				rc = ESLURMD_PROLOG_FAILED;
2406 				slurm_mutex_unlock(&prolog_mutex);
2407 				error("Waiting for JobId=%u REQUEST_LAUNCH_PROLOG notification failed, giving up after %u sec",
2408 				      req->job_id,
2409 				      slurmctld_conf.msg_timeout * 2);
2410 				goto done;
2411 			}
2412 
2413 			gettimeofday(&now, NULL);
2414 			ts.tv_sec = now.tv_sec + 1;
2415 			ts.tv_nsec = now.tv_usec * 1000;
2416 
2417 			slurm_cond_timedwait(&conf->prolog_running_cond,
2418 					     &prolog_mutex, &ts);
2419 			first_job_run = !slurm_cred_jobid_cached(conf->vctx,
2420 								 req->job_id);
2421 		}
2422 	}
2423 
2424 	/*
2425 	 * Insert jobid into credential context to denote that
2426 	 * we've now "seen" an instance of the job
2427 	 */
2428 	if (first_job_run) {
2429 		job_env_t job_env;
2430 		List job_gres_list, epi_env_gres_list;
2431 		uint32_t jobid;
2432 
2433 		slurm_cred_insert_jobid(conf->vctx, req->job_id);
2434 		_add_job_running_prolog(req->job_id);
2435 		slurm_mutex_unlock(&prolog_mutex);
2436 
2437 #ifndef HAVE_FRONT_END
2438 		/* It is always 0 for front end systems */
2439 		node_id = nodelist_find(req->nodes, conf->node_name);
2440 #endif
2441 		memset(&job_env, 0, sizeof(job_env));
2442 		job_gres_list = (List) slurm_cred_get_arg(req->cred,
2443 							CRED_ARG_JOB_GRES_LIST);
2444 		epi_env_gres_list = gres_plugin_epilog_build_env(job_gres_list,
2445 								 req->nodes);
2446 		gres_plugin_epilog_set_env(&job_env.gres_job_env,
2447 					   epi_env_gres_list, node_id);
2448 		FREE_NULL_LIST(epi_env_gres_list);
2449 		job_env.jobid = req->job_id;
2450 		job_env.step_id = req->step_id;
2451 		job_env.node_list = req->nodes;
2452 		job_env.het_job_id = req->het_job_id;
2453 		job_env.partition = req->partition;
2454 		job_env.spank_job_env = req->spank_job_env;
2455 		job_env.spank_job_env_size = req->spank_job_env_size;
2456 		job_env.uid = req->uid;
2457 		job_env.gid = req->gid;
2458 		job_env.user_name = req->user_name;
2459 		/*
2460 	 	 * Run job prolog on this node
2461 	 	 */
2462 
2463 #ifdef HAVE_NATIVE_CRAY
2464 		if (req->het_job_id && (req->het_job_id != NO_VAL))
2465 			jobid = req->het_job_id;
2466 		else
2467 			jobid = req->job_id;
2468 #else
2469 		jobid = req->job_id;
2470 #endif
2471 
2472 		if ((rc = container_g_create(jobid)))
2473 			error("container_g_create(%u): %m", req->job_id);
2474 		else
2475 			rc = _run_prolog(&job_env, req->cred, true);
2476 		_free_job_env(&job_env);
2477 		if (rc) {
2478 			int term_sig = 0, exit_status = 0;
2479 			if (WIFSIGNALED(rc))
2480 				term_sig    = WTERMSIG(rc);
2481 			else if (WIFEXITED(rc))
2482 				exit_status = WEXITSTATUS(rc);
2483 			error("[job %u] prolog failed status=%d:%d",
2484 			      req->job_id, exit_status, term_sig);
2485 			_prolog_error(req, rc);
2486 			rc = ESLURMD_PROLOG_FAILED;
2487 			goto done;
2488 		}
2489 	} else {
2490 		slurm_mutex_unlock(&prolog_mutex);
2491 		_wait_for_job_running_prolog(req->job_id);
2492 	}
2493 
2494 	if (_get_user_env(req) < 0) {
2495 		bool requeue = _requeue_setup_env_fail();
2496 		if (requeue) {
2497 			rc = ESLURMD_SETUP_ENVIRONMENT_ERROR;
2498 			goto done;
2499 		}
2500 	}
2501 	_set_batch_job_limits(msg);
2502 
2503 	/* The job could have been killed while the prolog was
2504 	 * running (especially on BlueGene, where partition booting
2505 	 * can take minutes), so test whether the credential has
2506 	 * since been revoked and exit as needed. */
2507 	if (slurm_cred_revoked(conf->vctx, req->cred)) {
2508 		info("Job %u already killed, do not launch batch job",
2509 		     req->job_id);
2510 		rc = ESLURMD_CREDENTIAL_REVOKED;     /* job already ran */
2511 		goto done;
2512 	}
2513 
2514 	slurm_mutex_lock(&launch_mutex);
2515 	info("Launching batch job %u for UID %u", req->job_id, req->uid);
2516 
2517 	debug3("_rpc_batch_job: call to _forkexec_slurmstepd");
2518 	rc = _forkexec_slurmstepd(LAUNCH_BATCH_JOB, (void *)req, cli, NULL,
2519 				  (hostset_t)NULL, SLURM_PROTOCOL_VERSION);
2520 	debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc);
2521 
2522 	slurm_mutex_unlock(&launch_mutex);
2523 	_launch_complete_add(req->job_id);
2524 
2525 	/* On a busy system, slurmstepd may take a while to respond,
2526 	 * if the job was cancelled in the interim, run through the
2527 	 * abort logic below. */
2528 	revoked = slurm_cred_revoked(conf->vctx, req->cred);
2529 	if (revoked)
2530 		_launch_complete_rm(req->job_id);
2531 	if (revoked && _is_batch_job_finished(req->job_id)) {
2532 		/* If configured with select/serial and the batch job already
2533 		 * completed, consider the job successfully launched and do
2534 		 * not repeat termination logic below, which in the worst case
2535 		 * just slows things down with another message. */
2536 		revoked = false;
2537 	}
2538 	if (revoked) {
2539 		info("Job %u killed while launch was in progress",
2540 		     req->job_id);
2541 		sleep(1);	/* give slurmstepd time to create
2542 				 * the communication socket */
2543 		_terminate_all_steps(req->job_id, true);
2544 		rc = ESLURMD_CREDENTIAL_REVOKED;
2545 		goto done;
2546 	}
2547 
2548 done:
2549 	if (!replied) {
2550 		if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) {
2551 			/* The slurmctld is no longer waiting for a reply.
2552 			 * This typically indicates that the slurmd was
2553 			 * blocked from memory and/or CPUs and the slurmctld
2554 			 * has requeued the batch job request. */
2555 			error("Could not confirm batch launch for job %u, "
2556 			      "aborting request", req->job_id);
2557 			rc = SLURM_COMMUNICATIONS_SEND_ERROR;
2558 		} else {
2559 			/* No need to initiate separate reply below */
2560 			rc = SLURM_SUCCESS;
2561 		}
2562 	}
2563 	if (rc != SLURM_SUCCESS) {
2564 		/* prolog or job launch failure,
2565 		 * tell slurmctld that the job failed */
2566 		if (req->step_id == SLURM_BATCH_SCRIPT)
2567 			_launch_job_fail(req->job_id, rc);
2568 		else
2569 			_abort_step(req->job_id, req->step_id);
2570 	}
2571 
2572 	/*
2573 	 *  If job prolog failed or we could not reply,
2574 	 *  initiate message to slurmctld with current state
2575 	 */
2576 	if ((rc == ESLURMD_PROLOG_FAILED)
2577 	    || (rc == SLURM_COMMUNICATIONS_SEND_ERROR)
2578 	    || (rc == ESLURMD_SETUP_ENVIRONMENT_ERROR)) {
2579 		send_registration_msg(rc, false);
2580 	}
2581 }
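
/*
 * Illustrative sketch only (compiled out): the PROLOG_FLAG_ALLOC wait in
 * _rpc_batch_job() above sleeps on a condition variable in one-second slices,
 * rebuilding the absolute timeout each pass.  The hypothetical helper below
 * shows the same timed-wait pattern with plain pthreads; the caller is
 * assumed to hold "mutex", as _rpc_batch_job() holds prolog_mutex.
 */
#if 0
static bool _example_wait_for_flag(bool *flag, pthread_mutex_t *mutex,
				   pthread_cond_t *cond, int timeout_sec)
{
	struct timeval now;
	struct timespec ts;
	int waited = 0;

	while (!*flag) {
		if (waited++ >= timeout_sec)
			return false;	/* gave up waiting */
		gettimeofday(&now, NULL);
		ts.tv_sec = now.tv_sec + 1;	/* wake about once a second */
		ts.tv_nsec = now.tv_usec * 1000;
		(void) pthread_cond_timedwait(cond, mutex, &ts);
	}
	return true;
}
#endif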
2582 
2583 /*
2584  * Send notification message to batch job
2585  */
2586 static void
2587 _rpc_job_notify(slurm_msg_t *msg)
2588 {
2589 	job_notify_msg_t *req = msg->data;
2590 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2591 	uid_t job_uid;
2592 	List steps;
2593 	ListIterator i;
2594 	step_loc_t *stepd = NULL;
2595 	int step_cnt  = 0;
2596 	int fd;
2597 
2598 	debug("_rpc_job_notify, uid = %d, jobid = %u", req_uid, req->job_id);
2599 	job_uid = _get_job_uid(req->job_id);
2600 	if ((int)job_uid < 0)
2601 		goto no_job;
2602 
2603 	/*
2604 	 * Check that the requesting user is the job owner, SlurmUser, or root
2605 	 */
2606 	if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) {
2607 		error("Security violation: job_notify(%u) from uid %d",
2608 		      req->job_id, req_uid);
2609 		return;
2610 	}
2611 
2612 	steps = stepd_available(conf->spooldir, conf->node_name);
2613 	i = list_iterator_create(steps);
2614 	while ((stepd = list_next(i))) {
2615 		if ((stepd->jobid  != req->job_id) ||
2616 		    (stepd->stepid != SLURM_BATCH_SCRIPT)) {
2617 			continue;
2618 		}
2619 
2620 		step_cnt++;
2621 
2622 		fd = stepd_connect(stepd->directory, stepd->nodename,
2623 				   stepd->jobid, stepd->stepid,
2624 				   &stepd->protocol_version);
2625 		if (fd == -1) {
2626 			debug3("Unable to connect to step %u.%u",
2627 			       stepd->jobid, stepd->stepid);
2628 			continue;
2629 		}
2630 
2631 		info("send notification to job %u.%u",
2632 		     stepd->jobid, stepd->stepid);
2633 		if (stepd_notify_job(fd, stepd->protocol_version,
2634 				     req->message) < 0)
2635 			debug("notify jobid=%u failed: %m", stepd->jobid);
2636 		close(fd);
2637 	}
2638 	list_iterator_destroy(i);
2639 	FREE_NULL_LIST(steps);
2640 
2641 no_job:
2642 	if (step_cnt == 0) {
2643 		debug2("Can't find jobid %u to send notification message",
2644 		       req->job_id);
2645 	}
2646 }
2647 
2648 static int
2649 _launch_job_fail(uint32_t job_id, uint32_t slurm_rc)
2650 {
2651 	complete_batch_script_msg_t comp_msg = {0};
2652 	struct requeue_msg req_msg = {0};
2653 	slurm_msg_t resp_msg;
2654 	int rc = 0, rpc_rc;
2655 	static time_t config_update = 0;
2656 	static bool requeue_no_hold = false;
2657 
2658 	if (config_update != conf->last_update) {
2659 		char *sched_params = slurm_get_sched_params();
2660 		requeue_no_hold = (xstrcasestr(sched_params,
2661 					       "nohold_on_prolog_fail"));
2662 		xfree(sched_params);
2663 		config_update = conf->last_update;
2664 	}
2665 
2666 	slurm_msg_t_init(&resp_msg);
2667 
2668 	if (slurm_rc == ESLURMD_CREDENTIAL_REVOKED) {
2669 		comp_msg.job_id = job_id;
2670 		comp_msg.job_rc = INFINITE;
2671 		comp_msg.slurm_rc = slurm_rc;
2672 		comp_msg.node_name = conf->node_name;
2673 		comp_msg.jobacct = NULL; /* unused */
2674 		resp_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;
2675 		resp_msg.data = &comp_msg;
2676 	} else {
2677 		req_msg.job_id = job_id;
2678 		req_msg.job_id_str = NULL;
2679 		if (requeue_no_hold)
2680 			req_msg.flags = JOB_PENDING;
2681 		else
2682 			req_msg.flags = (JOB_REQUEUE_HOLD | JOB_LAUNCH_FAILED);
2683 		resp_msg.msg_type = REQUEST_JOB_REQUEUE;
2684 		resp_msg.data = &req_msg;
2685 	}
2686 
2687 	rpc_rc = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2688 						   working_cluster_rec);
2689 	if ((resp_msg.msg_type == REQUEST_JOB_REQUEUE) &&
2690 	    ((rc == ESLURM_DISABLED) || (rc == ESLURM_BATCH_ONLY))) {
2691 		info("Could not launch job %u and not able to requeue it, "
2692 		     "cancelling job", job_id);
2693 
2694 		if ((slurm_rc == ESLURMD_PROLOG_FAILED) &&
2695 		    (rc == ESLURM_BATCH_ONLY)) {
2696 			char *buf = NULL;
2697 			xstrfmtcat(buf, "Prolog failure on node %s",
2698 				   conf->node_name);
2699 			slurm_notify_job(job_id, buf);
2700 			xfree(buf);
2701 		}
2702 
2703 		comp_msg.job_id = job_id;
2704 		comp_msg.job_rc = INFINITE;
2705 		comp_msg.slurm_rc = slurm_rc;
2706 		comp_msg.node_name = conf->node_name;
2707 		comp_msg.jobacct = NULL; /* unused */
2708 		resp_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;
2709 		resp_msg.data = &comp_msg;
2710 		rpc_rc = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2711 							   working_cluster_rec);
2712 	}
2713 
2714 	return rpc_rc;
2715 }
2716 
2717 static int
2718 _abort_step(uint32_t job_id, uint32_t step_id)
2719 {
2720 	step_complete_msg_t resp;
2721 	slurm_msg_t resp_msg;
2722 	slurm_msg_t_init(&resp_msg);
2723 	int rc, rc2;
2724 
2725 	memset(&resp, 0, sizeof(resp));
2726 	resp.job_id       = job_id;
2727 	resp.job_step_id  = step_id;
2728 	resp.range_first  = 0;
2729 	resp.range_last   = 0;
2730 	resp.step_rc      = 1;
2731 	resp.jobacct      = jobacctinfo_create(NULL);
2732 	resp_msg.msg_type = REQUEST_STEP_COMPLETE;
2733 	resp_msg.data     = &resp;
2734 	rc2 = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2735 						working_cluster_rec);
2736 	/* Note: we are ignoring the RPC return code */
2737 	jobacctinfo_destroy(resp.jobacct);
2738 	return rc2;
2739 }
2740 
2741 static void
2742 _rpc_reconfig(slurm_msg_t *msg)
2743 {
2744 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2745 
2746 	if (!_slurm_authorized_user(req_uid))
2747 		error("Security violation, reconfig RPC from uid %d",
2748 		      req_uid);
2749 	else
2750 		kill(conf->pid, SIGHUP);
2751 	forward_wait(msg);
2752 	/* Never return a message, slurmctld does not expect one */
2753 }
2754 
2755 static void _rpc_reconfig_with_config(slurm_msg_t *msg)
2756 {
2757 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2758 
2759 	if (!_slurm_authorized_user(req_uid))
2760 		error("Security violation, reconfig RPC from uid %d",
2761 		      req_uid);
2762 	else {
2763 		if (conf->conf_cache) {
2764 			config_response_msg_t *configs =
2765 				(config_response_msg_t *) msg->data;
2766 			/*
2767 			 * Running in "configless" mode, as indicated by the
2768 			 * cache directory's existence. Update the cached
2769 			 * config files so our reconfigure picks up the
2770 			 * changes and client commands see them as well.
2771 			 */
2772 			write_configs_to_conf_cache(configs, conf->conf_cache);
2773 		}
2774 		kill(conf->pid, SIGHUP);
2775 	}
2776 	forward_wait(msg);
2777 	/* Never return a message, slurmctld does not expect one */
2778 }
2779 
2780 static void
2781 _rpc_shutdown(slurm_msg_t *msg)
2782 {
2783 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2784 
2785 	forward_wait(msg);
2786 	if (!_slurm_authorized_user(req_uid))
2787 		error("Security violation, shutdown RPC from uid %d",
2788 		      req_uid);
2789 	else {
2790 		if (kill(conf->pid, SIGTERM) != 0)
2791 			error("kill(%u,SIGTERM): %m", conf->pid);
2792 	}
2793 
2794 	/* Never return a message, slurmctld does not expect one */
2795 }
2796 
2797 static void
2798 _rpc_reboot(slurm_msg_t *msg)
2799 {
2800 	char *reboot_program, *cmd = NULL, *sp;
2801 	reboot_msg_t *reboot_msg;
2802 	slurm_ctl_conf_t *cfg;
2803 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2804 	int exit_code;
2805 
2806 	if (!_slurm_authorized_user(req_uid))
2807 		error("Security violation, reboot RPC from uid %d",
2808 		      req_uid);
2809 	else {
2810 		cfg = slurm_conf_lock();
2811 		reboot_program = cfg->reboot_program;
2812 		if (reboot_program) {
2813 			sp = strchr(reboot_program, ' ');
2814 			if (sp)
2815 				sp = xstrndup(reboot_program,
2816 					      (sp - reboot_program));
2817 			else
2818 				sp = xstrdup(reboot_program);
2819 			reboot_msg = (reboot_msg_t *) msg->data;
2820 			if (reboot_msg && reboot_msg->features) {
2821 				/*
2822 				 * Run reboot_program with only arguments given
2823 				 * in reboot_msg->features.
2824 				 */
2825 				info("Node reboot request with features %s being processed",
2826 				     reboot_msg->features);
2827 				(void) node_features_g_node_set(
2828 					reboot_msg->features);
2829 				if (reboot_msg->features[0]) {
2830 					xstrfmtcat(cmd, "%s %s",
2831 						   sp, reboot_msg->features);
2832 				} else {
2833 					cmd = xstrdup(sp);
2834 				}
2835 			} else {
2836 				/* Run reboot_program verbatim */
2837 				cmd = xstrdup(reboot_program);
2838 				info("Node reboot request being processed");
2839 			}
2840 			if (access(sp, R_OK | X_OK) < 0)
2841 				error("Cannot run RebootProgram [%s]: %m", sp);
2842 			else if ((exit_code = system(cmd)))
2843 				error("system(%s) returned %d", reboot_program,
2844 				      exit_code);
2845 			xfree(sp);
2846 			xfree(cmd);
2847 
2848 			/*
2849 			 * Explicitly shut down the slurmd. This is usually
2850 			 * taken care of by calling reboot_program, but in
2851 			 * case that fails to shut things down this will at
2852 			 * least offline this node until someone intervenes.
2853 			 */
2854 			if (xstrcasestr(cfg->slurmd_params,
2855 					"shutdown_on_reboot"))
2856 				slurmd_shutdown(SIGTERM);
2857 		} else
2858 			error("RebootProgram isn't defined in config");
2859 		slurm_conf_unlock();
2860 	}
2861 
2862 	/* Never return a message, slurmctld does not expect one */
2863 	/* slurm_send_rc_msg(msg, rc); */
2864 }
2865 
2866 static int _job_limits_match(void *x, void *key)
2867 {
2868 	job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x;
2869 	uint32_t *job_id = (uint32_t *) key;
2870 	if (job_limits_ptr->job_id == *job_id)
2871 		return 1;
2872 	return 0;
2873 }
2874 
2875 static int _step_limits_match(void *x, void *key)
2876 {
2877 	job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x;
2878 	step_loc_t *step_ptr = (step_loc_t *) key;
2879 
2880 	if ((job_limits_ptr->job_id  == step_ptr->jobid) &&
2881 	    (job_limits_ptr->step_id == step_ptr->stepid))
2882 		return 1;
2883 	return 0;
2884 }
2885 
2886 /* Call only with job_limits_mutex locked */
2887 static void
2888 _load_job_limits(void)
2889 {
2890 	List steps;
2891 	ListIterator step_iter;
2892 	step_loc_t *stepd;
2893 	int fd;
2894 	job_mem_limits_t *job_limits_ptr;
2895 	slurmstepd_mem_info_t stepd_mem_info;
2896 
2897 	if (!job_limits_list)
2898 		job_limits_list = list_create(xfree_ptr);
2899 	job_limits_loaded = true;
2900 
2901 	steps = stepd_available(conf->spooldir, conf->node_name);
2902 	step_iter = list_iterator_create(steps);
2903 	while ((stepd = list_next(step_iter))) {
2904 		job_limits_ptr = list_find_first(job_limits_list,
2905 						 _step_limits_match, stepd);
2906 		if (job_limits_ptr)	/* already processed */
2907 			continue;
2908 		fd = stepd_connect(stepd->directory, stepd->nodename,
2909 				   stepd->jobid, stepd->stepid,
2910 				   &stepd->protocol_version);
2911 		if (fd == -1)
2912 			continue;	/* step completed */
2913 
2914 		if (stepd_get_mem_limits(fd, stepd->protocol_version,
2915 					 &stepd_mem_info) != SLURM_SUCCESS) {
2916 			error("Error reading step %u.%u memory limits from "
2917 			      "slurmstepd",
2918 			      stepd->jobid, stepd->stepid);
2919 			close(fd);
2920 			continue;
2921 		}
2922 
2923 
2924 		if ((stepd_mem_info.job_mem_limit
2925 		     || stepd_mem_info.step_mem_limit)) {
2926 			/* create entry for this job */
2927 			job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
2928 			job_limits_ptr->job_id   = stepd->jobid;
2929 			job_limits_ptr->step_id  = stepd->stepid;
2930 			job_limits_ptr->job_mem  =
2931 				stepd_mem_info.job_mem_limit;
2932 			job_limits_ptr->step_mem =
2933 				stepd_mem_info.step_mem_limit;
2934 #if _LIMIT_INFO
2935 			info("RecLim step:%u.%u job_mem:%"PRIu64""
2936 			     " step_mem:%"PRIu64"",
2937 			     job_limits_ptr->job_id, job_limits_ptr->step_id,
2938 			     job_limits_ptr->job_mem,
2939 			     job_limits_ptr->step_mem);
2940 #endif
2941 			list_append(job_limits_list, job_limits_ptr);
2942 		}
2943 		close(fd);
2944 	}
2945 	list_iterator_destroy(step_iter);
2946 	FREE_NULL_LIST(steps);
2947 }
2948 
2949 static void
2950 _cancel_step_mem_limit(uint32_t job_id, uint32_t step_id)
2951 {
2952 	slurm_msg_t msg;
2953 	job_notify_msg_t notify_req;
2954 	job_step_kill_msg_t kill_req;
2955 
2956 	/* NOTE: Batch jobs may have no srun to get this message */
2957 	slurm_msg_t_init(&msg);
2958 	memset(&notify_req, 0, sizeof(notify_req));
2959 	notify_req.job_id      = job_id;
2960 	notify_req.job_step_id = step_id;
2961 	notify_req.message     = "Exceeded job memory limit";
2962 	msg.msg_type    = REQUEST_JOB_NOTIFY;
2963 	msg.data        = &notify_req;
2964 	slurm_send_only_controller_msg(&msg, working_cluster_rec);
2965 
2966 	memset(&kill_req, 0, sizeof(kill_req));
2967 	kill_req.job_id      = job_id;
2968 	kill_req.job_step_id = step_id;
2969 	kill_req.signal      = SIGKILL;
2970 	kill_req.flags       = KILL_OOM;
2971 	msg.msg_type    = REQUEST_CANCEL_JOB_STEP;
2972 	msg.data        = &kill_req;
2973 	slurm_send_only_controller_msg(&msg, working_cluster_rec);
2974 }
2975 
2976 /* Enforce job memory limits here in slurmd. Step memory limits are
2977  * enforced within slurmstepd (using jobacct_gather plugin). */
2978 static void
2979 _enforce_job_mem_limit(void)
2980 {
2981 	List steps;
2982 	ListIterator step_iter, job_limits_iter;
2983 	job_mem_limits_t *job_limits_ptr;
2984 	step_loc_t *stepd;
2985 	int fd, i, job_inx, job_cnt;
2986 	uint16_t vsize_factor;
2987 	uint64_t step_rss, step_vsize;
2988 	job_step_id_msg_t acct_req;
2989 	job_step_stat_t *resp = NULL;
2990 	struct job_mem_info {
2991 		uint32_t job_id;
2992 		uint64_t mem_limit;	/* MB */
2993 		uint64_t mem_used;	/* MB */
2994 		uint64_t vsize_limit;	/* MB */
2995 		uint64_t vsize_used;	/* MB */
2996 	};
2997 	struct job_mem_info *job_mem_info_ptr = NULL;
2998 
2999 	if (conf->job_acct_oom_kill == false)
3000 		return;
3001 
3002 	slurm_mutex_lock(&job_limits_mutex);
3003 	if (!job_limits_loaded)
3004 		_load_job_limits();
3005 	if (list_count(job_limits_list) == 0) {
3006 		slurm_mutex_unlock(&job_limits_mutex);
3007 		return;
3008 	}
3009 
3010 	/* Build table of job limits, use highest mem limit recorded */
3011 	job_mem_info_ptr = xmalloc((list_count(job_limits_list) + 1) *
3012 				   sizeof(struct job_mem_info));
3013 	job_cnt = 0;
3014 	job_limits_iter = list_iterator_create(job_limits_list);
3015 	while ((job_limits_ptr = list_next(job_limits_iter))) {
3016 		if (job_limits_ptr->job_mem == 0) 	/* no job limit */
3017 			continue;
3018 		for (i=0; i<job_cnt; i++) {
3019 			if (job_mem_info_ptr[i].job_id !=
3020 			    job_limits_ptr->job_id)
3021 				continue;
3022 			job_mem_info_ptr[i].mem_limit = MAX(
3023 				job_mem_info_ptr[i].mem_limit,
3024 				job_limits_ptr->job_mem);
3025 			break;
3026 		}
3027 		if (i < job_cnt)	/* job already found & recorded */
3028 			continue;
3029 		job_mem_info_ptr[job_cnt].job_id    = job_limits_ptr->job_id;
3030 		job_mem_info_ptr[job_cnt].mem_limit = job_limits_ptr->job_mem;
3031 		job_cnt++;
3032 	}
3033 	list_iterator_destroy(job_limits_iter);
3034 	slurm_mutex_unlock(&job_limits_mutex);
3035 
3036 	vsize_factor = slurm_get_vsize_factor();
3037 	for (i=0; i<job_cnt; i++) {
3038 		job_mem_info_ptr[i].vsize_limit = job_mem_info_ptr[i].
3039 			mem_limit;
3040 		job_mem_info_ptr[i].vsize_limit *= (vsize_factor / 100.0);
3041 	}
3042 
3043 	steps = stepd_available(conf->spooldir, conf->node_name);
3044 	step_iter = list_iterator_create(steps);
3045 	while ((stepd = list_next(step_iter))) {
3046 		for (job_inx=0; job_inx<job_cnt; job_inx++) {
3047 			if (job_mem_info_ptr[job_inx].job_id == stepd->jobid)
3048 				break;
3049 		}
3050 		if (job_inx >= job_cnt)
3051 			continue;	/* job/step not being tracked */
3052 
3053 		fd = stepd_connect(stepd->directory, stepd->nodename,
3054 				   stepd->jobid, stepd->stepid,
3055 				   &stepd->protocol_version);
3056 		if (fd == -1)
3057 			continue;	/* step completed */
3058 		acct_req.job_id  = stepd->jobid;
3059 		acct_req.step_id = stepd->stepid;
3060 		resp = xmalloc(sizeof(job_step_stat_t));
3061 
3062 		if ((!stepd_stat_jobacct(
3063 			     fd, stepd->protocol_version,
3064 			     &acct_req, resp)) &&
3065 		    (resp->jobacct)) {
3066 			/* resp->jobacct is NULL if accounting is disabled */
3067 			jobacctinfo_getinfo((struct jobacctinfo *)
3068 					    resp->jobacct,
3069 					    JOBACCT_DATA_TOT_RSS,
3070 					    &step_rss,
3071 					    stepd->protocol_version);
3072 			jobacctinfo_getinfo((struct jobacctinfo *)
3073 					    resp->jobacct,
3074 					    JOBACCT_DATA_TOT_VSIZE,
3075 					    &step_vsize,
3076 					    stepd->protocol_version);
3077 #if _LIMIT_INFO
3078 			info("Step:%u.%u RSS:%"PRIu64" B VSIZE:%"PRIu64" B",
3079 			     stepd->jobid, stepd->stepid,
3080 			     step_rss, step_vsize);
3081 #endif
3082 			if (step_rss != INFINITE64) {
3083 				step_rss /= 1048576;	/* B to MB */
3084 				step_rss = MAX(step_rss, 1);
3085 				job_mem_info_ptr[job_inx].mem_used += step_rss;
3086 			}
3087 			if (step_vsize != INFINITE64) {
3088 				step_vsize /= 1048576;	/* B to MB */
3089 				step_vsize = MAX(step_vsize, 1);
3090 				job_mem_info_ptr[job_inx].vsize_used +=
3091 					step_vsize;
3092 			}
3093 		}
3094 		slurm_free_job_step_stat(resp);
3095 		close(fd);
3096 	}
3097 	list_iterator_destroy(step_iter);
3098 	FREE_NULL_LIST(steps);
3099 
3100 	for (i=0; i<job_cnt; i++) {
3101 		if (job_mem_info_ptr[i].mem_used == 0) {
3102 			/* no steps found,
3103 			 * purge records for all steps of this job */
3104 			slurm_mutex_lock(&job_limits_mutex);
3105 			list_delete_all(job_limits_list, _job_limits_match,
3106 					&job_mem_info_ptr[i].job_id);
3107 			slurm_mutex_unlock(&job_limits_mutex);
3108 			break;
3109 		}
3110 
3111 		if ((job_mem_info_ptr[i].mem_limit != 0) &&
3112 		    (job_mem_info_ptr[i].mem_used >
3113 		     job_mem_info_ptr[i].mem_limit)) {
3114 			info("Job %u exceeded memory limit "
3115 			     "(%"PRIu64">%"PRIu64"), cancelling it",
3116 			     job_mem_info_ptr[i].job_id,
3117 			     job_mem_info_ptr[i].mem_used,
3118 			     job_mem_info_ptr[i].mem_limit);
3119 			_cancel_step_mem_limit(job_mem_info_ptr[i].job_id,
3120 					       NO_VAL);
3121 		} else if ((job_mem_info_ptr[i].vsize_limit != 0) &&
3122 			   (job_mem_info_ptr[i].vsize_used >
3123 			    job_mem_info_ptr[i].vsize_limit)) {
3124 			info("Job %u exceeded virtual memory limit "
3125 			     "(%"PRIu64">%"PRIu64"), cancelling it",
3126 			     job_mem_info_ptr[i].job_id,
3127 			     job_mem_info_ptr[i].vsize_used,
3128 			     job_mem_info_ptr[i].vsize_limit);
3129 			_cancel_step_mem_limit(job_mem_info_ptr[i].job_id,
3130 					       NO_VAL);
3131 		}
3132 	}
3133 	xfree(job_mem_info_ptr);
3134 }
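
/*
 * Illustrative sketch only (compiled out): the per-job check performed above,
 * reduced to its arithmetic.  Accounting reports RSS/VSIZE in bytes; they are
 * scaled to MB before comparison, and the virtual memory ceiling is derived
 * from the VSizeFactor percentage (0 disables that check).  All parameter
 * names are hypothetical.
 */
#if 0
static bool _example_over_mem_limit(uint64_t rss_bytes, uint64_t vsize_bytes,
				    uint64_t mem_limit_mb,
				    uint16_t vsize_factor)
{
	uint64_t rss_mb = rss_bytes / 1048576;
	uint64_t vsize_mb = vsize_bytes / 1048576;
	uint64_t vsize_limit_mb =
		(uint64_t) (mem_limit_mb * (vsize_factor / 100.0));

	if (mem_limit_mb && (rss_mb > mem_limit_mb))
		return true;		/* real memory limit exceeded */
	if (vsize_limit_mb && (vsize_mb > vsize_limit_mb))
		return true;		/* virtual memory limit exceeded */
	return false;
}
#endif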
3135 
3136 static int
3137 static int _rpc_ping(slurm_msg_t *msg)
3138 {
3139 	int        rc = SLURM_SUCCESS;
3140 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3141 	static bool first_msg = true;
3142 
3143 	if (!_slurm_authorized_user(req_uid)) {
3144 		error("Security violation, ping RPC from uid %d",
3145 		      req_uid);
3146 		if (first_msg) {
3147 			error("Do you have SlurmUser configured as uid %d?",
3148 			      req_uid);
3149 		}
3150 		rc = ESLURM_USER_ID_MISSING;	/* or bad in this case */
3151 	}
3152 	first_msg = false;
3153 
3154 	if (rc != SLURM_SUCCESS) {
3155 		/* Return result. If the reply can't be sent this indicates
3156 		 * 1. The network is broken OR
3157 		 * 2. slurmctld has died    OR
3158 		 * 3. slurmd was paged out due to full memory
3159 		 * If the reply request fails, we send a registration message
3160 		 * to slurmctld in hopes of avoiding having the node set DOWN
3161 		 * due to slurmd paging and not being able to respond in a
3162 		 * timely fashion. */
3163 		if (slurm_send_rc_msg(msg, rc) < 0) {
3164 			error("Error responding to ping: %m");
3165 			send_registration_msg(SLURM_SUCCESS, false);
3166 		}
3167 	} else {
3168 		slurm_msg_t resp_msg;
3169 		ping_slurmd_resp_msg_t ping_resp;
3170 		get_cpu_load(&ping_resp.cpu_load);
3171 		get_free_mem(&ping_resp.free_mem);
3172 		slurm_msg_t_copy(&resp_msg, msg);
3173 		resp_msg.msg_type = RESPONSE_PING_SLURMD;
3174 		resp_msg.data     = &ping_resp;
3175 
3176 		slurm_send_node_msg(msg->conn_fd, &resp_msg);
3177 	}
3178 
3179 	/* Take this opportunity to enforce any job memory limits */
3180 	_enforce_job_mem_limit();
3181 	/* Clear up any stalled file transfers as well */
3182 	_file_bcast_cleanup();
3183 	return rc;
3184 }
3185 
3186 static int
3187 static int _rpc_health_check(slurm_msg_t *msg)
3188 {
3189 	int        rc = SLURM_SUCCESS;
3190 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3191 
3192 	if (!_slurm_authorized_user(req_uid)) {
3193 		error("Security violation, health check RPC from uid %d",
3194 		      req_uid);
3195 		rc = ESLURM_USER_ID_MISSING;	/* or bad in this case */
3196 	}
3197 
3198 	/* Return result. If the reply can't be sent this indicates that
3199 	 * 1. The network is broken OR
3200 	 * 2. slurmctld has died    OR
3201 	 * 3. slurmd was paged out due to full memory
3202 	 * If the reply request fails, we send a registration message to
3203 	 * slurmctld in hopes of avoiding having the node set DOWN due to
3204 	 * slurmd paging and not being able to respond in a timely fashion. */
3205 	if (slurm_send_rc_msg(msg, rc) < 0) {
3206 		error("Error responding to health check: %m");
3207 		send_registration_msg(SLURM_SUCCESS, false);
3208 	}
3209 
3210 	if (rc == SLURM_SUCCESS)
3211 		rc = run_script_health_check();
3212 
3213 	/* Take this opportunity to enforce any job memory limits */
3214 	_enforce_job_mem_limit();
3215 	/* Clear up any stalled file transfers as well */
3216 	_file_bcast_cleanup();
3217 	return rc;
3218 }
3219 
3220 
3221 static int
3222 static int _rpc_acct_gather_update(slurm_msg_t *msg)
3223 {
3224 	int        rc = SLURM_SUCCESS;
3225 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3226 	static bool first_msg = true;
3227 
3228 	if (!_slurm_authorized_user(req_uid)) {
3229 		error("Security violation, acct_gather_update RPC from uid %d",
3230 		      req_uid);
3231 		if (first_msg) {
3232 			error("Do you have SlurmUser configured as uid %d?",
3233 			      req_uid);
3234 		}
3235 		rc = ESLURM_USER_ID_MISSING;	/* or bad in this case */
3236 	}
3237 	first_msg = false;
3238 
3239 	if (rc != SLURM_SUCCESS) {
3240 		/* Return result. If the reply can't be sent this indicates
3241 		 * 1. The network is broken OR
3242 		 * 2. slurmctld has died    OR
3243 		 * 3. slurmd was paged out due to full memory
3244 		 * If the reply request fails, we send a registration message
3245 		 * to slurmctld in hopes of avoiding having the node set DOWN
3246 		 * due to slurmd paging and not being able to respond in a
3247 		 * timely fashion. */
3248 		if (slurm_send_rc_msg(msg, rc) < 0) {
3249 			error("Error responding to account gather: %m");
3250 			send_registration_msg(SLURM_SUCCESS, false);
3251 		}
3252 	} else {
3253 		slurm_msg_t resp_msg;
3254 		acct_gather_node_resp_msg_t acct_msg;
3255 
3256 		/* Update node energy usage data */
3257 		acct_gather_energy_g_update_node_energy();
3258 
3259 		memset(&acct_msg, 0, sizeof(acct_msg));
3260 		acct_msg.node_name = conf->node_name;
3261 		acct_msg.sensor_cnt = 1;
3262 		acct_msg.energy = acct_gather_energy_alloc(acct_msg.sensor_cnt);
3263 		(void) acct_gather_energy_g_get_sum(
3264 			ENERGY_DATA_NODE_ENERGY, acct_msg.energy);
3265 
3266 		slurm_msg_t_copy(&resp_msg, msg);
3267 		resp_msg.msg_type = RESPONSE_ACCT_GATHER_UPDATE;
3268 		resp_msg.data     = &acct_msg;
3269 
3270 		slurm_send_node_msg(msg->conn_fd, &resp_msg);
3271 
3272 		acct_gather_energy_destroy(acct_msg.energy);
3273 	}
3274 	return rc;
3275 }
3276 
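/*
 * Handle a REQUEST_ACCT_GATHER_ENERGY RPC: return the energy counters for
 * the requested plugin context, forcing a fresh poll when the cached sample
 * is older than the requested delta.
 */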
3277 static int
3278 _rpc_acct_gather_energy(slurm_msg_t *msg)
3279 {
3280 	int        rc = SLURM_SUCCESS;
3281 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3282 	static bool first_msg = true;
3283 
3284 	if (!_slurm_authorized_user(req_uid)) {
3285 		error("Security violation, acct_gather_update RPC from uid %d",
3286 		      req_uid);
3287 		if (first_msg) {
3288 			error("Do you have SlurmUser configured as uid %d?",
3289 			      req_uid);
3290 		}
3291 		rc = ESLURM_USER_ID_MISSING;	/* or bad in this case */
3292 	}
3293 	first_msg = false;
3294 
3295 	if (rc != SLURM_SUCCESS) {
3296 		if (slurm_send_rc_msg(msg, rc) < 0)
3297 			error("Error responding to energy request: %m");
3298 	} else {
3299 		slurm_msg_t resp_msg;
3300 		acct_gather_node_resp_msg_t acct_msg;
3301 		time_t now = time(NULL), last_poll = 0;
3302 		int data_type = ENERGY_DATA_STRUCT;
3303 		uint16_t sensor_cnt;
3304 		acct_gather_energy_req_msg_t *req = msg->data;
3305 
3306 		if (req->context_id == NO_VAL16) {
3307 			rc = SLURM_PROTOCOL_VERSION_ERROR;
3308 			if (slurm_send_rc_msg(msg, rc) < 0)
3309 				error("Error responding to energy request: %m");
3310 			return rc;
3311 		}
3312 
3313 		acct_gather_energy_g_get_data(req->context_id,
3314 					      ENERGY_DATA_LAST_POLL,
3315 					      &last_poll);
3316 		acct_gather_energy_g_get_data(req->context_id,
3317 					      ENERGY_DATA_SENSOR_CNT,
3318 					      &sensor_cnt);
3319 
3320 		/* If the last poll is older than the requested delta
3321 		   seconds, force a new poll now.
3322 		*/
3323 		if ((now - last_poll) > req->delta)
3324 			data_type = ENERGY_DATA_JOULES_TASK;
3325 
3326 		memset(&acct_msg, 0, sizeof(acct_msg));
3327 		acct_msg.sensor_cnt = sensor_cnt;
3328 		acct_msg.energy = acct_gather_energy_alloc(acct_msg.sensor_cnt);
3329 
3330 		acct_gather_energy_g_get_data(req->context_id,
3331 					      data_type,
3332 					      acct_msg.energy);
3333 
3334 		slurm_msg_t_copy(&resp_msg, msg);
3335 		resp_msg.msg_type = RESPONSE_ACCT_GATHER_ENERGY;
3336 		resp_msg.data     = &acct_msg;
3337 
3338 		slurm_send_node_msg(msg->conn_fd, &resp_msg);
3339 
3340 		acct_gather_energy_destroy(acct_msg.energy);
3341 	}
3342 	return rc;
3343 }
3344 
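/*
 * Deliver a signal to the slurmstepd container for job step jobid.stepid.
 * Fails if the job prolog is still running (no stepd exists yet) or if no
 * matching stepd can be contacted.
 */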
3345 static int
3346 _signal_jobstep(uint32_t jobid, uint32_t stepid, uint16_t signal,
3347 		uint16_t flags, uid_t req_uid)
3348 {
3349 	int fd, rc = SLURM_SUCCESS;
3350 	uint16_t protocol_version;
3351 
3352 	/*
3353 	 * There will be no stepd if the prolog is still running
3354 	 * Return failure so caller can retry.
3355 	 */
3356 	if (_prolog_is_running(jobid)) {
3357 		info("signal %d req for %u.%u while prolog is running."
3358 		     " Returning failure.", signal, jobid, stepid);
3359 		return ESLURM_TRANSITION_STATE_NO_UPDATE;
3360 	}
3361 
3362 	fd = stepd_connect(conf->spooldir, conf->node_name, jobid, stepid,
3363 			   &protocol_version);
3364 	if (fd == -1) {
3365 		debug("signal for nonexistent %u.%u stepd_connect failed: %m",
3366 		      jobid, stepid);
3367 		return ESLURM_INVALID_JOB_ID;
3368 	}
3369 
3370 	debug2("container signal %d to job %u.%u", signal, jobid, stepid);
3371 	rc = stepd_signal_container(fd, protocol_version, signal, flags,
3372 				    req_uid);
3373 	if (rc == -1)
3374 		rc = ESLURMD_JOB_NOTRUNNING;
3375 
3376 	close(fd);
3377 	return rc;
3378 }
3379 
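/*
 * Handle a REQUEST_SIGNAL_TASKS RPC: depending on the request flags, signal
 * the entire job (via its batch step), all non-batch steps, or a single
 * step. The requester must own the job or be SlurmUser/root.
 */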
3380 static void
3381 _rpc_signal_tasks(slurm_msg_t *msg)
3382 {
3383 	int               rc = SLURM_SUCCESS;
3384 	signal_tasks_msg_t *req = (signal_tasks_msg_t *) msg->data;
3385 	uid_t job_uid;
3386 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3387 
3388 	job_uid = _get_job_uid(req->job_id);
3389 	if ((int)job_uid < 0) {
3390 		debug("%s: failed to get job_uid for job %u",
3391 		      __func__, req->job_id);
3392 		rc = ESLURM_INVALID_JOB_ID;
3393 		goto done;
3394 	}
3395 
3396 	if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) {
3397 		debug("%s: from uid %ld for job %u owned by uid %ld",
3398 		      __func__, (long)req_uid, req->job_id, (long)job_uid);
3399 		rc = ESLURM_USER_ID_MISSING;     /* or bad in this case */
3400 		goto done;
3401 	}
3402 
3403 	/* security is handled when communicating with the stepd */
3404 	if ((req->flags & KILL_FULL_JOB) || (req->flags & KILL_JOB_BATCH)) {
3405 		debug("%s: sending signal %u to entire job %u flag %u",
3406 		      __func__, req->signal, req->job_id, req->flags);
3407 		_kill_all_active_steps(req->job_id, req->signal,
3408 				       req->flags, true, req_uid);
3409 	} else if (req->flags & KILL_STEPS_ONLY) {
3410 		debug("%s: sending signal %u to all steps job %u flag %u",
3411 		      __func__, req->signal, req->job_id, req->flags);
3412 		_kill_all_active_steps(req->job_id, req->signal,
3413 				       req->flags, false, req_uid);
3414 	} else {
3415 		debug("%s: sending signal %u to step %u.%u flag %u", __func__,
3416 		      req->signal, req->job_id, req->job_step_id, req->flags);
3417 		rc = _signal_jobstep(req->job_id, req->job_step_id,
3418 				     req->signal, req->flags, req_uid);
3419 	}
3420 done:
3421 	slurm_send_rc_msg(msg, rc);
3422 }
3423 
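/*
 * Handle a REQUEST_TERMINATE_TASKS RPC: connect to the stepd for the given
 * step and ask it to terminate its tasks, after verifying the requester
 * owns the step or is SlurmUser/root.
 */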
3424 static void
3425 _rpc_terminate_tasks(slurm_msg_t *msg)
3426 {
3427 	signal_tasks_msg_t *req = (signal_tasks_msg_t *) msg->data;
3428 	int               rc = SLURM_SUCCESS;
3429 	int               fd;
3430 	uint16_t protocol_version;
3431 	uid_t uid;
3432 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3433 
3434 	debug3("Entering _rpc_terminate_tasks");
3435 	fd = stepd_connect(conf->spooldir, conf->node_name,
3436 			   req->job_id, req->job_step_id, &protocol_version);
3437 	if (fd == -1) {
3438 		debug("kill for nonexistent job %u.%u stepd_connect "
3439 		      "failed: %m", req->job_id, req->job_step_id);
3440 		rc = ESLURM_INVALID_JOB_ID;
3441 		goto done;
3442 	}
3443 
3444 	if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
3445 		debug("terminate_tasks couldn't read from the step %u.%u: %m",
3446 		      req->job_id, req->job_step_id);
3447 		rc = ESLURM_INVALID_JOB_ID;
3448 		goto done2;
3449 	}
3450 
3451 	if ((req_uid != uid)
3452 	    && (!_slurm_authorized_user(req_uid))) {
3453 		debug("kill req from uid %ld for job %u.%u owned by uid %ld",
3454 		      (long) req_uid, req->job_id, req->job_step_id,
3455 		      (long) uid);
3456 		rc = ESLURM_USER_ID_MISSING;     /* or bad in this case */
3457 		goto done2;
3458 	}
3459 
3460 	rc = stepd_terminate(fd, protocol_version);
3461 	if (rc == -1)
3462 		rc = ESLURMD_JOB_NOTRUNNING;
3463 
3464 done2:
3465 	close(fd);
3466 done:
3467 	slurm_send_rc_msg(msg, rc);
3468 }
3469 
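/*
 * Handle a REQUEST_STEP_COMPLETE RPC by forwarding the completion data to
 * the local stepd for the given step. Only SlurmUser/root (i.e. another
 * slurmstepd) may send it.
 */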
3470 static int
3471 _rpc_step_complete(slurm_msg_t *msg)
3472 {
3473 	step_complete_msg_t *req = (step_complete_msg_t *)msg->data;
3474 	int               rc = SLURM_SUCCESS;
3475 	int               fd;
3476 	uint16_t protocol_version;
3477 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3478 
3479 	debug3("Entering _rpc_step_complete");
3480 	fd = stepd_connect(conf->spooldir, conf->node_name,
3481 			   req->job_id, req->job_step_id, &protocol_version);
3482 	if (fd == -1) {
3483 		error("stepd_connect to %u.%u failed: %m",
3484 		      req->job_id, req->job_step_id);
3485 		rc = ESLURM_INVALID_JOB_ID;
3486 		goto done;
3487 	}
3488 
3489 	/* step completion messages are only allowed from other slurmstepd
3490 	   daemons, so only root or SlurmUser is allowed here */
3491 	if (!_slurm_authorized_user(req_uid)) {
3492 		debug("step completion from uid %ld for job %u.%u",
3493 		      (long) req_uid, req->job_id, req->job_step_id);
3494 		rc = ESLURM_USER_ID_MISSING;     /* or bad in this case */
3495 		goto done2;
3496 	}
3497 
3498 	rc = stepd_completion(fd, protocol_version, req);
3499 	if (rc == -1)
3500 		rc = ESLURMD_JOB_NOTRUNNING;
3501 
3502 done2:
3503 	close(fd);
3504 done:
3505 	slurm_send_rc_msg(msg, rc);
3506 
3507 	return rc;
3508 }
3509 
3510 static void _setup_step_complete_msg(slurm_msg_t *msg, void *data)
3511 {
3512 	slurm_msg_t_init(msg);
3513 	msg->msg_type = REQUEST_STEP_COMPLETE;
3514 	msg->data = data;
3515 }
3516 
3517 /* This step_complete RPC came from slurmstepd because message
3518  * aggregation is configured and we are at the head of the tree.
3519  * This just adds the message to the list and goes on its merry way. */
3520 static int
3521 _rpc_step_complete_aggr(slurm_msg_t *msg)
3522 {
3523 	int rc;
3524 	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
3525 
3526 	if (!_slurm_authorized_user(uid)) {
3527 		error("Security violation: step_complete_aggr from uid %d",
3528 		      uid);
3529 		if (msg->conn_fd >= 0)
3530 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3531 		return SLURM_ERROR;
3532 	}
3533 
3534 	if (conf->msg_aggr_window_msgs > 1) {
3535 		slurm_msg_t *req = xmalloc_nz(sizeof(slurm_msg_t));
3536 		_setup_step_complete_msg(req, msg->data);
3537 		msg->data = NULL;
3538 
3539 		msg_aggr_add_msg(req, 1, NULL);
3540 	} else {
3541 		slurm_msg_t req;
3542 		_setup_step_complete_msg(&req, msg->data);
3543 
3544 		while (slurm_send_recv_controller_rc_msg(
3545 			       &req, &rc, working_cluster_rec) < 0) {
3546 			error("Unable to send step complete, "
3547 			      "trying again in a minute: %m");
3548 		}
3549 	}
3550 
3551 	/* Finish communication with the stepd, we have to wait for
3552 	 * the message back from the slurmctld or we will cause a race
3553 	 * condition with srun.
3554 	 */
3555 	slurm_send_rc_msg(msg, SLURM_SUCCESS);
3556 
3557 	return SLURM_SUCCESS;
3558 }
3559 
3560 /* Get list of active jobs and steps, xfree returned value */
3561 static char *
3562 _get_step_list(void)
3563 {
3564 	char tmp[64];
3565 	char *step_list = NULL;
3566 	List steps;
3567 	ListIterator i;
3568 	step_loc_t *stepd;
3569 
3570 	steps = stepd_available(conf->spooldir, conf->node_name);
3571 	i = list_iterator_create(steps);
3572 	while ((stepd = list_next(i))) {
3573 		int fd;
3574 		fd = stepd_connect(stepd->directory, stepd->nodename,
3575 				   stepd->jobid, stepd->stepid,
3576 				   &stepd->protocol_version);
3577 		if (fd == -1)
3578 			continue;
3579 
3580 		if (stepd_state(fd, stepd->protocol_version)
3581 		    == SLURMSTEPD_NOT_RUNNING) {
3582 			debug("stale domain socket for stepd %u.%u ",
3583 			      stepd->jobid, stepd->stepid);
3584 			close(fd);
3585 			continue;
3586 		}
3587 		close(fd);
3588 
3589 		if (step_list)
3590 			xstrcat(step_list, ", ");
3591 		if (stepd->stepid == NO_VAL) {
3592 			snprintf(tmp, sizeof(tmp), "%u",
3593 				 stepd->jobid);
3594 			xstrcat(step_list, tmp);
3595 		} else {
3596 			snprintf(tmp, sizeof(tmp), "%u.%u",
3597 				 stepd->jobid, stepd->stepid);
3598 			xstrcat(step_list, tmp);
3599 		}
3600 	}
3601 	list_iterator_destroy(i);
3602 	FREE_NULL_LIST(steps);
3603 
3604 	if (step_list == NULL)
3605 		xstrcat(step_list, "NONE");
3606 	return step_list;
3607 }
3608 
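/*
 * Handle a REQUEST_DAEMON_STATUS RPC: report this slurmd's hardware
 * configuration, active steps, last slurmctld contact and daemon state.
 */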
3609 static int
3610 _rpc_daemon_status(slurm_msg_t *msg)
3611 {
3612 	slurm_msg_t      resp_msg;
3613 	slurmd_status_t *resp = NULL;
3614 
3615 	resp = xmalloc(sizeof(slurmd_status_t));
3616 	resp->actual_cpus        = conf->actual_cpus;
3617 	resp->actual_boards      = conf->actual_boards;
3618 	resp->actual_sockets     = conf->actual_sockets;
3619 	resp->actual_cores       = conf->actual_cores;
3620 	resp->actual_threads     = conf->actual_threads;
3621 	resp->actual_real_mem    = conf->real_memory_size;
3622 	resp->actual_tmp_disk    = conf->tmp_disk_space;
3623 	resp->booted             = startup;
3624 	resp->hostname           = xstrdup(conf->node_name);
3625 	resp->step_list          = _get_step_list();
3626 	resp->last_slurmctld_msg = last_slurmctld_msg;
3627 	resp->pid                = conf->pid;
3628 	resp->slurmd_debug       = conf->debug_level;
3629 	resp->slurmd_logfile     = xstrdup(conf->logfile);
3630 	resp->version            = xstrdup(SLURM_VERSION_STRING);
3631 
3632 	slurm_msg_t_copy(&resp_msg, msg);
3633 	resp_msg.msg_type = RESPONSE_SLURMD_STATUS;
3634 	resp_msg.data     = resp;
3635 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
3636 	slurm_free_slurmd_status(resp);
3637 	return SLURM_SUCCESS;
3638 }
3639 
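/*
 * Handle a REQUEST_JOB_STEP_STAT RPC: collect accounting data and the pid
 * list for the given step from its stepd and return them to the requester.
 */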
3640 static int
3641 _rpc_stat_jobacct(slurm_msg_t *msg)
3642 {
3643 	job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data;
3644 	slurm_msg_t        resp_msg;
3645 	job_step_stat_t *resp = NULL;
3646 	int fd;
3647 	uint16_t protocol_version;
3648 	uid_t uid;
3649 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3650 
3651 	debug3("Entering _rpc_stat_jobacct");
3652 	/* accounting data may be requested by the job owner as well as root
3653 	   or SlurmUser; ownership is verified below */
3654 
3655 	fd = stepd_connect(conf->spooldir, conf->node_name,
3656 			   req->job_id, req->step_id, &protocol_version);
3657 	if (fd == -1) {
3658 		error("stepd_connect to %u.%u failed: %m",
3659 		      req->job_id, req->step_id);
3660 		slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3661 		return	ESLURM_INVALID_JOB_ID;
3662 	}
3663 
3664 	if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
3665 		debug("stat_jobacct couldn't read from the step %u.%u: %m",
3666 		      req->job_id, req->step_id);
3667 		close(fd);
3668 		if (msg->conn_fd >= 0)
3669 			slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3670 		return	ESLURM_INVALID_JOB_ID;
3671 	}
3672 
3673 	/*
3674 	 * check that requesting user ID is the job owner, SlurmUser, or root
3675 	 */
3676 	if ((req_uid != uid) && (!_slurm_authorized_user(req_uid))) {
3677 		error("stat_jobacct from uid %ld for job %u "
3678 		      "owned by uid %ld",
3679 		      (long) req_uid, req->job_id, (long) uid);
3680 
3681 		if (msg->conn_fd >= 0) {
3682 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3683 			close(fd);
3684 			return ESLURM_USER_ID_MISSING;/* or bad in this case */
3685 		}
3686 	}
3687 
3688 	resp = xmalloc(sizeof(job_step_stat_t));
3689 	resp->step_pids = xmalloc(sizeof(job_step_pids_t));
3690 	resp->step_pids->node_name = xstrdup(conf->node_name);
3691 	slurm_msg_t_copy(&resp_msg, msg);
3692 	resp->return_code = SLURM_SUCCESS;
3693 
3694 	if (stepd_stat_jobacct(fd, protocol_version, req, resp)
3695 	    == SLURM_ERROR) {
3696 		debug("accounting for nonexistent job %u.%u requested",
3697 		      req->job_id, req->step_id);
3698 	}
3699 
3700 	/* FIX ME: This should probably happen in stepd_stat_jobacct
3701 	   to get more information about the pids.
3702 	*/
3703 	if (stepd_list_pids(fd, protocol_version, &resp->step_pids->pid,
3704 			    &resp->step_pids->pid_cnt) == SLURM_ERROR) {
3705 		debug("No pids for nonexistent job %u.%u requested",
3706 		      req->job_id, req->step_id);
3707 	}
3708 
3709 	close(fd);
3710 
3711 	resp_msg.msg_type     = RESPONSE_JOB_STEP_STAT;
3712 	resp_msg.data         = resp;
3713 
3714 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
3715 	slurm_free_job_step_stat(resp);
3716 	return SLURM_SUCCESS;
3717 }
3718 
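/*
 * Map a network connection to a local job ID: resolve the connection to a
 * socket inode, the inode to a pid, and the pid to a job.
 */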
3719 static int
3720 _callerid_find_job(callerid_conn_t conn, uint32_t *job_id)
3721 {
3722 	ino_t inode;
3723 	pid_t pid;
3724 	int rc;
3725 
3726 	rc = callerid_find_inode_by_conn(conn, &inode);
3727 	if (rc != SLURM_SUCCESS) {
3728 		debug3("network_callerid inode not found");
3729 		return ESLURM_INVALID_JOB_ID;
3730 	}
3731 	debug3("network_callerid found inode %lu", (long unsigned int)inode);
3732 
3733 	rc = find_pid_by_inode(&pid, inode);
3734 	if (rc != SLURM_SUCCESS) {
3735 		debug3("network_callerid process not found");
3736 		return ESLURM_INVALID_JOB_ID;
3737 	}
3738 	debug3("network_callerid found process %d", (pid_t)pid);
3739 
3740 	rc = slurm_pid2jobid(pid, job_id);
3741 	if (rc != SLURM_SUCCESS) {
3742 		debug3("network_callerid job not found");
3743 		return ESLURM_INVALID_JOB_ID;
3744 	}
3745 	debug3("network_callerid found job %u", *job_id);
3746 	return SLURM_SUCCESS;
3747 }
3748 
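/*
 * Handle a REQUEST_NETWORK_CALLERID RPC: determine which local job owns the
 * remote end of the given connection. The job ID is only returned to the
 * job owner or to SlurmUser/root.
 */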
3749 static int
3750 _rpc_network_callerid(slurm_msg_t *msg)
3751 {
3752 	network_callerid_msg_t *req = (network_callerid_msg_t *)msg->data;
3753 	slurm_msg_t resp_msg;
3754 	network_callerid_resp_t *resp = NULL;
3755 
3756 	uid_t job_uid = -1;
3757 	uint32_t job_id = NO_VAL;
3758 	callerid_conn_t conn;
3759 	int rc = ESLURM_INVALID_JOB_ID;
3760 	char ip_src_str[INET6_ADDRSTRLEN];
3761 	char ip_dst_str[INET6_ADDRSTRLEN];
3762 
3763 	debug3("Entering _rpc_network_callerid");
3764 
3765 	resp = xmalloc(sizeof(network_callerid_resp_t));
3766 	slurm_msg_t_copy(&resp_msg, msg);
3767 
3768 	/* Ideally this would be in an if block only when debug3 is enabled */
3769 	inet_ntop(req->af, req->ip_src, ip_src_str, INET6_ADDRSTRLEN);
3770 	inet_ntop(req->af, req->ip_dst, ip_dst_str, INET6_ADDRSTRLEN);
3771 	debug3("network_callerid checking %s:%u => %s:%u",
3772 	       ip_src_str, req->port_src, ip_dst_str, req->port_dst);
3773 
3774 	/* Our remote end is the requester's source address (and vice versa) */
3775 	memcpy((void*)&conn.ip_dst, (void*)&req->ip_src, 16);
3776 	memcpy((void*)&conn.ip_src, (void*)&req->ip_dst, 16);
3777 	conn.port_src = req->port_dst;
3778 	conn.port_dst = req->port_src;
3779 	conn.af = req->af;
3780 
3781 	/* Find the job id */
3782 	rc = _callerid_find_job(conn, &job_id);
3783 	if (rc == SLURM_SUCCESS) {
3784 		/* We found the job */
3785 		uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3786 		if (!_slurm_authorized_user(req_uid)) {
3787 			/* Requestor is not root or SlurmUser */
3788 			job_uid = _get_job_uid(job_id);
3789 			if (job_uid != req_uid) {
3790 				/* RPC call sent by non-root user who does not
3791 				 * own this job. Do not send them the job ID. */
3792 				error("Security violation, REQUEST_NETWORK_CALLERID from uid=%d",
3793 				      req_uid);
3794 				job_id = NO_VAL;
3795 				rc = ESLURM_INVALID_JOB_ID;
3796 			}
3797 		}
3798 	}
3799 
3800 	resp->job_id = job_id;
3801 	resp->node_name = xstrdup(conf->node_name);
3802 
3803 	resp_msg.msg_type = RESPONSE_NETWORK_CALLERID;
3804 	resp_msg.data     = resp;
3805 
3806 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
3807 	slurm_free_network_callerid_resp(resp);
3808 	return rc;
3809 }
3810 
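/*
 * Handle a REQUEST_JOB_STEP_PIDS RPC: return the process IDs tracked by the
 * stepd for the requested job step.
 */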
3811 static int
3812 _rpc_list_pids(slurm_msg_t *msg)
3813 {
3814 	job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data;
3815 	slurm_msg_t        resp_msg;
3816 	job_step_pids_t *resp = NULL;
3817 	int fd;
3818 	uint16_t protocol_version = 0;
3819 	uid_t job_uid;
3820 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3821 
3822 	debug3("Entering _rpc_list_pids");
3823 
3824 	job_uid = _get_job_uid(req->job_id);
3825 
3826 	if ((int)job_uid < 0) {
3827 		error("stat_pid for invalid job_id: %u",
3828 		      req->job_id);
3829 		if (msg->conn_fd >= 0)
3830 			slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3831 		return  ESLURM_INVALID_JOB_ID;
3832 	}
3833 
3834 	/*
3835 	 * check that requesting user ID is the job owner, SlurmUser, or root
3836 	 */
3837 	if ((req_uid != job_uid)
3838 	    && (!_slurm_authorized_user(req_uid))) {
3839 		error("stat_pid from uid %ld for job %u "
3840 		      "owned by uid %ld",
3841 		      (long) req_uid, req->job_id, (long) job_uid);
3842 
3843 		if (msg->conn_fd >= 0) {
3844 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3845 			return ESLURM_USER_ID_MISSING;/* or bad in this case */
3846 		}
3847 	}
3848 
3849 	resp = xmalloc(sizeof(job_step_pids_t));
3850 	slurm_msg_t_copy(&resp_msg, msg);
3851 	resp->node_name = xstrdup(conf->node_name);
3852 	resp->pid_cnt = 0;
3853 	resp->pid = NULL;
3854 	fd = stepd_connect(conf->spooldir, conf->node_name,
3855 			   req->job_id, req->step_id, &protocol_version);
3856 	if (fd == -1) {
3857 		error("stepd_connect to %u.%u failed: %m",
3858 		      req->job_id, req->step_id);
3859 		slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3860 		slurm_free_job_step_pids(resp);
3861 		return  ESLURM_INVALID_JOB_ID;
3862 
3863 	}
3864 
3865 	if (stepd_list_pids(fd, protocol_version,
3866 			    &resp->pid, &resp->pid_cnt) == SLURM_ERROR) {
3867 		debug("No pids for nonexistent job %u.%u requested",
3868 		      req->job_id, req->step_id);
3869 	}
3870 
3871 	close(fd);
3872 
3873 	resp_msg.msg_type = RESPONSE_JOB_STEP_PIDS;
3874 	resp_msg.data     = resp;
3875 
3876 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
3877 	slurm_free_job_step_pids(resp);
3878 	return SLURM_SUCCESS;
3879 }
3880 
3881 /*
3882  *  For the specified job_id: reply to slurmctld,
3883  *   sleep(configured kill_wait), then send SIGKILL
3884  */
3885 static void
3886 _rpc_timelimit(slurm_msg_t *msg)
3887 {
3888 	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
3889 	kill_job_msg_t *req = msg->data;
3890 	int             nsteps, rc;
3891 
3892 	if (!_slurm_authorized_user(uid)) {
3893 		error ("Security violation: rpc_timelimit req from uid %d",
3894 		       uid);
3895 		slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3896 		return;
3897 	}
3898 
3899 	/*
3900 	 *  Indicate to slurmctld that we've received the message
3901 	 */
3902 	slurm_send_rc_msg(msg, SLURM_SUCCESS);
3903 	close(msg->conn_fd);
3904 	msg->conn_fd = -1;
3905 
3906 	if (req->step_id != NO_VAL) {
3907 		slurm_ctl_conf_t *cf;
3908 		int delay;
3909 		/* A jobstep has timed out:
3910 		 * - send the container a SIG_TIME_LIMIT or SIG_PREEMPTED
3911 		 *   to log the event
3912 		 * - send a SIGCONT to resume any suspended tasks
3913 		 * - send a SIGTERM to begin termination
3914 		 * - sleep KILL_WAIT
3915 		 * - send a SIGKILL to clean up
3916 		 */
3917 		if (msg->msg_type == REQUEST_KILL_TIMELIMIT) {
3918 			rc = _signal_jobstep(req->job_id, req->step_id,
3919 					     SIG_TIME_LIMIT, 0, uid);
3920 		} else {
3921 			rc = _signal_jobstep(req->job_id, req->step_id,
3922 					     SIG_PREEMPTED, 0, uid);
3923 		}
3924 		if (rc != SLURM_SUCCESS)
3925 			return;
3926 		rc = _signal_jobstep(req->job_id, req->step_id, SIGCONT, 0,
3927 				     uid);
3928 		if (rc != SLURM_SUCCESS)
3929 			return;
3930 		rc = _signal_jobstep(req->job_id, req->step_id, SIGTERM, 0,
3931 				     uid);
3932 		if (rc != SLURM_SUCCESS)
3933 			return;
3934 		cf = slurm_conf_lock();
3935 		delay = MAX(cf->kill_wait, 5);
3936 		slurm_conf_unlock();
3937 		sleep(delay);
3938 		_signal_jobstep(req->job_id, req->step_id, SIGKILL, 0, uid);
3939 		return;
3940 	}
3941 
3942 	if (msg->msg_type == REQUEST_KILL_TIMELIMIT)
3943 		_kill_all_active_steps(req->job_id, SIG_TIME_LIMIT, 0, true,
3944 				       uid);
3945 	else /* (msg->type == REQUEST_KILL_PREEMPTED) */
3946 		_kill_all_active_steps(req->job_id, SIG_PREEMPTED, 0, true,
3947 				       uid);
3948 	nsteps = _kill_all_active_steps(req->job_id, SIGTERM, 0, false, uid);
3949 	verbose("Job %u: timeout: sent SIGTERM to %d active steps",
3950 		req->job_id, nsteps);
3951 
3952 	/* Revoke credential, send SIGKILL, run epilog, etc. */
3953 	_rpc_terminate_job(msg);
3954 }
3955 
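/*
 * Handle a REQUEST_JOB_ID RPC: scan the local stepds to find which job
 * contains the given pid and return that job ID.
 */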
3956 static void _rpc_pid2jid(slurm_msg_t *msg)
3957 {
3958 	job_id_request_msg_t *req = (job_id_request_msg_t *) msg->data;
3959 	slurm_msg_t           resp_msg;
3960 	job_id_response_msg_t resp;
3961 	bool         found = false;
3962 	List         steps;
3963 	ListIterator i;
3964 	step_loc_t *stepd;
3965 
3966 	steps = stepd_available(conf->spooldir, conf->node_name);
3967 	i = list_iterator_create(steps);
3968 	while ((stepd = list_next(i))) {
3969 		int fd;
3970 		fd = stepd_connect(stepd->directory, stepd->nodename,
3971 				   stepd->jobid, stepd->stepid,
3972 				   &stepd->protocol_version);
3973 		if (fd == -1)
3974 			continue;
3975 
3976 		if (stepd_pid_in_container(
3977 			    fd, stepd->protocol_version,
3978 			    req->job_pid)
3979 		    || req->job_pid == stepd_daemon_pid(
3980 			    fd, stepd->protocol_version)) {
3981 			slurm_msg_t_copy(&resp_msg, msg);
3982 			resp.job_id = stepd->jobid;
3983 			resp.return_code = SLURM_SUCCESS;
3984 			found = true;
3985 			close(fd);
3986 			break;
3987 		}
3988 		close(fd);
3989 	}
3990 	list_iterator_destroy(i);
3991 	FREE_NULL_LIST(steps);
3992 
3993 	if (found) {
3994 		debug3("_rpc_pid2jid: pid(%u) found in %u",
3995 		       req->job_pid, resp.job_id);
3996 		resp_msg.address      = msg->address;
3997 		resp_msg.msg_type     = RESPONSE_JOB_ID;
3998 		resp_msg.data         = &resp;
3999 
4000 		slurm_send_node_msg(msg->conn_fd, &resp_msg);
4001 	} else {
4002 		debug3("_rpc_pid2jid: pid(%u) not found", req->job_pid);
4003 		slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
4004 	}
4005 }
4006 
4007 /* Validate sbcast credential.
4008  * NOTE: We can only perform the full credential validation once with
4009  * Munge without generating a credential replay error
4010  * RET an sbcast credential or NULL on error.
4011  * free with sbcast_cred_arg_free()
4012  */
4013 static sbcast_cred_arg_t *_valid_sbcast_cred(file_bcast_msg_t *req,
4014 					     uid_t req_uid,
4015 					     gid_t req_gid,
4016 					     uint16_t protocol_version)
4017 {
4018 	sbcast_cred_arg_t *arg;
4019 	hostset_t hset = NULL;
4020 
4021 	arg = extract_sbcast_cred(conf->vctx, req->cred, req->block_no,
4022 				  protocol_version);
4023 	if (!arg) {
4024 		error("Security violation: Invalid sbcast_cred from uid %u",
4025 		      req_uid);
4026 		return NULL;
4027 	}
4028 
4029 	if (!(hset = hostset_create(arg->nodes))) {
4030 		error("Unable to parse sbcast_cred hostlist %s", arg->nodes);
4031 		sbcast_cred_arg_free(arg);
4032 		return NULL;
4033 	} else if (!hostset_within(hset, conf->node_name)) {
4034 		error("Security violation: sbcast_cred from %u has "
4035 		      "bad hostset %s", req_uid, arg->nodes);
4036 		sbcast_cred_arg_free(arg);
4037 		hostset_destroy(hset);
4038 		return NULL;
4039 	}
4040 	hostset_destroy(hset);
4041 
4042 	/* fill in the credential with any missing data */
4043 	if (arg->uid == NO_VAL)
4044 		arg->uid = req_uid;
4045 	if (arg->gid == NO_VAL)
4046 		arg->gid = req_gid;
4047 	if ((arg->uid != req_uid) || (arg->gid != req_gid)) {
4048 		error("Security violation: sbcast cred from %u/%u but rpc from %u/%u",
4049 		      arg->uid, arg->gid, req_uid, req_gid);
4050 		sbcast_cred_arg_free(arg);
4051 		return NULL;
4052 	}
4053 
4054 	/*
4055 	 * NOTE: user_name, ngids, gids may still be NULL, 0, NULL at this point.
4056 	 *       we skip filling them in here to avoid excessive lookup calls
4057 	 *       as this must run once per block (and there may be thousands of
4058 	 *       blocks), and is only currently needed by the first block.
4059 	 */
4060 	/* print_sbcast_cred(req->cred); */
4061 
4062 	return arg;
4063 }
4064 
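/*
 * The following helpers implement a simple readers/writer lock protecting
 * file_bcast_list, built on file_bcast_mutex and file_bcast_cond. Waiting
 * writers (fb_write_wait_lock) block new readers so writers are not starved.
 */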
4065 static void _fb_rdlock(void)
4066 {
4067 	slurm_mutex_lock(&file_bcast_mutex);
4068 	while (1) {
4069 		if ((fb_write_wait_lock == 0) && (fb_write_lock == 0)) {
4070 			fb_read_lock++;
4071 			break;
4072 		} else {	/* wait for state change and retry */
4073 			slurm_cond_wait(&file_bcast_cond, &file_bcast_mutex);
4074 		}
4075 	}
4076 	slurm_mutex_unlock(&file_bcast_mutex);
4077 }
4078 
4079 static void _fb_rdunlock(void)
4080 {
4081 	slurm_mutex_lock(&file_bcast_mutex);
4082 	fb_read_lock--;
4083 	slurm_cond_broadcast(&file_bcast_cond);
4084 	slurm_mutex_unlock(&file_bcast_mutex);
4085 }
4086 
4087 static void _fb_wrlock(void)
4088 {
4089 	slurm_mutex_lock(&file_bcast_mutex);
4090 	fb_write_wait_lock++;
4091 	while (1) {
4092 		if ((fb_read_lock == 0) && (fb_write_lock == 0)) {
4093 			fb_write_lock++;
4094 			fb_write_wait_lock--;
4095 			break;
4096 		} else {	/* wait for state change and retry */
4097 			slurm_cond_wait(&file_bcast_cond, &file_bcast_mutex);
4098 		}
4099 	}
4100 	slurm_mutex_unlock(&file_bcast_mutex);
4101 }
4102 
4103 static void _fb_wrunlock(void)
4104 {
4105 	slurm_mutex_lock(&file_bcast_mutex);
4106 	fb_write_lock--;
4107 	slurm_cond_broadcast(&file_bcast_cond);
4108 	slurm_mutex_unlock(&file_bcast_mutex);
4109 }
4110 
4111 static int _bcast_find_in_list(void *x, void *y)
4112 {
4113 	file_bcast_info_t *info = (file_bcast_info_t *)x;
4114 	file_bcast_info_t *key = (file_bcast_info_t *)y;
4115 	/* uid, job_id, and fname must match */
4116 	return ((info->uid == key->uid)
4117 		&& (info->job_id == key->job_id)
4118 		&& (!xstrcmp(info->fname, key->fname)));
4119 }
4120 
4121 /* must have read lock */
4122 static file_bcast_info_t *_bcast_lookup_file(file_bcast_info_t *key)
4123 {
4124 	return list_find_first(file_bcast_list, _bcast_find_in_list, key);
4125 }
4126 
4127 /* must not have read lock, will get write lock */
4128 static void _file_bcast_close_file(file_bcast_info_t *key)
4129 {
4130 	_fb_wrlock();
4131 	list_delete_all(file_bcast_list, _bcast_find_in_list, key);
4132 	_fb_wrunlock();
4133 }
4134 
4135 static void _free_file_bcast_info_t(void *arg)
4136 {
4137 	file_bcast_info_t *f = (file_bcast_info_t *)arg;
4138 
4139 	if (!f)
4140 		return;
4141 
4142 	xfree(f->fname);
4143 	if (f->fd)
4144 		close(f->fd);
4145 	xfree(f);
4146 }
4147 
4148 static int _bcast_find_in_list_to_remove(void *x, void *y)
4149 {
4150 	file_bcast_info_t *f = (file_bcast_info_t *)x;
4151 	time_t *now = (time_t *) y;
4152 
4153 	if (f->last_update + FILE_BCAST_TIMEOUT < *now) {
4154 		error("Removing stalled file_bcast transfer from uid "
4155 		      "%u to file `%s`", f->uid, f->fname);
4156 		return true;
4157 	}
4158 
4159 	return false;
4160 }
4161 
4162 /* remove transfers that have stalled */
4163 static void _file_bcast_cleanup(void)
4164 {
4165 	time_t now = time(NULL);
4166 
4167 	_fb_wrlock();
4168 	list_delete_all(file_bcast_list, _bcast_find_in_list_to_remove, &now);
4169 	_fb_wrunlock();
4170 }
4171 
4172 void file_bcast_init(void)
4173 {
4174 	/* skip locks during slurmd init */
4175 	file_bcast_list = list_create(_free_file_bcast_info_t);
4176 }
4177 
4178 void file_bcast_purge(void)
4179 {
4180 	_fb_wrlock();
4181 	list_destroy(file_bcast_list);
4182 	/* destroying list before exit, no need to unlock */
4183 }
4184 
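/*
 * Handle a REQUEST_FILE_BCAST RPC (sbcast): validate the sbcast credential,
 * register and open the destination file on the first block, then decompress
 * and write each block, applying permissions, ownership and times on the
 * last block.
 */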
4185 static int _rpc_file_bcast(slurm_msg_t *msg)
4186 {
4187 	int rc;
4188 	int64_t offset, inx;
4189 	sbcast_cred_arg_t *cred_arg;
4190 	file_bcast_info_t *file_info;
4191 	file_bcast_msg_t *req = msg->data;
4192 	file_bcast_info_t key;
4193 
4194 	key.uid = g_slurm_auth_get_uid(msg->auth_cred);
4195 	key.gid = g_slurm_auth_get_gid(msg->auth_cred);
4196 	key.fname = req->fname;
4197 
4198 	cred_arg = _valid_sbcast_cred(req, key.uid, key.gid,
4199 				      msg->protocol_version);
4200 	if (!cred_arg)
4201 		return ESLURMD_INVALID_JOB_CREDENTIAL;
4202 
4203 #ifdef HAVE_NATIVE_CRAY
4204 	if (cred_arg->het_job_id && (cred_arg->het_job_id != NO_VAL))
4205 		key.job_id = cred_arg->het_job_id;
4206 	else
4207 		key.job_id = cred_arg->job_id;
4208 #else
4209 	key.job_id = cred_arg->job_id;
4210 #endif
4211 
4212 #if 0
4213 	info("last_block=%u force=%u modes=%o",
4214 	     req->last_block, req->force, req->modes);
4215 	info("uid=%u gid=%u atime=%lu mtime=%lu block_len[0]=%u",
4216 	     req->uid, req->gid, req->atime, req->mtime, req->block_len);
4217 #if 0
4218 	/* when the file being transferred is binary, the following line
4219 	 * can break the terminal output for slurmd */
4220 	info("req->block[0]=%s, @ %lu", \
4221 	     req->block[0], (unsigned long) &req->block);
4222 #endif
4223 #endif
4224 
4225 	if (req->block_no == 1) {
4226 		info("sbcast req_uid=%u job_id=%u fname=%s block_no=%u",
4227 		     key.uid, key.job_id, key.fname, req->block_no);
4228 	} else {
4229 		debug("sbcast req_uid=%u job_id=%u fname=%s block_no=%u",
4230 		      key.uid, key.job_id, key.fname, req->block_no);
4231 	}
4232 
4233 	/* first block must register the file and open fd/mmap */
4234 	if (req->block_no == 1) {
4235 		if ((rc = _file_bcast_register_file(msg, cred_arg, &key))) {
4236 			sbcast_cred_arg_free(cred_arg);
4237 			return rc;
4238 		}
4239 	}
4240 	sbcast_cred_arg_free(cred_arg);
4241 
4242 	_fb_rdlock();
4243 	if (!(file_info = _bcast_lookup_file(&key))) {
4244 		error("No registered file transfer for uid %u file `%s`.",
4245 		      key.uid, key.fname);
4246 		_fb_rdunlock();
4247 		return SLURM_ERROR;
4248 	}
4249 
4250 	/* now decompress file */
4251 	if (bcast_decompress_data(req) < 0) {
4252 		error("sbcast: data decompression error for UID %u, file %s",
4253 		      key.uid, key.fname);
4254 		_fb_rdunlock();
4255 		return SLURM_ERROR;
4256 	}
4257 
4258 	offset = 0;
4259 	while (req->block_len - offset) {
4260 		inx = write(file_info->fd, &req->block[offset],
4261 			    (req->block_len - offset));
4262 		if (inx == -1) {
4263 			if ((errno == EINTR) || (errno == EAGAIN))
4264 				continue;
4265 			error("sbcast: uid:%u can't write `%s`: %m",
4266 			      key.uid, key.fname);
4267 			_fb_rdunlock();
4268 			return SLURM_ERROR;
4269 		}
4270 		offset += inx;
4271 	}
4272 
4273 	file_info->last_update = time(NULL);
4274 
4275 	if (req->last_block && fchmod(file_info->fd, (req->modes & 0777))) {
4276 		error("sbcast: uid:%u can't chmod `%s`: %m",
4277 		      key.uid, key.fname);
4278 	}
4279 	if (req->last_block && fchown(file_info->fd, key.uid, key.gid)) {
4280 		error("sbcast: uid:%u gid:%u can't chown `%s`: %m",
4281 		      key.uid, key.gid, key.fname);
4282 	}
4283 	if (req->last_block && req->atime) {
4284 		struct utimbuf time_buf;
4285 		time_buf.actime  = req->atime;
4286 		time_buf.modtime = req->mtime;
4287 		if (utime(key.fname, &time_buf)) {
4288 			error("sbcast: uid:%u can't utime `%s`: %m",
4289 			      key.uid, key.fname);
4290 		}
4291 	}
4292 
4293 	_fb_rdunlock();
4294 
4295 	if (req->last_block) {
4296 		_file_bcast_close_file(&key);
4297 	}
4298 	return SLURM_SUCCESS;
4299 }
4300 
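/*
 * Register a new sbcast transfer (first block only): open the destination
 * file with the requesting user's identity and add it to file_bcast_list.
 */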
4301 static int _file_bcast_register_file(slurm_msg_t *msg,
4302 				     sbcast_cred_arg_t *cred_arg,
4303 				     file_bcast_info_t *key)
4304 {
4305 	file_bcast_msg_t *req = msg->data;
4306 	int fd, flags;
4307 	file_bcast_info_t *file_info;
4308 
4309 	/* may still be unset in credential */
4310 	if (!cred_arg->ngids || !cred_arg->gids)
4311 		cred_arg->ngids = group_cache_lookup(key->uid, key->gid,
4312 						     cred_arg->user_name,
4313 						     &cred_arg->gids);
4314 
4315 	flags = O_WRONLY | O_CREAT;
4316 	if (req->force)
4317 		flags |= O_TRUNC;
4318 	else
4319 		flags |= O_EXCL;
4320 
4321 	if ((fd = _open_as_other(req->fname, flags, 0700,
4322 				 key->job_id, key->uid, key->gid,
4323 				 cred_arg->ngids, cred_arg->gids)) == -1) {
4324 		error("Unable to open %s: Permission denied", req->fname);
4325 		return SLURM_ERROR;
4326 	}
4327 
4328 	file_info = xmalloc(sizeof(file_bcast_info_t));
4329 	file_info->fd = fd;
4330 	file_info->fname = xstrdup(req->fname);
4331 	file_info->uid = key->uid;
4332 	file_info->gid = key->gid;
4333 	file_info->job_id = key->job_id;
4334 	file_info->last_update = file_info->start_time = time(NULL);
4335 
4336 	//TODO: mmap the file here
4337 	_fb_wrlock();
4338 	list_append(file_bcast_list, file_info);
4339 	_fb_wrunlock();
4340 
4341 	return SLURM_SUCCESS;
4342 }
4343 
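/*
 * Handle a REQUEST_REATTACH_TASKS RPC (sattach): pass the response and I/O
 * addresses plus the job credential signature to the stepd so it can connect
 * back to the client, and return the step's gtids and local pids.
 */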
4344 static void
4345 _rpc_reattach_tasks(slurm_msg_t *msg)
4346 {
4347 	reattach_tasks_request_msg_t  *req = msg->data;
4348 	reattach_tasks_response_msg_t *resp =
4349 		xmalloc(sizeof(reattach_tasks_response_msg_t));
4350 	slurm_msg_t                    resp_msg;
4351 	int          rc   = SLURM_SUCCESS;
4352 	uint16_t     port = 0;
4353 	char         host[MAXHOSTNAMELEN];
4354 	slurm_addr_t   ioaddr;
4355 	void        *job_cred_sig;
4356 	uint32_t     len;
4357 	int               fd;
4358 	slurm_addr_t *cli = &msg->orig_addr;
4359 	uint32_t nodeid = NO_VAL;
4360 	uid_t uid = -1;
4361 	uint16_t protocol_version;
4362 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
4363 
4364 	slurm_msg_t_copy(&resp_msg, msg);
4365 	fd = stepd_connect(conf->spooldir, conf->node_name,
4366 			   req->job_id, req->job_step_id, &protocol_version);
4367 	if (fd == -1) {
4368 		debug("reattach for nonexistent job %u.%u stepd_connect"
4369 		      " failed: %m", req->job_id, req->job_step_id);
4370 		rc = ESLURM_INVALID_JOB_ID;
4371 		goto done;
4372 	}
4373 
4374 	if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
4375 		debug("_rpc_reattach_tasks couldn't read from the "
4376 		      "step %u.%u: %m",
4377 		      req->job_id, req->job_step_id);
4378 		rc = ESLURM_INVALID_JOB_ID;
4379 		goto done2;
4380 	}
4381 
4382 	nodeid = stepd_get_nodeid(fd, protocol_version);
4383 
4384 	debug2("_rpc_reattach_tasks: nodeid %d in the job step", nodeid);
4385 
4386 	if ((req_uid != uid) && (!_slurm_authorized_user(req_uid))) {
4387 		error("uid %ld attempt to attach to job %u.%u owned by %ld",
4388 		      (long) req_uid, req->job_id, req->job_step_id,
4389 		      (long) uid);
4390 		rc = EPERM;
4391 		goto done2;
4392 	}
4393 
4394 	memset(resp, 0, sizeof(reattach_tasks_response_msg_t));
4395 	slurm_get_ip_str(cli, &port, host, sizeof(host));
4396 
4397 	/*
4398 	 * Set response address by resp_port and client address
4399 	 */
4400 	memcpy(&resp_msg.address, cli, sizeof(slurm_addr_t));
4401 	if (req->num_resp_port > 0) {
4402 		port = req->resp_port[nodeid % req->num_resp_port];
4403 		slurm_set_addr(&resp_msg.address, port, NULL);
4404 	}
4405 
4406 	/*
4407 	 * Set IO address by io_port and client address
4408 	 */
4409 	memcpy(&ioaddr, cli, sizeof(slurm_addr_t));
4410 
4411 	if (req->num_io_port > 0) {
4412 		port = req->io_port[nodeid % req->num_io_port];
4413 		slurm_set_addr(&ioaddr, port, NULL);
4414 	}
4415 
4416 	/*
4417 	 * Get the signature of the job credential.  slurmstepd will need
4418 	 * this to prove its identity when it connects back to srun.
4419 	 */
4420 	slurm_cred_get_signature(req->cred, (char **)(&job_cred_sig), &len);
4421 	if (len != SLURM_IO_KEY_SIZE) {
4422 		error("Incorrect slurm cred signature length");
4423 		goto done2;
4424 	}
4425 
4426 	resp->gtids = NULL;
4427 	resp->local_pids = NULL;
4428 
4429 	/* NOTE: We need to use the protocol_version from
4430 	 * sattach here since responses will be sent back to it. */
4431 	if (msg->protocol_version < protocol_version)
4432 		protocol_version = msg->protocol_version;
4433 
4434 	/* Following call fills in gtids and local_pids when successful. */
4435 	rc = stepd_attach(fd, protocol_version, &ioaddr,
4436 			  &resp_msg.address, job_cred_sig, resp);
4437 	if (rc != SLURM_SUCCESS) {
4438 		debug2("stepd_attach call failed");
4439 		goto done2;
4440 	}
4441 
4442 done2:
4443 	close(fd);
4444 done:
4445 	debug2("update step addrs rc = %d", rc);
4446 	resp_msg.data         = resp;
4447 	resp_msg.msg_type     = RESPONSE_REATTACH_TASKS;
4448 	resp->node_name       = xstrdup(conf->node_name);
4449 	resp->return_code     = rc;
4450 	debug2("node %s sending rc = %d", conf->node_name, rc);
4451 
4452 	slurm_send_node_msg(msg->conn_fd, &resp_msg);
4453 	slurm_free_reattach_tasks_response_msg(resp);
4454 }
4455 
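/*
 * Return the uid owning the given job, determined by querying any of its
 * local stepds, or (uid_t) -1 if no step of the job can be contacted.
 */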
4456 static uid_t _get_job_uid(uint32_t jobid)
4457 {
4458 	List steps;
4459 	ListIterator i;
4460 	step_loc_t *stepd;
4461 	uid_t uid = -1;
4462 	int fd;
4463 
4464 	steps = stepd_available(conf->spooldir, conf->node_name);
4465 	i = list_iterator_create(steps);
4466 	while ((stepd = list_next(i))) {
4467 		if (stepd->jobid != jobid) {
4468 			/* multiple jobs expected on shared nodes */
4469 			continue;
4470 		}
4471 		fd = stepd_connect(stepd->directory, stepd->nodename,
4472 				   stepd->jobid, stepd->stepid,
4473 				   &stepd->protocol_version);
4474 		if (fd == -1) {
4475 			debug3("Unable to connect to step %u.%u",
4476 			       stepd->jobid, stepd->stepid);
4477 			continue;
4478 		}
4479 		uid = stepd_get_uid(fd, stepd->protocol_version);
4480 
4481 		close(fd);
4482 		if ((int)uid < 0) {
4483 			debug("stepd_get_uid failed %u.%u: %m",
4484 			      stepd->jobid, stepd->stepid);
4485 			continue;
4486 		}
4487 		break;
4488 	}
4489 	list_iterator_destroy(i);
4490 	FREE_NULL_LIST(steps);
4491 
4492 	return uid;
4493 }
4494 
4495 /*
4496  * _kill_all_active_steps - signals the container of all steps of a job
4497  * jobid IN - id of job to signal
4498  * sig   IN - signal to send
4499  * flags IN - options deciding whether the batch step, its children, etc. must be signaled
4500  * batch IN - if true signal batch script, otherwise skip it
4501  * RET count of signaled job steps (plus batch script, if applicable)
4502  */
4503 static int
4504 _kill_all_active_steps(uint32_t jobid, int sig, int flags, bool batch,
4505 		       uid_t req_uid)
4506 {
4507 	List steps;
4508 	ListIterator i;
4509 	step_loc_t *stepd;
4510 	int step_cnt  = 0;
4511 	int rc = SLURM_SUCCESS;
4512 
4513 	bool sig_all_steps = true;
4514 	bool sig_batch_step = false;
4515 
4516 	if ((flags & KILL_JOB_BATCH) || (flags & KILL_FULL_JOB)) {
4517 		sig_all_steps = false;
4518 		sig_batch_step = true;
4519 	} else if (batch)
4520 		sig_batch_step = true;
4521 
4522 	steps = stepd_available(conf->spooldir, conf->node_name);
4523 	i = list_iterator_create(steps);
4524 	while ((stepd = list_next(i))) {
4525 		if (stepd->jobid != jobid) {
4526 			/* multiple jobs expected on shared nodes */
4527 			debug3("%s: Looking for job %u, found step from job %u",
4528 			       __func__, jobid, stepd->jobid);
4529 			continue;
4530 		}
4531 
4532 		if ((sig_all_steps && (stepd->stepid != SLURM_BATCH_SCRIPT)) ||
4533 		    (sig_batch_step && (stepd->stepid == SLURM_BATCH_SCRIPT))) {
4534 			if (_signal_jobstep(stepd->jobid, stepd->stepid, sig,
4535 			                    flags, req_uid) != SLURM_SUCCESS) {
4536 				rc = SLURM_ERROR;
4537 				continue;
4538 			}
4539 			step_cnt++;
4540 		} else {
4541 			debug3("%s: No signaling. Job: %u, Step: %u. Flags: %u",
4542 			       __func__, stepd->jobid, stepd->stepid, flags);
4543 		}
4544 	}
4545 	list_iterator_destroy(i);
4546 	FREE_NULL_LIST(steps);
4547 
4548 	if (step_cnt == 0)
4549 		debug2("No steps in jobid %u %s %d",
4550 		       jobid, (rc == SLURM_SUCCESS) ?
4551 		       "to send signal" : "were able to be signaled with",
4552 		       sig);
4553 
4554 	return step_cnt;
4555 }
4556 
4557 /*
4558  * ume_notify - Notify all jobs and steps on this node that an Uncorrectable
4559  *	Memory Error (UME) has occurred by sending SIG_UME (to log event in
4560  *	stderr)
4561  * RET count of signaled job steps
4562  */
4563 extern int ume_notify(void)
4564 {
4565 	List steps;
4566 	ListIterator i;
4567 	step_loc_t *stepd;
4568 	int step_cnt  = 0;
4569 	int fd;
4570 
4571 	steps = stepd_available(conf->spooldir, conf->node_name);
4572 	i = list_iterator_create(steps);
4573 	while ((stepd = list_next(i))) {
4574 		step_cnt++;
4575 
4576 		fd = stepd_connect(stepd->directory, stepd->nodename,
4577 				   stepd->jobid, stepd->stepid,
4578 				   &stepd->protocol_version);
4579 		if (fd == -1) {
4580 			debug3("Unable to connect to step %u.%u",
4581 			       stepd->jobid, stepd->stepid);
4582 			continue;
4583 		}
4584 
4585 		debug2("container SIG_UME to job %u.%u",
4586 		       stepd->jobid, stepd->stepid);
4587 		if (stepd_signal_container(
4588 			    fd, stepd->protocol_version, SIG_UME, 0,
4589 			    getuid()) < 0)
4590 			debug("kill jobid=%u failed: %m", stepd->jobid);
4591 		close(fd);
4592 	}
4593 	list_iterator_destroy(i);
4594 	FREE_NULL_LIST(steps);
4595 
4596 	if (step_cnt == 0)
4597 		debug2("No steps to send SIG_UME");
4598 	return step_cnt;
4599 }
4600 /*
4601  * _terminate_all_steps - signals the container of all steps of a job
4602  * jobid IN - id of job to signal
4603  * batch IN - if true signal batch script, otherwise skip it
4604  * RET count of signaled job steps (plus batch script, if applicable)
4605  */
4606 static int
4607 _terminate_all_steps(uint32_t jobid, bool batch)
4608 {
4609 	List steps;
4610 	ListIterator i;
4611 	step_loc_t *stepd;
4612 	int step_cnt  = 0;
4613 	int fd;
4614 
4615 	steps = stepd_available(conf->spooldir, conf->node_name);
4616 	i = list_iterator_create(steps);
4617 	while ((stepd = list_next(i))) {
4618 		if (stepd->jobid != jobid) {
4619 			/* multiple jobs expected on shared nodes */
4620 			debug3("Step from other job: jobid=%u (this jobid=%u)",
4621 			       stepd->jobid, jobid);
4622 			continue;
4623 		}
4624 
4625 		if ((stepd->stepid == SLURM_BATCH_SCRIPT) && (!batch))
4626 			continue;
4627 
4628 		step_cnt++;
4629 
4630 		fd = stepd_connect(stepd->directory, stepd->nodename,
4631 				   stepd->jobid, stepd->stepid,
4632 				   &stepd->protocol_version);
4633 		if (fd == -1) {
4634 			debug3("Unable to connect to step %u.%u",
4635 			       stepd->jobid, stepd->stepid);
4636 			continue;
4637 		}
4638 
4639 		debug2("terminate job step %u.%u", jobid, stepd->stepid);
4640 		if (stepd_terminate(fd, stepd->protocol_version) < 0)
4641 			debug("kill jobid=%u.%u failed: %m", jobid,
4642 			      stepd->stepid);
4643 		close(fd);
4644 	}
4645 	list_iterator_destroy(i);
4646 	FREE_NULL_LIST(steps);
4647 	if (step_cnt == 0)
4648 		debug2("No steps in job %u to terminate", jobid);
4649 	return step_cnt;
4650 }
4651 
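/*
 * Return true if any step of the given job is still running on this node
 * (i.e. its stepd reports a state other than SLURMSTEPD_NOT_RUNNING).
 */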
4652 static bool
4653 _job_still_running(uint32_t job_id)
4654 {
4655 	bool         retval = false;
4656 	List         steps;
4657 	ListIterator i;
4658 	step_loc_t  *s     = NULL;
4659 
4660 	steps = stepd_available(conf->spooldir, conf->node_name);
4661 	i = list_iterator_create(steps);
4662 	while ((s = list_next(i))) {
4663 		if (s->jobid == job_id) {
4664 			int fd;
4665 			fd = stepd_connect(s->directory, s->nodename,
4666 					   s->jobid, s->stepid,
4667 					   &s->protocol_version);
4668 			if (fd == -1)
4669 				continue;
4670 
4671 			if (stepd_state(fd, s->protocol_version)
4672 			    != SLURMSTEPD_NOT_RUNNING) {
4673 				retval = true;
4674 				close(fd);
4675 				break;
4676 			}
4677 			close(fd);
4678 		}
4679 	}
4680 	list_iterator_destroy(i);
4681 	FREE_NULL_LIST(steps);
4682 
4683 	return retval;
4684 }
4685 
4686 /*
4687  * Wait until all job steps are in SLURMSTEPD_NOT_RUNNING state.
4688  * This indicates that switch_g_job_postfini has completed and
4689  * freed the switch windows (as needed only for Federation switch).
4690  */
4691 static void
4692 _wait_state_completed(uint32_t jobid, int max_delay)
4693 {
4694 	int i;
4695 
4696 	for (i=0; i<max_delay; i++) {
4697 		if (_steps_completed_now(jobid))
4698 			break;
4699 		sleep(1);
4700 	}
4701 	if (i >= max_delay)
4702 		error("timed out waiting for job %u to complete", jobid);
4703 }
4704 
4705 static bool
4706 _steps_completed_now(uint32_t jobid)
4707 {
4708 	List steps;
4709 	ListIterator i;
4710 	step_loc_t *stepd;
4711 	bool rc = true;
4712 
4713 	steps = stepd_available(conf->spooldir, conf->node_name);
4714 	i = list_iterator_create(steps);
4715 	while ((stepd = list_next(i))) {
4716 		if (stepd->jobid == jobid) {
4717 			int fd;
4718 			fd = stepd_connect(stepd->directory, stepd->nodename,
4719 					   stepd->jobid, stepd->stepid,
4720 					   &stepd->protocol_version);
4721 			if (fd == -1)
4722 				continue;
4723 
4724 			if (stepd_state(fd, stepd->protocol_version)
4725 			    != SLURMSTEPD_NOT_RUNNING) {
4726 				rc = false;
4727 				close(fd);
4728 				break;
4729 			}
4730 			close(fd);
4731 		}
4732 	}
4733 	list_iterator_destroy(i);
4734 	FREE_NULL_LIST(steps);
4735 
4736 	return rc;
4737 }
4738 
4739 static void _epilog_complete_msg_setup(
4740 	slurm_msg_t *msg, epilog_complete_msg_t *req, uint32_t jobid, int rc)
4741 {
4742 	slurm_msg_t_init(msg);
4743 	memset(req, 0, sizeof(epilog_complete_msg_t));
4744 
4745 	req->job_id      = jobid;
4746 	req->return_code = rc;
4747 	req->node_name   = conf->node_name;
4748 
4749 	msg->msg_type    = MESSAGE_EPILOG_COMPLETE;
4750 	msg->data        = req;
4751 }
4752 
4753 /*
4754  *  Send epilog complete message to currently active controller.
4755  *  If enabled, use message aggregation.
4756  *   Returns SLURM_SUCCESS if message sent successfully,
4757  *           SLURM_ERROR if epilog complete message fails to be sent.
4758  */
4759 static int
4760 _epilog_complete(uint32_t jobid, int rc)
4761 {
4762 	int ret = SLURM_SUCCESS;
4763 
4764 	if (conf->msg_aggr_window_msgs > 1) {
4765 		/* message aggregation is enabled */
4766 		slurm_msg_t *msg = xmalloc(sizeof(slurm_msg_t));
4767 		epilog_complete_msg_t *req =
4768 			xmalloc(sizeof(epilog_complete_msg_t));
4769 
4770 		_epilog_complete_msg_setup(msg, req, jobid, rc);
4771 
4772 		/* we need to copy this symbol */
4773 		req->node_name   = xstrdup(conf->node_name);
4774 
4775 		msg_aggr_add_msg(msg, 0, NULL);
4776 	} else {
4777 		slurm_msg_t msg;
4778 		epilog_complete_msg_t req;
4779 
4780 		_epilog_complete_msg_setup(&msg, &req, jobid, rc);
4781 
4782 		/* Note: No response to this message is expected; slurmctld
4783 		 * will resend the TERMINATE_JOB request if the send fails */
4784 		if (slurm_send_only_controller_msg(&msg, working_cluster_rec)
4785 		    < 0) {
4786 			error("Unable to send epilog complete message: %m");
4787 			ret = SLURM_ERROR;
4788 		} else {
4789 			debug("Job %u: sent epilog complete msg: rc = %d",
4790 			      jobid, rc);
4791 		}
4792 	}
4793 	return ret;
4794 }
4795 
4796 /* If a lock is granted to the job then return 1; return 0 if the lock
4797  * for the job is already taken or there are no more locks available. */
4798 static int
4799 _get_suspend_job_lock(uint32_t job_id)
4800 {
4801 	static bool logged = false;
4802 	int i, empty_loc = -1, rc = 0;
4803 
4804 	slurm_mutex_lock(&suspend_mutex);
4805 	for (i = 0; i < job_suspend_size; i++) {
4806 		if (job_suspend_array[i] == 0) {
4807 			empty_loc = i;
4808 			continue;
4809 		}
4810 		if (job_suspend_array[i] == job_id) {
4811 			/* another thread already holds a lock for this job ID */
4812 			slurm_mutex_unlock(&suspend_mutex);
4813 			return rc;
4814 		}
4815 	}
4816 
4817 	if (empty_loc != -1) {
4818 		/* nobody holds a lock for this job and a previously used slot is free */
4819 		job_suspend_array[empty_loc] = job_id;
4820 		rc = 1;
4821 	} else if (job_suspend_size < NUM_PARALLEL_SUSP_JOBS) {
4822 		/* a new lock is available */
4823 		job_suspend_array[job_suspend_size++] = job_id;
4824 		rc = 1;
4825 	} else if (!logged) {
4826 		error("Simultaneous job suspend/resume limit reached (%d). "
4827 		      "Configure SchedulerTimeSlice higher.",
4828 		      NUM_PARALLEL_SUSP_JOBS);
4829 		logged = true;
4830 	}
4831 	slurm_mutex_unlock(&suspend_mutex);
4832 	return rc;
4833 }
4834 
4835 static void
4836 _unlock_suspend_job(uint32_t job_id)
4837 {
4838 	int i;
4839 	slurm_mutex_lock(&suspend_mutex);
4840 	for (i = 0; i < job_suspend_size; i++) {
4841 		if (job_suspend_array[i] == job_id)
4842 			job_suspend_array[i] = 0;
4843 	}
4844 	slurm_mutex_unlock(&suspend_mutex);
4845 }
4846 
4847 /* Add record for every launched job so we know they are ready for suspend */
4848 extern void record_launched_jobs(void)
4849 {
4850 	List steps;
4851 	ListIterator i;
4852 	step_loc_t *stepd;
4853 
4854 	steps = stepd_available(conf->spooldir, conf->node_name);
4855 	i = list_iterator_create(steps);
4856 	while ((stepd = list_next(i))) {
4857 		_launch_complete_add(stepd->jobid);
4858 	}
4859 	list_iterator_destroy(i);
4860 	FREE_NULL_LIST(steps);
4861 }
4862 
4863 /*
4864  * Send a job suspend/resume request through the appropriate slurmstepds for
4865  * each job step belonging to a given job allocation.
4866  */
4867 static void
4868 _rpc_suspend_job(slurm_msg_t *msg)
4869 {
4870 	int time_slice = -1;
4871 	suspend_int_msg_t *req = msg->data;
4872 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
4873 	List steps;
4874 	ListIterator i;
4875 	step_loc_t *stepd;
4876 	int step_cnt  = 0;
4877 	int rc = SLURM_SUCCESS;
4878 	DEF_TIMERS;
4879 
4880 	if (time_slice == -1)
4881 		time_slice = slurm_get_time_slice();
4882 	if ((req->op != SUSPEND_JOB) && (req->op != RESUME_JOB)) {
4883 		error("REQUEST_SUSPEND_INT: bad op code %u", req->op);
4884 		rc = ESLURM_NOT_SUPPORTED;
4885 	}
4886 
4887 	/*
4888 	 * check that requesting user ID is the Slurm UID or root
4889 	 */
4890 	if (!_slurm_authorized_user(req_uid)) {
4891 		error("Security violation: suspend_job(%u) from uid %d",
4892 		      req->job_id, req_uid);
4893 		rc =  ESLURM_USER_ID_MISSING;
4894 	}
4895 
4896 	/* send a response now, which will include any errors
4897 	 * detected with the request */
4898 	if (msg->conn_fd >= 0) {
4899 		slurm_send_rc_msg(msg, rc);
4900 		if (close(msg->conn_fd) < 0)
4901 			error("_rpc_suspend_job: close(%d): %m",
4902 			      msg->conn_fd);
4903 		msg->conn_fd = -1;
4904 	}
4905 	if (rc != SLURM_SUCCESS)
4906 		return;
4907 
4908 	/* now we can focus on performing the requested action,
4909 	 * which could take a few seconds to complete */
4910 	debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id,
4911 	      req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume");
4912 
4913 	/* Try to get a thread lock for this job. If the lock
4914 	 * is not available then sleep and try again */
4915 	while (!_get_suspend_job_lock(req->job_id)) {
4916 		debug3("suspend lock sleep for %u", req->job_id);
4917 		usleep(10000);
4918 	}
4919 	START_TIMER;
4920 
4921 	/* Defer suspend until job prolog and launch complete */
4922 	if (req->op == SUSPEND_JOB)
4923 		_launch_complete_wait(req->job_id);
4924 
4925 	if ((req->op == SUSPEND_JOB) && (req->indf_susp))
4926 		switch_g_job_suspend(req->switch_info, 5);
4927 
4928 	if (req->op == SUSPEND_JOB)
4929 		(void) task_g_slurmd_suspend_job(req->job_id);
4930 
4931 	/*
4932 	 * Loop through all job steps and call stepd_suspend or stepd_resume
4933 	 * as appropriate. Since the "suspend" action may contain a sleep
4934 	 * (if the launch is in progress), suspend multiple job steps in parallel.
4935 	 */
4936 	steps = stepd_available(conf->spooldir, conf->node_name);
4937 	i = list_iterator_create(steps);
4938 
4939 	while (1) {
4940 		int x, fdi, fd[NUM_PARALLEL_SUSP_STEPS];
4941 		uint16_t protocol_version[NUM_PARALLEL_SUSP_STEPS];
4942 
4943 		fdi = 0;
4944 		while ((stepd = list_next(i))) {
4945 			if (stepd->jobid != req->job_id) {
4946 				/* multiple jobs expected on shared nodes */
4947 				debug3("Step from other job: jobid=%u "
4948 				       "(this jobid=%u)",
4949 				       stepd->jobid, req->job_id);
4950 				continue;
4951 			}
4952 			step_cnt++;
4953 
4954 			fd[fdi] = stepd_connect(stepd->directory,
4955 						stepd->nodename, stepd->jobid,
4956 						stepd->stepid,
4957 						&protocol_version[fdi]);
4958 			if (fd[fdi] == -1) {
4959 				debug3("Unable to connect to step %u.%u",
4960 				       stepd->jobid, stepd->stepid);
4961 				continue;
4962 			}
4963 
4964 			fdi++;
4965 			if (fdi >= NUM_PARALLEL_SUSP_STEPS)
4966 				break;
4967 		}
4968 		/* check for open connections */
4969 		if (fdi == 0)
4970 			break;
4971 
4972 		if (req->op == SUSPEND_JOB) {
4973 			int susp_fail_count = 0;
4974 			/* The suspend RPCs are processed in parallel for
4975 			 * every step in the job */
4976 			for (x = 0; x < fdi; x++) {
4977 				(void) stepd_suspend(fd[x],
4978 						     protocol_version[x],
4979 						     req, 0);
4980 			}
4981 			for (x = 0; x < fdi; x++) {
4982 				if (stepd_suspend(fd[x],
4983 						  protocol_version[x],
4984 						  req, 1) < 0) {
4985 					susp_fail_count++;
4986 				} else {
4987 					close(fd[x]);
4988 					fd[x] = -1;
4989 				}
4990 			}
4991 			/* Suspend RPCs can fail at step startup, so retry */
4992 			if (susp_fail_count) {
4993 				sleep(1);
4994 				for (x = 0; x < fdi; x++) {
4995 					if (fd[x] == -1)
4996 						continue;
4997 					(void) stepd_suspend(
4998 						fd[x],
4999 						protocol_version[x],
5000 						req, 0);
5001 					if (stepd_suspend(
5002 						    fd[x],
5003 						    protocol_version[x],
5004 						    req, 1) >= 0)
5005 						continue;
5006 					debug("Suspend of job %u failed: %m",
5007 					      req->job_id);
5008 				}
5009 			}
5010 		} else {
5011 			/* The resume RPCs are processed in parallel for
5012 			 * every step in the job */
5013 			for (x = 0; x < fdi; x++) {
5014 				(void) stepd_resume(fd[x],
5015 						    protocol_version[x],
5016 						    req, 0);
5017 			}
5018 			for (x = 0; x < fdi; x++) {
5019 				if (stepd_resume(fd[x],
5020 						 protocol_version[x],
5021 						 req, 1) < 0) {
5022 					debug("Resume of job %u failed: %m",
5023 					      req->job_id);
5024 				}
5025 			}
5026 		}
5027 		for (x = 0; x < fdi; x++) {
5028 			/* fd may have been closed by stepd_suspend */
5029 			if (fd[x] != -1)
5030 				close(fd[x]);
5031 		}
5032 
5033 		/* check for no more jobs */
5034 		if (fdi < NUM_PARALLEL_SUSP_STEPS)
5035 			break;
5036 	}
5037 	list_iterator_destroy(i);
5038 	FREE_NULL_LIST(steps);
5039 
5040 	if (req->op == RESUME_JOB) /* Call task plugin after processes resume */
5041 		(void) task_g_slurmd_resume_job(req->job_id);
5042 	if ((req->op == RESUME_JOB) && (req->indf_susp))
5043 		switch_g_job_resume(req->switch_info, 5);
5044 
5045 	_unlock_suspend_job(req->job_id);
5046 
5047 	END_TIMER;
5048 	if (DELTA_TIMER >= (time_slice * USEC_IN_SEC)) {
5049 		if (req->op == SUSPEND_JOB) {
5050 			info("Suspend time for job_id %u was %s. "
5051 			     "Configure SchedulerTimeSlice higher.",
5052 			     req->job_id, TIME_STR);
5053 		} else {
5054 			info("Resume time for job_id %u was %s. "
5055 			     "Configure SchedulerTimeSlice higher.",
5056 			     req->job_id, TIME_STR);
5057 		}
5058 	}
5059 
5060 	if (step_cnt == 0) {
5061 		debug2("No steps in jobid %u to suspend/resume", req->job_id);
5062 	}
5063 }
5064 
5065 /* Job shouldn't even be running here, abort it immediately */
5066 static void
5067 _rpc_abort_job(slurm_msg_t *msg)
5068 {
5069 	kill_job_msg_t *req    = msg->data;
5070 	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5071 	job_env_t       job_env;
5072 	int		node_id = 0;
5073 	uint32_t        jobid;
5074 
5075 	debug("_rpc_abort_job, uid = %d", uid);
5076 	/*
5077 	 * check that requesting user ID is the Slurm UID
5078 	 */
5079 	if (!_slurm_authorized_user(uid)) {
5080 		error("Security violation: abort_job(%u) from uid %d",
5081 		      req->job_id, uid);
5082 		if (msg->conn_fd >= 0)
5083 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5084 		return;
5085 	}
5086 
5087 	task_g_slurmd_release_resources(req->job_id);
5088 
5089 	/*
5090 	 * "revoke" all future credentials for this jobid
5091 	 */
5092 	if (slurm_cred_revoke(conf->vctx, req->job_id, req->time,
5093 			      req->start_time) < 0) {
5094 		debug("revoking cred for job %u: %m", req->job_id);
5095 	} else {
5096 		save_cred_state(conf->vctx);
5097 		debug("credential for job %u revoked", req->job_id);
5098 	}
5099 
5100 	/*
5101 	 *  At this point, if connection still open, we send controller
5102 	 *   a "success" reply to indicate that we've recvd the msg.
5103 	 */
5104 	if (msg->conn_fd >= 0) {
5105 		slurm_send_rc_msg(msg, SLURM_SUCCESS);
5106 		if (close(msg->conn_fd) < 0)
5107 			error ("rpc_abort_job: close(%d): %m", msg->conn_fd);
5108 		msg->conn_fd = -1;
5109 	}
5110 
5111 	if (_kill_all_active_steps(req->job_id, SIG_ABORT, 0, true, uid)) {
5112 		/*
5113 		 *  Block until all user processes are complete.
5114 		 */
5115 		_pause_for_job_completion (req->job_id, req->nodes, 0);
5116 	}
5117 
5118 	/*
5119 	 *  Begin expiration period for cached information about job.
5120 	 *   If expiration period has already begun, then do not run
5121 	 *   the epilog again, as that script has already been executed
5122 	 *   for this job.
5123 	 */
5124 	if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) {
5125 		debug("Not running epilog for jobid %d: %m", req->job_id);
5126 		return;
5127 	}
5128 
5129 	save_cred_state(conf->vctx);
5130 
5131 #ifndef HAVE_FRONT_END
5132 	/* It is always 0 for front end systems */
5133 	node_id = nodelist_find(req->nodes, conf->node_name);
5134 #endif
5135 	memset(&job_env, 0, sizeof(job_env));
5136 	gres_plugin_epilog_set_env(&job_env.gres_job_env, req->job_gres_info,
5137 				   node_id);
5138 	job_env.jobid = req->job_id;
5139 	job_env.node_list = req->nodes;
5140 	job_env.spank_job_env = req->spank_job_env;
5141 	job_env.spank_job_env_size = req->spank_job_env_size;
5142 	job_env.uid = req->job_uid;
5143 	job_env.gid = req->job_gid;
5144 
5145 	_run_epilog(&job_env);
5146 	_free_job_env(&job_env);
5147 
5148 #ifdef HAVE_NATIVE_CRAY
5149 	if (req->het_job_id && (req->het_job_id != NO_VAL))
5150 		jobid = req->het_job_id;
5151 	else
5152 		jobid = req->job_id;
5153 #else
5154 	jobid = req->job_id;
5155 #endif
5156 
5157 	if (container_g_delete(jobid))
5158 		error("container_g_delete(%u): %m", req->job_id);
5159 	_launch_complete_rm(req->job_id);
5160 }
5161 
5162 /*
5163  * This complete batch RPC came from slurmstepd because we have message
5164  * aggregation configured. Terminate the job here and forward the batch
5165  * completion RPC to slurmctld.
5166  */
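/*
 * Note: when message aggregation is active (msg_aggr_window_msgs > 1) the
 * handler below hands the payload to msg_aggr_add_msg() and sets msg->data
 * to NULL, so ownership of the request moves to the aggregation layer and
 * the payload will not also be freed by the caller's normal message cleanup.
 */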
5167 static void
5168 _rpc_complete_batch(slurm_msg_t *msg)
5169 {
5170 	int		i, rc, msg_rc;
5171 	slurm_msg_t	resp_msg;
5172 	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5173 	complete_batch_script_msg_t *req = msg->data;
5174 
5175 	if (!_slurm_authorized_user(uid)) {
5176 		error("Security violation: complete_batch(%u) from uid %d",
5177 		      req->job_id, uid);
5178 		if (msg->conn_fd >= 0)
5179 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5180 		return;
5181 	}
5182 
5183 	slurm_send_rc_msg(msg, SLURM_SUCCESS);
5184 
5185 	for (i = 0; i <= MAX_RETRY; i++) {
5186 		if (conf->msg_aggr_window_msgs > 1) {
5187 			slurm_msg_t *req_msg =
5188 				xmalloc_nz(sizeof(slurm_msg_t));
5189 			slurm_msg_t_init(req_msg);
5190 			req_msg->msg_type = msg->msg_type;
5191 			req_msg->data = msg->data;
5192 			msg->data = NULL;
5193 
5194 			msg_aggr_add_msg(req_msg, 1, NULL);
5195 			return;
5196 		} else {
5197 			slurm_msg_t req_msg;
5198 			slurm_msg_t_init(&req_msg);
5199 			req_msg.msg_type = msg->msg_type;
5200 			req_msg.data	 = msg->data;
5201 			msg_rc = slurm_send_recv_controller_msg(
5202 				&req_msg, &resp_msg, working_cluster_rec);
5203 
5204 			if (msg_rc == SLURM_SUCCESS)
5205 				break;
5206 		}
5207 		info("Retrying job complete RPC for job %u", req->job_id);
5208 		sleep(RETRY_DELAY);
5209 	}
5210 	if (i > MAX_RETRY) {
5211 		error("Unable to send job complete message: %m");
5212 		return;
5213 	}
5214 
5215 	if (resp_msg.msg_type == RESPONSE_SLURM_RC) {
5216 		last_slurmctld_msg = time(NULL);
5217 		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
5218 		slurm_free_return_code_msg(resp_msg.data);
5219 		if (rc) {
5220 			error("complete_batch for job %u: %s", req->job_id,
5221 			      slurm_strerror(rc));
5222 		}
5223 		return;
5224 	}
5225 }
5226 
5227 static void
5228 _rpc_terminate_job(slurm_msg_t *msg)
5229 {
5230 	int             rc     = SLURM_SUCCESS;
5231 	kill_job_msg_t *req    = msg->data;
5232 	uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5233 	int             nsteps = 0;
5234 	int		delay;
5235 	int		node_id = 0;
5236 	job_env_t       job_env;
5237 	uint32_t        jobid;
5238 
5239 	debug("_rpc_terminate_job, uid = %d", uid);
5240 	/*
5241 	 * check that requesting user ID is the Slurm UID
5242 	 */
5243 	if (!_slurm_authorized_user(uid)) {
5244 		error("Security violation: kill_job(%u) from uid %d",
5245 		      req->job_id, uid);
5246 		if (msg->conn_fd >= 0)
5247 			slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5248 		return;
5249 	}
5250 
5251 	/* Use this when dealing with the job container */
5252 #ifdef HAVE_NATIVE_CRAY
5253 	if (req->het_job_id && (req->het_job_id != NO_VAL))
5254 		jobid = req->het_job_id;
5255 	else
5256 		jobid = req->job_id;
5257 #else
5258 	jobid = req->job_id;
5259 #endif
5260 
5261 	task_g_slurmd_release_resources(req->job_id);
5262 
5263 	/*
5264 	 *  Initialize a "waiter" thread for this jobid. If another
5265 	 *   thread is already waiting on termination of this job,
5266 	 *   _waiter_init() will return SLURM_ERROR. In this case, just
5267 	 *   notify slurmctld that we recvd the message successfully,
5268 	 *   then exit this thread.
5269 	 */
5270 	if (_waiter_init(req->job_id) == SLURM_ERROR) {
5271 		if (msg->conn_fd >= 0) {
5272 			/* Whether or not the step has started yet,
5273 			 * just send a success to let the
5274 			 * controller know we got this request.
5275 			 */
5276 			slurm_send_rc_msg (msg, SLURM_SUCCESS);
5277 		}
5278 		return;
5279 	}
5280 
5281 	/*
5282 	 * Note the job is finishing to avoid a race condition for batch jobs
5283 	 * that finish before the slurmd knows it finished launching.
5284 	 */
5285 	_note_batch_job_finished(req->job_id);
5286 	/*
5287 	 * "revoke" all future credentials for this jobid
5288 	 */
5289 	if (slurm_cred_revoke(conf->vctx, req->job_id, req->time,
5290 			      req->start_time) < 0) {
5291 		debug("revoking cred for job %u: %m", req->job_id);
5292 	} else {
5293 		save_cred_state(conf->vctx);
5294 		debug("credential for job %u revoked", req->job_id);
5295 	}
5296 
5297 	if (_prolog_is_running(req->job_id)) {
5298 		if (msg->conn_fd >= 0) {
5299 			/* If the step hasn't finished running the prolog
5300 			 * (or finished starting the extern step) yet, just send
5301 			 * a success to let the controller know we got
5302 			 * this request.
5303 			 */
5304 			debug("%s: sent SUCCESS for %u, waiting for prolog to finish",
5305 			      __func__, req->job_id);
5306 			slurm_send_rc_msg(msg, SLURM_SUCCESS);
5307 			if (close(msg->conn_fd) < 0)
5308 				error("%s: close(%d): %m",
5309 				      __func__, msg->conn_fd);
5310 			msg->conn_fd = -1;
5311 		}
5312 		_wait_for_job_running_prolog(req->job_id);
5313 	}
5314 
5315 	/*
5316 	 * Before signaling steps, if the job has any steps that are still
5317 	 * in the process of fork/exec/check in with slurmd, wait on a condition
5318 	 * var for the start.  Otherwise a slow-starting step can miss the
5319 	 * job termination message and run indefinitely.
5320 	 */
5321 	if (_step_is_starting(req->job_id, NO_VAL)) {
5322 		if (msg->conn_fd >= 0) {
5323 			/* If the step hasn't started yet just send a
5324 			 * success to let the controller know we got
5325 			 * this request.
5326 			 */
5327 			debug("sent SUCCESS, waiting for step to start");
5328 			slurm_send_rc_msg (msg, SLURM_SUCCESS);
5329 			if (close(msg->conn_fd) < 0)
5330 				error("rpc_kill_job: close(%d): %m",
5331 				      msg->conn_fd);
5332 			msg->conn_fd = -1;
5333 		}
5334 		if (_wait_for_starting_step(req->job_id, NO_VAL)) {
5335 			/*
5336 			 * There's currently no case in which we enter this
5337 			 * error condition.  If there was, it's hard to say
5338 			 * whether to proceed with the job termination.
5339 			 */
5340 			error("Error in _wait_for_starting_step");
5341 		}
5342 	}
5343 
5344 	if (IS_JOB_NODE_FAILED(req))
5345 		_kill_all_active_steps(req->job_id, SIG_NODE_FAIL, 0, true,
5346 				       uid);
5347 	if (IS_JOB_PENDING(req))
5348 		_kill_all_active_steps(req->job_id, SIG_REQUEUED, 0, true, uid);
5349 	else if (IS_JOB_FAILED(req))
5350 		_kill_all_active_steps(req->job_id, SIG_FAILURE, 0, true, uid);
5351 
5352 	/*
5353 	 * Tasks might be stopped (possibly by a debugger)
5354 	 * so send SIGCONT first.
5355 	 */
5356 	_kill_all_active_steps(req->job_id, SIGCONT, 0, true, uid);
5357 	if (errno == ESLURMD_STEP_SUSPENDED) {
5358 		/*
5359 		 * If the job step is currently suspended, we don't
5360 		 * bother with a "nice" termination.
5361 		 */
5362 		debug2("Job is currently suspended, terminating");
5363 		nsteps = _terminate_all_steps(req->job_id, true);
5364 	} else {
5365 		nsteps = _kill_all_active_steps(req->job_id, SIGTERM, 0, true,
5366 						uid);
5367 	}
5368 
5369 	/*
5370 	 *  If there are currently no active job steps and no
5371 	 *    configured epilog to run, bypass asynchronous reply and
5372 	 *    notify slurmctld that we have already completed this
5373 	 *    request. (Historically, AIX systems needed to send current
5374 	 *    switch state, so this bypass could not be used there.)
5375 	 */
5376 	if ((nsteps == 0) && !conf->epilog && !spank_has_epilog()) {
5377 		debug4("sent ALREADY_COMPLETE");
5378 		if (msg->conn_fd >= 0) {
5379 			slurm_send_rc_msg(msg,
5380 					  ESLURMD_KILL_JOB_ALREADY_COMPLETE);
5381 		}
5382 		slurm_cred_begin_expiration(conf->vctx, req->job_id);
5383 		save_cred_state(conf->vctx);
5384 		_waiter_complete(req->job_id);
5385 
5386 		/*
5387 		 * The controller needs to get MESSAGE_EPILOG_COMPLETE to bring
5388 		 * the job out of "completing" state.  Otherwise, the job
5389 		 * could remain "completing" unnecessarily, until the request
5390 		 * to terminate is resent.
5391 		 */
5392 		_sync_messages_kill(req);
5393 		if (msg->conn_fd < 0) {
5394 			/* The epilog complete message processing on
5395 			 * slurmctld is equivalent to that of a
5396 			 * ESLURMD_KILL_JOB_ALREADY_COMPLETE reply above */
5397 			_epilog_complete(req->job_id, rc);
5398 		}
5399 
5400 		if (container_g_delete(jobid))
5401 			error("container_g_delete(%u): %m", req->job_id);
5402 		_launch_complete_rm(req->job_id);
5403 		return;
5404 	}
5405 
5406 	/*
5407 	 *  At this point, if connection still open, we send controller
5408 	 *   a "success" reply to indicate that we've recvd the msg.
5409 	 */
5410 	if (msg->conn_fd >= 0) {
5411 		debug4("sent SUCCESS");
5412 		slurm_send_rc_msg(msg, SLURM_SUCCESS);
5413 		if (close(msg->conn_fd) < 0)
5414 			error ("rpc_kill_job: close(%d): %m", msg->conn_fd);
5415 		msg->conn_fd = -1;
5416 	}
5417 
5418 	/*
5419 	 *  Check for corpses
5420 	 */
5421 	delay = MAX(conf->kill_wait, 5);
5422 	if (!_pause_for_job_completion (req->job_id, req->nodes, delay) &&
5423 	    _terminate_all_steps(req->job_id, true) ) {
5424 		/*
5425 		 *  Block until all user processes are complete.
5426 		 */
5427 		_pause_for_job_completion (req->job_id, req->nodes, 0);
5428 	}
5429 
5430 	/*
5431 	 *  Begin expiration period for cached information about job.
5432 	 *   If expiration period has already begun, then do not run
5433 	 *   the epilog again, as that script has already been executed
5434 	 *   for this job.
5435 	 */
5436 	if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) {
5437 		debug("Not running epilog for jobid %d: %m", req->job_id);
5438 		goto done;
5439 	}
5440 
5441 	save_cred_state(conf->vctx);
5442 
5443 #ifndef HAVE_FRONT_END
5444 	/* It is always 0 for front end systems */
5445 	node_id = nodelist_find(req->nodes, conf->node_name);
5446 #endif
5447 	memset(&job_env, 0, sizeof(job_env));
5448 	gres_plugin_epilog_set_env(&job_env.gres_job_env, req->job_gres_info,
5449 				   node_id);
5450 
5451 	job_env.jobid = req->job_id;
5452 	job_env.node_list = req->nodes;
5453 	job_env.spank_job_env = req->spank_job_env;
5454 	job_env.spank_job_env_size = req->spank_job_env_size;
5455 	job_env.uid = req->job_uid;
5456 	job_env.gid = req->job_gid;
5457 
5458 	rc = _run_epilog(&job_env);
5459 	_free_job_env(&job_env);
5460 
5461 	if (rc) {
5462 		int term_sig = 0, exit_status = 0;
5463 		if (WIFSIGNALED(rc))
5464 			term_sig    = WTERMSIG(rc);
5465 		else if (WIFEXITED(rc))
5466 			exit_status = WEXITSTATUS(rc);
5467 		error("[job %u] epilog failed status=%d:%d",
5468 		      req->job_id, exit_status, term_sig);
5469 		rc = ESLURMD_EPILOG_FAILED;
5470 	} else
5471 		debug("completed epilog for jobid %u", req->job_id);
5472 	if (container_g_delete(jobid))
5473 		error("container_g_delete(%u): %m", req->job_id);
5474 	_launch_complete_rm(req->job_id);
5475 
5476 done:
5477 	_wait_state_completed(req->job_id, 5);
5478 	_waiter_complete(req->job_id);
5479 	_sync_messages_kill(req);
5480 
5481 	_epilog_complete(req->job_id, rc);
5482 }
5483 
5484 /* On a parallel job, every slurmd may send the EPILOG_COMPLETE
5485  * message to the slurmctld at the same time, resulting in lost
5486  * messages. We add a delay here to spread out the message traffic
5487  * assuming synchronized clocks across the cluster.
5488  * Allow 10 msec processing time in slurmctld for each RPC. */
5489 static void _sync_messages_kill(kill_job_msg_t *req)
5490 {
5491 	int host_cnt, host_inx;
5492 	char *host;
5493 	hostset_t hosts;
5494 	int epilog_msg_time;
5495 
5496 	hosts = hostset_create(req->nodes);
5497 	host_cnt = hostset_count(hosts);
5498 	if (host_cnt <= 64)
5499 		goto fini;
5500 	if (conf->hostname == NULL)
5501 		goto fini;	/* should never happen */
5502 
5503 	for (host_inx=0; host_inx<host_cnt; host_inx++) {
5504 		host = hostset_shift(hosts);
5505 		if (host == NULL)
5506 			break;
5507 		if (xstrcmp(host, conf->node_name) == 0) {
5508 			free(host);
5509 			break;
5510 		}
5511 		free(host);
5512 	}
5513 	epilog_msg_time = slurm_get_epilog_msg_time();
5514 	_delay_rpc(host_inx, host_cnt, epilog_msg_time);
5515 
5516 fini:	hostset_destroy(hosts);
5517 }
5518 
5519 /* Delay a message based upon the host index, host count and per-RPC time.
5520  * This logic depends upon synchronized clocks across the cluster. */
5521 static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc)
5522 {
5523 	struct timeval tv1;
5524 	uint32_t cur_time;	/* current time in usec (just 9 digits) */
5525 	uint32_t tot_time;	/* total time expected for all RPCs */
5526 	uint32_t offset_time;	/* relative time within tot_time */
5527 	uint32_t target_time;	/* desired time to issue the RPC */
5528 	uint32_t delta_time;
5529 
5530 again:	if (gettimeofday(&tv1, NULL)) {
5531 		usleep(host_inx * usec_per_rpc);
5532 		return;
5533 	}
5534 
5535 	cur_time = ((tv1.tv_sec % 1000) * 1000000) + tv1.tv_usec;
5536 	tot_time = host_cnt * usec_per_rpc;
5537 	offset_time = cur_time % tot_time;
5538 	target_time = host_inx * usec_per_rpc;
5539 	if (target_time < offset_time)
5540 		delta_time = target_time - offset_time + tot_time;
5541 	else
5542 		delta_time = target_time - offset_time;
5543 	if (usleep(delta_time)) {
5544 		if (errno == EINVAL) /* usleep for more than 1 sec */
5545 			usleep(900000);
5546 		/* errno == EINTR */
5547 		goto again;
5548 	}
5549 }
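/*
 * Worked example of the staggering above (illustrative numbers only):
 * with host_inx = 3, host_cnt = 100 and usec_per_rpc = 10000 (10 msec),
 * tot_time = 1,000,000 usec and target_time = 30,000 usec.  If the local
 * clock yields offset_time = 750,000 usec, then target_time < offset_time
 * and delta_time = 30,000 - 750,000 + 1,000,000 = 280,000 usec, so this
 * node sleeps 0.28 sec before sending.  With synchronized clocks each of
 * the 100 nodes ends up in its own 10 msec transmission slot.
 */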
5550 
5551 /*
5552  *  Returns true if "uid" is a "slurm authorized user" - i.e. uid == 0
5553  *   or uid == slurm user id at this time.
5554  */
5555 static bool
5556 _slurm_authorized_user(uid_t uid)
5557 {
5558 	return ((uid == (uid_t) 0) || (uid == conf->slurm_user_id));
5559 }
5560 
5561 
5562 struct waiter {
5563 	uint32_t jobid;
5564 	pthread_t thd;
5565 };
5566 
5567 
5568 static struct waiter *
5569 _waiter_create(uint32_t jobid)
5570 {
5571 	struct waiter *wp = xmalloc(sizeof(struct waiter));
5572 
5573 	wp->jobid = jobid;
5574 	wp->thd   = pthread_self();
5575 
5576 	return wp;
5577 }
5578 
5579 static int _find_waiter(void *x, void *y)
5580 {
5581 	struct waiter *w = (struct waiter *)x;
5582 	uint32_t *jp = (uint32_t *)y;
5583 
5584 	return (w->jobid == *jp);
5585 }
5586 
5587 static int _waiter_init (uint32_t jobid)
5588 {
5589 	int rc = SLURM_SUCCESS;
5590 
5591 	slurm_mutex_lock(&waiter_mutex);
5592 	if (!waiters)
5593 		waiters = list_create(xfree_ptr);
5594 
5595 	/*
5596 	 *  Exit this thread if another thread is waiting on job
5597 	 */
5598 	if (list_find_first(waiters, _find_waiter, &jobid))
5599 		rc = SLURM_ERROR;
5600 	else
5601 		list_append(waiters, _waiter_create(jobid));
5602 
5603 	slurm_mutex_unlock(&waiter_mutex);
5604 
5605 	return rc;
5606 }
5607 
5608 static int _waiter_complete (uint32_t jobid)
5609 {
5610 	int rc = 0;
5611 
5612 	slurm_mutex_lock(&waiter_mutex);
5613 	if (waiters)
5614 		rc = list_delete_all(waiters, _find_waiter, &jobid);
5615 	slurm_mutex_unlock(&waiter_mutex);
5616 
5617 	return rc;
5618 }
5619 
5620 /*
5621  *  Like _wait_for_procs(), but only wait for up to max_time seconds.
5622  *  If max_time == 0, send SIGKILL to tasks repeatedly.
5623  *
5624  *  Returns true if all job processes are gone
5625  */
5626 static bool
5627 _pause_for_job_completion (uint32_t job_id, char *nodes, int max_time)
5628 {
5629 	int sec = 0;
5630 	int pause = 1;
5631 	bool rc = false;
5632 	int count = 0;
5633 
5634 	while ((sec < max_time) || (max_time == 0)) {
5635 		rc = _job_still_running (job_id);
5636 		if (!rc)
5637 			break;
5638 		if ((max_time == 0) && (sec > 1)) {
5639 			_terminate_all_steps(job_id, true);
5640 		}
5641 		if (sec > 10) {
5642 			/* Reduce logging frequency about unkillable tasks */
5643 			if (max_time)
5644 				pause = MIN((max_time - sec), 10);
5645 			else
5646 				pause = 10;
5647 		}
5648 
5649 		/*
5650 		 * The job will usually finish up within the first .02 sec.  If
5651 		 * not, gradually increase the sleep until we get to a second.
5652 		 */
5653 		if (count == 0) {
5654 			usleep(20000);
5655 			count++;
5656 		} else if (count == 1) {
5657 			usleep(50000);
5658 			count++;
5659 		} else if (count == 2) {
5660 			usleep(100000);
5661 			count++;
5662 		} else if (count == 3) {
5663 			usleep(500000);
5664 			count++;
5665 			sec = 1;
5666 		} else {
5667 			sleep(pause);
5668 			sec += pause;
5669 		}
5670 	}
5671 
5672 	/*
5673 	 * Return true if job is NOT running
5674 	 */
5675 	return (!rc);
5676 }
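/*
 * The polling above backs off from 20 msec to 50, 100 and 500 msec before
 * falling back to whole-second sleeps (capped at 10 seconds per iteration),
 * and when max_time == 0 it keeps re-issuing SIGKILL through
 * _terminate_all_steps() once more than a second has elapsed.
 */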
5677 
5678 /*
5679  * Does nothing and returns SLURM_SUCCESS (if uid authenticates).
5680  *
5681  * Timelimit is not currently used in the slurmd or slurmstepd.
5682  */
5683 static void
5684 _rpc_update_time(slurm_msg_t *msg)
5685 {
5686 	int   rc      = SLURM_SUCCESS;
5687 	uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
5688 
5689 	if ((req_uid != conf->slurm_user_id) && (req_uid != 0)) {
5690 		rc = ESLURM_USER_ID_MISSING;
5691 		error("Security violation, uid %d can't update time limit",
5692 		      req_uid);
5693 		goto done;
5694 	}
5695 
5696 /* 	if (shm_update_job_timelimit(req->job_id, req->expiration_time) < 0) { */
5697 /* 		error("updating lifetime for job %u: %m", req->job_id); */
5698 /* 		rc = ESLURM_INVALID_JOB_ID; */
5699 /* 	} else */
5700 /* 		debug("reset job %u lifetime", req->job_id); */
5701 
5702 done:
5703 	slurm_send_rc_msg(msg, rc);
5704 }
5705 
5706 static void _free_job_env(job_env_t *env_ptr)
5707 {
5708 	int i;
5709 
5710 	if (env_ptr->gres_job_env) {
5711 		for (i = 0; env_ptr->gres_job_env[i]; i++)
5712 			xfree(env_ptr->gres_job_env[i]);
5713 		xfree(env_ptr->gres_job_env);
5714 	}
5715 	xfree(env_ptr->resv_id);
5716 	/* NOTE: spank_job_env is just a pointer without allocated memory */
5717 }
5718 
5719 static void *_prolog_timer(void *x)
5720 {
5721 	int delay_time, rc = SLURM_SUCCESS;
5722 	struct timespec ts;
5723 	struct timeval now;
5724 	slurm_msg_t msg;
5725 	job_notify_msg_t notify_req;
5726 	char srun_msg[128];
5727 	timer_struct_t *timer_struct = (timer_struct_t *) x;
5728 
5729 	delay_time = MAX(2, (timer_struct->msg_timeout - 2));
5730 	gettimeofday(&now, NULL);
5731 	ts.tv_sec = now.tv_sec + delay_time;
5732 	ts.tv_nsec = now.tv_usec * 1000;
5733 	slurm_mutex_lock(timer_struct->timer_mutex);
5734 	if (!timer_struct->prolog_fini) {
5735 		rc = pthread_cond_timedwait(timer_struct->timer_cond,
5736 					    timer_struct->timer_mutex, &ts);
5737 	}
5738 	slurm_mutex_unlock(timer_struct->timer_mutex);
5739 
5740 	if (rc != ETIMEDOUT)
5741 		return NULL;
5742 
5743 	slurm_msg_t_init(&msg);
5744 	snprintf(srun_msg, sizeof(srun_msg), "Prolog hung on node %s",
5745 		 conf->node_name);
5746 	memset(&notify_req, 0, sizeof(notify_req));
5747 	notify_req.job_id	= timer_struct->job_id;
5748 	notify_req.job_step_id	= NO_VAL;
5749 	notify_req.message	= srun_msg;
5750 	msg.msg_type	= REQUEST_JOB_NOTIFY;
5751 	msg.data	= &notify_req;
5752 	slurm_send_only_controller_msg(&msg, working_cluster_rec);
5753 	return NULL;
5754 }
5755 
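/*
 * _run_prolog() runs the job prolog via prep_prolog() with a watchdog
 * attached: it spawns _prolog_timer() above, which waits up to roughly
 * MessageTimeout - 2 seconds on a condition variable and, if the prolog has
 * not finished by then, sends REQUEST_JOB_NOTIFY to slurmctld so srun can
 * be told that the prolog is hung on this node.
 */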
5756 static int
5757 _run_prolog(job_env_t *job_env, slurm_cred_t *cred, bool remove_running)
5758 {
5759 	DEF_TIMERS;
5760 	int diff_time, rc;
5761 	time_t start_time = time(NULL);
5762 	pthread_t       timer_id;
5763 	pthread_cond_t  timer_cond  = PTHREAD_COND_INITIALIZER;
5764 	pthread_mutex_t timer_mutex = PTHREAD_MUTEX_INITIALIZER;
5765 	timer_struct_t  timer_struct;
5766 	bool prolog_fini = false;
5767 	bool script_lock = false;
5768 
5769 	if (slurmctld_conf.prolog_flags & PROLOG_FLAG_SERIAL) {
5770 		slurm_mutex_lock(&prolog_serial_mutex);
5771 		script_lock = true;
5772 	}
5773 
5774 	timer_struct.job_id      = job_env->jobid;
5775 	timer_struct.msg_timeout = slurmctld_conf.msg_timeout;
5776 	timer_struct.prolog_fini = &prolog_fini;
5777 	timer_struct.timer_cond  = &timer_cond;
5778 	timer_struct.timer_mutex = &timer_mutex;
5779 	slurm_thread_create(&timer_id, _prolog_timer, &timer_struct);
5780 
5781 	START_TIMER;
5782 
5783 	rc = prep_prolog(job_env, cred);
5784 
5785 	END_TIMER;
5786 	info("%s: run job script took %s", __func__, TIME_STR);
5787 	slurm_mutex_lock(&timer_mutex);
5788 	prolog_fini = true;
5789 	slurm_cond_broadcast(&timer_cond);
5790 	slurm_mutex_unlock(&timer_mutex);
5791 
5792 	diff_time = difftime(time(NULL), start_time);
5793 	info("%s: prolog with lock for job %u ran for %d seconds",
5794 	     __func__, job_env->jobid, diff_time);
5795 	if (diff_time >= (slurmctld_conf.msg_timeout / 2)) {
5796 		info("prolog for job %u ran for %d seconds",
5797 		     job_env->jobid, diff_time);
5798 	}
5799 
5800 	if (remove_running)
5801 		_remove_job_running_prolog(job_env->jobid);
5802 
5803 	if (timer_id)
5804 		pthread_join(timer_id, NULL);
5805 	if (script_lock)
5806 		slurm_mutex_unlock(&prolog_serial_mutex);
5807 
5808 	return rc;
5809 }
5810 
5811 static int
5812 _run_epilog(job_env_t *job_env)
5813 {
5814 	time_t start_time = time(NULL);
5815 	int error_code, diff_time;
5816 	bool script_lock = false;
5817 
5818 	_wait_for_job_running_prolog(job_env->jobid);
5819 
5820 	if (slurmctld_conf.prolog_flags & PROLOG_FLAG_SERIAL) {
5821 		slurm_mutex_lock(&prolog_serial_mutex);
5822 		script_lock = true;
5823 	}
5824 
5825 	error_code = prep_epilog(job_env, NULL);
5826 
5827 	diff_time = difftime(time(NULL), start_time);
5828 	if (diff_time >= (slurmctld_conf.msg_timeout / 2)) {
5829 		info("epilog for job %u ran for %d seconds",
5830 		     job_env->jobid, diff_time);
5831 	}
5832 
5833 	if (script_lock)
5834 		slurm_mutex_unlock(&prolog_serial_mutex);
5835 
5836 	return error_code;
5837 }
5838 
5839 static int
5840 _add_starting_step(uint16_t type, void *req)
5841 {
5842 	starting_step_t *starting_step;
5843 
5844 	/* Add the step info to a list of starting processes that
5845 	   cannot reliably be contacted. */
5846 	starting_step = xmalloc(sizeof(starting_step_t));
5847 
5848 	switch (type) {
5849 	case LAUNCH_BATCH_JOB:
5850 		starting_step->job_id =
5851 			((batch_job_launch_msg_t *)req)->job_id;
5852 		starting_step->step_id =
5853 			((batch_job_launch_msg_t *)req)->step_id;
5854 		break;
5855 	case LAUNCH_TASKS:
5856 		starting_step->job_id =
5857 			((launch_tasks_request_msg_t *)req)->job_id;
5858 		starting_step->step_id =
5859 			((launch_tasks_request_msg_t *)req)->job_step_id;
5860 		break;
5861 	case REQUEST_LAUNCH_PROLOG:
5862 		starting_step->job_id  = ((prolog_launch_msg_t *)req)->job_id;
5863 		starting_step->step_id = SLURM_EXTERN_CONT;
5864 		break;
5865 	default:
5866 		error("%s called with an invalid type: %u", __func__, type);
5867 		xfree(starting_step);
5868 		return SLURM_ERROR;
5869 	}
5870 
5871 	list_append(conf->starting_steps, starting_step);
5872 
5873 	return SLURM_SUCCESS;
5874 }
5875 
5876 
5877 static int
5878 _remove_starting_step(uint16_t type, void *req)
5879 {
5880 	starting_step_t starting_step;
5881 	int rc = SLURM_SUCCESS;
5882 
5883 	switch(type) {
5884 	case LAUNCH_BATCH_JOB:
5885 		starting_step.job_id =
5886 			((batch_job_launch_msg_t *)req)->job_id;
5887 		starting_step.step_id =
5888 			((batch_job_launch_msg_t *)req)->step_id;
5889 		break;
5890 	case LAUNCH_TASKS:
5891 		starting_step.job_id =
5892 			((launch_tasks_request_msg_t *)req)->job_id;
5893 		starting_step.step_id =
5894 			((launch_tasks_request_msg_t *)req)->job_step_id;
5895 		break;
5896 	default:
5897 		error("%s called with an invalid type: %u", __func__, type);
5898 		rc = SLURM_ERROR;
5899 		goto fail;
5900 	}
5901 
5902 	if (!list_delete_all(conf->starting_steps,
5903 			     _compare_starting_steps,
5904 			     &starting_step)) {
5905 		error("%s: step %u.%u not found", __func__,
5906 		      starting_step.job_id,
5907 		      starting_step.step_id);
5908 		rc = SLURM_ERROR;
5909 	}
5910 	slurm_cond_broadcast(&conf->starting_steps_cond);
5911 fail:
5912 	return rc;
5913 }
5914 
5915 
5916 
5917 static int _compare_starting_steps(void *listentry, void *key)
5918 {
5919 	starting_step_t *step0 = (starting_step_t *)listentry;
5920 	starting_step_t *step1 = (starting_step_t *)key;
5921 
5922 	return (step0->job_id == step1->job_id &&
5923 		step0->step_id == step1->step_id);
5924 }
5925 
5926 
5927 /* Wait for a step to get far enough in the launch process to have
5928    a socket open, ready to handle RPC calls.  Pass step_id = NO_VAL
5929    to wait on any step for the given job. */
5930 
5931 static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id)
5932 {
5933 	static pthread_mutex_t dummy_lock = PTHREAD_MUTEX_INITIALIZER;
5934 	struct timespec ts = {0, 0};
5935 	struct timeval now;
5936 
5937 	starting_step_t starting_step;
5938 	int num_passes = 0;
5939 
5940 	starting_step.job_id  = job_id;
5941 	starting_step.step_id = step_id;
5942 
5943 	while (list_find_first(conf->starting_steps,
5944 			       _compare_starting_steps,
5945 			       &starting_step)) {
5946 		if (num_passes == 0) {
5947 			if (step_id != NO_VAL)
5948 				debug("Blocked waiting for step %d.%d",
5949 				      job_id, step_id);
5950 			else
5951 				debug("Blocked waiting for job %d, all steps",
5952 				      job_id);
5953 		}
5954 		num_passes++;
5955 
5956 		gettimeofday(&now, NULL);
5957 		ts.tv_sec = now.tv_sec+1;
5958 		ts.tv_nsec = now.tv_usec * 1000;
5959 
5960 		slurm_mutex_lock(&dummy_lock);
5961 		slurm_cond_timedwait(&conf->starting_steps_cond,
5962 				     &dummy_lock, &ts);
5963 		slurm_mutex_unlock(&dummy_lock);
5964 	}
5965 	if (num_passes > 0) {
5966 		if (step_id != NO_VAL)
5967 			debug("Finished wait for step %d.%d",
5968 			      job_id, step_id);
5969 		else
5970 			debug("Finished wait for job %d, all steps",
5971 			      job_id);
5972 	}
5973 
5974 	return SLURM_SUCCESS;
5975 }
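/*
 * Both the wait above and _wait_for_job_running_prolog() below use the same
 * pattern: a throwaway mutex plus a one-second slurm_cond_timedwait() on a
 * shared condition variable, so the list is re-checked at least once per
 * second even if a broadcast is missed.
 */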
5976 
5977 
5978 /* Return true if the step has not yet confirmed that its socket to
5979    handle RPC calls has been created.  Pass step_id = NO_VAL
5980    to return true if any of the job's steps are still starting. */
5981 static bool _step_is_starting(uint32_t job_id, uint32_t step_id)
5982 {
5983 	starting_step_t  starting_step;
5984 	starting_step.job_id  = job_id;
5985 	starting_step.step_id = step_id;
5986 	bool ret = false;
5987 
5988 	if (list_find_first(conf->starting_steps,
5989 			    _compare_starting_steps,
5990 			    &starting_step )) {
5991 		ret = true;
5992 	}
5993 
5994 	return ret;
5995 }
5996 
5997 /* Add this job to the list of jobs currently running their prolog */
5998 static void _add_job_running_prolog(uint32_t job_id)
5999 {
6000 	uint32_t *job_running_prolog;
6001 
6002 	/* Add the job to a list of jobs whose prologs are running */
6003 	job_running_prolog = xmalloc(sizeof(uint32_t));
6004 	*job_running_prolog = job_id;
6005 
6006 	list_append(conf->prolog_running_jobs, job_running_prolog);
6007 }
6008 
6009 /* Remove this job from the list of jobs currently running their prolog */
6010 static void _remove_job_running_prolog(uint32_t job_id)
6011 {
6012 	if (!list_delete_all(conf->prolog_running_jobs,
6013 			     _match_jobid, &job_id))
6014 		error("_remove_job_running_prolog: job not found");
6015 	slurm_cond_broadcast(&conf->prolog_running_cond);
6016 }
6017 
6018 static int _match_jobid(void *listentry, void *key)
6019 {
6020 	uint32_t *job0 = (uint32_t *)listentry;
6021 	uint32_t *job1 = (uint32_t *)key;
6022 
6023 	return (*job0 == *job1);
6024 }
6025 
6026 static int _prolog_is_running (uint32_t jobid)
6027 {
6028 	int rc = 0;
6029 	if (conf->prolog_running_jobs &&
6030 	    list_find_first(conf->prolog_running_jobs,
6031 			    _match_jobid, &jobid))
6032 		rc = 1;
6033 	return (rc);
6034 }
6035 
6036 /* Wait for the job's prolog to complete */
6037 static void _wait_for_job_running_prolog(uint32_t job_id)
6038 {
6039 	static pthread_mutex_t dummy_lock = PTHREAD_MUTEX_INITIALIZER;
6040 	struct timespec ts = {0, 0};
6041 	struct timeval now;
6042 
6043 	debug("Waiting for job %d's prolog to complete", job_id);
6044 
6045 	while (_prolog_is_running (job_id)) {
6046 
6047 		gettimeofday(&now, NULL);
6048 		ts.tv_sec = now.tv_sec+1;
6049 		ts.tv_nsec = now.tv_usec * 1000;
6050 
6051 		slurm_mutex_lock(&dummy_lock);
6052 		slurm_cond_timedwait(&conf->prolog_running_cond,
6053 				     &dummy_lock, &ts);
6054 		slurm_mutex_unlock(&dummy_lock);
6055 	}
6056 
6057 	debug("Finished wait for job %d's prolog to complete", job_id);
6058 }
6059 
6060 
6061 static void
6062 static void _rpc_forward_data(slurm_msg_t *msg)
6063 {
6064 	forward_data_msg_t *req = (forward_data_msg_t *)msg->data;
6065 	uint32_t req_uid = (uint32_t) g_slurm_auth_get_uid(msg->auth_cred);
6066 	struct sockaddr_un sa;
6067 	int fd = -1, rc = 0;
6068 
6069 	/* Make sure we adjust for the spool dir coming in on the address to
6070 	 * point to the right spot.
6071 	 */
6072 	xstrsubstitute(req->address, "%n", conf->node_name);
6073 	xstrsubstitute(req->address, "%h", conf->node_name);
6074 	debug3("Entering _rpc_forward_data, address: %s, len: %u",
6075 	       req->address, req->len);
6076 
6077 	/*
6078 	 * If the socket name would be truncated, log an error and bail out
6079 	 */
6080 	if (strlen(req->address) >= sizeof(sa.sun_path)) {
6081 		error("%s: Unix socket path '%s' is too long. (%ld > %ld)",
6082 		      __func__, req->address,
6083 		      (long int)(strlen(req->address) + 1),
6084 		      (long int)sizeof(sa.sun_path));
6085 		slurm_seterrno(EINVAL);
6086 		rc = errno;
6087 		goto done;
6088 	}
6089 
6090 	/* connect to specified address */
6091 	fd = socket(AF_UNIX, SOCK_STREAM, 0);
6092 	if (fd < 0) {
6093 		rc = errno;
6094 		error("failed creating UNIX domain socket: %m");
6095 		goto done;
6096 	}
6097 	memset(&sa, 0, sizeof(sa));
6098 	sa.sun_family = AF_UNIX;
6099 	strcpy(sa.sun_path, req->address);
6100 	while (((rc = connect(fd, (struct sockaddr *)&sa, SUN_LEN(&sa))) < 0) &&
6101 	       (errno == EINTR));
6102 	if (rc < 0) {
6103 		rc = errno;
6104 		debug2("failed connecting to specified socket '%s': %m",
6105 		       req->address);
6106 		goto done;
6107 	}
6108 
6109 	/*
6110 	 * Although this traffic never leaves localhost, we still convert to
6111 	 * network byte order to stay consistent with pack/unpack.
6112 	 */
6113 	req_uid = htonl(req_uid);
6114 	safe_write(fd, &req_uid, sizeof(uint32_t));
6115 	req_uid = htonl(req->len);
6116 	safe_write(fd, &req_uid, sizeof(uint32_t));
6117 	safe_write(fd, req->data, req->len);
6118 
6119 rwfail:
6120 done:
6121 	if (fd >= 0){
6122 		close(fd);
6123 	}
6124 	slurm_send_rc_msg(msg, rc);
6125 }
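/*
 * The safe_write() calls above define the framing used on the local UNIX
 * socket: a network-order uint32 with the requesting uid, a network-order
 * uint32 with the payload length, then the payload itself.  The sketch
 * below is illustrative only (compiled out; the function name is not part
 * of slurmd) and shows how a peer could read one such frame; a production
 * reader would also have to cope with short reads.
 */
#if 0
static int _read_forwarded_frame(int fd, uint32_t *uid_out,
				 uint32_t *len_out, char **data_out)
{
	uint32_t val;

	if (read(fd, &val, sizeof(val)) != sizeof(val))
		return -1;
	*uid_out = ntohl(val);	/* requesting uid */

	if (read(fd, &val, sizeof(val)) != sizeof(val))
		return -1;
	*len_out = ntohl(val);	/* payload length */

	*data_out = malloc(*len_out + 1);
	if (!*data_out)
		return -1;
	if (read(fd, *data_out, *len_out) != (ssize_t) *len_out) {
		free(*data_out);
		return -1;
	}
	(*data_out)[*len_out] = '\0';
	return 0;
}
#endif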
6126 
6127 static void _launch_complete_add(uint32_t job_id)
6128 {
6129 	int j, empty;
6130 
6131 	slurm_mutex_lock(&job_state_mutex);
6132 	empty = -1;
6133 	for (j = 0; j < JOB_STATE_CNT; j++) {
6134 		if (job_id == active_job_id[j])
6135 			break;
6136 		if ((active_job_id[j] == 0) && (empty == -1))
6137 			empty = j;
6138 	}
6139 	if (j >= JOB_STATE_CNT || job_id != active_job_id[j]) {
6140 		if (empty == -1)	/* Discard oldest job */
6141 			empty = 0;
6142 		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6143 			active_job_id[j - 1] = active_job_id[j];
6144 		}
6145 		active_job_id[JOB_STATE_CNT - 1] = 0;
6146 		for (j = 0; j < JOB_STATE_CNT; j++) {
6147 			if (active_job_id[j] == 0) {
6148 				active_job_id[j] = job_id;
6149 				break;
6150 			}
6151 		}
6152 	}
6153 	slurm_cond_signal(&job_state_cond);
6154 	slurm_mutex_unlock(&job_state_mutex);
6155 	_launch_complete_log("job add", job_id);
6156 }
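/*
 * active_job_id[] is a small fixed-size table (JOB_STATE_CNT slots) of job
 * IDs whose launch state is tracked on this node.  When the table is full,
 * _launch_complete_add() above discards the oldest entry (slot 0) by
 * shifting the remaining entries left, then stores the new job ID in the
 * freed last slot.
 */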
6157 
6158 static void _launch_complete_log(char *type, uint32_t job_id)
6159 {
6160 #if 0
6161 	int j;
6162 
6163 	info("active %s %u", type, job_id);
6164 	slurm_mutex_lock(&job_state_mutex);
6165 	for (j = 0; j < JOB_STATE_CNT; j++) {
6166 		if (active_job_id[j] != 0) {
6167 			info("active_job_id[%d]=%u", j, active_job_id[j]);
6168 		}
6169 	}
6170 	slurm_mutex_unlock(&job_state_mutex);
6171 #endif
6172 }
6173 
6174 /* Test if we have a specific job ID still running */
6175 static bool _launch_job_test(uint32_t job_id)
6176 {
6177 	bool found = false;
6178 	int j;
6179 
6180 	slurm_mutex_lock(&job_state_mutex);
6181 	for (j = 0; j < JOB_STATE_CNT; j++) {
6182 		if (job_id == active_job_id[j]) {
6183 			found = true;
6184 			break;
6185 		}
6186 	}
6187 	slurm_mutex_unlock(&job_state_mutex);
6188 	return found;
6189 }
6190 
6191 
6192 static void _launch_complete_rm(uint32_t job_id)
6193 {
6194 	int j;
6195 
6196 	slurm_mutex_lock(&job_state_mutex);
6197 	for (j = 0; j < JOB_STATE_CNT; j++) {
6198 		if (job_id == active_job_id[j])
6199 			break;
6200 	}
6201 	if (j < JOB_STATE_CNT && job_id == active_job_id[j]) {
6202 		for (j = j + 1; j < JOB_STATE_CNT; j++) {
6203 			active_job_id[j - 1] = active_job_id[j];
6204 		}
6205 		active_job_id[JOB_STATE_CNT - 1] = 0;
6206 	}
6207 	slurm_mutex_unlock(&job_state_mutex);
6208 	_launch_complete_log("job remove", job_id);
6209 }
6210 
6211 static void _launch_complete_wait(uint32_t job_id)
6212 {
6213 	int i, j, empty;
6214 	time_t start = time(NULL);
6215 	struct timeval now;
6216 	struct timespec timeout;
6217 
6218 	slurm_mutex_lock(&job_state_mutex);
6219 	for (i = 0; ; i++) {
6220 		empty = -1;
6221 		for (j = 0; j < JOB_STATE_CNT; j++) {
6222 			if (job_id == active_job_id[j])
6223 				break;
6224 			if ((active_job_id[j] == 0) && (empty == -1))
6225 				empty = j;
6226 		}
6227 		if (j < JOB_STATE_CNT)	/* Found job, ready to return */
6228 			break;
6229 		if (difftime(time(NULL), start) <= 9) {  /* Retry for 9 secs */
6230 			debug2("wait for launch of job %u before suspending it",
6231 			       job_id);
6232 			gettimeofday(&now, NULL);
6233 			timeout.tv_sec  = now.tv_sec + 1;
6234 			timeout.tv_nsec = now.tv_usec * 1000;
6235 			slurm_cond_timedwait(&job_state_cond,&job_state_mutex,
6236 					     &timeout);
6237 			continue;
6238 		}
6239 		if (empty == -1)	/* Discard oldest job */
6240 			empty = 0;
6241 		for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6242 			active_job_id[j - 1] = active_job_id[j];
6243 		}
6244 		active_job_id[JOB_STATE_CNT - 1] = 0;
6245 		for (j = 0; j < JOB_STATE_CNT; j++) {
6246 			if (active_job_id[j] == 0) {
6247 				active_job_id[j] = job_id;
6248 				break;
6249 			}
6250 		}
6251 		break;
6252 	}
6253 	slurm_mutex_unlock(&job_state_mutex);
6254 	_launch_complete_log("job wait", job_id);
6255 }
6256 
6257 static bool
6258 static bool _requeue_setup_env_fail(void)
6259 {
6260 	static time_t config_update = 0;
6261 	static bool requeue = false;
6262 
6263 	if (config_update != conf->last_update) {
6264 		char *sched_params = slurm_get_sched_params();
6265 		requeue = ((xstrcasestr(sched_params, "no_env_cache") ||
6266 			    xstrcasestr(sched_params,
6267 					"requeue_setup_env_fail")));
6268 		xfree(sched_params);
6269 		config_update = conf->last_update;
6270 	}
6271 
6272 	return requeue;
6273 }
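/*
 * The SchedulerParameters lookup above is cached against conf->last_update,
 * so the string is only re-parsed after a reconfigure; between updates the
 * previously computed requeue decision is returned directly.
 */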
6274