/*****************************************************************************\
 *  src/srun/allocate.c - srun functions for managing node allocations
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Mark Grondona <mgrondona@llnl.gov>.
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

#include "config.h"

#include <poll.h>
#include <pwd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/proc_args.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_time.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"

#include "allocate.h"
#include "opt.h"
#include "launch.h"

#define MAX_ALLOC_WAIT	60	/* seconds */
#define MIN_ALLOC_WAIT	5	/* seconds */
#define MAX_RETRIES	10
#define POLL_SLEEP	0.1	/* retry interval in seconds */

pthread_mutex_t msg_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER;
allocation_msg_thread_t *msg_thr = NULL;
struct pollfd global_fds[1];
uint16_t slurmctld_comm_port = 0;

extern char **environ;

static uint32_t pending_job_id = 0;

/*
 * Static Prototypes
 */
static job_desc_msg_t *_job_desc_msg_create_from_opts(slurm_opt_t *opt_local);
static void _set_pending_job_id(uint32_t job_id);
static void _signal_while_allocating(int signo);
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc);

static sig_atomic_t destroy_job = 0;

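/* Callback passed to slurm_allocate_resources_blocking(); records the job ID
 * of the pending allocation so the signal handlers can act on it */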
static void _set_pending_job_id(uint32_t job_id)
{
	debug2("Pending job allocation %u", job_id);
	pending_job_id = job_id;
}

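/* Detached thread spawned from _signal_while_allocating(); it is safe to log
 * here. Unless the signal was SIGCONT, mark the job for destruction and
 * revoke any pending allocation */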
static void *_safe_signal_while_allocating(void *in_data)
{
	int signo = *(int *)in_data;

	debug("Got signal %d", signo);
	xfree(in_data);
	if (signo == SIGCONT)
		return NULL;

	destroy_job = 1;
	if (pending_job_id != 0) {
		info("Job allocation %u has been revoked", pending_job_id);
		slurm_complete_job(pending_job_id, NO_VAL);
		destroy_job = 1;
	}

	return NULL;
}

static void _signal_while_allocating(int signo)
{
	int *local_signal;

	/*
	 * This handler cannot always be run from a separate thread, and
	 * printing via the log functions from signal context can deadlock
	 * if the signal happened to interrupt a poll() or a logging call.
	 * To avoid that, spawn a detached thread here to do the actual
	 * work (and any printing).
	 *
	 * SO, DON'T PRINT ANYTHING IN THIS FUNCTION.
	 */
	local_signal = xmalloc(sizeof(int));
	*local_signal = signo;
	slurm_thread_create_detached(NULL, _safe_signal_while_allocating,
				     local_signal);
}

/* This typically signifies the job was cancelled by scancel */
static void _job_complete_handler(srun_job_complete_msg_t *msg)
{
	if (pending_job_id && (pending_job_id != msg->job_id)) {
		error("Ignoring job_complete for job %u because our job ID is %u",
		      msg->job_id, pending_job_id);
		return;
	}

	if (msg->step_id == NO_VAL)
		info("Force Terminated job %u", msg->job_id);
	else
		info("Force Terminated job %u.%u", msg->job_id, msg->step_id);
}

/*
 * Job has been notified of its approaching time limit.
 * Job will be killed shortly after timeout.
 * This RPC can arrive multiple times with the same or updated timeouts.
 * FIXME: We may want to signal the job or perform other action for this.
 * FIXME: How much lead time do we want for this message? Some jobs may
 *	require tens of minutes to gracefully terminate.
 */
static void _timeout_handler(srun_timeout_msg_t *msg)
{
	static time_t last_timeout = 0;

	if (msg->timeout != last_timeout) {
		last_timeout = msg->timeout;
		verbose("job time limit to be reached at %s",
			slurm_ctime2(&msg->timeout));
	}
}

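/* Print a message forwarded from slurmctld to the user */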
static void _user_msg_handler(srun_user_msg_t *msg)
{
	info("%s", msg->msg);
}

static void _ping_handler(srun_ping_msg_t *msg)
{
	/* the api will respond so there really isn't anything to do
	   here */
}

static void _node_fail_handler(srun_node_fail_msg_t *msg)
{
	error("Node failure on %s", msg->nodelist);
}

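/*
 * Determine from errno whether a failed allocation request should be
 * retried. Returns true to retry (possibly after sleeping), false to
 * give up.
 */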
static bool _retry(void)
{
	static int  retries = 0;
	static char *msg = "Slurm controller not responding, "
		"sleeping and retrying.";

	if ((errno == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) || (errno == EAGAIN)) {
		if (retries == 0)
			error("%s", msg);
		else if (retries < MAX_RETRIES)
			debug("%s", msg);
		else
			return false;
		sleep(++retries);
	} else if (errno == EINTR) {
		/* srun may be interrupted by the BLCR checkpoint signal */
		/*
		 * XXX: this will cause the old job to be cancelled and a
		 * new job to be allocated
		 */
		debug("Syscall interrupted while allocating resources, "
		      "retrying.");
		return true;
	} else if (opt.immediate &&
		   ((errno == ETIMEDOUT) || (errno == ESLURM_NODES_BUSY))) {
		error("Unable to allocate resources: %s",
		      slurm_strerror(ESLURM_NODES_BUSY));
		error_exit = immediate_exit;
		return false;
	} else if ((errno == SLURM_PROTOCOL_AUTHENTICATION_ERROR) ||
		   (errno == SLURM_UNEXPECTED_MSG_ERROR) ||
		   (errno == SLURM_PROTOCOL_INSANE_MSG_LENGTH)) {
		static int external_msg_count = 0;
		error("Srun communication socket apparently being written to "
		      "by something other than Slurm");
		if (external_msg_count++ < 4)
			return true;
		error("Unable to allocate resources: %m");
		return false;
	} else {
		error("Unable to allocate resources: %m");
		return false;
	}

	return true;
}

/* Returns 1 if the job and nodes are ready for the job to begin, 0 otherwise */
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc)
{
	int is_ready = 0, i, rc;
	double cur_delay = 0;
	double cur_sleep = 0;
	int suspend_time, resume_time, max_delay;
	bool job_killed = false;

	suspend_time = slurm_get_suspend_timeout();
	resume_time  = slurm_get_resume_timeout();
	if ((suspend_time == 0) || (resume_time == 0))
		return 1;	/* Power save mode disabled */
	max_delay = suspend_time + resume_time;
	max_delay *= 5;		/* Allow for ResumeRate support */

	pending_job_id = alloc->job_id;

	for (i = 0; cur_delay < max_delay; i++) {
		if (i) {
			cur_sleep = POLL_SLEEP * i;
			if (i == 1) {
				verbose("Waiting for nodes to boot (delay looping %d times @ %f secs x index)",
					max_delay, POLL_SLEEP);
			} else {
				debug("Waited %f sec and still waiting: next sleep for %f sec",
				      cur_delay, cur_sleep);
			}
			usleep(USEC_IN_SEC * cur_sleep);
			cur_delay += cur_sleep;
		}

		rc = slurm_job_node_ready(alloc->job_id);
		if (rc == READY_JOB_FATAL)
			break;				/* fatal error */
		if ((rc == READY_JOB_ERROR) || (rc == EAGAIN))
			continue;			/* retry */
		if ((rc & READY_JOB_STATE) == 0) {	/* job killed */
			job_killed = true;
			break;
		}
		if (rc & READY_NODE_STATE) {		/* job and node ready */
			is_ready = 1;
			break;
		}
		if (destroy_job)
			break;
	}
	if (is_ready) {
		resource_allocation_response_msg_t *resp;
		char *tmp_str;
		if (i > 0)
			verbose("Nodes %s are ready for job", alloc->node_list);
		if (alloc->alias_list && !xstrcmp(alloc->alias_list, "TBD") &&
		    (slurm_allocation_lookup(pending_job_id, &resp)
		     == SLURM_SUCCESS)) {
			tmp_str = alloc->alias_list;
			alloc->alias_list = resp->alias_list;
			resp->alias_list = tmp_str;
			slurm_free_resource_allocation_response_msg(resp);
		}
	} else if (!destroy_job) {
		if (job_killed) {
			error("Job allocation %u has been revoked",
			      alloc->job_id);
			destroy_job = true;
		} else
			error("Nodes %s are still not ready", alloc->node_list);
	} else	/* allocation interrupted and slurmctld not responding */
		is_ready = 0;

	pending_job_id = 0;

	return is_ready;
}

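/*
 * Build a job description from the given options and ask the controller
 * whether and when it could run (a "will run" test), without actually
 * creating the allocation.
 */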
static int _allocate_test(slurm_opt_t *opt_local)
{
	job_desc_msg_t *j;
	int rc;

	if ((j = _job_desc_msg_create_from_opts(opt_local)) == NULL)
		return SLURM_ERROR;

	if (opt_local->clusters &&
	    (slurmdb_get_first_avail_cluster(j, opt_local->clusters,
					     &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(opt_local->clusters, 0);
		return SLURM_ERROR;
	}

	rc = slurm_job_will_run(j);
	job_desc_msg_destroy(j);
	return rc;
}

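/* Run the "will run" test for each component of a heterogeneous job
 * (when opt_list is set), or once for a regular job */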
extern int allocate_test(void)
{
	int rc = SLURM_SUCCESS;
	ListIterator iter;
	slurm_opt_t *opt_local;

	if (opt_list) {
		iter = list_iterator_create(opt_list);
		while ((opt_local = list_next(iter))) {
			if ((rc = _allocate_test(opt_local)) != SLURM_SUCCESS)
				break;
		}
		list_iterator_destroy(iter);
	} else {
		rc = _allocate_test(&opt);
	}

	return rc;
}

/*
 * Allocate nodes from the slurm controller -- retrying the attempt
 * if the controller appears to be down, and optionally waiting for
 * resources if none are currently available (see opt.immediate)
 *
 * Returns a pointer to a resource_allocation_response_msg which must
 * be freed with slurm_free_resource_allocation_response_msg()
 */
extern resource_allocation_response_msg_t *
	allocate_nodes(bool handle_signals, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	resource_allocation_response_msg_t *resp = NULL;
	job_desc_msg_t *j;
	slurm_allocation_callbacks_t callbacks;
	int i;

	xassert(srun_opt);

	if (srun_opt->relative != NO_VAL)
		fatal("--relative option invalid for job allocation request");

	if ((j = _job_desc_msg_create_from_opts(&opt)) == NULL)
		return NULL;

	if (opt_local->clusters &&
	    (slurmdb_get_first_avail_cluster(j, opt_local->clusters,
					     &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(opt_local->clusters, 0);
		return NULL;
	}

	j->origin_cluster = xstrdup(slurmctld_conf.cluster_name);

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.job_suspend = NULL;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;

	/* create message thread to handle pings and such from slurmctld */
	msg_thr = slurm_allocation_msg_thr_create(&j->other_port, &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately. */
	if (handle_signals) {
		xsignal_unblock(sig_array);
		for (i = 0; sig_array[i]; i++)
			xsignal(sig_array[i], _signal_while_allocating);
	}

	while (!resp) {
		resp = slurm_allocate_resources_blocking(j,
							 opt_local->immediate,
							 _set_pending_job_id);
		if (destroy_job) {
			/* cancelled by signal */
			break;
		} else if (!resp && !_retry()) {
			break;
		}
	}

	if (resp)
		print_multi_line_string(resp->job_submit_user_msg,
					-1, LOG_LEVEL_INFO);

	if (resp && !destroy_job) {
		/*
		 * Allocation granted!
		 */
		pending_job_id = resp->job_id;

		/*
		 * These values could have changed while the job was
		 * pending, so overwrite the request with what was
		 * actually allocated to avoid problems when they are
		 * used during step creation.
		 */
		opt_local->pn_min_memory = NO_VAL64;
		opt_local->mem_per_cpu = NO_VAL64;
		if (resp->pn_min_memory != NO_VAL64) {
			if (resp->pn_min_memory & MEM_PER_CPU) {
				opt_local->mem_per_cpu = (resp->pn_min_memory &
							 (~MEM_PER_CPU));
			} else {
				opt_local->pn_min_memory = resp->pn_min_memory;
			}
		}

		opt_local->min_nodes = resp->node_cnt;
		opt_local->max_nodes = resp->node_cnt;

		if (resp->working_cluster_rec)
			slurm_setup_remote_working_cluster(resp);

		if (!_wait_nodes_ready(resp)) {
			if (!destroy_job)
				error("Something is wrong with the boot of the nodes.");
			goto relinquish;
		}
	} else if (destroy_job) {
		goto relinquish;
	}

	if (handle_signals)
		xsignal_block(sig_array);

	job_desc_msg_destroy(j);

	return resp;

relinquish:
	if (resp) {
		if (!destroy_job)
			slurm_complete_job(resp->job_id, 1);
		slurm_free_resource_allocation_response_msg(resp);
	}
	exit(error_exit);
	return NULL;
}

/*
 * Allocate nodes for a heterogeneous job from the slurm controller --
 * retrying the attempt if the controller appears to be down, and optionally
 * waiting for resources if none are currently available (see opt.immediate)
 *
 * Returns a List of resource_allocation_response_msg_t entries, one per
 * job component, which must be freed by the caller
 */
List allocate_het_job_nodes(bool handle_signals)
{
	resource_allocation_response_msg_t *resp = NULL;
	job_desc_msg_t *j, *first_job = NULL;
	slurm_allocation_callbacks_t callbacks;
	ListIterator opt_iter, resp_iter;
	slurm_opt_t *opt_local, *first_opt = NULL;
	List job_req_list = NULL, job_resp_list = NULL;
	uint32_t my_job_id = 0;
	int i, k;

	job_req_list = list_create(NULL);
	opt_iter = list_iterator_create(opt_list);
	while ((opt_local = list_next(opt_iter))) {
		srun_opt_t *srun_opt = opt_local->srun_opt;
		xassert(srun_opt);
		if (!first_opt)
			first_opt = opt_local;
		if (srun_opt->relative != NO_VAL)
			fatal("--relative option invalid for job allocation request");

		if ((j = _job_desc_msg_create_from_opts(opt_local)) == NULL) {
			FREE_NULL_LIST(job_req_list);
			return NULL;
		}
		if (!first_job)
			first_job = j;

		j->origin_cluster = xstrdup(slurmctld_conf.cluster_name);

		list_append(job_req_list, j);
	}
	list_iterator_destroy(opt_iter);

	if (!first_job) {
		error("%s: No job requests found", __func__);
		FREE_NULL_LIST(job_req_list);
		return NULL;
	}

	if (first_opt && first_opt->clusters &&
	    (slurmdb_get_first_het_job_cluster(job_req_list,
					       first_opt->clusters,
					       &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(first_opt->clusters, 0);
		FREE_NULL_LIST(job_req_list);
		return NULL;
	}

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.job_suspend = NULL;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;

	/* create message thread to handle pings and such from slurmctld */
	msg_thr = slurm_allocation_msg_thr_create(&first_job->other_port,
						  &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately. */
	if (handle_signals) {
		xsignal_unblock(sig_array);
		for (i = 0; sig_array[i]; i++)
			xsignal(sig_array[i], _signal_while_allocating);
	}

	while (first_opt && !job_resp_list) {
		job_resp_list = slurm_allocate_het_job_blocking(job_req_list,
				 first_opt->immediate, _set_pending_job_id);
		if (destroy_job) {
			/* cancelled by signal */
			break;
		} else if (!job_resp_list && !_retry()) {
			break;
		}
	}
	FREE_NULL_LIST(job_req_list);

	if (job_resp_list && !destroy_job) {
		/*
		 * Allocation granted!
		 */

		opt_iter  = list_iterator_create(opt_list);
		resp_iter = list_iterator_create(job_resp_list);
		while ((opt_local = list_next(opt_iter))) {
			resp = (resource_allocation_response_msg_t *)
			       list_next(resp_iter);
			if (!resp)
				break;

			if (pending_job_id == 0)
				pending_job_id = resp->job_id;
			if (my_job_id == 0) {
				my_job_id = resp->job_id;
				i = list_count(opt_list);
				k = list_count(job_resp_list);
				if (i != k) {
					error("%s: request count != response count (%d != %d)",
					      __func__, i, k);
					goto relinquish;
				}
			}

			/*
			 * These values could have changed while the job was
			 * pending, so overwrite the request with what was
			 * actually allocated to avoid problems when they are
			 * used during step creation.
			 */
			if (opt_local->pn_min_memory != NO_VAL64)
				opt_local->pn_min_memory =
					(resp->pn_min_memory & (~MEM_PER_CPU));
			else if (opt_local->mem_per_cpu != NO_VAL64)
				opt_local->mem_per_cpu =
					(resp->pn_min_memory & (~MEM_PER_CPU));

			opt_local->min_nodes = resp->node_cnt;
			opt_local->max_nodes = resp->node_cnt;

			if (resp->working_cluster_rec)
				slurm_setup_remote_working_cluster(resp);

			if (!_wait_nodes_ready(resp)) {
				if (!destroy_job)
					error("Something is wrong with the "
					      "boot of the nodes.");
				goto relinquish;
			}
		}
		list_iterator_destroy(resp_iter);
		list_iterator_destroy(opt_iter);
	} else if (destroy_job) {
		goto relinquish;
	}

	if (handle_signals)
		xsignal_block(sig_array);

	return job_resp_list;

relinquish:
	if (job_resp_list) {
		if (!destroy_job && my_job_id)
			slurm_complete_job(my_job_id, 1);
		list_destroy(job_resp_list);
	}
	exit(error_exit);
	return NULL;
}

void
ignore_signal(int signo)
{
	/* do nothing */
}

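/* Shut down the allocation message thread created when the allocation
 * was requested */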
int
cleanup_allocation(void)
{
	slurm_allocation_msg_thr_destroy(msg_thr);
	return SLURM_SUCCESS;
}

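/*
 * If --jobid was specified, look up the existing allocation (which may be
 * a heterogeneous job) and return its list of allocation responses.
 * Returns NULL if no job ID was given; on lookup failure either exits or,
 * under a parallel debugger, returns NULL so a new allocation is created.
 */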
extern List existing_allocation(void)
{
	uint32_t old_job_id;
	List job_resp_list = NULL;

	if (sropt.jobid == NO_VAL)
		return NULL;

	if (opt.clusters) {
		List clusters = NULL;
		if (!(clusters = slurmdb_get_info_cluster(opt.clusters))) {
			print_db_notok(opt.clusters, 0);
			exit(1);
		}
		working_cluster_rec = list_peek(clusters);
		debug2("Looking for job %d on cluster %s (addr: %s)",
		       sropt.jobid,
		       working_cluster_rec->name,
		       working_cluster_rec->control_host);
	}

	old_job_id = (uint32_t) sropt.jobid;
	if (slurm_het_job_lookup(old_job_id, &job_resp_list) < 0) {
		if (sropt.parallel_debug)
			return NULL;    /* create new allocation as needed */
		if (errno == ESLURM_ALREADY_DONE)
			error("Slurm job %u has expired", old_job_id);
		else
			error("Unable to confirm allocation for job %u: %m",
			      old_job_id);
		info("Check SLURM_JOB_ID environment variable. Expired or invalid job %u",
		     old_job_id);
		exit(error_exit);
	}

	return job_resp_list;
}

/* Set up a port to handle messages from slurmctld */
int slurmctld_msg_init(void)
{
	slurm_addr_t slurm_address;
	static int slurmctld_fd = -1;
	uint16_t *ports;

	if (slurmctld_fd >= 0)	/* May be set early for queued job allocation */
		return slurmctld_fd;

	if ((ports = slurm_get_srun_port_range()))
		slurmctld_fd = slurm_init_msg_engine_ports(ports);
	else
		slurmctld_fd = slurm_init_msg_engine_port(0);

	if (slurmctld_fd < 0) {
		error("slurm_init_msg_engine_port error %m");
		exit(error_exit);
	}

	if (slurm_get_stream_addr(slurmctld_fd, &slurm_address) < 0) {
		error("slurm_get_stream_addr error %m");
		exit(error_exit);
	}
	fd_set_nonblocking(slurmctld_fd);
	/* hostname is not set, so slurm_get_addr() would fail:
	   slurm_get_addr(&slurm_address, &port, hostname, sizeof(hostname)); */
	slurmctld_comm_port = ntohs(slurm_address.sin_port);
	debug2("srun PMI messages to port=%u", slurmctld_comm_port);

	return slurmctld_fd;
}

/*
 * Create a job description structure based off srun options
 * (see opt.h)
 */
static job_desc_msg_t *_job_desc_msg_create_from_opts(slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	job_desc_msg_t *j = xmalloc(sizeof(*j));
	hostlist_t hl = NULL;
	xassert(srun_opt);

	slurm_init_job_desc_msg(j);
	j->contiguous     = opt_local->contiguous;
	if (opt_local->core_spec != NO_VAL16)
		j->core_spec      = opt_local->core_spec;
	j->features       = opt_local->constraint;
	j->cluster_features = opt_local->c_constraint;
	if (opt_local->immediate == 1)
		j->immediate = opt_local->immediate;
	if (opt_local->job_name)
		j->name   = opt_local->job_name;
	else
		j->name = srun_opt->cmd_name;
	if (srun_opt->argc > 0) {
		j->argc    = 1;
		j->argv    = (char **) xmalloc(sizeof(char *) * 2);
		j->argv[0] = xstrdup(srun_opt->argv[0]);
	}
	if (opt_local->acctg_freq)
		j->acctg_freq     = xstrdup(opt_local->acctg_freq);
	j->reservation    = opt_local->reservation;
	j->wckey          = opt_local->wckey;
	j->x11 = opt.x11;
	if (j->x11) {
		j->x11_magic_cookie = xstrdup(opt.x11_magic_cookie);
		j->x11_target = xstrdup(opt.x11_target);
		j->x11_target_port = opt.x11_target_port;
	}

	j->req_nodes      = xstrdup(opt_local->nodelist);

	/* Simplify the job allocation nodelist; tasks are not laid out
	 * until the step is created */
	if (j->req_nodes) {
		hl = hostlist_create(j->req_nodes);
		xfree(opt_local->nodelist);
		opt_local->nodelist = hostlist_ranged_string_xmalloc(hl);
		hostlist_uniq(hl);
		xfree(j->req_nodes);
		j->req_nodes = hostlist_ranged_string_xmalloc(hl);
		hostlist_destroy(hl);
	}

	if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
	     SLURM_DIST_ARBITRARY) && !j->req_nodes) {
		error("With Arbitrary distribution you need to "
		      "specify a nodelist or hostfile with the -w option");
		return NULL;
	}
	j->extra = opt_local->extra;
	j->exc_nodes      = opt_local->exclude;
	j->partition      = opt_local->partition;
	j->min_nodes      = opt_local->min_nodes;
	if (opt_local->sockets_per_node != NO_VAL)
		j->sockets_per_node    = opt_local->sockets_per_node;
	if (opt_local->cores_per_socket != NO_VAL)
		j->cores_per_socket      = opt_local->cores_per_socket;
	if (opt_local->threads_per_core != NO_VAL) {
		j->threads_per_core    = opt_local->threads_per_core;
		/* if 1, always make sure affinity knows about it */
		if (j->threads_per_core == 1)
			srun_opt->cpu_bind_type |= CPU_BIND_ONE_THREAD_PER_CORE;
	}
	j->user_id        = opt_local->uid;
	j->dependency     = opt_local->dependency;
	if (opt_local->nice != NO_VAL)
		j->nice   = NICE_OFFSET + opt_local->nice;
	if (opt_local->priority)
		j->priority = opt_local->priority;
	if (srun_opt->cpu_bind)
		j->cpu_bind = srun_opt->cpu_bind;
	if (srun_opt->cpu_bind_type)
		j->cpu_bind_type = srun_opt->cpu_bind_type;
	if (opt_local->delay_boot != NO_VAL)
		j->delay_boot = opt_local->delay_boot;
	if (opt_local->mem_bind)
		j->mem_bind       = opt_local->mem_bind;
	if (opt_local->mem_bind_type)
		j->mem_bind_type  = opt_local->mem_bind_type;
	if (opt_local->plane_size != NO_VAL)
		j->plane_size     = opt_local->plane_size;
	j->task_dist      = opt_local->distribution;

	j->group_id       = opt_local->gid;
	j->mail_type      = opt_local->mail_type;

	if (opt_local->ntasks_per_node != NO_VAL)
		j->ntasks_per_node   = opt_local->ntasks_per_node;
	if (opt_local->ntasks_per_socket != NO_VAL)
		j->ntasks_per_socket = opt_local->ntasks_per_socket;
	if (opt_local->ntasks_per_core != NO_VAL)
		j->ntasks_per_core   = opt_local->ntasks_per_core;

	if (opt_local->mail_user)
		j->mail_user = opt_local->mail_user;
	if (opt_local->burst_buffer)
		j->burst_buffer = opt_local->burst_buffer;
	if (opt_local->begin)
		j->begin_time = opt_local->begin;
	if (opt_local->deadline)
		j->deadline = opt_local->deadline;
	if (opt_local->licenses)
		j->licenses = opt_local->licenses;
	if (opt_local->network)
		j->network = opt_local->network;
	if (opt_local->profile)
		j->profile = opt_local->profile;
	if (opt_local->account)
		j->account = opt_local->account;
	if (opt_local->comment)
		j->comment = opt_local->comment;
	if (opt_local->qos)
		j->qos = opt_local->qos;
	if (opt_local->chdir)
		j->work_dir = opt_local->chdir;

	if (opt_local->hold)
		j->priority     = 0;
	if (opt_local->reboot)
		j->reboot = 1;

	if (opt_local->max_nodes)
		j->max_nodes    = opt_local->max_nodes;
	else if (opt_local->nodes_set) {
		/* If the max node count isn't set for the allocation,
		 * default it to the min count to match the behavior of
		 * salloc and sbatch.
		 */
		j->max_nodes    = opt_local->min_nodes;
	}
	if (opt_local->pn_min_cpus > -1)
		j->pn_min_cpus = opt_local->pn_min_cpus;
	if (opt_local->pn_min_memory != NO_VAL64)
		j->pn_min_memory = opt_local->pn_min_memory;
	else if (opt_local->mem_per_cpu != NO_VAL64)
		j->pn_min_memory = opt_local->mem_per_cpu | MEM_PER_CPU;
	if (opt_local->pn_min_tmp_disk != NO_VAL64)
		j->pn_min_tmp_disk = opt_local->pn_min_tmp_disk;
	if (opt_local->overcommit) {
		j->min_cpus    = opt_local->min_nodes;
		j->overcommit  = opt_local->overcommit;
	} else if (opt_local->cpus_set)
		j->min_cpus    = opt_local->ntasks * opt_local->cpus_per_task;
	else
		j->min_cpus    = opt_local->ntasks;
	if (opt_local->ntasks_set)
		j->num_tasks   = opt_local->ntasks;

	if (opt_local->cpus_set)
		j->cpus_per_task = opt_local->cpus_per_task;

	if (opt_local->no_kill)
		j->kill_on_node_fail   = 0;
	if (opt_local->time_limit != NO_VAL)
		j->time_limit          = opt_local->time_limit;
	if (opt_local->time_min != NO_VAL)
		j->time_min            = opt_local->time_min;
	if (opt_local->shared != NO_VAL16)
		j->shared = opt_local->shared;

	if (opt_local->warn_flags)
		j->warn_flags = opt_local->warn_flags;
	if (opt_local->warn_signal)
		j->warn_signal = opt_local->warn_signal;
	if (opt_local->warn_time)
		j->warn_time = opt_local->warn_time;
	if (opt_local->job_flags)
		j->bitflags = opt_local->job_flags;

	if (opt_local->cpu_freq_min != NO_VAL)
		j->cpu_freq_min = opt_local->cpu_freq_min;
	if (opt_local->cpu_freq_max != NO_VAL)
		j->cpu_freq_max = opt_local->cpu_freq_max;
	if (opt_local->cpu_freq_gov != NO_VAL)
		j->cpu_freq_gov = opt_local->cpu_freq_gov;

	if (opt_local->req_switch >= 0)
		j->req_switch = opt_local->req_switch;
	if (opt_local->wait4switch >= 0)
		j->wait4switch = opt_local->wait4switch;

	/* srun uses the same listening port for the allocation response
	 * message as all other messages */
	j->alloc_resp_port = slurmctld_comm_port;
	j->other_port = slurmctld_comm_port;

	if (opt_local->spank_job_env_size) {
		j->spank_job_env      = opt_local->spank_job_env;
		j->spank_job_env_size = opt_local->spank_job_env_size;
	}

	j->power_flags = opt_local->power;
	if (opt_local->mcs_label)
		j->mcs_label = opt_local->mcs_label;
	j->wait_all_nodes = 1;

	/* If the job can run on multiple clusters, find the earliest
	 * run time and run it there */
	j->clusters = xstrdup(opt_local->clusters);

	if (opt_local->cpus_per_gpu)
		xstrfmtcat(j->cpus_per_tres, "gpu:%d", opt_local->cpus_per_gpu);
	j->tres_bind = xstrdup(opt_local->tres_bind);
	j->tres_freq = xstrdup(opt_local->tres_freq);
	xfmt_tres(&j->tres_per_job,    "gpu", opt_local->gpus);
	xfmt_tres(&j->tres_per_node,   "gpu", opt_local->gpus_per_node);
	if (opt_local->gres && xstrcasecmp(opt_local->gres, "NONE")) {
		if (j->tres_per_node)
			xstrfmtcat(j->tres_per_node, ",%s", opt_local->gres);
		else
			j->tres_per_node = xstrdup(opt_local->gres);
	}
	xfmt_tres(&j->tres_per_socket, "gpu", opt_local->gpus_per_socket);
	xfmt_tres(&j->tres_per_task,   "gpu", opt_local->gpus_per_task);
	if (opt_local->mem_per_gpu != NO_VAL64)
		xstrfmtcat(j->mem_per_tres, "gpu:%"PRIu64,
			   opt_local->mem_per_gpu);

	return j;
}

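/* Free a job description created by _job_desc_msg_create_from_opts() */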
void
job_desc_msg_destroy(job_desc_msg_t *j)
{
	if (j) {
		xfree(j->req_nodes);
		xfree(j);
	}
}

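/* Create a job step for the allocation through the launch plugin, reusing
 * the allocation-time signal handler and destroy flag */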
extern int create_job_step(srun_job_t *job, bool use_all_cpus,
			   slurm_opt_t *opt_local)
{
	return launch_g_create_job_step(job, use_all_cpus,
					_signal_while_allocating,
					&destroy_job, opt_local);
}