1 /*****************************************************************************\
2 * src/slurmd/slurmd/req.c - slurmd request handling
3 *****************************************************************************
4 * Copyright (C) 2002-2007 The Regents of the University of California.
5 * Copyright (C) 2008-2010 Lawrence Livermore National Security.
6 * Portions Copyright (C) 2010-2016 SchedMD LLC.
7 * Portions copyright (C) 2015 Mellanox Technologies Inc.
8 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
9 * Written by Mark Grondona <mgrondona@llnl.gov>.
10 * CODE-OCEC-09-009. All rights reserved.
11 *
12 * This file is part of Slurm, a resource management program.
13 * For details, see <https://slurm.schedmd.com/>.
14 * Please also read the included file: DISCLAIMER.
15 *
16 * Slurm is free software; you can redistribute it and/or modify it under
17 * the terms of the GNU General Public License as published by the Free
18 * Software Foundation; either version 2 of the License, or (at your option)
19 * any later version.
20 *
21 * In addition, as a special exception, the copyright holders give permission
22 * to link the code of portions of this program with the OpenSSL library under
23 * certain conditions as described in each individual source file, and
24 * distribute linked combinations including the two. You must obey the GNU
25 * General Public License in all respects for all of the code used other than
26 * OpenSSL. If you modify file(s) with this exception, you may extend this
27 * exception to your version of the file(s), but you are not obligated to do
28 * so. If you do not wish to do so, delete this exception statement from your
29 * version. If you delete this exception statement from all source files in
30 * the program, then also delete it here.
31 *
32 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
33 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
34 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
35 * details.
36 *
37 * You should have received a copy of the GNU General Public License along
38 * with Slurm; if not, write to the Free Software Foundation, Inc.,
39 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
40 \*****************************************************************************/
41
42 #include "config.h"
43
44 #include <ctype.h>
45 #include <fcntl.h>
46 #include <grp.h>
47 #ifdef HAVE_NUMA
48 #undef NUMA_VERSION1_COMPATIBILITY
49 #include <numa.h>
50 #endif
51 #include <poll.h>
52 #include <pthread.h>
53 #include <sched.h>
54 #include <signal.h>
55 #include <stdlib.h>
56 #include <string.h>
57 #include <sys/param.h>
58 #include <sys/stat.h>
59 #include <sys/types.h>
60 #include <sys/un.h>
61 #include <sys/wait.h>
62 #include <time.h>
63 #include <unistd.h>
64 #include <utime.h>
65
66 #include "src/common/assoc_mgr.h"
67 #include "src/common/callerid.h"
68 #include "src/common/cpu_frequency.h"
69 #include "src/common/env.h"
70 #include "src/common/fd.h"
71 #include "src/common/fetch_config.h"
72 #include "src/common/forward.h"
73 #include "src/common/gres.h"
74 #include "src/common/group_cache.h"
75 #include "src/common/hostlist.h"
76 #include "src/common/list.h"
77 #include "src/common/log.h"
78 #include "src/common/macros.h"
79 #include "src/common/msg_aggr.h"
80 #include "src/common/node_features.h"
81 #include "src/common/node_select.h"
82 #include "src/common/plugstack.h"
83 #include "src/common/prep.h"
84 #include "src/common/read_config.h"
85 #include "src/common/slurm_auth.h"
86 #include "src/common/slurm_cred.h"
87 #include "src/common/slurm_acct_gather_energy.h"
88 #include "src/common/slurm_jobacct_gather.h"
89 #include "src/common/slurm_protocol_defs.h"
90 #include "src/common/slurm_protocol_api.h"
91 #include "src/common/slurm_protocol_interface.h"
92 #include "src/common/stepd_api.h"
93 #include "src/common/uid.h"
94 #include "src/common/util-net.h"
95 #include "src/common/xcgroup_read_config.h"
96 #include "src/common/xstring.h"
97 #include "src/common/xmalloc.h"
98
99 #include "src/bcast/file_bcast.h"
100
101 #include "src/slurmd/slurmd/get_mach_stat.h"
102 #include "src/slurmd/slurmd/slurmd.h"
103
104 #include "src/slurmd/common/fname.h"
105 #include "src/slurmd/common/job_container_plugin.h"
106 #include "src/slurmd/common/proctrack.h"
107 #include "src/slurmd/common/run_script.h"
108 #include "src/slurmd/common/reverse_tree_math.h"
109 #include "src/slurmd/common/slurmstepd_init.h"
110 #include "src/slurmd/common/task_plugin.h"
111
112 #define _LIMIT_INFO 0
113
114 #define RETRY_DELAY 15 /* retry every 15 seconds */
115 #define MAX_RETRY 240 /* retry 240 times (one hour max) */
116
117 #ifndef MAXHOSTNAMELEN
118 #define MAXHOSTNAMELEN 64
119 #endif
120
121 #define MAX_CPU_CNT 1024
122 #define MAX_NUMA_CNT 128
123
124 typedef struct {
125 uint32_t job_id;
126 uint32_t step_id;
127 uint64_t job_mem;
128 uint64_t step_mem;
129 } job_mem_limits_t;
130
131 typedef struct {
132 uint32_t job_id;
133 uint32_t step_id;
134 } starting_step_t;
135
136 typedef struct {
137 uint32_t job_id;
138 uint16_t msg_timeout;
139 bool *prolog_fini;
140 pthread_cond_t *timer_cond;
141 pthread_mutex_t *timer_mutex;
142 } timer_struct_t;
143
144 static int _abort_step(uint32_t job_id, uint32_t step_id);
145 static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc);
146 static void _free_job_env(job_env_t *env_ptr);
147 static bool _is_batch_job_finished(uint32_t job_id);
148 static int _job_limits_match(void *x, void *key);
149 static bool _job_still_running(uint32_t job_id);
150 static int _kill_all_active_steps(uint32_t jobid, int sig,
151 int flags, bool batch, uid_t req_uid);
152 static void _launch_complete_add(uint32_t job_id);
153 static void _launch_complete_log(char *type, uint32_t job_id);
154 static void _launch_complete_rm(uint32_t job_id);
155 static void _launch_complete_wait(uint32_t job_id);
156 static int _launch_job_fail(uint32_t job_id, uint32_t slurm_rc);
157 static bool _launch_job_test(uint32_t job_id);
158 static void _note_batch_job_finished(uint32_t job_id);
159 static int _prolog_is_running (uint32_t jobid);
160 static int _step_limits_match(void *x, void *key);
161 static int _terminate_all_steps(uint32_t jobid, bool batch);
162 static void _rpc_launch_tasks(slurm_msg_t *);
163 static void _rpc_abort_job(slurm_msg_t *);
164 static void _rpc_batch_job(slurm_msg_t *msg, bool new_msg);
165 static void _rpc_prolog(slurm_msg_t *msg);
166 static void _rpc_job_notify(slurm_msg_t *);
167 static void _rpc_signal_tasks(slurm_msg_t *);
168 static void _rpc_complete_batch(slurm_msg_t *);
169 static void _rpc_terminate_tasks(slurm_msg_t *);
170 static void _rpc_timelimit(slurm_msg_t *);
171 static void _rpc_reattach_tasks(slurm_msg_t *);
172 static void _rpc_suspend_job(slurm_msg_t *msg);
173 static void _rpc_terminate_job(slurm_msg_t *);
174 static void _rpc_update_time(slurm_msg_t *);
175 static void _rpc_shutdown(slurm_msg_t *msg);
176 static void _rpc_reconfig(slurm_msg_t *msg);
177 static void _rpc_reconfig_with_config(slurm_msg_t *msg);
178 static void _rpc_reboot(slurm_msg_t *msg);
179 static void _rpc_pid2jid(slurm_msg_t *msg);
180 static int _rpc_file_bcast(slurm_msg_t *msg);
181 static void _file_bcast_cleanup(void);
182 static int _file_bcast_register_file(slurm_msg_t *msg,
183 sbcast_cred_arg_t *cred_arg,
184 file_bcast_info_t *key);
185 static int _rpc_ping(slurm_msg_t *);
186 static int _rpc_health_check(slurm_msg_t *);
187 static int _rpc_acct_gather_update(slurm_msg_t *);
188 static int _rpc_acct_gather_energy(slurm_msg_t *);
189 static int _rpc_step_complete(slurm_msg_t *msg);
190 static int _rpc_step_complete_aggr(slurm_msg_t *msg);
191 static int _rpc_stat_jobacct(slurm_msg_t *msg);
192 static int _rpc_list_pids(slurm_msg_t *msg);
193 static int _rpc_daemon_status(slurm_msg_t *msg);
194 static int _run_epilog(job_env_t *job_env);
195 static int _run_prolog(job_env_t *job_env, slurm_cred_t *cred,
196 bool remove_running);
197 static void _rpc_forward_data(slurm_msg_t *msg);
198 static int _rpc_network_callerid(slurm_msg_t *msg);
199
200 static bool _pause_for_job_completion(uint32_t jobid, char *nodes,
201 int maxtime);
202 static bool _slurm_authorized_user(uid_t uid);
203 static void _sync_messages_kill(kill_job_msg_t *req);
204 static int _waiter_init (uint32_t jobid);
205 static int _waiter_complete (uint32_t jobid);
206
207 static bool _steps_completed_now(uint32_t jobid);
208 static sbcast_cred_arg_t *_valid_sbcast_cred(file_bcast_msg_t *req,
209 uid_t req_uid,
210 gid_t req_gid,
211 uint16_t protocol_version);
212 static void _wait_state_completed(uint32_t jobid, int max_delay);
213 static uid_t _get_job_uid(uint32_t jobid);
214
215 static int _add_starting_step(uint16_t type, void *req);
216 static int _remove_starting_step(uint16_t type, void *req);
217 static int _compare_starting_steps(void *s0, void *s1);
218 static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id);
219 static bool _step_is_starting(uint32_t job_id, uint32_t step_id);
220
221 static void _add_job_running_prolog(uint32_t job_id);
222 static void _remove_job_running_prolog(uint32_t job_id);
223 static int _match_jobid(void *s0, void *s1);
224 static void _wait_for_job_running_prolog(uint32_t job_id);
225 static bool _requeue_setup_env_fail(void);
226
227 /*
228 * List of threads waiting for jobs to complete
229 */
230 static List waiters;
231
232 static pthread_mutex_t launch_mutex = PTHREAD_MUTEX_INITIALIZER;
233 static time_t startup = 0; /* daemon startup time */
234 static time_t last_slurmctld_msg = 0;
235
236 static pthread_mutex_t job_limits_mutex = PTHREAD_MUTEX_INITIALIZER;
237 static List job_limits_list = NULL;
238 static bool job_limits_loaded = false;
239
240 static int next_fini_job_inx = 0;
241
242 /* NUM_PARALLEL_SUSP_JOBS controls the number of jobs that can be suspended or
243 * resumed at one time. */
244 #define NUM_PARALLEL_SUSP_JOBS 64
245 /* NUM_PARALLEL_SUSP_STEPS controls the number of steps per job that can be
246 * suspended at one time. */
247 #define NUM_PARALLEL_SUSP_STEPS 8
248 static pthread_mutex_t suspend_mutex = PTHREAD_MUTEX_INITIALIZER;
249 static uint32_t job_suspend_array[NUM_PARALLEL_SUSP_JOBS] = {0};
250 static int job_suspend_size = 0;
251
252 #define JOB_STATE_CNT 64
253 static pthread_mutex_t job_state_mutex = PTHREAD_MUTEX_INITIALIZER;
254 static pthread_cond_t job_state_cond = PTHREAD_COND_INITIALIZER;
255 static uint32_t active_job_id[JOB_STATE_CNT] = {0};
256
257 static pthread_mutex_t prolog_mutex = PTHREAD_MUTEX_INITIALIZER;
258 static pthread_mutex_t prolog_serial_mutex = PTHREAD_MUTEX_INITIALIZER;
259
260 #define FILE_BCAST_TIMEOUT 300
261 static pthread_mutex_t file_bcast_mutex = PTHREAD_MUTEX_INITIALIZER;
262 static pthread_cond_t file_bcast_cond = PTHREAD_COND_INITIALIZER;
263 static int fb_read_lock = 0, fb_write_wait_lock = 0, fb_write_lock = 0;
264 static List file_bcast_list = NULL;
265
266 static pthread_mutex_t waiter_mutex = PTHREAD_MUTEX_INITIALIZER;
267
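/*
 * slurmd_req() is the top-level RPC dispatcher for slurmd. A NULL msg is a
 * special case used at daemon startup to record the start time and to drop
 * any cached waiter and job-limit state; otherwise the request is routed by
 * msg->msg_type, and last_slurmctld_msg is refreshed for RPCs that come from
 * slurmctld. A rough sketch of how a caller might drive it (illustrative
 * only; the real connection handling lives elsewhere in slurmd):
 *
 *	slurm_msg_t msg;
 *	slurm_msg_t_init(&msg);
 *	if (slurm_receive_msg(fd, &msg, 0) == SLURM_SUCCESS)
 *		slurmd_req(&msg);
 */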
268 void
269 slurmd_req(slurm_msg_t *msg)
270 {
271 int rc;
272
273 if (msg == NULL) {
274 if (startup == 0)
275 startup = time(NULL);
276 slurm_mutex_lock(&waiter_mutex);
277 FREE_NULL_LIST(waiters);
278 slurm_mutex_unlock(&waiter_mutex);
279 slurm_mutex_lock(&job_limits_mutex);
280 if (job_limits_list) {
281 FREE_NULL_LIST(job_limits_list);
282 job_limits_loaded = false;
283 }
284 slurm_mutex_unlock(&job_limits_mutex);
285 return;
286 }
287
288 switch (msg->msg_type) {
289 case REQUEST_LAUNCH_PROLOG:
290 debug2("Processing RPC: REQUEST_LAUNCH_PROLOG");
291 _rpc_prolog(msg);
292 last_slurmctld_msg = time(NULL);
293 break;
294 case REQUEST_BATCH_JOB_LAUNCH:
295 debug2("Processing RPC: REQUEST_BATCH_JOB_LAUNCH");
296 /* Mutex locking moved into _rpc_batch_job() due to
297 * very slow prolog on Blue Gene system. Only batch
298 * jobs are supported on Blue Gene (no job steps). */
299 _rpc_batch_job(msg, true);
300 last_slurmctld_msg = time(NULL);
301 break;
302 case REQUEST_LAUNCH_TASKS:
303 debug2("Processing RPC: REQUEST_LAUNCH_TASKS");
304 slurm_mutex_lock(&launch_mutex);
305 _rpc_launch_tasks(msg);
306 slurm_mutex_unlock(&launch_mutex);
307 break;
308 case REQUEST_SIGNAL_TASKS:
309 debug2("Processing RPC: REQUEST_SIGNAL_TASKS");
310 _rpc_signal_tasks(msg);
311 break;
312 case REQUEST_TERMINATE_TASKS:
313 debug2("Processing RPC: REQUEST_TERMINATE_TASKS");
314 _rpc_terminate_tasks(msg);
315 break;
316 case REQUEST_KILL_PREEMPTED:
317 debug2("Processing RPC: REQUEST_KILL_PREEMPTED");
318 last_slurmctld_msg = time(NULL);
319 _rpc_timelimit(msg);
320 break;
321 case REQUEST_KILL_TIMELIMIT:
322 debug2("Processing RPC: REQUEST_KILL_TIMELIMIT");
323 last_slurmctld_msg = time(NULL);
324 _rpc_timelimit(msg);
325 break;
326 case REQUEST_REATTACH_TASKS:
327 debug2("Processing RPC: REQUEST_REATTACH_TASKS");
328 _rpc_reattach_tasks(msg);
329 break;
330 case REQUEST_SUSPEND_INT:
331 debug2("Processing RPC: REQUEST_SUSPEND_INT");
332 _rpc_suspend_job(msg);
333 last_slurmctld_msg = time(NULL);
334 break;
335 case REQUEST_ABORT_JOB:
336 debug2("Processing RPC: REQUEST_ABORT_JOB");
337 last_slurmctld_msg = time(NULL);
338 _rpc_abort_job(msg);
339 break;
340 case REQUEST_TERMINATE_JOB:
341 debug2("Processing RPC: REQUEST_TERMINATE_JOB");
342 last_slurmctld_msg = time(NULL);
343 _rpc_terminate_job(msg);
344 break;
345 case REQUEST_COMPLETE_BATCH_SCRIPT:
346 debug2("Processing RPC: REQUEST_COMPLETE_BATCH_SCRIPT");
347 _rpc_complete_batch(msg);
348 break;
349 case REQUEST_UPDATE_JOB_TIME:
350 debug2("Processing RPC: REQUEST_UPDATE_JOB_TIME");
351 _rpc_update_time(msg);
352 last_slurmctld_msg = time(NULL);
353 break;
354 case REQUEST_SHUTDOWN:
355 debug2("Processing RPC: REQUEST_SHUTDOWN");
356 _rpc_shutdown(msg);
357 break;
358 case REQUEST_RECONFIGURE:
359 debug2("Processing RPC: REQUEST_RECONFIGURE");
360 _rpc_reconfig(msg);
361 last_slurmctld_msg = time(NULL);
362 break;
363 case REQUEST_RECONFIGURE_WITH_CONFIG:
364 debug2("Processing RPC: REQUEST_RECONFIGURE_WITH_CONFIG");
365 _rpc_reconfig_with_config(msg);
366 last_slurmctld_msg = time(NULL);
367 break;
368 case REQUEST_REBOOT_NODES:
369 debug2("Processing RPC: REQUEST_REBOOT_NODES");
370 _rpc_reboot(msg);
371 break;
372 case REQUEST_NODE_REGISTRATION_STATUS:
373 debug2("Processing RPC: REQUEST_NODE_REGISTRATION_STATUS");
374 get_reg_resp = 1;
375 /* Treat as ping (for slurmctld agent, just return SUCCESS) */
376 rc = _rpc_ping(msg);
377 last_slurmctld_msg = time(NULL);
378 /* Then initiate a separate node registration */
379 if (rc == SLURM_SUCCESS)
380 send_registration_msg(SLURM_SUCCESS, true);
381 break;
382 case REQUEST_PING:
383 _rpc_ping(msg);
384 last_slurmctld_msg = time(NULL);
385 break;
386 case REQUEST_HEALTH_CHECK:
387 debug2("Processing RPC: REQUEST_HEALTH_CHECK");
388 _rpc_health_check(msg);
389 last_slurmctld_msg = time(NULL);
390 break;
391 case REQUEST_ACCT_GATHER_UPDATE:
392 debug2("Processing RPC: REQUEST_ACCT_GATHER_UPDATE");
393 _rpc_acct_gather_update(msg);
394 last_slurmctld_msg = time(NULL);
395 break;
396 case REQUEST_ACCT_GATHER_ENERGY:
397 debug2("Processing RPC: REQUEST_ACCT_GATHER_ENERGY");
398 _rpc_acct_gather_energy(msg);
399 break;
400 case REQUEST_JOB_ID:
401 _rpc_pid2jid(msg);
402 break;
403 case REQUEST_FILE_BCAST:
404 rc = _rpc_file_bcast(msg);
405 slurm_send_rc_msg(msg, rc);
406 break;
407 case REQUEST_STEP_COMPLETE:
408 (void) _rpc_step_complete(msg);
409 break;
410 case REQUEST_STEP_COMPLETE_AGGR:
411 (void) _rpc_step_complete_aggr(msg);
412 break;
413 case REQUEST_JOB_STEP_STAT:
414 (void) _rpc_stat_jobacct(msg);
415 break;
416 case REQUEST_JOB_STEP_PIDS:
417 (void) _rpc_list_pids(msg);
418 break;
419 case REQUEST_DAEMON_STATUS:
420 _rpc_daemon_status(msg);
421 break;
422 case REQUEST_JOB_NOTIFY:
423 _rpc_job_notify(msg);
424 break;
425 case REQUEST_FORWARD_DATA:
426 _rpc_forward_data(msg);
427 break;
428 case REQUEST_NETWORK_CALLERID:
429 debug2("Processing RPC: REQUEST_NETWORK_CALLERID");
430 _rpc_network_callerid(msg);
431 break;
432 case MESSAGE_COMPOSITE:
433 error("Processing RPC: MESSAGE_COMPOSITE: "
434 "This should never happen");
435 msg_aggr_add_msg(msg, 0, NULL);
436 break;
437 case RESPONSE_MESSAGE_COMPOSITE:
438 debug2("Processing RPC: RESPONSE_MESSAGE_COMPOSITE");
439 msg_aggr_resp(msg);
440 break;
441 default:
442 error("slurmd_req: invalid request msg type %d",
443 msg->msg_type);
444 slurm_send_rc_msg(msg, EINVAL);
445 break;
446 }
447 return;
448 }
449
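/*
 * Send the packed slurmd configuration, including the TRES list obtained
 * from slurmctld at registration time, to a newly forked slurmstepd. The
 * pipe uses a simple length-prefixed framing; the slurmstepd side is
 * expected to mirror it with something along these lines (illustrative
 * sketch only, the real reader lives in slurmstepd):
 *
 *	int len;
 *	safe_read(fd, &len, sizeof(int));
 *	Buf buf = init_buf(len);
 *	safe_read(fd, get_buf_data(buf), len);
 *
 * Returns 0 on success and -1 if any write fails.
 */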
450 extern int send_slurmd_conf_lite(int fd, slurmd_conf_t *cf)
451 {
452 int len;
453
454 /*
455 * Wait for the registration to come back from the slurmctld so we have
456 * a TRES list to work with.
457 */
458 if (!assoc_mgr_tres_list) {
459 slurm_mutex_lock(&tres_mutex);
460 slurm_cond_wait(&tres_cond, &tres_mutex);
461 slurm_mutex_unlock(&tres_mutex);
462 }
463
464 slurm_mutex_lock(&cf->config_mutex);
465
466 xassert(cf->buf);
467 if (!tres_packed) {
468 assoc_mgr_lock_t locks = { .tres = READ_LOCK };
469 assoc_mgr_lock(&locks);
470 if (assoc_mgr_tres_list) {
471 slurm_pack_list(assoc_mgr_tres_list,
472 slurmdb_pack_tres_rec, cf->buf,
473 SLURM_PROTOCOL_VERSION);
474 } else {
475 fatal("%s: assoc_mgr_tres_list is NULL when trying to start a slurmstepd. This should never happen.",
476 __func__);
477 }
478 assoc_mgr_unlock(&locks);
479 tres_packed = true;
480 }
481
482 len = get_buf_offset(cf->buf);
483 safe_write(fd, &len, sizeof(int));
484 safe_write(fd, get_buf_data(cf->buf), len);
485
486 slurm_mutex_unlock(&cf->config_mutex);
487
488 return (0);
489
490 rwfail:
491 slurm_mutex_unlock(&cf->config_mutex);
492 return (-1);
493 }
494
495 static int
496 _send_slurmstepd_init(int fd, int type, void *req,
497 slurm_addr_t *cli, slurm_addr_t *self,
498 hostset_t step_hset, uint16_t protocol_version)
499 {
500 int len = 0;
501 Buf buffer = NULL;
502 slurm_msg_t msg;
503
504 int rank;
505 int parent_rank, children, depth, max_depth;
506 char *parent_alias = NULL;
507 slurm_addr_t parent_addr = {0};
508
509 slurm_msg_t_init(&msg);
510
511 /* send conf over to slurmstepd */
512 if (send_slurmd_conf_lite(fd, conf) < 0)
513 goto rwfail;
514
515 /* send cgroup conf over to slurmstepd */
516 if (xcgroup_write_conf(fd) < 0)
517 goto rwfail;
518
519 /* send acct_gather.conf over to slurmstepd */
520 if (acct_gather_write_conf(fd) < 0)
521 goto rwfail;
522
523 /* send type over to slurmstepd */
524 safe_write(fd, &type, sizeof(int));
525
526 /* step_hset can be NULL for batch scripts OR if the job was submitted
527 * by SlurmUser or root using the --no-allocate/-Z option and the job
528 * credential validation by _check_job_credential() failed. If the
529 * job credential did not validate, then it did not come from slurmctld
530 * and there is no reason to send step completion messages to slurmctld.
531 */
532 if (step_hset == NULL) {
533 bool send_error = false;
534 if (type == LAUNCH_TASKS) {
535 launch_tasks_request_msg_t *launch_req;
536 launch_req = (launch_tasks_request_msg_t *) req;
537 if (launch_req->job_step_id != SLURM_EXTERN_CONT)
538 send_error = true;
539 }
540 if (send_error) {
541 info("task rank unavailable due to invalid job "
542 "credential, step completion RPC impossible");
543 }
544 rank = -1;
545 parent_rank = -1;
546 children = 0;
547 depth = 0;
548 max_depth = 0;
549 } else if ((type == LAUNCH_TASKS) &&
550 (((launch_tasks_request_msg_t *)req)->alias_list)) {
551 /*
552 * In the cloud, each task talks directly to the slurmctld
553 * since node addressing is abnormal. Setting parent_rank = -1
554 * is sufficient to force slurmstepd to talk directly to the
555 * slurmctld - see _one_step_complete_msg. We need to make sure
556 * to set rank to the actual rank so that the slurmctld will
557 * properly clean up all nodes.
558 */
559 rank = hostset_find(step_hset, conf->node_name);
560 parent_rank = -1;
561 children = 0;
562 depth = 0;
563 max_depth = 0;
564 } else {
565 #ifndef HAVE_FRONT_END
566 int count;
567 count = hostset_count(step_hset);
568 rank = hostset_find(step_hset, conf->node_name);
569 reverse_tree_info(rank, count, REVERSE_TREE_WIDTH,
570 &parent_rank, &children,
571 &depth, &max_depth);
572
573 if (children == -1) {
574 error("reverse_tree_info: Sanity check fail, can't start job");
575 goto rwfail;
576 }
577 if (rank > 0) { /* rank 0 talks directly to the slurmctld */
578 int rc;
579 /* Find the slurm_addr_t of this node's parent slurmd
580 * in the step host list */
581 parent_alias = hostset_nth(step_hset, parent_rank);
582 rc = slurm_conf_get_addr(parent_alias, &parent_addr, 0);
583 if (rc != SLURM_SUCCESS) {
584 error("Failed looking up address for "
585 "NodeName %s", parent_alias);
586 /* parent_rank = -1; */
587 }
588 }
589 #else
590 /* In FRONT_END mode, one slurmd pretends to be all
591 * NodeNames, so we can't compare conf->node_name
592 * to the NodeNames in step_hset. Just send step complete
593 * RPC directly to the controller.
594 */
595 rank = 0;
596 parent_rank = -1;
597 children = 0;
598 depth = 0;
599 max_depth = 0;
600 #endif
601 }
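/*
 * The rank/parent_rank/children/depth values computed above place this node
 * in the reverse tree used to funnel step-completion messages back to
 * slurmctld: rank 0, and any node with parent_rank == -1, reports directly
 * to the controller, while every other node reports to the slurmd named by
 * parent_rank. The exact fan-out is determined by REVERSE_TREE_WIDTH and
 * the math in reverse_tree_info().
 */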
602 debug3("slurmstepd rank %d (%s), parent rank %d (%s), "
603 "children %d, depth %d, max_depth %d",
604 rank, conf->node_name,
605 parent_rank, parent_alias ? parent_alias : "NONE",
606 children, depth, max_depth);
607 if (parent_alias)
608 free(parent_alias);
609
610 /* send reverse-tree info to the slurmstepd */
611 safe_write(fd, &rank, sizeof(int));
612 safe_write(fd, &parent_rank, sizeof(int));
613 safe_write(fd, &children, sizeof(int));
614 safe_write(fd, &depth, sizeof(int));
615 safe_write(fd, &max_depth, sizeof(int));
616 safe_write(fd, &parent_addr, sizeof(slurm_addr_t));
617
618 /* send cli address over to slurmstepd */
619 buffer = init_buf(0);
620 slurm_pack_slurm_addr(cli, buffer);
621 len = get_buf_offset(buffer);
622 safe_write(fd, &len, sizeof(int));
623 safe_write(fd, get_buf_data(buffer), len);
624 free_buf(buffer);
625 buffer = NULL;
626
627 /* send self address over to slurmstepd */
628 if (self) {
629 buffer = init_buf(0);
630 slurm_pack_slurm_addr(self, buffer);
631 len = get_buf_offset(buffer);
632 safe_write(fd, &len, sizeof(int));
633 safe_write(fd, get_buf_data(buffer), len);
634 free_buf(buffer);
635 buffer = NULL;
636
637 } else {
638 len = 0;
639 safe_write(fd, &len, sizeof(int));
640 }
641
642 /* Send GRES information to slurmstepd */
643 gres_plugin_send_stepd(fd);
644
645 /* send cpu_frequency info to slurmstepd */
646 cpu_freq_send_info(fd);
647
648 /* send req over to slurmstepd */
649 switch (type) {
650 case LAUNCH_BATCH_JOB:
651 msg.msg_type = REQUEST_BATCH_JOB_LAUNCH;
652 break;
653 case LAUNCH_TASKS:
654 msg.msg_type = REQUEST_LAUNCH_TASKS;
655 break;
656 default:
657 error("Was sent a task I didn't understand");
658 break;
659 }
660 buffer = init_buf(0);
661 msg.data = req;
662
663 /* always force the RPC format to the latest */
664 msg.protocol_version = SLURM_PROTOCOL_VERSION;
665 pack_msg(&msg, buffer);
666 len = get_buf_offset(buffer);
667
668 /* send the srun protocol_version over, which may be older */
669 safe_write(fd, &protocol_version, sizeof(uint16_t));
670
671 safe_write(fd, &len, sizeof(int));
672 safe_write(fd, get_buf_data(buffer), len);
673 free_buf(buffer);
674 buffer = NULL;
675
676 return 0;
677
678 rwfail:
679 if (buffer)
680 free_buf(buffer);
681 error("_send_slurmstepd_init failed");
682 return errno;
683 }
684
685
686 /*
687 * Fork and exec the slurmstepd, then send the slurmstepd its
688 * initialization data. Then wait for slurmstepd to send an "ok"
689 * message before returning. When the "ok" message is received,
690 * the slurmstepd has created and begun listening on its unix
691 * domain socket.
692 *
693 * Note that this code forks twice and it is the grandchild that
694 * becomes the slurmstepd process, so the slurmstepd's parent process
695 * will be init, not slurmd.
696 */
697 static int
698 _forkexec_slurmstepd(uint16_t type, void *req,
699 slurm_addr_t *cli, slurm_addr_t *self,
700 const hostset_t step_hset, uint16_t protocol_version)
701 {
702 pid_t pid;
703 int to_stepd[2] = {-1, -1};
704 int to_slurmd[2] = {-1, -1};
705
706 if (pipe(to_stepd) < 0 || pipe(to_slurmd) < 0) {
707 error("%s: pipe failed: %m", __func__);
708 return SLURM_ERROR;
709 }
710
711 if (_add_starting_step(type, req)) {
712 error("%s: failed in _add_starting_step: %m", __func__);
713 return SLURM_ERROR;
714 }
715
716 if ((pid = fork()) < 0) {
717 error("%s: fork: %m", __func__);
718 close(to_stepd[0]);
719 close(to_stepd[1]);
720 close(to_slurmd[0]);
721 close(to_slurmd[1]);
722 _remove_starting_step(type, req);
723 return SLURM_ERROR;
724 } else if (pid > 0) {
725 int rc = SLURM_SUCCESS;
726 #if (SLURMSTEPD_MEMCHECK == 0)
727 int i;
728 time_t start_time = time(NULL);
729 #endif
730 /*
731 * Parent sends initialization data to the slurmstepd
732 * over the to_stepd pipe, and waits for the return code
733 * reply on the to_slurmd pipe.
734 */
735 if (close(to_stepd[0]) < 0)
736 error("Unable to close read to_stepd in parent: %m");
737 if (close(to_slurmd[1]) < 0)
738 error("Unable to close write to_slurmd in parent: %m");
739
740 if ((rc = _send_slurmstepd_init(to_stepd[1], type,
741 req, cli, self,
742 step_hset,
743 protocol_version)) != 0) {
744 error("Unable to init slurmstepd");
745 goto done;
746 }
747
748 /* If running under valgrind/memcheck, this pipe doesn't work
749 * correctly so just skip it. */
750 #if (SLURMSTEPD_MEMCHECK == 0)
751 i = read(to_slurmd[0], &rc, sizeof(int));
752 if (i < 0) {
753 error("%s: Can not read return code from slurmstepd "
754 "got %d: %m", __func__, i);
755 rc = SLURM_ERROR;
756 } else if (i != sizeof(int)) {
757 error("%s: slurmstepd failed to send return code "
758 "got %d: %m", __func__, i);
759 rc = SLURM_ERROR;
760 } else {
761 int delta_time = time(NULL) - start_time;
762 int cc;
763 if (delta_time > 5) {
764 info("Warning: slurmstepd startup took %d sec, "
765 "possible file system problem or full "
766 "memory", delta_time);
767 }
768 if (rc != SLURM_SUCCESS)
769 error("slurmstepd return code %d", rc);
770
771 cc = SLURM_SUCCESS;
772 cc = write(to_stepd[1], &cc, sizeof(int));
773 if (cc != sizeof(int)) {
774 error("%s: failed to send ack to stepd %d: %m",
775 __func__, cc);
776 }
777 }
778 #endif
779 done:
780 if (_remove_starting_step(type, req))
781 error("Error cleaning up starting_step list");
782
783 /* Reap child */
784 if (waitpid(pid, NULL, 0) < 0)
785 error("Unable to reap slurmd child process");
786 if (close(to_stepd[1]) < 0)
787 error("close write to_stepd in parent: %m");
788 if (close(to_slurmd[0]) < 0)
789 error("close read to_slurmd in parent: %m");
790 return rc;
791 } else {
792 #if (SLURMSTEPD_MEMCHECK == 1)
793 /* memcheck test of slurmstepd, option #1 */
794 char *const argv[3] = {"memcheck",
795 (char *)conf->stepd_loc, NULL};
796 #elif (SLURMSTEPD_MEMCHECK == 2)
797 /* valgrind test of slurmstepd, option #2 */
798 uint32_t job_id = 0, step_id = 0;
799 char log_file[256];
800 char *const argv[13] = {"valgrind", "--tool=memcheck",
801 "--error-limit=no",
802 "--leak-check=summary",
803 "--show-reachable=yes",
804 "--max-stackframe=16777216",
805 "--num-callers=20",
806 "--child-silent-after-fork=yes",
807 "--track-origins=yes",
808 log_file, (char *)conf->stepd_loc,
809 NULL};
810 if (type == LAUNCH_BATCH_JOB) {
811 job_id = ((batch_job_launch_msg_t *)req)->job_id;
812 step_id = ((batch_job_launch_msg_t *)req)->step_id;
813 } else if (type == LAUNCH_TASKS) {
814 job_id = ((launch_tasks_request_msg_t *)req)->job_id;
815 step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
816 }
817 snprintf(log_file, sizeof(log_file),
818 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
819 job_id, step_id);
820 #elif (SLURMSTEPD_MEMCHECK == 3)
821 /* valgrind/drd test of slurmstepd, option #3 */
822 uint32_t job_id = 0, step_id = 0;
823 char log_file[256];
824 char *const argv[10] = {"valgrind", "--tool=drd",
825 "--error-limit=no",
826 "--max-stackframe=16777216",
827 "--num-callers=20",
828 "--child-silent-after-fork=yes",
829 log_file, (char *)conf->stepd_loc,
830 NULL};
831 if (type == LAUNCH_BATCH_JOB) {
832 job_id = ((batch_job_launch_msg_t *)req)->job_id;
833 step_id = ((batch_job_launch_msg_t *)req)->step_id;
834 } else if (type == LAUNCH_TASKS) {
835 job_id = ((launch_tasks_request_msg_t *)req)->job_id;
836 step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
837 }
838 snprintf(log_file, sizeof(log_file),
839 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
840 job_id, step_id);
841 #elif (SLURMSTEPD_MEMCHECK == 4)
842 /* valgrind/helgrind test of slurmstepd, option #4 */
843 uint32_t job_id = 0, step_id = 0;
844 char log_file[256];
845 char *const argv[10] = {"valgrind", "--tool=helgrind",
846 "--error-limit=no",
847 "--max-stackframe=16777216",
848 "--num-callers=20",
849 "--child-silent-after-fork=yes",
850 log_file, (char *)conf->stepd_loc,
851 NULL};
852 if (type == LAUNCH_BATCH_JOB) {
853 job_id = ((batch_job_launch_msg_t *)req)->job_id;
854 step_id = ((batch_job_launch_msg_t *)req)->step_id;
855 } else if (type == LAUNCH_TASKS) {
856 job_id = ((launch_tasks_request_msg_t *)req)->job_id;
857 step_id = ((launch_tasks_request_msg_t *)req)->job_step_id;
858 }
859 snprintf(log_file, sizeof(log_file),
860 "--log-file=/tmp/slurmstepd_valgrind_%u.%u",
861 job_id, step_id);
862 #else
863 /* no memory checking, default */
864 char *const argv[2] = { (char *)conf->stepd_loc, NULL};
865 #endif
866 int i;
867 int failed = 0;
868
869 /*
870 * Child forks and exits
871 */
872 if (setsid() < 0) {
873 error("%s: setsid: %m", __func__);
874 failed = 1;
875 }
876 if ((pid = fork()) < 0) {
877 error("%s: Unable to fork grandchild: %m", __func__);
878 failed = 2;
879 } else if (pid > 0) { /* child */
880 _exit(0);
881 }
882
883 /*
884 * Just in case we (or someone we are linking to)
885 * opened a file and didn't do a close on exec. This
886 * is needed mostly to protect us against libs we link
887 * to that don't set the flag as we should already be
888 * setting it for those that we open. The number 256
889 * is an arbitrary number based off test7.9.
890 */
891 for (i=3; i<256; i++) {
892 (void) fcntl(i, F_SETFD, FD_CLOEXEC);
893 }
894
895 /*
896 * Grandchild exec's the slurmstepd
897 *
898 * If the slurmd is being shut down or restarted, the descriptor
899 * number in conf->lfd may already have been reused for one of
900 * these pipes; closing it blindly would make the dup2 calls below fail.
901 */
902 if ((to_stepd[0] != conf->lfd)
903 && (to_slurmd[1] != conf->lfd))
904 close(conf->lfd);
905
906 if (close(to_stepd[1]) < 0)
907 error("close write to_stepd in grandchild: %m");
908 if (close(to_slurmd[0]) < 0)
909 error("close read to_slurmd in parent: %m");
910
911 (void) close(STDIN_FILENO); /* ignore return */
912 if (dup2(to_stepd[0], STDIN_FILENO) == -1) {
913 error("dup2 over STDIN_FILENO: %m");
914 _exit(1);
915 }
916 fd_set_close_on_exec(to_stepd[0]);
917 (void) close(STDOUT_FILENO); /* ignore return */
918 if (dup2(to_slurmd[1], STDOUT_FILENO) == -1) {
919 error("dup2 over STDOUT_FILENO: %m");
920 _exit(1);
921 }
922 fd_set_close_on_exec(to_slurmd[1]);
923 (void) close(STDERR_FILENO); /* ignore return */
924 if (dup2(devnull, STDERR_FILENO) == -1) {
925 error("dup2 /dev/null to STDERR_FILENO: %m");
926 _exit(1);
927 }
928 fd_set_noclose_on_exec(STDERR_FILENO);
929 log_fini();
930 if (!failed) {
931 execvp(argv[0], argv);
932 error("exec of slurmstepd failed: %m");
933 }
934 _exit(2);
935 }
936 }
937
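/*
 * Ask the extern step (the SLURM_EXTERN_CONT slurmstepd) for the X11
 * display and xauthority file it set up, then export DISPLAY and XAUTHORITY
 * into the task environment. On failure DISPLAY is either left untouched or
 * set to SLURM_X11_SETUP_FAILED, and the launch proceeds with X11
 * forwarding disabled.
 */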
938 static void _setup_x11_display(uint32_t job_id, uint32_t step_id,
939 char ***env, uint32_t *envc)
940 {
941 int display = 0, fd;
942 char *xauthority = NULL;
943 uint16_t protocol_version;
944
945 fd = stepd_connect(conf->spooldir, conf->node_name,
946 job_id, SLURM_EXTERN_CONT,
947 &protocol_version);
948
949 if (fd == -1) {
950 error("could not get x11 forwarding display for job %u step %u,"
951 " x11 forwarding disabled", job_id, step_id);
952 return;
953 }
954
955 display = stepd_get_x11_display(fd, protocol_version, &xauthority);
956 close(fd);
957
958 if (!display) {
959 error("could not get x11 forwarding display for job %u step %u,"
960 " x11 forwarding disabled", job_id, step_id);
961 env_array_overwrite(env, "DISPLAY", "SLURM_X11_SETUP_FAILED");
962 *envc = envcount(*env);
963 return;
964 }
965
966 debug2("%s: setting DISPLAY=localhost:%d:0 for job %u step %u",
967 __func__, display, job_id, step_id);
968 env_array_overwrite_fmt(env, "DISPLAY", "localhost:%d.0", display);
969
970 if (xauthority) {
971 env_array_overwrite(env, "XAUTHORITY", xauthority);
972 xfree(xauthority);
973 }
974
975 *envc = envcount(*env);
976 }
977
978 /*
979 * The job(step) credential is the only place to get a definitive
980 * list of the nodes allocated to a job step. We need to return
981 * a hostset_t of the nodes. Validate the incoming RPC, updating
982 * job_mem as needed.
983 */
984 static int _check_job_credential(launch_tasks_request_msg_t *req,
985 uid_t auth_uid, gid_t auth_gid,
986 int node_id, hostset_t *step_hset,
987 uint16_t protocol_version)
988 {
989 slurm_cred_arg_t arg;
990 hostset_t s_hset = NULL;
991 bool user_ok = _slurm_authorized_user(auth_uid);
992 int host_index = -1;
993 slurm_cred_t *cred = req->cred;
994 uint32_t jobid = req->job_id;
995 uint32_t stepid = req->job_step_id;
996 int tasks_to_launch = req->tasks_to_launch[node_id];
997 uint32_t job_cpus = 0, step_cpus = 0;
998
999 if (user_ok && (req->flags & LAUNCH_NO_ALLOC)) {
1000 /* If we didn't allocate then the cred isn't valid, just skip
1001 * checking. This is only cool for root or SlurmUser */
1002 debug("%s: FYI, user %d is an authorized user running outside of an allocation.",
1003 __func__, auth_uid);
1004 return SLURM_SUCCESS;
1005 }
1006
1007 /*
1008 * First call slurm_cred_verify() so that all credentials are checked
1009 */
1010 if (slurm_cred_verify(conf->vctx, cred, &arg, protocol_version) < 0)
1011 return SLURM_ERROR;
1012
1013 if ((arg.jobid != jobid) || (arg.stepid != stepid)) {
1014 error("job credential for %u.%u, expected %u.%u",
1015 arg.jobid, arg.stepid, jobid, stepid);
1016 goto fail;
1017 }
1018
1019 if (arg.uid != req->uid) {
1020 error("job %u credential created for uid %u, expected %u",
1021 arg.jobid, arg.uid, req->uid);
1022 goto fail;
1023 }
1024
1025 if (arg.gid != req->gid) {
1026 error("job %u credential created for gid %u, expected %u",
1027 arg.jobid, arg.gid, req->gid);
1028 goto fail;
1029 }
1030
1031 xfree(req->user_name);
1032 if (arg.pw_name)
1033 req->user_name = xstrdup(arg.pw_name);
1034 else
1035 req->user_name = uid_to_string(req->uid);
1036
1037 xfree(req->gids);
1038 if (arg.ngids) {
1039 req->ngids = arg.ngids;
1040 req->gids = copy_gids(arg.ngids, arg.gids);
1041 } else {
1042 /*
1043 * The gids were not sent in the cred, or dealing with an older
1044 * RPC format, so retrieve from cache instead.
1045 */
1046 req->ngids = group_cache_lookup(req->uid, req->gid,
1047 req->user_name, &req->gids);
1048 }
1049
1050 /*
1051 * Check that credential is valid for this host
1052 */
1053 if (!(s_hset = hostset_create(arg.step_hostlist))) {
1054 error("Unable to parse credential hostlist: `%s'",
1055 arg.step_hostlist);
1056 goto fail;
1057 }
1058
1059 if (!hostset_within(s_hset, conf->node_name)) {
1060 error("Invalid job %u.%u credential for user %u: "
1061 "host %s not in hostset %s",
1062 arg.jobid, arg.stepid, arg.uid,
1063 conf->node_name, arg.step_hostlist);
1064 goto fail;
1065 }
1066
1067 if ((arg.job_nhosts > 0) && (tasks_to_launch > 0)) {
1068 uint32_t hi, i, i_first_bit=0, i_last_bit=0, j;
1069 bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND;
1070 bool setup_x11 = false;
1071
1072 #ifdef HAVE_FRONT_END
1073 host_index = 0; /* It is always 0 for front end systems */
1074 #else
1075 hostset_t j_hset;
1076 /* Determine the CPU count based upon this node's index into
1077 * the _job's_ allocation (job's hostlist and core_bitmap) */
1078 if (!(j_hset = hostset_create(arg.job_hostlist))) {
1079 error("Unable to parse credential hostlist: `%s'",
1080 arg.job_hostlist);
1081 goto fail;
1082 }
1083 host_index = hostset_find(j_hset, conf->node_name);
1084 hostset_destroy(j_hset);
1085
1086 if ((host_index < 0) || (host_index >= arg.job_nhosts)) {
1087 error("job cr credential invalid host_index %d for "
1088 "job %u", host_index, arg.jobid);
1089 goto fail;
1090 }
1091 #endif
1092
1093 /*
1094 * handle the x11 flag bit here since we have access to the
1095 * host_index already.
1096 *
1097 */
1098 if (!arg.x11)
1099 setup_x11 = false;
1100 else if (arg.x11 & X11_FORWARD_ALL)
1101 setup_x11 = true;
1102 /* assumes that the first node is the batch host */
1103 else if (((arg.x11 & X11_FORWARD_FIRST) ||
1104 (arg.x11 & X11_FORWARD_BATCH))
1105 && (host_index == 0))
1106 setup_x11 = true;
1107 else if ((arg.x11 & X11_FORWARD_LAST)
1108 && (host_index == (req->nnodes - 1)))
1109 setup_x11 = true;
1110
1111 if (setup_x11)
1112 _setup_x11_display(req->job_id, req->job_step_id,
1113 &req->env, &req->envc);
1114
1115 if (cpu_log) {
1116 char *per_job = "", *per_step = "";
1117 uint64_t job_mem = arg.job_mem_limit;
1118 uint64_t step_mem = arg.step_mem_limit;
1119 if (job_mem & MEM_PER_CPU) {
1120 job_mem &= (~MEM_PER_CPU);
1121 per_job = "_per_CPU";
1122 }
1123 if (step_mem & MEM_PER_CPU) {
1124 step_mem &= (~MEM_PER_CPU);
1125 per_step = "_per_CPU";
1126 }
1127 info("====================");
1128 info("step_id:%u.%u job_mem:%"PRIu64"MB%s "
1129 "step_mem:%"PRIu64"MB%s",
1130 arg.jobid, arg.stepid, job_mem, per_job,
1131 step_mem, per_step);
1132 }
1133
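/*
 * The credential stores the job's node geometry run-length encoded:
 * sock_core_rep_count[i] consecutive nodes share sockets_per_node[i] and
 * cores_per_socket[i]. The loop below walks that encoding to find the
 * first/last core-bitmap bits belonging to this node. Rough worked example:
 * with sockets_per_node = {2}, cores_per_socket = {4} and
 * sock_core_rep_count = {3}, the node at host_index 1 owns bits [8, 16)
 * of job_core_bitmap (2 * 4 * 1 = 8 through 8 + 2 * 4 = 16).
 */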
1134 hi = host_index + 1; /* change from 0-origin to 1-origin */
1135 for (i=0; hi; i++) {
1136 if (hi > arg.sock_core_rep_count[i]) {
1137 i_first_bit += arg.sockets_per_node[i] *
1138 arg.cores_per_socket[i] *
1139 arg.sock_core_rep_count[i];
1140 hi -= arg.sock_core_rep_count[i];
1141 } else {
1142 i_first_bit += arg.sockets_per_node[i] *
1143 arg.cores_per_socket[i] *
1144 (hi - 1);
1145 i_last_bit = i_first_bit +
1146 arg.sockets_per_node[i] *
1147 arg.cores_per_socket[i];
1148 break;
1149 }
1150 }
1151 /* Now count the allocated processors */
1152 for (i=i_first_bit, j=0; i<i_last_bit; i++, j++) {
1153 char *who_has = NULL;
1154 if (bit_test(arg.job_core_bitmap, i)) {
1155 job_cpus++;
1156 who_has = "Job";
1157 }
1158 if (bit_test(arg.step_core_bitmap, i)) {
1159 step_cpus++;
1160 who_has = "Step";
1161 }
1162 if (cpu_log && who_has) {
1163 info("JobNode[%u] CPU[%u] %s alloc",
1164 host_index, j, who_has);
1165 }
1166 }
1167 if (cpu_log)
1168 info("====================");
1169 if (step_cpus == 0) {
1170 error("cons_res: zero processors allocated to step");
1171 step_cpus = 1;
1172 }
1173 /* NOTE: step_cpus is the count of allocated resources
1174 * (typically cores). Convert to CPU count as needed */
1175 if (i_last_bit <= i_first_bit)
1176 error("step credential has no CPUs selected");
1177 else {
1178 i = conf->cpus / (i_last_bit - i_first_bit);
1179 if (i > 1) {
1180 if (cpu_log)
1181 info("Scaling CPU count by factor of "
1182 "%d (%u/(%u-%u))",
1183 i, conf->cpus,
1184 i_last_bit, i_first_bit);
1185 step_cpus *= i;
1186 job_cpus *= i;
1187 }
1188 }
1189 if (tasks_to_launch > step_cpus) {
1190 /* This is expected with the --overcommit option
1191 * or hyperthreads */
1192 debug("cons_res: More than one tasks per logical "
1193 "processor (%d > %u) on host [%u.%u %ld %s] ",
1194 tasks_to_launch, step_cpus, arg.jobid,
1195 arg.stepid, (long) arg.uid, arg.step_hostlist);
1196 }
1197 } else {
1198 step_cpus = 1;
1199 job_cpus = 1;
1200 }
1201
1202 /* Overwrite any memory limits in the RPC with contents of the
1203 * memory limit within the credential.
1204 * Reset the CPU count on this node to correct value. */
1205 if (arg.step_mem_limit) {
1206 if (arg.step_mem_limit & MEM_PER_CPU) {
1207 req->step_mem_lim = arg.step_mem_limit &
1208 (~MEM_PER_CPU);
1209 req->step_mem_lim *= step_cpus;
1210 } else
1211 req->step_mem_lim = arg.step_mem_limit;
1212 } else {
1213 if (arg.job_mem_limit & MEM_PER_CPU) {
1214 req->step_mem_lim = arg.job_mem_limit &
1215 (~MEM_PER_CPU);
1216 req->step_mem_lim *= job_cpus;
1217 } else
1218 req->step_mem_lim = arg.job_mem_limit;
1219 }
1220 if (arg.job_mem_limit & MEM_PER_CPU) {
1221 req->job_mem_lim = arg.job_mem_limit & (~MEM_PER_CPU);
1222 req->job_mem_lim *= job_cpus;
1223 } else
1224 req->job_mem_lim = arg.job_mem_limit;
1225 req->job_core_spec = arg.job_core_spec;
1226 req->node_cpus = step_cpus;
1227 #if 0
1228 info("%u.%u node_id:%d mem orig:%"PRIu64" cpus:%u limit:%"PRIu64"",
1229 jobid, stepid, node_id, arg.job_mem_limit,
1230 step_cpus, req->job_mem_lim);
1231 #endif
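/*
 * Memory limit arithmetic above: a limit carrying the MEM_PER_CPU flag is
 * interpreted per allocated CPU, so e.g. a credential value of
 * (MEM_PER_CPU | 512) with step_cpus == 4 becomes a 2048 MB step limit,
 * while a plain value is already the per-node total.
 */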
1232
1233 *step_hset = s_hset;
1234 slurm_cred_free_args(&arg);
1235 return SLURM_SUCCESS;
1236
1237 fail:
1238 if (s_hset)
1239 hostset_destroy(s_hset);
1240 *step_hset = NULL;
1241 slurm_cred_free_args(&arg);
1242 slurm_seterrno_ret(ESLURMD_INVALID_JOB_CREDENTIAL);
1243 }
1244
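/*
 * Convert a hexadecimal mask string (e.g. from mask_cpu:/mask_mem: binding
 * options) into a bitmap. The string is parsed from its last character
 * (least significant nibble) toward the first, so "5" sets bits 0 and 2 and
 * "e" sets bits 1, 2 and 3; callers strip any leading "0x" first. Bits at
 * or above MAX_NUMA_CNT are silently ignored.
 */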
1245 static int _str_to_memset(bitstr_t *mask, char *str)
1246 {
1247 int len = strlen(str);
1248 const char *ptr = str + len - 1;
1249 int base = 0;
1250
1251 while (ptr >= str) {
1252 char val = slurm_char_to_hex(*ptr);
1253 if (val == (char) -1)
1254 return -1;
1255 if ((val & 1) && (base < MAX_NUMA_CNT))
1256 bit_set(mask, base);
1257 base++;
1258 if ((val & 2) && (base < MAX_NUMA_CNT))
1259 bit_set(mask, base);
1260 base++;
1261 if ((val & 4) && (base < MAX_NUMA_CNT))
1262 bit_set(mask, base);
1263 base++;
1264 if ((val & 8) && (base < MAX_NUMA_CNT))
1265 bit_set(mask, base);
1266 base++;
1267 len--;
1268 ptr--;
1269 }
1270
1271 return 0;
1272 }
1273
1274 static bitstr_t *_build_cpu_bitmap(uint16_t cpu_bind_type, char *cpu_bind,
1275 int task_cnt_on_node)
1276 {
1277 bitstr_t *cpu_bitmap = NULL;
1278 char *tmp_str, *tok, *save_ptr = NULL;
1279 int cpu_id;
1280
1281 if (cpu_bind_type & CPU_BIND_NONE) {
1282 /* Return NULL bitmap, sort all NUMA */
1283 } else if ((cpu_bind_type & CPU_BIND_RANK) &&
1284 (task_cnt_on_node > 0)) {
1285 cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1286 if (task_cnt_on_node >= MAX_CPU_CNT)
1287 task_cnt_on_node = MAX_CPU_CNT;
1288 for (cpu_id = 0; cpu_id < task_cnt_on_node; cpu_id++) {
1289 bit_set(cpu_bitmap, cpu_id);
1290 }
1291 } else if (cpu_bind_type & CPU_BIND_MAP) {
1292 cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1293 tmp_str = xstrdup(cpu_bind);
1294 tok = strtok_r(tmp_str, ",", &save_ptr);
1295 while (tok) {
1296 if (!xstrncmp(tok, "0x", 2))
1297 cpu_id = strtoul(tok + 2, NULL, 16);
1298 else
1299 cpu_id = strtoul(tok, NULL, 10);
1300 if (cpu_id < MAX_CPU_CNT)
1301 bit_set(cpu_bitmap, cpu_id);
1302 tok = strtok_r(NULL, ",", &save_ptr);
1303 }
1304 xfree(tmp_str);
1305 } else if (cpu_bind_type & CPU_BIND_MASK) {
1306 cpu_bitmap = bit_alloc(MAX_CPU_CNT);
1307 tmp_str = xstrdup(cpu_bind);
1308 tok = strtok_r(tmp_str, ",", &save_ptr);
1309 while (tok) {
1310 if (!xstrncmp(tok, "0x", 2))
1311 tok += 2; /* Skip "0x", always hex */
1312 (void) _str_to_memset(cpu_bitmap, tok);
1313 tok = strtok_r(NULL, ",", &save_ptr);
1314 }
1315 xfree(tmp_str);
1316 }
1317 return cpu_bitmap;
1318 }
1319
1320 static bitstr_t *_xlate_cpu_to_numa_bitmap(bitstr_t *cpu_bitmap)
1321 {
1322 bitstr_t *numa_bitmap = NULL;
1323 #ifdef HAVE_NUMA
1324 struct bitmask *numa_bitmask = NULL;
1325 char cpu_str[10240];
1326 int i, max_numa;
1327
1328 if (numa_available() != -1) {
1329 bit_fmt(cpu_str, sizeof(cpu_str), cpu_bitmap);
1330 numa_bitmask = numa_parse_cpustring(cpu_str);
1331 if (numa_bitmask) {
1332 max_numa = numa_max_node();
1333 numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1334 for (i = 0; i <= max_numa; i++) {
1335 if (numa_bitmask_isbitset(numa_bitmask, i))
1336 bit_set(numa_bitmap, i);
1337 }
1338 numa_bitmask_free(numa_bitmask);
1339 }
1340 }
1341 #endif
1342 return numa_bitmap;
1343
1344 }
1345
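/*
 * Build the NUMA bitmap handed to node_features_g_step_config() when
 * MEM_BIND_SORT is requested. The mem_bind type is examined in priority
 * order: "none" returns NULL (sort all NUMA nodes), "rank" marks one NUMA
 * node per task, "map_mem"/"mask_mem" parse the user-supplied list or hex
 * mask, and "local" derives the NUMA set from the CPU binding via libnuma
 * when it is available. For example, a mem_bind string of "0,2" with
 * MEM_BIND_MAP set yields a bitmap with bits 0 and 2 set.
 */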
1346 static bitstr_t *_build_numa_bitmap(uint16_t mem_bind_type, char *mem_bind,
1347 uint16_t cpu_bind_type, char *cpu_bind,
1348 int task_cnt_on_node)
1349 {
1350 bitstr_t *cpu_bitmap = NULL, *numa_bitmap = NULL;
1351 char *tmp_str, *tok, *save_ptr = NULL;
1352 int numa_id;
1353
1354 if (mem_bind_type & MEM_BIND_NONE) {
1355 /* Return NULL bitmap, sort all NUMA */
1356 } else if ((mem_bind_type & MEM_BIND_RANK) &&
1357 (task_cnt_on_node > 0)) {
1358 numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1359 if (task_cnt_on_node >= MAX_NUMA_CNT)
1360 task_cnt_on_node = MAX_NUMA_CNT;
1361 for (numa_id = 0; numa_id < task_cnt_on_node; numa_id++) {
1362 bit_set(numa_bitmap, numa_id);
1363 }
1364 } else if (mem_bind_type & MEM_BIND_MAP) {
1365 numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1366 tmp_str = xstrdup(mem_bind);
1367 tok = strtok_r(tmp_str, ",", &save_ptr);
1368 while (tok) {
1369 if (!xstrncmp(tok, "0x", 2))
1370 numa_id = strtoul(tok + 2, NULL, 16);
1371 else
1372 numa_id = strtoul(tok, NULL, 10);
1373 if (numa_id < MAX_NUMA_CNT)
1374 bit_set(numa_bitmap, numa_id);
1375 tok = strtok_r(NULL, ",", &save_ptr);
1376 }
1377 xfree(tmp_str);
1378 } else if (mem_bind_type & MEM_BIND_MASK) {
1379 numa_bitmap = bit_alloc(MAX_NUMA_CNT);
1380 tmp_str = xstrdup(mem_bind);
1381 tok = strtok_r(tmp_str, ",", &save_ptr);
1382 while (tok) {
1383 if (!xstrncmp(tok, "0x", 2))
1384 tok += 2; /* Skip "0x", always hex */
1385 (void) _str_to_memset(numa_bitmap, tok);
1386 tok = strtok_r(NULL, ",", &save_ptr);
1387 }
1388 xfree(tmp_str);
1389 } else if (mem_bind_type & MEM_BIND_LOCAL) {
1390 cpu_bitmap = _build_cpu_bitmap(cpu_bind_type, cpu_bind,
1391 task_cnt_on_node);
1392 if (cpu_bitmap) {
1393 numa_bitmap = _xlate_cpu_to_numa_bitmap(cpu_bitmap);
1394 FREE_NULL_BITMAP(cpu_bitmap);
1395 }
1396 }
1397
1398 return numa_bitmap;
1399 }
1400
1401 static void
1402 _rpc_launch_tasks(slurm_msg_t *msg)
1403 {
1404 int errnum = SLURM_SUCCESS;
1405 uint16_t port;
1406 char host[MAXHOSTNAMELEN];
1407 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
1408 gid_t req_gid = g_slurm_auth_get_gid(msg->auth_cred);
1409 launch_tasks_request_msg_t *req = msg->data;
1410 bool super_user = false;
1411 bool mem_sort = false;
1412 #ifndef HAVE_FRONT_END
1413 bool first_job_run;
1414 #endif
1415 slurm_addr_t self;
1416 slurm_addr_t *cli = &msg->orig_addr;
1417 hostset_t step_hset = NULL;
1418 job_mem_limits_t *job_limits_ptr;
1419 int node_id = 0;
1420 bitstr_t *numa_bitmap = NULL;
1421
1422 #ifndef HAVE_FRONT_END
1423 /* It is always 0 for front end systems */
1424 node_id = nodelist_find(req->complete_nodelist, conf->node_name);
1425 #endif
1426 memcpy(&req->orig_addr, &msg->orig_addr, sizeof(slurm_addr_t));
1427
1428 super_user = _slurm_authorized_user(req_uid);
1429
1430 if ((super_user == false) && (req_uid != req->uid)) {
1431 error("launch task request from uid %u",
1432 (unsigned int) req_uid);
1433 errnum = ESLURM_USER_ID_MISSING; /* or invalid user */
1434 goto done;
1435 }
1436 if (node_id < 0) {
1437 info("%s: Invalid node list (%s not in %s)", __func__,
1438 conf->node_name, req->complete_nodelist);
1439 errnum = ESLURM_INVALID_NODE_NAME;
1440 goto done;
1441 }
1442
1443 slurm_get_ip_str(cli, &port, host, sizeof(host));
1444 if (req->het_job_id && (req->het_job_id != NO_VAL)) {
1445 info("launch task %u+%u.%u (%u.%u) request from UID:%u GID:%u HOST:%s PORT:%hu",
1446 req->het_job_id, req->het_job_offset, req->job_step_id,
1447 req->job_id, req->job_step_id, req->uid, req->gid,
1448 host, port);
1449 } else {
1450 info("launch task %u.%u request from UID:%u GID:%u HOST:%s PORT:%hu",
1451 req->job_id, req->job_step_id, req->uid, req->gid,
1452 host, port);
1453 }
1454
1455 /* This may already be set from a previous launch; overwrite it here
1456 * so that messages for this launch reach the correct srun host. */
1457 env_array_overwrite(&req->env, "SLURM_SRUN_COMM_HOST", host);
1458 req->envc = envcount(req->env);
1459
1460 #ifndef HAVE_FRONT_END
1461 slurm_mutex_lock(&prolog_mutex);
1462 first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
1463 #endif
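/*
 * On non-front-end builds prolog_mutex is still held here: it is taken
 * before the slurm_cred_jobid_cached() check above and only released once
 * the job has been registered as running its prolog (first_job_run case) or
 * in the else branch that waits for an already-running prolog. This keeps
 * two concurrent launches of a new job from both deciding to run the prolog.
 */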
1464 if (_check_job_credential(req, req_uid, req_gid, node_id, &step_hset,
1465 msg->protocol_version) < 0) {
1466 errnum = errno;
1467 error("Invalid job credential from %ld@%s: %m",
1468 (long) req_uid, host);
1469 #ifndef HAVE_FRONT_END
1470 slurm_mutex_unlock(&prolog_mutex);
1471 #endif
1472 goto done;
1473 }
1474
1475 /* Must follow _check_job_credential(), which sets some req fields */
1476 task_g_slurmd_launch_request(req, node_id);
1477
1478 #ifndef HAVE_FRONT_END
1479 if (first_job_run) {
1480 int rc;
1481 job_env_t job_env;
1482 List job_gres_list, epi_env_gres_list;
1483 uint32_t jobid;
1484
1485 slurm_cred_insert_jobid(conf->vctx, req->job_id);
1486 _add_job_running_prolog(req->job_id);
1487 slurm_mutex_unlock(&prolog_mutex);
1488
1489 #ifdef HAVE_NATIVE_CRAY
1490 if (req->het_job_id && (req->het_job_id != NO_VAL))
1491 jobid = req->het_job_id;
1492 else
1493 jobid = req->job_id;
1494 #else
1495 jobid = req->job_id;
1496 #endif
1497 if (container_g_create(jobid))
1498 error("container_g_create(%u): %m", req->job_id);
1499
1500 memset(&job_env, 0, sizeof(job_env));
1501 job_gres_list = (List) slurm_cred_get_arg(req->cred,
1502 CRED_ARG_JOB_GRES_LIST);
1503 epi_env_gres_list = gres_plugin_epilog_build_env(job_gres_list,
1504 req->complete_nodelist);
1505 gres_plugin_epilog_set_env(&job_env.gres_job_env,
1506 epi_env_gres_list, node_id);
1507 FREE_NULL_LIST(epi_env_gres_list);
1508
1509 job_env.jobid = req->job_id;
1510 job_env.step_id = req->job_step_id;
1511 job_env.node_list = req->complete_nodelist;
1512 job_env.het_job_id = req->het_job_id;
1513 job_env.partition = req->partition;
1514 job_env.spank_job_env = req->spank_job_env;
1515 job_env.spank_job_env_size = req->spank_job_env_size;
1516 job_env.uid = req->uid;
1517 job_env.gid = req->gid;
1518 job_env.user_name = req->user_name;
1519 rc = _run_prolog(&job_env, req->cred, true);
1520 _free_job_env(&job_env);
1521 if (rc) {
1522 int term_sig = 0, exit_status = 0;
1523 if (WIFSIGNALED(rc))
1524 term_sig = WTERMSIG(rc);
1525 else if (WIFEXITED(rc))
1526 exit_status = WEXITSTATUS(rc);
1527 error("[job %u] prolog failed status=%d:%d",
1528 req->job_id, exit_status, term_sig);
1529 errnum = ESLURMD_PROLOG_FAILED;
1530 goto done;
1531 }
1532 } else {
1533 slurm_mutex_unlock(&prolog_mutex);
1534 _wait_for_job_running_prolog(req->job_id);
1535 }
1536
1537 /*
1538 * Since the job could have been killed while the prolog was running,
1539 * test if the credential has since been revoked and exit as needed.
1540 */
1541 if (slurm_cred_revoked(conf->vctx, req->cred)) {
1542 info("Job %u already killed, do not launch step %u.%u",
1543 req->job_id, req->job_id, req->job_step_id);
1544 errnum = ESLURMD_CREDENTIAL_REVOKED;
1545 goto done;
1546 }
1547 #endif
1548
1549 if (req->mem_bind_type & MEM_BIND_SORT) {
1550 int task_cnt = -1;
1551 if (req->tasks_to_launch)
1552 task_cnt = (int) req->tasks_to_launch[node_id];
1553 mem_sort = true;
1554 numa_bitmap = _build_numa_bitmap(req->mem_bind_type,
1555 req->mem_bind,
1556 req->cpu_bind_type,
1557 req->cpu_bind, task_cnt);
1558 }
1559 node_features_g_step_config(mem_sort, numa_bitmap);
1560 FREE_NULL_BITMAP(numa_bitmap);
1561
1562 if (req->job_mem_lim || req->step_mem_lim) {
1563 step_loc_t step_info;
1564 slurm_mutex_lock(&job_limits_mutex);
1565 if (!job_limits_list)
1566 job_limits_list = list_create(xfree_ptr);
1567 step_info.jobid = req->job_id;
1568 step_info.stepid = req->job_step_id;
1569 job_limits_ptr = list_find_first(job_limits_list,
1570 _step_limits_match,
1571 &step_info);
1572 if (!job_limits_ptr) {
1573 job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
1574 job_limits_ptr->job_id = req->job_id;
1575 job_limits_ptr->job_mem = req->job_mem_lim;
1576 job_limits_ptr->step_id = req->job_step_id;
1577 job_limits_ptr->step_mem = req->step_mem_lim;
1578 #if _LIMIT_INFO
1579 info("AddLim step:%u.%u job_mem:%"PRIu64" "
1580 "step_mem:%"PRIu64"",
1581 job_limits_ptr->job_id, job_limits_ptr->step_id,
1582 job_limits_ptr->job_mem,
1583 job_limits_ptr->step_mem);
1584 #endif
1585 list_append(job_limits_list, job_limits_ptr);
1586 }
1587 slurm_mutex_unlock(&job_limits_mutex);
1588 }
1589
1590 if (slurm_get_stream_addr(msg->conn_fd, &self)) {
1591 error("%s: slurm_get_stream_addr(): %m", __func__);
1592 errnum = errno;
1593 goto done;
1594 }
1595
1596 debug3("%s: call to _forkexec_slurmstepd", __func__);
1597 errnum = _forkexec_slurmstepd(LAUNCH_TASKS, (void *)req, cli, &self,
1598 step_hset, msg->protocol_version);
1599 debug3("%s: return from _forkexec_slurmstepd", __func__);
1600 _launch_complete_add(req->job_id);
1601
1602 done:
1603 if (step_hset)
1604 hostset_destroy(step_hset);
1605
1606 if (slurm_send_rc_msg(msg, errnum) < 0) {
1607 char addr_str[32];
1608 slurm_print_slurm_addr(&msg->address, addr_str,
1609 sizeof(addr_str));
1610 error("%s: unable to send return code to address:port=%s msg_type=%u: %m",
1611 __func__, addr_str, msg->msg_type);
1612
1613 /*
1614 * Rewind credential so that srun may perform retry
1615 */
1616 slurm_cred_rewind(conf->vctx, req->cred); /* ignore errors */
1617
1618 } else if (errnum == SLURM_SUCCESS) {
1619 save_cred_state(conf->vctx);
1620 task_g_slurmd_reserve_resources(req, node_id);
1621 }
1622
1623 /*
1624 * If job prolog failed, indicate failure to slurmctld
1625 */
1626 if (errnum == ESLURMD_PROLOG_FAILED)
1627 send_registration_msg(errnum, false);
1628 }
1629
1630 /*
1631 * Open file based upon permissions of a different user
1632 * IN path_name - name of file to open
1633 * IN flags - flags to open() call
1634 * IN mode - mode to open() call
1635 * IN jobid - (optional) job id
1636 * IN uid - User ID to use for file access check
1637 * IN gid - Group ID to use for file access check
1638 * RET -1 on error, file descriptor otherwise
1639 */
1640 static int _open_as_other(char *path_name, int flags, int mode,
1641 uint32_t jobid, uid_t uid, gid_t gid,
1642 int ngids, gid_t *gids)
1643 {
1644 pid_t child;
1645 int pipe[2];
1646 int fd = -1, rc = 0;
1647
1648 if ((rc = container_g_create(jobid))) {
1649 error("%s: container_g_create(%u): %m", __func__, jobid);
1650 return -1;
1651 }
1652
1653 /* child process will setuid to the user, register the process
1654 * with the container, and open the file for us. */
1655 if (socketpair(AF_UNIX, SOCK_DGRAM, 0, pipe) != 0) {
1656 error("%s: Failed to open pipe: %m", __func__);
1657 return -1;
1658 }
1659
1660 child = fork();
1661 if (child == -1) {
1662 error("%s: fork failure", __func__);
1663 close(pipe[0]);
1664 close(pipe[1]);
1665 return -1;
1666 } else if (child > 0) {
1667 close(pipe[0]);
1668 (void) waitpid(child, &rc, 0);
1669 if (WIFEXITED(rc) && (WEXITSTATUS(rc) == 0))
1670 fd = receive_fd_over_pipe(pipe[1]);
1671 close(pipe[1]);
1672 return fd;
1673 }
1674
1675 /* child process below here */
1676
1677 close(pipe[1]);
1678
1679 /* container_g_join needs to be called in the forked (child)
1680 * process to avoid a race condition in which this process
1681 * could create a file or detach itself from a child before
1682 * the parent of the fork had added the pid to the
1683 * container. */
1684 if (container_g_join(jobid, uid)) {
1685 error("%s container_g_join(%u): %m", __func__, jobid);
1686 _exit(SLURM_ERROR);
1687 }
1688
1689 /* The child actually performs the I/O and exits with
1690 * a return code, do not return! */
1691
1692 /*********************************************************************\
1693 * NOTE: It would be best to do an exec() immediately after the fork()
1694 * in order to help prevent a possible deadlock in the child process
1695 * due to locks being set at the time of the fork and being freed by
1696 * the parent process, but not freed by the child process. Performing
1697 * the work inline is done for simplicity. Note that the logging
1698 * performed by error() should be safe due to the use of
1699 * atfork_install_handlers() as defined in src/common/log.c.
1700 * Change the code below with caution.
1701 \*********************************************************************/
1702
1703 if (setgroups(ngids, gids) < 0) {
1704 error("%s: uid: %u setgroups failed: %m", __func__, uid);
1705 _exit(errno);
1706 }
1707
1708 if (setgid(gid) < 0) {
1709 error("%s: uid:%u setgid(%u): %m", __func__, uid, gid);
1710 _exit(errno);
1711 }
1712 if (setuid(uid) < 0) {
1713 error("%s: setuid(%u): %m", __func__, uid);
1714 _exit(errno);
1715 }
1716
1717 fd = open(path_name, flags, mode);
1718 if (fd == -1) {
1719 error("%s: uid:%u can't open `%s`: %m",
1720 __func__, uid, path_name);
1721 _exit(errno);
1722 }
1723 send_fd_over_pipe(pipe[0], fd);
1724 close(fd);
1725 _exit(SLURM_SUCCESS);
1726 }
1727
1728
1729 static void
1730 _prolog_error(batch_job_launch_msg_t *req, int rc)
1731 {
1732 char *err_name = NULL, *path_name = NULL;
1733 int fd;
1734 int flags = (O_CREAT|O_APPEND|O_WRONLY);
1735 uint32_t jobid;
1736
1737 #ifdef HAVE_NATIVE_CRAY
1738 if (req->het_job_id && (req->het_job_id != NO_VAL))
1739 jobid = req->het_job_id;
1740 else
1741 jobid = req->job_id;
1742 #else
1743 jobid = req->job_id;
1744 #endif
1745
1746 path_name = fname_create2(req);
1747 if ((fd = _open_as_other(path_name, flags, 0644,
1748 jobid, req->uid, req->gid,
1749 req->ngids, req->gids)) == -1) {
1750 error("Unable to open %s: Permission denied", path_name);
1751 xfree(path_name);
1752 return;
1753 }
1754 xfree(path_name);
1755
1756 xstrfmtcat(err_name, "Error running slurm prolog: %d\n",
1757 WEXITSTATUS(rc));
1758 safe_write(fd, err_name, strlen(err_name));
1759
1760 rwfail:
1761 xfree(err_name);
1762 close(fd);
1763 }
1764
1765 /* Load the user's environment on this machine if requested, i.e. if the
1766 * SLURM_GET_USER_ENV environment variable is set */
1767 static int
1768 _get_user_env(batch_job_launch_msg_t *req)
1769 {
1770 char **new_env;
1771 int i;
1772 static time_t config_update = 0;
1773 static bool no_env_cache = false;
1774
1775 if (config_update != conf->last_update) {
1776 char *sched_params = slurm_get_sched_params();
1777 no_env_cache = (xstrcasestr(sched_params, "no_env_cache"));
1778 xfree(sched_params);
1779 config_update = conf->last_update;
1780 }
1781
1782 for (i=0; i<req->envc; i++) {
1783 if (xstrcmp(req->environment[i], "SLURM_GET_USER_ENV=1") == 0)
1784 break;
1785 }
1786 if (i >= req->envc)
1787 return 0; /* don't need to load env */
1788
1789 verbose("%s: get env for user %s here", __func__, req->user_name);
1790
1791 /* Permit up to 120 second delay before using cache file */
1792 new_env = env_array_user_default(req->user_name, 120, 0, no_env_cache);
1793 if (! new_env) {
1794 error("%s: Unable to get user's local environment%s",
1795 __func__, no_env_cache ?
1796 "" : ", running only with passed environment");
1797 return -1;
1798 }
1799
1800 env_array_merge(&new_env,
1801 (const char **) req->environment);
1802 env_array_free(req->environment);
1803 req->environment = new_env;
1804 req->envc = envcount(new_env);
1805
1806 return 0;
1807 }
1808
1809 /* The RPC currently contains a memory size limit, but we load the
1810 * value from the job credential to be certain it has not been
1811 * altered by the user */
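/* Illustration with hypothetical values: if the credential carries
 * job_mem_limit = (1024 | MEM_PER_CPU) and this node ends up with
 * alloc_lps = 8, the code below sets req->job_mem = 8192 MB; without
 * the MEM_PER_CPU flag the limit is taken as a per-node value as-is. */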
1812 static void
1813 _set_batch_job_limits(slurm_msg_t *msg)
1814 {
1815 int i;
1816 uint32_t alloc_lps = 0, last_bit = 0;
1817 bool cpu_log = slurm_get_debug_flags() & DEBUG_FLAG_CPU_BIND;
1818 slurm_cred_arg_t arg;
1819 batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data;
1820
1821 if (slurm_cred_get_args(req->cred, &arg) != SLURM_SUCCESS)
1822 return;
1823 req->job_core_spec = arg.job_core_spec; /* Prevent user reset */
1824
1825 if (cpu_log) {
1826 char *per_job = "";
1827 uint64_t job_mem = arg.job_mem_limit;
1828 if (job_mem & MEM_PER_CPU) {
1829 job_mem &= (~MEM_PER_CPU);
1830 per_job = "_per_CPU";
1831 }
1832 info("====================");
1833 info("batch_job:%u job_mem:%"PRIu64"MB%s", req->job_id,
1834 job_mem, per_job);
1835 }
1836 if (cpu_log || (arg.job_mem_limit & MEM_PER_CPU)) {
1837 if (arg.job_nhosts > 0) {
1838 last_bit = arg.sockets_per_node[0] *
1839 arg.cores_per_socket[0];
1840 for (i=0; i<last_bit; i++) {
1841 if (!bit_test(arg.job_core_bitmap, i))
1842 continue;
1843 if (cpu_log)
1844 info("JobNode[0] CPU[%u] Job alloc",i);
1845 alloc_lps++;
1846 }
1847 }
1848 if (cpu_log)
1849 info("====================");
1850 if (alloc_lps == 0) {
1851 error("%s: alloc_lps is zero", __func__);
1852 alloc_lps = 1;
1853 }
1854
1855 /* NOTE: alloc_lps is the count of allocated resources
1856 * (typically cores). Convert to CPU count as needed */
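/* For example (hypothetical node): with conf->cpus = 32 and
 * last_bit = 16 core bits in the credential, i = 2 below and each
 * allocated core is counted as 2 CPUs. */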
1857 if (last_bit < 1)
1858 error("Batch job credential allocates no CPUs");
1859 else {
1860 i = conf->cpus / last_bit;
1861 if (i > 1)
1862 alloc_lps *= i;
1863 }
1864 }
1865
1866 if (arg.job_mem_limit & MEM_PER_CPU) {
1867 req->job_mem = arg.job_mem_limit & (~MEM_PER_CPU);
1868 req->job_mem *= alloc_lps;
1869 } else
1870 req->job_mem = arg.job_mem_limit;
1871
1872 /*
1873 * handle x11 settings here since this is the only access to the cred
1874 * on the batch step.
1875 */
1876 if ((arg.x11 & X11_FORWARD_ALL) || (arg.x11 & X11_FORWARD_BATCH))
1877 _setup_x11_display(req->job_id, SLURM_BATCH_SCRIPT,
1878 &req->environment, &req->envc);
1879
1880 slurm_cred_free_args(&arg);
1881 }
1882
1883 /* These functions prevent a possible race condition if the batch script's
1884 * complete RPC is processed before its launch_successful response. This is
1885 * done by recording recently finished batch job IDs in a circular buffer. */
1886 static bool _is_batch_job_finished(uint32_t job_id)
1887 {
1888 bool found_job = false;
1889 int i;
1890
1891 slurm_mutex_lock(&fini_job_mutex);
1892 for (i = 0; i < fini_job_cnt; i++) {
1893 if (fini_job_id[i] == job_id) {
1894 found_job = true;
1895 break;
1896 }
1897 }
1898 slurm_mutex_unlock(&fini_job_mutex);
1899
1900 return found_job;
1901 }
1902 static void _note_batch_job_finished(uint32_t job_id)
1903 {
1904 slurm_mutex_lock(&fini_job_mutex);
1905 fini_job_id[next_fini_job_inx] = job_id;
1906 if (++next_fini_job_inx >= fini_job_cnt)
1907 next_fini_job_inx = 0;
1908 slurm_mutex_unlock(&fini_job_mutex);
1909 }
1910
1911 /* Send notification to slurmctld that we are finished running the prolog.
1912 * This is needed on systems that don't use srun to launch their tasks.
1913 */
1914 static int _notify_slurmctld_prolog_fini(
1915 uint32_t job_id, uint32_t prolog_return_code)
1916 {
1917 int rc, ret_c;
1918 slurm_msg_t req_msg;
1919 complete_prolog_msg_t req;
1920
1921 slurm_msg_t_init(&req_msg);
1922 memset(&req, 0, sizeof(req));
1923 req.job_id = job_id;
1924 req.prolog_rc = prolog_return_code;
1925
1926 req_msg.msg_type= REQUEST_COMPLETE_PROLOG;
1927 req_msg.data = &req;
1928
1929 /*
1930 * Here we only care about the return code of
1931 * slurm_send_recv_controller_rc_msg since it means there was a
1932 * communication failure and we may need to try again.
1933 */
1934 if ((ret_c = slurm_send_recv_controller_rc_msg(
1935 &req_msg, &rc, working_cluster_rec)))
1936 error("Error sending prolog completion notification: %m");
1937
1938 return ret_c;
1939 }
1940
1941 /* Convert memory limits from per-CPU to per-node */
1942 static void _convert_job_mem(slurm_msg_t *msg)
1943 {
1944 prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
1945 slurm_cred_arg_t arg;
1946 hostset_t j_hset = NULL;
1947 int rc, hi, host_index, job_cpus;
1948 int i, i_first_bit = 0, i_last_bit = 0;
1949
1950 rc = slurm_cred_verify(conf->vctx, req->cred, &arg,
1951 msg->protocol_version);
1952 if (rc < 0) {
1953 error("%s: slurm_cred_verify failed: %m", __func__);
1954 req->nnodes = 1; /* best guess */
1955 return;
1956 }
1957
1958 req->nnodes = arg.job_nhosts;
1959
1960 if (arg.job_mem_limit == 0)
1961 goto fini;
1962 if ((arg.job_mem_limit & MEM_PER_CPU) == 0) {
1963 req->job_mem_limit = arg.job_mem_limit;
1964 goto fini;
1965 }
1966
1967 /* Assume 1 CPU on error */
1968 req->job_mem_limit = arg.job_mem_limit & (~MEM_PER_CPU);
1969
1970 if (!(j_hset = hostset_create(arg.job_hostlist))) {
1971 error("%s: Unable to parse credential hostlist: `%s'",
1972 __func__, arg.job_hostlist);
1973 goto fini;
1974 }
1975 host_index = hostset_find(j_hset, conf->node_name);
1976 hostset_destroy(j_hset);
1977
1978 hi = host_index + 1; /* change from 0-origin to 1-origin */
1979 for (i = 0; hi; i++) {
1980 if (hi > arg.sock_core_rep_count[i]) {
1981 i_first_bit += arg.sockets_per_node[i] *
1982 arg.cores_per_socket[i] *
1983 arg.sock_core_rep_count[i];
1984 i_last_bit = i_first_bit +
1985 arg.sockets_per_node[i] *
1986 arg.cores_per_socket[i] *
1987 arg.sock_core_rep_count[i];
1988 hi -= arg.sock_core_rep_count[i];
1989 } else {
1990 i_first_bit += arg.sockets_per_node[i] *
1991 arg.cores_per_socket[i] * (hi - 1);
1992 i_last_bit = i_first_bit +
1993 arg.sockets_per_node[i] *
1994 arg.cores_per_socket[i];
1995 break;
1996 }
1997 }
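/* Worked example with hypothetical credential data: with
 * sock_core_rep_count = {2, 1}, sockets_per_node = {2, 1} and
 * cores_per_socket = {4, 6}, the first two hosts contribute 8 core
 * bits each.  For host_index = 2 (hi = 3) the loop above skips the
 * first group (i_first_bit = 16) and selects bits [16, 22) for this
 * node. */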
1998
1999 /* Now count the allocated processors on this node */
2000 job_cpus = 0;
2001 for (i = i_first_bit; i < i_last_bit; i++) {
2002 if (bit_test(arg.job_core_bitmap, i))
2003 job_cpus++;
2004 }
2005
2006 /* NOTE: alloc_lps is the count of allocated resources
2007 * (typically cores). Convert to CPU count as needed */
2008 if (i_last_bit > i_first_bit) {
2009 i = conf->cpus / (i_last_bit - i_first_bit);
2010 if (i > 1)
2011 job_cpus *= i;
2012 }
2013
2014 req->job_mem_limit *= job_cpus;
2015
2016 fini: slurm_cred_free_args(&arg);
2017 }
2018
2019 static void _make_prolog_mem_container(slurm_msg_t *msg)
2020 {
2021 prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2022 job_mem_limits_t *job_limits_ptr;
2023 step_loc_t step_info;
2024
2025 _convert_job_mem(msg); /* Convert per-CPU mem limit */
2026 if (req->job_mem_limit) {
2027 slurm_mutex_lock(&job_limits_mutex);
2028 if (!job_limits_list)
2029 job_limits_list = list_create(xfree_ptr);
2030 step_info.jobid = req->job_id;
2031 step_info.stepid = SLURM_EXTERN_CONT;
2032 job_limits_ptr = list_find_first(job_limits_list,
2033 _step_limits_match,
2034 &step_info);
2035 if (!job_limits_ptr) {
2036 job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
2037 job_limits_ptr->job_id = req->job_id;
2038 job_limits_ptr->job_mem = req->job_mem_limit;
2039 job_limits_ptr->step_id = SLURM_EXTERN_CONT;
2040 job_limits_ptr->step_mem = req->job_mem_limit;
2041 #if _LIMIT_INFO
2042 info("AddLim step:%u.%u job_mem:%"PRIu64""
2043 " step_mem:%"PRIu64"",
2044 job_limits_ptr->job_id, job_limits_ptr->step_id,
2045 job_limits_ptr->job_mem,
2046 job_limits_ptr->step_mem);
2047 #endif
2048 list_append(job_limits_list, job_limits_ptr);
2049 }
2050 slurm_mutex_unlock(&job_limits_mutex);
2051 }
2052 }
2053
2054 static int _spawn_prolog_stepd(slurm_msg_t *msg)
2055 {
2056 prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2057 launch_tasks_request_msg_t *launch_req;
2058 slurm_addr_t self;
2059 slurm_addr_t *cli = &msg->orig_addr;
2060 int rc = SLURM_SUCCESS;
2061 int i;
2062
2063 launch_req = xmalloc(sizeof(launch_tasks_request_msg_t));
2064 launch_req->alias_list = req->alias_list;
2065 launch_req->complete_nodelist = req->nodes;
2066 launch_req->cpus_per_task = 1;
2067 launch_req->cred = req->cred;
2068 launch_req->cwd = req->work_dir;
2069 launch_req->efname = "/dev/null";
2070 launch_req->gid = req->gid;
2071 launch_req->global_task_ids = xmalloc(sizeof(uint32_t *)
2072 * req->nnodes);
2073 launch_req->ifname = "/dev/null";
2074 launch_req->job_id = req->job_id;
2075 launch_req->job_mem_lim = req->job_mem_limit;
2076 launch_req->job_step_id = SLURM_EXTERN_CONT;
2077 launch_req->nnodes = req->nnodes;
2078 launch_req->ntasks = req->nnodes;
2079 launch_req->ofname = "/dev/null";
2080
2081 launch_req->het_job_id = req->het_job_id;
2082 launch_req->het_job_nnodes = NO_VAL;
2083
2084 launch_req->partition = req->partition;
2085 launch_req->spank_job_env_size = req->spank_job_env_size;
2086 launch_req->spank_job_env = req->spank_job_env;
2087 launch_req->step_mem_lim = req->job_mem_limit;
2088 launch_req->tasks_to_launch = xmalloc(sizeof(uint16_t)
2089 * req->nnodes);
2090 launch_req->uid = req->uid;
2091 launch_req->user_name = req->user_name;
2092
2093 /*
2094 * determine which node this is in the allocation and if
2095 * it should setup the x11 forwarding or not
2096 */
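/* For example, with X11_FORWARD_FIRST only the node at host_index 0
 * (assumed to be the batch host) sets up forwarding, while
 * X11_FORWARD_ALL skips the host_index lookup entirely. */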
2097 if (req->x11) {
2098 bool setup_x11 = false;
2099 int host_index = -1;
2100 #ifdef HAVE_FRONT_END
2101 host_index = 0; /* It is always 0 for front end systems */
2102 #else
2103 hostset_t j_hset;
2104 /*
2105 * Determine need to setup X11 based upon this node's index into
2106 * the _job's_ allocation
2107 */
2108 if (req->x11 & X11_FORWARD_ALL) {
2109 ; /* Don't need host_index */
2110 } else if (!(j_hset = hostset_create(req->nodes))) {
2111 error("Unable to parse hostlist: `%s'", req->nodes);
2112 } else {
2113 host_index = hostset_find(j_hset, conf->node_name);
2114 hostset_destroy(j_hset);
2115 }
2116 #endif
2117
2118 if (req->x11 & X11_FORWARD_ALL)
2119 setup_x11 = true;
2120 /* assumes that the first node is the batch host */
2121 else if (((req->x11 & X11_FORWARD_FIRST) ||
2122 (req->x11 & X11_FORWARD_BATCH))
2123 && (host_index == 0))
2124 setup_x11 = true;
2125 else if ((req->x11 & X11_FORWARD_LAST)
2126 && (host_index == (req->nnodes - 1)))
2127 setup_x11 = true;
2128
2129 if (setup_x11) {
2130 launch_req->x11 = req->x11;
2131 launch_req->x11_alloc_host = req->x11_alloc_host;
2132 launch_req->x11_alloc_port = req->x11_alloc_port;
2133 launch_req->x11_magic_cookie = req->x11_magic_cookie;
2134 launch_req->x11_target = req->x11_target;
2135 launch_req->x11_target_port = req->x11_target_port;
2136 }
2137 }
2138
2139 for (i = 0; i < req->nnodes; i++) {
2140 uint32_t *tmp32 = xmalloc(sizeof(uint32_t));
2141 *tmp32 = i;
2142 launch_req->global_task_ids[i] = tmp32;
2143 launch_req->tasks_to_launch[i] = 1;
2144 }
2145
2146 /*
2147 * The job could have been killed while the prolog was
2148 * running (especially on BlueGene, which can take minutes
2149 * for partition booting), so test whether the credential has
2150 * since been revoked and exit as needed.
2151 */
2152 if (slurm_get_stream_addr(msg->conn_fd, &self)) {
2153 error("%s: slurm_get_stream_addr(): %m", __func__);
2154 rc = SLURM_ERROR;
2155 } else if (slurm_cred_revoked(conf->vctx, req->cred)) {
2156 info("Job %u already killed, do not launch extern step",
2157 req->job_id);
2158 /*
2159 * Don't set the rc to SLURM_ERROR at this point.
2160 * The job's already been killed, and returning a prolog
2161 * failure will just add more confusion. Better to just
2162 * silently terminate.
2163 */
2164 } else {
2165 hostset_t step_hset = hostset_create(req->nodes);
2166 int rc;
2167
2168 debug3("%s: call to _forkexec_slurmstepd", __func__);
2169 rc = _forkexec_slurmstepd(LAUNCH_TASKS, (void *)launch_req,
2170 cli, &self, step_hset,
2171 msg->protocol_version);
2172 debug3("%s: return from _forkexec_slurmstepd %d",
2173 __func__, rc);
2174
2175 if (rc != SLURM_SUCCESS)
2176 _launch_job_fail(req->job_id, rc);
2177
2178 if (step_hset)
2179 hostset_destroy(step_hset);
2180 }
2181
2182 for (i = 0; i < req->nnodes; i++)
2183 xfree(launch_req->global_task_ids[i]);
2184 xfree(launch_req->global_task_ids);
2185 xfree(launch_req->tasks_to_launch);
2186 xfree(launch_req);
2187
2188 return rc;
2189 }
2190
2191 static void _rpc_prolog(slurm_msg_t *msg)
2192 {
2193 int rc = SLURM_SUCCESS, alt_rc = SLURM_ERROR, node_id = 0;
2194 prolog_launch_msg_t *req = (prolog_launch_msg_t *)msg->data;
2195 job_env_t job_env;
2196 bool first_job_run;
2197 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2198 uint32_t jobid;
2199
2200 if (req == NULL)
2201 return;
2202
2203 if (!_slurm_authorized_user(req_uid)) {
2204 error("REQUEST_LAUNCH_PROLOG request from uid %u",
2205 (unsigned int) req_uid);
2206 return;
2207 }
2208
2209 if (!req->user_name)
2210 req->user_name = uid_to_string(req->uid);
2211 /*
2212 * Send a message back to the slurmctld so it knows we got the RPC. A
2213 * prolog could easily run longer than MessageTimeout, so reply now
2214 * rather than making slurmctld wait for the prolog to finish.
2215 */
2216 if (slurm_send_rc_msg(msg, rc) < 0) {
2217 error("%s: Error talking to slurmctld: %m", __func__);
2218 }
2219
2220 slurm_mutex_lock(&prolog_mutex);
2221 first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
2222 if (first_job_run) {
2223 #ifndef HAVE_FRONT_END
2224 /* It is always 0 for front end systems */
2225 node_id = nodelist_find(req->nodes, conf->node_name);
2226 #endif
2227 if (slurmctld_conf.prolog_flags & PROLOG_FLAG_CONTAIN)
2228 _make_prolog_mem_container(msg);
2229
2230 slurm_cred_insert_jobid(conf->vctx, req->job_id);
2231 _add_job_running_prolog(req->job_id);
2232 /* signal just in case the batch rpc got here before we did */
2233 slurm_cond_broadcast(&conf->prolog_running_cond);
2234 slurm_mutex_unlock(&prolog_mutex);
2235 memset(&job_env, 0, sizeof(job_env));
2236 gres_plugin_epilog_set_env(&job_env.gres_job_env,
2237 req->job_gres_info, node_id);
2238
2239 job_env.jobid = req->job_id;
2240 job_env.step_id = 0; /* not available */
2241 job_env.node_list = req->nodes;
2242 job_env.het_job_id = req->het_job_id;
2243 job_env.partition = req->partition;
2244 job_env.spank_job_env = req->spank_job_env;
2245 job_env.spank_job_env_size = req->spank_job_env_size;
2246 job_env.uid = req->uid;
2247 job_env.gid = req->gid;
2248 job_env.user_name = req->user_name;
2249
2250 #ifdef HAVE_NATIVE_CRAY
2251 if (req->het_job_id && (req->het_job_id != NO_VAL))
2252 jobid = req->het_job_id;
2253 else
2254 jobid = req->job_id;
2255 #else
2256 jobid = req->job_id;
2257 #endif
2258
2259 if ((rc = container_g_create(jobid)))
2260 error("container_g_create(%u): %m", jobid);
2261 else
2262 rc = _run_prolog(&job_env, req->cred, false);
2263 _free_job_env(&job_env);
2264 if (rc) {
2265 int term_sig = 0, exit_status = 0;
2266 if (WIFSIGNALED(rc))
2267 term_sig = WTERMSIG(rc);
2268 else if (WIFEXITED(rc))
2269 exit_status = WEXITSTATUS(rc);
2270 error("[job %u] prolog failed status=%d:%d",
2271 req->job_id, exit_status, term_sig);
2272 rc = ESLURMD_PROLOG_FAILED;
2273 }
2274
2275 if ((rc == SLURM_SUCCESS) &&
2276 (slurmctld_conf.prolog_flags & PROLOG_FLAG_CONTAIN))
2277 rc = _spawn_prolog_stepd(msg);
2278
2279 /*
2280 * Revoke cred so that the slurmd won't launch tasks if the
2281 * prolog failed. The slurmd waits for the prolog to finish but
2282 * can't check the return code.
2283 */
2284 if (rc)
2285 slurm_cred_revoke(conf->vctx, req->job_id, time(NULL),
2286 time(NULL));
2287
2288 _remove_job_running_prolog(req->job_id);
2289 } else
2290 slurm_mutex_unlock(&prolog_mutex);
2291
2292 /*
2293 * We need the slurmctld to know we are done or we can get into a
2294 * situation where nothing from the job will ever launch because the
2295 * prolog will never appear to stop running.
2296 */
2297 while (alt_rc != SLURM_SUCCESS) {
2298 if (!(slurmctld_conf.prolog_flags & PROLOG_FLAG_NOHOLD))
2299 alt_rc = _notify_slurmctld_prolog_fini(
2300 req->job_id, rc);
2301 else
2302 alt_rc = SLURM_SUCCESS;
2303
2304 if (rc != SLURM_SUCCESS) {
2305 alt_rc = _launch_job_fail(req->job_id, rc);
2306 send_registration_msg(rc, false);
2307 }
2308
2309 if (alt_rc != SLURM_SUCCESS) {
2310 info("%s: Retrying prolog complete RPC for JobId=%u [sleeping %us]",
2311 __func__, req->job_id, RETRY_DELAY);
2312 sleep(RETRY_DELAY);
2313 }
2314 }
2315 }
2316
2317 static void
2318 _rpc_batch_job(slurm_msg_t *msg, bool new_msg)
2319 {
2320 batch_job_launch_msg_t *req = (batch_job_launch_msg_t *)msg->data;
2321 bool first_job_run;
2322 int rc = SLURM_SUCCESS, node_id = 0;
2323 bool replied = false, revoked;
2324 slurm_addr_t *cli = &msg->orig_addr;
2325
2326 if (new_msg) {
2327 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2328 if (!_slurm_authorized_user(req_uid)) {
2329 error("Security violation, batch launch RPC from uid %d",
2330 req_uid);
2331 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
2332 goto done;
2333 }
2334 }
2335
2336 if (_launch_job_test(req->job_id)) {
2337 error("Job %u already running, do not launch second copy",
2338 req->job_id);
2339 rc = ESLURM_DUPLICATE_JOB_ID; /* job already running */
2340 _launch_job_fail(req->job_id, rc);
2341 goto done;
2342 }
2343
2344 slurm_cred_handle_reissue(conf->vctx, req->cred, false);
2345 if (slurm_cred_revoked(conf->vctx, req->cred)) {
2346 error("Job %u already killed, do not launch batch job",
2347 req->job_id);
2348 rc = ESLURMD_CREDENTIAL_REVOKED; /* job already ran */
2349 goto done;
2350 }
2351
2352 /* lookup user_name if not provided by slurmctld */
2353 if (!req->user_name)
2354 req->user_name = uid_to_string(req->uid);
2355
2356 /* lookup gids if they weren't sent by slurmctld */
2357 if (!req->ngids)
2358 req->ngids = group_cache_lookup(req->uid, req->gid,
2359 req->user_name, &req->gids);
2360
2361 task_g_slurmd_batch_request(req); /* determine task affinity */
2362
2363 slurm_mutex_lock(&prolog_mutex);
2364 first_job_run = !slurm_cred_jobid_cached(conf->vctx, req->job_id);
2365
2366 /* BlueGene prolog waits for partition boot and is very slow.
2367 * On any system we might need to load environment variables
2368 * for Moab (see --get-user-env), which could also be slow.
2369 * Just reply now and send a separate kill job request if the
2370 * prolog or launch fails. */
2371 replied = true;
2372 if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) {
2373 /* The slurmctld is no longer waiting for a reply.
2374 * This typically indicates that the slurmd was
2375 * blocked from memory and/or CPUs and the slurmctld
2376 * has requeued the batch job request. */
2377 error("Could not confirm batch launch for job %u, "
2378 "aborting request", req->job_id);
2379 rc = SLURM_COMMUNICATIONS_SEND_ERROR;
2380 slurm_mutex_unlock(&prolog_mutex);
2381 goto done;
2382 }
2383
2384 if (slurmctld_conf.prolog_flags & PROLOG_FLAG_ALLOC) {
2385 struct timespec ts = {0, 0};
2386 struct timeval now;
2387 int retry_cnt = 0;
2388 /*
2389 * We want to wait until _rpc_prolog() has run before
2390 * continuing. Since we already hold prolog_mutex here
2391 * we don't have to unlock it to wait on
2392 * conf->prolog_running_cond.
2393 */
2394 while (first_job_run) {
2395 retry_cnt++;
2396 /*
2397 * This race should only last for a second or so, since
2398 * we are only waiting for the other RPC to get here.
2399 * Still, wait for up to msg_timeout * 2: if the
2400 * REQUEST_LAUNCH_PROLOG was lost in the forwarding tree,
2401 * the direct retry from slurmctld will only arrive after
2402 * MessageTimeout.
2403 */
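/* Each pass through the enclosing while loop waits on the condition
 * variable for at most one second (ts is set to now + 1 below), so
 * the retry_cnt bound of msg_timeout * 2 corresponds to roughly
 * 2 * MessageTimeout seconds of total waiting. */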
2404 if (retry_cnt > (slurmctld_conf.msg_timeout * 2)) {
2405 rc = ESLURMD_PROLOG_FAILED;
2406 slurm_mutex_unlock(&prolog_mutex);
2407 error("Waiting for JobId=%u REQUEST_LAUNCH_PROLOG notification failed, giving up after %u sec",
2408 req->job_id,
2409 slurmctld_conf.msg_timeout * 2);
2410 goto done;
2411 }
2412
2413 gettimeofday(&now, NULL);
2414 ts.tv_sec = now.tv_sec + 1;
2415 ts.tv_nsec = now.tv_usec * 1000;
2416
2417 slurm_cond_timedwait(&conf->prolog_running_cond,
2418 &prolog_mutex, &ts);
2419 first_job_run = !slurm_cred_jobid_cached(conf->vctx,
2420 req->job_id);
2421 }
2422 }
2423
2424 /*
2425 * Insert jobid into credential context to denote that
2426 * we've now "seen" an instance of the job
2427 */
2428 if (first_job_run) {
2429 job_env_t job_env;
2430 List job_gres_list, epi_env_gres_list;
2431 uint32_t jobid;
2432
2433 slurm_cred_insert_jobid(conf->vctx, req->job_id);
2434 _add_job_running_prolog(req->job_id);
2435 slurm_mutex_unlock(&prolog_mutex);
2436
2437 #ifndef HAVE_FRONT_END
2438 /* It is always 0 for front end systems */
2439 node_id = nodelist_find(req->nodes, conf->node_name);
2440 #endif
2441 memset(&job_env, 0, sizeof(job_env));
2442 job_gres_list = (List) slurm_cred_get_arg(req->cred,
2443 CRED_ARG_JOB_GRES_LIST);
2444 epi_env_gres_list = gres_plugin_epilog_build_env(job_gres_list,
2445 req->nodes);
2446 gres_plugin_epilog_set_env(&job_env.gres_job_env,
2447 epi_env_gres_list, node_id);
2448 FREE_NULL_LIST(epi_env_gres_list);
2449 job_env.jobid = req->job_id;
2450 job_env.step_id = req->step_id;
2451 job_env.node_list = req->nodes;
2452 job_env.het_job_id = req->het_job_id;
2453 job_env.partition = req->partition;
2454 job_env.spank_job_env = req->spank_job_env;
2455 job_env.spank_job_env_size = req->spank_job_env_size;
2456 job_env.uid = req->uid;
2457 job_env.gid = req->gid;
2458 job_env.user_name = req->user_name;
2459 /*
2460 * Run job prolog on this node
2461 */
2462
2463 #ifdef HAVE_NATIVE_CRAY
2464 if (req->het_job_id && (req->het_job_id != NO_VAL))
2465 jobid = req->het_job_id;
2466 else
2467 jobid = req->job_id;
2468 #else
2469 jobid = req->job_id;
2470 #endif
2471
2472 if ((rc = container_g_create(jobid)))
2473 error("container_g_create(%u): %m", jobid);
2474 else
2475 rc = _run_prolog(&job_env, req->cred, true);
2476 _free_job_env(&job_env);
2477 if (rc) {
2478 int term_sig = 0, exit_status = 0;
2479 if (WIFSIGNALED(rc))
2480 term_sig = WTERMSIG(rc);
2481 else if (WIFEXITED(rc))
2482 exit_status = WEXITSTATUS(rc);
2483 error("[job %u] prolog failed status=%d:%d",
2484 req->job_id, exit_status, term_sig);
2485 _prolog_error(req, rc);
2486 rc = ESLURMD_PROLOG_FAILED;
2487 goto done;
2488 }
2489 } else {
2490 slurm_mutex_unlock(&prolog_mutex);
2491 _wait_for_job_running_prolog(req->job_id);
2492 }
2493
2494 if (_get_user_env(req) < 0) {
2495 bool requeue = _requeue_setup_env_fail();
2496 if (requeue) {
2497 rc = ESLURMD_SETUP_ENVIRONMENT_ERROR;
2498 goto done;
2499 }
2500 }
2501 _set_batch_job_limits(msg);
2502
2503 /* The job could have been killed while the prolog was
2504 * running (especially on BlueGene, which can take minutes
2505 * for partition booting), so test whether the credential has
2506 * since been revoked and exit as needed. */
2507 if (slurm_cred_revoked(conf->vctx, req->cred)) {
2508 info("Job %u already killed, do not launch batch job",
2509 req->job_id);
2510 rc = ESLURMD_CREDENTIAL_REVOKED; /* job already ran */
2511 goto done;
2512 }
2513
2514 slurm_mutex_lock(&launch_mutex);
2515 info("Launching batch job %u for UID %u", req->job_id, req->uid);
2516
2517 debug3("_rpc_batch_job: call to _forkexec_slurmstepd");
2518 rc = _forkexec_slurmstepd(LAUNCH_BATCH_JOB, (void *)req, cli, NULL,
2519 (hostset_t)NULL, SLURM_PROTOCOL_VERSION);
2520 debug3("_rpc_batch_job: return from _forkexec_slurmstepd: %d", rc);
2521
2522 slurm_mutex_unlock(&launch_mutex);
2523 _launch_complete_add(req->job_id);
2524
2525 /* On a busy system, slurmstepd may take a while to respond;
2526 * if the job was cancelled in the interim, run through the
2527 * abort logic below. */
2528 revoked = slurm_cred_revoked(conf->vctx, req->cred);
2529 if (revoked)
2530 _launch_complete_rm(req->job_id);
2531 if (revoked && _is_batch_job_finished(req->job_id)) {
2532 /* If configured with select/serial and the batch job already
2533 * completed, consider the job successfully launched and do
2534 * not repeat termination logic below, which in the worst case
2535 * just slows things down with another message. */
2536 revoked = false;
2537 }
2538 if (revoked) {
2539 info("Job %u killed while launch was in progress",
2540 req->job_id);
2541 sleep(1); /* give slurmstepd time to create
2542 * the communication socket */
2543 _terminate_all_steps(req->job_id, true);
2544 rc = ESLURMD_CREDENTIAL_REVOKED;
2545 goto done;
2546 }
2547
2548 done:
2549 if (!replied) {
2550 if (new_msg && (slurm_send_rc_msg(msg, rc) < 1)) {
2551 /* The slurmctld is no longer waiting for a reply.
2552 * This typically indicates that the slurmd was
2553 * blocked from memory and/or CPUs and the slurmctld
2554 * has requeued the batch job request. */
2555 error("Could not confirm batch launch for job %u, "
2556 "aborting request", req->job_id);
2557 rc = SLURM_COMMUNICATIONS_SEND_ERROR;
2558 } else {
2559 /* No need to initiate separate reply below */
2560 rc = SLURM_SUCCESS;
2561 }
2562 }
2563 if (rc != SLURM_SUCCESS) {
2564 /* prolog or job launch failure,
2565 * tell slurmctld that the job failed */
2566 if (req->step_id == SLURM_BATCH_SCRIPT)
2567 _launch_job_fail(req->job_id, rc);
2568 else
2569 _abort_step(req->job_id, req->step_id);
2570 }
2571
2572 /*
2573 * If job prolog failed or we could not reply,
2574 * initiate message to slurmctld with current state
2575 */
2576 if ((rc == ESLURMD_PROLOG_FAILED)
2577 || (rc == SLURM_COMMUNICATIONS_SEND_ERROR)
2578 || (rc == ESLURMD_SETUP_ENVIRONMENT_ERROR)) {
2579 send_registration_msg(rc, false);
2580 }
2581 }
2582
2583 /*
2584 * Send notification message to batch job
2585 */
2586 static void
2587 _rpc_job_notify(slurm_msg_t *msg)
2588 {
2589 job_notify_msg_t *req = msg->data;
2590 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2591 uid_t job_uid;
2592 List steps;
2593 ListIterator i;
2594 step_loc_t *stepd = NULL;
2595 int step_cnt = 0;
2596 int fd;
2597
2598 debug("_rpc_job_notify, uid = %d, jobid = %u", req_uid, req->job_id);
2599 job_uid = _get_job_uid(req->job_id);
2600 if ((int)job_uid < 0)
2601 goto no_job;
2602
2603 /*
2604 * check that the requesting user is the job owner, the SlurmUser, or root
2605 */
2606 if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) {
2607 error("Security violation: job_notify(%u) from uid %d",
2608 req->job_id, req_uid);
2609 return;
2610 }
2611
2612 steps = stepd_available(conf->spooldir, conf->node_name);
2613 i = list_iterator_create(steps);
2614 while ((stepd = list_next(i))) {
2615 if ((stepd->jobid != req->job_id) ||
2616 (stepd->stepid != SLURM_BATCH_SCRIPT)) {
2617 continue;
2618 }
2619
2620 step_cnt++;
2621
2622 fd = stepd_connect(stepd->directory, stepd->nodename,
2623 stepd->jobid, stepd->stepid,
2624 &stepd->protocol_version);
2625 if (fd == -1) {
2626 debug3("Unable to connect to step %u.%u",
2627 stepd->jobid, stepd->stepid);
2628 continue;
2629 }
2630
2631 info("send notification to job %u.%u",
2632 stepd->jobid, stepd->stepid);
2633 if (stepd_notify_job(fd, stepd->protocol_version,
2634 req->message) < 0)
2635 debug("notify jobid=%u failed: %m", stepd->jobid);
2636 close(fd);
2637 }
2638 list_iterator_destroy(i);
2639 FREE_NULL_LIST(steps);
2640
2641 no_job:
2642 if (step_cnt == 0) {
2643 debug2("Can't find jobid %u to send notification message",
2644 req->job_id);
2645 }
2646 }
2647
2648 static int
2649 _launch_job_fail(uint32_t job_id, uint32_t slurm_rc)
2650 {
2651 complete_batch_script_msg_t comp_msg = {0};
2652 struct requeue_msg req_msg = {0};
2653 slurm_msg_t resp_msg;
2654 int rc = 0, rpc_rc;
2655 static time_t config_update = 0;
2656 static bool requeue_no_hold = false;
2657
2658 if (config_update != conf->last_update) {
2659 char *sched_params = slurm_get_sched_params();
2660 requeue_no_hold = (xstrcasestr(sched_params,
2661 "nohold_on_prolog_fail"));
2662 xfree(sched_params);
2663 config_update = conf->last_update;
2664 }
2665
2666 slurm_msg_t_init(&resp_msg);
2667
2668 if (slurm_rc == ESLURMD_CREDENTIAL_REVOKED) {
2669 comp_msg.job_id = job_id;
2670 comp_msg.job_rc = INFINITE;
2671 comp_msg.slurm_rc = slurm_rc;
2672 comp_msg.node_name = conf->node_name;
2673 comp_msg.jobacct = NULL; /* unused */
2674 resp_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;
2675 resp_msg.data = &comp_msg;
2676 } else {
2677 req_msg.job_id = job_id;
2678 req_msg.job_id_str = NULL;
2679 if (requeue_no_hold)
2680 req_msg.flags = JOB_PENDING;
2681 else
2682 req_msg.flags = (JOB_REQUEUE_HOLD | JOB_LAUNCH_FAILED);
2683 resp_msg.msg_type = REQUEST_JOB_REQUEUE;
2684 resp_msg.data = &req_msg;
2685 }
2686
2687 rpc_rc = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2688 working_cluster_rec);
2689 if ((resp_msg.msg_type == REQUEST_JOB_REQUEUE) &&
2690 ((rc == ESLURM_DISABLED) || (rc == ESLURM_BATCH_ONLY))) {
2691 info("Could not launch job %u and not able to requeue it, "
2692 "cancelling job", job_id);
2693
2694 if ((slurm_rc == ESLURMD_PROLOG_FAILED) &&
2695 (rc == ESLURM_BATCH_ONLY)) {
2696 char *buf = NULL;
2697 xstrfmtcat(buf, "Prolog failure on node %s",
2698 conf->node_name);
2699 slurm_notify_job(job_id, buf);
2700 xfree(buf);
2701 }
2702
2703 comp_msg.job_id = job_id;
2704 comp_msg.job_rc = INFINITE;
2705 comp_msg.slurm_rc = slurm_rc;
2706 comp_msg.node_name = conf->node_name;
2707 comp_msg.jobacct = NULL; /* unused */
2708 resp_msg.msg_type = REQUEST_COMPLETE_BATCH_SCRIPT;
2709 resp_msg.data = &comp_msg;
2710 rpc_rc = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2711 working_cluster_rec);
2712 }
2713
2714 return rpc_rc;
2715 }
2716
2717 static int
2718 _abort_step(uint32_t job_id, uint32_t step_id)
2719 {
2720 step_complete_msg_t resp;
2721 slurm_msg_t resp_msg;
2722 slurm_msg_t_init(&resp_msg);
2723 int rc, rc2;
2724
2725 memset(&resp, 0, sizeof(resp));
2726 resp.job_id = job_id;
2727 resp.job_step_id = step_id;
2728 resp.range_first = 0;
2729 resp.range_last = 0;
2730 resp.step_rc = 1;
2731 resp.jobacct = jobacctinfo_create(NULL);
2732 resp_msg.msg_type = REQUEST_STEP_COMPLETE;
2733 resp_msg.data = &resp;
2734 rc2 = slurm_send_recv_controller_rc_msg(&resp_msg, &rc,
2735 working_cluster_rec);
2736 /* Note: we are ignoring the RPC return code */
2737 jobacctinfo_destroy(resp.jobacct);
2738 return rc2;
2739 }
2740
2741 static void
2742 _rpc_reconfig(slurm_msg_t *msg)
2743 {
2744 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2745
2746 if (!_slurm_authorized_user(req_uid))
2747 error("Security violation, reconfig RPC from uid %d",
2748 req_uid);
2749 else
2750 kill(conf->pid, SIGHUP);
2751 forward_wait(msg);
2752 /* Never return a message, slurmctld does not expect one */
2753 }
2754
2755 static void _rpc_reconfig_with_config(slurm_msg_t *msg)
2756 {
2757 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2758
2759 if (!_slurm_authorized_user(req_uid))
2760 error("Security violation, reconfig RPC from uid %d",
2761 req_uid);
2762 else {
2763 if (conf->conf_cache) {
2764 config_response_msg_t *configs =
2765 (config_response_msg_t *) msg->data;
2766 /*
2767 * Running in "configless" mode as indicated by the
2768 * cache directory's existence. Update the cached config
2769 * files so our reconfigure picks up the changes, and so
2770 * client commands see the changes as well.
2771 */
2772 write_configs_to_conf_cache(configs, conf->conf_cache);
2773 }
2774 kill(conf->pid, SIGHUP);
2775 }
2776 forward_wait(msg);
2777 /* Never return a message, slurmctld does not expect one */
2778 }
2779
2780 static void
2781 _rpc_shutdown(slurm_msg_t *msg)
2782 {
2783 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2784
2785 forward_wait(msg);
2786 if (!_slurm_authorized_user(req_uid))
2787 error("Security violation, shutdown RPC from uid %d",
2788 req_uid);
2789 else {
2790 if (kill(conf->pid, SIGTERM) != 0)
2791 error("kill(%u,SIGTERM): %m", conf->pid);
2792 }
2793
2794 /* Never return a message, slurmctld does not expect one */
2795 }
2796
2797 static void
2798 _rpc_reboot(slurm_msg_t *msg)
2799 {
2800 char *reboot_program, *cmd = NULL, *sp;
2801 reboot_msg_t *reboot_msg;
2802 slurm_ctl_conf_t *cfg;
2803 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
2804 int exit_code;
2805
2806 if (!_slurm_authorized_user(req_uid))
2807 error("Security violation, reboot RPC from uid %d",
2808 req_uid);
2809 else {
2810 cfg = slurm_conf_lock();
2811 reboot_program = cfg->reboot_program;
2812 if (reboot_program) {
2813 sp = strchr(reboot_program, ' ');
2814 if (sp)
2815 sp = xstrndup(reboot_program,
2816 (sp - reboot_program));
2817 else
2818 sp = xstrdup(reboot_program);
2819 reboot_msg = (reboot_msg_t *) msg->data;
2820 if (reboot_msg && reboot_msg->features) {
2821 /*
2822 * Run reboot_program with only arguments given
2823 * in reboot_msg->features.
2824 */
2825 info("Node reboot request with features %s being processed",
2826 reboot_msg->features);
2827 (void) node_features_g_node_set(
2828 reboot_msg->features);
2829 if (reboot_msg->features[0]) {
2830 xstrfmtcat(cmd, "%s %s",
2831 sp, reboot_msg->features);
2832 } else {
2833 cmd = xstrdup(sp);
2834 }
2835 } else {
2836 /* Run reboot_program verbatim */
2837 cmd = xstrdup(reboot_program);
2838 info("Node reboot request being processed");
2839 }
2840 if (access(sp, R_OK | X_OK) < 0)
2841 error("Cannot run RebootProgram [%s]: %m", sp);
2842 else if ((exit_code = system(cmd)))
2843 error("system(%s) returned %d", cmd,
2844 exit_code);
2845 xfree(sp);
2846 xfree(cmd);
2847
2848 /*
2849 * Explicitly shutdown the slurmd. This is usually
2850 * taken care of by calling reboot_program, but in
2851 * case that fails to shut things down this will at
2852 * least offline this node until someone intervenes.
2853 */
2854 if (xstrcasestr(cfg->slurmd_params,
2855 "shutdown_on_reboot"))
2856 slurmd_shutdown(SIGTERM);
2857 } else
2858 error("RebootProgram isn't defined in config");
2859 slurm_conf_unlock();
2860 }
2861
2862 /* Never return a message, slurmctld does not expect one */
2863 /* slurm_send_rc_msg(msg, rc); */
2864 }
2865
2866 static int _job_limits_match(void *x, void *key)
2867 {
2868 job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x;
2869 uint32_t *job_id = (uint32_t *) key;
2870 if (job_limits_ptr->job_id == *job_id)
2871 return 1;
2872 return 0;
2873 }
2874
2875 static int _step_limits_match(void *x, void *key)
2876 {
2877 job_mem_limits_t *job_limits_ptr = (job_mem_limits_t *) x;
2878 step_loc_t *step_ptr = (step_loc_t *) key;
2879
2880 if ((job_limits_ptr->job_id == step_ptr->jobid) &&
2881 (job_limits_ptr->step_id == step_ptr->stepid))
2882 return 1;
2883 return 0;
2884 }
2885
2886 /* Call only with job_limits_mutex locked */
2887 static void
2888 _load_job_limits(void)
2889 {
2890 List steps;
2891 ListIterator step_iter;
2892 step_loc_t *stepd;
2893 int fd;
2894 job_mem_limits_t *job_limits_ptr;
2895 slurmstepd_mem_info_t stepd_mem_info;
2896
2897 if (!job_limits_list)
2898 job_limits_list = list_create(xfree_ptr);
2899 job_limits_loaded = true;
2900
2901 steps = stepd_available(conf->spooldir, conf->node_name);
2902 step_iter = list_iterator_create(steps);
2903 while ((stepd = list_next(step_iter))) {
2904 job_limits_ptr = list_find_first(job_limits_list,
2905 _step_limits_match, stepd);
2906 if (job_limits_ptr) /* already processed */
2907 continue;
2908 fd = stepd_connect(stepd->directory, stepd->nodename,
2909 stepd->jobid, stepd->stepid,
2910 &stepd->protocol_version);
2911 if (fd == -1)
2912 continue; /* step completed */
2913
2914 if (stepd_get_mem_limits(fd, stepd->protocol_version,
2915 &stepd_mem_info) != SLURM_SUCCESS) {
2916 error("Error reading step %u.%u memory limits from "
2917 "slurmstepd",
2918 stepd->jobid, stepd->stepid);
2919 close(fd);
2920 continue;
2921 }
2922
2923
2924 if ((stepd_mem_info.job_mem_limit
2925 || stepd_mem_info.step_mem_limit)) {
2926 /* create entry for this job */
2927 job_limits_ptr = xmalloc(sizeof(job_mem_limits_t));
2928 job_limits_ptr->job_id = stepd->jobid;
2929 job_limits_ptr->step_id = stepd->stepid;
2930 job_limits_ptr->job_mem =
2931 stepd_mem_info.job_mem_limit;
2932 job_limits_ptr->step_mem =
2933 stepd_mem_info.step_mem_limit;
2934 #if _LIMIT_INFO
2935 info("RecLim step:%u.%u job_mem:%"PRIu64""
2936 " step_mem:%"PRIu64"",
2937 job_limits_ptr->job_id, job_limits_ptr->step_id,
2938 job_limits_ptr->job_mem,
2939 job_limits_ptr->step_mem);
2940 #endif
2941 list_append(job_limits_list, job_limits_ptr);
2942 }
2943 close(fd);
2944 }
2945 list_iterator_destroy(step_iter);
2946 FREE_NULL_LIST(steps);
2947 }
2948
2949 static void
2950 _cancel_step_mem_limit(uint32_t job_id, uint32_t step_id)
2951 {
2952 slurm_msg_t msg;
2953 job_notify_msg_t notify_req;
2954 job_step_kill_msg_t kill_req;
2955
2956 /* NOTE: Batch jobs may have no srun to get this message */
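/* Two RPCs are sent to slurmctld here: a REQUEST_JOB_NOTIFY so any
 * attached srun can report the reason to the user, and a
 * REQUEST_CANCEL_JOB_STEP flagged with KILL_OOM to actually terminate
 * the offending job or step. */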
2957 slurm_msg_t_init(&msg);
2958 memset(&notify_req, 0, sizeof(notify_req));
2959 notify_req.job_id = job_id;
2960 notify_req.job_step_id = step_id;
2961 notify_req.message = "Exceeded job memory limit";
2962 msg.msg_type = REQUEST_JOB_NOTIFY;
2963 msg.data = &notify_req;
2964 slurm_send_only_controller_msg(&msg, working_cluster_rec);
2965
2966 memset(&kill_req, 0, sizeof(kill_req));
2967 kill_req.job_id = job_id;
2968 kill_req.job_step_id = step_id;
2969 kill_req.signal = SIGKILL;
2970 kill_req.flags = KILL_OOM;
2971 msg.msg_type = REQUEST_CANCEL_JOB_STEP;
2972 msg.data = &kill_req;
2973 slurm_send_only_controller_msg(&msg, working_cluster_rec);
2974 }
2975
2976 /* Enforce job memory limits here in slurmd. Step memory limits are
2977 * enforced within slurmstepd (using jobacct_gather plugin). */
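/* Overview of the enforcement pass below: load per-job limits from the
 * running slurmstepds (once), sum the RSS/VSIZE usage reported by each
 * step via stepd_stat_jobacct(), and cancel any job whose total exceeds
 * its real or virtual memory limit. */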
2978 static void
2979 _enforce_job_mem_limit(void)
2980 {
2981 List steps;
2982 ListIterator step_iter, job_limits_iter;
2983 job_mem_limits_t *job_limits_ptr;
2984 step_loc_t *stepd;
2985 int fd, i, job_inx, job_cnt;
2986 uint16_t vsize_factor;
2987 uint64_t step_rss, step_vsize;
2988 job_step_id_msg_t acct_req;
2989 job_step_stat_t *resp = NULL;
2990 struct job_mem_info {
2991 uint32_t job_id;
2992 uint64_t mem_limit; /* MB */
2993 uint64_t mem_used; /* MB */
2994 uint64_t vsize_limit; /* MB */
2995 uint64_t vsize_used; /* MB */
2996 };
2997 struct job_mem_info *job_mem_info_ptr = NULL;
2998
2999 if (conf->job_acct_oom_kill == false)
3000 return;
3001
3002 slurm_mutex_lock(&job_limits_mutex);
3003 if (!job_limits_loaded)
3004 _load_job_limits();
3005 if (list_count(job_limits_list) == 0) {
3006 slurm_mutex_unlock(&job_limits_mutex);
3007 return;
3008 }
3009
3010 /* Build table of job limits, use highest mem limit recorded */
3011 job_mem_info_ptr = xmalloc((list_count(job_limits_list) + 1) *
3012 sizeof(struct job_mem_info));
3013 job_cnt = 0;
3014 job_limits_iter = list_iterator_create(job_limits_list);
3015 while ((job_limits_ptr = list_next(job_limits_iter))) {
3016 if (job_limits_ptr->job_mem == 0) /* no job limit */
3017 continue;
3018 for (i=0; i<job_cnt; i++) {
3019 if (job_mem_info_ptr[i].job_id !=
3020 job_limits_ptr->job_id)
3021 continue;
3022 job_mem_info_ptr[i].mem_limit = MAX(
3023 job_mem_info_ptr[i].mem_limit,
3024 job_limits_ptr->job_mem);
3025 break;
3026 }
3027 if (i < job_cnt) /* job already found & recorded */
3028 continue;
3029 job_mem_info_ptr[job_cnt].job_id = job_limits_ptr->job_id;
3030 job_mem_info_ptr[job_cnt].mem_limit = job_limits_ptr->job_mem;
3031 job_cnt++;
3032 }
3033 list_iterator_destroy(job_limits_iter);
3034 slurm_mutex_unlock(&job_limits_mutex);
3035
3036 vsize_factor = slurm_get_vsize_factor();
3037 for (i=0; i<job_cnt; i++) {
3038 job_mem_info_ptr[i].vsize_limit = job_mem_info_ptr[i].
3039 mem_limit;
3040 job_mem_info_ptr[i].vsize_limit *= (vsize_factor / 100.0);
3041 }
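/* For example (hypothetical config): with VSizeFactor=110 a job memory
 * limit of 10000 MB yields a virtual memory limit of 11000 MB; a factor
 * of 0 leaves vsize_limit at 0, which disables the virtual memory check
 * below. */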
3042
3043 steps = stepd_available(conf->spooldir, conf->node_name);
3044 step_iter = list_iterator_create(steps);
3045 while ((stepd = list_next(step_iter))) {
3046 for (job_inx=0; job_inx<job_cnt; job_inx++) {
3047 if (job_mem_info_ptr[job_inx].job_id == stepd->jobid)
3048 break;
3049 }
3050 if (job_inx >= job_cnt)
3051 continue; /* job/step not being tracked */
3052
3053 fd = stepd_connect(stepd->directory, stepd->nodename,
3054 stepd->jobid, stepd->stepid,
3055 &stepd->protocol_version);
3056 if (fd == -1)
3057 continue; /* step completed */
3058 acct_req.job_id = stepd->jobid;
3059 acct_req.step_id = stepd->stepid;
3060 resp = xmalloc(sizeof(job_step_stat_t));
3061
3062 if ((!stepd_stat_jobacct(
3063 fd, stepd->protocol_version,
3064 &acct_req, resp)) &&
3065 (resp->jobacct)) {
3066 /* resp->jobacct is NULL if accounting is disabled */
3067 jobacctinfo_getinfo((struct jobacctinfo *)
3068 resp->jobacct,
3069 JOBACCT_DATA_TOT_RSS,
3070 &step_rss,
3071 stepd->protocol_version);
3072 jobacctinfo_getinfo((struct jobacctinfo *)
3073 resp->jobacct,
3074 JOBACCT_DATA_TOT_VSIZE,
3075 &step_vsize,
3076 stepd->protocol_version);
3077 #if _LIMIT_INFO
3078 info("Step:%u.%u RSS:%"PRIu64" B VSIZE:%"PRIu64" B",
3079 stepd->jobid, stepd->stepid,
3080 step_rss, step_vsize);
3081 #endif
3082 if (step_rss != INFINITE64) {
3083 step_rss /= 1048576; /* B to MB */
3084 step_rss = MAX(step_rss, 1);
3085 job_mem_info_ptr[job_inx].mem_used += step_rss;
3086 }
3087 if (step_vsize != INFINITE64) {
3088 step_vsize /= 1048576; /* B to MB */
3089 step_vsize = MAX(step_vsize, 1);
3090 job_mem_info_ptr[job_inx].vsize_used +=
3091 step_vsize;
3092 }
3093 }
3094 slurm_free_job_step_stat(resp);
3095 close(fd);
3096 }
3097 list_iterator_destroy(step_iter);
3098 FREE_NULL_LIST(steps);
3099
3100 for (i=0; i<job_cnt; i++) {
3101 if (job_mem_info_ptr[i].mem_used == 0) {
3102 /* no steps found,
3103 * purge records for all steps of this job */
3104 slurm_mutex_lock(&job_limits_mutex);
3105 list_delete_all(job_limits_list, _job_limits_match,
3106 &job_mem_info_ptr[i].job_id);
3107 slurm_mutex_unlock(&job_limits_mutex);
3108 break;
3109 }
3110
3111 if ((job_mem_info_ptr[i].mem_limit != 0) &&
3112 (job_mem_info_ptr[i].mem_used >
3113 job_mem_info_ptr[i].mem_limit)) {
3114 info("Job %u exceeded memory limit "
3115 "(%"PRIu64">%"PRIu64"), cancelling it",
3116 job_mem_info_ptr[i].job_id,
3117 job_mem_info_ptr[i].mem_used,
3118 job_mem_info_ptr[i].mem_limit);
3119 _cancel_step_mem_limit(job_mem_info_ptr[i].job_id,
3120 NO_VAL);
3121 } else if ((job_mem_info_ptr[i].vsize_limit != 0) &&
3122 (job_mem_info_ptr[i].vsize_used >
3123 job_mem_info_ptr[i].vsize_limit)) {
3124 info("Job %u exceeded virtual memory limit "
3125 "(%"PRIu64">%"PRIu64"), cancelling it",
3126 job_mem_info_ptr[i].job_id,
3127 job_mem_info_ptr[i].vsize_used,
3128 job_mem_info_ptr[i].vsize_limit);
3129 _cancel_step_mem_limit(job_mem_info_ptr[i].job_id,
3130 NO_VAL);
3131 }
3132 }
3133 xfree(job_mem_info_ptr);
3134 }
3135
3136 static int
3137 _rpc_ping(slurm_msg_t *msg)
3138 {
3139 int rc = SLURM_SUCCESS;
3140 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3141 static bool first_msg = true;
3142
3143 if (!_slurm_authorized_user(req_uid)) {
3144 error("Security violation, ping RPC from uid %d",
3145 req_uid);
3146 if (first_msg) {
3147 error("Do you have SlurmUser configured as uid %d?",
3148 req_uid);
3149 }
3150 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3151 }
3152 first_msg = false;
3153
3154 if (rc != SLURM_SUCCESS) {
3155 /* Return result. If the reply can't be sent this indicates
3156 * 1. The network is broken OR
3157 * 2. slurmctld has died OR
3158 * 3. slurmd was paged out due to full memory
3159 * If sending the reply fails, we send a registration message
3160 * to slurmctld in hopes of avoiding having the node set DOWN
3161 * due to slurmd paging and not being able to respond in a
3162 * timely fashion. */
3163 if (slurm_send_rc_msg(msg, rc) < 0) {
3164 error("Error responding to ping: %m");
3165 send_registration_msg(SLURM_SUCCESS, false);
3166 }
3167 } else {
3168 slurm_msg_t resp_msg;
3169 ping_slurmd_resp_msg_t ping_resp;
3170 get_cpu_load(&ping_resp.cpu_load);
3171 get_free_mem(&ping_resp.free_mem);
3172 slurm_msg_t_copy(&resp_msg, msg);
3173 resp_msg.msg_type = RESPONSE_PING_SLURMD;
3174 resp_msg.data = &ping_resp;
3175
3176 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3177 }
3178
3179 /* Take this opportunity to enforce any job memory limits */
3180 _enforce_job_mem_limit();
3181 /* Clear up any stalled file transfers as well */
3182 _file_bcast_cleanup();
3183 return rc;
3184 }
3185
3186 static int
3187 _rpc_health_check(slurm_msg_t *msg)
3188 {
3189 int rc = SLURM_SUCCESS;
3190 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3191
3192 if (!_slurm_authorized_user(req_uid)) {
3193 error("Security violation, health check RPC from uid %d",
3194 req_uid);
3195 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3196 }
3197
3198 /* Return result. If the reply can't be sent this indicates that
3199 * 1. The network is broken OR
3200 * 2. slurmctld has died OR
3201 * 3. slurmd was paged out due to full memory
3202 * If sending the reply fails, we send a registration message to
3203 * slurmctld in hopes of avoiding having the node set DOWN due to
3204 * slurmd paging and not being able to respond in a timely fashion. */
3205 if (slurm_send_rc_msg(msg, rc) < 0) {
3206 error("Error responding to health check: %m");
3207 send_registration_msg(SLURM_SUCCESS, false);
3208 }
3209
3210 if (rc == SLURM_SUCCESS)
3211 rc = run_script_health_check();
3212
3213 /* Take this opportunity to enforce any job memory limits */
3214 _enforce_job_mem_limit();
3215 /* Clear up any stalled file transfers as well */
3216 _file_bcast_cleanup();
3217 return rc;
3218 }
3219
3220
3221 static int
3222 _rpc_acct_gather_update(slurm_msg_t *msg)
3223 {
3224 int rc = SLURM_SUCCESS;
3225 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3226 static bool first_msg = true;
3227
3228 if (!_slurm_authorized_user(req_uid)) {
3229 error("Security violation, acct_gather_update RPC from uid %d",
3230 req_uid);
3231 if (first_msg) {
3232 error("Do you have SlurmUser configured as uid %d?",
3233 req_uid);
3234 }
3235 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3236 }
3237 first_msg = false;
3238
3239 if (rc != SLURM_SUCCESS) {
3240 /* Return result. If the reply can't be sent this indicates
3241 * 1. The network is broken OR
3242 * 2. slurmctld has died OR
3243 * 3. slurmd was paged out due to full memory
3244 * If sending the reply fails, we send a registration message
3245 * to slurmctld in hopes of avoiding having the node set DOWN
3246 * due to slurmd paging and not being able to respond in a
3247 * timely fashion. */
3248 if (slurm_send_rc_msg(msg, rc) < 0) {
3249 error("Error responding to account gather: %m");
3250 send_registration_msg(SLURM_SUCCESS, false);
3251 }
3252 } else {
3253 slurm_msg_t resp_msg;
3254 acct_gather_node_resp_msg_t acct_msg;
3255
3256 /* Update node energy usage data */
3257 acct_gather_energy_g_update_node_energy();
3258
3259 memset(&acct_msg, 0, sizeof(acct_msg));
3260 acct_msg.node_name = conf->node_name;
3261 acct_msg.sensor_cnt = 1;
3262 acct_msg.energy = acct_gather_energy_alloc(acct_msg.sensor_cnt);
3263 (void) acct_gather_energy_g_get_sum(
3264 ENERGY_DATA_NODE_ENERGY, acct_msg.energy);
3265
3266 slurm_msg_t_copy(&resp_msg, msg);
3267 resp_msg.msg_type = RESPONSE_ACCT_GATHER_UPDATE;
3268 resp_msg.data = &acct_msg;
3269
3270 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3271
3272 acct_gather_energy_destroy(acct_msg.energy);
3273 }
3274 return rc;
3275 }
3276
3277 static int
3278 _rpc_acct_gather_energy(slurm_msg_t *msg)
3279 {
3280 int rc = SLURM_SUCCESS;
3281 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3282 static bool first_msg = true;
3283
3284 if (!_slurm_authorized_user(req_uid)) {
3285 error("Security violation, acct_gather_energy RPC from uid %d",
3286 req_uid);
3287 if (first_msg) {
3288 error("Do you have SlurmUser configured as uid %d?",
3289 req_uid);
3290 }
3291 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3292 }
3293 first_msg = false;
3294
3295 if (rc != SLURM_SUCCESS) {
3296 if (slurm_send_rc_msg(msg, rc) < 0)
3297 error("Error responding to energy request: %m");
3298 } else {
3299 slurm_msg_t resp_msg;
3300 acct_gather_node_resp_msg_t acct_msg;
3301 time_t now = time(NULL), last_poll = 0;
3302 int data_type = ENERGY_DATA_STRUCT;
3303 uint16_t sensor_cnt;
3304 acct_gather_energy_req_msg_t *req = msg->data;
3305
3306 if (req->context_id == NO_VAL16) {
3307 rc = SLURM_PROTOCOL_VERSION_ERROR;
3308 if (slurm_send_rc_msg(msg, rc) < 0)
3309 error("Error responding to energy request: %m");
3310 return rc;
3311 }
3312
3313 acct_gather_energy_g_get_data(req->context_id,
3314 ENERGY_DATA_LAST_POLL,
3315 &last_poll);
3316 acct_gather_energy_g_get_data(req->context_id,
3317 ENERGY_DATA_SENSOR_CNT,
3318 &sensor_cnt);
3319
3320 /* If the last poll was more than delta seconds ago then
3321 * force a new poll.
3322 */
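/* For example, with req->delta = 30 (hypothetical) any cached energy
 * reading older than 30 seconds switches data_type to
 * ENERGY_DATA_JOULES_TASK, forcing the plugin to take a fresh sample
 * instead of returning the cached structure. */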
3323 if ((now - last_poll) > req->delta)
3324 data_type = ENERGY_DATA_JOULES_TASK;
3325
3326 memset(&acct_msg, 0, sizeof(acct_msg));
3327 acct_msg.sensor_cnt = sensor_cnt;
3328 acct_msg.energy = acct_gather_energy_alloc(acct_msg.sensor_cnt);
3329
3330 acct_gather_energy_g_get_data(req->context_id,
3331 data_type,
3332 acct_msg.energy);
3333
3334 slurm_msg_t_copy(&resp_msg, msg);
3335 resp_msg.msg_type = RESPONSE_ACCT_GATHER_ENERGY;
3336 resp_msg.data = &acct_msg;
3337
3338 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3339
3340 acct_gather_energy_destroy(acct_msg.energy);
3341 }
3342 return rc;
3343 }
3344
3345 static int
3346 static int _signal_jobstep(uint32_t jobid, uint32_t stepid, uint16_t signal,
3347 uint16_t flags, uid_t req_uid)
3348 {
3349 int fd, rc = SLURM_SUCCESS;
3350 uint16_t protocol_version;
3351
3352 /*
3353 * There will be no stepd if the prolog is still running.
3354 * Return failure so the caller can retry.
3355 */
3356 if (_prolog_is_running(jobid)) {
3357 info("signal %d req for %u.%u while prolog is running."
3358 " Returning failure.", signal, jobid, stepid);
3359 return ESLURM_TRANSITION_STATE_NO_UPDATE;
3360 }
3361
3362 fd = stepd_connect(conf->spooldir, conf->node_name, jobid, stepid,
3363 &protocol_version);
3364 if (fd == -1) {
3365 debug("signal for nonexistent %u.%u stepd_connect failed: %m",
3366 jobid, stepid);
3367 return ESLURM_INVALID_JOB_ID;
3368 }
3369
3370 debug2("container signal %d to job %u.%u", signal, jobid, stepid);
3371 rc = stepd_signal_container(fd, protocol_version, signal, flags,
3372 req_uid);
3373 if (rc == -1)
3374 rc = ESLURMD_JOB_NOTRUNNING;
3375
3376 close(fd);
3377 return rc;
3378 }
3379
3380 static void
3381 _rpc_signal_tasks(slurm_msg_t *msg)
3382 {
3383 int rc = SLURM_SUCCESS;
3384 signal_tasks_msg_t *req = (signal_tasks_msg_t *) msg->data;
3385 uid_t job_uid;
3386 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3387
3388 job_uid = _get_job_uid(req->job_id);
3389 if ((int)job_uid < 0) {
3390 debug("%s: failed to get job_uid for job %u",
3391 __func__, req->job_id);
3392 rc = ESLURM_INVALID_JOB_ID;
3393 goto done;
3394 }
3395
3396 if ((req_uid != job_uid) && (!_slurm_authorized_user(req_uid))) {
3397 debug("%s: from uid %ld for job %u owned by uid %ld",
3398 __func__, (long)req_uid, req->job_id, (long)job_uid);
3399 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3400 goto done;
3401 }
3402
3403 /* security is handled when communicating with the stepd */
3404 if ((req->flags & KILL_FULL_JOB) || (req->flags & KILL_JOB_BATCH)) {
3405 debug("%s: sending signal %u to entire job %u flag %u",
3406 __func__, req->signal, req->job_id, req->flags);
3407 _kill_all_active_steps(req->job_id, req->signal,
3408 req->flags, true, req_uid);
3409 } else if (req->flags & KILL_STEPS_ONLY) {
3410 debug("%s: sending signal %u to all steps job %u flag %u",
3411 __func__, req->signal, req->job_id, req->flags);
3412 _kill_all_active_steps(req->job_id, req->signal,
3413 req->flags, false, req_uid);
3414 } else {
3415 debug("%s: sending signal %u to step %u.%u flag %u", __func__,
3416 req->signal, req->job_id, req->job_step_id, req->flags);
3417 rc = _signal_jobstep(req->job_id, req->job_step_id,
3418 req->signal, req->flags, req_uid);
3419 }
3420 done:
3421 slurm_send_rc_msg(msg, rc);
3422 }
3423
3424 static void
3425 _rpc_terminate_tasks(slurm_msg_t *msg)
3426 {
3427 signal_tasks_msg_t *req = (signal_tasks_msg_t *) msg->data;
3428 int rc = SLURM_SUCCESS;
3429 int fd;
3430 uint16_t protocol_version;
3431 uid_t uid;
3432 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3433
3434 debug3("Entering _rpc_terminate_tasks");
3435 fd = stepd_connect(conf->spooldir, conf->node_name,
3436 req->job_id, req->job_step_id, &protocol_version);
3437 if (fd == -1) {
3438 debug("kill for nonexistent job %u.%u stepd_connect "
3439 "failed: %m", req->job_id, req->job_step_id);
3440 rc = ESLURM_INVALID_JOB_ID;
3441 goto done;
3442 }
3443
3444 if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
3445 debug("terminate_tasks couldn't read from the step %u.%u: %m",
3446 req->job_id, req->job_step_id);
3447 rc = ESLURM_INVALID_JOB_ID;
3448 goto done2;
3449 }
3450
3451 if ((req_uid != uid)
3452 && (!_slurm_authorized_user(req_uid))) {
3453 debug("kill req from uid %ld for job %u.%u owned by uid %ld",
3454 (long) req_uid, req->job_id, req->job_step_id,
3455 (long) uid);
3456 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3457 goto done2;
3458 }
3459
3460 rc = stepd_terminate(fd, protocol_version);
3461 if (rc == -1)
3462 rc = ESLURMD_JOB_NOTRUNNING;
3463
3464 done2:
3465 close(fd);
3466 done:
3467 slurm_send_rc_msg(msg, rc);
3468 }
3469
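/*
 * _rpc_step_complete - relay a step completion notification to the local
 * slurmstepd via stepd_completion(). Only root or SlurmUser may send this
 * RPC, since it normally originates from another slurmstepd.
 */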
3470 static int
3471 _rpc_step_complete(slurm_msg_t *msg)
3472 {
3473 step_complete_msg_t *req = (step_complete_msg_t *)msg->data;
3474 int rc = SLURM_SUCCESS;
3475 int fd;
3476 uint16_t protocol_version;
3477 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3478
3479 debug3("Entering _rpc_step_complete");
3480 fd = stepd_connect(conf->spooldir, conf->node_name,
3481 req->job_id, req->job_step_id, &protocol_version);
3482 if (fd == -1) {
3483 error("stepd_connect to %u.%u failed: %m",
3484 req->job_id, req->job_step_id);
3485 rc = ESLURM_INVALID_JOB_ID;
3486 goto done;
3487 }
3488
3489 /* step completion messages are only allowed from other slurmstepd
3490    processes, so only root or SlurmUser is allowed here */
3491 if (!_slurm_authorized_user(req_uid)) {
3492 debug("step completion from uid %ld for job %u.%u",
3493 (long) req_uid, req->job_id, req->job_step_id);
3494 rc = ESLURM_USER_ID_MISSING; /* or bad in this case */
3495 goto done2;
3496 }
3497
3498 rc = stepd_completion(fd, protocol_version, req);
3499 if (rc == -1)
3500 rc = ESLURMD_JOB_NOTRUNNING;
3501
3502 done2:
3503 close(fd);
3504 done:
3505 slurm_send_rc_msg(msg, rc);
3506
3507 return rc;
3508 }
3509
3510 static void _setup_step_complete_msg(slurm_msg_t *msg, void *data)
3511 {
3512 slurm_msg_t_init(msg);
3513 msg->msg_type = REQUEST_STEP_COMPLETE;
3514 msg->data = data;
3515 }
3516
3517 /* This step_complete RPC came from slurmstepd because message
3518  * aggregation is configured and we are at the head of the tree.
3519  * This just adds the message to the list and goes on its merry way. */
3520 static int
3521 _rpc_step_complete_aggr(slurm_msg_t *msg)
3522 {
3523 int rc;
3524 uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
3525
3526 if (!_slurm_authorized_user(uid)) {
3527 error("Security violation: step_complete_aggr from uid %d",
3528 uid);
3529 if (msg->conn_fd >= 0)
3530 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3531 return SLURM_ERROR;
3532 }
3533
3534 if (conf->msg_aggr_window_msgs > 1) {
3535 slurm_msg_t *req = xmalloc_nz(sizeof(slurm_msg_t));
3536 _setup_step_complete_msg(req, msg->data);
3537 msg->data = NULL;
3538
3539 msg_aggr_add_msg(req, 1, NULL);
3540 } else {
3541 slurm_msg_t req;
3542 _setup_step_complete_msg(&req, msg->data);
3543
3544 while (slurm_send_recv_controller_rc_msg(
3545 &req, &rc, working_cluster_rec) < 0) {
3546 error("Unable to send step complete, "
3547 "trying again in a minute: %m");
3548 }
3549 }
3550
3551 /* Finish communication with the stepd, we have to wait for
3552 * the message back from the slurmctld or we will cause a race
3553 * condition with srun.
3554 */
3555 slurm_send_rc_msg(msg, SLURM_SUCCESS);
3556
3557 return SLURM_SUCCESS;
3558 }
3559
3560 /* Get list of active jobs and steps, xfree returned value */
3561 static char *
3562 _get_step_list(void)
3563 {
3564 char tmp[64];
3565 char *step_list = NULL;
3566 List steps;
3567 ListIterator i;
3568 step_loc_t *stepd;
3569
3570 steps = stepd_available(conf->spooldir, conf->node_name);
3571 i = list_iterator_create(steps);
3572 while ((stepd = list_next(i))) {
3573 int fd;
3574 fd = stepd_connect(stepd->directory, stepd->nodename,
3575 stepd->jobid, stepd->stepid,
3576 &stepd->protocol_version);
3577 if (fd == -1)
3578 continue;
3579
3580 if (stepd_state(fd, stepd->protocol_version)
3581 == SLURMSTEPD_NOT_RUNNING) {
3582 debug("stale domain socket for stepd %u.%u ",
3583 stepd->jobid, stepd->stepid);
3584 close(fd);
3585 continue;
3586 }
3587 close(fd);
3588
3589 if (step_list)
3590 xstrcat(step_list, ", ");
3591 if (stepd->stepid == NO_VAL) {
3592 snprintf(tmp, sizeof(tmp), "%u",
3593 stepd->jobid);
3594 xstrcat(step_list, tmp);
3595 } else {
3596 snprintf(tmp, sizeof(tmp), "%u.%u",
3597 stepd->jobid, stepd->stepid);
3598 xstrcat(step_list, tmp);
3599 }
3600 }
3601 list_iterator_destroy(i);
3602 FREE_NULL_LIST(steps);
3603
3604 if (step_list == NULL)
3605 xstrcat(step_list, "NONE");
3606 return step_list;
3607 }
3608
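/*
 * _rpc_daemon_status - report this slurmd's configuration, resources,
 * boot time and active step list back to the requester.
 */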
3609 static int
3610 _rpc_daemon_status(slurm_msg_t *msg)
3611 {
3612 slurm_msg_t resp_msg;
3613 slurmd_status_t *resp = NULL;
3614
3615 resp = xmalloc(sizeof(slurmd_status_t));
3616 resp->actual_cpus = conf->actual_cpus;
3617 resp->actual_boards = conf->actual_boards;
3618 resp->actual_sockets = conf->actual_sockets;
3619 resp->actual_cores = conf->actual_cores;
3620 resp->actual_threads = conf->actual_threads;
3621 resp->actual_real_mem = conf->real_memory_size;
3622 resp->actual_tmp_disk = conf->tmp_disk_space;
3623 resp->booted = startup;
3624 resp->hostname = xstrdup(conf->node_name);
3625 resp->step_list = _get_step_list();
3626 resp->last_slurmctld_msg = last_slurmctld_msg;
3627 resp->pid = conf->pid;
3628 resp->slurmd_debug = conf->debug_level;
3629 resp->slurmd_logfile = xstrdup(conf->logfile);
3630 resp->version = xstrdup(SLURM_VERSION_STRING);
3631
3632 slurm_msg_t_copy(&resp_msg, msg);
3633 resp_msg.msg_type = RESPONSE_SLURMD_STATUS;
3634 resp_msg.data = resp;
3635 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3636 slurm_free_slurmd_status(resp);
3637 return SLURM_SUCCESS;
3638 }
3639
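/*
 * _rpc_stat_jobacct - collect accounting data and the pid list for a job
 * step from its slurmstepd and return them to the requester, who must own
 * the step or be a Slurm administrative user.
 */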
3640 static int
3641 _rpc_stat_jobacct(slurm_msg_t *msg)
3642 {
3643 job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data;
3644 slurm_msg_t resp_msg;
3645 job_step_stat_t *resp = NULL;
3646 int fd;
3647 uint16_t protocol_version;
3648 uid_t uid;
3649 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3650
3651 debug3("Entering _rpc_stat_jobacct");
3652 /* the requester must be the step owner, root, or SlurmUser; the
3653    ownership check is performed below once the step's uid is known */
3654
3655 fd = stepd_connect(conf->spooldir, conf->node_name,
3656 req->job_id, req->step_id, &protocol_version);
3657 if (fd == -1) {
3658 error("stepd_connect to %u.%u failed: %m",
3659 req->job_id, req->step_id);
3660 slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3661 return ESLURM_INVALID_JOB_ID;
3662 }
3663
3664 if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
3665 debug("stat_jobacct couldn't read from the step %u.%u: %m",
3666 req->job_id, req->step_id);
3667 close(fd);
3668 if (msg->conn_fd >= 0)
3669 slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3670 return ESLURM_INVALID_JOB_ID;
3671 }
3672
3673 /*
3674  * check that requesting user ID is the job owner, SlurmUser, or root
3675 */
3676 if ((req_uid != uid) && (!_slurm_authorized_user(req_uid))) {
3677 error("stat_jobacct from uid %ld for job %u "
3678 "owned by uid %ld",
3679 (long) req_uid, req->job_id, (long) uid);
3680
3681 if (msg->conn_fd >= 0) {
3682 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3683 close(fd);
3684 return ESLURM_USER_ID_MISSING;/* or bad in this case */
3685 }
3686 }
3687
3688 resp = xmalloc(sizeof(job_step_stat_t));
3689 resp->step_pids = xmalloc(sizeof(job_step_pids_t));
3690 resp->step_pids->node_name = xstrdup(conf->node_name);
3691 slurm_msg_t_copy(&resp_msg, msg);
3692 resp->return_code = SLURM_SUCCESS;
3693
3694 if (stepd_stat_jobacct(fd, protocol_version, req, resp)
3695 == SLURM_ERROR) {
3696 debug("accounting for nonexistent job %u.%u requested",
3697 req->job_id, req->step_id);
3698 }
3699
3700 /* FIX ME: This should probably happen in the
3701 stepd_stat_jobacct to get more information about the pids.
3702 */
3703 if (stepd_list_pids(fd, protocol_version, &resp->step_pids->pid,
3704 &resp->step_pids->pid_cnt) == SLURM_ERROR) {
3705 debug("No pids for nonexistent job %u.%u requested",
3706 req->job_id, req->step_id);
3707 }
3708
3709 close(fd);
3710
3711 resp_msg.msg_type = RESPONSE_JOB_STEP_STAT;
3712 resp_msg.data = resp;
3713
3714 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3715 slurm_free_job_step_stat(resp);
3716 return SLURM_SUCCESS;
3717 }
3718
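/*
 * _callerid_find_job - map a network connection to a local job ID by
 * resolving the connection to a socket inode, the inode to a process,
 * and the process to a job.
 */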
3719 static int
3720 _callerid_find_job(callerid_conn_t conn, uint32_t *job_id)
3721 {
3722 ino_t inode;
3723 pid_t pid;
3724 int rc;
3725
3726 rc = callerid_find_inode_by_conn(conn, &inode);
3727 if (rc != SLURM_SUCCESS) {
3728 debug3("network_callerid inode not found");
3729 return ESLURM_INVALID_JOB_ID;
3730 }
3731 debug3("network_callerid found inode %lu", (long unsigned int)inode);
3732
3733 rc = find_pid_by_inode(&pid, inode);
3734 if (rc != SLURM_SUCCESS) {
3735 debug3("network_callerid process not found");
3736 return ESLURM_INVALID_JOB_ID;
3737 }
3738 debug3("network_callerid found process %d", (pid_t)pid);
3739
3740 rc = slurm_pid2jobid(pid, job_id);
3741 if (rc != SLURM_SUCCESS) {
3742 debug3("network_callerid job not found");
3743 return ESLURM_INVALID_JOB_ID;
3744 }
3745 debug3("network_callerid found job %u", *job_id);
3746 return SLURM_SUCCESS;
3747 }
3748
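/*
 * _rpc_network_callerid - determine which local job owns the network
 * connection described in the request. The job ID is only revealed to the
 * job owner or a Slurm administrative user.
 */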
3749 static int
3750 _rpc_network_callerid(slurm_msg_t *msg)
3751 {
3752 network_callerid_msg_t *req = (network_callerid_msg_t *)msg->data;
3753 slurm_msg_t resp_msg;
3754 network_callerid_resp_t *resp = NULL;
3755
3756 uid_t job_uid = -1;
3757 uint32_t job_id = NO_VAL;
3758 callerid_conn_t conn;
3759 int rc = ESLURM_INVALID_JOB_ID;
3760 char ip_src_str[INET6_ADDRSTRLEN];
3761 char ip_dst_str[INET6_ADDRSTRLEN];
3762
3763 debug3("Entering _rpc_network_callerid");
3764
3765 resp = xmalloc(sizeof(network_callerid_resp_t));
3766 slurm_msg_t_copy(&resp_msg, msg);
3767
3768 /* Ideally this would be in an if block only when debug3 is enabled */
3769 inet_ntop(req->af, req->ip_src, ip_src_str, INET6_ADDRSTRLEN);
3770 inet_ntop(req->af, req->ip_dst, ip_dst_str, INET6_ADDRSTRLEN);
3771 debug3("network_callerid checking %s:%u => %s:%u",
3772 ip_src_str, req->port_src, ip_dst_str, req->port_dst);
3773
3774 /* My remote is the other's source */
3775 memcpy((void*)&conn.ip_dst, (void*)&req->ip_src, 16);
3776 memcpy((void*)&conn.ip_src, (void*)&req->ip_dst, 16);
3777 conn.port_src = req->port_dst;
3778 conn.port_dst = req->port_src;
3779 conn.af = req->af;
3780
3781 /* Find the job id */
3782 rc = _callerid_find_job(conn, &job_id);
3783 if (rc == SLURM_SUCCESS) {
3784 /* We found the job */
3785 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3786 if (!_slurm_authorized_user(req_uid)) {
3787 /* Requestor is not root or SlurmUser */
3788 job_uid = _get_job_uid(job_id);
3789 if (job_uid != req_uid) {
3790 /* RPC call sent by non-root user who does not
3791 * own this job. Do not send them the job ID. */
3792 error("Security violation, REQUEST_NETWORK_CALLERID from uid=%d",
3793 req_uid);
3794 job_id = NO_VAL;
3795 rc = ESLURM_INVALID_JOB_ID;
3796 }
3797 }
3798 }
3799
3800 resp->job_id = job_id;
3801 resp->node_name = xstrdup(conf->node_name);
3802
3803 resp_msg.msg_type = RESPONSE_NETWORK_CALLERID;
3804 resp_msg.data = resp;
3805
3806 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3807 slurm_free_network_callerid_resp(resp);
3808 return rc;
3809 }
3810
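/*
 * _rpc_list_pids - return the list of process IDs belonging to a job
 * step, as reported by its slurmstepd.
 */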
3811 static int
3812 _rpc_list_pids(slurm_msg_t *msg)
3813 {
3814 job_step_id_msg_t *req = (job_step_id_msg_t *)msg->data;
3815 slurm_msg_t resp_msg;
3816 job_step_pids_t *resp = NULL;
3817 int fd;
3818 uint16_t protocol_version = 0;
3819 uid_t job_uid;
3820 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
3821
3822 debug3("Entering _rpc_list_pids");
3823
3824 job_uid = _get_job_uid(req->job_id);
3825
3826 if ((int)job_uid < 0) {
3827 error("stat_pid for invalid job_id: %u",
3828 req->job_id);
3829 if (msg->conn_fd >= 0)
3830 slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3831 return ESLURM_INVALID_JOB_ID;
3832 }
3833
3834 /*
3835  * check that requesting user ID is the job owner, SlurmUser, or root
3836 */
3837 if ((req_uid != job_uid)
3838 && (!_slurm_authorized_user(req_uid))) {
3839 error("stat_pid from uid %ld for job %u "
3840 "owned by uid %ld",
3841 (long) req_uid, req->job_id, (long) job_uid);
3842
3843 if (msg->conn_fd >= 0) {
3844 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3845 return ESLURM_USER_ID_MISSING;/* or bad in this case */
3846 }
3847 }
3848
3849 resp = xmalloc(sizeof(job_step_pids_t));
3850 slurm_msg_t_copy(&resp_msg, msg);
3851 resp->node_name = xstrdup(conf->node_name);
3852 resp->pid_cnt = 0;
3853 resp->pid = NULL;
3854 fd = stepd_connect(conf->spooldir, conf->node_name,
3855 req->job_id, req->step_id, &protocol_version);
3856 if (fd == -1) {
3857 error("stepd_connect to %u.%u failed: %m",
3858 req->job_id, req->step_id);
3859 slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
3860 slurm_free_job_step_pids(resp);
3861 return ESLURM_INVALID_JOB_ID;
3862
3863 }
3864
3865 if (stepd_list_pids(fd, protocol_version,
3866 &resp->pid, &resp->pid_cnt) == SLURM_ERROR) {
3867 debug("No pids for nonexistent job %u.%u requested",
3868 req->job_id, req->step_id);
3869 }
3870
3871 close(fd);
3872
3873 resp_msg.msg_type = RESPONSE_JOB_STEP_PIDS;
3874 resp_msg.data = resp;
3875
3876 slurm_send_node_msg(msg->conn_fd, &resp_msg);
3877 slurm_free_job_step_pids(resp);
3878 return SLURM_SUCCESS;
3879 }
3880
3881 /*
3882 * For the specified job_id: reply to slurmctld,
3883 * sleep(configured kill_wait), then send SIGKILL
3884 */
3885 static void
3886 _rpc_timelimit(slurm_msg_t *msg)
3887 {
3888 uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
3889 kill_job_msg_t *req = msg->data;
3890 int nsteps, rc;
3891
3892 if (!_slurm_authorized_user(uid)) {
3893 error ("Security violation: rpc_timelimit req from uid %d",
3894 uid);
3895 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
3896 return;
3897 }
3898
3899 /*
3900 * Indicate to slurmctld that we've received the message
3901 */
3902 slurm_send_rc_msg(msg, SLURM_SUCCESS);
3903 close(msg->conn_fd);
3904 msg->conn_fd = -1;
3905
3906 if (req->step_id != NO_VAL) {
3907 slurm_ctl_conf_t *cf;
3908 int delay;
3909 /* A jobstep has timed out:
3910 * - send the container a SIG_TIME_LIMIT or SIG_PREEMPTED
3911 * to log the event
3912 * - send a SIGCONT to resume any suspended tasks
3913 * - send a SIGTERM to begin termination
3914 * - sleep KILL_WAIT
3915 * - send a SIGKILL to clean up
3916 */
3917 if (msg->msg_type == REQUEST_KILL_TIMELIMIT) {
3918 rc = _signal_jobstep(req->job_id, req->step_id,
3919 SIG_TIME_LIMIT, 0, uid);
3920 } else {
3921 rc = _signal_jobstep(req->job_id, req->step_id,
3922 SIG_PREEMPTED, 0, uid);
3923 }
3924 if (rc != SLURM_SUCCESS)
3925 return;
3926 rc = _signal_jobstep(req->job_id, req->step_id, SIGCONT, 0,
3927 uid);
3928 if (rc != SLURM_SUCCESS)
3929 return;
3930 rc = _signal_jobstep(req->job_id, req->step_id, SIGTERM, 0,
3931 uid);
3932 if (rc != SLURM_SUCCESS)
3933 return;
3934 cf = slurm_conf_lock();
3935 delay = MAX(cf->kill_wait, 5);
3936 slurm_conf_unlock();
3937 sleep(delay);
3938 _signal_jobstep(req->job_id, req->step_id, SIGKILL, 0, uid);
3939 return;
3940 }
3941
3942 if (msg->msg_type == REQUEST_KILL_TIMELIMIT)
3943 _kill_all_active_steps(req->job_id, SIG_TIME_LIMIT, 0, true,
3944 uid);
3945 else /* (msg->type == REQUEST_KILL_PREEMPTED) */
3946 _kill_all_active_steps(req->job_id, SIG_PREEMPTED, 0, true,
3947 uid);
3948 nsteps = _kill_all_active_steps(req->job_id, SIGTERM, 0, false, uid);
3949 verbose("Job %u: timeout: sent SIGTERM to %d active steps",
3950 req->job_id, nsteps);
3951
3952 /* Revoke credential, send SIGKILL, run epilog, etc. */
3953 _rpc_terminate_job(msg);
3954 }
3955
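/*
 * _rpc_pid2jid - determine which job on this node contains the given pid
 * by querying each local slurmstepd.
 */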
3956 static void _rpc_pid2jid(slurm_msg_t *msg)
3957 {
3958 job_id_request_msg_t *req = (job_id_request_msg_t *) msg->data;
3959 slurm_msg_t resp_msg;
3960 job_id_response_msg_t resp;
3961 bool found = false;
3962 List steps;
3963 ListIterator i;
3964 step_loc_t *stepd;
3965
3966 steps = stepd_available(conf->spooldir, conf->node_name);
3967 i = list_iterator_create(steps);
3968 while ((stepd = list_next(i))) {
3969 int fd;
3970 fd = stepd_connect(stepd->directory, stepd->nodename,
3971 stepd->jobid, stepd->stepid,
3972 &stepd->protocol_version);
3973 if (fd == -1)
3974 continue;
3975
3976 if (stepd_pid_in_container(
3977 fd, stepd->protocol_version,
3978 req->job_pid)
3979 || req->job_pid == stepd_daemon_pid(
3980 fd, stepd->protocol_version)) {
3981 slurm_msg_t_copy(&resp_msg, msg);
3982 resp.job_id = stepd->jobid;
3983 resp.return_code = SLURM_SUCCESS;
3984 found = true;
3985 close(fd);
3986 break;
3987 }
3988 close(fd);
3989 }
3990 list_iterator_destroy(i);
3991 FREE_NULL_LIST(steps);
3992
3993 if (found) {
3994 debug3("_rpc_pid2jid: pid(%u) found in %u",
3995 req->job_pid, resp.job_id);
3996 resp_msg.address = msg->address;
3997 resp_msg.msg_type = RESPONSE_JOB_ID;
3998 resp_msg.data = &resp;
3999
4000 slurm_send_node_msg(msg->conn_fd, &resp_msg);
4001 } else {
4002 debug3("_rpc_pid2jid: pid(%u) not found", req->job_pid);
4003 slurm_send_rc_msg(msg, ESLURM_INVALID_JOB_ID);
4004 }
4005 }
4006
4007 /* Validate sbcast credential.
4008 * NOTE: We can only perform the full credential validation once with
4009 * Munge without generating a credential replay error
4010 * RET an sbcast credential or NULL on error.
4011 * free with sbcast_cred_arg_free()
4012 */
4013 static sbcast_cred_arg_t *_valid_sbcast_cred(file_bcast_msg_t *req,
4014 uid_t req_uid,
4015 gid_t req_gid,
4016 uint16_t protocol_version)
4017 {
4018 sbcast_cred_arg_t *arg;
4019 hostset_t hset = NULL;
4020
4021 arg = extract_sbcast_cred(conf->vctx, req->cred, req->block_no,
4022 protocol_version);
4023 if (!arg) {
4024 error("Security violation: Invalid sbcast_cred from uid %u",
4025 req_uid);
4026 return NULL;
4027 }
4028
4029 if (!(hset = hostset_create(arg->nodes))) {
4030 error("Unable to parse sbcast_cred hostlist %s", arg->nodes);
4031 sbcast_cred_arg_free(arg);
4032 return NULL;
4033 } else if (!hostset_within(hset, conf->node_name)) {
4034 error("Security violation: sbcast_cred from %u has "
4035 "bad hostset %s", req_uid, arg->nodes);
4036 sbcast_cred_arg_free(arg);
4037 hostset_destroy(hset);
4038 return NULL;
4039 }
4040 hostset_destroy(hset);
4041
4042 /* fill in the credential with any missing data */
4043 if (arg->uid == NO_VAL)
4044 arg->uid = req_uid;
4045 if (arg->gid == NO_VAL)
4046 arg->gid = req_gid;
4047 if ((arg->uid != req_uid) || (arg->gid != req_gid)) {
4048 error("Security violation: sbcast cred from %u/%u but rpc from %u/%u",
4049 arg->uid, arg->gid, req_uid, req_gid);
4050 sbcast_cred_arg_free(arg);
4051 return NULL;
4052 }
4053
4054 /*
4055 * NOTE: user_name, ngids, gids may still be NULL, 0, NULL at this point.
4056 * we skip filling them in here to avoid excessive lookup calls
4057 * as this must run once per block (and there may be thousands of
4058 * blocks), and is only currently needed by the first block.
4059 */
4060 /* print_sbcast_cred(req->cred); */
4061
4062 return arg;
4063 }
4064
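/*
 * Reader/writer locking for file_bcast_list. Multiple readers may hold
 * the lock at once; a writer waits until all readers have finished, and
 * pending writers (fb_write_wait_lock) block new readers so writers are
 * not starved.
 */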
4065 static void _fb_rdlock(void)
4066 {
4067 slurm_mutex_lock(&file_bcast_mutex);
4068 while (1) {
4069 if ((fb_write_wait_lock == 0) && (fb_write_lock == 0)) {
4070 fb_read_lock++;
4071 break;
4072 } else { /* wait for state change and retry */
4073 slurm_cond_wait(&file_bcast_cond, &file_bcast_mutex);
4074 }
4075 }
4076 slurm_mutex_unlock(&file_bcast_mutex);
4077 }
4078
4079 static void _fb_rdunlock(void)
4080 {
4081 slurm_mutex_lock(&file_bcast_mutex);
4082 fb_read_lock--;
4083 slurm_cond_broadcast(&file_bcast_cond);
4084 slurm_mutex_unlock(&file_bcast_mutex);
4085 }
4086
4087 static void _fb_wrlock(void)
4088 {
4089 slurm_mutex_lock(&file_bcast_mutex);
4090 fb_write_wait_lock++;
4091 while (1) {
4092 if ((fb_read_lock == 0) && (fb_write_lock == 0)) {
4093 fb_write_lock++;
4094 fb_write_wait_lock--;
4095 break;
4096 } else { /* wait for state change and retry */
4097 slurm_cond_wait(&file_bcast_cond, &file_bcast_mutex);
4098 }
4099 }
4100 slurm_mutex_unlock(&file_bcast_mutex);
4101 }
4102
4103 static void _fb_wrunlock(void)
4104 {
4105 slurm_mutex_lock(&file_bcast_mutex);
4106 fb_write_lock--;
4107 slurm_cond_broadcast(&file_bcast_cond);
4108 slurm_mutex_unlock(&file_bcast_mutex);
4109 }
4110
4111 static int _bcast_find_in_list(void *x, void *y)
4112 {
4113 file_bcast_info_t *info = (file_bcast_info_t *)x;
4114 file_bcast_info_t *key = (file_bcast_info_t *)y;
4115 /* uid, job_id, and fname must match */
4116 return ((info->uid == key->uid)
4117 && (info->job_id == key->job_id)
4118 && (!xstrcmp(info->fname, key->fname)));
4119 }
4120
4121 /* must have read lock */
4122 static file_bcast_info_t *_bcast_lookup_file(file_bcast_info_t *key)
4123 {
4124 return list_find_first(file_bcast_list, _bcast_find_in_list, key);
4125 }
4126
4127 /* must not have read lock, will get write lock */
4128 static void _file_bcast_close_file(file_bcast_info_t *key)
4129 {
4130 _fb_wrlock();
4131 list_delete_all(file_bcast_list, _bcast_find_in_list, key);
4132 _fb_wrunlock();
4133 }
4134
4135 static void _free_file_bcast_info_t(void *arg)
4136 {
4137 file_bcast_info_t *f = (file_bcast_info_t *)arg;
4138
4139 if (!f)
4140 return;
4141
4142 xfree(f->fname);
4143 if (f->fd)
4144 close(f->fd);
4145 xfree(f);
4146 }
4147
4148 static int _bcast_find_in_list_to_remove(void *x, void *y)
4149 {
4150 file_bcast_info_t *f = (file_bcast_info_t *)x;
4151 time_t *now = (time_t *) y;
4152
4153 if (f->last_update + FILE_BCAST_TIMEOUT < *now) {
4154 error("Removing stalled file_bcast transfer from uid "
4155 "%u to file `%s`", f->uid, f->fname);
4156 return true;
4157 }
4158
4159 return false;
4160 }
4161
4162 /* remove transfers that have stalled */
4163 static void _file_bcast_cleanup(void)
4164 {
4165 time_t now = time(NULL);
4166
4167 _fb_wrlock();
4168 list_delete_all(file_bcast_list, _bcast_find_in_list_to_remove, &now);
4169 _fb_wrunlock();
4170 }
4171
4172 void file_bcast_init(void)
4173 {
4174 /* skip locks during slurmd init */
4175 file_bcast_list = list_create(_free_file_bcast_info_t);
4176 }
4177
4178 void file_bcast_purge(void)
4179 {
4180 _fb_wrlock();
4181 list_destroy(file_bcast_list);
4182 /* destroying list before exit, no need to unlock */
4183 }
4184
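/*
 * _rpc_file_bcast - process one block of an sbcast file transfer:
 * validate the sbcast credential, register the destination file on the
 * first block, decompress and write the block, and apply the final
 * permissions, ownership and times on the last block.
 */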
4185 static int _rpc_file_bcast(slurm_msg_t *msg)
4186 {
4187 int rc;
4188 int64_t offset, inx;
4189 sbcast_cred_arg_t *cred_arg;
4190 file_bcast_info_t *file_info;
4191 file_bcast_msg_t *req = msg->data;
4192 file_bcast_info_t key;
4193
4194 key.uid = g_slurm_auth_get_uid(msg->auth_cred);
4195 key.gid = g_slurm_auth_get_gid(msg->auth_cred);
4196 key.fname = req->fname;
4197
4198 cred_arg = _valid_sbcast_cred(req, key.uid, key.gid,
4199 msg->protocol_version);
4200 if (!cred_arg)
4201 return ESLURMD_INVALID_JOB_CREDENTIAL;
4202
4203 #ifdef HAVE_NATIVE_CRAY
4204 if (cred_arg->het_job_id && (cred_arg->het_job_id != NO_VAL))
4205 key.job_id = cred_arg->het_job_id;
4206 else
4207 key.job_id = cred_arg->job_id;
4208 #else
4209 key.job_id = cred_arg->job_id;
4210 #endif
4211
4212 #if 0
4213 info("last_block=%u force=%u modes=%o",
4214 req->last_block, req->force, req->modes);
4215 info("uid=%u gid=%u atime=%lu mtime=%lu block_len[0]=%u",
4216 req->uid, req->gid, req->atime, req->mtime, req->block_len);
4217 #if 0
4218 /* when the file being transferred is binary, the following line
4219 * can break the terminal output for slurmd */
4220 info("req->block[0]=%s, @ %lu", \
4221 req->block[0], (unsigned long) &req->block);
4222 #endif
4223 #endif
4224
4225 if (req->block_no == 1) {
4226 info("sbcast req_uid=%u job_id=%u fname=%s block_no=%u",
4227 key.uid, key.job_id, key.fname, req->block_no);
4228 } else {
4229 debug("sbcast req_uid=%u job_id=%u fname=%s block_no=%u",
4230 key.uid, key.job_id, key.fname, req->block_no);
4231 }
4232
4233 /* first block must register the file and open fd/mmap */
4234 if (req->block_no == 1) {
4235 if ((rc = _file_bcast_register_file(msg, cred_arg, &key))) {
4236 sbcast_cred_arg_free(cred_arg);
4237 return rc;
4238 }
4239 }
4240 sbcast_cred_arg_free(cred_arg);
4241
4242 _fb_rdlock();
4243 if (!(file_info = _bcast_lookup_file(&key))) {
4244 error("No registered file transfer for uid %u file `%s`.",
4245 key.uid, key.fname);
4246 _fb_rdunlock();
4247 return SLURM_ERROR;
4248 }
4249
4250 /* now decompress file */
4251 if (bcast_decompress_data(req) < 0) {
4252 error("sbcast: data decompression error for UID %u, file %s",
4253 key.uid, key.fname);
4254 _fb_rdunlock();
4255 return SLURM_ERROR;
4256 }
4257
4258 offset = 0;
4259 while (req->block_len - offset) {
4260 inx = write(file_info->fd, &req->block[offset],
4261 (req->block_len - offset));
4262 if (inx == -1) {
4263 if ((errno == EINTR) || (errno == EAGAIN))
4264 continue;
4265 error("sbcast: uid:%u can't write `%s`: %m",
4266 key.uid, key.fname);
4267 _fb_rdunlock();
4268 return SLURM_ERROR;
4269 }
4270 offset += inx;
4271 }
4272
4273 file_info->last_update = time(NULL);
4274
4275 if (req->last_block && fchmod(file_info->fd, (req->modes & 0777))) {
4276 error("sbcast: uid:%u can't chmod `%s`: %m",
4277 key.uid, key.fname);
4278 }
4279 if (req->last_block && fchown(file_info->fd, key.uid, key.gid)) {
4280 error("sbcast: uid:%u gid:%u can't chown `%s`: %m",
4281 key.uid, key.gid, key.fname);
4282 }
4283 if (req->last_block && req->atime) {
4284 struct utimbuf time_buf;
4285 time_buf.actime = req->atime;
4286 time_buf.modtime = req->mtime;
4287 if (utime(key.fname, &time_buf)) {
4288 error("sbcast: uid:%u can't utime `%s`: %m",
4289 key.uid, key.fname);
4290 }
4291 }
4292
4293 _fb_rdunlock();
4294
4295 if (req->last_block) {
4296 _file_bcast_close_file(&key);
4297 }
4298 return SLURM_SUCCESS;
4299 }
4300
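/*
 * _file_bcast_register_file - open the destination file as the requesting
 * user and record it in file_bcast_list for use by subsequent blocks of
 * the transfer.
 */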
4301 static int _file_bcast_register_file(slurm_msg_t *msg,
4302 sbcast_cred_arg_t *cred_arg,
4303 file_bcast_info_t *key)
4304 {
4305 file_bcast_msg_t *req = msg->data;
4306 int fd, flags;
4307 file_bcast_info_t *file_info;
4308
4309 /* may still be unset in credential */
4310 if (!cred_arg->ngids || !cred_arg->gids)
4311 cred_arg->ngids = group_cache_lookup(key->uid, key->gid,
4312 cred_arg->user_name,
4313 &cred_arg->gids);
4314
4315 flags = O_WRONLY | O_CREAT;
4316 if (req->force)
4317 flags |= O_TRUNC;
4318 else
4319 flags |= O_EXCL;
4320
4321 if ((fd = _open_as_other(req->fname, flags, 0700,
4322 key->job_id, key->uid, key->gid,
4323 cred_arg->ngids, cred_arg->gids)) == -1) {
4324 error("Unable to open %s: Permission denied", req->fname);
4325 return SLURM_ERROR;
4326 }
4327
4328 file_info = xmalloc(sizeof(file_bcast_info_t));
4329 file_info->fd = fd;
4330 file_info->fname = xstrdup(req->fname);
4331 file_info->uid = key->uid;
4332 file_info->gid = key->gid;
4333 file_info->job_id = key->job_id;
4334 file_info->last_update = file_info->start_time = time(NULL);
4335
4336 //TODO: mmap the file here
4337 _fb_wrlock();
4338 list_append(file_bcast_list, file_info);
4339 _fb_wrunlock();
4340
4341 return SLURM_SUCCESS;
4342 }
4343
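/*
 * _rpc_reattach_tasks - reattach a client (e.g. sattach) to a running job
 * step: verify the requester, compute the response and I/O addresses from
 * the supplied port lists, and have the slurmstepd connect back using the
 * job credential signature for authentication.
 */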
4344 static void
4345 _rpc_reattach_tasks(slurm_msg_t *msg)
4346 {
4347 reattach_tasks_request_msg_t *req = msg->data;
4348 reattach_tasks_response_msg_t *resp =
4349 xmalloc(sizeof(reattach_tasks_response_msg_t));
4350 slurm_msg_t resp_msg;
4351 int rc = SLURM_SUCCESS;
4352 uint16_t port = 0;
4353 char host[MAXHOSTNAMELEN];
4354 slurm_addr_t ioaddr;
4355 void *job_cred_sig;
4356 uint32_t len;
4357 int fd;
4358 slurm_addr_t *cli = &msg->orig_addr;
4359 uint32_t nodeid = NO_VAL;
4360 uid_t uid = -1;
4361 uint16_t protocol_version;
4362 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
4363
4364 slurm_msg_t_copy(&resp_msg, msg);
4365 fd = stepd_connect(conf->spooldir, conf->node_name,
4366 req->job_id, req->job_step_id, &protocol_version);
4367 if (fd == -1) {
4368 debug("reattach for nonexistent job %u.%u stepd_connect"
4369 " failed: %m", req->job_id, req->job_step_id);
4370 rc = ESLURM_INVALID_JOB_ID;
4371 goto done;
4372 }
4373
4374 if ((int)(uid = stepd_get_uid(fd, protocol_version)) < 0) {
4375 debug("_rpc_reattach_tasks couldn't read from the "
4376 "step %u.%u: %m",
4377 req->job_id, req->job_step_id);
4378 rc = ESLURM_INVALID_JOB_ID;
4379 goto done2;
4380 }
4381
4382 nodeid = stepd_get_nodeid(fd, protocol_version);
4383
4384 debug2("_rpc_reattach_tasks: nodeid %d in the job step", nodeid);
4385
4386 if ((req_uid != uid) && (!_slurm_authorized_user(req_uid))) {
4387 error("uid %ld attempt to attach to job %u.%u owned by %ld",
4388 (long) req_uid, req->job_id, req->job_step_id,
4389 (long) uid);
4390 rc = EPERM;
4391 goto done2;
4392 }
4393
4394 memset(resp, 0, sizeof(reattach_tasks_response_msg_t));
4395 slurm_get_ip_str(cli, &port, host, sizeof(host));
4396
4397 /*
4398 * Set response address by resp_port and client address
4399 */
4400 memcpy(&resp_msg.address, cli, sizeof(slurm_addr_t));
4401 if (req->num_resp_port > 0) {
4402 port = req->resp_port[nodeid % req->num_resp_port];
4403 slurm_set_addr(&resp_msg.address, port, NULL);
4404 }
4405
4406 /*
4407 * Set IO address by io_port and client address
4408 */
4409 memcpy(&ioaddr, cli, sizeof(slurm_addr_t));
4410
4411 if (req->num_io_port > 0) {
4412 port = req->io_port[nodeid % req->num_io_port];
4413 slurm_set_addr(&ioaddr, port, NULL);
4414 }
4415
4416 /*
4417 * Get the signature of the job credential. slurmstepd will need
4418 * this to prove its identity when it connects back to srun.
4419 */
4420 slurm_cred_get_signature(req->cred, (char **)(&job_cred_sig), &len);
4421 if (len != SLURM_IO_KEY_SIZE) {
4422 error("Incorrect slurm cred signature length");
4423 goto done2;
4424 }
4425
4426 resp->gtids = NULL;
4427 resp->local_pids = NULL;
4428
4429 /* NOTE: We need to use the protocol_version from
4430 * sattach here since responses will be sent back to it. */
4431 if (msg->protocol_version < protocol_version)
4432 protocol_version = msg->protocol_version;
4433
4434 /* Following call fills in gtids and local_pids when successful. */
4435 rc = stepd_attach(fd, protocol_version, &ioaddr,
4436 &resp_msg.address, job_cred_sig, resp);
4437 if (rc != SLURM_SUCCESS) {
4438 debug2("stepd_attach call failed");
4439 goto done2;
4440 }
4441
4442 done2:
4443 close(fd);
4444 done:
4445 debug2("update step addrs rc = %d", rc);
4446 resp_msg.data = resp;
4447 resp_msg.msg_type = RESPONSE_REATTACH_TASKS;
4448 resp->node_name = xstrdup(conf->node_name);
4449 resp->return_code = rc;
4450 debug2("node %s sending rc = %d", conf->node_name, rc);
4451
4452 slurm_send_node_msg(msg->conn_fd, &resp_msg);
4453 slurm_free_reattach_tasks_response_msg(resp);
4454 }
4455
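/* Return the user ID that owns the given job, or -1 if no step of that
 * job can be found on this node */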
4456 static uid_t _get_job_uid(uint32_t jobid)
4457 {
4458 List steps;
4459 ListIterator i;
4460 step_loc_t *stepd;
4461 uid_t uid = -1;
4462 int fd;
4463
4464 steps = stepd_available(conf->spooldir, conf->node_name);
4465 i = list_iterator_create(steps);
4466 while ((stepd = list_next(i))) {
4467 if (stepd->jobid != jobid) {
4468 /* multiple jobs expected on shared nodes */
4469 continue;
4470 }
4471 fd = stepd_connect(stepd->directory, stepd->nodename,
4472 stepd->jobid, stepd->stepid,
4473 &stepd->protocol_version);
4474 if (fd == -1) {
4475 debug3("Unable to connect to step %u.%u",
4476 stepd->jobid, stepd->stepid);
4477 continue;
4478 }
4479 uid = stepd_get_uid(fd, stepd->protocol_version);
4480
4481 close(fd);
4482 if ((int)uid < 0) {
4483 debug("stepd_get_uid failed %u.%u: %m",
4484 stepd->jobid, stepd->stepid);
4485 continue;
4486 }
4487 break;
4488 }
4489 list_iterator_destroy(i);
4490 FREE_NULL_LIST(steps);
4491
4492 return uid;
4493 }
4494
4495 /*
4496 * _kill_all_active_steps - signals the container of all steps of a job
4497 * jobid IN - id of job to signal
4498 * sig IN - signal to send
4499  * flags IN - decide whether the batch step must be signaled, whether its children are too, etc.
4500 * batch IN - if true signal batch script, otherwise skip it
4501 * RET count of signaled job steps (plus batch script, if applicable)
4502 */
4503 static int
4504 _kill_all_active_steps(uint32_t jobid, int sig, int flags, bool batch,
4505 uid_t req_uid)
4506 {
4507 List steps;
4508 ListIterator i;
4509 step_loc_t *stepd;
4510 int step_cnt = 0;
4511 int rc = SLURM_SUCCESS;
4512
4513 bool sig_all_steps = true;
4514 bool sig_batch_step = false;
4515
4516 if ((flags & KILL_JOB_BATCH) || (flags & KILL_FULL_JOB)) {
4517 sig_all_steps = false;
4518 sig_batch_step = true;
4519 } else if (batch)
4520 sig_batch_step = true;
4521
4522 steps = stepd_available(conf->spooldir, conf->node_name);
4523 i = list_iterator_create(steps);
4524 while ((stepd = list_next(i))) {
4525 if (stepd->jobid != jobid) {
4526 /* multiple jobs expected on shared nodes */
4527 debug3("%s: Looking for job %u, found step from job %u",
4528 __func__, jobid, stepd->jobid);
4529 continue;
4530 }
4531
4532 if ((sig_all_steps && (stepd->stepid != SLURM_BATCH_SCRIPT)) ||
4533 (sig_batch_step && (stepd->stepid == SLURM_BATCH_SCRIPT))) {
4534 if (_signal_jobstep(stepd->jobid, stepd->stepid, sig,
4535 flags, req_uid) != SLURM_SUCCESS) {
4536 rc = SLURM_ERROR;
4537 continue;
4538 }
4539 step_cnt++;
4540 } else {
4541 debug3("%s: No signaling. Job: %u, Step: %u. Flags: %u",
4542 __func__, stepd->jobid, stepd->stepid, flags);
4543 }
4544 }
4545 list_iterator_destroy(i);
4546 FREE_NULL_LIST(steps);
4547
4548 if (step_cnt == 0)
4549 debug2("No steps in jobid %u %s %d",
4550 jobid, (rc == SLURM_SUCCESS) ?
4551 "to send signal" : "were able to be signaled with",
4552 sig);
4553
4554 return step_cnt;
4555 }
4556
4557 /*
4558  * ume_notify - Notify all jobs and steps on this node that an Uncorrectable
4559  * Memory Error (UME) has occurred by sending SIG_UME (to log event in
4560 * stderr)
4561 * RET count of signaled job steps
4562 */
4563 extern int ume_notify(void)
4564 {
4565 List steps;
4566 ListIterator i;
4567 step_loc_t *stepd;
4568 int step_cnt = 0;
4569 int fd;
4570
4571 steps = stepd_available(conf->spooldir, conf->node_name);
4572 i = list_iterator_create(steps);
4573 while ((stepd = list_next(i))) {
4574 step_cnt++;
4575
4576 fd = stepd_connect(stepd->directory, stepd->nodename,
4577 stepd->jobid, stepd->stepid,
4578 &stepd->protocol_version);
4579 if (fd == -1) {
4580 debug3("Unable to connect to step %u.%u",
4581 stepd->jobid, stepd->stepid);
4582 continue;
4583 }
4584
4585 debug2("container SIG_UME to job %u.%u",
4586 stepd->jobid, stepd->stepid);
4587 if (stepd_signal_container(
4588 fd, stepd->protocol_version, SIG_UME, 0,
4589 getuid()) < 0)
4590 debug("kill jobid=%u failed: %m", stepd->jobid);
4591 close(fd);
4592 }
4593 list_iterator_destroy(i);
4594 FREE_NULL_LIST(steps);
4595
4596 if (step_cnt == 0)
4597 debug2("No steps to send SIG_UME");
4598 return step_cnt;
4599 }
4600 /*
4601 * _terminate_all_steps - signals the container of all steps of a job
4602 * jobid IN - id of job to signal
4603 * batch IN - if true signal batch script, otherwise skip it
4604 * RET count of signaled job steps (plus batch script, if applicable)
4605 */
4606 static int
4607 _terminate_all_steps(uint32_t jobid, bool batch)
4608 {
4609 List steps;
4610 ListIterator i;
4611 step_loc_t *stepd;
4612 int step_cnt = 0;
4613 int fd;
4614
4615 steps = stepd_available(conf->spooldir, conf->node_name);
4616 i = list_iterator_create(steps);
4617 while ((stepd = list_next(i))) {
4618 if (stepd->jobid != jobid) {
4619 /* multiple jobs expected on shared nodes */
4620 debug3("Step from other job: jobid=%u (this jobid=%u)",
4621 stepd->jobid, jobid);
4622 continue;
4623 }
4624
4625 if ((stepd->stepid == SLURM_BATCH_SCRIPT) && (!batch))
4626 continue;
4627
4628 step_cnt++;
4629
4630 fd = stepd_connect(stepd->directory, stepd->nodename,
4631 stepd->jobid, stepd->stepid,
4632 &stepd->protocol_version);
4633 if (fd == -1) {
4634 debug3("Unable to connect to step %u.%u",
4635 stepd->jobid, stepd->stepid);
4636 continue;
4637 }
4638
4639 debug2("terminate job step %u.%u", jobid, stepd->stepid);
4640 if (stepd_terminate(fd, stepd->protocol_version) < 0)
4641 debug("kill jobid=%u.%u failed: %m", jobid,
4642 stepd->stepid);
4643 close(fd);
4644 }
4645 list_iterator_destroy(i);
4646 FREE_NULL_LIST(steps);
4647 if (step_cnt == 0)
4648 debug2("No steps in job %u to terminate", jobid);
4649 return step_cnt;
4650 }
4651
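/* Return true if any step of the given job is still running on this node */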
4652 static bool
4653 _job_still_running(uint32_t job_id)
4654 {
4655 bool retval = false;
4656 List steps;
4657 ListIterator i;
4658 step_loc_t *s = NULL;
4659
4660 steps = stepd_available(conf->spooldir, conf->node_name);
4661 i = list_iterator_create(steps);
4662 while ((s = list_next(i))) {
4663 if (s->jobid == job_id) {
4664 int fd;
4665 fd = stepd_connect(s->directory, s->nodename,
4666 s->jobid, s->stepid,
4667 &s->protocol_version);
4668 if (fd == -1)
4669 continue;
4670
4671 if (stepd_state(fd, s->protocol_version)
4672 != SLURMSTEPD_NOT_RUNNING) {
4673 retval = true;
4674 close(fd);
4675 break;
4676 }
4677 close(fd);
4678 }
4679 }
4680 list_iterator_destroy(i);
4681 FREE_NULL_LIST(steps);
4682
4683 return retval;
4684 }
4685
4686 /*
4687 * Wait until all job steps are in SLURMSTEPD_NOT_RUNNING state.
4688 * This indicates that switch_g_job_postfini has completed and
4689 * freed the switch windows (as needed only for Federation switch).
4690 */
4691 static void
4692 _wait_state_completed(uint32_t jobid, int max_delay)
4693 {
4694 int i;
4695
4696 for (i=0; i<max_delay; i++) {
4697 if (_steps_completed_now(jobid))
4698 break;
4699 sleep(1);
4700 }
4701 if (i >= max_delay)
4702 error("timed out waiting for job %u to complete", jobid);
4703 }
4704
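/* Return true if no step of the given job is still running on this node */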
4705 static bool
4706 _steps_completed_now(uint32_t jobid)
4707 {
4708 List steps;
4709 ListIterator i;
4710 step_loc_t *stepd;
4711 bool rc = true;
4712
4713 steps = stepd_available(conf->spooldir, conf->node_name);
4714 i = list_iterator_create(steps);
4715 while ((stepd = list_next(i))) {
4716 if (stepd->jobid == jobid) {
4717 int fd;
4718 fd = stepd_connect(stepd->directory, stepd->nodename,
4719 stepd->jobid, stepd->stepid,
4720 &stepd->protocol_version);
4721 if (fd == -1)
4722 continue;
4723
4724 if (stepd_state(fd, stepd->protocol_version)
4725 != SLURMSTEPD_NOT_RUNNING) {
4726 rc = false;
4727 close(fd);
4728 break;
4729 }
4730 close(fd);
4731 }
4732 }
4733 list_iterator_destroy(i);
4734 FREE_NULL_LIST(steps);
4735
4736 return rc;
4737 }
4738
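/* Build a MESSAGE_EPILOG_COMPLETE request for the given job and return code */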
4739 static void _epilog_complete_msg_setup(
4740 slurm_msg_t *msg, epilog_complete_msg_t *req, uint32_t jobid, int rc)
4741 {
4742 slurm_msg_t_init(msg);
4743 memset(req, 0, sizeof(epilog_complete_msg_t));
4744
4745 req->job_id = jobid;
4746 req->return_code = rc;
4747 req->node_name = conf->node_name;
4748
4749 msg->msg_type = MESSAGE_EPILOG_COMPLETE;
4750 msg->data = req;
4751 }
4752
4753 /*
4754 * Send epilog complete message to currently active controller.
4755 * If enabled, use message aggregation.
4756 * Returns SLURM_SUCCESS if message sent successfully,
4757 * SLURM_ERROR if epilog complete message fails to be sent.
4758 */
4759 static int
4760 _epilog_complete(uint32_t jobid, int rc)
4761 {
4762 int ret = SLURM_SUCCESS;
4763
4764 if (conf->msg_aggr_window_msgs > 1) {
4765 /* message aggregation is enabled */
4766 slurm_msg_t *msg = xmalloc(sizeof(slurm_msg_t));
4767 epilog_complete_msg_t *req =
4768 xmalloc(sizeof(epilog_complete_msg_t));
4769
4770 _epilog_complete_msg_setup(msg, req, jobid, rc);
4771
4772 /* we need to copy this symbol */
4773 req->node_name = xstrdup(conf->node_name);
4774
4775 msg_aggr_add_msg(msg, 0, NULL);
4776 } else {
4777 slurm_msg_t msg;
4778 epilog_complete_msg_t req;
4779
4780 _epilog_complete_msg_setup(&msg, &req, jobid, rc);
4781
4782 /* Note: we do not wait for a return code; slurmctld will resend the
4783  * TERMINATE_JOB request if the message send fails */
4784 if (slurm_send_only_controller_msg(&msg, working_cluster_rec)
4785 < 0) {
4786 error("Unable to send epilog complete message: %m");
4787 ret = SLURM_ERROR;
4788 } else {
4789 debug("Job %u: sent epilog complete msg: rc = %d",
4790 jobid, rc);
4791 }
4792 }
4793 return ret;
4794 }
4795
4796 /* Return 1 if a lock is granted to the job; return 0 if the lock for
4797  * the job is already taken or there are no more lock slots available */
4798 static int
4799 _get_suspend_job_lock(uint32_t job_id)
4800 {
4801 static bool logged = false;
4802 int i, empty_loc = -1, rc = 0;
4803
4804 slurm_mutex_lock(&suspend_mutex);
4805 for (i = 0; i < job_suspend_size; i++) {
4806 if (job_suspend_array[i] == 0) {
4807 empty_loc = i;
4808 continue;
4809 }
4810 if (job_suspend_array[i] == job_id) {
4811 /* another thread already holds a lock for this job ID */
4812 slurm_mutex_unlock(&suspend_mutex);
4813 return rc;
4814 }
4815 }
4816
4817 if (empty_loc != -1) {
4818 /* nobody holds a lock for this job and a previously used slot is free */
4819 job_suspend_array[empty_loc] = job_id;
4820 rc = 1;
4821 } else if (job_suspend_size < NUM_PARALLEL_SUSP_JOBS) {
4822 /* a new lock is available */
4823 job_suspend_array[job_suspend_size++] = job_id;
4824 rc = 1;
4825 } else if (!logged) {
4826 error("Simultaneous job suspend/resume limit reached (%d). "
4827 "Configure SchedulerTimeSlice higher.",
4828 NUM_PARALLEL_SUSP_JOBS);
4829 logged = true;
4830 }
4831 slurm_mutex_unlock(&suspend_mutex);
4832 return rc;
4833 }
4834
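/* Release the suspend/resume lock slot held for the given job */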
4835 static void
4836 _unlock_suspend_job(uint32_t job_id)
4837 {
4838 int i;
4839 slurm_mutex_lock(&suspend_mutex);
4840 for (i = 0; i < job_suspend_size; i++) {
4841 if (job_suspend_array[i] == job_id)
4842 job_suspend_array[i] = 0;
4843 }
4844 slurm_mutex_unlock(&suspend_mutex);
4845 }
4846
4847 /* Add record for every launched job so we know they are ready for suspend */
4848 extern void record_launched_jobs(void)
4849 {
4850 List steps;
4851 ListIterator i;
4852 step_loc_t *stepd;
4853
4854 steps = stepd_available(conf->spooldir, conf->node_name);
4855 i = list_iterator_create(steps);
4856 while ((stepd = list_next(i))) {
4857 _launch_complete_add(stepd->jobid);
4858 }
4859 list_iterator_destroy(i);
4860 FREE_NULL_LIST(steps);
4861 }
4862
4863 /*
4864 * Send a job suspend/resume request through the appropriate slurmstepds for
4865 * each job step belonging to a given job allocation.
4866 */
4867 static void
4868 _rpc_suspend_job(slurm_msg_t *msg)
4869 {
4870 int time_slice = -1;
4871 suspend_int_msg_t *req = msg->data;
4872 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
4873 List steps;
4874 ListIterator i;
4875 step_loc_t *stepd;
4876 int step_cnt = 0;
4877 int rc = SLURM_SUCCESS;
4878 DEF_TIMERS;
4879
4880 if (time_slice == -1)
4881 time_slice = slurm_get_time_slice();
4882 if ((req->op != SUSPEND_JOB) && (req->op != RESUME_JOB)) {
4883 error("REQUEST_SUSPEND_INT: bad op code %u", req->op);
4884 rc = ESLURM_NOT_SUPPORTED;
4885 }
4886
4887 /*
4888 * check that requesting user ID is the Slurm UID or root
4889 */
4890 if (!_slurm_authorized_user(req_uid)) {
4891 error("Security violation: suspend_job(%u) from uid %d",
4892 req->job_id, req_uid);
4893 rc = ESLURM_USER_ID_MISSING;
4894 }
4895
4896 /* send a response now, which will include any errors
4897 * detected with the request */
4898 if (msg->conn_fd >= 0) {
4899 slurm_send_rc_msg(msg, rc);
4900 if (close(msg->conn_fd) < 0)
4901 error("_rpc_suspend_job: close(%d): %m",
4902 msg->conn_fd);
4903 msg->conn_fd = -1;
4904 }
4905 if (rc != SLURM_SUCCESS)
4906 return;
4907
4908 /* now we can focus on performing the requested action,
4909 * which could take a few seconds to complete */
4910 debug("_rpc_suspend_job jobid=%u uid=%d action=%s", req->job_id,
4911 req_uid, req->op == SUSPEND_JOB ? "suspend" : "resume");
4912
4913 /* Try to get a thread lock for this job. If the lock
4914 * is not available then sleep and try again */
4915 while (!_get_suspend_job_lock(req->job_id)) {
4916 debug3("suspend lock sleep for %u", req->job_id);
4917 usleep(10000);
4918 }
4919 START_TIMER;
4920
4921 /* Defer suspend until job prolog and launch complete */
4922 if (req->op == SUSPEND_JOB)
4923 _launch_complete_wait(req->job_id);
4924
4925 if ((req->op == SUSPEND_JOB) && (req->indf_susp))
4926 switch_g_job_suspend(req->switch_info, 5);
4927
4928 if (req->op == SUSPEND_JOB)
4929 (void) task_g_slurmd_suspend_job(req->job_id);
4930
4931 /*
4932 * Loop through all job steps and call stepd_suspend or stepd_resume
4933  * as appropriate. Since the "suspend" action may contain a sleep
4934  * (if the launch is in progress), suspend multiple job steps in parallel.
4935 */
4936 steps = stepd_available(conf->spooldir, conf->node_name);
4937 i = list_iterator_create(steps);
4938
4939 while (1) {
4940 int x, fdi, fd[NUM_PARALLEL_SUSP_STEPS];
4941 uint16_t protocol_version[NUM_PARALLEL_SUSP_STEPS];
4942
4943 fdi = 0;
4944 while ((stepd = list_next(i))) {
4945 if (stepd->jobid != req->job_id) {
4946 /* multiple jobs expected on shared nodes */
4947 debug3("Step from other job: jobid=%u "
4948 "(this jobid=%u)",
4949 stepd->jobid, req->job_id);
4950 continue;
4951 }
4952 step_cnt++;
4953
4954 fd[fdi] = stepd_connect(stepd->directory,
4955 stepd->nodename, stepd->jobid,
4956 stepd->stepid,
4957 &protocol_version[fdi]);
4958 if (fd[fdi] == -1) {
4959 debug3("Unable to connect to step %u.%u",
4960 stepd->jobid, stepd->stepid);
4961 continue;
4962 }
4963
4964 fdi++;
4965 if (fdi >= NUM_PARALLEL_SUSP_STEPS)
4966 break;
4967 }
4968 /* check for open connections */
4969 if (fdi == 0)
4970 break;
4971
4972 if (req->op == SUSPEND_JOB) {
4973 int susp_fail_count = 0;
4974 /* The suspend RPCs are processed in parallel for
4975 * every step in the job */
4976 for (x = 0; x < fdi; x++) {
4977 (void) stepd_suspend(fd[x],
4978 protocol_version[x],
4979 req, 0);
4980 }
4981 for (x = 0; x < fdi; x++) {
4982 if (stepd_suspend(fd[x],
4983 protocol_version[x],
4984 req, 1) < 0) {
4985 susp_fail_count++;
4986 } else {
4987 close(fd[x]);
4988 fd[x] = -1;
4989 }
4990 }
4991 /* Suspend RPCs can fail at step startup, so retry */
4992 if (susp_fail_count) {
4993 sleep(1);
4994 for (x = 0; x < fdi; x++) {
4995 if (fd[x] == -1)
4996 continue;
4997 (void) stepd_suspend(
4998 fd[x],
4999 protocol_version[x],
5000 req, 0);
5001 if (stepd_suspend(
5002 fd[x],
5003 protocol_version[x],
5004 req, 1) >= 0)
5005 continue;
5006 debug("Suspend of job %u failed: %m",
5007 req->job_id);
5008 }
5009 }
5010 } else {
5011 /* The resume RPCs are processed in parallel for
5012 * every step in the job */
5013 for (x = 0; x < fdi; x++) {
5014 (void) stepd_resume(fd[x],
5015 protocol_version[x],
5016 req, 0);
5017 }
5018 for (x = 0; x < fdi; x++) {
5019 if (stepd_resume(fd[x],
5020 protocol_version[x],
5021 req, 1) < 0) {
5022 debug("Resume of job %u failed: %m",
5023 req->job_id);
5024 }
5025 }
5026 }
5027 for (x = 0; x < fdi; x++) {
5028 /* fd may have been closed by stepd_suspend */
5029 if (fd[x] != -1)
5030 close(fd[x]);
5031 }
5032
5033 /* check for no more jobs */
5034 if (fdi < NUM_PARALLEL_SUSP_STEPS)
5035 break;
5036 }
5037 list_iterator_destroy(i);
5038 FREE_NULL_LIST(steps);
5039
5040 if (req->op == RESUME_JOB) /* Call task plugin after processes resume */
5041 (void) task_g_slurmd_resume_job(req->job_id);
5042 if ((req->op == RESUME_JOB) && (req->indf_susp))
5043 switch_g_job_resume(req->switch_info, 5);
5044
5045 _unlock_suspend_job(req->job_id);
5046
5047 END_TIMER;
5048 if (DELTA_TIMER >= (time_slice * USEC_IN_SEC)) {
5049 if (req->op == SUSPEND_JOB) {
5050 info("Suspend time for job_id %u was %s. "
5051 "Configure SchedulerTimeSlice higher.",
5052 req->job_id, TIME_STR);
5053 } else {
5054 info("Resume time for job_id %u was %s. "
5055 "Configure SchedulerTimeSlice higher.",
5056 req->job_id, TIME_STR);
5057 }
5058 }
5059
5060 if (step_cnt == 0) {
5061 debug2("No steps in jobid %u to suspend/resume", req->job_id);
5062 }
5063 }
5064
5065 /* Job shouldn't even be running here, abort it immediately */
5066 static void
5067 _rpc_abort_job(slurm_msg_t *msg)
5068 {
5069 kill_job_msg_t *req = msg->data;
5070 uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5071 job_env_t job_env;
5072 int node_id = 0;
5073 uint32_t jobid;
5074
5075 debug("_rpc_abort_job, uid = %d", uid);
5076 /*
5077 * check that requesting user ID is the Slurm UID
5078 */
5079 if (!_slurm_authorized_user(uid)) {
5080 error("Security violation: abort_job(%u) from uid %d",
5081 req->job_id, uid);
5082 if (msg->conn_fd >= 0)
5083 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5084 return;
5085 }
5086
5087 task_g_slurmd_release_resources(req->job_id);
5088
5089 /*
5090 * "revoke" all future credentials for this jobid
5091 */
5092 if (slurm_cred_revoke(conf->vctx, req->job_id, req->time,
5093 req->start_time) < 0) {
5094 debug("revoking cred for job %u: %m", req->job_id);
5095 } else {
5096 save_cred_state(conf->vctx);
5097 debug("credential for job %u revoked", req->job_id);
5098 }
5099
5100 /*
5101 * At this point, if connection still open, we send controller
5102 * a "success" reply to indicate that we've recvd the msg.
5103 */
5104 if (msg->conn_fd >= 0) {
5105 slurm_send_rc_msg(msg, SLURM_SUCCESS);
5106 if (close(msg->conn_fd) < 0)
5107 error ("rpc_abort_job: close(%d): %m", msg->conn_fd);
5108 msg->conn_fd = -1;
5109 }
5110
5111 if (_kill_all_active_steps(req->job_id, SIG_ABORT, 0, true, uid)) {
5112 /*
5113 * Block until all user processes are complete.
5114 */
5115 _pause_for_job_completion (req->job_id, req->nodes, 0);
5116 }
5117
5118 /*
5119 * Begin expiration period for cached information about job.
5120 * If expiration period has already begun, then do not run
5121 * the epilog again, as that script has already been executed
5122 * for this job.
5123 */
5124 if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) {
5125 debug("Not running epilog for jobid %u: %m", req->job_id);
5126 return;
5127 }
5128
5129 save_cred_state(conf->vctx);
5130
5131 #ifndef HAVE_FRONT_END
5132 /* It is always 0 for front end systems */
5133 node_id = nodelist_find(req->nodes, conf->node_name);
5134 #endif
5135 memset(&job_env, 0, sizeof(job_env));
5136 gres_plugin_epilog_set_env(&job_env.gres_job_env, req->job_gres_info,
5137 node_id);
5138 job_env.jobid = req->job_id;
5139 job_env.node_list = req->nodes;
5140 job_env.spank_job_env = req->spank_job_env;
5141 job_env.spank_job_env_size = req->spank_job_env_size;
5142 job_env.uid = req->job_uid;
5143 job_env.gid = req->job_gid;
5144
5145 _run_epilog(&job_env);
5146 _free_job_env(&job_env);
5147
5148 #ifdef HAVE_NATIVE_CRAY
5149 if (req->het_job_id && (req->het_job_id != NO_VAL))
5150 jobid = req->het_job_id;
5151 else
5152 jobid = req->job_id;
5153 #else
5154 jobid = req->job_id;
5155 #endif
5156
5157 if (container_g_delete(jobid))
5158 error("container_g_delete(%u): %m", req->job_id);
5159 _launch_complete_rm(req->job_id);
5160 }
5161
5162 /*
5163 * This complete batch RPC came from slurmstepd because we have message
5164 * aggregation configured. Terminate the job here and forward the batch
5165 * completion RPC to slurmctld.
5166 */
5167 static void
5168 _rpc_complete_batch(slurm_msg_t *msg)
5169 {
5170 int i, rc, msg_rc;
5171 slurm_msg_t resp_msg;
5172 uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5173 complete_batch_script_msg_t *req = msg->data;
5174
5175 if (!_slurm_authorized_user(uid)) {
5176 error("Security violation: complete_batch(%u) from uid %d",
5177 req->job_id, uid);
5178 if (msg->conn_fd >= 0)
5179 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5180 return;
5181 }
5182
5183 slurm_send_rc_msg(msg, SLURM_SUCCESS);
5184
5185 for (i = 0; i <= MAX_RETRY; i++) {
5186 if (conf->msg_aggr_window_msgs > 1) {
5187 slurm_msg_t *req_msg =
5188 xmalloc_nz(sizeof(slurm_msg_t));
5189 slurm_msg_t_init(req_msg);
5190 req_msg->msg_type = msg->msg_type;
5191 req_msg->data = msg->data;
5192 msg->data = NULL;
5193
5194 msg_aggr_add_msg(req_msg, 1, NULL);
5195 return;
5196 } else {
5197 slurm_msg_t req_msg;
5198 slurm_msg_t_init(&req_msg);
5199 req_msg.msg_type = msg->msg_type;
5200 req_msg.data = msg->data;
5201 msg_rc = slurm_send_recv_controller_msg(
5202 &req_msg, &resp_msg, working_cluster_rec);
5203
5204 if (msg_rc == SLURM_SUCCESS)
5205 break;
5206 }
5207 info("Retrying job complete RPC for job %u", req->job_id);
5208 sleep(RETRY_DELAY);
5209 }
5210 if (i > MAX_RETRY) {
5211 error("Unable to send job complete message: %m");
5212 return;
5213 }
5214
5215 if (resp_msg.msg_type == RESPONSE_SLURM_RC) {
5216 last_slurmctld_msg = time(NULL);
5217 rc = ((return_code_msg_t *) resp_msg.data)->return_code;
5218 slurm_free_return_code_msg(resp_msg.data);
5219 if (rc) {
5220 error("complete_batch for job %u: %s", req->job_id,
5221 slurm_strerror(rc));
5222 }
5223 return;
5224 }
5225 }
5226
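/*
 * _rpc_terminate_job - revoke the job's credentials, signal and terminate
 * all of its steps, then run the epilog and notify slurmctld once the
 * node is clean.
 */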
5227 static void
5228 _rpc_terminate_job(slurm_msg_t *msg)
5229 {
5230 int rc = SLURM_SUCCESS;
5231 kill_job_msg_t *req = msg->data;
5232 uid_t uid = g_slurm_auth_get_uid(msg->auth_cred);
5233 int nsteps = 0;
5234 int delay;
5235 int node_id = 0;
5236 job_env_t job_env;
5237 uint32_t jobid;
5238
5239 debug("_rpc_terminate_job, uid = %d", uid);
5240 /*
5241 * check that requesting user ID is the Slurm UID
5242 */
5243 if (!_slurm_authorized_user(uid)) {
5244 error("Security violation: kill_job(%u) from uid %d",
5245 req->job_id, uid);
5246 if (msg->conn_fd >= 0)
5247 slurm_send_rc_msg(msg, ESLURM_USER_ID_MISSING);
5248 return;
5249 }
5250
5251 /* Use this when dealing with the job container */
5252 #ifdef HAVE_NATIVE_CRAY
5253 if (req->het_job_id && (req->het_job_id != NO_VAL))
5254 jobid = req->het_job_id;
5255 else
5256 jobid = req->job_id;
5257 #else
5258 jobid = req->job_id;
5259 #endif
5260
5261 task_g_slurmd_release_resources(req->job_id);
5262
5263 /*
5264 * Initialize a "waiter" thread for this jobid. If another
5265 * thread is already waiting on termination of this job,
5266 * _waiter_init() will return SLURM_ERROR. In this case, just
5267 * notify slurmctld that we recvd the message successfully,
5268 * then exit this thread.
5269 */
5270 if (_waiter_init(req->job_id) == SLURM_ERROR) {
5271 if (msg->conn_fd >= 0) {
5272 /* Whether or not the step has started yet,
5273  * just send a success to let the
5274 * controller know we got this request.
5275 */
5276 slurm_send_rc_msg (msg, SLURM_SUCCESS);
5277 }
5278 return;
5279 }
5280
5281 /*
5282 * Note the job is finishing to avoid a race condition for batch jobs
5283 * that finish before the slurmd knows it finished launching.
5284 */
5285 _note_batch_job_finished(req->job_id);
5286 /*
5287 * "revoke" all future credentials for this jobid
5288 */
5289 if (slurm_cred_revoke(conf->vctx, req->job_id, req->time,
5290 req->start_time) < 0) {
5291 debug("revoking cred for job %u: %m", req->job_id);
5292 } else {
5293 save_cred_state(conf->vctx);
5294 debug("credential for job %u revoked", req->job_id);
5295 }
5296
5297 if (_prolog_is_running(req->job_id)) {
5298 if (msg->conn_fd >= 0) {
5299 /* If the step hasn't finished running the prolog
5300  * (or finished starting the extern step) yet, just send
5301 * a success to let the controller know we got
5302 * this request.
5303 */
5304 debug("%s: sent SUCCESS for %u, waiting for prolog to finish",
5305 __func__, req->job_id);
5306 slurm_send_rc_msg(msg, SLURM_SUCCESS);
5307 if (close(msg->conn_fd) < 0)
5308 error("%s: close(%d): %m",
5309 __func__, msg->conn_fd);
5310 msg->conn_fd = -1;
5311 }
5312 _wait_for_job_running_prolog(req->job_id);
5313 }
5314
5315 /*
5316 * Before signaling steps, if the job has any steps that are still
5317 * in the process of fork/exec/check in with slurmd, wait on a condition
5318 * var for the start. Otherwise a slow-starting step can miss the
5319 * job termination message and run indefinitely.
5320 */
5321 if (_step_is_starting(req->job_id, NO_VAL)) {
5322 if (msg->conn_fd >= 0) {
5323 /* If the step hasn't started yet just send a
5324 * success to let the controller know we got
5325 * this request.
5326 */
5327 debug("sent SUCCESS, waiting for step to start");
5328 slurm_send_rc_msg (msg, SLURM_SUCCESS);
5329 if (close(msg->conn_fd) < 0)
5330 error("rpc_kill_job: close(%d): %m",
5331 msg->conn_fd);
5332 msg->conn_fd = -1;
5333 }
5334 if (_wait_for_starting_step(req->job_id, NO_VAL)) {
5335 /*
5336 * There's currently no case in which we enter this
5337 * error condition. If there was, it's hard to say
5338 * whether to proceed with the job termination.
5339 */
5340 error("Error in _wait_for_starting_step");
5341 }
5342 }
5343
5344 if (IS_JOB_NODE_FAILED(req))
5345 _kill_all_active_steps(req->job_id, SIG_NODE_FAIL, 0, true,
5346 uid);
5347 if (IS_JOB_PENDING(req))
5348 _kill_all_active_steps(req->job_id, SIG_REQUEUED, 0, true, uid);
5349 else if (IS_JOB_FAILED(req))
5350 _kill_all_active_steps(req->job_id, SIG_FAILURE, 0, true, uid);
5351
5352 /*
5353 * Tasks might be stopped (possibly by a debugger)
5354 * so send SIGCONT first.
5355 */
5356 _kill_all_active_steps(req->job_id, SIGCONT, 0, true, uid);
5357 if (errno == ESLURMD_STEP_SUSPENDED) {
5358 /*
5359 * If the job step is currently suspended, we don't
5360 * bother with a "nice" termination.
5361 */
5362 debug2("Job is currently suspended, terminating");
5363 nsteps = _terminate_all_steps(req->job_id, true);
5364 } else {
5365 nsteps = _kill_all_active_steps(req->job_id, SIGTERM, 0, true,
5366 uid);
5367 }
5368
5369 /*
5370 * If there are currently no active job steps and no
5371 * configured epilog to run, bypass the asynchronous reply and
5372 * notify slurmctld that we have already completed this
5373 * request. (On the old AIX systems this bypass could not be used,
5374 * because the current switch state had to be sent.)
5375 */
5376 if ((nsteps == 0) && !conf->epilog && !spank_has_epilog()) {
5377 debug4("sent ALREADY_COMPLETE");
5378 if (msg->conn_fd >= 0) {
5379 slurm_send_rc_msg(msg,
5380 ESLURMD_KILL_JOB_ALREADY_COMPLETE);
5381 }
5382 slurm_cred_begin_expiration(conf->vctx, req->job_id);
5383 save_cred_state(conf->vctx);
5384 _waiter_complete(req->job_id);
5385
5386 /*
5387 * The controller needs to get MESSAGE_EPILOG_COMPLETE to bring
5388 * the job out of "completing" state. Otherwise, the job
5389 * could remain "completing" unnecessarily, until the request
5390 * to terminate is resent.
5391 */
5392 _sync_messages_kill(req);
5393 if (msg->conn_fd < 0) {
5394 /* The epilog complete message processing on
5395 * slurmctld is equivalent to that of an
5396 * ESLURMD_KILL_JOB_ALREADY_COMPLETE reply above */
5397 _epilog_complete(req->job_id, rc);
5398 }
5399
5400 if (container_g_delete(jobid))
5401 error("container_g_delete(%u): %m", req->job_id);
5402 _launch_complete_rm(req->job_id);
5403 return;
5404 }
5405
5406 /*
5407 * At this point, if connection still open, we send controller
5408 * a "success" reply to indicate that we've recvd the msg.
5409 */
5410 if (msg->conn_fd >= 0) {
5411 debug4("sent SUCCESS");
5412 slurm_send_rc_msg(msg, SLURM_SUCCESS);
5413 if (close(msg->conn_fd) < 0)
5414 error ("rpc_kill_job: close(%d): %m", msg->conn_fd);
5415 msg->conn_fd = -1;
5416 }
5417
5418 /*
5419 * Check for corpses
5420 */
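/*
 * Give the steps up to MAX(KillWait, 5) seconds to exit after the
 * SIGTERM sent above; if anything is still alive, escalate with
 * SIGKILL via _terminate_all_steps() and then block until every
 * process is gone.
 */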
5421 delay = MAX(conf->kill_wait, 5);
5422 if (!_pause_for_job_completion (req->job_id, req->nodes, delay) &&
5423 _terminate_all_steps(req->job_id, true) ) {
5424 /*
5425 * Block until all user processes are complete.
5426 */
5427 _pause_for_job_completion (req->job_id, req->nodes, 0);
5428 }
5429
5430 /*
5431 * Begin expiration period for cached information about job.
5432 * If expiration period has already begun, then do not run
5433 * the epilog again, as that script has already been executed
5434 * for this job.
5435 */
5436 if (slurm_cred_begin_expiration(conf->vctx, req->job_id) < 0) {
5437 debug("Not running epilog for jobid %d: %m", req->job_id);
5438 goto done;
5439 }
5440
5441 save_cred_state(conf->vctx);
5442
5443 #ifndef HAVE_FRONT_END
5444 /* It is always 0 for front end systems */
5445 node_id = nodelist_find(req->nodes, conf->node_name);
5446 #endif
5447 memset(&job_env, 0, sizeof(job_env));
5448 gres_plugin_epilog_set_env(&job_env.gres_job_env, req->job_gres_info,
5449 node_id);
5450
5451 job_env.jobid = req->job_id;
5452 job_env.node_list = req->nodes;
5453 job_env.spank_job_env = req->spank_job_env;
5454 job_env.spank_job_env_size = req->spank_job_env_size;
5455 job_env.uid = req->job_uid;
5456 job_env.gid = req->job_gid;
5457
5458 rc = _run_epilog(&job_env);
5459 _free_job_env(&job_env);
5460
5461 if (rc) {
5462 int term_sig = 0, exit_status = 0;
5463 if (WIFSIGNALED(rc))
5464 term_sig = WTERMSIG(rc);
5465 else if (WIFEXITED(rc))
5466 exit_status = WEXITSTATUS(rc);
5467 error("[job %u] epilog failed status=%d:%d",
5468 req->job_id, exit_status, term_sig);
5469 rc = ESLURMD_EPILOG_FAILED;
5470 } else
5471 debug("completed epilog for jobid %u", req->job_id);
5472 if (container_g_delete(jobid))
5473 error("container_g_delete(%u): %m", req->job_id);
5474 _launch_complete_rm(req->job_id);
5475
5476 done:
5477 _wait_state_completed(req->job_id, 5);
5478 _waiter_complete(req->job_id);
5479 _sync_messages_kill(req);
5480
5481 _epilog_complete(req->job_id, rc);
5482 }
5483
5484 /* On a parallel job, every slurmd may send the EPILOG_COMPLETE
5485 * message to the slurmctld at the same time, resulting in lost
5486 * messages. We add a delay here to spread out the message traffic
5487 * assuming synchronized clocks across the cluster.
5488 * Allow 10 msec processing time in slurmctld for each RPC. */
5489 static void _sync_messages_kill(kill_job_msg_t *req)
5490 {
5491 int host_cnt, host_inx;
5492 char *host;
5493 hostset_t hosts;
5494 int epilog_msg_time;
5495
5496 hosts = hostset_create(req->nodes);
5497 host_cnt = hostset_count(hosts);
5498 if (host_cnt <= 64)
5499 goto fini;
5500 if (conf->hostname == NULL)
5501 goto fini; /* should never happen */
5502
5503 for (host_inx=0; host_inx<host_cnt; host_inx++) {
5504 host = hostset_shift(hosts);
5505 if (host == NULL)
5506 break;
5507 if (xstrcmp(host, conf->node_name) == 0) {
5508 free(host);
5509 break;
5510 }
5511 free(host);
5512 }
5513 epilog_msg_time = slurm_get_epilog_msg_time();
5514 _delay_rpc(host_inx, host_cnt, epilog_msg_time);
5515
5516 fini: hostset_destroy(hosts);
5517 }
5518
5519 /* Delay a message based upon the host index, total host count and RPC_TIME.
5520 * This logic depends upon synchronized clocks across the cluster. */
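/*
 * Illustrative example (hypothetical values): with usec_per_rpc =
 * 10000 (10 msec per RPC) and host_cnt = 1000, tot_time is 10 sec.
 * A node with host_inx = 250 aims for the 2.5 sec offset within the
 * current 10 sec window, so each node's epilog complete RPC lands in
 * its own 10 msec slot rather than all arriving at once.
 */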
5521 static void _delay_rpc(int host_inx, int host_cnt, int usec_per_rpc)
5522 {
5523 struct timeval tv1;
5524 uint32_t cur_time; /* current time in usec (just 9 digits) */
5525 uint32_t tot_time; /* total time expected for all RPCs */
5526 uint32_t offset_time; /* relative time within tot_time */
5527 uint32_t target_time; /* desired time to issue the RPC */
5528 uint32_t delta_time;
5529
5530 again: if (gettimeofday(&tv1, NULL)) {
5531 usleep(host_inx * usec_per_rpc);
5532 return;
5533 }
5534
5535 cur_time = ((tv1.tv_sec % 1000) * 1000000) + tv1.tv_usec;
5536 tot_time = host_cnt * usec_per_rpc;
5537 offset_time = cur_time % tot_time;
5538 target_time = host_inx * usec_per_rpc;
5539 if (target_time < offset_time)
5540 delta_time = target_time - offset_time + tot_time;
5541 else
5542 delta_time = target_time - offset_time;
5543 if (usleep(delta_time)) {
5544 if (errno == EINVAL) /* usleep for more than 1 sec */
5545 usleep(900000);
5546 /* errno == EINTR */
5547 goto again;
5548 }
5549 }
5550
5551 /*
5552 * Returns true if "uid" is a "slurm authorized user" - i.e. uid == 0
5553 * or uid == slurm user id at this time.
5554 */
5555 static bool
5556 _slurm_authorized_user(uid_t uid)
5557 {
5558 return ((uid == (uid_t) 0) || (uid == conf->slurm_user_id));
5559 }
5560
5561
5562 struct waiter {
5563 uint32_t jobid;
5564 pthread_t thd;
5565 };
5566
5567
5568 static struct waiter *
5569 _waiter_create(uint32_t jobid)
5570 {
5571 struct waiter *wp = xmalloc(sizeof(struct waiter));
5572
5573 wp->jobid = jobid;
5574 wp->thd = pthread_self();
5575
5576 return wp;
5577 }
5578
5579 static int _find_waiter(void *x, void *y)
5580 {
5581 struct waiter *w = (struct waiter *)x;
5582 uint32_t *jp = (uint32_t *)y;
5583
5584 return (w->jobid == *jp);
5585 }
5586
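/*
 * Only one thread is allowed to run the termination logic for a given
 * job ID at a time. _waiter_init() registers the calling thread as
 * that job's "waiter"; if another thread already holds the slot it
 * returns SLURM_ERROR and the duplicate RPC is simply acknowledged.
 */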
5587 static int _waiter_init (uint32_t jobid)
5588 {
5589 int rc = SLURM_SUCCESS;
5590
5591 slurm_mutex_lock(&waiter_mutex);
5592 if (!waiters)
5593 waiters = list_create(xfree_ptr);
5594
5595 /*
5596 * Exit this thread if another thread is waiting on job
5597 */
5598 if (list_find_first(waiters, _find_waiter, &jobid))
5599 rc = SLURM_ERROR;
5600 else
5601 list_append(waiters, _waiter_create(jobid));
5602
5603 slurm_mutex_unlock(&waiter_mutex);
5604
5605 return rc;
5606 }
5607
5608 static int _waiter_complete (uint32_t jobid)
5609 {
5610 int rc = 0;
5611
5612 slurm_mutex_lock(&waiter_mutex);
5613 if (waiters)
5614 rc = list_delete_all(waiters, _find_waiter, &jobid);
5615 slurm_mutex_unlock(&waiter_mutex);
5616
5617 return rc;
5618 }
5619
5620 /*
5621 * Like _wait_for_procs(), but only wait for up to max_time seconds
5622 * if max_time == 0, send SIGKILL to tasks repeatedly
5623 *
5624 * Returns true if all job processes are gone
5625 */
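/*
 * The polling interval below backs off from 20 msec through 50, 100
 * and 500 msec before settling on whole-second sleeps (capped at 10
 * seconds), so a job that exits promptly is noticed within tens of
 * milliseconds.
 */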
5626 static bool
5627 _pause_for_job_completion (uint32_t job_id, char *nodes, int max_time)
5628 {
5629 int sec = 0;
5630 int pause = 1;
5631 bool rc = false;
5632 int count = 0;
5633
5634 while ((sec < max_time) || (max_time == 0)) {
5635 rc = _job_still_running (job_id);
5636 if (!rc)
5637 break;
5638 if ((max_time == 0) && (sec > 1)) {
5639 _terminate_all_steps(job_id, true);
5640 }
5641 if (sec > 10) {
5642 /* Reduce logging frequency about unkillable tasks */
5643 if (max_time)
5644 pause = MIN((max_time - sec), 10);
5645 else
5646 pause = 10;
5647 }
5648
5649 /*
5650 * The job will usually finish up within the first .02 sec. If
5651 * not, gradually increase the sleep until we get to a second.
5652 */
5653 if (count == 0) {
5654 usleep(20000);
5655 count++;
5656 } else if (count == 1) {
5657 usleep(50000);
5658 count++;
5659 } else if (count == 2) {
5660 usleep(100000);
5661 count++;
5662 } else if (count == 3) {
5663 usleep(500000);
5664 count++;
5665 sec = 1;
5666 } else {
5667 sleep(pause);
5668 sec += pause;
5669 }
5670 }
5671
5672 /*
5673 * Return true if job is NOT running
5674 */
5675 return (!rc);
5676 }
5677
5678 /*
5679 * Does nothing except reply with SLURM_SUCCESS (if the uid is authorized).
5680 *
5681 * The time limit is not currently used in the slurmd or slurmstepd.
5682 */
5683 static void
5684 _rpc_update_time(slurm_msg_t *msg)
5685 {
5686 int rc = SLURM_SUCCESS;
5687 uid_t req_uid = g_slurm_auth_get_uid(msg->auth_cred);
5688
5689 if ((req_uid != conf->slurm_user_id) && (req_uid != 0)) {
5690 rc = ESLURM_USER_ID_MISSING;
5691 error("Security violation, uid %d can't update time limit",
5692 req_uid);
5693 goto done;
5694 }
5695
5696 /* if (shm_update_job_timelimit(req->job_id, req->expiration_time) < 0) { */
5697 /* error("updating lifetime for job %u: %m", req->job_id); */
5698 /* rc = ESLURM_INVALID_JOB_ID; */
5699 /* } else */
5700 /* debug("reset job %u lifetime", req->job_id); */
5701
5702 done:
5703 slurm_send_rc_msg(msg, rc);
5704 }
5705
5706 static void _free_job_env(job_env_t *env_ptr)
5707 {
5708 int i;
5709
5710 if (env_ptr->gres_job_env) {
5711 for (i = 0; env_ptr->gres_job_env[i]; i++)
5712 xfree(env_ptr->gres_job_env[i]);
5713 xfree(env_ptr->gres_job_env);
5714 }
5715 xfree(env_ptr->resv_id);
5716 /* NOTE: spank_job_env is just a pointer without allocated memory */
5717 }
5718
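/*
 * Watchdog for a long-running prolog: if the prolog has not finished
 * within roughly MessageTimeout - 2 seconds, send REQUEST_JOB_NOTIFY
 * to the controller so the user's srun can report that the prolog
 * appears to be hung on this node.
 */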
5719 static void *_prolog_timer(void *x)
5720 {
5721 int delay_time, rc = SLURM_SUCCESS;
5722 struct timespec ts;
5723 struct timeval now;
5724 slurm_msg_t msg;
5725 job_notify_msg_t notify_req;
5726 char srun_msg[128];
5727 timer_struct_t *timer_struct = (timer_struct_t *) x;
5728
5729 delay_time = MAX(2, (timer_struct->msg_timeout - 2));
5730 gettimeofday(&now, NULL);
5731 ts.tv_sec = now.tv_sec + delay_time;
5732 ts.tv_nsec = now.tv_usec * 1000;
5733 slurm_mutex_lock(timer_struct->timer_mutex);
5734 if (!timer_struct->prolog_fini) {
5735 rc = pthread_cond_timedwait(timer_struct->timer_cond,
5736 timer_struct->timer_mutex, &ts);
5737 }
5738 slurm_mutex_unlock(timer_struct->timer_mutex);
5739
5740 if (rc != ETIMEDOUT)
5741 return NULL;
5742
5743 slurm_msg_t_init(&msg);
5744 snprintf(srun_msg, sizeof(srun_msg), "Prolog hung on node %s",
5745 conf->node_name);
5746 memset(&notify_req, 0, sizeof(notify_req));
5747 notify_req.job_id = timer_struct->job_id;
5748 notify_req.job_step_id = NO_VAL;
5749 notify_req.message = srun_msg;
5750 msg.msg_type = REQUEST_JOB_NOTIFY;
5751 msg.data = &notify_req;
5752 slurm_send_only_controller_msg(&msg, working_cluster_rec);
5753 return NULL;
5754 }
5755
5756 static int
5757 _run_prolog(job_env_t *job_env, slurm_cred_t *cred, bool remove_running)
5758 {
5759 DEF_TIMERS;
5760 int diff_time, rc;
5761 time_t start_time = time(NULL);
5762 pthread_t timer_id;
5763 pthread_cond_t timer_cond = PTHREAD_COND_INITIALIZER;
5764 pthread_mutex_t timer_mutex = PTHREAD_MUTEX_INITIALIZER;
5765 timer_struct_t timer_struct;
5766 bool prolog_fini = false;
5767 bool script_lock = false;
5768
5769 if (slurmctld_conf.prolog_flags & PROLOG_FLAG_SERIAL) {
5770 slurm_mutex_lock(&prolog_serial_mutex);
5771 script_lock = true;
5772 }
5773
5774 timer_struct.job_id = job_env->jobid;
5775 timer_struct.msg_timeout = slurmctld_conf.msg_timeout;
5776 timer_struct.prolog_fini = &prolog_fini;
5777 timer_struct.timer_cond = &timer_cond;
5778 timer_struct.timer_mutex = &timer_mutex;
5779 slurm_thread_create(&timer_id, _prolog_timer, &timer_struct);
5780
5781 START_TIMER;
5782
5783 rc = prep_prolog(job_env, cred);
5784
5785 END_TIMER;
5786 info("%s: run job script took %s", __func__, TIME_STR);
5787 slurm_mutex_lock(&timer_mutex);
5788 prolog_fini = true;
5789 slurm_cond_broadcast(&timer_cond);
5790 slurm_mutex_unlock(&timer_mutex);
5791
5792 diff_time = difftime(time(NULL), start_time);
5793 info("%s: prolog with lock for job %u ran for %d seconds",
5794 __func__, job_env->jobid, diff_time);
5795 if (diff_time >= (slurmctld_conf.msg_timeout / 2)) {
5796 info("prolog for job %u ran for %d seconds",
5797 job_env->jobid, diff_time);
5798 }
5799
5800 if (remove_running)
5801 _remove_job_running_prolog(job_env->jobid);
5802
5803 if (timer_id)
5804 pthread_join(timer_id, NULL);
5805 if (script_lock)
5806 slurm_mutex_unlock(&prolog_serial_mutex);
5807
5808 return rc;
5809 }
5810
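/*
 * Run the epilog for a job. Any prolog still in flight for the job is
 * waited on first, and with PrologFlags=Serial the prolog and epilog
 * scripts are serialized through prolog_serial_mutex.
 */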
5811 static int
5812 _run_epilog(job_env_t *job_env)
5813 {
5814 time_t start_time = time(NULL);
5815 int error_code, diff_time;
5816 bool script_lock = false;
5817
5818 _wait_for_job_running_prolog(job_env->jobid);
5819
5820 if (slurmctld_conf.prolog_flags & PROLOG_FLAG_SERIAL) {
5821 slurm_mutex_lock(&prolog_serial_mutex);
5822 script_lock = true;
5823 }
5824
5825 error_code = prep_epilog(job_env, NULL);
5826
5827 diff_time = difftime(time(NULL), start_time);
5828 if (diff_time >= (slurmctld_conf.msg_timeout / 2)) {
5829 info("epilog for job %u ran for %d seconds",
5830 job_env->jobid, diff_time);
5831 }
5832
5833 if (script_lock)
5834 slurm_mutex_unlock(&prolog_serial_mutex);
5835
5836 return error_code;
5837 }
5838
5839 static int
5840 _add_starting_step(uint16_t type, void *req)
5841 {
5842 starting_step_t *starting_step;
5843
5844 /* Add the step info to a list of starting steps that cannot
5845 yet be reliably contacted (their listening socket may not exist). */
5846 starting_step = xmalloc(sizeof(starting_step_t));
5847
5848 switch (type) {
5849 case LAUNCH_BATCH_JOB:
5850 starting_step->job_id =
5851 ((batch_job_launch_msg_t *)req)->job_id;
5852 starting_step->step_id =
5853 ((batch_job_launch_msg_t *)req)->step_id;
5854 break;
5855 case LAUNCH_TASKS:
5856 starting_step->job_id =
5857 ((launch_tasks_request_msg_t *)req)->job_id;
5858 starting_step->step_id =
5859 ((launch_tasks_request_msg_t *)req)->job_step_id;
5860 break;
5861 case REQUEST_LAUNCH_PROLOG:
5862 starting_step->job_id = ((prolog_launch_msg_t *)req)->job_id;
5863 starting_step->step_id = SLURM_EXTERN_CONT;
5864 break;
5865 default:
5866 error("%s called with an invalid type: %u", __func__, type);
5867 xfree(starting_step);
5868 return SLURM_ERROR;
5869 }
5870
5871 list_append(conf->starting_steps, starting_step);
5872
5873 return SLURM_SUCCESS;
5874 }
5875
5876
5877 static int
5878 _remove_starting_step(uint16_t type, void *req)
5879 {
5880 starting_step_t starting_step;
5881 int rc = SLURM_SUCCESS;
5882
5883 switch(type) {
5884 case LAUNCH_BATCH_JOB:
5885 starting_step.job_id =
5886 ((batch_job_launch_msg_t *)req)->job_id;
5887 starting_step.step_id =
5888 ((batch_job_launch_msg_t *)req)->step_id;
5889 break;
5890 case LAUNCH_TASKS:
5891 starting_step.job_id =
5892 ((launch_tasks_request_msg_t *)req)->job_id;
5893 starting_step.step_id =
5894 ((launch_tasks_request_msg_t *)req)->job_step_id;
5895 break;
5896 default:
5897 error("%s called with an invalid type: %u", __func__, type);
5898 rc = SLURM_ERROR;
5899 goto fail;
5900 }
5901
5902 if (!list_delete_all(conf->starting_steps,
5903 _compare_starting_steps,
5904 &starting_step)) {
5905 error("%s: step %u.%u not found", __func__,
5906 starting_step.job_id,
5907 starting_step.step_id);
5908 rc = SLURM_ERROR;
5909 }
5910 slurm_cond_broadcast(&conf->starting_steps_cond);
5911 fail:
5912 return rc;
5913 }
5914
5915
5916
5917 static int _compare_starting_steps(void *listentry, void *key)
5918 {
5919 starting_step_t *step0 = (starting_step_t *)listentry;
5920 starting_step_t *step1 = (starting_step_t *)key;
5921
5922 return (step0->job_id == step1->job_id &&
5923 step0->step_id == step1->step_id);
5924 }
5925
5926
5927 /* Wait for a step to get far enough in the launch process to have
5928 a socket open, ready to handle RPC calls. Pass step_id = NO_VAL
5929 to wait on any step for the given job. */
5930
5931 static int _wait_for_starting_step(uint32_t job_id, uint32_t step_id)
5932 {
5933 static pthread_mutex_t dummy_lock = PTHREAD_MUTEX_INITIALIZER;
5934 struct timespec ts = {0, 0};
5935 struct timeval now;
5936
5937 starting_step_t starting_step;
5938 int num_passes = 0;
5939
5940 starting_step.job_id = job_id;
5941 starting_step.step_id = step_id;
5942
5943 while (list_find_first(conf->starting_steps,
5944 _compare_starting_steps,
5945 &starting_step)) {
5946 if (num_passes == 0) {
5947 if (step_id != NO_VAL)
5948 debug("Blocked waiting for step %d.%d",
5949 job_id, step_id);
5950 else
5951 debug("Blocked waiting for job %d, all steps",
5952 job_id);
5953 }
5954 num_passes++;
5955
5956 gettimeofday(&now, NULL);
5957 ts.tv_sec = now.tv_sec+1;
5958 ts.tv_nsec = now.tv_usec * 1000;
5959
5960 slurm_mutex_lock(&dummy_lock);
5961 slurm_cond_timedwait(&conf->starting_steps_cond,
5962 &dummy_lock, &ts);
5963 slurm_mutex_unlock(&dummy_lock);
5964 }
5965 if (num_passes > 0) {
5966 if (step_id != NO_VAL)
5967 debug("Finished wait for step %d.%d",
5968 job_id, step_id);
5969 else
5970 debug("Finished wait for job %d, all steps",
5971 job_id);
5972 }
5973
5974 return SLURM_SUCCESS;
5975 }
5976
5977
5978 /* Return true if the step has not yet confirmed that its socket to
5979 handle RPC calls has been created. Pass step_id = NO_VAL
5980 to return true if any of the job's steps are still starting. */
5981 static bool _step_is_starting(uint32_t job_id, uint32_t step_id)
5982 {
5983 starting_step_t starting_step;
5984 starting_step.job_id = job_id;
5985 starting_step.step_id = step_id;
5986 bool ret = false;
5987
5988 if (list_find_first(conf->starting_steps,
5989 _compare_starting_steps,
5990 &starting_step )) {
5991 ret = true;
5992 }
5993
5994 return ret;
5995 }
5996
5997 /* Add this job to the list of jobs currently running their prolog */
5998 static void _add_job_running_prolog(uint32_t job_id)
5999 {
6000 uint32_t *job_running_prolog;
6001
6002 /* Add the job to a list of jobs whose prologs are running */
6003 job_running_prolog = xmalloc(sizeof(uint32_t));
6004 *job_running_prolog = job_id;
6005
6006 list_append(conf->prolog_running_jobs, job_running_prolog);
6007 }
6008
6009 /* Remove this job from the list of jobs currently running their prolog */
6010 static void _remove_job_running_prolog(uint32_t job_id)
6011 {
6012 if (!list_delete_all(conf->prolog_running_jobs,
6013 _match_jobid, &job_id))
6014 error("_remove_job_running_prolog: job not found");
6015 slurm_cond_broadcast(&conf->prolog_running_cond);
6016 }
6017
6018 static int _match_jobid(void *listentry, void *key)
6019 {
6020 uint32_t *job0 = (uint32_t *)listentry;
6021 uint32_t *job1 = (uint32_t *)key;
6022
6023 return (*job0 == *job1);
6024 }
6025
6026 static int _prolog_is_running (uint32_t jobid)
6027 {
6028 int rc = 0;
6029 if (conf->prolog_running_jobs &&
6030 list_find_first(conf->prolog_running_jobs,
6031 _match_jobid, &jobid))
6032 rc = 1;
6033 return (rc);
6034 }
6035
6036 /* Wait for the job's prolog to complete */
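/*
 * The condition is re-checked on a one second timeout against a dummy
 * mutex, so even if a prolog_running_cond broadcast is missed the wait
 * is prolonged by at most about a second.
 */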
6037 static void _wait_for_job_running_prolog(uint32_t job_id)
6038 {
6039 static pthread_mutex_t dummy_lock = PTHREAD_MUTEX_INITIALIZER;
6040 struct timespec ts = {0, 0};
6041 struct timeval now;
6042
6043 debug("Waiting for job %d's prolog to complete", job_id);
6044
6045 while (_prolog_is_running (job_id)) {
6046
6047 gettimeofday(&now, NULL);
6048 ts.tv_sec = now.tv_sec+1;
6049 ts.tv_nsec = now.tv_usec * 1000;
6050
6051 slurm_mutex_lock(&dummy_lock);
6052 slurm_cond_timedwait(&conf->prolog_running_cond,
6053 &dummy_lock, &ts);
6054 slurm_mutex_unlock(&dummy_lock);
6055 }
6056
6057 debug("Finished wait for job %d's prolog to complete", job_id);
6058 }
6059
6060
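/*
 * Forward an opaque data blob to a local UNIX domain socket (typically
 * one opened under the spool directory by a plugin such as PMI2). The
 * payload is framed as <uid><length><data>, with the two 32-bit fields
 * in network byte order to match the pack/unpack convention.
 */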
6061 static void
6062 static void _rpc_forward_data(slurm_msg_t *msg)
6063 {
6064 forward_data_msg_t *req = (forward_data_msg_t *)msg->data;
6065 uint32_t req_uid = (uint32_t) g_slurm_auth_get_uid(msg->auth_cred);
6066 struct sockaddr_un sa;
6067 int fd = -1, rc = 0;
6068
6069 /* Expand the %n and %h patterns in the incoming address so the
6070 * socket path points at this node's spool directory.
6071 */
6072 xstrsubstitute(req->address, "%n", conf->node_name);
6073 xstrsubstitute(req->address, "%h", conf->node_name);
6074 debug3("Entering _rpc_forward_data, address: %s, len: %u",
6075 req->address, req->len);
6076
6077 /*
6078 * If socket name would be truncated, emit error and exit
6079 */
6080 if (strlen(req->address) >= sizeof(sa.sun_path)) {
6081 error("%s: Unix socket path '%s' is too long. (%ld > %ld)",
6082 __func__, req->address,
6083 (long int)(strlen(req->address) + 1),
6084 (long int)sizeof(sa.sun_path));
6085 slurm_seterrno(EINVAL);
6086 rc = errno;
6087 goto done;
6088 }
6089
6090 /* connect to specified address */
6091 fd = socket(AF_UNIX, SOCK_STREAM, 0);
6092 if (fd < 0) {
6093 rc = errno;
6094 error("failed creating UNIX domain socket: %m");
6095 goto done;
6096 }
6097 memset(&sa, 0, sizeof(sa));
6098 sa.sun_family = AF_UNIX;
6099 strcpy(sa.sun_path, req->address);
6100 while (((rc = connect(fd, (struct sockaddr *)&sa, SUN_LEN(&sa))) < 0) &&
6101 (errno == EINTR));
6102 if (rc < 0) {
6103 rc = errno;
6104 debug2("failed connecting to specified socket '%s': %m",
6105 req->address);
6106 goto done;
6107 }
6108
6109 /*
6110 * Although this is always a local connection, convert the values to
6111 * network byte order to stay consistent with pack/unpack.
6112 */
6113 req_uid = htonl(req_uid);
6114 safe_write(fd, &req_uid, sizeof(uint32_t));
6115 req_uid = htonl(req->len);
6116 safe_write(fd, &req_uid, sizeof(uint32_t));
6117 safe_write(fd, req->data, req->len);
6118
6119 rwfail:
6120 done:
6121 if (fd >= 0){
6122 close(fd);
6123 }
6124 slurm_send_rc_msg(msg, rc);
6125 }
6126
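/*
 * active_job_id[] is a small table, kept in launch order, of jobs that
 * have recently completed launch on this node. _launch_complete_add()
 * records a job (discarding the oldest entry when the table is full),
 * _launch_complete_rm() drops it, and _launch_complete_wait() blocks
 * briefly until a job shows up before acting on it.
 */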
6127 static void _launch_complete_add(uint32_t job_id)
6128 {
6129 int j, empty;
6130
6131 slurm_mutex_lock(&job_state_mutex);
6132 empty = -1;
6133 for (j = 0; j < JOB_STATE_CNT; j++) {
6134 if (job_id == active_job_id[j])
6135 break;
6136 if ((active_job_id[j] == 0) && (empty == -1))
6137 empty = j;
6138 }
6139 if (j >= JOB_STATE_CNT || job_id != active_job_id[j]) {
6140 if (empty == -1) /* Discard oldest job */
6141 empty = 0;
6142 for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6143 active_job_id[j - 1] = active_job_id[j];
6144 }
6145 active_job_id[JOB_STATE_CNT - 1] = 0;
6146 for (j = 0; j < JOB_STATE_CNT; j++) {
6147 if (active_job_id[j] == 0) {
6148 active_job_id[j] = job_id;
6149 break;
6150 }
6151 }
6152 }
6153 slurm_cond_signal(&job_state_cond);
6154 slurm_mutex_unlock(&job_state_mutex);
6155 _launch_complete_log("job add", job_id);
6156 }
6157
6158 static void _launch_complete_log(char *type, uint32_t job_id)
6159 {
6160 #if 0
6161 int j;
6162
6163 info("active %s %u", type, job_id);
6164 slurm_mutex_lock(&job_state_mutex);
6165 for (j = 0; j < JOB_STATE_CNT; j++) {
6166 if (active_job_id[j] != 0) {
6167 info("active_job_id[%d]=%u", j, active_job_id[j]);
6168 }
6169 }
6170 slurm_mutex_unlock(&job_state_mutex);
6171 #endif
6172 }
6173
6174 /* Test if we have a specific job ID still running */
6175 static bool _launch_job_test(uint32_t job_id)
6176 {
6177 bool found = false;
6178 int j;
6179
6180 slurm_mutex_lock(&job_state_mutex);
6181 for (j = 0; j < JOB_STATE_CNT; j++) {
6182 if (job_id == active_job_id[j]) {
6183 found = true;
6184 break;
6185 }
6186 }
6187 slurm_mutex_unlock(&job_state_mutex);
6188 return found;
6189 }
6190
6191
6192 static void _launch_complete_rm(uint32_t job_id)
6193 {
6194 int j;
6195
6196 slurm_mutex_lock(&job_state_mutex);
6197 for (j = 0; j < JOB_STATE_CNT; j++) {
6198 if (job_id == active_job_id[j])
6199 break;
6200 }
6201 if (j < JOB_STATE_CNT && job_id == active_job_id[j]) {
6202 for (j = j + 1; j < JOB_STATE_CNT; j++) {
6203 active_job_id[j - 1] = active_job_id[j];
6204 }
6205 active_job_id[JOB_STATE_CNT - 1] = 0;
6206 }
6207 slurm_mutex_unlock(&job_state_mutex);
6208 _launch_complete_log("job remove", job_id);
6209 }
6210
6211 static void _launch_complete_wait(uint32_t job_id)
6212 {
6213 int i, j, empty;
6214 time_t start = time(NULL);
6215 struct timeval now;
6216 struct timespec timeout;
6217
6218 slurm_mutex_lock(&job_state_mutex);
6219 for (i = 0; ; i++) {
6220 empty = -1;
6221 for (j = 0; j < JOB_STATE_CNT; j++) {
6222 if (job_id == active_job_id[j])
6223 break;
6224 if ((active_job_id[j] == 0) && (empty == -1))
6225 empty = j;
6226 }
6227 if (j < JOB_STATE_CNT) /* Found job, ready to return */
6228 break;
6229 if (difftime(time(NULL), start) <= 9) { /* Retry for 9 secs */
6230 debug2("wait for launch of job %u before suspending it",
6231 job_id);
6232 gettimeofday(&now, NULL);
6233 timeout.tv_sec = now.tv_sec + 1;
6234 timeout.tv_nsec = now.tv_usec * 1000;
6235 slurm_cond_timedwait(&job_state_cond,&job_state_mutex,
6236 &timeout);
6237 continue;
6238 }
6239 if (empty == -1) /* Discard oldest job */
6240 empty = 0;
6241 for (j = empty + 1; j < JOB_STATE_CNT; j++) {
6242 active_job_id[j - 1] = active_job_id[j];
6243 }
6244 active_job_id[JOB_STATE_CNT - 1] = 0;
6245 for (j = 0; j < JOB_STATE_CNT; j++) {
6246 if (active_job_id[j] == 0) {
6247 active_job_id[j] = job_id;
6248 break;
6249 }
6250 }
6251 break;
6252 }
6253 slurm_mutex_unlock(&job_state_mutex);
6254 _launch_complete_log("job wait", job_id);
6255 }
6256
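/*
 * Return true if SchedulerParameters contains "no_env_cache" or
 * "requeue_setup_env_fail", meaning a batch job whose environment
 * setup fails should be requeued rather than run with a cached or
 * incomplete environment. The result is cached until the slurmd
 * configuration changes.
 */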
6257 static bool
6258 static bool _requeue_setup_env_fail(void)
6259 {
6260 static time_t config_update = 0;
6261 static bool requeue = false;
6262
6263 if (config_update != conf->last_update) {
6264 char *sched_params = slurm_get_sched_params();
6265 requeue = ((xstrcasestr(sched_params, "no_env_cache") ||
6266 xstrcasestr(sched_params,
6267 "requeue_setup_env_fail")));
6268 xfree(sched_params);
6269 config_update = conf->last_update;
6270 }
6271
6272 return requeue;
6273 }
6274