/*****************************************************************************\
 *  src/srun/allocate.c - srun functions for managing node allocations
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Mark Grondona <mgrondona@llnl.gov>.
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
\*****************************************************************************/

#include "config.h"

#include <poll.h>
#include <pwd.h>
#include <stdlib.h>
#include <sys/types.h>
#include <unistd.h>

#include "src/common/env.h"
#include "src/common/fd.h"
#include "src/common/forward.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/proc_args.h"
#include "src/common/slurm_auth.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_time.h"
#include "src/common/xmalloc.h"
#include "src/common/xsignal.h"
#include "src/common/xstring.h"

#include "allocate.h"
#include "opt.h"
#include "launch.h"

#define MAX_ALLOC_WAIT	60	/* seconds */
#define MIN_ALLOC_WAIT	5	/* seconds */
#define MAX_RETRIES	10
#define POLL_SLEEP	0.1	/* retry interval in seconds */

pthread_mutex_t msg_lock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER;
allocation_msg_thread_t *msg_thr = NULL;
struct pollfd global_fds[1];
uint16_t slurmctld_comm_port = 0;

extern char **environ;

static uint32_t pending_job_id = 0;

/*
 * Static Prototypes
 */
static job_desc_msg_t *_job_desc_msg_create_from_opts(slurm_opt_t *opt_local);
static void _set_pending_job_id(uint32_t job_id);
static void _signal_while_allocating(int signo);
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc);

static sig_atomic_t destroy_job = 0;

static void _set_pending_job_id(uint32_t job_id)
{
	debug2("Pending job allocation %u", job_id);
	pending_job_id = job_id;
}

static void *_safe_signal_while_allocating(void *in_data)
{
	int signo = *(int *)in_data;

	debug("Got signal %d", signo);
	xfree(in_data);
	if (signo == SIGCONT)
		return NULL;

	destroy_job = 1;
	if (pending_job_id != 0) {
		info("Job allocation %u has been revoked", pending_job_id);
		slurm_complete_job(pending_job_id, NO_VAL);
	}

	return NULL;
}

static void _signal_while_allocating(int signo)
{
	int *local_signal;

	/*
	 * There are places where _signal_while_allocating() can't be
	 * run in a separate thread, but printing through the log
	 * functions from signal context (for example while the main
	 * thread is blocked in a poll) can deadlock. So, once the
	 * signal arrives, we spawn a detached thread here and avoid
	 * the deadlock.
	 *
	 * SO, DON'T PRINT ANYTHING IN THIS FUNCTION.
	 */
	local_signal = xmalloc(sizeof(int));
	*local_signal = signo;
	slurm_thread_create_detached(NULL, _safe_signal_while_allocating,
				     local_signal);
}

/* This typically signifies the job was cancelled by scancel */
static void _job_complete_handler(srun_job_complete_msg_t *msg)
{
	if (pending_job_id && (pending_job_id != msg->job_id)) {
		error("Ignoring job_complete for job %u because our job ID is %u",
		      msg->job_id, pending_job_id);
		return;
	}

	if (msg->step_id == NO_VAL)
		info("Force Terminated job %u", msg->job_id);
	else
		info("Force Terminated job %u.%u", msg->job_id, msg->step_id);
}

/*
 * Job has been notified of its approaching time limit.
 * Job will be killed shortly after timeout.
 * This RPC can arrive multiple times with the same or updated timeouts.
 * FIXME: We may want to signal the job or perform other action for this.
 * FIXME: How much lead time do we want for this message? Some jobs may
 * require tens of minutes to gracefully terminate.
 */
static void _timeout_handler(srun_timeout_msg_t *msg)
{
	static time_t last_timeout = 0;

	if (msg->timeout != last_timeout) {
		last_timeout = msg->timeout;
		verbose("job time limit to be reached at %s",
			slurm_ctime2(&msg->timeout));
	}
}

static void _user_msg_handler(srun_user_msg_t *msg)
{
	info("%s", msg->msg);
}

static void _ping_handler(srun_ping_msg_t *msg)
{
	/* the api will respond, so there really isn't anything to do here */
}

static void _node_fail_handler(srun_node_fail_msg_t *msg)
{
	error("Node failure on %s", msg->nodelist);
}


static bool _retry(void)
{
	static int retries = 0;
	static char *msg = "Slurm controller not responding, "
			   "sleeping and retrying.";

	if ((errno == ESLURM_ERROR_ON_DESC_TO_RECORD_COPY) ||
	    (errno == EAGAIN)) {
		if (retries == 0)
			error("%s", msg);
		else if (retries < MAX_RETRIES)
			debug("%s", msg);
		else
			return false;
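		/* Linear backoff: sleep 1s, 2s, 3s, ... between attempts */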
		sleep(++retries);
	} else if (errno == EINTR) {
		/* srun may be interrupted by the BLCR checkpoint signal */
		/*
		 * XXX: this will cause the old job to be cancelled and a
		 * new job to be allocated
		 */
		debug("Syscall interrupted while allocating resources, "
		      "retrying.");
		return true;
	} else if (opt.immediate &&
		   ((errno == ETIMEDOUT) || (errno == ESLURM_NODES_BUSY))) {
		error("Unable to allocate resources: %s",
		      slurm_strerror(ESLURM_NODES_BUSY));
		error_exit = immediate_exit;
		return false;
	} else if ((errno == SLURM_PROTOCOL_AUTHENTICATION_ERROR) ||
		   (errno == SLURM_UNEXPECTED_MSG_ERROR) ||
		   (errno == SLURM_PROTOCOL_INSANE_MSG_LENGTH)) {
		static int external_msg_count = 0;
		error("Srun communication socket apparently being written to "
		      "by something other than Slurm");
		if (external_msg_count++ < 4)
			return true;
		error("Unable to allocate resources: %m");
		return false;
	} else {
		error("Unable to allocate resources: %m");
		return false;
	}

	return true;
}

/* returns 1 if job and nodes are ready for job to begin, 0 otherwise */
static int _wait_nodes_ready(resource_allocation_response_msg_t *alloc)
{
	int is_ready = 0, i, rc;
	double cur_delay = 0;
	double cur_sleep = 0;
	int suspend_time, resume_time, max_delay;
	bool job_killed = false;

	suspend_time = slurm_get_suspend_timeout();
	resume_time = slurm_get_resume_timeout();
	if ((suspend_time == 0) || (resume_time == 0))
		return 1;	/* Power save mode disabled */
	max_delay = suspend_time + resume_time;
	max_delay *= 5;		/* Allow for ResumeRate support */

	pending_job_id = alloc->job_id;

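	/*
	 * Poll with a linearly growing sleep: iteration i waits
	 * POLL_SLEEP * i seconds (0.1s, 0.2s, 0.3s, ...), so the
	 * cumulative delay keeps growing until it exceeds max_delay.
	 */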
	for (i = 0; cur_delay < max_delay; i++) {
		if (i) {
			cur_sleep = POLL_SLEEP * i;
			if (i == 1) {
				verbose("Waiting for nodes to boot (delay looping %d times @ %f secs x index)",
					max_delay, POLL_SLEEP);
			} else {
				debug("Waited %f sec and still waiting: next sleep for %f sec",
				      cur_delay, cur_sleep);
			}
			usleep(USEC_IN_SEC * cur_sleep);
			cur_delay += cur_sleep;
		}

		rc = slurm_job_node_ready(alloc->job_id);
		if (rc == READY_JOB_FATAL)
			break;				/* fatal error */
		if ((rc == READY_JOB_ERROR) || (rc == EAGAIN))
			continue;			/* retry */
		if ((rc & READY_JOB_STATE) == 0) {	/* job killed */
			job_killed = true;
			break;
		}
		if (rc & READY_NODE_STATE) {		/* job and node ready */
			is_ready = 1;
			break;
		}
		if (destroy_job)
			break;
	}
	if (is_ready) {
		resource_allocation_response_msg_t *resp;
		char *tmp_str;
		if (i > 0)
			verbose("Nodes %s are ready for job", alloc->node_list);
		if (alloc->alias_list && !xstrcmp(alloc->alias_list, "TBD") &&
		    (slurm_allocation_lookup(pending_job_id, &resp)
		     == SLURM_SUCCESS)) {
			tmp_str = alloc->alias_list;
			alloc->alias_list = resp->alias_list;
			resp->alias_list = tmp_str;
			slurm_free_resource_allocation_response_msg(resp);
		}
	} else if (!destroy_job) {
		if (job_killed) {
			error("Job allocation %u has been revoked",
			      alloc->job_id);
			destroy_job = true;
		} else
			error("Nodes %s are still not ready", alloc->node_list);
	} else	/* allocation interrupted and slurmctld not responding */
		is_ready = 0;

	pending_job_id = 0;

	return is_ready;
}

static int _allocate_test(slurm_opt_t *opt_local)
{
	job_desc_msg_t *j;
	int rc;

	if ((j = _job_desc_msg_create_from_opts(opt_local)) == NULL)
		return SLURM_ERROR;

	if (opt_local->clusters &&
	    (slurmdb_get_first_avail_cluster(j, opt_local->clusters,
					     &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(opt_local->clusters, 0);
		return SLURM_ERROR;
	}

	rc = slurm_job_will_run(j);
	job_desc_msg_destroy(j);
	return rc;
}

extern int allocate_test(void)
{
	int rc = SLURM_SUCCESS;
	ListIterator iter;
	slurm_opt_t *opt_local;

	if (opt_list) {
		iter = list_iterator_create(opt_list);
		while ((opt_local = list_next(iter))) {
			if ((rc = _allocate_test(opt_local)) != SLURM_SUCCESS)
				break;
		}
		list_iterator_destroy(iter);
	} else {
		rc = _allocate_test(&opt);
	}

	return rc;
}

/*
 * Allocate nodes from the slurm controller -- retrying the attempt
 * if the controller appears to be down, and optionally waiting for
 * resources if none are currently available (see opt.immediate)
 *
 * Returns a pointer to a resource_allocation_response_msg which must
 * be freed with slurm_free_resource_allocation_response_msg()
 */
extern resource_allocation_response_msg_t *
allocate_nodes(bool handle_signals, slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	resource_allocation_response_msg_t *resp = NULL;
	job_desc_msg_t *j;
	slurm_allocation_callbacks_t callbacks;
	int i;

	xassert(srun_opt);

	if (srun_opt->relative != NO_VAL)
		fatal("--relative option invalid for job allocation request");

	if ((j = _job_desc_msg_create_from_opts(opt_local)) == NULL)
		return NULL;

	if (opt_local->clusters &&
	    (slurmdb_get_first_avail_cluster(j, opt_local->clusters,
					     &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(opt_local->clusters, 0);
		return NULL;
	}

	j->origin_cluster = xstrdup(slurmctld_conf.cluster_name);

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.job_suspend = NULL;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;

	/* create message thread to handle pings and such from slurmctld */
	msg_thr = slurm_allocation_msg_thr_create(&j->other_port, &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately. */
	if (handle_signals) {
		xsignal_unblock(sig_array);
		for (i = 0; sig_array[i]; i++)
			xsignal(sig_array[i], _signal_while_allocating);
	}

	while (!resp) {
		resp = slurm_allocate_resources_blocking(j,
							 opt_local->immediate,
							 _set_pending_job_id);
		if (destroy_job) {
			/* cancelled by signal */
			break;
		} else if (!resp && !_retry()) {
			break;
		}
	}

	if (resp)
		print_multi_line_string(resp->job_submit_user_msg,
					-1, LOG_LEVEL_INFO);

	if (resp && !destroy_job) {
		/*
		 * Allocation granted!
		 */
		pending_job_id = resp->job_id;

		/*
		 * These values could be changed while the job was
		 * pending so overwrite the request with what was
		 * allocated so we don't have issues when we use them
		 * in the step creation.
		 */
		opt_local->pn_min_memory = NO_VAL64;
		opt_local->mem_per_cpu = NO_VAL64;
		if (resp->pn_min_memory != NO_VAL64) {
			if (resp->pn_min_memory & MEM_PER_CPU) {
				opt_local->mem_per_cpu = (resp->pn_min_memory &
							  (~MEM_PER_CPU));
			} else {
				opt_local->pn_min_memory = resp->pn_min_memory;
			}
		}

		opt_local->min_nodes = resp->node_cnt;
		opt_local->max_nodes = resp->node_cnt;

		if (resp->working_cluster_rec)
			slurm_setup_remote_working_cluster(resp);

		if (!_wait_nodes_ready(resp)) {
			if (!destroy_job)
				error("Something is wrong with the boot of the nodes.");
			goto relinquish;
		}
	} else if (destroy_job) {
		goto relinquish;
	}

	if (handle_signals)
		xsignal_block(sig_array);

	job_desc_msg_destroy(j);

	return resp;

relinquish:
	if (resp) {
		if (!destroy_job)
			slurm_complete_job(resp->job_id, 1);
		slurm_free_resource_allocation_response_msg(resp);
	}
	exit(error_exit);
	return NULL;
}
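
/*
 * Example usage (a minimal sketch, not part of the original code; error
 * handling and signal setup elided):
 *
 *	resource_allocation_response_msg_t *resp =
 *		allocate_nodes(true, &opt);
 *	if (resp) {
 *		// ... create job steps on resp->node_list ...
 *		slurm_free_resource_allocation_response_msg(resp);
 *	}
 */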

/*
 * Allocate nodes for a heterogeneous job from the slurm controller --
 * retrying the attempt if the controller appears to be down, and optionally
 * waiting for resources if none are currently available (see opt.immediate)
 *
 * Returns a List of resource_allocation_response_msg_t entries, one per
 * het job component, which the caller must free with list_destroy()
 */
List allocate_het_job_nodes(bool handle_signals)
{
	resource_allocation_response_msg_t *resp = NULL;
	job_desc_msg_t *j, *first_job = NULL;
	slurm_allocation_callbacks_t callbacks;
	ListIterator opt_iter, resp_iter;
	slurm_opt_t *opt_local, *first_opt = NULL;
	List job_req_list = NULL, job_resp_list = NULL;
	uint32_t my_job_id = 0;
	int i, k;

	job_req_list = list_create(NULL);
	opt_iter = list_iterator_create(opt_list);
	while ((opt_local = list_next(opt_iter))) {
		srun_opt_t *srun_opt = opt_local->srun_opt;
		xassert(srun_opt);
		if (!first_opt)
			first_opt = opt_local;
		if (srun_opt->relative != NO_VAL)
			fatal("--relative option invalid for job allocation request");

		if ((j = _job_desc_msg_create_from_opts(opt_local)) == NULL) {
			FREE_NULL_LIST(job_req_list);
			return NULL;
		}
		if (!first_job)
			first_job = j;

		j->origin_cluster = xstrdup(slurmctld_conf.cluster_name);

		list_append(job_req_list, j);
	}
	list_iterator_destroy(opt_iter);

	if (!first_job) {
		error("%s: No job requests found", __func__);
		FREE_NULL_LIST(job_req_list);
		return NULL;
	}

	if (first_opt && first_opt->clusters &&
	    (slurmdb_get_first_het_job_cluster(job_req_list,
					       first_opt->clusters,
					       &working_cluster_rec)
	     != SLURM_SUCCESS)) {
		print_db_notok(first_opt->clusters, 0);
		FREE_NULL_LIST(job_req_list);
		return NULL;
	}

	callbacks.ping = _ping_handler;
	callbacks.timeout = _timeout_handler;
	callbacks.job_complete = _job_complete_handler;
	callbacks.job_suspend = NULL;
	callbacks.user_msg = _user_msg_handler;
	callbacks.node_fail = _node_fail_handler;

	/* create message thread to handle pings and such from slurmctld */
	msg_thr = slurm_allocation_msg_thr_create(&first_job->other_port,
						  &callbacks);

	/* NOTE: Do not process signals in separate pthread. The signal will
	 * cause slurm_allocate_resources_blocking() to exit immediately. */
	if (handle_signals) {
		xsignal_unblock(sig_array);
		for (i = 0; sig_array[i]; i++)
			xsignal(sig_array[i], _signal_while_allocating);
	}

	while (first_opt && !job_resp_list) {
		job_resp_list = slurm_allocate_het_job_blocking(job_req_list,
				first_opt->immediate, _set_pending_job_id);
		if (destroy_job) {
			/* cancelled by signal */
			break;
		} else if (!job_resp_list && !_retry()) {
			break;
		}
	}
	FREE_NULL_LIST(job_req_list);

	if (job_resp_list && !destroy_job) {
		/*
		 * Allocation granted!
		 */

		opt_iter = list_iterator_create(opt_list);
		resp_iter = list_iterator_create(job_resp_list);
		while ((opt_local = list_next(opt_iter))) {
			resp = (resource_allocation_response_msg_t *)
			       list_next(resp_iter);
			if (!resp)
				break;

			if (pending_job_id == 0)
				pending_job_id = resp->job_id;
			if (my_job_id == 0) {
				my_job_id = resp->job_id;
				i = list_count(opt_list);
				k = list_count(job_resp_list);
				if (i != k) {
					error("%s: request count != response count (%d != %d)",
					      __func__, i, k);
					goto relinquish;
				}
			}

			/*
			 * These values could be changed while the job was
			 * pending so overwrite the request with what was
			 * allocated so we don't have issues when we use them
			 * in the step creation.
			 */
			if (opt_local->pn_min_memory != NO_VAL64)
				opt_local->pn_min_memory =
					(resp->pn_min_memory & (~MEM_PER_CPU));
			else if (opt_local->mem_per_cpu != NO_VAL64)
				opt_local->mem_per_cpu =
					(resp->pn_min_memory & (~MEM_PER_CPU));

			opt_local->min_nodes = resp->node_cnt;
			opt_local->max_nodes = resp->node_cnt;

			if (resp->working_cluster_rec)
				slurm_setup_remote_working_cluster(resp);

			if (!_wait_nodes_ready(resp)) {
				if (!destroy_job)
					error("Something is wrong with the boot of the nodes.");
				goto relinquish;
			}
		}
		list_iterator_destroy(resp_iter);
		list_iterator_destroy(opt_iter);
	} else if (destroy_job) {
		goto relinquish;
	}

	if (handle_signals)
		xsignal_block(sig_array);

	return job_resp_list;

relinquish:
	if (job_resp_list) {
		if (!destroy_job && my_job_id)
			slurm_complete_job(my_job_id, 1);
		list_destroy(job_resp_list);
	}
	exit(error_exit);
	return NULL;
}
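
/*
 * Example usage (a minimal sketch, not part of the original code): each
 * entry in the returned List is one het job component's allocation.
 *
 *	List resp_list = allocate_het_job_nodes(true);
 *	if (resp_list) {
 *		ListIterator it = list_iterator_create(resp_list);
 *		resource_allocation_response_msg_t *r;
 *		while ((r = list_next(it)))
 *			info("component on %s", r->node_list);
 *		list_iterator_destroy(it);
 *	}
 */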

void
ignore_signal(int signo)
{
	/* do nothing */
}

int
cleanup_allocation(void)
{
	slurm_allocation_msg_thr_destroy(msg_thr);
	return SLURM_SUCCESS;
}

extern List existing_allocation(void)
{
	uint32_t old_job_id;
	List job_resp_list = NULL;

	if (sropt.jobid == NO_VAL)
		return NULL;

	if (opt.clusters) {
		List clusters = NULL;
		if (!(clusters = slurmdb_get_info_cluster(opt.clusters))) {
			print_db_notok(opt.clusters, 0);
			exit(1);
		}
		working_cluster_rec = list_peek(clusters);
		debug2("Looking for job %d on cluster %s (addr: %s)",
		       sropt.jobid,
		       working_cluster_rec->name,
		       working_cluster_rec->control_host);
	}

	old_job_id = (uint32_t) sropt.jobid;
	if (slurm_het_job_lookup(old_job_id, &job_resp_list) < 0) {
		if (sropt.parallel_debug)
			return NULL;	/* create new allocation as needed */
		if (errno == ESLURM_ALREADY_DONE)
			error("Slurm job %u has expired", old_job_id);
		else
			error("Unable to confirm allocation for job %u: %m",
			      old_job_id);
		info("Check SLURM_JOB_ID environment variable. Expired or invalid job %u",
		     old_job_id);
		exit(error_exit);
	}

	return job_resp_list;
}

/* Set up port to handle messages from slurmctld */
int slurmctld_msg_init(void)
{
	slurm_addr_t slurm_address;
	static int slurmctld_fd = -1;
	uint16_t *ports;

	if (slurmctld_fd >= 0)	/* May be set early for queued job allocation */
		return slurmctld_fd;

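	/*
	 * Honor the SrunPortRange parameter from slurm.conf when one is
	 * configured (e.g. "SrunPortRange=60001-63000"); otherwise let the
	 * kernel pick any free ephemeral port.
	 */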
	if ((ports = slurm_get_srun_port_range()))
		slurmctld_fd = slurm_init_msg_engine_ports(ports);
	else
		slurmctld_fd = slurm_init_msg_engine_port(0);

	if (slurmctld_fd < 0) {
		error("slurm_init_msg_engine_port error %m");
		exit(error_exit);
	}

	if (slurm_get_stream_addr(slurmctld_fd, &slurm_address) < 0) {
		error("slurm_get_stream_addr error %m");
		exit(error_exit);
	}
	fd_set_nonblocking(slurmctld_fd);
	/* hostname is not set, so slurm_get_addr fails
	   slurm_get_addr(&slurm_address, &port, hostname, sizeof(hostname)); */
	slurmctld_comm_port = ntohs(slurm_address.sin_port);
	debug2("srun PMI messages to port=%u", slurmctld_comm_port);

	return slurmctld_fd;
}

/*
 * Create a job description structure based on srun options
 * (see opt.h)
 */
static job_desc_msg_t *_job_desc_msg_create_from_opts(slurm_opt_t *opt_local)
{
	srun_opt_t *srun_opt = opt_local->srun_opt;
	job_desc_msg_t *j = xmalloc(sizeof(*j));
	hostlist_t hl = NULL;

	xassert(srun_opt);

	slurm_init_job_desc_msg(j);
	j->contiguous = opt_local->contiguous;
	if (opt_local->core_spec != NO_VAL16)
		j->core_spec = opt_local->core_spec;
	j->features = opt_local->constraint;
	j->cluster_features = opt_local->c_constraint;
	if (opt_local->immediate == 1)
		j->immediate = opt_local->immediate;
	if (opt_local->job_name)
		j->name = opt_local->job_name;
	else
		j->name = srun_opt->cmd_name;
	if (srun_opt->argc > 0) {
		j->argc = 1;
		j->argv = (char **) xmalloc(sizeof(char *) * 2);
		j->argv[0] = xstrdup(srun_opt->argv[0]);
	}
	if (opt_local->acctg_freq)
		j->acctg_freq = xstrdup(opt_local->acctg_freq);
	j->reservation = opt_local->reservation;
	j->wckey = opt_local->wckey;
	j->x11 = opt_local->x11;
	if (j->x11) {
		j->x11_magic_cookie = xstrdup(opt_local->x11_magic_cookie);
		j->x11_target = xstrdup(opt_local->x11_target);
		j->x11_target_port = opt_local->x11_target_port;
	}

	j->req_nodes = xstrdup(opt_local->nodelist);

	/* simplify the job allocation nodelist,
	 * not laying out tasks until step */
	if (j->req_nodes) {
		hl = hostlist_create(j->req_nodes);
		xfree(opt_local->nodelist);
		opt_local->nodelist = hostlist_ranged_string_xmalloc(hl);
		hostlist_uniq(hl);
		xfree(j->req_nodes);
		j->req_nodes = hostlist_ranged_string_xmalloc(hl);
		hostlist_destroy(hl);
	}

	if (((opt_local->distribution & SLURM_DIST_STATE_BASE) ==
	     SLURM_DIST_ARBITRARY) && !j->req_nodes) {
		error("With Arbitrary distribution you need to "
		      "specify a nodelist or hostfile with the -w option");
		return NULL;
	}
	j->extra = opt_local->extra;
	j->exc_nodes = opt_local->exclude;
	j->partition = opt_local->partition;
	j->min_nodes = opt_local->min_nodes;
	if (opt_local->sockets_per_node != NO_VAL)
		j->sockets_per_node = opt_local->sockets_per_node;
	if (opt_local->cores_per_socket != NO_VAL)
		j->cores_per_socket = opt_local->cores_per_socket;
	if (opt_local->threads_per_core != NO_VAL) {
		j->threads_per_core = opt_local->threads_per_core;
		/* if only 1 thread per core, make sure affinity knows */
		if (j->threads_per_core == 1)
			srun_opt->cpu_bind_type |= CPU_BIND_ONE_THREAD_PER_CORE;
	}
	j->user_id = opt_local->uid;
	j->dependency = opt_local->dependency;
	if (opt_local->nice != NO_VAL)
		j->nice = NICE_OFFSET + opt_local->nice;
	if (opt_local->priority)
		j->priority = opt_local->priority;
	if (srun_opt->cpu_bind)
		j->cpu_bind = srun_opt->cpu_bind;
	if (srun_opt->cpu_bind_type)
		j->cpu_bind_type = srun_opt->cpu_bind_type;
	if (opt_local->delay_boot != NO_VAL)
		j->delay_boot = opt_local->delay_boot;
	if (opt_local->mem_bind)
		j->mem_bind = opt_local->mem_bind;
	if (opt_local->mem_bind_type)
		j->mem_bind_type = opt_local->mem_bind_type;
	if (opt_local->plane_size != NO_VAL)
		j->plane_size = opt_local->plane_size;
	j->task_dist = opt_local->distribution;

	j->group_id = opt_local->gid;
	j->mail_type = opt_local->mail_type;

	if (opt_local->ntasks_per_node != NO_VAL)
		j->ntasks_per_node = opt_local->ntasks_per_node;
	if (opt_local->ntasks_per_socket != NO_VAL)
		j->ntasks_per_socket = opt_local->ntasks_per_socket;
	if (opt_local->ntasks_per_core != NO_VAL)
		j->ntasks_per_core = opt_local->ntasks_per_core;

	if (opt_local->mail_user)
		j->mail_user = opt_local->mail_user;
	if (opt_local->burst_buffer)
		j->burst_buffer = opt_local->burst_buffer;
	if (opt_local->begin)
		j->begin_time = opt_local->begin;
	if (opt_local->deadline)
		j->deadline = opt_local->deadline;
	if (opt_local->licenses)
		j->licenses = opt_local->licenses;
	if (opt_local->network)
		j->network = opt_local->network;
	if (opt_local->profile)
		j->profile = opt_local->profile;
	if (opt_local->account)
		j->account = opt_local->account;
	if (opt_local->comment)
		j->comment = opt_local->comment;
	if (opt_local->qos)
		j->qos = opt_local->qos;
	if (opt_local->chdir)
		j->work_dir = opt_local->chdir;

	if (opt_local->hold)
		j->priority = 0;
	if (opt_local->reboot)
		j->reboot = 1;

	if (opt_local->max_nodes)
		j->max_nodes = opt_local->max_nodes;
	else if (opt_local->nodes_set) {
		/* If max_nodes isn't set for an allocation, set it to
		 * min_nodes to match the behavior of salloc and sbatch.
		 */
		j->max_nodes = opt_local->min_nodes;
	}
	if (opt_local->pn_min_cpus > -1)
		j->pn_min_cpus = opt_local->pn_min_cpus;
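	/*
	 * pn_min_memory carries either a per-node or a per-CPU limit; the
	 * high-order MEM_PER_CPU flag bit marks the per-CPU case.
	 */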
	if (opt_local->pn_min_memory != NO_VAL64)
		j->pn_min_memory = opt_local->pn_min_memory;
	else if (opt_local->mem_per_cpu != NO_VAL64)
		j->pn_min_memory = opt_local->mem_per_cpu | MEM_PER_CPU;
	if (opt_local->pn_min_tmp_disk != NO_VAL64)
		j->pn_min_tmp_disk = opt_local->pn_min_tmp_disk;
	if (opt_local->overcommit) {
		j->min_cpus = opt_local->min_nodes;
		j->overcommit = opt_local->overcommit;
	} else if (opt_local->cpus_set)
		j->min_cpus = opt_local->ntasks * opt_local->cpus_per_task;
	else
		j->min_cpus = opt_local->ntasks;
	if (opt_local->ntasks_set)
		j->num_tasks = opt_local->ntasks;

	if (opt_local->cpus_set)
		j->cpus_per_task = opt_local->cpus_per_task;

	if (opt_local->no_kill)
		j->kill_on_node_fail = 0;
	if (opt_local->time_limit != NO_VAL)
		j->time_limit = opt_local->time_limit;
	if (opt_local->time_min != NO_VAL)
		j->time_min = opt_local->time_min;
	if (opt_local->shared != NO_VAL16)
		j->shared = opt_local->shared;

	if (opt_local->warn_flags)
		j->warn_flags = opt_local->warn_flags;
	if (opt_local->warn_signal)
		j->warn_signal = opt_local->warn_signal;
	if (opt_local->warn_time)
		j->warn_time = opt_local->warn_time;
	if (opt_local->job_flags)
		j->bitflags = opt_local->job_flags;

	if (opt_local->cpu_freq_min != NO_VAL)
		j->cpu_freq_min = opt_local->cpu_freq_min;
	if (opt_local->cpu_freq_max != NO_VAL)
		j->cpu_freq_max = opt_local->cpu_freq_max;
	if (opt_local->cpu_freq_gov != NO_VAL)
		j->cpu_freq_gov = opt_local->cpu_freq_gov;

	if (opt_local->req_switch >= 0)
		j->req_switch = opt_local->req_switch;
	if (opt_local->wait4switch >= 0)
		j->wait4switch = opt_local->wait4switch;

	/* srun uses the same listening port for the allocation response
	 * message as all other messages */
	j->alloc_resp_port = slurmctld_comm_port;
	j->other_port = slurmctld_comm_port;

	if (opt_local->spank_job_env_size) {
		j->spank_job_env = opt_local->spank_job_env;
		j->spank_job_env_size = opt_local->spank_job_env_size;
	}

	j->power_flags = opt_local->power;
	if (opt_local->mcs_label)
		j->mcs_label = opt_local->mcs_label;
	j->wait_all_nodes = 1;
	/* If the job can run on multiple clusters, find the earliest start
	 * time and run it there */
	j->clusters = xstrdup(opt_local->clusters);

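	/*
	 * GPU options are passed along as TRES specifications; e.g.
	 * --gpus=2 is rendered by xfmt_tres() as tres_per_job = "gpu:2".
	 */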
	if (opt_local->cpus_per_gpu)
		xstrfmtcat(j->cpus_per_tres, "gpu:%d", opt_local->cpus_per_gpu);
	j->tres_bind = xstrdup(opt_local->tres_bind);
	j->tres_freq = xstrdup(opt_local->tres_freq);
	xfmt_tres(&j->tres_per_job, "gpu", opt_local->gpus);
	xfmt_tres(&j->tres_per_node, "gpu", opt_local->gpus_per_node);
	if (opt_local->gres && xstrcasecmp(opt_local->gres, "NONE")) {
		if (j->tres_per_node)
			xstrfmtcat(j->tres_per_node, ",%s", opt_local->gres);
		else
			j->tres_per_node = xstrdup(opt_local->gres);
	}
	xfmt_tres(&j->tres_per_socket, "gpu", opt_local->gpus_per_socket);
	xfmt_tres(&j->tres_per_task, "gpu", opt_local->gpus_per_task);
	if (opt_local->mem_per_gpu != NO_VAL64)
		xstrfmtcat(j->mem_per_tres, "gpu:%"PRIu64,
			   opt_local->mem_per_gpu);

	return j;
}

void
job_desc_msg_destroy(job_desc_msg_t *j)
{
	if (j) {
		xfree(j->req_nodes);
		xfree(j);
	}
}

extern int create_job_step(srun_job_t *job, bool use_all_cpus,
			   slurm_opt_t *opt_local)
{
	return launch_g_create_job_step(job, use_all_cpus,
					_signal_while_allocating,
					&destroy_job, opt_local);
}