1 /*****************************************************************************\
2 * job_scheduler.c - manage the scheduling of pending jobs in priority order
3 * Note there is a global job list (job_list)
4 *****************************************************************************
5 * Copyright (C) 2002-2007 The Regents of the University of California.
6 * Copyright (C) 2008-2010 Lawrence Livermore National Security.
7 * Portions Copyright (C) 2010-2017 SchedMD <https://www.schedmd.com>.
8 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
9 * Written by Morris Jette <jette1@llnl.gov>
10 * CODE-OCEC-09-009. All rights reserved.
11 *
12 * This file is part of Slurm, a resource management program.
13 * For details, see <https://slurm.schedmd.com/>.
14 * Please also read the included file: DISCLAIMER.
15 *
16 * Slurm is free software; you can redistribute it and/or modify it under
17 * the terms of the GNU General Public License as published by the Free
18 * Software Foundation; either version 2 of the License, or (at your option)
19 * any later version.
20 *
21 * In addition, as a special exception, the copyright holders give permission
22 * to link the code of portions of this program with the OpenSSL library under
23 * certain conditions as described in each individual source file, and
24 * distribute linked combinations including the two. You must obey the GNU
25 * General Public License in all respects for all of the code used other than
26 * OpenSSL. If you modify file(s) with this exception, you may extend this
27 * exception to your version of the file(s), but you are not obligated to do
28 * so. If you do not wish to do so, delete this exception statement from your
29 * version. If you delete this exception statement from all source files in
30 * the program, then also delete it here.
31 *
32 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
33 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
34 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
35 * details.
36 *
37 * You should have received a copy of the GNU General Public License along
38 * with Slurm; if not, write to the Free Software Foundation, Inc.,
39 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
40 \*****************************************************************************/
41
42 #include "config.h"
43
44 #include <ctype.h>
45 #include <errno.h>
46 #include <poll.h>
47 #include <signal.h> /* for SIGKILL */
48 #include <stdio.h>
49 #include <stdlib.h>
50 #include <string.h>
51 #include <unistd.h>
52
53 #if HAVE_SYS_PRCTL_H
54 # include <sys/prctl.h>
55 #endif
56
57 #include "src/common/assoc_mgr.h"
58 #include "src/common/env.h"
59 #include "src/common/gres.h"
60 #include "src/common/group_cache.h"
61 #include "src/common/layouts_mgr.h"
62 #include "src/common/list.h"
63 #include "src/common/macros.h"
64 #include "src/common/node_features.h"
65 #include "src/common/node_select.h"
66 #include "src/common/prep.h"
67 #include "src/common/power.h"
68 #include "src/common/slurm_accounting_storage.h"
69 #include "src/common/slurm_acct_gather.h"
70 #include "src/common/strlcpy.h"
71 #include "src/common/parse_time.h"
72 #include "src/common/timers.h"
73 #include "src/common/track_script.h"
74 #include "src/common/uid.h"
75 #include "src/common/xassert.h"
76 #include "src/common/xstring.h"
77
78 #include "src/slurmctld/acct_policy.h"
79 #include "src/slurmctld/agent.h"
80 #include "src/slurmctld/burst_buffer.h"
81 #include "src/slurmctld/fed_mgr.h"
82 #include "src/slurmctld/front_end.h"
83 #include "src/slurmctld/gang.h"
84 #include "src/slurmctld/locks.h"
85 #include "src/slurmctld/job_scheduler.h"
86 #include "src/slurmctld/licenses.h"
87 #include "src/slurmctld/locks.h"
88 #include "src/slurmctld/node_scheduler.h"
89 #include "src/slurmctld/power_save.h"
90 #include "src/slurmctld/preempt.h"
91 #include "src/slurmctld/proc_req.h"
92 #include "src/slurmctld/reservation.h"
93 #include "src/slurmctld/slurmctld.h"
94 #include "src/slurmctld/srun_comm.h"
95 #include "src/slurmctld/state_save.h"
96 #include "src/slurmctld/powercapping.h"
97
98 #ifndef CORRESPOND_ARRAY_TASK_CNT
99 # define CORRESPOND_ARRAY_TASK_CNT 10
100 #endif
101 #define BUILD_TIMEOUT 2000000 /* Max build_job_queue() run time in usec */
102 #define MAX_FAILED_RESV 10
103
/* Argument bundle for the _wait_boot() worker thread (prototype below) */
typedef struct wait_boot_arg {
	uint32_t job_id;	/* job waiting for node boot to complete */
	bitstr_t *node_bitmap;	/* presumably the nodes being booted — confirm in _wait_boot() */
} wait_boot_arg_t;
108
109 static batch_job_launch_msg_t *_build_launch_job_msg(job_record_t *job_ptr,
110 uint16_t protocol_version);
111 static void _job_queue_append(List job_queue, job_record_t *job_ptr,
112 part_record_t *part_ptr, uint32_t priority);
113 static bool _job_runnable_test1(job_record_t *job_ptr, bool clear_start);
114 static bool _job_runnable_test2(job_record_t *job_ptr, bool check_min_time);
115 static bool _scan_depend(List dependency_list, uint32_t job_id);
116 static void * _sched_agent(void *args);
117 static int _schedule(uint32_t job_limit);
118 static int _valid_batch_features(job_record_t *job_ptr, bool can_reboot);
119 static int _valid_feature_list(job_record_t *job_ptr, bool can_reboot);
120 static int _valid_node_feature(char *feature, bool can_reboot);
121 #ifndef HAVE_FRONT_END
122 static void * _wait_boot(void *arg);
123 #endif
124 static int build_queue_timeout = BUILD_TIMEOUT;
125 static int save_last_part_update = 0;
126
127 static pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER;
128 static int sched_pend_thread = 0;
129 static bool sched_running = false;
130 static struct timeval sched_last = {0, 0};
131 static uint32_t max_array_size = NO_VAL;
132 static bool bf_hetjob_immediate = false;
133 static uint16_t bf_hetjob_prio = 0;
134 static int sched_min_interval = 2;
135
136 static int bb_array_stage_cnt = 10;
137 extern diag_stats_t slurmctld_diag_stats;
138
_find_singleton_job(void * x,void * key)139 static int _find_singleton_job (void *x, void *key)
140 {
141 struct job_record *qjob_ptr = (struct job_record *) x;
142 struct job_record *job_ptr = (struct job_record *) key;
143
144 xassert (qjob_ptr->magic == JOB_MAGIC);
145
146 /*
147 * get user jobs with the same user and name
148 */
149 if (qjob_ptr->user_id != job_ptr->user_id)
150 return 0;
151 if (qjob_ptr->name && job_ptr->name &&
152 xstrcmp(qjob_ptr->name, job_ptr->name))
153 return 0;
154 /*
155 * already running/suspended job or previously
156 * submitted pending job
157 */
158 if (IS_JOB_RUNNING(qjob_ptr) || IS_JOB_SUSPENDED(qjob_ptr) ||
159 (IS_JOB_PENDING(qjob_ptr) &&
160 (qjob_ptr->job_id < job_ptr->job_id))) {
161 return 1;
162 }
163
164 return 0;
165 }
166
167 /*
168 * Calculate how busy the system is by figuring out how busy each node is.
169 */
_get_system_usage(void)170 static double _get_system_usage(void)
171 {
172 static double sys_usage_per = 0.0;
173 static time_t last_idle_update = 0;
174 static uint16_t priority_flags = 0;
175 static bool statics_inited = false;
176
177 if (!statics_inited) {
178 priority_flags = slurm_get_priority_flags();
179 statics_inited = true;
180 }
181
182 if (last_idle_update < last_node_update) {
183 int i;
184 double alloc_tres = 0;
185 double tot_tres = 0;
186
187 select_g_select_nodeinfo_set_all();
188
189 for (i = 0; i < node_record_count; i++) {
190 node_record_t *node_ptr =
191 &node_record_table_ptr[i];
192 double node_alloc_tres = 0.0;
193 double node_tot_tres = 0.0;
194
195 select_g_select_nodeinfo_get(
196 node_ptr->select_nodeinfo,
197 SELECT_NODEDATA_TRES_ALLOC_WEIGHTED,
198 NODE_STATE_ALLOCATED, &node_alloc_tres);
199
200 node_tot_tres =
201 assoc_mgr_tres_weighted(
202 node_ptr->tres_cnt,
203 node_ptr->config_ptr->tres_weights,
204 priority_flags, false);
205
206 alloc_tres += node_alloc_tres;
207 tot_tres += node_tot_tres;
208 }
209 last_idle_update = last_node_update;
210
211 if (tot_tres)
212 sys_usage_per = (alloc_tres / tot_tres) * 100;
213 }
214
215 return sys_usage_per;
216 }
217
/* Add one job/partition pair to the scheduling queue; jobs without a
 * named reservation are additionally queued promiscuously against
 * eligible reservations. */
static void _job_queue_append(List job_queue, job_record_t *job_ptr,
			      part_record_t *part_ptr, uint32_t prio)
{
	job_queue_req_t queue_req = {
		.job_ptr = job_ptr,
		.job_queue = job_queue,
		.part_ptr = part_ptr,
		.prio = prio,
	};

	job_queue_append_internal(&queue_req);

	/* A job bound to a specific reservation is never promiscuous */
	if (!job_ptr->resv_name)
		job_resv_append_promiscuous(&queue_req);
}
233
234 /* Return true if the job has some step still in a cleaning state, which
235 * can happen on a Cray if a job is requeued and the step NHC is still running
236 * after the requeued job is eligible to run again */
_is_step_cleaning(job_record_t * job_ptr)237 static uint16_t _is_step_cleaning(job_record_t *job_ptr)
238 {
239 ListIterator step_iterator;
240 step_record_t *step_ptr;
241 uint16_t cleaning = 0;
242
243 step_iterator = list_iterator_create(job_ptr->step_list);
244 while ((step_ptr = list_next(step_iterator))) {
245 /* Only check if not a pending step */
246 if (step_ptr->step_id != SLURM_PENDING_STEP) {
247 select_g_select_jobinfo_get(step_ptr->select_jobinfo,
248 SELECT_JOBDATA_CLEANING,
249 &cleaning);
250 if (cleaning)
251 break;
252 }
253 }
254 list_iterator_destroy(step_iterator);
255
256 return cleaning;
257 }
258
/* Job test for ability to run now, excludes partition specific tests.
 * Returns false (not runnable) for jobs that are not pending, revoked,
 * still cleaning up a previous run, held (priority 0), or dependent.
 * May update job_ptr->state_reason/state_desc as a side effect. */
static bool _job_runnable_test1(job_record_t *job_ptr, bool sched_plugin)
{
	bool job_indepen = false;
	uint16_t cleaning = 0;
	time_t now = time(NULL);

	xassert(job_ptr->magic == JOB_MAGIC);
	if (!IS_JOB_PENDING(job_ptr) || IS_JOB_COMPLETING(job_ptr))
		return false;

	if (IS_JOB_REVOKED(job_ptr))
		return false;

	/* Check job-level cleaning first, then fall back to per-step state */
	select_g_select_jobinfo_get(job_ptr->select_jobinfo,
				    SELECT_JOBDATA_CLEANING,
				    &cleaning);
	if (!cleaning)
		cleaning = _is_step_cleaning(job_ptr);
	if (cleaning ||
	    (job_ptr->details && job_ptr->details->prolog_running) ||
	    (job_ptr->step_list && list_count(job_ptr->step_list))) {
		/* Job's been requeued and the
		 * previous run hasn't finished yet */
		job_ptr->state_reason = WAIT_CLEANING;
		xfree(job_ptr->state_desc);
		sched_debug3("%pJ. State=PENDING. Reason=Cleaning.", job_ptr);
		return false;
	}

#ifdef HAVE_FRONT_END
	/* At least one front-end node up at this point */
	if (job_ptr->state_reason == WAIT_FRONT_END) {
		job_ptr->state_reason = WAIT_NO_REASON;
		xfree(job_ptr->state_desc);
		last_job_update = now;
	}
#endif

	job_indepen = job_independent(job_ptr);
	/* Scheduler plugins recompute start times, so clear any stale one */
	if (sched_plugin)
		job_ptr->start_time = (time_t) 0;
	if (job_ptr->priority == 0)	{ /* held */
		/* Preserve specific failure/hold reasons; otherwise mark
		 * the job as generically held */
		if (job_ptr->state_reason != FAIL_BAD_CONSTRAINTS
		    && (job_ptr->state_reason != WAIT_RESV_DELETED)
		    && (job_ptr->state_reason != FAIL_BURST_BUFFER_OP)
		    && (job_ptr->state_reason != WAIT_HELD)
		    && (job_ptr->state_reason != WAIT_HELD_USER)
		    && job_ptr->state_reason != WAIT_MAX_REQUEUE) {
			job_ptr->state_reason = WAIT_HELD;
			xfree(job_ptr->state_desc);
			last_job_update = now;
		}
		sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
			     job_ptr,
			     job_state_string(job_ptr->job_state),
			     job_reason_string(job_ptr->state_reason),
			     job_ptr->priority);
		return false;
	}

	if (!job_indepen &&
	    ((job_ptr->state_reason == WAIT_HELD) ||
	     (job_ptr->state_reason == WAIT_HELD_USER))) {
		/* released behind active dependency? */
		job_ptr->state_reason = WAIT_DEPENDENCY;
		xfree(job_ptr->state_desc);
	}

	if (!job_indepen)	/* can not run now */
		return false;

	return true;
}
333
334 /*
335 * Job and partition tests for ability to run now
336 * IN job_ptr - job to test
337 * IN check_min_time - If set, test job's minimum time limit
338 * otherwise test maximum time limit
339 */
static bool _job_runnable_test2(job_record_t *job_ptr, bool check_min_time)
{
	int fail_reason = job_limits_check(&job_ptr, check_min_time);

	/* Record a new wait reason, unless the job is merely transitioning
	 * back to a runnable state within its partition */
	if ((fail_reason != job_ptr->state_reason) &&
	    ((fail_reason != WAIT_NO_REASON) ||
	     (!part_policy_job_runnable_state(job_ptr)))) {
		job_ptr->state_reason = fail_reason;
		xfree(job_ptr->state_desc);
	}

	return (fail_reason == WAIT_NO_REASON);
}
355
356 /*
357 * Job, reservation and partition tests for ability to run now.
358 * If a job is submitted to multiple partitions, don't consider partitions
359 * on which the job would not fit given the current set of nodes in the
360 * reservation.
361 * IN job_ptr - job to test
362 * IN part_ptr - partition to test
363 */
_job_runnable_test3(job_record_t * job_ptr,part_record_t * part_ptr)364 static bool _job_runnable_test3(job_record_t *job_ptr, part_record_t *part_ptr)
365 {
366 if (job_ptr->resv_ptr && job_ptr->resv_ptr->node_bitmap &&
367 part_ptr && part_ptr->node_bitmap &&
368 (bit_overlap(job_ptr->resv_ptr->node_bitmap, part_ptr->node_bitmap)
369 < job_ptr->node_cnt_wag))
370 return false;
371 return true;
372 }
373
/* Bind a promiscuously-queued job to the reservation recorded in its
 * queue record, copying the reservation name/id onto the job itself */
extern void job_queue_rec_prom_resv(job_queue_rec_t *job_queue_rec)
{
	job_record_t *jptr;

	if (!job_queue_rec->resv_ptr)
		return;

	jptr = job_queue_rec->job_ptr;
	xassert(jptr);
	xassert(!jptr->resv_name);

	jptr->resv_ptr = job_queue_rec->resv_ptr;
	jptr->resv_name = xstrdup(jptr->resv_ptr->name);
	jptr->resv_id = jptr->resv_ptr->resv_id;
	jptr->bit_flags |= JOB_PROM;
}
390
391 /*
392 * build_job_queue - build (non-priority ordered) list of pending jobs
393 * IN clear_start - if set then clear the start_time for pending jobs,
394 * true when called from sched/backfill or sched/builtin
395 * IN backfill - true if running backfill scheduler, enforce min time limit
396 * RET the job queue
397 * NOTE: the caller must call FREE_NULL_LIST() on RET value to free memory
398 */
extern List build_job_queue(bool clear_start, bool backfill)
{
	static time_t last_log_time = 0;
	List job_queue;
	ListIterator depend_iter, job_iterator, part_iterator;
	job_record_t *job_ptr = NULL, *new_job_ptr;
	part_record_t *part_ptr;
	depend_spec_t *dep_ptr;
	int i, pend_cnt, reason, dep_corr;
	struct timeval start_tv = {0, 0};
	int tested_jobs = 0;
	int job_part_pairs = 0;
	time_t now = time(NULL);

	/* init the timer */
	(void) slurm_delta_tv(&start_tv);
	job_queue = list_create(xfree_ptr);

	/* Create individual job records for job arrays that need burst buffer
	 * staging */
	job_iterator = list_iterator_create(job_list);
	while ((job_ptr = list_next(job_iterator))) {
		/* Only pending array-leader records with burst buffers */
		if (!IS_JOB_PENDING(job_ptr) ||
		    !job_ptr->burst_buffer || !job_ptr->array_recs ||
		    !job_ptr->array_recs->task_id_bitmap ||
		    (job_ptr->array_task_id != NO_VAL))
			continue;

		if ((i = bit_ffs(job_ptr->array_recs->task_id_bitmap)) < 0)
			continue;
		pend_cnt = num_pending_job_array_tasks(job_ptr->array_job_id);
		/* Limit how many array tasks are staged concurrently */
		if (pend_cnt >= bb_array_stage_cnt)
			continue;
		if (job_ptr->array_recs->task_cnt < 1)
			continue;
		if (job_ptr->array_recs->task_cnt == 1) {
			/* Last task: no split needed, schedule in place */
			job_ptr->array_task_id = i;
			(void) job_array_post_sched(job_ptr);
			if (job_ptr->details && job_ptr->details->dependency &&
			    job_ptr->details->depend_list)
				fed_mgr_submit_remote_dependencies(job_ptr,
								   false,
								   false);
			continue;
		}
		job_ptr->array_task_id = i;
		new_job_ptr = job_array_split(job_ptr);
		if (new_job_ptr) {
			debug("%s: Split out %pJ for burst buffer use",
			      __func__, job_ptr);
			new_job_ptr->job_state = JOB_PENDING;
			new_job_ptr->start_time = (time_t) 0;
			/* Do NOT clear db_index here, it is handled when
			 * task_id_str is created elsewhere */
			(void) bb_g_job_validate2(job_ptr, NULL);
		} else {
			error("%s: Unable to copy record for %pJ",
			      __func__, job_ptr);
		}
	}
	list_iterator_destroy(job_iterator);

	/* Create individual job records for job arrays with
	 * depend_type == SLURM_DEPEND_AFTER_CORRESPOND */
	job_iterator = list_iterator_create(job_list);
	while ((job_ptr = list_next(job_iterator))) {
		if (!IS_JOB_PENDING(job_ptr) ||
		    !job_ptr->array_recs ||
		    !job_ptr->array_recs->task_id_bitmap ||
		    (job_ptr->array_task_id != NO_VAL))
			continue;
		if ((i = bit_ffs(job_ptr->array_recs->task_id_bitmap)) < 0)
			continue;
		if ((job_ptr->details == NULL) ||
		    (job_ptr->details->depend_list == NULL) ||
		    (list_count(job_ptr->details->depend_list) == 0))
			continue;
		/* Only split arrays that actually carry an
		 * after_correspond dependency */
		depend_iter = list_iterator_create(
			job_ptr->details->depend_list);
		dep_corr = 0;
		while ((dep_ptr = list_next(depend_iter))) {
			if (dep_ptr->depend_type ==
			    SLURM_DEPEND_AFTER_CORRESPOND) {
				dep_corr = 1;
				break;
			}
		}
		if (!dep_corr)
			continue;
		pend_cnt = num_pending_job_array_tasks(job_ptr->array_job_id);
		if (pend_cnt >= CORRESPOND_ARRAY_TASK_CNT)
			continue;
		if (job_ptr->array_recs->task_cnt < 1)
			continue;
		if (job_ptr->array_recs->task_cnt == 1) {
			job_ptr->array_task_id = i;
			(void) job_array_post_sched(job_ptr);
			if (job_ptr->details && job_ptr->details->dependency &&
			    job_ptr->details->depend_list)
				fed_mgr_submit_remote_dependencies(job_ptr,
								   false,
								   false);
			continue;
		}
		job_ptr->array_task_id = i;
		new_job_ptr = job_array_split(job_ptr);
		if (new_job_ptr) {
			info("%s: Split out %pJ for SLURM_DEPEND_AFTER_CORRESPOND use",
			     __func__, job_ptr);
			new_job_ptr->job_state = JOB_PENDING;
			new_job_ptr->start_time = (time_t) 0;
			/* Do NOT clear db_index here, it is handled when
			 * task_id_str is created elsewhere */
		} else {
			error("%s: Unable to copy record for %pJ",
			      __func__, job_ptr);
		}
	}
	list_iterator_destroy(job_iterator);

	/* Main pass: collect runnable job/partition pairs into job_queue */
	job_iterator = list_iterator_create(job_list);
	while ((job_ptr = list_next(job_iterator))) {
		if (IS_JOB_PENDING(job_ptr)) {
			set_job_failed_assoc_qos_ptr(job_ptr);
			acct_policy_handle_accrue_time(job_ptr, false);
		}

		/* Bound total scan time; check the clock every 100 jobs */
		if (((tested_jobs % 100) == 0) &&
		    (slurm_delta_tv(&start_tv) >= build_queue_timeout)) {
			if (difftime(now, last_log_time) > 600) {
				/* Log at most once every 10 minutes */
				info("%s has run for %d usec, exiting with %d "
				     "of %d jobs tested, %d job-partition "
				     "pairs added",
				     __func__, build_queue_timeout, tested_jobs,
				     list_count(job_list), job_part_pairs);
				last_log_time = now;
			}
			break;
		}
		tested_jobs++;
		job_ptr->preempt_in_progress = false;	/* initialize */
		if (job_ptr->array_recs)
			job_ptr->array_recs->pend_run_tasks = 0;
		/* Track the previous wait reason (and its db copy, except
		 * for the transient PRIORITY/RESOURCES reasons) */
		if (job_ptr->state_reason != WAIT_NO_REASON) {
			job_ptr->state_reason_prev = job_ptr->state_reason;
			if ((job_ptr->state_reason != WAIT_PRIORITY) &&
			    (job_ptr->state_reason != WAIT_RESOURCES))
				job_ptr->state_reason_prev_db =
					job_ptr->state_reason;
			last_job_update = now;
		} else if ((job_ptr->state_reason_prev == WAIT_TIME) &&
			   job_ptr->details &&
			   (job_ptr->details->begin_time <= now)) {
			job_ptr->state_reason_prev = job_ptr->state_reason;
			if ((job_ptr->state_reason != WAIT_PRIORITY) &&
			    (job_ptr->state_reason != WAIT_RESOURCES))
				job_ptr->state_reason_prev_db =
					job_ptr->state_reason;
			last_job_update = now;
		}
		if (!_job_runnable_test1(job_ptr, clear_start))
			continue;

		if (job_ptr->part_ptr_list) {
			/* Multi-partition job: queue one entry per
			 * partition that passes the limit checks */
			int inx = -1;
			part_iterator = list_iterator_create(
				job_ptr->part_ptr_list);
			while ((part_ptr = list_next(part_iterator))) {
				job_ptr->part_ptr = part_ptr;
				reason = job_limits_check(&job_ptr, backfill);
				if ((reason != WAIT_NO_REASON) &&
				    (reason != job_ptr->state_reason)) {
					job_ptr->state_reason = reason;
					xfree(job_ptr->state_desc);
					last_job_update = now;
				}
				/* priority_array index matches part_ptr_list
				 * position: increment inx */
				inx++;
				if (reason != WAIT_NO_REASON)
					continue;
				job_part_pairs++;
				if (job_ptr->priority_array) {
					_job_queue_append(job_queue, job_ptr,
							  part_ptr,
							  job_ptr->
							  priority_array[inx]);
				} else {
					_job_queue_append(job_queue, job_ptr,
							  part_ptr,
							  job_ptr->priority);
				}
			}
			list_iterator_destroy(part_iterator);
		} else {
			if (job_ptr->part_ptr == NULL) {
				part_ptr = find_part_record(job_ptr->partition);
				if (part_ptr == NULL) {
					error("Could not find partition %s for %pJ",
					      job_ptr->partition, job_ptr);
					continue;
				}
				job_ptr->part_ptr = part_ptr;
				error("partition pointer reset for %pJ, part %s",
				      job_ptr, job_ptr->partition);
			}
			if (!_job_runnable_test2(job_ptr, backfill))
				continue;
			job_part_pairs++;
			_job_queue_append(job_queue, job_ptr,
					  job_ptr->part_ptr, job_ptr->priority);
		}
	}
	list_iterator_destroy(job_iterator);

	return job_queue;
}
617
618 /*
619 * job_is_completing - Determine if jobs are in the process of completing.
 * IN/OUT eff_cg_bitmap - optional bitmap of all relevant completing nodes,
 *                       relevance determined by filtering via CompleteWait
622 * if NULL, function will terminate at first completing
623 * job
 * RET - True if any job is in the process of completing AND
625 * CompleteWait is configured non-zero
626 * NOTE: This function can reduce resource fragmentation, which is a
627 * critical issue on Elan interconnect based systems.
628 */
job_is_completing(bitstr_t * eff_cg_bitmap)629 extern bool job_is_completing(bitstr_t *eff_cg_bitmap)
630 {
631 bool completing = false;
632 ListIterator job_iterator;
633 job_record_t *job_ptr = NULL;
634 uint16_t complete_wait = slurm_get_complete_wait();
635 time_t recent;
636
637 if ((job_list == NULL) || (complete_wait == 0))
638 return completing;
639
640 recent = time(NULL) - complete_wait;
641 job_iterator = list_iterator_create(job_list);
642 while ((job_ptr = list_next(job_iterator))) {
643 if (IS_JOB_COMPLETING(job_ptr) &&
644 (job_ptr->end_time >= recent)) {
645 completing = true;
646 /* can return after finding first completing job so long
647 * as a map of nodes in partitions affected by
648 * completing jobs is not required */
649 if (!eff_cg_bitmap)
650 break;
651 else if (job_ptr->part_ptr)
652 bit_or(eff_cg_bitmap,
653 job_ptr->part_ptr->node_bitmap);
654 }
655 }
656 list_iterator_destroy(job_iterator);
657
658 return completing;
659 }
660
661 /*
662 * set_job_elig_time - set the eligible time for pending jobs once their
663 * dependencies are lifted (in job->details->begin_time)
664 */
set_job_elig_time(void)665 extern void set_job_elig_time(void)
666 {
667 job_record_t *job_ptr = NULL;
668 part_record_t *part_ptr = NULL;
669 ListIterator job_iterator;
670 slurmctld_lock_t job_write_lock =
671 { READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK };
672 time_t now = time(NULL);
673
674 lock_slurmctld(job_write_lock);
675 job_iterator = list_iterator_create(job_list);
676 while ((job_ptr = list_next(job_iterator))) {
677 part_ptr = job_ptr->part_ptr;
678 if (!IS_JOB_PENDING(job_ptr))
679 continue;
680 if (part_ptr == NULL)
681 continue;
682 if ((job_ptr->details == NULL) ||
683 (job_ptr->details->begin_time > now))
684 continue;
685 if ((part_ptr->state_up & PARTITION_SCHED) == 0)
686 continue;
687 if ((job_ptr->time_limit != NO_VAL) &&
688 (job_ptr->time_limit > part_ptr->max_time))
689 continue;
690 if ((job_ptr->details->max_nodes != 0) &&
691 ((job_ptr->details->max_nodes < part_ptr->min_nodes) ||
692 (job_ptr->details->min_nodes > part_ptr->max_nodes)))
693 continue;
694 /* Job's eligible time is set in job_independent() */
695 if (!job_independent(job_ptr))
696 continue;
697 }
698 list_iterator_destroy(job_iterator);
699 unlock_slurmctld(job_write_lock);
700 }
701
702 /* Test of part_ptr can still run jobs or if its nodes have
703 * already been reserved by higher priority jobs (those in
704 * the failed_parts array) */
_failed_partition(part_record_t * part_ptr,part_record_t ** failed_parts,int failed_part_cnt)705 static bool _failed_partition(part_record_t *part_ptr,
706 part_record_t **failed_parts,
707 int failed_part_cnt)
708 {
709 int i;
710
711 for (i = 0; i < failed_part_cnt; i++) {
712 if (failed_parts[i] == part_ptr)
713 return true;
714 }
715 return false;
716 }
717
_do_diag_stats(long delta_t)718 static void _do_diag_stats(long delta_t)
719 {
720 if (delta_t > slurmctld_diag_stats.schedule_cycle_max)
721 slurmctld_diag_stats.schedule_cycle_max = delta_t;
722
723 slurmctld_diag_stats.schedule_cycle_sum += delta_t;
724 slurmctld_diag_stats.schedule_cycle_last = delta_t;
725 slurmctld_diag_stats.schedule_cycle_counter++;
726 }
727
/* Return true if all partitions have the same priority, otherwise false. */
_all_partition_priorities_same(void)729 static bool _all_partition_priorities_same(void)
730 {
731 part_record_t *part_ptr;
732 ListIterator iter;
733 bool part_priority_set = false;
734 uint32_t part_priority = 0;
735 bool result = true;
736
737 iter = list_iterator_create(part_list);
738 while ((part_ptr = list_next(iter))) {
739 if (!part_priority_set) {
740 part_priority = part_ptr->priority_tier;
741 part_priority_set = true;
742 } else if (part_priority != part_ptr->priority_tier) {
743 result = false;
744 break;
745 }
746 }
747 list_iterator_destroy(iter);
748
749 return result;
750 }
751
752 /*
753 * schedule - attempt to schedule all pending jobs
754 * pending jobs for each partition will be scheduled in priority
755 * order until a request fails
756 * IN job_limit - maximum number of jobs to test now, avoid testing the full
757 * queue on every job submit (0 means to use the system default,
758 * SchedulerParameters for default_queue_depth)
759 * RET count of jobs scheduled
760 * Note: If the scheduler has executed recently, rather than executing again
761 * right away, a thread will be spawned to execute later in an effort
762 * to reduce system overhead.
763 * Note: We re-build the queue every time. Jobs can not only be added
764 * or removed from the queue, but have their priority or partition
765 * changed with the update_job RPC. In general nodes will be in priority
766 * order (by submit time), so the sorting should be pretty fast.
767 * Note: job_write_lock must be unlocked before calling this.
768 */
extern int schedule(uint32_t job_limit)
{
	/* Job-test budget accumulated across deferred scheduler requests */
	static uint32_t sched_job_limit = NO_VAL;
	int job_count = 0;
	struct timeval now;
	long delta_t;

	if (slurmctld_config.scheduling_disabled)
		return 0;

	/* Compute usec elapsed since the last scheduler run */
	gettimeofday(&now, NULL);
	if (sched_last.tv_sec == 0) {
		/* Never ran before: allow an immediate run */
		delta_t = sched_min_interval;
	} else if (sched_running) {
		/* Another thread is scheduling right now: defer */
		delta_t = 0;
	} else {
		delta_t = (now.tv_sec - sched_last.tv_sec) * USEC_IN_SEC;
		delta_t += now.tv_usec - sched_last.tv_usec;
	}

	if (job_limit == NO_VAL)
		job_limit = 0;			/* use system default */
	slurm_mutex_lock(&sched_mutex);
	/* Merge this request's limit into the pending budget; INFINITE
	 * is sticky in either direction */
	if (sched_job_limit == INFINITE)
		;				/* leave unlimited */
	else if (job_limit == INFINITE)
		sched_job_limit = INFINITE;	/* set unlimited */
	else if (sched_job_limit == NO_VAL)
		sched_job_limit = job_limit;	/* set initial value */
	else
		sched_job_limit += job_limit;	/* test more jobs */

	if (delta_t >= sched_min_interval) {
		/* Temporarily set time in the future until we get the real
		 * scheduler completion time */
		sched_last.tv_sec = now.tv_sec;
		sched_last.tv_usec = now.tv_usec;
		sched_running = true;
		job_limit = sched_job_limit;
		sched_job_limit = NO_VAL;
		slurm_mutex_unlock(&sched_mutex);

		/* Run the scheduler outside of sched_mutex */
		job_count = _schedule(job_limit);

		/* Record the actual completion time */
		slurm_mutex_lock(&sched_mutex);
		gettimeofday(&now, NULL);
		sched_last.tv_sec = now.tv_sec;
		sched_last.tv_usec = now.tv_usec;
		sched_running = false;
		slurm_mutex_unlock(&sched_mutex);
	} else if (sched_pend_thread == 0) {
		/* We don't want to run now, but also don't want to defer
		 * this forever, so spawn a thread to run later */
		sched_pend_thread = 1;
		slurm_thread_create_detached(NULL, _sched_agent, NULL);
		slurm_mutex_unlock(&sched_mutex);
	} else {
		/* Nothing to do, agent already pending */
		slurm_mutex_unlock(&sched_mutex);
	}

	return job_count;
}
832
833 /* Thread used to possibly start job scheduler later, if nothing else does */
static void *_sched_agent(void *args)
{
	long delta_t;
	struct timeval now;
	useconds_t usec;
	int job_cnt;

	/* Poll at half the minimum interval, clamped to [10ms, 1s] */
	usec = sched_min_interval / 2;
	usec = MIN(usec, 1000000);
	usec = MAX(usec, 10000);

	/* Keep waiting until scheduler() can really run */
	while (!slurmctld_config.shutdown_time) {
		usleep(usec);
		if (sched_running)
			continue;
		gettimeofday(&now, NULL);
		delta_t = (now.tv_sec - sched_last.tv_sec) * USEC_IN_SEC;
		delta_t += now.tv_usec - sched_last.tv_usec;
		if (delta_t >= sched_min_interval)
			break;
	}

	job_cnt = schedule(0);
	/* Allow a new pending agent to be spawned */
	slurm_mutex_lock(&sched_mutex);
	sched_pend_thread = 0;
	slurm_mutex_unlock(&sched_mutex);
	if (job_cnt) {
		/* jobs were started, save state */
		schedule_node_save();		/* Has own locking */
		schedule_job_save();		/* Has own locking */
	}

	return NULL;
}
869
870 /* Determine if job's deadline specification is still valid, kill job if not
871 * job_ptr IN - Job to test
872 * func IN - function named used for logging, "sched" or "backfill"
 * RET - true if valid, false if invalid and job cancelled
874 */
deadline_ok(job_record_t * job_ptr,char * func)875 extern bool deadline_ok(job_record_t *job_ptr, char *func)
876 {
877 time_t now;
878 char time_str_deadline[32];
879 bool fail_job = false;
880 time_t inter;
881
882 now = time(NULL);
883 if ((job_ptr->time_min) && (job_ptr->time_min != NO_VAL)) {
884 inter = now + job_ptr->time_min * 60;
885 if (job_ptr->deadline < inter) {
886 slurm_make_time_str(&job_ptr->deadline,
887 time_str_deadline,
888 sizeof(time_str_deadline));
889 info("%s: %pJ with time_min %u exceeded deadline %s and cancelled",
890 __func__, job_ptr, job_ptr->time_min,
891 time_str_deadline);
892 fail_job = true;
893 }
894 } else if ((job_ptr->time_limit != NO_VAL) &&
895 (job_ptr->time_limit != INFINITE)) {
896 inter = now + job_ptr->time_limit * 60;
897 if (job_ptr->deadline < inter) {
898 slurm_make_time_str(&job_ptr->deadline,
899 time_str_deadline,
900 sizeof(time_str_deadline));
901 info("%s: %pJ with time_limit %u exceeded deadline %s and cancelled",
902 __func__, job_ptr, job_ptr->time_limit,
903 time_str_deadline);
904 fail_job = true;
905 }
906 }
907 if (fail_job) {
908 last_job_update = now;
909 job_ptr->job_state = JOB_DEADLINE;
910 job_ptr->exit_code = 1;
911 job_ptr->state_reason = FAIL_DEADLINE;
912 xfree(job_ptr->state_desc);
913 job_ptr->start_time = now;
914 job_ptr->end_time = now;
915 srun_allocate_abort(job_ptr);
916 job_completion_logger(job_ptr, false);
917 return false;
918 }
919 return true;
920 }
921
922 /*
923 * When an array job is rejected for some reason, the remaining array tasks will
924 * get skipped by both the main scheduler and the backfill scheduler (it's an
925 * optimization). Hence, their reasons should match the reason of the first job.
926 * This function sets those reasons.
927 *
928 * job_ptr (IN) The current job being evaluated, after it has gone
929 * through the scheduling loop.
930 * reject_array_job (IN) A pointer to the first job (array task) in the most
931 * recently rejected array job. If job_ptr belongs to the
932 * same array as reject_array_job, then set job_ptr's
933 * reason to match reject_array_job.
934 */
extern void fill_array_reasons(job_record_t *job_ptr,
			       job_record_t *reject_array_job)
{
	if (!reject_array_job || !reject_array_job->array_job_id ||
	    (job_ptr == reject_array_job))
		return;

	/* Different array, or reason already in sync: nothing to do */
	if (job_ptr->array_job_id != reject_array_job->array_job_id)
		return;
	if (job_ptr->state_reason == reject_array_job->state_reason)
		return;

	/* Propagate the rejected task's reason to this array task */
	xfree(job_ptr->state_desc);
	job_ptr->state_reason = reject_array_job->state_reason;
	debug3("%s: Setting reason of array task %pJ to %s",
	       __func__, job_ptr,
	       job_reason_string(job_ptr->state_reason));
}
958
/*
 * Build a job_queue_rec_t from the supplied request and append it to the
 * request's job queue. All fields are copied from job_queue_req; the new
 * record is owned by the list it is appended to.
 */
extern void job_queue_append_internal(job_queue_req_t *job_queue_req)
{
	job_queue_rec_t *rec;

	xassert(job_queue_req);
	xassert(job_queue_req->job_ptr);
	xassert(job_queue_req->job_queue);
	xassert(job_queue_req->part_ptr);

	rec = xmalloc(sizeof(*rec));
	rec->job_id = job_queue_req->job_ptr->job_id;
	rec->array_task_id = job_queue_req->job_ptr->array_task_id;
	rec->job_ptr = job_queue_req->job_ptr;
	rec->part_ptr = job_queue_req->part_ptr;
	rec->resv_ptr = job_queue_req->resv_ptr;
	rec->priority = job_queue_req->prio;
	list_append(job_queue_req->job_queue, rec);
}
977
/*
 * _schedule - attempt to start pending jobs in priority order.
 * job_limit IN - maximum number of jobs to test on this pass
 *		  (0 means use the configured default_queue_depth)
 * RET count of jobs started in this scheduling cycle
 */
static int _schedule(uint32_t job_limit)
{
	ListIterator job_iterator = NULL, part_iterator = NULL;
	List job_queue = NULL;
	int failed_part_cnt = 0, failed_resv_cnt = 0, job_cnt = 0;
	int error_code, i, j, part_cnt, time_limit, pend_time;
	uint32_t job_depth = 0, array_task_id;
	job_queue_rec_t *job_queue_rec;
	job_record_t *job_ptr = NULL;
	part_record_t *part_ptr, **failed_parts = NULL, *skip_part_ptr = NULL;
	struct slurmctld_resv **failed_resv = NULL;
	bitstr_t *save_avail_node_bitmap;
	part_record_t **sched_part_ptr = NULL;
	int *sched_part_jobs = NULL, bb_wait_cnt = 0;
	/* Locks: Read config, write job, write node, read partition */
	slurmctld_lock_t job_write_lock =
		{ READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
	bool is_job_array_head;
	/* Cached SchedulerParameters, re-parsed only when the config changes */
	static time_t sched_update = 0;
	static bool fifo_sched = false;
	static bool assoc_limit_stop = false;
	static int sched_timeout = 0;
	static int sched_max_job_start = 0;
	static int bf_min_age_reserve = 0;
	static uint32_t bf_min_prio_reserve = 0;
	static int def_job_limit = 100;
	static int max_jobs_per_part = 0;
	static int defer_rpc_cnt = 0;
	static bool reduce_completing_frag = false;
	time_t now, last_job_sched_start, sched_start;
	job_record_t *reject_array_job = NULL;
	part_record_t *reject_array_part = NULL;
	bool fail_by_part, wait_on_resv;
	uint32_t deadline_time_limit, save_time_limit = 0;
	uint32_t prio_reserve;
#if HAVE_SYS_PRCTL_H
	char get_name[16];
#endif
	DEF_TIMERS;

	if (slurmctld_config.shutdown_time)
		return 0;

	/* Temporarily rename this thread "sched" for easier debugging */
#if HAVE_SYS_PRCTL_H
	if (prctl(PR_GET_NAME, get_name, NULL, NULL, NULL) < 0) {
		error("%s: cannot get my name %m", __func__);
		strlcpy(get_name, "slurmctld", sizeof(get_name));
	}
	if (prctl(PR_SET_NAME, "sched", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__, "sched");
	}
#endif

	/* Re-parse SchedulerParameters only if the configuration changed */
	if (sched_update != slurmctld_conf.last_update) {
		char *tmp_ptr;
		char *sched_params = slurm_get_sched_params();
		char *sched_type = slurm_get_sched_type();
		char *prio_type = slurm_get_priority_type();
		/* FIFO scheduling only applies with sched/builtin,
		 * priority/basic and uniform partition priorities */
		if ((xstrcmp(sched_type, "sched/builtin") == 0) &&
		    (xstrcmp(prio_type, "priority/basic") == 0) &&
		    _all_partition_priorities_same())
			fifo_sched = true;
		else
			fifo_sched = false;
		xfree(sched_type);
		xfree(prio_type);

		if (xstrcasestr(sched_params, "assoc_limit_stop"))
			assoc_limit_stop = true;
		else
			assoc_limit_stop = false;

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "batch_sched_delay="))) {
			batch_sched_delay = atoi(tmp_ptr + 18);
			if (batch_sched_delay < 0) {
				error("Invalid batch_sched_delay: %d",
				      batch_sched_delay);
				batch_sched_delay = 3;
			}
		} else {
			batch_sched_delay = 3;
		}

		bb_array_stage_cnt = 10;
		if ((tmp_ptr = xstrcasestr(sched_params,
					   "bb_array_stage_cnt="))) {
			int task_cnt = atoi(tmp_ptr + 19);
			if (task_cnt > 0)
				bb_array_stage_cnt = task_cnt;
		}

		bf_min_age_reserve = 0;
		if ((tmp_ptr = xstrcasestr(sched_params,
					   "bf_min_age_reserve="))) {
			int min_age = atoi(tmp_ptr + 19);
			if (min_age > 0)
				bf_min_age_reserve = min_age;
		}

		bf_min_prio_reserve = 0;
		if ((tmp_ptr = xstrcasestr(sched_params,
					   "bf_min_prio_reserve="))) {
			int64_t min_prio = (int64_t) atoll(tmp_ptr + 20);
			if (min_prio > 0)
				bf_min_prio_reserve = (uint32_t) min_prio;
		}

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "build_queue_timeout="))) {
			build_queue_timeout = atoi(tmp_ptr + 20);
			if (build_queue_timeout < 100) {
				error("Invalid build_queue_time: %d",
				      build_queue_timeout);
				build_queue_timeout = BUILD_TIMEOUT;
			}
		} else {
			build_queue_timeout = BUILD_TIMEOUT;
		}

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "default_queue_depth="))) {
			def_job_limit = atoi(tmp_ptr + 20);
			if (def_job_limit < 0) {
				error("ignoring SchedulerParameters: "
				      "default_queue_depth value of %d",
				      def_job_limit);
				def_job_limit = 100;
			}
		} else {
			def_job_limit = 100;
		}

		bf_hetjob_prio = 0;
		if (sched_params &&
		    (tmp_ptr = strstr(sched_params, "bf_hetjob_prio="))) {
			tmp_ptr = strtok(tmp_ptr + 15, ",");
			if (!xstrcasecmp(tmp_ptr, "min"))
				bf_hetjob_prio |= HETJOB_PRIO_MIN;
			else if (!xstrcasecmp(tmp_ptr, "max"))
				bf_hetjob_prio |= HETJOB_PRIO_MAX;
			else if (!xstrcasecmp(tmp_ptr, "avg"))
				bf_hetjob_prio |= HETJOB_PRIO_AVG;
			else
				error("Invalid SchedulerParameters bf_hetjob_prio: %s",
				      tmp_ptr);
		}

		bf_hetjob_immediate = false;
		if (xstrcasestr(sched_params, "bf_hetjob_immediate"))
			bf_hetjob_immediate = true;

		if (bf_hetjob_immediate && !bf_hetjob_prio) {
			bf_hetjob_prio |= HETJOB_PRIO_MIN;
			info("bf_hetjob_immediate automatically sets bf_hetjob_prio=min");
		}

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "partition_job_depth="))) {
			max_jobs_per_part = atoi(tmp_ptr + 20);
			if (max_jobs_per_part < 0) {
				error("ignoring SchedulerParameters: "
				      "partition_job_depth value of %d",
				      max_jobs_per_part);
				max_jobs_per_part = 0;
			}
		} else {
			max_jobs_per_part = 0;
		}

		if (xstrcasestr(sched_params, "reduce_completing_frag"))
			reduce_completing_frag = true;
		else
			reduce_completing_frag = false;

		if ((tmp_ptr = xstrcasestr(sched_params, "max_rpc_cnt=")))
			defer_rpc_cnt = atoi(tmp_ptr + 12);
		else if ((tmp_ptr = xstrcasestr(sched_params,
						"max_rpc_count=")))
			defer_rpc_cnt = atoi(tmp_ptr + 14);
		else
			defer_rpc_cnt = 0;
		if (defer_rpc_cnt < 0) {
			error("Invalid max_rpc_cnt: %d", defer_rpc_cnt);
			defer_rpc_cnt = 0;
		}

		/* max_sched_time is bounded by half the message timeout */
		time_limit = slurm_get_msg_timeout() / 2;
		if ((tmp_ptr = xstrcasestr(sched_params, "max_sched_time="))) {
			sched_timeout = atoi(tmp_ptr + 15);
			if ((sched_timeout <= 0) ||
			    (sched_timeout > time_limit)) {
				error("Invalid max_sched_time: %d",
				      sched_timeout);
				sched_timeout = 0;
			}
		} else {
			sched_timeout = 0;
		}
		if (sched_timeout == 0) {
			sched_timeout = MAX(time_limit, 1);
			sched_timeout = MIN(sched_timeout, 2);
		}

		if ((tmp_ptr = xstrcasestr(sched_params, "sched_interval="))) {
			sched_interval = atoi(tmp_ptr + 15);
			if (sched_interval < 0) {
				error("Invalid sched_interval: %d",
				      sched_interval);
				sched_interval = 60;
			}
		} else {
			sched_interval = 60;
		}

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "sched_min_interval="))) {
			i = atoi(tmp_ptr + 19);
			if (i < 0)
				error("Invalid sched_min_interval: %d", i);
			else
				sched_min_interval = i;
		} else {
			sched_min_interval = 2;
		}

		if ((tmp_ptr = xstrcasestr(sched_params,
					   "sched_max_job_start="))) {
			sched_max_job_start = atoi(tmp_ptr + 20);
			if (sched_max_job_start < 0) {
				error("Invalid sched_max_job_start: %d",
				      sched_max_job_start);
				sched_max_job_start = 0;
			}
		} else {
			sched_max_job_start = 0;
		}

		xfree(sched_params);
		sched_update = slurmctld_conf.last_update;
		info("SchedulerParameters=default_queue_depth=%d,"
		     "max_rpc_cnt=%d,max_sched_time=%d,partition_job_depth=%d,"
		     "sched_max_job_start=%d,sched_min_interval=%d",
		     def_job_limit, defer_rpc_cnt, sched_timeout,
		     max_jobs_per_part, sched_max_job_start,
		     sched_min_interval);
	}

	/* Skip this cycle entirely if the RPC load is already high */
	slurm_mutex_lock(&slurmctld_config.thread_count_lock);
	if ((defer_rpc_cnt > 0) &&
	    (slurmctld_config.server_thread_count >= defer_rpc_cnt)) {
		sched_debug("schedule() returning, too many RPCs");
		slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
		goto out;
	}
	slurm_mutex_unlock(&slurmctld_config.thread_count_lock);

	if (!fed_mgr_sibs_synced()) {
		sched_info("schedule() returning, federation siblings not synced yet");
		goto out;
	}

	if (job_limit == 0)
		job_limit = def_job_limit;

	lock_slurmctld(job_write_lock);
	now = time(NULL);
	sched_start = now;
	last_job_sched_start = now;
	START_TIMER;
	/* No usable front end: flag all affected pending jobs and bail out */
	if (!avail_front_end(NULL)) {
		ListIterator job_iterator = list_iterator_create(job_list);
		while ((job_ptr = list_next(job_iterator))) {
			if (!IS_JOB_PENDING(job_ptr))
				continue;
			if ((job_ptr->state_reason != WAIT_NO_REASON) &&
			    (job_ptr->state_reason != WAIT_RESOURCES) &&
			    (job_ptr->state_reason != WAIT_POWER_NOT_AVAIL) &&
			    (job_ptr->state_reason != WAIT_POWER_RESERVED) &&
			    (job_ptr->state_reason != WAIT_NODE_NOT_AVAIL))
				continue;
			job_ptr->state_reason = WAIT_FRONT_END;
			xfree(job_ptr->state_desc);
		}
		list_iterator_destroy(job_iterator);

		unlock_slurmctld(job_write_lock);
		sched_debug("schedule() returning, no front end nodes are available");
		goto out;
	}

	if (!reduce_completing_frag && job_is_completing(NULL)) {
		unlock_slurmctld(job_write_lock);
		sched_debug("schedule() returning, some job is still completing");
		goto out;
	}

	part_cnt = list_count(part_list);
	failed_parts = xcalloc(part_cnt, sizeof(part_record_t *));
	failed_resv = xmalloc(sizeof(struct slurmctld_resv*) * MAX_FAILED_RESV);
	save_avail_node_bitmap = bit_copy(avail_node_bitmap);
	bit_or(avail_node_bitmap, rs_node_bitmap);

	/* Avoid resource fragmentation if important */
	if (reduce_completing_frag) {
		bitstr_t *eff_cg_bitmap = bit_alloc(node_record_count);
		if (job_is_completing(eff_cg_bitmap)) {
			ListIterator part_iterator;
			part_record_t *part_ptr = NULL;
			char *cg_part_str = NULL;

			/* Skip any partition overlapping completing nodes */
			part_iterator = list_iterator_create(part_list);
			while ((part_ptr = list_next(part_iterator))) {
				if (bit_overlap_any(eff_cg_bitmap,
						    part_ptr->node_bitmap) &&
				    (part_ptr->state_up & PARTITION_SCHED)) {
					failed_parts[failed_part_cnt++] =
						part_ptr;
					bit_and_not(avail_node_bitmap,
						    part_ptr->node_bitmap);
					if (slurmctld_conf.slurmctld_debug >=
					    LOG_LEVEL_DEBUG) {
						if (cg_part_str)
							xstrcat(cg_part_str,
								",");
						xstrfmtcat(cg_part_str, "%s",
							   part_ptr->name);
					}
				}
			}
			list_iterator_destroy(part_iterator);
			if (cg_part_str) {
				sched_debug("some job is still completing, skipping partitions '%s'",
					    cg_part_str);
				xfree(cg_part_str);
			}
		}
		bit_free(eff_cg_bitmap);
	}

	/* Build per-partition job counters for partition_job_depth */
	if (max_jobs_per_part) {
		ListIterator part_iterator;
		sched_part_ptr = xcalloc(part_cnt, sizeof(part_record_t *));
		sched_part_jobs = xmalloc(sizeof(int) * part_cnt);
		part_iterator = list_iterator_create(part_list);
		i = 0;
		while ((part_ptr = list_next(part_iterator))) {
			sched_part_ptr[i++] = part_ptr;
		}
		list_iterator_destroy(part_iterator);
	}

	sched_debug("Running job scheduler");
	/*
	 * If we are doing FIFO scheduling, use the job records right off the
	 * job list.
	 *
	 * If a job is submitted to multiple partitions then build_job_queue()
	 * will return a separate record for each job:partition pair.
	 *
	 * In both cases, we test each partition associated with the job.
	 */
	if (fifo_sched) {
		slurmctld_diag_stats.schedule_queue_len = list_count(job_list);
		job_iterator = list_iterator_create(job_list);
	} else {
		job_queue = build_job_queue(false, false);
		slurmctld_diag_stats.schedule_queue_len = list_count(job_queue);
		sort_job_queue(job_queue);
	}

	job_ptr = NULL;
	wait_on_resv = false;
	/* Main scheduling loop: one iteration per job (or job:partition) */
	while (1) {
		/* Run some final guaranteed logic after each job iteration */
		if (job_ptr) {
			job_resv_clear_promiscous_flag(job_ptr);
			fill_array_reasons(job_ptr, reject_array_job);
		}

		/* Fetch the next candidate job (FIFO or priority queue) */
		if (fifo_sched) {
			if (job_ptr && part_iterator &&
			    IS_JOB_PENDING(job_ptr)) /* test job in next part */
				goto next_part;
			job_ptr = list_next(job_iterator);
			if (!job_ptr)
				break;

			/* When not fifo we do this in build_job_queue(). */
			if (IS_JOB_PENDING(job_ptr)) {
				set_job_failed_assoc_qos_ptr(job_ptr);
				acct_policy_handle_accrue_time(job_ptr, false);
			}

			if (!avail_front_end(job_ptr)) {
				job_ptr->state_reason = WAIT_FRONT_END;
				xfree(job_ptr->state_desc);
				last_job_update = now;
				continue;
			}
			if (!_job_runnable_test1(job_ptr, false))
				continue;
			if (job_ptr->part_ptr_list) {
				part_iterator = list_iterator_create(
					job_ptr->part_ptr_list);
next_part:
				part_ptr = list_next(part_iterator);

				if (!_job_runnable_test3(job_ptr, part_ptr))
					continue;

				if (part_ptr) {
					job_ptr->part_ptr = part_ptr;
					if (job_limits_check(&job_ptr, false) !=
					    WAIT_NO_REASON)
						continue;
				} else {
					list_iterator_destroy(part_iterator);
					part_iterator = NULL;
					continue;
				}
			} else {
				if (!_job_runnable_test2(job_ptr, false))
					continue;
			}
		} else {
			job_queue_rec = list_pop(job_queue);
			if (!job_queue_rec)
				break;
			array_task_id = job_queue_rec->array_task_id;
			job_ptr = job_queue_rec->job_ptr;
			part_ptr = job_queue_rec->part_ptr;
			job_ptr->priority = job_queue_rec->priority;

			job_queue_rec_prom_resv(job_queue_rec);
			xfree(job_queue_rec);

			if (!avail_front_end(job_ptr)) {
				job_ptr->state_reason = WAIT_FRONT_END;
				xfree(job_ptr->state_desc);
				last_job_update = now;
				continue;
			}
			if ((job_ptr->array_task_id != array_task_id) &&
			    (array_task_id == NO_VAL)) {
				/* Job array element started in other partition,
				 * reset pointer to "master" job array record */
				job_ptr = find_job_record(job_ptr->array_job_id);
			}
			if (!job_ptr || !IS_JOB_PENDING(job_ptr))
				continue;	/* started in other partition */

			if (!_job_runnable_test3(job_ptr, part_ptr))
				continue;

			job_ptr->part_ptr = part_ptr;
		}

		job_ptr->last_sched_eval = time(NULL);

		if (job_ptr->preempt_in_progress)
			continue;	/* scheduled in another partition */

		if (job_ptr->het_job_id) {
			fail_by_part = true;
			goto fail_this_part;
		}

		if (job_ptr->array_recs && (job_ptr->array_task_id == NO_VAL))
			is_job_array_head = true;
		else
			is_job_array_head = false;

next_task:
		/* Respect per-cycle time and job-start budgets */
		if ((time(NULL) - sched_start) >= sched_timeout) {
			sched_debug("loop taking too long, breaking out");
			break;
		}
		if (sched_max_job_start && (job_cnt >= sched_max_job_start)) {
			sched_debug("sched_max_job_start reached, breaking out");
			break;
		}

		if ((job_ptr->array_task_id != NO_VAL) || job_ptr->array_recs) {
			if (reject_array_job &&
			    (reject_array_job->array_job_id ==
			     job_ptr->array_job_id) &&
			    (reject_array_part == part_ptr))
				continue;	/* already rejected array element */

			/* assume reject whole array for now, clear if OK */
			reject_array_job = job_ptr;
			reject_array_part = part_ptr;

			if (!job_array_start_test(job_ptr))
				continue;
		}
		if (max_jobs_per_part) {
			bool skip_job = false;
			for (j = 0; j < part_cnt; j++) {
				if (sched_part_ptr[j] != job_ptr->part_ptr)
					continue;
				if (sched_part_jobs[j]++ >=
				    max_jobs_per_part)
					skip_job = true;
				break;
			}
			if (skip_job) {
				if (job_ptr->state_reason == WAIT_NO_REASON) {
					xfree(job_ptr->state_desc);
					job_ptr->state_reason = WAIT_PRIORITY;
				}
				if (job_ptr->part_ptr == skip_part_ptr)
					continue;
				sched_debug2("reached partition %s job limit",
					     job_ptr->part_ptr->name);
				skip_part_ptr = job_ptr->part_ptr;
				continue;
			}
		}
		if (job_depth++ > job_limit) {
			sched_debug("already tested %u jobs, breaking out",
				    job_depth);
			break;
		}

		/* Re-check RPC load mid-cycle and bail if it got too high */
		slurm_mutex_lock(&slurmctld_config.thread_count_lock);
		if ((defer_rpc_cnt > 0) &&
		    (slurmctld_config.server_thread_count >= defer_rpc_cnt)) {
			sched_debug("schedule() returning, too many RPCs");
			slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
			break;
		}
		slurm_mutex_unlock(&slurmctld_config.thread_count_lock);

		if (job_limits_check(&job_ptr, false) != WAIT_NO_REASON) {
			/* should never happen */
			continue;
		}

		slurmctld_diag_stats.schedule_cycle_depth++;

		if (job_ptr->resv_name) {
			bool found_resv = false;

			/*
			 * If we have a MaxStartDelay we need to make sure we
			 * don't schedule any jobs that could potentially run to
			 * avoid starvation of this job.
			 */
			if (job_ptr->resv_ptr &&
			    job_ptr->resv_ptr->max_start_delay)
				wait_on_resv = true;

			for (i = 0; i < failed_resv_cnt; i++) {
				if (failed_resv[i] == job_ptr->resv_ptr) {
					found_resv = true;
					break;
				}
			}
			if (found_resv) {
				job_ptr->state_reason = WAIT_PRIORITY;
				xfree(job_ptr->state_desc);
				sched_debug3("%pJ. State=PENDING. Reason=Priority. Priority=%u. Resv=%s.",
					     job_ptr,
					     job_ptr->priority,
					     job_ptr->resv_name);
				continue;
			}
		} else if (_failed_partition(job_ptr->part_ptr, failed_parts,
					     failed_part_cnt)) {
			if ((job_ptr->state_reason == WAIT_NO_REASON) ||
			    (job_ptr->state_reason == WAIT_RESOURCES)) {
				sched_debug("%pJ unable to schedule in Partition=%s (per _failed_partition()). State=PENDING. Previous-Reason=%s. Previous-Desc=%s. New-Reason=Priority. Priority=%u.",
					    job_ptr,
					    job_ptr->partition,
					    job_reason_string(
						    job_ptr->state_reason),
					    job_ptr->state_desc,
					    job_ptr->priority);
				job_ptr->state_reason = WAIT_PRIORITY;
				xfree(job_ptr->state_desc);
			} else {
				/*
				 * Log that the job can not run even though we
				 * are not overriding the reason */
				sched_debug2("%pJ. unable to schedule in Partition=%s (per _failed_partition()). Retaining previous scheduling Reason=%s. Desc=%s. Priority=%u.",
					     job_ptr,
					     job_ptr->partition,
					     job_reason_string(
						     job_ptr->state_reason),
					     job_ptr->state_desc,
					     job_ptr->priority);
			}
			last_job_update = now;

			continue;
		} else if (wait_on_resv &&
			   (job_ptr->warn_flags & KILL_JOB_RESV)) {
			sched_debug("%pJ. State=PENDING. Reason=Priority, Priority=%u. May be able to backfill on MaxStartDelay reservations.",
				    job_ptr, job_ptr->priority);
			continue;

		}

		/* Test for valid account, QOS and required nodes on each pass */
		if (job_ptr->state_reason == FAIL_ACCOUNT) {
			slurmdb_assoc_rec_t assoc_rec;
			memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
			assoc_rec.acct = job_ptr->account;
			if (job_ptr->part_ptr)
				assoc_rec.partition = job_ptr->part_ptr->name;
			assoc_rec.uid = job_ptr->user_id;

			if (!assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
						     accounting_enforce,
						     &job_ptr->assoc_ptr,
						     false)) {
				job_ptr->state_reason = WAIT_NO_REASON;
				xfree(job_ptr->state_desc);
				job_ptr->assoc_id = assoc_rec.id;
				last_job_update = now;
			} else {
				sched_debug("%pJ has invalid association",
					    job_ptr);
				xfree(job_ptr->state_desc);
				job_ptr->state_reason =
					WAIT_ASSOC_RESOURCE_LIMIT;
				continue;
			}
		}
		if (job_ptr->qos_id) {
			assoc_mgr_lock_t locks =
				{ .assoc = READ_LOCK, .qos = READ_LOCK };

			assoc_mgr_lock(&locks);
			if (job_ptr->assoc_ptr
			    && (accounting_enforce & ACCOUNTING_ENFORCE_QOS)
			    && ((job_ptr->qos_id >= g_qos_count) ||
				!job_ptr->assoc_ptr->usage ||
				!job_ptr->assoc_ptr->usage->valid_qos ||
				!bit_test(job_ptr->assoc_ptr->usage->valid_qos,
					  job_ptr->qos_id))
			    && !job_ptr->limit_set.qos) {
				assoc_mgr_unlock(&locks);
				sched_debug("%pJ has invalid QOS", job_ptr);
				job_fail_qos(job_ptr, __func__);
				last_job_update = now;
				continue;
			} else if (job_ptr->state_reason == FAIL_QOS) {
				xfree(job_ptr->state_desc);
				job_ptr->state_reason = WAIT_NO_REASON;
				last_job_update = now;
			}
			assoc_mgr_unlock(&locks);
		}

		/* Compute an effective time limit from the job's deadline */
		deadline_time_limit = 0;
		if ((job_ptr->deadline) && (job_ptr->deadline != NO_VAL)) {
			if (!deadline_ok(job_ptr, "sched"))
				continue;

			deadline_time_limit = job_ptr->deadline - now;
			deadline_time_limit /= 60;
			if ((job_ptr->time_limit != NO_VAL) &&
			    (job_ptr->time_limit != INFINITE)) {
				deadline_time_limit = MIN(job_ptr->time_limit,
							  deadline_time_limit);
			} else {
				if ((job_ptr->part_ptr->default_time != NO_VAL) &&
				    (job_ptr->part_ptr->default_time != INFINITE)){
					deadline_time_limit = MIN(
						job_ptr->part_ptr->default_time,
						deadline_time_limit);
				} else if ((job_ptr->part_ptr->max_time != NO_VAL) &&
					   (job_ptr->part_ptr->max_time != INFINITE)){
					deadline_time_limit = MIN(
						job_ptr->part_ptr->max_time,
						deadline_time_limit);
				}
			}
		}

		if (!acct_policy_job_runnable_state(job_ptr) &&
		    !acct_policy_job_runnable_pre_select(job_ptr, false))
			continue;

		if ((job_ptr->state_reason == WAIT_NODE_NOT_AVAIL) &&
		    job_ptr->details && job_ptr->details->req_node_bitmap &&
		    !bit_super_set(job_ptr->details->req_node_bitmap,
				   avail_node_bitmap)) {
			continue;
		}

		if (!job_ptr->part_ptr)
			continue;
		i = bit_overlap(avail_node_bitmap,
				job_ptr->part_ptr->node_bitmap);
		if ((job_ptr->details &&
		     (job_ptr->details->min_nodes != NO_VAL) &&
		     (job_ptr->details->min_nodes > i)) ||
		    (!job_ptr->details && (i == 0))) {
			/*
			 * Too many nodes DRAIN, DOWN, or
			 * reserved for jobs in higher priority partition
			 */
			job_ptr->state_reason = WAIT_RESOURCES;
			xfree(job_ptr->state_desc);
			job_ptr->state_desc = xstrdup("Nodes required for job are DOWN, DRAINED or reserved for jobs in higher priority partitions");
			last_job_update = now;
			sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
				     job_ptr,
				     job_state_string(job_ptr->job_state),
				     job_reason_string(job_ptr->state_reason),
				     job_ptr->priority, job_ptr->partition);
			fail_by_part = true;
			goto fail_this_part;
		}
		if (license_job_test(job_ptr, time(NULL), true) !=
		    SLURM_SUCCESS) {
			job_ptr->state_reason = WAIT_LICENSES;
			xfree(job_ptr->state_desc);
			last_job_update = now;
			sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
				     job_ptr,
				     job_state_string(job_ptr->job_state),
				     job_reason_string(job_ptr->state_reason),
				     job_ptr->priority);
			continue;
		}

		if (assoc_mgr_validate_assoc_id(acct_db_conn,
						job_ptr->assoc_id,
						accounting_enforce)) {
			/* NOTE: This only happens if a user's account is
			 * disabled between when the job was submitted and
			 * the time we consider running it. It should be
			 * very rare. */
			sched_info("%pJ has invalid account", job_ptr);
			last_job_update = now;
			job_ptr->state_reason = FAIL_ACCOUNT;
			xfree(job_ptr->state_desc);
			continue;
		}

		last_job_sched_start = MAX(last_job_sched_start,
					   job_ptr->start_time);
		if (deadline_time_limit) {
			save_time_limit = job_ptr->time_limit;
			job_ptr->time_limit = deadline_time_limit;
		}

		/* get fed job lock from origin cluster */
		if (fed_mgr_job_lock(job_ptr)) {
			error_code = ESLURM_FED_JOB_LOCK;
			goto skip_start;
		}

		/* Attempt the actual resource allocation */
		error_code = select_nodes(job_ptr, false, NULL, NULL, false,
					  SLURMDB_JOB_FLAG_SCHED);

		if (error_code == SLURM_SUCCESS) {
			/*
			 * If the following fails because of network
			 * connectivity, the origin cluster should ask
			 * when it comes back up if the cluster_lock
			 * cluster actually started the job
			 */
			fed_mgr_job_start(job_ptr, job_ptr->start_time);
		} else {
			fed_mgr_job_unlock(job_ptr);
		}

skip_start:

		/* Interpret the allocation result and update job state */
		fail_by_part = false;
		if ((error_code != SLURM_SUCCESS) && deadline_time_limit)
			job_ptr->time_limit = save_time_limit;
		if ((error_code == ESLURM_NODES_BUSY) ||
		    (error_code == ESLURM_POWER_NOT_AVAIL) ||
		    (error_code == ESLURM_POWER_RESERVED)) {
			sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
				     job_ptr,
				     job_state_string(job_ptr->job_state),
				     job_reason_string(job_ptr->state_reason),
				     job_ptr->priority, job_ptr->partition);
			fail_by_part = true;
		} else if (error_code == ESLURM_BURST_BUFFER_WAIT) {
			if (job_ptr->start_time == 0) {
				job_ptr->start_time = last_job_sched_start;
				bb_wait_cnt++;
			}
			sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
				     job_ptr,
				     job_state_string(job_ptr->job_state),
				     job_reason_string(job_ptr->state_reason),
				     job_ptr->priority);
			continue;
		} else if ((error_code == ESLURM_RESERVATION_BUSY) ||
			   (error_code == ESLURM_RESERVATION_NOT_USABLE)) {
			if (job_ptr->resv_ptr &&
			    job_ptr->resv_ptr->node_bitmap) {
				sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u.",
					     job_ptr,
					     job_state_string(job_ptr->job_state),
					     job_reason_string(job_ptr->state_reason),
					     job_ptr->priority);
				bit_and_not(avail_node_bitmap,
					    job_ptr->resv_ptr->node_bitmap);
			} else {
				/*
				 * The job has no reservation but requires
				 * nodes that are currently in some reservation
				 * so just skip over this job and try running
				 * the next lower priority job
				 */
				sched_debug3("%pJ. State=%s. Reason=Required nodes are reserved. Priority=%u",
					     job_ptr,
					     job_state_string(job_ptr->job_state),
					     job_ptr->priority);
			}
		} else if (error_code == ESLURM_FED_JOB_LOCK) {
			job_ptr->state_reason = WAIT_FED_JOB_LOCK;
			xfree(job_ptr->state_desc);
			last_job_update = now;
			sched_debug3("%pJ. State=%s. Reason=%s. Priority=%u. Partition=%s.",
				     job_ptr,
				     job_state_string(job_ptr->job_state),
				     job_reason_string(job_ptr->state_reason),
				     job_ptr->priority, job_ptr->partition);
			fail_by_part = true;
		} else if (error_code == SLURM_SUCCESS) {
			/* job initiated */
			sched_debug3("%pJ initiated", job_ptr);
			last_job_update = now;

			/* Clear assumed rejected array status */
			reject_array_job = NULL;
			reject_array_part = NULL;

			/* synchronize power layouts key/values */
			if ((powercap_get_cluster_current_cap() != 0) &&
			    (which_power_layout() == 2)) {
				layouts_entity_pull_kv("power",
						       "Cluster",
						       "CurrentSumPower");
			}
			sched_info("Allocate %pJ NodeList=%s #CPUs=%u Partition=%s",
				   job_ptr, job_ptr->nodes,
				   job_ptr->total_cpus,
				   job_ptr->part_ptr->name);
			if (job_ptr->batch_flag == 0)
				srun_allocate(job_ptr);
			else if (!IS_JOB_CONFIGURING(job_ptr))
				launch_job(job_ptr);
			rebuild_job_part_list(job_ptr);
			job_cnt++;
			if (is_job_array_head &&
			    (job_ptr->array_task_id != NO_VAL)) {
				/* Try starting another task of the job array */
				job_ptr = find_job_record(job_ptr->array_job_id);
				if (job_ptr && IS_JOB_PENDING(job_ptr) &&
				    (bb_g_job_test_stage_in(job_ptr,false) ==1))
					goto next_task;
			}
			continue;
		} else if ((error_code ==
			    ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE) &&
			   job_ptr->part_ptr_list) {
			debug("%pJ non-runnable in partition %s: %s",
			      job_ptr, job_ptr->part_ptr->name,
			      slurm_strerror(error_code));
		} else if (error_code == ESLURM_ACCOUNTING_POLICY) {
			sched_debug3("%pJ delayed for accounting policy",
				     job_ptr);
			/* potentially starve this job */
			if (assoc_limit_stop)
				fail_by_part = true;
		} else if ((error_code !=
			    ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE) &&
			   (error_code != ESLURM_NODE_NOT_AVAIL) &&
			   (error_code != ESLURM_INVALID_BURST_BUFFER_REQUEST)){
			sched_info("schedule: %pJ non-runnable: %s",
				   job_ptr, slurm_strerror(error_code));
			last_job_update = now;
			job_ptr->job_state = JOB_PENDING;
			job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
			xfree(job_ptr->state_desc);
			job_ptr->start_time = job_ptr->end_time = now;
			job_ptr->priority = 0;
			debug2("%s: setting %pJ to \"%s\" (%s)",
			       __func__, job_ptr,
			       job_reason_string(job_ptr->state_reason),
			       slurm_strerror(error_code));
		}

		/* Decide whether the whole partition should be skipped */
		if (job_ptr->details && job_ptr->details->req_node_bitmap &&
		    (bit_set_count(job_ptr->details->req_node_bitmap) >=
		     job_ptr->details->min_nodes)) {
			fail_by_part = false;
			/* Do not schedule more jobs on nodes required by this
			 * job, but don't block the entire queue/partition. */
			bit_and_not(avail_node_bitmap,
				    job_ptr->details->req_node_bitmap);
		}
		if (fail_by_part && job_ptr->resv_name) {
			/* do not schedule more jobs in this reservation, but
			 * other jobs in this partition can be scheduled. */
			fail_by_part = false;
			if (failed_resv_cnt < MAX_FAILED_RESV) {
				failed_resv[failed_resv_cnt++] =
					job_ptr->resv_ptr;
			}
		}
		if (fail_by_part && bf_min_age_reserve) {
			/* Consider other jobs in this partition if job has been
			 * waiting for less than bf_min_age_reserve time */
			if (job_ptr->details->begin_time == 0) {
				fail_by_part = false;
			} else {
				pend_time = difftime(
					now, job_ptr->details->begin_time);
				if (pend_time < bf_min_age_reserve)
					fail_by_part = false;
			}
		}

		if (!(prio_reserve = acct_policy_get_prio_thresh(
			      job_ptr, false)))
			prio_reserve = bf_min_prio_reserve;

		if (fail_by_part && prio_reserve &&
		    (job_ptr->priority < prio_reserve))
			fail_by_part = false;

fail_this_part:	if (fail_by_part) {
			/* Search for duplicates */
			for (i = 0; i < failed_part_cnt; i++) {
				if (failed_parts[i] == job_ptr->part_ptr) {
					fail_by_part = false;
					break;
				}
			}
		}
		if (fail_by_part) {
			/*
			 * Do not schedule more jobs in this partition or on
			 * nodes in this partition
			 */
			failed_parts[failed_part_cnt++] = job_ptr->part_ptr;
			bit_and_not(avail_node_bitmap,
				    job_ptr->part_ptr->node_bitmap);
		}
	}

	/* Kick off burst buffer stage-in for any jobs waiting on it */
	if (bb_wait_cnt)
		(void) bb_g_job_try_stage_in();

	/* Restore node availability and release temporary structures */
	if (job_ptr)
		job_resv_clear_promiscous_flag(job_ptr);
	save_last_part_update = last_part_update;
	FREE_NULL_BITMAP(avail_node_bitmap);
	avail_node_bitmap = save_avail_node_bitmap;
	xfree(failed_parts);
	xfree(failed_resv);
	if (fifo_sched) {
		if (job_iterator)
			list_iterator_destroy(job_iterator);
		if (part_iterator)
			list_iterator_destroy(part_iterator);
	} else if (job_queue) {
		FREE_NULL_LIST(job_queue);
	}
	xfree(sched_part_ptr);
	xfree(sched_part_jobs);
	slurm_mutex_lock(&slurmctld_config.thread_count_lock);
	if ((slurmctld_config.server_thread_count >= 150) &&
	    (defer_rpc_cnt == 0)) {
		sched_info("%d pending RPCs at cycle end, consider configuring max_rpc_cnt",
			   slurmctld_config.server_thread_count);
	}
	slurm_mutex_unlock(&slurmctld_config.thread_count_lock);
	unlock_slurmctld(job_write_lock);
	END_TIMER2("schedule");

	_do_diag_stats(DELTA_TIMER);

out:
	/* Restore the thread's original name before returning */
#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, get_name, NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m",
		      __func__, get_name);
	}
#endif
	return job_cnt;
}
1974
1975 /*
1976 * sort_job_queue - sort job_queue in descending priority order
1977 * IN/OUT job_queue - sorted job queue
1978 */
extern void sort_job_queue(List job_queue)
{
	/* Order in place using the shared comparator. */
	list_sort(job_queue, sort_job_queue2);
}
1983
/* Note this differs from the ListCmpF typedef since we want jobs sorted
 * in order of decreasing priority, then submit time, and then by increasing
 * job id */
sort_job_queue2(void * x,void * y)1987 extern int sort_job_queue2(void *x, void *y)
1988 {
1989 job_queue_rec_t *job_rec1 = *(job_queue_rec_t **) x;
1990 job_queue_rec_t *job_rec2 = *(job_queue_rec_t **) y;
1991 het_job_details_t *details = NULL;
1992 bool has_resv1, has_resv2;
1993 static time_t config_update = 0;
1994 static bool preemption_enabled = true;
1995 uint32_t job_id1, job_id2;
1996 uint32_t p1, p2;
1997
1998 /* The following block of code is designed to minimize run time in
1999 * typical configurations for this frequently executed function. */
2000 if (config_update != slurmctld_conf.last_update) {
2001 preemption_enabled = slurm_preemption_enabled();
2002 config_update = slurmctld_conf.last_update;
2003 }
2004 if (preemption_enabled) {
2005 if (preempt_g_job_preempt_check(job_rec1, job_rec2))
2006 return -1;
2007 if (preempt_g_job_preempt_check(job_rec2, job_rec1))
2008 return 1;
2009 }
2010
2011 if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
2012 (job_rec1->job_ptr->het_job_id !=
2013 job_rec2->job_ptr->het_job_id)) {
2014 if ((details = job_rec1->job_ptr->het_details))
2015 has_resv1 = details->any_resv;
2016 else
2017 has_resv1 = (job_rec1->job_ptr->resv_id != 0) ||
2018 job_rec1->resv_ptr;
2019 } else
2020 has_resv1 = (job_rec1->job_ptr->resv_id != 0) ||
2021 job_rec1->resv_ptr;
2022
2023 if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
2024 (job_rec2->job_ptr->het_job_id !=
2025 job_rec1->job_ptr->het_job_id)) {
2026 if ((details = job_rec2->job_ptr->het_details))
2027 has_resv2 = details->any_resv;
2028 else
2029 has_resv2 = (job_rec2->job_ptr->resv_id != 0) ||
2030 job_rec2->resv_ptr;
2031 } else
2032 has_resv2 = (job_rec2->job_ptr->resv_id != 0) ||
2033 job_rec2->resv_ptr;
2034
2035 if (has_resv1 && !has_resv2)
2036 return -1;
2037 if (!has_resv1 && has_resv2)
2038 return 1;
2039
2040 if (job_rec1->part_ptr && job_rec2->part_ptr) {
2041 if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
2042 (job_rec1->job_ptr->het_job_id !=
2043 job_rec2->job_ptr->het_job_id)) {
2044 if ((details = job_rec1->job_ptr->het_details))
2045 p1 = details->priority_tier;
2046 else
2047 p1 = job_rec1->part_ptr->priority_tier;
2048 } else
2049 p1 = job_rec1->part_ptr->priority_tier;
2050
2051 if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
2052 (job_rec2->job_ptr->het_job_id !=
2053 job_rec1->job_ptr->het_job_id)) {
2054 if ((details = job_rec2->job_ptr->het_details))
2055 p2 = details->priority_tier;
2056 else
2057 p2 = job_rec2->part_ptr->priority_tier;
2058 } else
2059 p2 = job_rec2->part_ptr->priority_tier;
2060
2061 if (p1 < p2)
2062 return 1;
2063 if (p1 > p2)
2064 return -1;
2065 }
2066
2067 if (bf_hetjob_prio && job_rec1->job_ptr->het_job_id &&
2068 (job_rec1->job_ptr->het_job_id !=
2069 job_rec2->job_ptr->het_job_id)) {
2070 if ((details = job_rec1->job_ptr->het_details))
2071 p1 = details->priority;
2072 else {
2073 if (job_rec1->job_ptr->part_ptr_list &&
2074 job_rec1->job_ptr->priority_array)
2075 p1 = job_rec1->priority;
2076 else
2077 p1 = job_rec1->job_ptr->priority;
2078 }
2079 } else {
2080 if (job_rec1->job_ptr->part_ptr_list &&
2081 job_rec1->job_ptr->priority_array)
2082 p1 = job_rec1->priority;
2083 else
2084 p1 = job_rec1->job_ptr->priority;
2085 }
2086
2087 if (bf_hetjob_prio && job_rec2->job_ptr->het_job_id &&
2088 (job_rec2->job_ptr->het_job_id !=
2089 job_rec1->job_ptr->het_job_id)) {
2090 if ((details = job_rec2->job_ptr->het_details))
2091 p2 = details->priority;
2092 else {
2093 if (job_rec2->job_ptr->part_ptr_list &&
2094 job_rec2->job_ptr->priority_array)
2095 p2 = job_rec2->priority;
2096 else
2097 p2 = job_rec2->job_ptr->priority;
2098 }
2099 } else {
2100 if (job_rec2->job_ptr->part_ptr_list &&
2101 job_rec2->job_ptr->priority_array)
2102 p2 = job_rec2->priority;
2103 else
2104 p2 = job_rec2->job_ptr->priority;
2105 }
2106
2107 if (p1 < p2)
2108 return 1;
2109 if (p1 > p2)
2110 return -1;
2111
2112 /* If the priorities are the same sort by submission time */
2113 if (job_rec1->job_ptr->details && job_rec2->job_ptr->details) {
2114 if (job_rec1->job_ptr->details->submit_time >
2115 job_rec2->job_ptr->details->submit_time)
2116 return 1;
2117 if (job_rec2->job_ptr->details->submit_time >
2118 job_rec1->job_ptr->details->submit_time)
2119 return -1;
2120 }
2121
2122 /* If the submission times are the same sort by increasing job id's */
2123 if (job_rec1->array_task_id == NO_VAL)
2124 job_id1 = job_rec1->job_id;
2125 else
2126 job_id1 = job_rec1->job_ptr->array_job_id;
2127 if (job_rec2->array_task_id == NO_VAL)
2128 job_id2 = job_rec2->job_id;
2129 else
2130 job_id2 = job_rec2->job_ptr->array_job_id;
2131 if (job_id1 > job_id2)
2132 return 1;
2133 else if (job_id1 < job_id2)
2134 return -1;
2135
2136 /* If job IDs match compare task IDs */
2137 if (job_rec1->array_task_id > job_rec2->array_task_id)
2138 return 1;
2139
2140 return -1;
2141 }
2142
/* The "environment" variable points to one big xmalloc. In order to
 * manipulate the array for a hetjob, we need to split it into an array
 * containing multiple xmalloc variables */
static void _split_env(batch_job_launch_msg_t *launch_msg_ptr)
{
	char **env = launch_msg_ptr->environment;
	int i;

	/* Entry 0 retains ownership of the single allocated region that
	 * holds the whole environment; duplicate every later entry so
	 * each one can be replaced/freed independently. */
	for (i = 1; i < launch_msg_ptr->envc; i++)
		env[i] = xstrdup(env[i]);
}
2155
/* Given a scheduled job, return a pointer to its batch_job_launch_msg_t data */
static batch_job_launch_msg_t *_build_launch_job_msg(job_record_t *job_ptr,
						     uint16_t protocol_version)
{
	char *fail_why = NULL;
	batch_job_launch_msg_t *launch_msg_ptr;

	/* Initialization of data structures */
	launch_msg_ptr = (batch_job_launch_msg_t *)
		xmalloc(sizeof(batch_job_launch_msg_t));
	launch_msg_ptr->job_id = job_ptr->job_id;
	launch_msg_ptr->het_job_id = job_ptr->het_job_id;
	launch_msg_ptr->step_id = NO_VAL;
	launch_msg_ptr->array_job_id = job_ptr->array_job_id;
	launch_msg_ptr->array_task_id = job_ptr->array_task_id;
	launch_msg_ptr->uid = job_ptr->user_id;
	launch_msg_ptr->gid = job_ptr->group_id;

	/* Inability to read the batch script is treated as an
	 * unrecoverable system error for this job (see job_failed). */
	if (!(launch_msg_ptr->script_buf = get_job_script(job_ptr))) {
		fail_why = "Unable to load job batch script";
		goto job_failed;
	}

	launch_msg_ptr->ntasks = job_ptr->details->num_tasks;
	launch_msg_ptr->alias_list = xstrdup(job_ptr->alias_list);
	launch_msg_ptr->nodes = xstrdup(job_ptr->nodes);
	launch_msg_ptr->overcommit = job_ptr->details->overcommit;
	launch_msg_ptr->open_mode = job_ptr->details->open_mode;
	launch_msg_ptr->cpus_per_task = job_ptr->details->cpus_per_task;
	launch_msg_ptr->pn_min_memory = job_ptr->details->pn_min_memory;
	launch_msg_ptr->restart_cnt = job_ptr->restart_cnt;
	launch_msg_ptr->profile = job_ptr->profile;

	if (make_batch_job_cred(launch_msg_ptr, job_ptr, protocol_version)) {
		/* FIXME: This is a kludge, but this event indicates a serious
		 * problem with Munge or OpenSSH and should never happen. We
		 * are too deep into the job launch to gracefully clean up from
		 * from the launch, so requeue if possible. */
		error("Can not create job credential, attempting to requeue batch %pJ",
		      job_ptr);
		slurm_free_job_launch_msg(launch_msg_ptr);
		job_ptr->batch_flag = 1;	/* Allow repeated requeue */
		job_ptr->details->begin_time = time(NULL) + 120;
		job_complete(job_ptr->job_id, slurmctld_conf.slurm_user_id,
			     true, false, 0);
		return NULL;
	}

	launch_msg_ptr->acctg_freq = xstrdup(job_ptr->details->acctg_freq);
	/* Prefer the resolved partition record's name over the
	 * user-supplied partition string when available. */
	if (job_ptr->part_ptr)
		launch_msg_ptr->partition = xstrdup(job_ptr->part_ptr->name);
	else
		launch_msg_ptr->partition = xstrdup(job_ptr->partition);
	launch_msg_ptr->std_err = xstrdup(job_ptr->details->std_err);
	launch_msg_ptr->std_in = xstrdup(job_ptr->details->std_in);
	launch_msg_ptr->std_out = xstrdup(job_ptr->details->std_out);
	launch_msg_ptr->work_dir = xstrdup(job_ptr->details->work_dir);

	launch_msg_ptr->argc = job_ptr->details->argc;
	launch_msg_ptr->argv = xduparray(job_ptr->details->argc,
					 job_ptr->details->argv);
	launch_msg_ptr->spank_job_env_size = job_ptr->spank_job_env_size;
	launch_msg_ptr->spank_job_env = xduparray(job_ptr->spank_job_env_size,
						  job_ptr->spank_job_env);
	launch_msg_ptr->environment = get_job_env(job_ptr,
						  &launch_msg_ptr->envc);
	if (launch_msg_ptr->environment == NULL) {
		fail_why = "Unable to load job environment";
		goto job_failed;
	}

	/* Make each environment entry individually freeable so hetjob
	 * code can rewrite single entries later. */
	_split_env(launch_msg_ptr);
	launch_msg_ptr->job_mem = job_ptr->details->pn_min_memory;
	/* Copy the compressed per-node CPU layout arrays */
	launch_msg_ptr->num_cpu_groups = job_ptr->job_resrcs->cpu_array_cnt;
	launch_msg_ptr->cpus_per_node = xmalloc(
		sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt);
	memcpy(launch_msg_ptr->cpus_per_node,
	       job_ptr->job_resrcs->cpu_array_value,
	       (sizeof(uint16_t) * job_ptr->job_resrcs->cpu_array_cnt));
	launch_msg_ptr->cpu_count_reps = xmalloc(
		sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt);
	memcpy(launch_msg_ptr->cpu_count_reps,
	       job_ptr->job_resrcs->cpu_array_reps,
	       (sizeof(uint32_t) * job_ptr->job_resrcs->cpu_array_cnt));

	launch_msg_ptr->select_jobinfo = select_g_select_jobinfo_copy(
		job_ptr->select_jobinfo);

	if (job_ptr->qos_ptr) {
		if (!xstrcmp(job_ptr->qos_ptr->description,
			     "Normal QOS default"))
			launch_msg_ptr->qos = xstrdup("normal");
		else
			launch_msg_ptr->qos = xstrdup(
				job_ptr->qos_ptr->description);
	}
	launch_msg_ptr->account = xstrdup(job_ptr->account);
	launch_msg_ptr->resv_name = xstrdup(job_ptr->resv_name);

	xassert(!fail_why);
	return launch_msg_ptr;

job_failed:
	/* fatal or kill the job as it can never be recovered */
	if (!ignore_state_errors)
		fatal("%s: %s for %pJ. Check file system serving StateSaveLocation as that directory may be missing or corrupted. Start with '-i' to ignore this error and kill the afflicted jobs.",
		      __func__, fail_why, job_ptr);

	error("%s: %s for %pJ. %pJ will be killed due to system error.",
	      __func__, fail_why, job_ptr, job_ptr);
	xfree(job_ptr->state_desc);
	job_ptr->state_desc = xstrdup(fail_why);
	job_ptr->state_reason_prev = job_ptr->state_reason;
	job_ptr->state_reason = FAIL_SYSTEM;
	slurm_free_job_launch_msg(launch_msg_ptr);
	/* ignore the return as job is in an unknown state anyway */
	job_complete(job_ptr->job_id, slurmctld_conf.slurm_user_id, false,
		     false, 1);
	return NULL;
}
2276
2277 /* Validate the job is ready for launch
2278 * RET pointer to batch job to launch or NULL if not ready yet */
static job_record_t *_het_job_ready(job_record_t *job_ptr)
{
	job_record_t *het_job_leader, *het_job;
	ListIterator iter;

	if (job_ptr->het_job_id == 0)	/* Not a hetjob */
		return job_ptr;

	het_job_leader = find_job_record(job_ptr->het_job_id);
	if (!het_job_leader) {
		error("Hetjob leader %pJ not found", job_ptr);
		return NULL;
	}
	if (!het_job_leader->het_job_list) {
		error("Hetjob leader %pJ lacks het_job_list", job_ptr);
		return NULL;
	}

	/* Every component must be fully ready before the batch script
	 * (which runs under the leader) may be launched. */
	iter = list_iterator_create(het_job_leader->het_job_list);
	while ((het_job = list_next(iter))) {
		uint8_t prolog = 0;
		if (het_job_leader->het_job_id != het_job->het_job_id) {
			error("%s: Bad het_job_list for %pJ",
			      __func__, het_job_leader);
			continue;
		}
		/* Guard the pointer actually dereferenced: the original
		 * code tested job_ptr->details here, which could NULL-
		 * dereference het_job->details for another component. */
		if (het_job->details)
			prolog = het_job->details->prolog_running;
		if (prolog || IS_JOB_CONFIGURING(het_job) ||
		    !test_job_nodes_ready(het_job)) {
			het_job_leader = NULL;
			break;
		}
		/* NOTE(review): these checks reference job_ptr rather than
		 * het_job, so they are loop-invariant; confirm whether each
		 * component was meant to be tested here instead. */
		if ((job_ptr->batch_flag == 0) ||
		    (!IS_JOB_RUNNING(job_ptr) && !IS_JOB_SUSPENDED(job_ptr))) {
			het_job_leader = NULL;
			break;
		}
	}
	list_iterator_destroy(iter);

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_HETJOB) {
		if (het_job_leader) {
			info("Batch hetjob %pJ being launched", het_job_leader);
		} else if (het_job) {
			info("Batch hetjob %pJ waiting for job to be ready",
			     het_job);
		}
	}

	return het_job_leader;
}
2331
2332 /*
2333 * Set some hetjob environment variables. This will include information
2334 * about multiple job components (i.e. different slurmctld job records).
2335 */
static void _set_het_job_env(job_record_t *het_job_leader,
			     batch_job_launch_msg_t *launch_msg_ptr)
{
	job_record_t *het_job;
	int i, het_job_offset = 0;
	ListIterator iter;

	if (het_job_leader->het_job_id == 0)
		return;
	if (!launch_msg_ptr->environment) {
		error("%pJ lacks environment", het_job_leader);
		return;
	}
	if (!het_job_leader->het_job_list) {
		error("Hetjob leader %pJ lacks het_job_list",
		      het_job_leader);
		return;
	}

	/* "environment" needs NULL terminator */
	xrealloc(launch_msg_ptr->environment,
		 sizeof(char *) * (launch_msg_ptr->envc + 1));
	/* One pass per hetjob component; het_job_offset is the component
	 * index appended to each SLURM_* variable name. */
	iter = list_iterator_create(het_job_leader->het_job_list);
	while ((het_job = list_next(iter))) {
		uint16_t cpus_per_task = 1;
		uint32_t num_cpus = 0;
		uint64_t tmp_mem = 0;
		char *tmp_str = NULL;

		if (het_job_leader->het_job_id != het_job->het_job_id) {
			error("%s: Bad het_job_list for %pJ",
			      __func__, het_job_leader);
			continue;
		}
		if (het_job->details &&
		    (het_job->details->cpus_per_task > 0) &&
		    (het_job->details->cpus_per_task != NO_VAL16)) {
			cpus_per_task = het_job->details->cpus_per_task;
		}
		if (het_job->account) {
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_JOB_ACCOUNT",
				het_job_offset, "%s", het_job->account);
		}

		if (het_job->job_resrcs) {
			/* e.g. "4(x2),8" — compressed per-node CPU counts */
			tmp_str = uint32_compressed_to_str(
				het_job->job_resrcs->cpu_array_cnt,
				het_job->job_resrcs->cpu_array_value,
				het_job->job_resrcs->cpu_array_reps);
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_JOB_CPUS_PER_NODE",
				het_job_offset, "%s", tmp_str);
			xfree(tmp_str);
		}
		(void) env_array_overwrite_het_fmt(
			&launch_msg_ptr->environment,
			"SLURM_JOB_ID",
			het_job_offset, "%u", het_job->job_id);
		(void) env_array_overwrite_het_fmt(
			&launch_msg_ptr->environment,
			"SLURM_JOB_NAME",
			het_job_offset, "%s", het_job->name);
		(void) env_array_overwrite_het_fmt(
			&launch_msg_ptr->environment,
			"SLURM_JOB_NODELIST",
			het_job_offset, "%s", het_job->nodes);
		(void) env_array_overwrite_het_fmt(
			&launch_msg_ptr->environment,
			"SLURM_JOB_NUM_NODES",
			het_job_offset, "%u", het_job->node_cnt);
		if (het_job->partition) {
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_JOB_PARTITION",
				het_job_offset, "%s", het_job->partition);
		}
		if (het_job->qos_ptr) {
			slurmdb_qos_rec_t *qos;
			char *qos_name;

			qos = (slurmdb_qos_rec_t *) het_job->qos_ptr;
			/* Hide the internal default-QOS description */
			if (!xstrcmp(qos->description, "Normal QOS default"))
				qos_name = "normal";
			else
				qos_name = qos->description;
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_JOB_QOS",
				het_job_offset, "%s", qos_name);
		}
		if (het_job->resv_name) {
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_JOB_RESERVATION",
				het_job_offset, "%s", het_job->resv_name);
		}
		if (het_job->details)
			tmp_mem = het_job->details->pn_min_memory;
		/* MEM_PER_CPU flag bit selects per-CPU vs per-node limit */
		if (tmp_mem & MEM_PER_CPU) {
			tmp_mem &= (~MEM_PER_CPU);
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_MEM_PER_CPU",
				het_job_offset, "%"PRIu64"", tmp_mem);
		} else if (tmp_mem) {
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_MEM_PER_NODE",
				het_job_offset, "%"PRIu64"", tmp_mem);
		}
		if (het_job->alias_list) {
			(void) env_array_overwrite_het_fmt(
				&launch_msg_ptr->environment,
				"SLURM_NODE_ALIASES", het_job_offset,
				"%s", het_job->alias_list);
		}
		if (het_job->details && het_job->job_resrcs) {
			/* Both should always be set for active jobs */
			struct job_resources *resrcs_ptr = het_job->job_resrcs;
			slurm_step_layout_t *step_layout = NULL;
			slurm_step_layout_req_t step_layout_req;
			uint16_t cpus_per_task_array[1];
			uint32_t cpus_task_reps[1], task_dist;
			memset(&step_layout_req, 0,
			       sizeof(slurm_step_layout_req_t));
			/* Expand the compressed CPU arrays to a total count */
			for (i = 0; i < resrcs_ptr->cpu_array_cnt; i++) {
				num_cpus += resrcs_ptr->cpu_array_value[i] *
					    resrcs_ptr->cpu_array_reps[i];
			}

			if (het_job->details->num_tasks) {
				step_layout_req.num_tasks =
					het_job->details->num_tasks;
			} else {
				step_layout_req.num_tasks = num_cpus /
							    cpus_per_task;
			}
			step_layout_req.num_hosts = het_job->node_cnt;

			if ((step_layout_req.node_list =
			     getenvp(launch_msg_ptr->environment,
				     "SLURM_ARBITRARY_NODELIST"))) {
				task_dist = SLURM_DIST_ARBITRARY;
			} else {
				step_layout_req.node_list = het_job->nodes;
				task_dist = SLURM_DIST_BLOCK;
			}
			step_layout_req.cpus_per_node =
				het_job->job_resrcs->cpu_array_value;
			step_layout_req.cpu_count_reps =
				het_job->job_resrcs->cpu_array_reps;
			cpus_per_task_array[0] = cpus_per_task;
			step_layout_req.cpus_per_task = cpus_per_task_array;
			cpus_task_reps[0] = het_job->node_cnt;
			step_layout_req.cpus_task_reps = cpus_task_reps;
			step_layout_req.task_dist = task_dist;
			step_layout_req.plane_size = NO_VAL16;
			/* Build a temporary layout just to derive the
			 * SLURM_TASKS_PER_NODE string */
			step_layout = slurm_step_layout_create(&step_layout_req);
			if (step_layout) {
				tmp_str = uint16_array_to_str(
					step_layout->node_cnt,
					step_layout->tasks);
				slurm_step_layout_destroy(step_layout);
				(void) env_array_overwrite_het_fmt(
					&launch_msg_ptr->environment,
					"SLURM_TASKS_PER_NODE",
					het_job_offset,"%s", tmp_str);
				xfree(tmp_str);
			}
		}
		het_job_offset++;
	}
	list_iterator_destroy(iter);
	/* Continue support for old hetjob terminology. */
	(void) env_array_overwrite_fmt(&launch_msg_ptr->environment,
				       "SLURM_PACK_SIZE", "%d", het_job_offset);
	(void) env_array_overwrite_fmt(&launch_msg_ptr->environment,
				       "SLURM_HET_SIZE", "%d", het_job_offset);

	/* Recount envc since entries may have been appended above */
	for (i = 0; launch_msg_ptr->environment[i]; i++)
		;
	launch_msg_ptr->envc = i;
}
2522
2523 /*
2524 * launch_job - send an RPC to a slurmd to initiate a batch job
2525 * IN job_ptr - pointer to job that will be initiated
2526 */
extern void launch_job(job_record_t *job_ptr)
{
	batch_job_launch_msg_t *launch_msg_ptr;
	uint16_t protocol_version = NO_VAL16;
	agent_arg_t *agent_arg_ptr;
	job_record_t *launch_job_ptr;
#ifdef HAVE_FRONT_END
	front_end_record_t *front_end_ptr;
#else
	node_record_t *node_ptr;
#endif

	xassert(job_ptr);
	xassert(job_ptr->batch_flag);

	if (job_ptr->total_cpus == 0)
		return;

	/* For a hetjob, all components must be ready; the returned job is
	 * the hetjob leader (or job_ptr itself for a non-hetjob). */
	launch_job_ptr = _het_job_ready(job_ptr);
	if (!launch_job_ptr)
		return;

	if (pick_batch_host(launch_job_ptr) != SLURM_SUCCESS)
		return;

	/* Pack the RPC for the protocol version of the node (or front
	 * end) that will run the batch script. */
#ifdef HAVE_FRONT_END
	front_end_ptr = find_front_end_record(job_ptr->batch_host);
	if (front_end_ptr)
		protocol_version = front_end_ptr->protocol_version;
#else
	node_ptr = find_node_record(job_ptr->batch_host);
	if (node_ptr)
		protocol_version = node_ptr->protocol_version;
#endif

	(void)build_batch_step(job_ptr);

	launch_msg_ptr = _build_launch_job_msg(launch_job_ptr,protocol_version);
	if (launch_msg_ptr == NULL)
		return;
	if (launch_job_ptr->het_job_id)
		_set_het_job_env(launch_job_ptr, launch_msg_ptr);

	agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
	agent_arg_ptr->protocol_version = protocol_version;
	agent_arg_ptr->node_count = 1;
	agent_arg_ptr->retry = 0;
	/* NOTE(review): asserts job_ptr->batch_host but builds the
	 * hostlist from launch_job_ptr->batch_host — confirm both refer
	 * to the same host after pick_batch_host() for hetjobs. */
	xassert(job_ptr->batch_host);
	agent_arg_ptr->hostlist = hostlist_create(launch_job_ptr->batch_host);
	agent_arg_ptr->msg_type = REQUEST_BATCH_JOB_LAUNCH;
	agent_arg_ptr->msg_args = (void *) launch_msg_ptr;

	/* Launch the RPC via agent */
	agent_queue_request(agent_arg_ptr);
}
2582
2583 /*
2584 * make_batch_job_cred - add a job credential to the batch_job_launch_msg
2585 * IN/OUT launch_msg_ptr - batch_job_launch_msg in which job_id, step_id,
2586 * uid and nodes have already been set
2587 * IN job_ptr - pointer to job record
2588 * RET 0 or error code
2589 */
extern int make_batch_job_cred(batch_job_launch_msg_t *launch_msg_ptr,
			       job_record_t *job_ptr,
			       uint16_t protocol_version)
{
	slurm_cred_arg_t cred_arg;
	job_resources_t *resrcs;

	xassert(job_ptr->job_resrcs);
	if (job_ptr->job_resrcs == NULL) {
		error("%s: %pJ is missing job_resrcs info",
		      __func__, job_ptr);
		return SLURM_ERROR;
	}
	resrcs = job_ptr->job_resrcs;

	memset(&cred_arg, 0, sizeof(slurm_cred_arg_t));

	/* Identity and job-wide resource description */
	cred_arg.jobid = launch_msg_ptr->job_id;
	cred_arg.stepid = launch_msg_ptr->step_id;
	cred_arg.uid = launch_msg_ptr->uid;
	cred_arg.gid = launch_msg_ptr->gid;
	cred_arg.pw_name = launch_msg_ptr->user_name;
	cred_arg.x11 = job_ptr->details->x11;
	cred_arg.job_constraints = job_ptr->details->features;
	cred_arg.job_hostlist = resrcs->nodes;
	cred_arg.job_core_bitmap = resrcs->core_bitmap;
	cred_arg.job_core_spec = job_ptr->details->core_spec;
	cred_arg.job_mem_limit = job_ptr->details->pn_min_memory;
	cred_arg.job_nhosts = resrcs->nhosts;
	cred_arg.job_gres_list = job_ptr->gres_list;
	/* cred_arg.step_gres_list = NULL; */

	/* The batch "step" covers the same cores and memory as the job */
#ifdef HAVE_FRONT_END
	xassert(job_ptr->batch_host);
	cred_arg.step_hostlist = job_ptr->batch_host;
#else
	cred_arg.step_hostlist = launch_msg_ptr->nodes;
#endif
	cred_arg.step_core_bitmap = resrcs->core_bitmap;
	cred_arg.step_mem_limit = job_ptr->details->pn_min_memory;

	cred_arg.cores_per_socket = resrcs->cores_per_socket;
	cred_arg.sockets_per_node = resrcs->sockets_per_node;
	cred_arg.sock_core_rep_count = resrcs->sock_core_rep_count;

	launch_msg_ptr->cred = slurm_cred_create(slurmctld_config.cred_ctx,
						 &cred_arg, protocol_version);

	if (launch_msg_ptr->cred)
		return SLURM_SUCCESS;
	error("slurm_cred_create failure for batch job %u", cred_arg.jobid);
	return SLURM_ERROR;
}
2644
/*
 * Copy a job's dependency list
 * IN depend_list_src - a job's depend_list
 * RET copy of depend_list_src, must be freed by caller
 */
extern List depended_list_copy(List depend_list_src)
{
	depend_spec_t *src, *dest;
	ListIterator iter;
	List new_list;

	if (!depend_list_src)
		return NULL;

	/* Shallow-copy every record into a freshly allocated list */
	new_list = list_create(xfree_ptr);
	iter = list_iterator_create(depend_list_src);
	while ((src = list_next(iter))) {
		dest = xmalloc(sizeof(depend_spec_t));
		*dest = *src;	/* struct assignment copies all fields */
		list_append(new_list, dest);
	}
	list_iterator_destroy(iter);
	return new_list;
}
2669
/* Map a dependency type to the keyword used in "Dependency=" strings */
static char *_depend_type2str(depend_spec_t *dep_ptr)
{
	xassert(dep_ptr);

	switch (dep_ptr->depend_type) {
	case SLURM_DEPEND_AFTER_ANY:
		return "afterany";
	case SLURM_DEPEND_AFTER:
		return "after";
	case SLURM_DEPEND_AFTER_OK:
		return "afterok";
	case SLURM_DEPEND_AFTER_NOT_OK:
		return "afternotok";
	case SLURM_DEPEND_AFTER_CORRESPOND:
		return "aftercorr";
	case SLURM_DEPEND_BURST_BUFFER:
		return "afterburstbuffer";
	case SLURM_DEPEND_EXPAND:
		return "expand";
	case SLURM_DEPEND_SINGLETON:
		return "singleton";
	}
	return "unknown";
}
2695
_depend_state_str2state(char * state_str)2696 static uint32_t _depend_state_str2state(char *state_str)
2697 {
2698 if (!xstrcasecmp(state_str, "fulfilled"))
2699 return DEPEND_FULFILLED;
2700 if (!xstrcasecmp(state_str, "failed"))
2701 return DEPEND_FAILED;
2702 /* Default to not fulfilled */
2703 return DEPEND_NOT_FULFILLED;
2704 }
2705
/* Render a dependency state as the word shown in "Dependency=" strings */
static char *_depend_state2str(depend_spec_t *dep_ptr)
{
	xassert(dep_ptr);

	if (dep_ptr->depend_state == DEPEND_NOT_FULFILLED)
		return "unfulfilled";
	if (dep_ptr->depend_state == DEPEND_FULFILLED)
		return "fulfilled";
	if (dep_ptr->depend_state == DEPEND_FAILED)
		return "failed";
	return "unknown";
}
2721
/*
 * Rebuild job_ptr->details->dependency from the depend_list.
 * Fulfilled dependencies are omitted; OR'd dependencies are joined with
 * "?" and AND'd ones with ",". If set_or_flag is true, every emitted
 * dependency is also marked with SLURM_FLAGS_OR.
 */
static void _depend_list2str(job_record_t *job_ptr, bool set_or_flag)
{
	ListIterator depend_iter;
	depend_spec_t *dep_ptr;
	char *dep_str, *sep = "";

	if (job_ptr->details == NULL)
		return;

	xfree(job_ptr->details->dependency);

	if (job_ptr->details->depend_list == NULL
	    || list_count(job_ptr->details->depend_list) == 0)
		return;

	depend_iter = list_iterator_create(job_ptr->details->depend_list);
	while ((dep_ptr = list_next(depend_iter))) {
		/*
		 * Show non-fulfilled (including failed) dependencies, but don't
		 * show fulfilled dependencies.
		 */
		if (dep_ptr->depend_state == DEPEND_FULFILLED)
			continue;
		if (dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) {
			/* Singleton has no target job id */
			xstrfmtcat(job_ptr->details->dependency,
				   "%ssingleton(%s)",
				   sep, _depend_state2str(dep_ptr));
		} else {
			dep_str = _depend_type2str(dep_ptr);

			/* INFINITE task id means "whole array" (_*);
			 * NO_VAL means a plain (non-array) job id */
			if (dep_ptr->array_task_id == INFINITE)
				xstrfmtcat(job_ptr->details->dependency, "%s%s:%u_*",
					   sep, dep_str, dep_ptr->job_id);
			else if (dep_ptr->array_task_id == NO_VAL)
				xstrfmtcat(job_ptr->details->dependency, "%s%s:%u",
					   sep, dep_str, dep_ptr->job_id);
			else
				xstrfmtcat(job_ptr->details->dependency, "%s%s:%u_%u",
					   sep, dep_str, dep_ptr->job_id,
					   dep_ptr->array_task_id);

			/* Optional "+minutes" time qualifier */
			if (dep_ptr->depend_time)
				xstrfmtcat(job_ptr->details->dependency,
					   "+%u", dep_ptr->depend_time / 60);
			xstrfmtcat(job_ptr->details->dependency, "(%s)",
				   _depend_state2str(dep_ptr));
		}
		if (set_or_flag)
			dep_ptr->depend_flags |= SLURM_FLAGS_OR;
		/* Separator for the NEXT entry reflects this entry's flag */
		if (dep_ptr->depend_flags & SLURM_FLAGS_OR)
			sep = "?";
		else
			sep = ",";
	}
	list_iterator_destroy(depend_iter);
}
2778
2779 /* Print a job's dependency information based upon job_ptr->depend_list */
extern void print_job_dependency(job_record_t *job_ptr, const char *func)
{
	/* Refresh the dependency string, then log it on behalf of func */
	if (!job_ptr->details || !job_ptr->details->depend_list) {
		info("%s: %pJ has no dependency.", func, job_ptr);
		return;
	}
	_depend_list2str(job_ptr, false);
	info("%s: Dependency information for %pJ:\n %s",
	     func, job_ptr, job_ptr->details->dependency);
}
2791
/*
 * Evaluate one dependency (dep_ptr) of job_ptr against the state of the
 * depended-upon job, given its summarized state flags:
 *   is_complete  - depended-upon job finished with exit code 0
 *   is_completed - depended-upon job finished (any exit code)
 *   is_pending   - depended-upon job has not yet started
 * OUT clear_dep - dependency is satisfied
 * OUT depends   - dependency is not yet resolved; keep waiting
 * OUT failure   - dependency can never be satisfied
 * RET 1 if the dependency type was recognized and handled, else 0
 */
static int _test_job_dependency_common(
	bool is_complete, bool is_completed, bool is_pending,
	bool *clear_dep, bool *depends, bool *failure,
	job_record_t *job_ptr, struct depend_spec *dep_ptr)
{
	int rc = 0;
	job_record_t *djob_ptr = dep_ptr->job_ptr;
	time_t now = time(NULL);

	xassert(clear_dep);
	xassert(depends);
	xassert(failure);

	if (dep_ptr->depend_type == SLURM_DEPEND_AFTER) {
		/* Satisfied once the other job has started (optionally for
		 * at least depend_time seconds), locally or on a sibling. */
		if (!is_pending) {
			if (!dep_ptr->depend_time ||
			    (djob_ptr->start_time &&
			     ((now - djob_ptr->start_time) >=
			      dep_ptr->depend_time)) ||
			    fed_mgr_job_started_on_sib(djob_ptr)) {
				*clear_dep = true;
			} else
				*depends = true;
		} else
			*depends = true;
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_ANY) {
		/* Satisfied once the other job terminates, however it ends */
		if (is_completed)
			*clear_dep = true;
		else
			*depends = true;
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_NOT_OK) {
		/* Satisfied only if the other job fails */
		if (djob_ptr->job_state & JOB_SPECIAL_EXIT)
			*clear_dep = true;
		else if (!is_completed)
			*depends = true;
		else if (!is_complete)
			*clear_dep = true;
		else
			*failure = true;
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_OK) {
		/* Satisfied only if the other job succeeds */
		if (!is_completed)
			*depends = true;
		else if (is_complete)
			*clear_dep = true;
		else
			*failure = true;
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_AFTER_CORRESPOND) {
		/* Array job: each task depends on the matching task of the
		 * other array; fall back to whole-job state when this record
		 * is not an individual task. */
		job_record_t *dcjob_ptr = NULL;
		if ((job_ptr->array_task_id == NO_VAL) ||
		    (job_ptr->array_task_id == INFINITE))
			dcjob_ptr = NULL;
		else
			dcjob_ptr = find_job_array_rec(dep_ptr->job_id,
						       job_ptr->array_task_id);

		if (dcjob_ptr) {
			if (!IS_JOB_COMPLETED(dcjob_ptr))
				*depends = true;
			else if (IS_JOB_COMPLETE(dcjob_ptr))
				*clear_dep = true;
			else
				*failure = true;
		} else {
			if (!is_completed)
				*depends = true;
			else if (is_complete)
				*clear_dep = true;
			else if (job_ptr->array_recs &&
				 (job_ptr->array_task_id == NO_VAL))
				*depends = true;
			else
				*failure = true;
		}
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_BURST_BUFFER) {
		/* Satisfied once the other job's stage-out has finished */
		if (is_completed &&
		    (bb_g_job_test_stage_out(djob_ptr) == 1))
			*clear_dep = true;
		else
			*depends = true;
		rc = 1;
	} else if (dep_ptr->depend_type == SLURM_DEPEND_EXPAND) {
		/* This job will absorb the other job's resources; inherit
		 * its remaining time limit and sharing settings.
		 * (A previous local "now" declaration shadowed the
		 * function-scope variable; it has been removed.) */
		if (is_pending) {
			*depends = true;
		} else if (is_completed)
			*failure = true;
		else if ((djob_ptr->end_time != 0) &&
			 (djob_ptr->end_time > now)) {
			job_ptr->time_limit = djob_ptr->end_time - now;
			job_ptr->time_limit /= 60;	/* sec to min */
			*clear_dep = true;
		}
		if (!*failure && job_ptr->details && djob_ptr->details) {
			job_ptr->details->share_res =
				djob_ptr->details->share_res;
			job_ptr->details->whole_node =
				djob_ptr->details->whole_node;
		}
		rc = 1;
	}

	return rc;
}
2900
/*
 * Fold one dependency's state into the aggregate flags used to decide
 * whether the whole dependency list is satisfied.
 * OR'd list: one fulfilled entry satisfies everything (or_satisfied);
 * AND'd list: one failed entry sinks everything (and_failed).
 * has_unfulfilled is raised for any entry still pending; or_flag reports
 * whether this entry carries the OR flag.
 */
static void _test_dependency_state(depend_spec_t *dep_ptr, bool *or_satisfied,
				   bool *and_failed, bool *or_flag,
				   bool *has_unfulfilled)
{
	uint32_t state;

	xassert(or_satisfied);
	xassert(and_failed);
	xassert(or_flag);
	xassert(has_unfulfilled);

	state = dep_ptr->depend_state;
	*or_flag = ((dep_ptr->depend_flags & SLURM_FLAGS_OR) != 0);

	if (*or_flag) {
		/* OR'd dependency: a single fulfilled entry is enough */
		if (state == DEPEND_FULFILLED)
			*or_satisfied = true;
		else if (state == DEPEND_NOT_FULFILLED)
			*has_unfulfilled = true;
		return;
	}

	/* AND'd dependency: a single failure is fatal */
	if (state == DEPEND_FAILED)
		*and_failed = true;
	else if (state == DEPEND_NOT_FULFILLED)
		*has_unfulfilled = true;
}
2924
2925 /*
2926 * Determine if a job's dependencies are met
2927 * Inputs: job_ptr
2928 * Outputs: was_changed (optional) -
2929 * If it exists, set it to true if at least 1 dependency changed
2930 * state, otherwise false.
2931 * RET: NO_DEPEND = no dependencies
2932 * LOCAL_DEPEND = local dependencies remain
2933 * FAIL_DEPEND = failure (job completion code not per dependency),
2934 * delete the job
2935 * REMOTE_DEPEND = only remote dependencies remain
2936 */
extern int test_job_dependency(job_record_t *job_ptr, bool *was_changed)
{
	ListIterator depend_iter;
	depend_spec_t *dep_ptr;
	bool has_local_depend = false;	/* any unfulfilled local dep seen */
	int results = NO_DEPEND;
	job_record_t *djob_ptr;		/* the job we depend upon */
	bool is_complete, is_completed, is_pending;
	bool or_satisfied = false, and_failed = false, or_flag = false,
	     has_unfulfilled = false, changed = false;

	/* No dependency list (or an empty one) means nothing to wait on */
	if ((job_ptr->details == NULL) ||
	    (job_ptr->details->depend_list == NULL) ||
	    (list_count(job_ptr->details->depend_list) == 0)) {
		job_ptr->bit_flags &= ~JOB_DEPENDENT;
		if (was_changed)
			*was_changed = changed;
		return NO_DEPEND;
	}

	depend_iter = list_iterator_create(job_ptr->details->depend_list);
	while ((dep_ptr = list_next(depend_iter))) {
		bool clear_dep = false, depends = false, failure = false;
		bool remote;

		remote = (dep_ptr->depend_flags & SLURM_FLAGS_REMOTE) ?
			true : false;
		/*
		 * If the job id is for a cluster that's not in the federation
		 * (it's likely the cluster left the federation), then set
		 * this dependency's state to failed.
		 */
		if (remote) {
			if (fed_mgr_is_origin_job(job_ptr) &&
			    (dep_ptr->depend_state == DEPEND_NOT_FULFILLED) &&
			    (dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) &&
			    (!fed_mgr_is_job_id_in_fed(dep_ptr->job_id))) {
				if (slurmctld_conf.debug_flags &
				    DEBUG_FLAG_DEPENDENCY)
					info("%s: %pJ dependency %s:%u failed due to job_id not in federation.",
					     __func__, job_ptr,
					     _depend_type2str(dep_ptr),
					     dep_ptr->job_id);
				changed = true;
				dep_ptr->depend_state = DEPEND_FAILED;
			}
		}
		/*
		 * Already resolved (fulfilled/failed) or remote dependencies
		 * are only folded into the aggregate flags; remote ones are
		 * resolved by the sibling cluster, not tested here.
		 */
		if ((dep_ptr->depend_state != DEPEND_NOT_FULFILLED) || remote) {
			_test_dependency_state(dep_ptr, &or_satisfied,
					       &and_failed, &or_flag,
					       &has_unfulfilled);
			continue;
		}

		/* Test local, unfulfilled dependency: */
		has_local_depend = true;
		/* Re-resolve the pointer; the job record may have changed */
		dep_ptr->job_ptr = find_job_array_rec(dep_ptr->job_id,
						      dep_ptr->array_task_id);
		djob_ptr = dep_ptr->job_ptr;
		if ((dep_ptr->depend_type == SLURM_DEPEND_SINGLETON) &&
		    job_ptr->name) {
			/*
			 * Singleton: still dependent while another job with
			 * the same name/user exists here or on a sibling.
			 */
			if (list_find_first(job_list, _find_singleton_job,
					    job_ptr) ||
			    !fed_mgr_is_singleton_satisfied(job_ptr,
							    dep_ptr, true))
				depends = true;
			else
				clear_dep = true;
		} else if ((djob_ptr == NULL) ||
			   (djob_ptr->magic != JOB_MAGIC) ||
			   ((djob_ptr->job_id != dep_ptr->job_id) &&
			    (djob_ptr->array_job_id != dep_ptr->job_id))) {
			/* job is gone, dependency lifted */
			clear_dep = true;
		} else {
			/* Special case, apply test to job array as a whole */
			if (dep_ptr->array_task_id == INFINITE) {
				is_complete = test_job_array_complete(
					dep_ptr->job_id);
				is_completed = test_job_array_completed(
					dep_ptr->job_id);
				is_pending = test_job_array_pending(
					dep_ptr->job_id);
			} else {
				/* Normal job */
				is_complete = IS_JOB_COMPLETE(djob_ptr);
				is_completed = IS_JOB_COMPLETED(djob_ptr);
				is_pending = IS_JOB_PENDING(djob_ptr);
			}

			/* Returns 0 for an unrecognized dependency type */
			if (!_test_job_dependency_common(
				    is_complete, is_completed, is_pending,
				    &clear_dep, &depends, &failure,
				    job_ptr, dep_ptr))
				failure = true;
		}

		if (failure) {
			dep_ptr->depend_state = DEPEND_FAILED;
			changed = true;
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: %pJ dependency %s:%u failed.",
				     __func__, job_ptr,
				     _depend_type2str(dep_ptr),
				     dep_ptr->job_id);
		} else if (clear_dep) {
			dep_ptr->depend_state = DEPEND_FULFILLED;
			changed = true;
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: %pJ dependency %s:%u fulfilled.",
				     __func__, job_ptr,
				     _depend_type2str(dep_ptr),
				     dep_ptr->job_id);
		}

		_test_dependency_state(dep_ptr, &or_satisfied, &and_failed,
				       &or_flag, &has_unfulfilled);
	}
	list_iterator_destroy(depend_iter);

	/* A satisfied OR dependency overrides a prior invalid-dep reason */
	if (or_satisfied && (job_ptr->state_reason == WAIT_DEP_INVALID)) {
		job_ptr->state_reason = WAIT_NO_REASON;
		xfree(job_ptr->state_desc);
	}

	if (or_satisfied || (!or_flag && !and_failed && !has_unfulfilled)) {
		/* Dependency fulfilled */
		fed_mgr_remove_remote_dependencies(job_ptr);
		job_ptr->bit_flags &= ~JOB_DEPENDENT;
		/*
		 * Don't flush the list if this job isn't on the origin - that
		 * means that we were called from
		 * fed_mgr_test_remote_dependencies() and need to send back the
		 * dependency list to the origin.
		 */
		if (fed_mgr_is_origin_job(job_ptr))
			list_flush(job_ptr->details->depend_list);
		_depend_list2str(job_ptr, false);
		results = NO_DEPEND;
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
			info("%s: %pJ dependency fulfilled", __func__, job_ptr);
	} else {
		if (changed) {
			_depend_list2str(job_ptr, false);
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				print_job_dependency(job_ptr, __func__);
		}
		job_ptr->bit_flags |= JOB_DEPENDENT;
		/* Dependent jobs don't accrue age priority */
		acct_policy_remove_accrue_time(job_ptr, false);
		if (and_failed || (or_flag && !has_unfulfilled))
			/* Dependency failed */
			results = FAIL_DEPEND;
		else
			/* Still dependent */
			results = has_local_depend ? LOCAL_DEPEND :
				REMOTE_DEPEND;
	}

	if (was_changed)
		*was_changed = changed;
	return results;
}
3099
3100 /* Given a new job dependency specification, expand job array specifications
3101 * into a collection of task IDs that update_job_dependency can parse.
3102 * (e.g. "after:123_[4-5]" to "after:123_4:123_5")
3103 * Returns NULL if not valid job array specification.
3104 * Returned value must be xfreed. */
static char *_xlate_array_dep(char *new_depend)
{
	char *new_array_dep = NULL, *array_tmp, *jobid_ptr = NULL, *sep;
	bitstr_t *array_bitmap;
	int i;
	uint32_t job_id;
	int32_t t, t_first, t_last;

	if (strstr(new_depend, "_[") == NULL)
		return NULL;	/* No job array expressions */

	if (max_array_size == NO_VAL) {
		max_array_size = slurmctld_conf.max_array_sz;
	}

	/*
	 * Copy the input one character at a time; whenever "<jobid>_[" is
	 * seen, replace the bracketed task range with an expanded
	 * ":<jobid>_<task>" list. jobid_ptr tracks the start of the digit
	 * run that may precede "_[".
	 */
	for (i = 0; new_depend[i]; i++) {
		xstrfmtcat(new_array_dep, "%c", new_depend[i]);
		if ((new_depend[i] >= '0') && (new_depend[i] <= '9')) {
			if (jobid_ptr == NULL)
				jobid_ptr = new_depend + i;
		} else if ((new_depend[i] == '_') && (new_depend[i+1] == '[') &&
			   (jobid_ptr != NULL)) {
			job_id = (uint32_t) atol(jobid_ptr);
			i += 2;	/* Skip over "_[" */
			/* Isolate the range text up to the closing ']' */
			array_tmp = xstrdup(new_depend + i);
			sep = strchr(array_tmp, ']');
			if (sep)
				sep[0] = '\0';
			array_bitmap = bit_alloc(max_array_size);
			if ((sep == NULL) ||
			    (bit_unfmt(array_bitmap, array_tmp) != 0) ||
			    ((t_first = bit_ffs(array_bitmap)) == -1)) {
				/* Invalid format */
				xfree(array_tmp);
				bit_free(array_bitmap);
				xfree(new_array_dep);
				return NULL;
			}
			i += (sep - array_tmp);	/* Move to location of ']' */
			xfree(array_tmp);
			t_last = bit_fls(array_bitmap);
			/*
			 * First task completes the already-copied
			 * "<jobid>_" prefix; later tasks each need a full
			 * ":<jobid>_<task>" element.
			 */
			for (t = t_first; t <= t_last; t++) {
				if (!bit_test(array_bitmap, t))
					continue;
				if (t == t_first) {
					xstrfmtcat(new_array_dep, "%d", t);
				} else {
					xstrfmtcat(new_array_dep, ":%u_%d",
						   job_id, t);
				}
			}
			bit_free(array_bitmap);
			jobid_ptr = NULL;
		} else {
			jobid_ptr = NULL;
		}
	}

	return new_array_dep;
}
3165
3166 /* Copy dependent job's TRES options into another job's options */
/*
 * Copy dependent job's TRES option strings into another job's options,
 * releasing any strings the target job previously held.
 */
static void _copy_tres_opts(job_record_t *job_ptr, job_record_t *dep_job_ptr)
{
	/* destination slot in job_ptr paired with its source string */
	struct {
		char **dst;
		char *src;
	} tres_opts[] = {
		{ &job_ptr->cpus_per_tres,   dep_job_ptr->cpus_per_tres },
		{ &job_ptr->tres_per_job,    dep_job_ptr->tres_per_job },
		{ &job_ptr->tres_per_node,   dep_job_ptr->tres_per_node },
		{ &job_ptr->tres_per_socket, dep_job_ptr->tres_per_socket },
		{ &job_ptr->tres_per_task,   dep_job_ptr->tres_per_task },
		{ &job_ptr->mem_per_tres,    dep_job_ptr->mem_per_tres },
	};
	int i;

	for (i = 0; i < (sizeof(tres_opts) / sizeof(tres_opts[0])); i++) {
		xfree(*tres_opts[i].dst);
		*tres_opts[i].dst = xstrdup(tres_opts[i].src);
	}
}
3182
_find_dependency(void * arg,void * key)3183 static int _find_dependency(void *arg, void *key)
3184 {
3185 /* Does arg (dependency in the list) match key (new dependency)? */
3186 depend_spec_t *dep_ptr = (depend_spec_t *)arg;
3187 depend_spec_t *new_dep = (depend_spec_t *)key;
3188 return (dep_ptr->job_id == new_dep->job_id) &&
3189 (dep_ptr->array_task_id == new_dep->array_task_id) &&
3190 (dep_ptr->depend_type == new_dep->depend_type);
3191 }
3192
/*
 * Look up an existing dependency of job_ptr matching dep_ptr (same job_id,
 * array_task_id and depend_type). Returns NULL when the job has no
 * dependency list or no entry matches.
 */
extern depend_spec_t *find_dependency(job_record_t *job_ptr,
				      depend_spec_t *dep_ptr)
{
	if (job_ptr->details && job_ptr->details->depend_list)
		return list_find_first(job_ptr->details->depend_list,
				       _find_dependency, dep_ptr);
	return NULL;
}
3201
3202 /*
3203 * Add a new dependency to the list, ensuring that the list is unique.
3204 * Dependencies are uniquely identified by a combination of job_id and
3205 * depend_type.
3206 */
static void _add_dependency_to_list(List depend_list,
				    depend_spec_t *dep_ptr)
{
	/* Only append when no equivalent entry is already present */
	depend_spec_t *existing = list_find_first(depend_list,
						  _find_dependency, dep_ptr);

	if (existing == NULL)
		list_append(depend_list, dep_ptr);
}
3213
/*
 * Parse an optional "(state)" suffix on a dependency specification.
 * On success, sets *depend_state and advances *str_ptr past the ")".
 * Only "failed" is honored as an explicit state; any other name (including
 * "fulfilled") yields DEPEND_NOT_FULFILLED. With no "(" present, the state
 * defaults to DEPEND_NOT_FULFILLED and *str_ptr is left untouched.
 * Returns SLURM_ERROR for an unterminated "(", else SLURM_SUCCESS.
 * NOTE: writes a '\0' over the ')' in the caller's buffer.
 */
static int _parse_depend_state(char **str_ptr, uint32_t *depend_state)
{
	char *open_paren, *close_paren;

	open_paren = strchr(*str_ptr, '(');
	if (!open_paren) {
		*depend_state = DEPEND_NOT_FULFILLED;
		return SLURM_SUCCESS;
	}

	close_paren = strchr(*str_ptr, ')');
	if (!close_paren)
		return SLURM_ERROR;
	*close_paren = '\0';	/* terminate the state name */

	*depend_state = _depend_state_str2state(open_paren + 1);
	/* Don't allow depend_fulfilled as a string. */
	if (*depend_state != DEPEND_FAILED)
		*depend_state = DEPEND_NOT_FULFILLED;
	*str_ptr = close_paren + 1;	/* skip over ")" */

	return SLURM_SUCCESS;
}
3236
_find_dependent_job_ptr(uint32_t job_id,uint32_t * array_task_id)3237 static job_record_t *_find_dependent_job_ptr(uint32_t job_id,
3238 uint32_t *array_task_id)
3239 {
3240 job_record_t *dep_job_ptr;
3241
3242 if (*array_task_id == NO_VAL) {
3243 dep_job_ptr = find_job_record(job_id);
3244 if (!dep_job_ptr)
3245 dep_job_ptr = find_job_array_rec(job_id, INFINITE);
3246 if (dep_job_ptr &&
3247 (dep_job_ptr->array_job_id == job_id) &&
3248 ((dep_job_ptr->array_task_id != NO_VAL) ||
3249 (dep_job_ptr->array_recs != NULL)))
3250 *array_task_id = INFINITE;
3251 } else
3252 dep_job_ptr = find_job_array_rec(job_id, *array_task_id);
3253
3254 return dep_job_ptr;
3255 }
3256
3257 /*
3258 * The new dependency format is:
3259 *
3260 * <type:job_id[:job_id][,type:job_id[:job_id]]> or
3261 * <type:job_id[:job_id][?type:job_id[:job_id]]>
3262 *
3263 * This function parses the all job id's within a single dependency type.
3264 * One char past the end of valid job id's is returned in (*sep_ptr2).
3265 * Set (*rc) to ESLURM_DEPENDENCY for invalid job id's.
3266 */
static void _parse_dependency_jobid_new(job_record_t *job_ptr,
					List new_depend_list, char *sep_ptr,
					char **sep_ptr2, char *tok,
					uint16_t depend_type, int select_hetero,
					int *rc)
{
	depend_spec_t *dep_ptr;
	job_record_t *dep_job_ptr = NULL;
	int expand_cnt = 0;	/* at most one job may be expanded */
	uint32_t job_id, array_task_id, depend_state;
	char *tmp = NULL;
	int depend_time = 0;	/* "+minutes" suffix, stored in seconds */

	/* Each iteration consumes one "jobid[_task][+time][(state)]" item */
	while (!(*rc)) {
		job_id = strtol(sep_ptr, &tmp, 10);
		if ((tmp != NULL) && (tmp[0] == '_')) {
			if (tmp[1] == '*') {
				array_task_id = INFINITE;
				tmp += 2;	/* Past "_*" */
			} else {
				array_task_id = strtol(tmp+1,
						       &tmp, 10);
			}
		} else
			array_task_id = NO_VAL;
		/*
		 * Reject self-dependencies, job id 0, and any terminator
		 * other than the characters a valid spec may continue with.
		 */
		if ((tmp == NULL) ||
		    (job_id == 0) || (job_id == job_ptr->job_id) ||
		    ((tmp[0] != '\0') && (tmp[0] != ',') &&
		     (tmp[0] != '?') && (tmp[0] != ':') &&
		     (tmp[0] != '+') && (tmp[0] != '('))) {
			*rc = ESLURM_DEPENDENCY;
			break;
		}
		dep_job_ptr = _find_dependent_job_ptr(job_id, &array_task_id);
		if ((depend_type == SLURM_DEPEND_EXPAND) &&
		    ((expand_cnt++ > 0) || (dep_job_ptr == NULL) ||
		     (!IS_JOB_RUNNING(dep_job_ptr)) ||
		     (dep_job_ptr->qos_id != job_ptr->qos_id) ||
		     (dep_job_ptr->part_ptr == NULL) ||
		     (job_ptr->part_ptr == NULL) ||
		     (dep_job_ptr->part_ptr != job_ptr->part_ptr))) {
			/*
			 * Expand only a single running job in the same QOS
			 * and partition
			 */
			*rc = ESLURM_DEPENDENCY;
			break;
		}

		/* Optional "+minutes" delay (only meaningful for "after") */
		if (tmp[0] == '+') {
			sep_ptr = &tmp[1];	/* skip over "+" */
			depend_time = strtol(sep_ptr, &tmp, 10);

			if (depend_time <= 0) {
				*rc = ESLURM_DEPENDENCY;
				break;
			}
			depend_time *= 60;	/* minutes to seconds */
		}

		/* Optional "(state)" suffix */
		if (_parse_depend_state(&tmp, &depend_state)) {
			*rc = ESLURM_DEPENDENCY;
			break;
		}

		if (depend_type == SLURM_DEPEND_EXPAND) {
			/*
			 * Job expansion: inherit the expanded job's GRES
			 * options (unless the select plugin supports
			 * heterogeneous GRES), revalidate this job's GRES,
			 * and rebuild its requested-TRES string.
			 */
			assoc_mgr_lock_t locks = { .tres = READ_LOCK };
			uint16_t sockets_per_node = NO_VAL16;
			multi_core_data_t *mc_ptr;

			if ((mc_ptr = job_ptr->details->mc_ptr)) {
				sockets_per_node =
					mc_ptr->sockets_per_node;
			}
			job_ptr->details->expanding_jobid = job_id;
			if (select_hetero == 0) {
				/*
				 * GRES per node of this job must match
				 * the job being expanded. Other options
				 * are ignored.
				 */
				_copy_tres_opts(job_ptr, dep_job_ptr);
			}
			FREE_NULL_LIST(job_ptr->gres_list);
			(void) gres_plugin_job_state_validate(
				job_ptr->cpus_per_tres,
				job_ptr->tres_freq,
				job_ptr->tres_per_job,
				job_ptr->tres_per_node,
				job_ptr->tres_per_socket,
				job_ptr->tres_per_task,
				job_ptr->mem_per_tres,
				&job_ptr->details->num_tasks,
				&job_ptr->details->min_nodes,
				&job_ptr->details->max_nodes,
				&job_ptr->details->
				ntasks_per_node,
				&job_ptr->details->mc_ptr->
				ntasks_per_socket,
				&sockets_per_node,
				&job_ptr->details->
				cpus_per_task,
				&job_ptr->gres_list);
			if (mc_ptr && (sockets_per_node != NO_VAL16)) {
				mc_ptr->sockets_per_node =
					sockets_per_node;
			}
			assoc_mgr_lock(&locks);
			gres_set_job_tres_cnt(job_ptr->gres_list,
					      job_ptr->details->min_nodes,
					      job_ptr->tres_req_cnt,
					      true);
			xfree(job_ptr->tres_req_str);
			job_ptr->tres_req_str =
				assoc_mgr_make_tres_str_from_array(
					job_ptr->tres_req_cnt,
					TRES_STR_FLAG_SIMPLE, true);
			assoc_mgr_unlock(&locks);
		}

		dep_ptr = xmalloc(sizeof(depend_spec_t));
		dep_ptr->array_task_id = array_task_id;
		dep_ptr->depend_type = depend_type;
		if (job_ptr->fed_details && !fed_mgr_is_origin_job_id(job_id)) {
			if (depend_type == SLURM_DEPEND_EXPAND) {
				error("%s: Job expansion not permitted for remote jobs",
				      __func__);
				*rc = ESLURM_DEPENDENCY;
				xfree(dep_ptr);
				break;
			}
			/* The dependency is on a remote cluster */
			dep_ptr->depend_flags |= SLURM_FLAGS_REMOTE;
			dep_job_ptr = NULL;
		}
		if (dep_job_ptr) {	/* job still active */
			if (array_task_id == NO_VAL)
				dep_ptr->job_id = dep_job_ptr->job_id;
			else
				dep_ptr->job_id = dep_job_ptr->array_job_id;
		} else
			dep_ptr->job_id = job_id;
		dep_ptr->job_ptr = dep_job_ptr;
		dep_ptr->depend_time = depend_time;
		dep_ptr->depend_state = depend_state;
		_add_dependency_to_list(new_depend_list, dep_ptr);
		if (tmp[0] != ':')
			break;
		sep_ptr = tmp + 1;	/* skip over ":" */

	}
	*sep_ptr2 = tmp;
}
3420
3421 /*
3422 * The old dependency format is a comma-separated list of job id's.
3423 * Parse a single jobid.
3424 * One char past the end of a valid job id will be returned in (*sep_ptr).
3425 * For an invalid job id, (*rc) will be set to ESLURM_DEPENDENCY.
3426 */
/*
 * Parse a single job id from the old-style comma-separated dependency
 * format (optionally "jobid_task" or "jobid_*") and append an implicit
 * "afterany" dependency to new_depend_list.
 * One char past the end of a valid job id is returned in (*sep_ptr).
 * On an invalid job id, (*rc) is set to ESLURM_DEPENDENCY.
 */
static void _parse_dependency_jobid_old(job_record_t *job_ptr,
					List new_depend_list, char **sep_ptr,
					char *tok, int *rc)
{
	depend_spec_t *dep_ptr;
	job_record_t *dep_job_ptr;
	uint32_t job_id, array_task_id = NO_VAL;
	char *end = NULL;

	job_id = strtol(tok, &end, 10);
	if (end && (end[0] == '_')) {
		if (end[1] == '*') {
			array_task_id = INFINITE;	/* whole job array */
			end += 2;	/* move past "_*" */
		} else
			array_task_id = strtol(end + 1, &end, 10);
	}
	*sep_ptr = end;

	/*
	 * Reject job id 0, self-dependencies, and anything other than
	 * end-of-string or "," after the id.
	 */
	if ((end == NULL) ||
	    (job_id == 0) || (job_id == job_ptr->job_id) ||
	    ((end[0] != '\0') && (end[0] != ','))) {
		*rc = ESLURM_DEPENDENCY;
		return;
	}

	dep_job_ptr = _find_dependent_job_ptr(job_id, &array_task_id);

	dep_ptr = xmalloc(sizeof(depend_spec_t));
	dep_ptr->array_task_id = array_task_id;
	dep_ptr->depend_type = SLURM_DEPEND_AFTER_ANY;
	if (job_ptr->fed_details &&
	    !fed_mgr_is_origin_job_id(job_id)) {
		/* The dependency is on a remote cluster */
		dep_ptr->depend_flags |= SLURM_FLAGS_REMOTE;
		dep_job_ptr = NULL;
	}
	if (dep_job_ptr)
		dep_ptr->job_id = (array_task_id == NO_VAL) ?
			dep_job_ptr->job_id : dep_job_ptr->array_job_id;
	else
		dep_ptr->job_id = job_id;
	dep_ptr->job_ptr = dep_job_ptr;	/* Can be NULL */
	_add_dependency_to_list(new_depend_list, dep_ptr);
}
3476
/*
 * Merge dependency states reported by a sibling cluster (new_depend_list)
 * into job_ptr's own dependency list. Returns true if any dependency's
 * state actually changed.
 */
extern bool update_job_dependency_list(job_record_t *job_ptr,
				       List new_depend_list)
{
	depend_spec_t *dep_ptr, *job_depend;
	ListIterator itr;
	List job_depend_list;
	bool was_changed = false;

	xassert(job_ptr);
	xassert(job_ptr->details);
	xassert(job_ptr->details->depend_list);

	job_depend_list = job_ptr->details->depend_list;

	itr = list_iterator_create(new_depend_list);
	while ((dep_ptr = list_next(itr))) {
		/*
		 * If the dependency is marked as remote, then it wasn't updated
		 * by the sibling cluster. Skip it.
		 */
		if (dep_ptr->depend_flags & SLURM_FLAGS_REMOTE) {
			continue;
		}
		/*
		 * Find the dependency in job_ptr that matches this one.
		 * Then update job_ptr's dependency state (not fulfilled,
		 * fulfilled, or failed) to match this one.
		 */
		job_depend = list_find_first(job_depend_list, _find_dependency,
					     dep_ptr);
		if (!job_depend) {
			/*
			 * This can happen if the job's dependency is updated
			 * and the update doesn't get to the sibling before
			 * the sibling sends back an update to the origin (us).
			 */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: Cannot find dependency %s:%u for %pJ, it may have been cleared before we got here.",
				     __func__, _depend_type2str(dep_ptr),
				     dep_ptr->job_id, job_ptr);
			continue;
		}

		/*
		 * If the dependency is already fulfilled, don't update it.
		 * Otherwise update the dependency state.
		 */
		if ((job_depend->depend_state == DEPEND_FULFILLED) ||
		    (job_depend->depend_state == dep_ptr->depend_state))
			continue;
		if (job_depend->depend_type == SLURM_DEPEND_SINGLETON) {
			/*
			 * We need to update the singleton dependency with
			 * the cluster bit, but test_job_dependency() will test
			 * if it is fulfilled, so don't change the depend_state
			 * here.
			 */
			job_depend->singleton_bits |=
				dep_ptr->singleton_bits;
			if (!fed_mgr_is_singleton_satisfied(job_ptr, job_depend,
							    false))
				continue;
		}
		job_depend->depend_state = dep_ptr->depend_state;
		was_changed = true;
	}
	list_iterator_destroy(itr);
	return was_changed;
}
3546
/*
 * List-for-each callback: re-evaluate a job's aggregate dependency state
 * after its individual dependencies were updated (e.g. by a sibling
 * cluster) and transition the job accordingly (requeue on fulfillment,
 * invalid-dependency handling on failure, or remain dependent).
 * Always returns SLURM_SUCCESS.
 */
extern int handle_job_dependency_updates(void *object, void *arg)
{
	job_record_t *job_ptr = (job_record_t *) object;
	depend_spec_t *dep_ptr = NULL;
	ListIterator itr;
	bool or_satisfied = false, and_failed = false, or_flag = false,
	     has_unfulfilled = false;

	xassert(job_ptr->details);
	xassert(job_ptr->details->depend_list);

	/*
	 * Check the depend_state of each dependency.
	 * All dependencies are OR'd or AND'd - we don't allow a mix.
	 * OR'd dependencies:
	 *   - If one dependency succeeded, the whole thing passes.
	 *   - If there is at least one unfulfilled dependency,
	 *     the job is still dependent.
	 *   - All dependencies failed == dependency never satisfied.
	 * AND'd dependencies:
	 *   - One failure == dependency never satisfied
	 *   - One+ not fulfilled == still dependent
	 *   - All succeeded == dependency fulfilled
	 */
	itr = list_iterator_create(job_ptr->details->depend_list);
	while ((dep_ptr = list_next(itr))) {
		_test_dependency_state(dep_ptr, &or_satisfied, &and_failed,
				       &or_flag, &has_unfulfilled);
	}
	list_iterator_destroy(itr);

	if (or_satisfied || (!or_flag && !and_failed && !has_unfulfilled)) {
		/* Dependency fulfilled */
		fed_mgr_remove_remote_dependencies(job_ptr);
		job_ptr->bit_flags &= ~JOB_DEPENDENT;
		list_flush(job_ptr->details->depend_list);
		if ((job_ptr->state_reason == WAIT_DEP_INVALID) ||
		    (job_ptr->state_reason == WAIT_DEPENDENCY)) {
			job_ptr->state_reason = WAIT_NO_REASON;
			xfree(job_ptr->state_desc);
		}
		/* Rebuild the (now empty) dependency string */
		_depend_list2str(job_ptr, false);
		fed_mgr_job_requeue(job_ptr);
	} else {
		_depend_list2str(job_ptr, false);
		job_ptr->bit_flags |= JOB_DEPENDENT;
		/* Dependent jobs don't accrue age priority */
		acct_policy_remove_accrue_time(job_ptr, false);
		if (and_failed || (or_flag && !has_unfulfilled)) {
			/* Dependency failed */
			handle_invalid_dependency(job_ptr);
		} else {
			/* Still dependent */
			job_ptr->state_reason = WAIT_DEPENDENCY;
			xfree(job_ptr->state_desc);
		}
	}
	if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
		print_job_dependency(job_ptr, __func__);

	return SLURM_SUCCESS;
}
3608
3609 /*
3610 * Parse a job dependency string and use it to establish a "depend_spec"
3611 * list of dependencies. We accept both old format (a single job ID) and
3612 * new format (e.g. "afterok:123:124,after:128").
3613 * IN job_ptr - job record to have dependency and depend_list updated
3614 * IN new_depend - new dependency description
3615 * RET returns an error code from slurm_errno.h
3616 */
/*
 * Parse a job dependency string and use it to establish a "depend_spec"
 * list of dependencies. We accept both old format (a single job ID) and
 * new format (e.g. "afterok:123:124,after:128").
 * IN job_ptr - job record to have dependency and depend_list updated
 * IN new_depend - new dependency description
 * RET returns an error code from slurm_errno.h
 */
extern int update_job_dependency(job_record_t *job_ptr, char *new_depend)
{
	static int select_hetero = -1;
	int rc = SLURM_SUCCESS;
	uint16_t depend_type = 0;
	char *tok, *new_array_dep, *sep_ptr, *sep_ptr2 = NULL;
	List new_depend_list = NULL;
	depend_spec_t *dep_ptr;
	bool or_flag = false;	/* true if any '?' (OR) separator seen */

	if (job_ptr->details == NULL)
		return EINVAL;

	if (select_hetero == -1) {
		/*
		 * Determine if the select plugin supports heterogeneous
		 * GRES allocations (count differ by node): 1=yes, 0=no
		 */
		char *select_type = slurm_get_select_type();
		if (select_type &&
		    (strstr(select_type, "cons_tres") ||
		     (strstr(select_type, "cray_aries") &&
		      (slurm_get_select_type_param() & CR_OTHER_CONS_TRES)))) {
			select_hetero = 1;
		} else
			select_hetero = 0;
		xfree(select_type);
	}

	/* Clear dependencies on NULL, "0", or empty dependency input */
	job_ptr->details->expanding_jobid = 0;
	if ((new_depend == NULL) || (new_depend[0] == '\0') ||
	    ((new_depend[0] == '0') && (new_depend[1] == '\0'))) {
		xfree(job_ptr->details->dependency);
		FREE_NULL_LIST(job_ptr->details->depend_list);
		return rc;
	}

	new_depend_list = list_create(xfree_ptr);
	/* Expand any "jobid_[range]" expressions first */
	if ((new_array_dep = _xlate_array_dep(new_depend)))
		tok = new_array_dep;
	else
		tok = new_depend;
	/* validate new dependency string */
	while (rc == SLURM_SUCCESS) {
		/* test singleton dependency flag */
		if (xstrncasecmp(tok, "singleton", 9) == 0) {
			uint32_t state;
			tok += 9;	/* skip past "singleton" */
			depend_type = SLURM_DEPEND_SINGLETON;
			if (_parse_depend_state(&tok, &state)) {
				rc = ESLURM_DEPENDENCY;
				break;
			}
			if (disable_remote_singleton &&
			    !fed_mgr_is_origin_job(job_ptr)) {
				/* Singleton disabled for non-origin cluster */
			} else {
				dep_ptr = xmalloc(sizeof(depend_spec_t));
				dep_ptr->depend_state = state;
				dep_ptr->depend_type = depend_type;
				/* dep_ptr->job_id = 0; set by xmalloc */
				/* dep_ptr->job_ptr = NULL; set by xmalloc */
				/* dep_ptr->singleton_bits = 0;set by xmalloc */
				_add_dependency_to_list(new_depend_list,
							dep_ptr);
			}
			if (tok[0] == ',') {
				tok++;
				continue;
			} else if (tok[0] == '?') {
				tok++;
				or_flag = true;
				continue;
			}
			if (tok[0] != '\0')
				rc = ESLURM_DEPENDENCY;
			break;
		}

		/* Test for old format, just a job ID */
		sep_ptr = strchr(tok, ':');
		if ((sep_ptr == NULL) && (tok[0] >= '0') && (tok[0] <= '9')) {
			_parse_dependency_jobid_old(job_ptr, new_depend_list,
						    &sep_ptr, tok, &rc);
			if (rc)
				break;
			if (sep_ptr && (sep_ptr[0] == ',')) {
				tok = sep_ptr + 1;
				continue;
			} else {
				break;
			}
		} else if (sep_ptr == NULL) {
			rc = ESLURM_DEPENDENCY;
			break;
		}

		/* New format, <test>:job_ID */
		if (!xstrncasecmp(tok, "afternotok:", 11))
			depend_type = SLURM_DEPEND_AFTER_NOT_OK;
		else if (!xstrncasecmp(tok, "aftercorr:", 10))
			depend_type = SLURM_DEPEND_AFTER_CORRESPOND;
		else if (!xstrncasecmp(tok, "afterany:", 9))
			depend_type = SLURM_DEPEND_AFTER_ANY;
		else if (!xstrncasecmp(tok, "afterok:", 8))
			depend_type = SLURM_DEPEND_AFTER_OK;
		else if (!xstrncasecmp(tok, "afterburstbuffer:", 17))
			/*
			 * NOTE: length must cover the full keyword (17).
			 * Comparing only 11 chars previously accepted any
			 * malformed "afterburstb*" type name.
			 */
			depend_type = SLURM_DEPEND_BURST_BUFFER;
		else if (!xstrncasecmp(tok, "after:", 6))
			depend_type = SLURM_DEPEND_AFTER;
		else if (!xstrncasecmp(tok, "expand:", 7)) {
			if (!permit_job_expansion()) {
				rc = ESLURM_NOT_SUPPORTED;
				break;
			}
			depend_type = SLURM_DEPEND_EXPAND;
		} else {
			rc = ESLURM_DEPENDENCY;
			break;
		}
		sep_ptr++;	/* skip over ":" */
		_parse_dependency_jobid_new(job_ptr, new_depend_list, sep_ptr,
					    &sep_ptr2, tok, depend_type,
					    select_hetero, &rc);

		if (sep_ptr2 && (sep_ptr2[0] == ',')) {
			tok = sep_ptr2 + 1;
		} else if (sep_ptr2 && (sep_ptr2[0] == '?')) {
			tok = sep_ptr2 + 1;
			or_flag = true;
		} else {
			break;
		}
	}

	if (rc == SLURM_SUCCESS) {
		/* test for circular dependencies (e.g. A -> B -> A) */
		(void) _scan_depend(NULL, job_ptr->job_id);	/* reset depth */
		if (_scan_depend(new_depend_list, job_ptr->job_id))
			rc = ESLURM_CIRCULAR_DEPENDENCY;
	}

	if (rc == SLURM_SUCCESS) {
		/* Replace the old dependency list and rebuild the string */
		FREE_NULL_LIST(job_ptr->details->depend_list);
		job_ptr->details->depend_list = new_depend_list;
		_depend_list2str(job_ptr, or_flag);
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
			print_job_dependency(job_ptr, __func__);
	} else {
		FREE_NULL_LIST(new_depend_list);
	}
	xfree(new_array_dep);
	return rc;
}
3773
3774 /* Return true if job_id is found in dependency_list.
3775 * Pass NULL dependency list to clear the counter.
3776 * Execute recursively for each dependent job */
/*
 * Return true if job_id is found in dependency_list (i.e. a circular
 * dependency exists). Pass a NULL dependency list to reset the recursion
 * depth counter before a scan. Recurses into each dependent job's own
 * dependency list, bounded by max_depend_depth.
 * NOTE: uses static state (job_counter); callers must reset between scans.
 */
static bool _scan_depend(List dependency_list, uint32_t job_id)
{
	static int job_counter = 0;	/* total jobs visited this scan */
	bool rc = false;
	ListIterator iter;
	depend_spec_t *dep_ptr;

	if (dependency_list == NULL) {
		job_counter = 0;	/* reset for a new scan */
		return false;
	} else if (job_counter++ >= max_depend_depth) {
		/* Depth limit reached; give up without reporting a cycle */
		return false;
	}

	xassert(job_id);
	iter = list_iterator_create(dependency_list);
	while (!rc && (dep_ptr = list_next(iter))) {
		if (dep_ptr->job_id == 0)	/* Singleton */
			continue;
		/*
		 * We can't test for circular dependencies if the job_ptr
		 * wasn't found - the job may not be on this cluster, or the
		 * job was already purged when the dependency submitted,
		 * or the job just didn't exist.
		 */
		if (!dep_ptr->job_ptr)
			continue;
		if (dep_ptr->job_id == job_id)
			rc = true;
		else if ((dep_ptr->job_id != dep_ptr->job_ptr->job_id) ||
			 (dep_ptr->job_ptr->magic != JOB_MAGIC))
			continue;	/* purged job, ptr not yet cleared */
		else if (!IS_JOB_FINISHED(dep_ptr->job_ptr) &&
			 dep_ptr->job_ptr->details &&
			 dep_ptr->job_ptr->details->depend_list) {
			/* Recurse into this dependent job's own list */
			rc = _scan_depend(dep_ptr->job_ptr->details->
					  depend_list, job_id);
			if (rc) {
				info("circular dependency: %pJ is dependent upon JobId=%u",
				     dep_ptr->job_ptr, job_id);
			}
		}
	}
	list_iterator_destroy(iter);
	return rc;
}
3823
3824 /* If there are higher priority queued jobs in this job's partition, then
3825 * delay the job's expected initiation time as needed to run those jobs.
3826 * NOTE: This is only a rough estimate of the job's start time as it ignores
3827 * job dependencies, feature requirements, specific node requirements, etc. */
_delayed_job_start_time(job_record_t * job_ptr)3828 static void _delayed_job_start_time(job_record_t *job_ptr)
3829 {
3830 uint32_t part_node_cnt, part_cpu_cnt, part_cpus_per_node;
3831 uint32_t job_size_cpus, job_size_nodes, job_time;
3832 uint64_t cume_space_time = 0;
3833 job_record_t *job_q_ptr;
3834 ListIterator job_iterator;
3835
3836 if (job_ptr->part_ptr == NULL)
3837 return;
3838 part_node_cnt = job_ptr->part_ptr->total_nodes;
3839 part_cpu_cnt = job_ptr->part_ptr->total_cpus;
3840 if (part_cpu_cnt > part_node_cnt)
3841 part_cpus_per_node = part_cpu_cnt / part_node_cnt;
3842 else
3843 part_cpus_per_node = 1;
3844
3845 job_iterator = list_iterator_create(job_list);
3846 while ((job_q_ptr = list_next(job_iterator))) {
3847 if (!IS_JOB_PENDING(job_q_ptr) || !job_q_ptr->details ||
3848 (job_q_ptr->part_ptr != job_ptr->part_ptr) ||
3849 (job_q_ptr->priority < job_ptr->priority) ||
3850 (job_q_ptr->job_id == job_ptr->job_id) ||
3851 (IS_JOB_REVOKED(job_q_ptr)))
3852 continue;
3853 if (job_q_ptr->details->min_nodes == NO_VAL)
3854 job_size_nodes = 1;
3855 else
3856 job_size_nodes = job_q_ptr->details->min_nodes;
3857 if (job_q_ptr->details->min_cpus == NO_VAL)
3858 job_size_cpus = 1;
3859 else
3860 job_size_cpus = job_q_ptr->details->min_cpus;
3861 job_size_cpus = MAX(job_size_cpus,
3862 (job_size_nodes * part_cpus_per_node));
3863 if (job_q_ptr->time_limit == NO_VAL)
3864 job_time = job_q_ptr->part_ptr->max_time;
3865 else
3866 job_time = job_q_ptr->time_limit;
3867 cume_space_time += job_size_cpus * job_time;
3868 }
3869 list_iterator_destroy(job_iterator);
3870 cume_space_time /= part_cpu_cnt;/* Factor out size */
3871 cume_space_time *= 60; /* Minutes to seconds */
3872 debug2("Increasing estimated start of %pJ by %"PRIu64" secs",
3873 job_ptr, cume_space_time);
3874 job_ptr->start_time += cume_space_time;
3875 }
3876
_part_weight_sort(void * x,void * y)3877 static int _part_weight_sort(void *x, void *y)
3878 {
3879 part_record_t *parta = *(part_record_t **) x;
3880 part_record_t *partb = *(part_record_t **) y;
3881
3882 if (parta->priority_tier > partb->priority_tier)
3883 return -1;
3884 if (parta->priority_tier < partb->priority_tier)
3885 return 1;
3886
3887 return 0;
3888 }
3889
3890 /*
3891 * Determine if a pending job will run using only the specified nodes
3892 * (in job_desc_msg->req_nodes), build response message and return
3893 * SLURM_SUCCESS on success. Otherwise return an error code. Caller
3894 * must free response message
3895 */
job_start_data(job_desc_msg_t * job_desc_msg,will_run_response_msg_t ** resp)3896 extern int job_start_data(job_desc_msg_t *job_desc_msg,
3897 will_run_response_msg_t **resp)
3898 {
3899 job_record_t *job_ptr;
3900 part_record_t *part_ptr;
3901 bitstr_t *active_bitmap = NULL, *avail_bitmap = NULL;
3902 bitstr_t *resv_bitmap = NULL;
3903 bitstr_t *exc_core_bitmap = NULL;
3904 uint32_t min_nodes, max_nodes, req_nodes;
3905 int i, rc = SLURM_SUCCESS;
3906 time_t now = time(NULL), start_res, orig_start_time = (time_t) 0;
3907 List preemptee_candidates = NULL, preemptee_job_list = NULL;
3908 bool resv_overlap = false;
3909 ListIterator iter = NULL;
3910
3911 job_ptr = find_job_record(job_desc_msg->job_id);
3912 if (job_ptr == NULL)
3913 return ESLURM_INVALID_JOB_ID;
3914
3915 /*
3916 * NOTE: Do not use IS_JOB_PENDING since that doesn't take
3917 * into account the COMPLETING FLAG which we need to since we don't want
3918 * to schedule a requeued job until it is actually done completing
3919 * the first time.
3920 */
3921 if ((job_ptr->details == NULL) || (job_ptr->job_state != JOB_PENDING))
3922 return ESLURM_DISABLED;
3923
3924 if (job_ptr->part_ptr_list) {
3925 list_sort(job_ptr->part_ptr_list, _part_weight_sort);
3926 iter = list_iterator_create(job_ptr->part_ptr_list);
3927 part_ptr = list_next(iter);
3928 } else
3929 part_ptr = job_ptr->part_ptr;
3930 next_part:
3931 rc = SLURM_SUCCESS;
3932 if (part_ptr == NULL) {
3933 if (iter)
3934 list_iterator_destroy(iter);
3935 return ESLURM_INVALID_PARTITION_NAME;
3936 }
3937
3938 if ((job_desc_msg->req_nodes == NULL) ||
3939 (job_desc_msg->req_nodes[0] == '\0')) {
3940 /* assume all nodes available to job for testing */
3941 avail_bitmap = bit_alloc(node_record_count);
3942 bit_nset(avail_bitmap, 0, (node_record_count - 1));
3943 } else if (node_name2bitmap(job_desc_msg->req_nodes, false,
3944 &avail_bitmap) != 0) {
3945 /* Don't need to check for each partition */
3946 if (iter)
3947 list_iterator_destroy(iter);
3948 return ESLURM_INVALID_NODE_NAME;
3949 }
3950
3951 /* Consider only nodes in this job's partition */
3952 if (part_ptr->node_bitmap)
3953 bit_and(avail_bitmap, part_ptr->node_bitmap);
3954 else
3955 rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
3956 if (job_req_node_filter(job_ptr, avail_bitmap, true))
3957 rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
3958 if (job_ptr->details->exc_node_bitmap) {
3959 bit_and_not(avail_bitmap, job_ptr->details->exc_node_bitmap);
3960 }
3961 if (job_ptr->details->req_node_bitmap) {
3962 if (!bit_super_set(job_ptr->details->req_node_bitmap,
3963 avail_bitmap)) {
3964 rc = ESLURM_REQUESTED_PART_CONFIG_UNAVAILABLE;
3965 }
3966 }
3967
3968 /* Enforce reservation: access control, time and nodes */
3969 if (job_ptr->details->begin_time &&
3970 (job_ptr->details->begin_time > now))
3971 start_res = job_ptr->details->begin_time;
3972 else
3973 start_res = now;
3974
3975 i = job_test_resv(job_ptr, &start_res, true, &resv_bitmap,
3976 &exc_core_bitmap, &resv_overlap, false);
3977 if (i != SLURM_SUCCESS) {
3978 FREE_NULL_BITMAP(avail_bitmap);
3979 FREE_NULL_BITMAP(exc_core_bitmap);
3980 if (job_ptr->part_ptr_list && (part_ptr = list_next(iter)))
3981 goto next_part;
3982
3983 if (iter)
3984 list_iterator_destroy(iter);
3985 return i;
3986 }
3987 bit_and(avail_bitmap, resv_bitmap);
3988 FREE_NULL_BITMAP(resv_bitmap);
3989
3990 /* Only consider nodes that are not DOWN or DRAINED */
3991 bit_and(avail_bitmap, avail_node_bitmap);
3992
3993 if (rc == SLURM_SUCCESS) {
3994 int test_fini = -1;
3995 uint8_t save_share_res, save_whole_node;
3996 /* On BlueGene systems don't adjust the min/max node limits
3997 here. We are working on midplane values. */
3998 min_nodes = MAX(job_ptr->details->min_nodes,
3999 part_ptr->min_nodes);
4000 if (job_ptr->details->max_nodes == 0)
4001 max_nodes = part_ptr->max_nodes;
4002 else
4003 max_nodes = MIN(job_ptr->details->max_nodes,
4004 part_ptr->max_nodes);
4005 max_nodes = MIN(max_nodes, 500000); /* prevent overflows */
4006 if (!job_ptr->limit_set.tres[TRES_ARRAY_NODE] &&
4007 job_ptr->details->max_nodes)
4008 req_nodes = max_nodes;
4009 else
4010 req_nodes = min_nodes;
4011 preemptee_candidates = slurm_find_preemptable_jobs(job_ptr);
4012
4013 /* The orig_start is based upon the backfill scheduler data
4014 * and considers all higher priority jobs. The logic below
4015 * only considers currently running jobs, so the expected
4016 * start time will almost certainly be earlier and not as
4017 * accurate, but this algorithm is much faster. */
4018 orig_start_time = job_ptr->start_time;
4019 build_active_feature_bitmap(job_ptr, avail_bitmap,
4020 &active_bitmap);
4021 if (active_bitmap) {
4022 rc = select_g_job_test(job_ptr, active_bitmap,
4023 min_nodes, max_nodes, req_nodes,
4024 SELECT_MODE_WILL_RUN,
4025 preemptee_candidates,
4026 &preemptee_job_list,
4027 exc_core_bitmap);
4028 if (rc == SLURM_SUCCESS) {
4029 FREE_NULL_BITMAP(avail_bitmap);
4030 avail_bitmap = active_bitmap;
4031 active_bitmap = NULL;
4032 test_fini = 1;
4033 } else {
4034 FREE_NULL_BITMAP(active_bitmap);
4035 save_share_res = job_ptr->details->share_res;
4036 save_whole_node = job_ptr->details->whole_node;
4037 job_ptr->details->share_res = 0;
4038 job_ptr->details->whole_node = 1;
4039 test_fini = 0;
4040 }
4041 }
4042 if (test_fini != 1) {
4043 rc = select_g_job_test(job_ptr, avail_bitmap,
4044 min_nodes, max_nodes, req_nodes,
4045 SELECT_MODE_WILL_RUN,
4046 preemptee_candidates,
4047 &preemptee_job_list,
4048 exc_core_bitmap);
4049 if (test_fini == 0) {
4050 job_ptr->details->share_res = save_share_res;
4051 job_ptr->details->whole_node = save_whole_node;
4052 }
4053 }
4054 }
4055
4056 if (rc == SLURM_SUCCESS) {
4057 will_run_response_msg_t *resp_data;
4058 resp_data = xmalloc(sizeof(will_run_response_msg_t));
4059 resp_data->job_id = job_ptr->job_id;
4060 resp_data->proc_cnt = job_ptr->total_cpus;
4061 _delayed_job_start_time(job_ptr);
4062 resp_data->start_time = MAX(job_ptr->start_time,
4063 orig_start_time);
4064 resp_data->start_time = MAX(resp_data->start_time, start_res);
4065 job_ptr->start_time = 0; /* restore pending job start time */
4066 resp_data->node_list = bitmap2node_name(avail_bitmap);
4067 resp_data->part_name = xstrdup(part_ptr->name);
4068
4069 if (preemptee_job_list) {
4070 ListIterator preemptee_iterator;
4071 uint32_t *preemptee_jid;
4072 job_record_t *tmp_job_ptr;
4073 resp_data->preemptee_job_id = list_create(xfree_ptr);
4074 preemptee_iterator = list_iterator_create(
4075 preemptee_job_list);
4076 while ((tmp_job_ptr = list_next(preemptee_iterator))) {
4077 preemptee_jid = xmalloc(sizeof(uint32_t));
4078 (*preemptee_jid) = tmp_job_ptr->job_id;
4079 list_append(resp_data->preemptee_job_id,
4080 preemptee_jid);
4081 }
4082 list_iterator_destroy(preemptee_iterator);
4083 }
4084
4085 resp_data->sys_usage_per = _get_system_usage();
4086
4087 *resp = resp_data;
4088 } else {
4089 rc = ESLURM_REQUESTED_NODE_CONFIG_UNAVAILABLE;
4090 }
4091
4092 FREE_NULL_LIST(preemptee_candidates);
4093 FREE_NULL_LIST(preemptee_job_list);
4094 FREE_NULL_BITMAP(avail_bitmap);
4095 FREE_NULL_BITMAP(exc_core_bitmap);
4096
4097 if (rc && job_ptr->part_ptr_list && (part_ptr = list_next(iter)))
4098 goto next_part;
4099
4100 if (iter)
4101 list_iterator_destroy(iter);
4102
4103 return rc;
4104 }
4105
4106 /*
4107 * epilog_slurmctld - execute the epilog_slurmctld for a job that has just
4108 * terminated.
4109 * IN job_ptr - pointer to job that has been terminated
4110 */
epilog_slurmctld(job_record_t * job_ptr)4111 extern void epilog_slurmctld(job_record_t *job_ptr)
4112 {
4113 xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
4114
4115 prep_epilog_slurmctld(job_ptr);
4116 }
4117
4118 /*
4119 * Determine which nodes must be rebooted for a job
4120 * IN job_ptr - pointer to job that will be initiated
4121 * RET bitmap of nodes requiring a reboot for NodeFeaturesPlugin or NULL if none
4122 */
node_features_reboot(job_record_t * job_ptr)4123 extern bitstr_t *node_features_reboot(job_record_t *job_ptr)
4124 {
4125 bitstr_t *active_bitmap = NULL, *boot_node_bitmap = NULL;
4126 bitstr_t *feature_node_bitmap, *tmp_bitmap;
4127 char *reboot_features;
4128
4129 if ((node_features_g_count() == 0) ||
4130 !node_features_g_user_update(job_ptr->user_id))
4131 return NULL;
4132
4133 /*
4134 * Check if all features supported with AND/OR combinations
4135 */
4136 build_active_feature_bitmap(job_ptr, job_ptr->node_bitmap,
4137 &active_bitmap);
4138 if (active_bitmap == NULL) /* All nodes have desired features */
4139 return NULL;
4140 bit_free(active_bitmap);
4141
4142 /*
4143 * If some XOR/XAND option, filter out only first set of features
4144 * for NodeFeaturesPlugin
4145 */
4146 feature_node_bitmap = node_features_g_get_node_bitmap();
4147 if (feature_node_bitmap == NULL) /* No nodes under NodeFeaturesPlugin */
4148 return NULL;
4149
4150 reboot_features = node_features_g_job_xlate(job_ptr->details->features);
4151 tmp_bitmap = build_active_feature_bitmap2(reboot_features);
4152 xfree(reboot_features);
4153 boot_node_bitmap = bit_copy(job_ptr->node_bitmap);
4154 bit_and(boot_node_bitmap, feature_node_bitmap);
4155 bit_free(feature_node_bitmap);
4156 if (tmp_bitmap) {
4157 bit_and_not(boot_node_bitmap, tmp_bitmap);
4158 bit_free(tmp_bitmap);
4159 }
4160 if (bit_ffs(boot_node_bitmap) == -1)
4161 FREE_NULL_BITMAP(boot_node_bitmap);
4162
4163 return boot_node_bitmap;
4164 }
4165
4166 /* Determine if node boot required for this job
4167 * IN job_ptr - pointer to job that will be initiated
4168 * IN node_bitmap - nodes to be allocated
4169 * RET - true if reboot required
4170 */
node_features_reboot_test(job_record_t * job_ptr,bitstr_t * node_bitmap)4171 extern bool node_features_reboot_test(job_record_t *job_ptr,
4172 bitstr_t *node_bitmap)
4173 {
4174 bitstr_t *active_bitmap = NULL, *boot_node_bitmap = NULL;
4175 int node_cnt;
4176
4177 if (job_ptr->reboot)
4178 return true;
4179
4180 if ((node_features_g_count() == 0) ||
4181 !node_features_g_user_update(job_ptr->user_id))
4182 return false;
4183
4184 build_active_feature_bitmap(job_ptr, node_bitmap, &active_bitmap);
4185 if (active_bitmap == NULL) /* All have desired features */
4186 return false;
4187
4188 boot_node_bitmap = bit_copy(node_bitmap);
4189 bit_and_not(boot_node_bitmap, active_bitmap);
4190 node_cnt = bit_set_count(boot_node_bitmap);
4191 FREE_NULL_BITMAP(active_bitmap);
4192 FREE_NULL_BITMAP(boot_node_bitmap);
4193
4194 if (node_cnt == 0)
4195 return false;
4196 return true;
4197 }
4198
4199 /*
4200 * reboot_job_nodes - Reboot the compute nodes allocated to a job.
4201 * Also change the modes of KNL nodes for node_features/knl_generic plugin.
4202 * IN job_ptr - pointer to job that will be initiated
4203 * RET SLURM_SUCCESS(0) or error code
4204 */
4205 #ifdef HAVE_FRONT_END
reboot_job_nodes(job_record_t * job_ptr)4206 extern int reboot_job_nodes(job_record_t *job_ptr)
4207 {
4208 return SLURM_SUCCESS;
4209 }
4210 #else
4211 /* NOTE: See power_job_reboot() in power_save.c for similar logic */
reboot_job_nodes(job_record_t * job_ptr)4212 extern int reboot_job_nodes(job_record_t *job_ptr)
4213 {
4214 int rc = SLURM_SUCCESS;
4215 int i, i_first, i_last;
4216 agent_arg_t *reboot_agent_args = NULL;
4217 reboot_msg_t *reboot_msg;
4218 node_record_t *node_ptr;
4219 time_t now = time(NULL);
4220 bitstr_t *boot_node_bitmap = NULL, *feature_node_bitmap = NULL;
4221 char *nodes, *reboot_features = NULL;
4222 uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
4223 wait_boot_arg_t *wait_boot_arg;
4224 pthread_t tid;
4225
4226 if ((job_ptr->details == NULL) || (job_ptr->node_bitmap == NULL))
4227 return SLURM_SUCCESS;
4228 if (power_save_test())
4229 return power_job_reboot(job_ptr);
4230 if ((slurmctld_conf.reboot_program == NULL) ||
4231 (slurmctld_conf.reboot_program[0] == '\0'))
4232 return SLURM_SUCCESS;
4233
4234 /*
4235 * See power_job_reboot() in power_save.c for similar logic used by
4236 * node_features/knl_cray plugin.
4237 */
4238 if (job_ptr->reboot) {
4239 boot_node_bitmap = bit_copy(job_ptr->node_bitmap);
4240 } else {
4241 boot_node_bitmap = node_features_reboot(job_ptr);
4242 if (boot_node_bitmap == NULL)
4243 return SLURM_SUCCESS;
4244 }
4245
4246 wait_boot_arg = xmalloc(sizeof(wait_boot_arg_t));
4247 wait_boot_arg->job_id = job_ptr->job_id;
4248 wait_boot_arg->node_bitmap = bit_alloc(node_record_count);
4249
4250 /* Modify state information for all nodes, KNL and others */
4251 i_first = bit_ffs(boot_node_bitmap);
4252 if (i_first >= 0)
4253 i_last = bit_fls(boot_node_bitmap);
4254 else
4255 i_last = i_first - 1;
4256 for (i = i_first; i <= i_last; i++) {
4257 if (!bit_test(boot_node_bitmap, i))
4258 continue;
4259 node_ptr = node_record_table_ptr + i;
4260 if (protocol_version > node_ptr->protocol_version)
4261 protocol_version = node_ptr->protocol_version;
4262 node_ptr->node_state |= NODE_STATE_NO_RESPOND;
4263 node_ptr->node_state |= NODE_STATE_POWER_UP;
4264 bit_clear(avail_node_bitmap, i);
4265 bit_set(booting_node_bitmap, i);
4266 bit_set(wait_boot_arg->node_bitmap, i);
4267 node_ptr->boot_req_time = now;
4268 node_ptr->last_response = now + slurmctld_conf.resume_timeout;
4269 }
4270
4271 if (job_ptr->details->features &&
4272 node_features_g_user_update(job_ptr->user_id)) {
4273 reboot_features = node_features_g_job_xlate(
4274 job_ptr->details->features);
4275 if (reboot_features)
4276 feature_node_bitmap = node_features_g_get_node_bitmap();
4277 if (feature_node_bitmap)
4278 bit_and(feature_node_bitmap, boot_node_bitmap);
4279 if (!feature_node_bitmap ||
4280 (bit_ffs(feature_node_bitmap) == -1)) {
4281 /* No KNL nodes to reboot */
4282 FREE_NULL_BITMAP(feature_node_bitmap);
4283 } else {
4284 bit_and_not(boot_node_bitmap, feature_node_bitmap);
4285 if (bit_ffs(boot_node_bitmap) == -1) {
4286 /* No non-KNL nodes to reboot */
4287 FREE_NULL_BITMAP(boot_node_bitmap);
4288 }
4289 }
4290 }
4291
4292 if (feature_node_bitmap) {
4293 /* Reboot nodes to change KNL NUMA and/or MCDRAM mode */
4294 reboot_agent_args = xmalloc(sizeof(agent_arg_t));
4295 reboot_agent_args->msg_type = REQUEST_REBOOT_NODES;
4296 reboot_agent_args->retry = 0;
4297 reboot_agent_args->node_count = 0;
4298 reboot_agent_args->protocol_version = protocol_version;
4299 reboot_agent_args->hostlist = hostlist_create(NULL);
4300 reboot_msg = xmalloc(sizeof(reboot_msg_t));
4301 slurm_init_reboot_msg(reboot_msg, false);
4302 reboot_agent_args->msg_args = reboot_msg;
4303 reboot_msg->features = reboot_features; /* Move, not copy */
4304 for (i = i_first; i <= i_last; i++) {
4305 if (!bit_test(feature_node_bitmap, i))
4306 continue;
4307 node_ptr = node_record_table_ptr + i;
4308 hostlist_push_host(reboot_agent_args->hostlist,
4309 node_ptr->name);
4310 reboot_agent_args->node_count++;
4311 }
4312 nodes = bitmap2node_name(feature_node_bitmap);
4313 if (nodes) {
4314 info("%s: reboot nodes %s features %s",
4315 __func__, nodes, reboot_features);
4316 } else {
4317 error("%s: bitmap2nodename", __func__);
4318 rc = SLURM_ERROR;
4319 }
4320 xfree(nodes);
4321 agent_queue_request(reboot_agent_args);
4322 }
4323
4324 if (boot_node_bitmap) {
4325 /* Reboot nodes with no feature changes */
4326 reboot_agent_args = xmalloc(sizeof(agent_arg_t));
4327 reboot_agent_args->msg_type = REQUEST_REBOOT_NODES;
4328 reboot_agent_args->retry = 0;
4329 reboot_agent_args->node_count = 0;
4330 reboot_agent_args->protocol_version = protocol_version;
4331 reboot_agent_args->hostlist = hostlist_create(NULL);
4332 reboot_msg = xmalloc(sizeof(reboot_msg_t));
4333 slurm_init_reboot_msg(reboot_msg, false);
4334 reboot_agent_args->msg_args = reboot_msg;
4335 /* reboot_msg->features = NULL; */
4336 for (i = i_first; i <= i_last; i++) {
4337 if (!bit_test(boot_node_bitmap, i))
4338 continue;
4339 node_ptr = node_record_table_ptr + i;
4340 hostlist_push_host(reboot_agent_args->hostlist,
4341 node_ptr->name);
4342 reboot_agent_args->node_count++;
4343 }
4344 nodes = bitmap2node_name(boot_node_bitmap);
4345 if (nodes) {
4346 info("%s: reboot nodes %s", __func__, nodes);
4347 } else {
4348 error("%s: bitmap2nodename", __func__);
4349 rc = SLURM_ERROR;
4350 }
4351 xfree(nodes);
4352 agent_queue_request(reboot_agent_args);
4353 }
4354
4355 job_ptr->details->prolog_running++;
4356 slurm_thread_create(&tid, _wait_boot, wait_boot_arg);
4357 FREE_NULL_BITMAP(boot_node_bitmap);
4358 FREE_NULL_BITMAP(feature_node_bitmap);
4359
4360 return rc;
4361 }
4362
_wait_boot(void * arg)4363 static void *_wait_boot(void *arg)
4364 {
4365 wait_boot_arg_t *wait_boot_arg = (wait_boot_arg_t *) arg;
4366 job_record_t *job_ptr;
4367 bitstr_t *boot_node_bitmap = wait_boot_arg->node_bitmap;
4368 /* Locks: Write jobs; read nodes */
4369 slurmctld_lock_t job_write_lock = {
4370 READ_LOCK, WRITE_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
4371 /* Locks: Write jobs; write nodes */
4372 slurmctld_lock_t node_write_lock = {
4373 READ_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
4374 uint16_t resume_timeout = slurm_get_resume_timeout();
4375 node_record_t *node_ptr;
4376 time_t start_time = time(NULL);
4377 int i, total_node_cnt, wait_node_cnt;
4378 bool job_timeout = false;
4379
4380 /*
4381 * This adds the process to the list, we don't need the return pointer
4382 */
4383 (void)track_script_rec_add(wait_boot_arg->job_id, 0, pthread_self());
4384
4385 do {
4386 sleep(5);
4387 total_node_cnt = wait_node_cnt = 0;
4388 lock_slurmctld(job_write_lock);
4389 if (!(job_ptr = find_job_record(wait_boot_arg->job_id))) {
4390 error("%s: JobId=%u vanished while waiting for node boot",
4391 __func__, wait_boot_arg->job_id);
4392 unlock_slurmctld(job_write_lock);
4393 track_script_remove(pthread_self());
4394 return NULL;
4395 }
4396 if (IS_JOB_PENDING(job_ptr) || /* Job requeued or killed */
4397 IS_JOB_FINISHED(job_ptr) ||
4398 !job_ptr->node_bitmap) {
4399 verbose("%pJ no longer waiting for node boot",
4400 job_ptr);
4401 unlock_slurmctld(job_write_lock);
4402 track_script_remove(pthread_self());
4403 return NULL;
4404 }
4405 for (i = 0, node_ptr = node_record_table_ptr;
4406 i < node_record_count; i++, node_ptr++) {
4407 if (!bit_test(boot_node_bitmap, i))
4408 continue;
4409 total_node_cnt++;
4410 if (node_ptr->boot_time < start_time)
4411 wait_node_cnt++;
4412 }
4413 if (wait_node_cnt) {
4414 debug("%pJ still waiting for %d of %d nodes to boot",
4415 job_ptr, wait_node_cnt, total_node_cnt);
4416 } else {
4417 info("%pJ boot complete for all %d nodes",
4418 job_ptr, total_node_cnt);
4419 }
4420 i = (int) difftime(time(NULL), start_time);
4421 if (i >= resume_timeout) {
4422 error("%pJ timeout waiting for node %d of %d boots",
4423 job_ptr, wait_node_cnt, total_node_cnt);
4424 wait_node_cnt = 0;
4425 job_timeout = true;
4426 }
4427 unlock_slurmctld(job_write_lock);
4428 } while (wait_node_cnt);
4429
4430 lock_slurmctld(node_write_lock);
4431 if (!(job_ptr = find_job_record(wait_boot_arg->job_id))) {
4432 error("%s: missing JobId=%u after node_write_lock acquired",
4433 __func__, wait_boot_arg->job_id);
4434 } else {
4435 if (job_timeout)
4436 (void) job_requeue(getuid(), job_ptr->job_id, NULL, false, 0);
4437 prolog_running_decr(job_ptr);
4438 }
4439 unlock_slurmctld(node_write_lock);
4440
4441 FREE_NULL_BITMAP(wait_boot_arg->node_bitmap);
4442 xfree(arg);
4443 track_script_remove(pthread_self());
4444 return NULL;
4445 }
4446 #endif
4447
4448 /*
4449 * Deferring this setup ensures that all calling paths into select_nodes()
4450 * have had a chance to update all appropriate job records.
4451 * This works since select_nodes() will always be holding the job_write lock,
4452 * and thus this new thread will be blocked waiting to acquire job_write
4453 * until that has completed.
4454 * For HetJobs in particular, this is critical to ensure that all components
4455 * have been setup properly before prolog_slurmctld actually runs.
4456 *
4457 * FIXME: explore other ways to refactor slurmctld to avoid this extraneous
4458 * thread
4459 */
_start_prolog_slurmctld_thread(void * x)4460 static void *_start_prolog_slurmctld_thread(void *x)
4461 {
4462 slurmctld_lock_t node_write_lock = {
4463 .conf = READ_LOCK, .job = WRITE_LOCK,
4464 .node = WRITE_LOCK, .fed = READ_LOCK };
4465 uint32_t *job_id = (uint32_t *) x;
4466 job_record_t *job_ptr;
4467
4468 lock_slurmctld(node_write_lock);
4469 if (!(job_ptr = find_job_record(*job_id))) {
4470 error("%s: missing JobId=%u", __func__, *job_id);
4471 unlock_slurmctld(node_write_lock);
4472 return NULL;
4473 }
4474 prep_prolog_slurmctld(job_ptr);
4475
4476 /*
4477 * No async prolog_slurmctld threads running, so decrement now to move
4478 * on with the job launch.
4479 */
4480 if (!job_ptr->prep_prolog_cnt) {
4481 debug2("%s: no async prolog_slurmctld running", __func__);
4482 prolog_running_decr(job_ptr);
4483 }
4484
4485 unlock_slurmctld(node_write_lock);
4486 xfree(job_id);
4487
4488 return NULL;
4489 }
4490
4491 /*
4492 * prolog_slurmctld - execute the prolog_slurmctld for a job that has just
4493 * been allocated resources.
4494 * IN job_ptr - pointer to job that will be initiated
4495 */
prolog_slurmctld(job_record_t * job_ptr)4496 extern void prolog_slurmctld(job_record_t *job_ptr)
4497 {
4498 uint32_t *job_id = xmalloc(sizeof(*job_id));
4499 xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
4500 xassert(verify_lock(NODE_LOCK, WRITE_LOCK));
4501
4502 job_ptr->details->prolog_running++;
4503 job_ptr->job_state |= JOB_CONFIGURING;
4504
4505 *job_id = job_ptr->job_id;
4506 slurm_thread_create_detached(NULL, _start_prolog_slurmctld_thread, job_id);
4507 }
4508
4509 /* Decrement a job's prolog_running counter and launch the job if zero */
prolog_running_decr(job_record_t * job_ptr)4510 extern void prolog_running_decr(job_record_t *job_ptr)
4511 {
4512 xassert(verify_lock(JOB_LOCK, WRITE_LOCK));
4513 xassert(verify_lock(FED_LOCK, READ_LOCK));
4514
4515 if (!job_ptr)
4516 return;
4517
4518 if (job_ptr->details && job_ptr->details->prolog_running &&
4519 (--job_ptr->details->prolog_running > 0))
4520 return;
4521
4522 /* Federated job notified the origin that the job is to be requeued,
4523 * need to wait for this job to be cancelled. */
4524 if (job_ptr->job_state & JOB_REQUEUE_FED)
4525 return;
4526
4527 if (IS_JOB_CONFIGURING(job_ptr) && test_job_nodes_ready(job_ptr)) {
4528 info("%s: Configuration for %pJ is complete",
4529 __func__, job_ptr);
4530 job_config_fini(job_ptr);
4531 if (job_ptr->batch_flag &&
4532 (IS_JOB_RUNNING(job_ptr) || IS_JOB_SUSPENDED(job_ptr))) {
4533 launch_job(job_ptr);
4534 }
4535 }
4536 }
4537
4538 /*
4539 * Copy a job's feature list
4540 * IN feature_list_src - a job's depend_lst
4541 * RET copy of feature_list_src, must be freed by caller
4542 */
feature_list_copy(List feature_list_src)4543 extern List feature_list_copy(List feature_list_src)
4544 {
4545 job_feature_t *feat_src, *feat_dest;
4546 ListIterator iter;
4547 List feature_list_dest = NULL;
4548
4549 if (!feature_list_src)
4550 return feature_list_dest;
4551
4552 feature_list_dest = list_create(feature_list_delete);
4553 iter = list_iterator_create(feature_list_src);
4554 while ((feat_src = list_next(iter))) {
4555 feat_dest = xmalloc(sizeof(job_feature_t));
4556 memcpy(feat_dest, feat_src, sizeof(job_feature_t));
4557 if (feat_src->node_bitmap_active)
4558 feat_dest->node_bitmap_active =
4559 bit_copy(feat_src->node_bitmap_active);
4560 if (feat_src->node_bitmap_avail)
4561 feat_dest->node_bitmap_avail =
4562 bit_copy(feat_src->node_bitmap_avail);
4563 feat_dest->name = xstrdup(feat_src->name);
4564 list_append(feature_list_dest, feat_dest);
4565 }
4566 list_iterator_destroy(iter);
4567 return feature_list_dest;
4568 }
4569
4570 /*
4571 * build_feature_list - Translate a job's feature string into a feature_list
4572 * NOTE: This function is also used for reservations if job_id == 0
4573 * IN details->features
4574 * OUT details->feature_list
4575 * RET error code
4576 */
build_feature_list(job_record_t * job_ptr)4577 extern int build_feature_list(job_record_t *job_ptr)
4578 {
4579 struct job_details *detail_ptr = job_ptr->details;
4580 char *tmp_requested, *str_ptr, *feature = NULL;
4581 int bracket = 0, count = 0, i, paren = 0, rc;
4582 bool fail = false;
4583 job_feature_t *feat;
4584 bool can_reboot;
4585
4586 if (!detail_ptr || !detail_ptr->features) { /* no constraints */
4587 if (job_ptr->batch_features)
4588 return ESLURM_BATCH_CONSTRAINT;
4589 return SLURM_SUCCESS;
4590 }
4591 if (detail_ptr->feature_list) /* already processed */
4592 return SLURM_SUCCESS;
4593
4594 /* Use of commas separator is a common error. Replace them with '&' */
4595 while ((str_ptr = strstr(detail_ptr->features, ",")))
4596 str_ptr[0] = '&';
4597
4598 can_reboot = node_features_g_user_update(job_ptr->user_id);
4599 tmp_requested = xstrdup(detail_ptr->features);
4600 detail_ptr->feature_list = list_create(feature_list_delete);
4601 for (i = 0; ; i++) {
4602 if (tmp_requested[i] == '*') {
4603 tmp_requested[i] = '\0';
4604 count = strtol(&tmp_requested[i+1], &str_ptr, 10);
4605 if ((feature == NULL) || (count <= 0) || (paren != 0)) {
4606 fail = true;
4607 break;
4608 }
4609 i = str_ptr - tmp_requested - 1;
4610 } else if (tmp_requested[i] == '&') {
4611 tmp_requested[i] = '\0';
4612 if (feature == NULL) {
4613 fail = true;
4614 break;
4615 }
4616 feat = xmalloc(sizeof(job_feature_t));
4617 feat->name = xstrdup(feature);
4618 feat->changeable = node_features_g_changeable_feature(
4619 feature);
4620 feat->count = count;
4621 feat->paren = paren;
4622 if (paren)
4623 feat->op_code = FEATURE_OP_AND;
4624 else if (bracket)
4625 feat->op_code = FEATURE_OP_XAND;
4626 else
4627 feat->op_code = FEATURE_OP_AND;
4628 list_append(detail_ptr->feature_list, feat);
4629 feature = NULL;
4630 count = 0;
4631 } else if (tmp_requested[i] == '|') {
4632 tmp_requested[i] = '\0';
4633 if (feature == NULL) {
4634 fail = true;
4635 break;
4636 }
4637 if (paren && (node_features_g_count() > 0)) {
4638 /*
4639 * Most (but not all) of the logic to support
4640 * OR within parenthesis works today except when
4641 * trying to use available (not active) features
4642 * srun -C "(hemi|snc2|snc4|quad)&(flat|cache)" ...
4643 */
4644 fail = true;
4645 break;
4646 }
4647 feat = xmalloc(sizeof(job_feature_t));
4648 feat->name = xstrdup(feature);
4649 feat->changeable = node_features_g_changeable_feature(
4650 feature);
4651 feat->count = count;
4652 feat->paren = paren;
4653 if (paren)
4654 feat->op_code = FEATURE_OP_OR;
4655 else if (bracket)
4656 feat->op_code = FEATURE_OP_XOR;
4657 else
4658 feat->op_code = FEATURE_OP_OR;
4659 list_append(detail_ptr->feature_list, feat);
4660 feature = NULL;
4661 count = 0;
4662 } else if (tmp_requested[i] == '[') {
4663 tmp_requested[i] = '\0';
4664 if ((feature != NULL) || bracket) {
4665 fail = true;
4666 break;
4667 }
4668 bracket++;
4669 } else if (tmp_requested[i] == ']') {
4670 tmp_requested[i] = '\0';
4671 if ((feature == NULL) || (bracket == 0)) {
4672 if (job_ptr->job_id) {
4673 verbose("%pJ invalid constraint %s",
4674 job_ptr, detail_ptr->features);
4675 } else {
4676 verbose("Reservation invalid constraint %s",
4677 detail_ptr->features);
4678 }
4679 xfree(tmp_requested);
4680 return ESLURM_INVALID_FEATURE;
4681 }
4682 bracket--;
4683 } else if (tmp_requested[i] == '(') {
4684 tmp_requested[i] = '\0';
4685 if ((feature != NULL) || paren) {
4686 fail = true;
4687 break;
4688 }
4689 paren++;
4690 } else if (tmp_requested[i] == ')') {
4691 tmp_requested[i] = '\0';
4692 if ((feature == NULL) || (paren == 0)) {
4693 fail = true;
4694 break;
4695 }
4696 paren--;
4697 } else if (tmp_requested[i] == '\0') {
4698 if (feature) {
4699 feat = xmalloc(sizeof(job_feature_t));
4700 feat->name = xstrdup(feature);
4701 feat->changeable = node_features_g_changeable_feature(
4702 feature);
4703 feat->count = count;
4704 feat->paren = paren;
4705 feat->op_code = FEATURE_OP_END;
4706 list_append(detail_ptr->feature_list, feat);
4707 }
4708 break;
4709 } else if (feature == NULL) {
4710 feature = &tmp_requested[i];
4711 }
4712 }
4713 xfree(tmp_requested);
4714 if (fail) {
4715 if (job_ptr->job_id) {
4716 verbose("%pJ invalid constraint %s",
4717 job_ptr, detail_ptr->features);
4718 } else {
4719 verbose("Reservation invalid constraint %s",
4720 detail_ptr->features);
4721 }
4722 return ESLURM_INVALID_FEATURE;
4723 }
4724 if (bracket != 0) {
4725 if (job_ptr->job_id) {
4726 verbose("%pJ constraint has unbalanced brackets: %s",
4727 job_ptr, detail_ptr->features);
4728 } else {
4729 verbose("Reservation constraint has unbalanced brackets: %s",
4730 detail_ptr->features);
4731 }
4732 return ESLURM_INVALID_FEATURE;
4733 }
4734 if (paren != 0) {
4735 if (job_ptr->job_id) {
4736 verbose("%pJ constraint has unbalanced parenthesis: %s",
4737 job_ptr, detail_ptr->features);
4738 } else {
4739 verbose("Reservation constraint has unbalanced parenthesis: %s",
4740 detail_ptr->features);
4741 }
4742 return ESLURM_INVALID_FEATURE;
4743 }
4744
4745 if (job_ptr->batch_features) {
4746 rc = _valid_batch_features(job_ptr, can_reboot);
4747 if (rc != SLURM_SUCCESS)
4748 return rc;
4749 }
4750
4751 return _valid_feature_list(job_ptr, can_reboot);
4752 }
4753
4754 /*
4755 * Delete a record from a job's feature_list
4756 */
feature_list_delete(void * x)4757 extern void feature_list_delete(void *x)
4758 {
4759 job_feature_t *feature_ptr = (job_feature_t *)x;
4760 xfree(feature_ptr->name);
4761 FREE_NULL_BITMAP(feature_ptr->node_bitmap_active);
4762 FREE_NULL_BITMAP(feature_ptr->node_bitmap_avail);
4763 xfree(feature_ptr);
4764 }
4765
_match_job_feature(void * x,void * key)4766 static int _match_job_feature(void *x, void *key)
4767 {
4768 job_feature_t *feat = (job_feature_t *) x;
4769 char *tok = (char *) key;
4770
4771 if (!xstrcmp(feat->name, tok)) /* Found matching feature name */
4772 return 1;
4773 return 0;
4774 }
4775
/*
 * Validate a job's batch_features specification against its parsed
 * feature_list and the features present on nodes.
 *
 * Each '&'/'|' separated token must name a feature already in the job's
 * feature_list. For an AND-only expression every token must also map to
 * a valid node feature; if any '|' operator is present, it is enough
 * that at least one token does.
 *
 * RET SLURM_SUCCESS, ESLURM_BATCH_CONSTRAINT, or the error from
 *     _valid_node_feature()
 */
static int _valid_batch_features(job_record_t *job_ptr, bool can_reboot)
{
	char *features_copy, *feat_name, *save_ptr = NULL;
	int rc = SLURM_SUCCESS;
	bool or_operator, or_satisfied = false;

	if (!job_ptr->batch_features)
		return SLURM_SUCCESS;
	if (!job_ptr->details || !job_ptr->details->feature_list)
		return ESLURM_BATCH_CONSTRAINT;

	or_operator = (strchr(job_ptr->batch_features, '|') != NULL);
	features_copy = xstrdup(job_ptr->batch_features);
	for (feat_name = strtok_r(features_copy, "&|", &save_ptr);
	     feat_name;
	     feat_name = strtok_r(NULL, "&|", &save_ptr)) {
		/* Token must appear in the job's constraint list */
		if (!list_find_first(job_ptr->details->feature_list,
				     _match_job_feature, feat_name)) {
			rc = ESLURM_BATCH_CONSTRAINT;
			break;
		}
		rc = _valid_node_feature(feat_name, can_reboot);
		if (or_operator) {
			if (rc == SLURM_SUCCESS)
				or_satisfied = true;
			/* Failures of individual OR terms are tolerated */
		} else if (rc != SLURM_SUCCESS) {
			rc = ESLURM_BATCH_CONSTRAINT;
			break;
		}
	}
	xfree(features_copy);

	if (or_operator && or_satisfied)
		rc = SLURM_SUCCESS;
	return rc;
}
4814
/*
 * Validate a job's (or reservation's) parsed feature_list.
 *
 * Checks every feature name against the node feature tables (active
 * features only, or available features when can_reboot is set) while
 * simultaneously reconstructing a printable form of the constraint
 * expression — with "[]" around XOR/XAND groups, "()" for parenthesized
 * sub-expressions, "*count" suffixes, and "&"/"|" operators — which is
 * then logged (debug level on success, info level on failure).
 *
 * job_ptr IN - job (job_id != 0) or reservation (job_id == 0) record;
 *	job_ptr->details->feature_list is the list being validated
 * can_reboot IN - if true, accept features that require a node reboot
 * RET SLURM_SUCCESS or the first error from _valid_node_feature()
 */
static int _valid_feature_list(job_record_t *job_ptr, bool can_reboot)
{
	List feature_list = job_ptr->details->feature_list;
	ListIterator feat_iter;
	job_feature_t *feat_ptr;
	char *buf = NULL;	/* rebuilt expression string for logging */
	int bracket = 0, paren = 0;
	int rc = SLURM_SUCCESS;

	/* An empty list is trivially valid */
	if (feature_list == NULL) {
		if (job_ptr->job_id)
			debug2("%pJ feature list is empty", job_ptr);
		else
			debug2("Reservation feature list is empty");
		return rc;
	}

	feat_iter = list_iterator_create(feature_list);
	while ((feat_ptr = list_next(feat_iter))) {
		/* XOR/XAND operators imply a "[...]" group; open it once */
		if ((feat_ptr->op_code == FEATURE_OP_XOR) ||
		    (feat_ptr->op_code == FEATURE_OP_XAND)) {
			if (bracket == 0)
				xstrcat(buf, "[");
			bracket = 1;
		}
		/* Nesting depth increased: open parenthesis */
		if (feat_ptr->paren > paren) {
			xstrcat(buf, "(");
			paren = feat_ptr->paren;
		}
		xstrcat(buf, feat_ptr->name);
		/* Nesting depth decreased: close parenthesis */
		if (feat_ptr->paren < paren) {
			xstrcat(buf, ")");
			paren = feat_ptr->paren;
		}
		/* Record only the first validation failure */
		if (rc == SLURM_SUCCESS)
			rc = _valid_node_feature(feat_ptr->name, can_reboot);
		if (feat_ptr->count)
			xstrfmtcat(buf, "*%u", feat_ptr->count);
		/* A non-XOR/XAND operator terminates the "[...]" group */
		if (bracket &&
		    ((feat_ptr->op_code != FEATURE_OP_XOR) &&
		     (feat_ptr->op_code != FEATURE_OP_XAND))) {
			xstrcat(buf, "]");
			bracket = 0;
		}
		if ((feat_ptr->op_code == FEATURE_OP_AND) ||
		    (feat_ptr->op_code == FEATURE_OP_XAND))
			xstrcat(buf, "&");
		else if ((feat_ptr->op_code == FEATURE_OP_OR) ||
			 (feat_ptr->op_code == FEATURE_OP_XOR))
			xstrcat(buf, "|");
	}
	list_iterator_destroy(feat_iter);

	if (rc == SLURM_SUCCESS) {
		if (job_ptr->job_id)
			debug("%pJ feature list: %s", job_ptr, buf);
		else
			debug("Reservation feature list: %s", buf);
	} else {
		if (job_ptr->job_id) {
			info("%pJ has invalid feature list: %s",
			     job_ptr, buf);
		} else {
			info("Reservation has invalid feature list: %s", buf);
		}
	}
	xfree(buf);

	return rc;
}
4885
4886 /* Validate that job's feature is available on some node(s) */
_valid_node_feature(char * feature,bool can_reboot)4887 static int _valid_node_feature(char *feature, bool can_reboot)
4888 {
4889 int rc = ESLURM_INVALID_FEATURE;
4890 node_feature_t *feature_ptr;
4891 ListIterator feature_iter;
4892
4893 if (can_reboot)
4894 feature_iter = list_iterator_create(avail_feature_list);
4895 else
4896 feature_iter = list_iterator_create(active_feature_list);
4897 while ((feature_ptr = list_next(feature_iter))) {
4898 if (xstrcmp(feature_ptr->name, feature))
4899 continue;
4900 rc = SLURM_SUCCESS;
4901 break;
4902 }
4903 list_iterator_destroy(feature_iter);
4904
4905 return rc;
4906 }
4907
4908 /* If a job can run in multiple partitions, when it is started we want to
4909 * put the name of the partition used _first_ in that list. When slurmctld
4910 * restarts, that will be used to set the job's part_ptr and that will be
4911 * reported to squeue. We leave all of the partitions in the list though,
4912 * so the job can be requeued and have access to them all. */
rebuild_job_part_list(job_record_t * job_ptr)4913 extern void rebuild_job_part_list(job_record_t *job_ptr)
4914 {
4915 ListIterator part_iterator;
4916 part_record_t *part_ptr;
4917
4918 if (!job_ptr->part_ptr_list)
4919 return;
4920 if (!job_ptr->part_ptr || !job_ptr->part_ptr->name) {
4921 error("%pJ has NULL part_ptr or the partition name is NULL",
4922 job_ptr);
4923 return;
4924 }
4925
4926 xfree(job_ptr->partition);
4927 job_ptr->partition = xstrdup(job_ptr->part_ptr->name);
4928
4929 part_iterator = list_iterator_create(job_ptr->part_ptr_list);
4930 while ((part_ptr = list_next(part_iterator))) {
4931 if (part_ptr == job_ptr->part_ptr)
4932 continue;
4933 xstrcat(job_ptr->partition, ",");
4934 xstrcat(job_ptr->partition, part_ptr->name);
4935 }
4936 list_iterator_destroy(part_iterator);
4937 }
4938
/* cleanup_completing()
 *
 * Clean up the JOB_COMPLETING flag and eventually
 * requeue the job if there is a pending request
 * for it. This function assumes the caller has the
 * appropriate locks on the job_record.
 */
void cleanup_completing(job_record_t *job_ptr)
{
	time_t delay;

	trace_job(job_ptr, __func__, "");

	/* Flag completion phases that took unusually long (> 1 minute) */
	delay = last_job_update - job_ptr->end_time;
	if (delay > 60) {
		info("%s: %pJ completion process took %ld seconds",
		     __func__, job_ptr, (long) delay);
	}

	/* Release licenses and gang-scheduling state held by the job */
	license_job_return(job_ptr);
	gs_job_fini(job_ptr);

	delete_step_records(job_ptr);
	job_ptr->job_state &= (~JOB_COMPLETING);
	/* May flip the job back to pending if a requeue was requested */
	job_hold_requeue(job_ptr);

	/*
	 * Clear alloc tres fields after a requeue. job_set_alloc_tres will
	 * clear the fields when the job is pending and not completing.
	 */
	if (IS_JOB_PENDING(job_ptr))
		job_set_alloc_tres(job_ptr, false);

	/* Job could be pending if the job was requeued due to a node failure */
	if (IS_JOB_COMPLETED(job_ptr))
		fed_mgr_job_complete(job_ptr, job_ptr->exit_code,
				     job_ptr->start_time);
}
4977
4978 /*
4979 * _waitpid_timeout()
4980 *
4981 * Same as waitpid(2) but kill process group for pid after timeout secs.
4982 */
4983 int
waitpid_timeout(const char * name,pid_t pid,int * pstatus,int timeout)4984 waitpid_timeout(const char *name, pid_t pid, int *pstatus, int timeout)
4985 {
4986 int timeout_ms = 1000 * timeout; /* timeout in ms */
4987 int max_delay = 1000; /* max delay between waitpid calls */
4988 int delay = 10; /* initial delay */
4989 int rc;
4990 int options = WNOHANG;
4991
4992 if (timeout <= 0 || timeout == NO_VAL16)
4993 options = 0;
4994
4995 while ((rc = waitpid (pid, pstatus, options)) <= 0) {
4996 if (rc < 0) {
4997 if (errno == EINTR)
4998 continue;
4999 error("waidpid: %m");
5000 return -1;
5001 }
5002 else if (timeout_ms <= 0) {
5003 info("%s%stimeout after %ds: killing pgid %d",
5004 name != NULL ? name : "",
5005 name != NULL ? ": " : "",
5006 timeout, pid);
5007 killpg(pid, SIGKILL);
5008 options = 0;
5009 }
5010 else {
5011 (void) poll(NULL, 0, delay);
5012 timeout_ms -= delay;
5013 delay = MIN (timeout_ms, MIN(max_delay, delay*2));
5014 }
5015 }
5016
5017 killpg(pid, SIGKILL); /* kill children too */
5018 return pid;
5019 }
5020