1 /*****************************************************************************\
2 * fed_mgr.c - functions for federations
3 *****************************************************************************
4 * Copyright (C) 2016 SchedMD LLC.
5 * Written by Brian Christiansen <brian@schedmd.com>
6 *
7 * This file is part of Slurm, a resource management program.
8 * For details, see <https://slurm.schedmd.com/>.
9 * Please also read the included file: DISCLAIMER.
10 *
11 * Slurm is free software; you can redistribute it and/or modify it under
12 * the terms of the GNU General Public License as published by the Free
13 * Software Foundation; either version 2 of the License, or (at your option)
14 * any later version.
15 *
16 * In addition, as a special exception, the copyright holders give permission
17 * to link the code of portions of this program with the OpenSSL library under
18 * certain conditions as described in each individual source file, and
19 * distribute linked combinations including the two. You must obey the GNU
20 * General Public License in all respects for all of the code used other than
21 * OpenSSL. If you modify file(s) with this exception, you may extend this
22 * exception to your version of the file(s), but you are not obligated to do
23 * so. If you do not wish to do so, delete this exception statement from your
24 * version. If you delete this exception statement from all source files in
25 * the program, then also delete it here.
26 *
27 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
28 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
29 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
30 * details.
31 *
32 * You should have received a copy of the GNU General Public License along
33 * with Slurm; if not, write to the Free Software Foundation, Inc.,
34 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
35 \*****************************************************************************/
36
37 #include "config.h"
38
39 #include <pthread.h>
40 #include <signal.h>
41
42 #if HAVE_SYS_PRCTL_H
43 # include <sys/prctl.h>
44 #endif
45
46 #include "src/common/list.h"
47 #include "src/common/macros.h"
48 #include "src/common/parse_time.h"
49 #include "src/common/slurm_protocol_api.h"
50 #include "src/common/slurmdbd_defs.h"
51 #include "src/common/xmalloc.h"
52 #include "src/common/xstring.h"
53 #include "src/slurmctld/fed_mgr.h"
54 #include "src/slurmctld/job_scheduler.h"
55 #include "src/slurmctld/locks.h"
56 #include "src/slurmctld/proc_req.h"
57 #include "src/slurmctld/slurmctld.h"
58 #include "src/slurmctld/srun_comm.h"
59 #include "src/slurmctld/state_save.h"
60 #include "src/slurmdbd/read_config.h"
61
62 #define FED_MGR_STATE_FILE "fed_mgr_state"
63 #define FED_MGR_CLUSTER_ID_BEGIN 26
64 #define TEST_REMOTE_DEP_FREQ 30 /* seconds */
65
66 #define FED_SIBLING_BIT(x) ((uint64_t)1 << (x - 1))
67
68 slurmdb_federation_rec_t *fed_mgr_fed_rec = NULL;
69 slurmdb_cluster_rec_t *fed_mgr_cluster_rec = NULL;
70
71 static pthread_cond_t agent_cond = PTHREAD_COND_INITIALIZER;
72 static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
73 static pthread_t agent_thread_id = (pthread_t) 0;
74 static int agent_queue_size = 0;
75
76 static pthread_cond_t job_watch_cond = PTHREAD_COND_INITIALIZER;
77 static pthread_mutex_t job_watch_mutex = PTHREAD_MUTEX_INITIALIZER;
78 static bool job_watch_thread_running = false;
79 static bool stop_job_watch_thread = false;
80
81 static bool inited = false;
82 static pthread_mutex_t open_send_mutex = PTHREAD_MUTEX_INITIALIZER;
83 static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
84 static pthread_mutex_t update_mutex = PTHREAD_MUTEX_INITIALIZER;
85
86 static List fed_job_list = NULL;
87 static List fed_job_update_list = NULL;
88 static pthread_t fed_job_update_thread_id = (pthread_t) 0;
89 static pthread_mutex_t fed_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
90 static pthread_cond_t job_update_cond = PTHREAD_COND_INITIALIZER;
91 static pthread_mutex_t job_update_mutex = PTHREAD_MUTEX_INITIALIZER;
92
93 static List remote_dep_recv_list = NULL;
94 static pthread_t remote_dep_thread_id = (pthread_t) 0;
95 static pthread_cond_t remote_dep_cond = PTHREAD_COND_INITIALIZER;
96 static pthread_mutex_t remote_dep_recv_mutex = PTHREAD_MUTEX_INITIALIZER;
97
98 static List remote_dep_job_list = NULL;
99 static pthread_t dep_job_thread_id = (pthread_t) 0;
100 static pthread_mutex_t dep_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
101
102 static List origin_dep_update_list = NULL;
103 static pthread_t origin_dep_thread_id = (pthread_t) 0;
104 static pthread_cond_t origin_dep_cond = PTHREAD_COND_INITIALIZER;
105 static pthread_mutex_t origin_dep_update_mutex = PTHREAD_MUTEX_INITIALIZER;
106
107 typedef struct {
108 Buf buffer;
109 uint32_t job_id;
110 time_t last_try;
111 int last_defer;
112 uint16_t msg_type;
113 } agent_queue_t;
114
115 enum fed_job_update_type {
116 FED_JOB_NONE = 0,
117 FED_JOB_CANCEL,
118 FED_JOB_COMPLETE,
119 FED_JOB_REMOVE_ACTIVE_SIB_BIT,
120 FED_JOB_REQUEUE,
121 FED_JOB_START,
122 FED_JOB_SUBMIT_BATCH,
123 FED_JOB_SUBMIT_INT,
124 FED_JOB_SUBMIT_RESP,
125 FED_JOB_SYNC,
126 FED_JOB_UPDATE,
127 FED_JOB_UPDATE_RESPONSE,
128 FED_SEND_JOB_SYNC,
129 };
130
131 typedef struct {
132 uint32_t cluster_lock;
133 uint32_t flags;
134 uint32_t job_id;
135 job_info_msg_t *job_info_msg;
136 uint32_t job_state;
137 job_step_kill_msg_t *kill_msg;
138 bool requeue;
139 uint32_t return_code;
140 uint64_t siblings_active;
141 uint64_t siblings_viable;
142 char *siblings_str;
143 time_t start_time;
144 char *submit_cluster;
145 job_desc_msg_t *submit_desc;
146 uint16_t submit_proto_ver;
147 uint32_t type;
148 uid_t uid;
149 } fed_job_update_info_t;
150
151 typedef struct {
152 uint32_t cluster_lock;
153 uint32_t job_id;
154 uint64_t siblings_active;
155 uint64_t siblings_viable;
156 uint32_t updating_sibs[MAX_FED_CLUSTERS + 1];
157 time_t updating_time[MAX_FED_CLUSTERS + 1];
158 } fed_job_info_t;
159
160 /* Local Structs */
161 typedef struct {
162 job_info_msg_t *job_info_msg;
163 uint32_t sibling_id;
164 char *sibling_name;
165 time_t sync_time;
166 } reconcile_sib_t;
167
168 /* Local Prototypes */
169 static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id);
170 static uint64_t _get_all_sibling_bits();
171 static int _validate_cluster_features(char *spec_features,
172 uint64_t *cluster_bitmap);
173 static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap);
174 static void _leave_federation(void);
175 static int _q_send_job_sync(char *sib_name);
176 static slurmdb_federation_rec_t *_state_load(char *state_save_location);
177 static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
178 time_t sync_time);
179 static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid);
180
_job_update_type_str(enum fed_job_update_type type)181 static char *_job_update_type_str(enum fed_job_update_type type)
182 {
183 switch (type) {
184 case FED_JOB_COMPLETE:
185 return "FED_JOB_COMPLETE";
186 case FED_JOB_CANCEL:
187 return "FED_JOB_CANCEL";
188 case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
189 return "FED_JOB_REMOVE_ACTIVE_SIB_BIT";
190 case FED_JOB_REQUEUE:
191 return "FED_JOB_REQUEUE";
192 case FED_JOB_START:
193 return "FED_JOB_START";
194 case FED_JOB_SUBMIT_BATCH:
195 return "FED_JOB_SUBMIT_BATCH";
196 case FED_JOB_SUBMIT_INT:
197 return "FED_JOB_SUBMIT_INT";
198 case FED_JOB_SUBMIT_RESP:
199 return "FED_JOB_SUBMIT_RESP";
200 case FED_JOB_SYNC:
201 return "FED_JOB_SYNC";
202 case FED_JOB_UPDATE:
203 return "FED_JOB_UPDATE";
204 case FED_JOB_UPDATE_RESPONSE:
205 return "FED_JOB_UPDATE_RESPONSE";
206 case FED_SEND_JOB_SYNC:
207 return "FED_SEND_JOB_SYNC";
208 default:
209 return "?";
210 }
211 }
212
_append_job_update(fed_job_update_info_t * job_update_info)213 static void _append_job_update(fed_job_update_info_t *job_update_info)
214 {
215 list_append(fed_job_update_list, job_update_info);
216
217 slurm_mutex_lock(&job_update_mutex);
218 slurm_cond_broadcast(&job_update_cond);
219 slurm_mutex_unlock(&job_update_mutex);
220 }
221
222 /* Return true if communication failure should be logged. Only log failures
223 * every 10 minutes to avoid filling logs */
_comm_fail_log(slurmdb_cluster_rec_t * cluster)224 static bool _comm_fail_log(slurmdb_cluster_rec_t *cluster)
225 {
226 time_t now = time(NULL);
227 time_t old = now - 600; /* Log failures once every 10 mins */
228
229 if (cluster->comm_fail_time < old) {
230 cluster->comm_fail_time = now;
231 return true;
232 }
233 return false;
234 }
235
_close_controller_conn(slurmdb_cluster_rec_t * cluster)236 static int _close_controller_conn(slurmdb_cluster_rec_t *cluster)
237 {
238 int rc = SLURM_SUCCESS;
239 // slurm_persist_conn_t *persist_conn = NULL;
240
241 xassert(cluster);
242 slurm_mutex_lock(&cluster->lock);
243 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
244 info("closing sibling conn to %s", cluster->name);
245
246 /* The recv free of this is handled directly in the persist_conn code,
247 * don't free it here */
248 // slurm_persist_conn_destroy(cluster->fed.recv);
249 cluster->fed.recv = NULL;
250 slurm_persist_conn_destroy(cluster->fed.send);
251 cluster->fed.send = NULL;
252 xfree(cluster->control_host);
253 cluster->control_port = 0;
254
255 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
256 info("closed sibling conn to %s", cluster->name);
257 slurm_mutex_unlock(&cluster->lock);
258
259 return rc;
260 }
261
262 /* Get list of jobs that originated from this cluster and the remote sibling and
263 * that are viable between the two siblings.
264 * Originating here: so that the remote can determine if the tracker job is gone
265 * Originating sib: so that the remote verify jobs are where they're supposed to
266 * be. If the sibling doesn't find a job, the sibling can resubmit the job or
267 * take other actions.
268 * Viable sib: because the origin might be down and the job was started or
269 * cancelled while the origin was down.
270 *
271 * Only get jobs that were submitted prior to sync_time
272 */
_get_sync_jobid_list(uint32_t sib_id,time_t sync_time)273 static List _get_sync_jobid_list(uint32_t sib_id, time_t sync_time)
274 {
275 List jobids = NULL;
276 ListIterator job_itr;
277 job_record_t *job_ptr;
278
279 jobids = list_create(xfree_ptr);
280
281 /*
282 * Only look at jobs that:
283 * 1. originate from the remote sibling
284 * 2. originate from this cluster
285 * 3. if the sibling is in the job's viable list.
286 */
287 job_itr = list_iterator_create(job_list);
288 while ((job_ptr = list_next(job_itr))) {
289 uint32_t cluster_id = fed_mgr_get_cluster_id(job_ptr->job_id);
290 if (job_ptr->fed_details &&
291 (job_ptr->details &&
292 (job_ptr->details->submit_time < sync_time)) &&
293 ((cluster_id == sib_id) ||
294 (cluster_id == fed_mgr_cluster_rec->fed.id) ||
295 (job_ptr->fed_details->siblings_viable &
296 FED_SIBLING_BIT(sib_id)))) {
297
298 uint32_t *tmp = xmalloc(sizeof(uint32_t));
299 *tmp = job_ptr->job_id;
300 list_append(jobids, tmp);
301 }
302 }
303 list_iterator_destroy(job_itr);
304
305 return jobids;
306 }
307
_open_controller_conn(slurmdb_cluster_rec_t * cluster,bool locked)308 static int _open_controller_conn(slurmdb_cluster_rec_t *cluster, bool locked)
309 {
310 int rc;
311 slurm_persist_conn_t *persist_conn = NULL;
312 static int timeout = -1;
313
314 if (timeout < 0)
315 timeout = slurm_get_msg_timeout() * 1000;
316
317 if (cluster == fed_mgr_cluster_rec) {
318 info("%s: hey! how did we get here with ourselves?", __func__);
319 return SLURM_ERROR;
320 }
321
322 if (!locked)
323 slurm_mutex_lock(&cluster->lock);
324
325 if (!cluster->control_host || !cluster->control_host[0] ||
326 !cluster->control_port) {
327 if (cluster->fed.recv) {
328 persist_conn = cluster->fed.recv;
329 cluster->control_port = persist_conn->rem_port;
330 xfree(cluster->control_host);
331 cluster->control_host = xstrdup(persist_conn->rem_host);
332 } else {
333 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
334 info("%s: Sibling cluster %s doesn't appear to be up yet, skipping",
335 __func__, cluster->name);
336 if (!locked)
337 slurm_mutex_unlock(&cluster->lock);
338 return SLURM_ERROR;
339 }
340 }
341
342 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
343 info("opening sibling conn to %s", cluster->name);
344
345 if (!cluster->fed.send) {
346 persist_conn = xmalloc(sizeof(slurm_persist_conn_t));
347
348 cluster->fed.send = persist_conn;
349
350 /* Since this connection is coming from us, make it so ;) */
351 persist_conn->cluster_name = xstrdup(slurmctld_conf.cluster_name);
352 persist_conn->persist_type = PERSIST_TYPE_FED;
353 persist_conn->my_port = slurmctld_conf.slurmctld_port;
354 persist_conn->rem_host = xstrdup(cluster->control_host);
355 persist_conn->rem_port = cluster->control_port;
356 persist_conn->version = cluster->rpc_version;
357 persist_conn->shutdown = &slurmctld_config.shutdown_time;
358 persist_conn->timeout = timeout; /* don't put this as 0 it
359 * could cause deadlock */
360 } else {
361 persist_conn = cluster->fed.send;
362
363 /* Perhaps a backup came up, so don't assume it was the same
364 * host or port we had before.
365 */
366 xfree(persist_conn->rem_host);
367 persist_conn->rem_host = xstrdup(cluster->control_host);
368 persist_conn->rem_port = cluster->control_port;
369 }
370
371 rc = slurm_persist_conn_open(persist_conn);
372 if (rc != SLURM_SUCCESS) {
373 if (_comm_fail_log(cluster)) {
374 error("fed_mgr: Unable to open connection to cluster %s using host %s(%u)",
375 cluster->name,
376 persist_conn->rem_host, persist_conn->rem_port);
377 }
378 } else if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR) {
379 info("opened sibling conn to %s:%d",
380 cluster->name, persist_conn->fd);
381 }
382
383 if (!locked)
384 slurm_mutex_unlock(&cluster->lock);
385
386 return rc;
387 }
388
389 /* The cluster->lock should be locked before this is called */
_check_send(slurmdb_cluster_rec_t * cluster)390 static int _check_send(slurmdb_cluster_rec_t *cluster)
391 {
392 slurm_persist_conn_t *send = cluster->fed.send;
393
394 if (!send || send->fd == -1) {
395 return _open_controller_conn(cluster, true);
396 }
397
398 return SLURM_SUCCESS;
399 }
400
401 /* fed_mgr read lock needs to be set before coming in here,
402 * not the write lock */
_open_persist_sends(void)403 static void _open_persist_sends(void)
404 {
405 ListIterator itr;
406 slurmdb_cluster_rec_t *cluster = NULL;
407 slurm_persist_conn_t *send = NULL;
408
409 if (!fed_mgr_fed_rec || ! fed_mgr_fed_rec->cluster_list)
410 return;
411
412 /* This open_send_mutex will make this like a write lock since at the
413 * same time we are sending out these open requests the other slurmctlds
414 * will be replying and needing to get to the structures. If we just
415 * used the fed_mgr write lock it would cause deadlock.
416 */
417 slurm_mutex_lock(&open_send_mutex);
418 itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
419 while ((cluster = list_next(itr))) {
420 if (cluster == fed_mgr_cluster_rec)
421 continue;
422
423 send = cluster->fed.send;
424 if (!send || send->fd == -1)
425 _open_controller_conn(cluster, false);
426 }
427 list_iterator_destroy(itr);
428 slurm_mutex_unlock(&open_send_mutex);
429 }
430
_send_recv_msg(slurmdb_cluster_rec_t * cluster,slurm_msg_t * req,slurm_msg_t * resp,bool locked)431 static int _send_recv_msg(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
432 slurm_msg_t *resp, bool locked)
433 {
434 int rc;
435
436 xassert(cluster);
437 xassert(req);
438 xassert(resp);
439
440 slurm_msg_t_init(resp);
441
442 if (!locked)
443 slurm_mutex_lock(&cluster->lock);
444
445 rc = _check_send(cluster);
446 if ((rc == SLURM_SUCCESS) && cluster->fed.send) {
447 resp->conn = req->conn = cluster->fed.send;
448 rc = slurm_send_recv_msg(req->conn->fd, req, resp, 0);
449 }
450 if (!locked)
451 slurm_mutex_unlock(&cluster->lock);
452
453 return rc;
454 }
455
456 /* Free Buf record from a list */
_ctld_free_list_msg(void * x)457 static void _ctld_free_list_msg(void *x)
458 {
459 agent_queue_t *agent_queue_ptr = (agent_queue_t *) x;
460 if (agent_queue_ptr) {
461 FREE_NULL_BUFFER(agent_queue_ptr->buffer);
462 xfree(agent_queue_ptr);
463 }
464 }
465
_queue_rpc(slurmdb_cluster_rec_t * cluster,slurm_msg_t * req,uint32_t job_id,bool locked)466 static int _queue_rpc(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
467 uint32_t job_id, bool locked)
468 {
469 agent_queue_t *agent_rec;
470 Buf buf;
471
472 if (!cluster->send_rpc)
473 cluster->send_rpc = list_create(_ctld_free_list_msg);
474
475 buf = init_buf(1024);
476 pack16(req->msg_type, buf);
477 if (pack_msg(req, buf) != SLURM_SUCCESS) {
478 error("%s: failed to pack msg_type:%u",
479 __func__, req->msg_type);
480 FREE_NULL_BUFFER(buf);
481 return SLURM_ERROR;
482 }
483
484 /* Queue the RPC and notify the agent of new work */
485 agent_rec = xmalloc(sizeof(agent_queue_t));
486 agent_rec->buffer = buf;
487 agent_rec->job_id = job_id;
488 agent_rec->msg_type = req->msg_type;
489 list_append(cluster->send_rpc, agent_rec);
490 slurm_mutex_lock(&agent_mutex);
491 agent_queue_size++;
492 slurm_cond_broadcast(&agent_cond);
493 slurm_mutex_unlock(&agent_mutex);
494
495 return SLURM_SUCCESS;
496 }
497
498 /*
499 * close all sibling conns
500 * must lock before entering.
501 */
_close_sibling_conns(void)502 static int _close_sibling_conns(void)
503 {
504 ListIterator itr;
505 slurmdb_cluster_rec_t *cluster;
506
507 if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
508 goto fini;
509
510 itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
511 while ((cluster = list_next(itr))) {
512 if (cluster == fed_mgr_cluster_rec)
513 continue;
514 _close_controller_conn(cluster);
515 }
516 list_iterator_destroy(itr);
517
518 fini:
519 return SLURM_SUCCESS;
520 }
521
_mark_self_as_drained(void)522 static void _mark_self_as_drained(void)
523 {
524 List ret_list;
525 slurmdb_cluster_cond_t cluster_cond;
526 slurmdb_cluster_rec_t cluster_rec;
527
528 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
529 info("%s: setting cluster fedstate to DRAINED", __func__);
530
531 slurmdb_init_cluster_cond(&cluster_cond, false);
532 slurmdb_init_cluster_rec(&cluster_rec, false);
533
534 cluster_cond.cluster_list = list_create(NULL);
535 list_append(cluster_cond.cluster_list, fed_mgr_cluster_rec->name);
536
537 cluster_rec.fed.state = CLUSTER_FED_STATE_INACTIVE |
538 (fed_mgr_cluster_rec->fed.state & ~CLUSTER_FED_STATE_BASE);
539
540 ret_list = acct_storage_g_modify_clusters(acct_db_conn,
541 slurmctld_conf.slurm_user_id,
542 &cluster_cond, &cluster_rec);
543 if (!ret_list || !list_count(ret_list)) {
544 error("Failed to set cluster state to drained");
545 }
546
547 FREE_NULL_LIST(cluster_cond.cluster_list);
548 FREE_NULL_LIST(ret_list);
549 }
550
_remove_self_from_federation(void)551 static void _remove_self_from_federation(void)
552 {
553 List ret_list;
554 slurmdb_federation_cond_t fed_cond;
555 slurmdb_federation_rec_t fed_rec;
556 slurmdb_cluster_rec_t cluster_rec;
557
558 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
559 info("%s: removing self from federation %s",
560 __func__, fed_mgr_fed_rec->name);
561
562 slurmdb_init_federation_cond(&fed_cond, false);
563 slurmdb_init_federation_rec(&fed_rec, false);
564 slurmdb_init_cluster_rec(&cluster_rec, false);
565
566 fed_cond.federation_list = list_create(NULL);
567 list_append(fed_cond.federation_list, fed_mgr_fed_rec->name);
568
569 cluster_rec.name = xstrdup_printf("-%s", fed_mgr_cluster_rec->name);
570 fed_rec.cluster_list = list_create(NULL);
571 list_append(fed_rec.cluster_list, &cluster_rec);
572
573 ret_list = acct_storage_g_modify_federations(
574 acct_db_conn,
575 slurmctld_conf.slurm_user_id, &fed_cond,
576 &fed_rec);
577 if (!ret_list || !list_count(ret_list)) {
578 error("Failed to remove federation from list");
579 }
580
581 FREE_NULL_LIST(ret_list);
582 FREE_NULL_LIST(fed_cond.federation_list);
583 FREE_NULL_LIST(fed_rec.cluster_list);
584 xfree(cluster_rec.name);
585
586 slurmctld_config.scheduling_disabled = false;
587 slurmctld_config.submissions_disabled = false;
588
589 _leave_federation();
590 }
591
_foreach_job_completed(void * object,void * arg)592 static int _foreach_job_completed(void *object, void *arg)
593 {
594 job_record_t *job_ptr = (job_record_t *) object;
595
596 if (IS_JOB_COMPLETED(job_ptr))
597 return SLURM_SUCCESS;
598
599 return SLURM_ERROR;
600 }
601
_foreach_job_no_requeue(void * object,void * arg)602 static int _foreach_job_no_requeue(void *object, void *arg)
603 {
604 job_record_t *job_ptr = (job_record_t *) object;
605
606 if (job_ptr->details)
607 job_ptr->details->requeue = 0;
608
609 return SLURM_SUCCESS;
610 }
611
_job_watch_thread(void * arg)612 static void *_job_watch_thread(void *arg)
613 {
614 struct timespec ts = {0, 0};
615 slurmctld_lock_t job_write_fed_write_lock = {
616 NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
617
618 #if HAVE_SYS_PRCTL_H
619 if (prctl(PR_SET_NAME, "fed_jobw", NULL, NULL, NULL) < 0) {
620 error("%s: cannot set my name to %s %m", __func__, "fed_jobw");
621 }
622 #endif
623 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
624 info("%s: started job_watch thread", __func__);
625
626 while (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
627 int tot_jobs, comp_jobs;
628
629 slurm_mutex_lock(&job_watch_mutex);
630 if (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
631 ts.tv_sec = time(NULL) + 5;
632 slurm_cond_timedwait(&job_watch_cond,
633 &job_watch_mutex, &ts);
634 }
635 slurm_mutex_unlock(&job_watch_mutex);
636
637 if (slurmctld_config.shutdown_time || stop_job_watch_thread)
638 break;
639
640 lock_slurmctld(job_write_fed_write_lock);
641
642 if (!fed_mgr_cluster_rec) {
643 /* not part of the federation anymore */
644 unlock_slurmctld(job_write_fed_write_lock);
645 break;
646 }
647
648 if ((tot_jobs = list_count(job_list)) !=
649 (comp_jobs = list_for_each(job_list, _foreach_job_completed,
650 NULL))) {
651 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR) {
652 /* list_for_each negates the count if failed. */
653 int remaining_jobs = tot_jobs + comp_jobs + 1;
654 info("%s: at least %d remaining jobs before being drained and/or removed from the federation",
655 __func__, remaining_jobs);
656 }
657 } else {
658 if (fed_mgr_cluster_rec->fed.state &
659 CLUSTER_FED_STATE_REMOVE) {
660 /* prevent federated jobs from being requeued */
661 list_for_each(job_list, _foreach_job_no_requeue,
662 NULL);
663 _remove_self_from_federation();
664 } else if (fed_mgr_cluster_rec->fed.state &
665 CLUSTER_FED_STATE_DRAIN) {
666 _mark_self_as_drained();
667 }
668
669 unlock_slurmctld(job_write_fed_write_lock);
670
671 break;
672 }
673
674 unlock_slurmctld(job_write_fed_write_lock);
675 }
676
677 job_watch_thread_running = false;
678
679 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
680 info("%s: exiting job watch thread", __func__);
681
682 return NULL;
683 }
684
_spawn_job_watch_thread()685 static void _spawn_job_watch_thread()
686 {
687 if (!job_watch_thread_running) {
688 /* Detach the thread since it will exit once the cluster is
689 * drained or removed. */
690 slurm_mutex_lock(&job_watch_mutex);
691 stop_job_watch_thread = false;
692 job_watch_thread_running = true;
693 slurm_thread_create_detached(NULL, _job_watch_thread, NULL);
694 slurm_mutex_unlock(&job_watch_mutex);
695 } else {
696 info("a job_watch_thread already exists");
697 }
698 }
699
_remove_job_watch_thread()700 static void _remove_job_watch_thread()
701 {
702 if (job_watch_thread_running) {
703 slurm_mutex_lock(&job_watch_mutex);
704 stop_job_watch_thread = true;
705 slurm_cond_broadcast(&job_watch_cond);
706 slurm_mutex_unlock(&job_watch_mutex);
707 }
708 }
709
_clear_recv_conns(void * object,void * arg)710 static int _clear_recv_conns(void *object, void *arg)
711 {
712 slurmdb_cluster_rec_t *cluster = (slurmdb_cluster_rec_t *)object;
713 cluster->fed.recv = NULL;
714
715 return SLURM_SUCCESS;
716 }
717
718 /*
719 * Must have FED unlocked prior to entering
720 */
_fed_mgr_ptr_init(slurmdb_federation_rec_t * db_fed,slurmdb_cluster_rec_t * cluster,uint64_t * added_clusters)721 static void _fed_mgr_ptr_init(slurmdb_federation_rec_t *db_fed,
722 slurmdb_cluster_rec_t *cluster,
723 uint64_t *added_clusters)
724 {
725 ListIterator c_itr;
726 slurmdb_cluster_rec_t *tmp_cluster, *db_cluster;
727 uint32_t cluster_state;
728 int base_state;
729 bool drain_flag;
730
731 slurmctld_lock_t fed_write_lock = {
732 NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
733
734 xassert(cluster);
735
736 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
737 info("Joining federation %s", db_fed->name);
738
739 lock_slurmctld(fed_write_lock);
740 if (fed_mgr_fed_rec) {
741 /* we are already part of a federation, preserve existing
742 * conenctions */
743 c_itr = list_iterator_create(db_fed->cluster_list);
744 while ((db_cluster = list_next(c_itr))) {
745 if (!xstrcmp(db_cluster->name,
746 slurmctld_conf.cluster_name)) {
747 fed_mgr_cluster_rec = db_cluster;
748 continue;
749 }
750 if (!(tmp_cluster =
751 fed_mgr_get_cluster_by_name(db_cluster->name))) {
752 *added_clusters |=
753 FED_SIBLING_BIT(db_cluster->fed.id);
754 /* don't worry about destroying the connection
755 * here. It will happen below when we free
756 * fed_mgr_fed_rec (automagically).
757 */
758 continue;
759 }
760 slurm_mutex_lock(&tmp_cluster->lock);
761 /* transfer over the connections we already have */
762 db_cluster->fed.send = tmp_cluster->fed.send;
763 tmp_cluster->fed.send = NULL;
764 db_cluster->fed.recv = tmp_cluster->fed.recv;
765 tmp_cluster->fed.recv = NULL;
766 db_cluster->send_rpc = tmp_cluster->send_rpc;
767 tmp_cluster->send_rpc = NULL;
768 db_cluster->fed.sync_sent =
769 tmp_cluster->fed.sync_sent;
770 db_cluster->fed.sync_recvd =
771 tmp_cluster->fed.sync_recvd;
772 slurm_mutex_unlock(&tmp_cluster->lock);
773
774 list_delete_all(fed_mgr_fed_rec->cluster_list,
775 slurmdb_find_cluster_in_list,
776 db_cluster->name);
777 }
778 list_iterator_destroy(c_itr);
779
780 /* Remove any existing clusters that were part of the federation
781 * before and are not now. Don't free the recv connection now,
782 * it will get destroyed when the recv thread exits. */
783 list_for_each(fed_mgr_fed_rec->cluster_list, _clear_recv_conns,
784 NULL);
785 slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
786 } else
787 fed_mgr_cluster_rec = cluster;
788
789 fed_mgr_fed_rec = db_fed;
790
791 /* Set scheduling and submissions states */
792 cluster_state = fed_mgr_cluster_rec->fed.state;
793 base_state = (cluster_state & CLUSTER_FED_STATE_BASE);
794 drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN);
795
796 unlock_slurmctld(fed_write_lock);
797
798 if (drain_flag) {
799 slurmctld_config.scheduling_disabled = false;
800 slurmctld_config.submissions_disabled = true;
801
802 /* INACTIVE + DRAIN == DRAINED (already) */
803 if (base_state == CLUSTER_FED_STATE_ACTIVE)
804 _spawn_job_watch_thread();
805
806 } else if (base_state == CLUSTER_FED_STATE_ACTIVE) {
807 slurmctld_config.scheduling_disabled = false;
808 slurmctld_config.submissions_disabled = false;
809 } else if (base_state == CLUSTER_FED_STATE_INACTIVE) {
810 slurmctld_config.scheduling_disabled = true;
811 slurmctld_config.submissions_disabled = true;
812 }
813 if (!drain_flag)
814 _remove_job_watch_thread();
815 }
816
817 /*
818 * Must have FED write lock prior to entering
819 */
_leave_federation(void)820 static void _leave_federation(void)
821 {
822 if (!fed_mgr_fed_rec)
823 return;
824
825 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
826 info("Leaving federation %s", fed_mgr_fed_rec->name);
827
828 _close_sibling_conns();
829 _remove_job_watch_thread();
830 slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
831 fed_mgr_fed_rec = NULL;
832 fed_mgr_cluster_rec = NULL;
833 }
834
_persist_callback_fini(void * arg)835 static void _persist_callback_fini(void *arg)
836 {
837 slurm_persist_conn_t *persist_conn = arg;
838 slurmdb_cluster_rec_t *cluster;
839 slurmctld_lock_t fed_write_lock = {
840 NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
841
842 /* If we are shutting down just return or you will get deadlock since
843 * all these locks are already locked.
844 */
845 if (!persist_conn || *persist_conn->shutdown)
846 return;
847 lock_slurmctld(fed_write_lock);
848
849 /* shuting down */
850 if (!fed_mgr_fed_rec) {
851 unlock_slurmctld(fed_write_lock);
852 return;
853 }
854
855 if (!(cluster =
856 fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
857 info("Couldn't find cluster %s?",
858 persist_conn->cluster_name);
859 unlock_slurmctld(fed_write_lock);
860 return;
861 }
862
863 slurm_mutex_lock(&cluster->lock);
864
865 /* This will get handled at the end of the thread, don't free it here */
866 cluster->fed.recv = NULL;
867 // persist_conn = cluster->fed.recv;
868 // slurm_persist_conn_close(persist_conn);
869
870 persist_conn = cluster->fed.send;
871 if (persist_conn) {
872 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
873 info("Closing send to sibling cluster %s",
874 cluster->name);
875 slurm_persist_conn_destroy(persist_conn);
876 cluster->fed.send = NULL;
877 xfree(cluster->control_host);
878 cluster->control_port = 0;
879 }
880 cluster->fed.sync_recvd = false;
881 cluster->fed.sync_sent = false;
882
883 slurm_mutex_unlock(&cluster->lock);
884 unlock_slurmctld(fed_write_lock);
885 }
886
_join_federation(slurmdb_federation_rec_t * fed,slurmdb_cluster_rec_t * cluster,uint64_t * added_clusters)887 static void _join_federation(slurmdb_federation_rec_t *fed,
888 slurmdb_cluster_rec_t *cluster,
889 uint64_t *added_clusters)
890 {
891 slurmctld_lock_t fed_read_lock = {
892 NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
893
894 _fed_mgr_ptr_init(fed, cluster, added_clusters);
895
896 /* We must open the connections after we get out of the
897 * write_lock or we will end up in deadlock.
898 */
899 lock_slurmctld(fed_read_lock);
900 _open_persist_sends();
901 unlock_slurmctld(fed_read_lock);
902 }
903
_persist_update_job(slurmdb_cluster_rec_t * conn,uint32_t job_id,job_desc_msg_t * data,uid_t uid)904 static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id,
905 job_desc_msg_t *data, uid_t uid)
906 {
907 int rc;
908 slurm_msg_t req_msg, tmp_msg;
909 sib_msg_t sib_msg;
910 Buf buffer;
911
912 slurm_msg_t_init(&tmp_msg);
913 tmp_msg.msg_type = REQUEST_UPDATE_JOB;
914 tmp_msg.data = data;
915 tmp_msg.protocol_version = conn->rpc_version;
916
917 buffer = init_buf(BUF_SIZE);
918 pack_msg(&tmp_msg, buffer);
919
920 memset(&sib_msg, 0, sizeof(sib_msg));
921 sib_msg.sib_msg_type = FED_JOB_UPDATE;
922 sib_msg.data_buffer = buffer;
923 sib_msg.data_type = tmp_msg.msg_type;
924 sib_msg.data_version = tmp_msg.protocol_version;
925 sib_msg.req_uid = uid;
926 sib_msg.job_id = job_id;
927
928 slurm_msg_t_init(&req_msg);
929 req_msg.msg_type = REQUEST_SIB_MSG;
930 req_msg.protocol_version = tmp_msg.protocol_version;
931 req_msg.data = &sib_msg;
932
933 rc = _queue_rpc(conn, &req_msg, 0, false);
934
935 free_buf(buffer);
936
937 return rc;
938 }
939
_persist_update_job_resp(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t return_code)940 static int _persist_update_job_resp(slurmdb_cluster_rec_t *conn,
941 uint32_t job_id, uint32_t return_code)
942 {
943 int rc;
944 slurm_msg_t req_msg;
945 sib_msg_t sib_msg;
946
947 slurm_msg_t_init(&req_msg);
948
949 memset(&sib_msg, 0, sizeof(sib_msg));
950 sib_msg.sib_msg_type = FED_JOB_UPDATE_RESPONSE;
951 sib_msg.job_id = job_id;
952 sib_msg.return_code = return_code;
953
954 slurm_msg_t_init(&req_msg);
955 req_msg.msg_type = REQUEST_SIB_MSG;
956 req_msg.protocol_version = conn->rpc_version;
957 req_msg.data = &sib_msg;
958
959 rc = _queue_rpc(conn, &req_msg, job_id, false);
960
961 return rc;
962 }
963
964 /*
965 * Remove a sibling job that won't be scheduled
966 *
967 * IN conn - sibling connection
968 * IN job_id - the job's id
969 * IN job_state - state of job.
970 * IN return_code - return code of job.
971 * IN start_time - time the fed job started
972 * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
973 */
_persist_fed_job_revoke(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t job_state,uint32_t return_code,time_t start_time)974 static int _persist_fed_job_revoke(slurmdb_cluster_rec_t *conn, uint32_t job_id,
975 uint32_t job_state, uint32_t return_code,
976 time_t start_time)
977 {
978 int rc;
979 slurm_msg_t req_msg;
980 sib_msg_t sib_msg;
981
982 if (!conn->fed.send ||
983 (((slurm_persist_conn_t *)conn->fed.send)->fd == -1))
984 return SLURM_SUCCESS;
985
986 slurm_msg_t_init(&req_msg);
987
988 memset(&sib_msg, 0, sizeof(sib_msg));
989 sib_msg.sib_msg_type = FED_JOB_COMPLETE;
990 sib_msg.job_id = job_id;
991 sib_msg.job_state = job_state;
992 sib_msg.start_time = start_time;
993 sib_msg.return_code = return_code;
994
995 slurm_msg_t_init(&req_msg);
996 req_msg.msg_type = REQUEST_SIB_MSG;
997 req_msg.protocol_version = conn->rpc_version;
998 req_msg.data = &sib_msg;
999
1000 rc = _queue_rpc(conn, &req_msg, job_id, false);
1001
1002 return rc;
1003 }
1004
_persist_fed_job_response(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t return_code)1005 static int _persist_fed_job_response(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1006 uint32_t return_code)
1007 {
1008 int rc;
1009 slurm_msg_t req_msg;
1010 sib_msg_t sib_msg;
1011
1012 slurm_msg_t_init(&req_msg);
1013
1014 memset(&sib_msg, 0, sizeof(sib_msg));
1015 sib_msg.sib_msg_type = FED_JOB_SUBMIT_RESP;
1016 sib_msg.job_id = job_id;
1017 sib_msg.return_code = return_code;
1018
1019 slurm_msg_t_init(&req_msg);
1020 req_msg.msg_type = REQUEST_SIB_MSG;
1021 req_msg.protocol_version = conn->rpc_version;
1022 req_msg.data = &sib_msg;
1023
1024 rc = _queue_rpc(conn, &req_msg, job_id, false);
1025
1026 return rc;
1027 }
1028
1029 /*
1030 * Grab the fed lock on the sibling job.
1031 *
1032 * This message doesn't need to be queued because the other side just locks the
1033 * fed_job_list, checks and gets out -- doesn't need the internal locks.
1034 *
1035 * IN conn - sibling connection
1036 * IN job_id - the job's id
1037 * IN cluster_id - cluster id of the cluster locking
1038 * IN do_lock - true == lock, false == unlock
1039 * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1040 */
_persist_fed_job_lock_bool(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id,bool do_lock)1041 static int _persist_fed_job_lock_bool(slurmdb_cluster_rec_t *conn,
1042 uint32_t job_id, uint32_t cluster_id,
1043 bool do_lock)
1044 {
1045 int rc;
1046 slurm_msg_t req_msg, resp_msg;
1047
1048 slurm_msg_t_init(&req_msg);
1049
1050 sib_msg_t sib_msg;
1051 memset(&sib_msg, 0, sizeof(sib_msg_t));
1052 sib_msg.job_id = job_id;
1053 sib_msg.cluster_id = cluster_id;
1054
1055 if (do_lock)
1056 req_msg.msg_type = REQUEST_SIB_JOB_LOCK;
1057 else
1058 req_msg.msg_type = REQUEST_SIB_JOB_UNLOCK;
1059
1060 req_msg.protocol_version = conn->rpc_version;
1061 req_msg.data = &sib_msg;
1062
1063 if (_send_recv_msg(conn, &req_msg, &resp_msg, false)) {
1064 rc = SLURM_ERROR;
1065 goto end_it;
1066 }
1067
1068 switch (resp_msg.msg_type) {
1069 case RESPONSE_SLURM_RC:
1070 if ((rc = slurm_get_return_code(resp_msg.msg_type,
1071 resp_msg.data))) {
1072 slurm_seterrno(rc);
1073 rc = SLURM_ERROR;
1074 }
1075 break;
1076 default:
1077 slurm_seterrno(SLURM_UNEXPECTED_MSG_ERROR);
1078 rc = SLURM_ERROR;
1079 break;
1080 }
1081
1082 end_it:
1083 slurm_free_msg_members(&resp_msg);
1084
1085 return rc;
1086 }
1087
_persist_fed_job_lock(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id)1088 static int _persist_fed_job_lock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1089 uint32_t cluster_id)
1090 {
1091 return _persist_fed_job_lock_bool(conn, job_id, cluster_id, true);
1092 }
1093
_persist_fed_job_unlock(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id)1094 static int _persist_fed_job_unlock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1095 uint32_t cluster_id)
1096 {
1097 return _persist_fed_job_lock_bool(conn, job_id, cluster_id, false);
1098 }
1099
1100 /*
1101 * Tell the origin cluster that the job was started
1102 *
1103 * This message is queued up as an rpc and a fed_job_update so that it can
1104 * cancel the siblings without holding up the internal locks.
1105 *
1106 * IN conn - sibling connection
1107 * IN job_id - the job's id
1108 * IN cluster_id - cluster id of the cluster that started the job
1109 * IN start_time - time the fed job started
1110 * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1111 */
_persist_fed_job_start(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id,time_t start_time)1112 static int _persist_fed_job_start(slurmdb_cluster_rec_t *conn,
1113 uint32_t job_id, uint32_t cluster_id,
1114 time_t start_time)
1115 {
1116 int rc;
1117 slurm_msg_t req_msg;
1118
1119 slurm_msg_t_init(&req_msg);
1120
1121 sib_msg_t sib_msg;
1122 memset(&sib_msg, 0, sizeof(sib_msg_t));
1123 sib_msg.sib_msg_type = FED_JOB_START;
1124 sib_msg.job_id = job_id;
1125 sib_msg.cluster_id = cluster_id;
1126 sib_msg.start_time = start_time;
1127
1128 req_msg.msg_type = REQUEST_SIB_MSG;
1129 req_msg.protocol_version = conn->rpc_version;
1130 req_msg.data = &sib_msg;
1131
1132 rc = _queue_rpc(conn, &req_msg, job_id, false);
1133
1134 return rc;
1135 }
1136
1137 /*
1138 * Send the specified signal to all steps of an existing job
1139 * IN job_id - the job's id
1140 * IN signal - signal number
1141 * IN flags - see KILL_JOB_* flags above
1142 * IN uid - uid of user making the request.
1143 * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
1144 */
_persist_fed_job_cancel(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint16_t signal,uint16_t flags,uid_t uid)1145 static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1146 uint16_t signal, uint16_t flags,
1147 uid_t uid)
1148 {
1149 int rc = SLURM_SUCCESS;
1150 slurm_msg_t req_msg, tmp_msg;
1151 sib_msg_t sib_msg;
1152 job_step_kill_msg_t kill_req;
1153 Buf buffer;
1154
1155 /* Build and pack a kill_req msg to put in a sib_msg */
1156 memset(&kill_req, 0, sizeof(job_step_kill_msg_t));
1157 kill_req.job_id = job_id;
1158 kill_req.sjob_id = NULL;
1159 kill_req.job_step_id = SLURM_BATCH_SCRIPT;
1160 kill_req.signal = signal;
1161 kill_req.flags = flags;
1162
1163 slurm_msg_t_init(&tmp_msg);
1164 tmp_msg.msg_type = REQUEST_CANCEL_JOB_STEP;
1165 tmp_msg.data = &kill_req;
1166 tmp_msg.protocol_version = conn->rpc_version;
1167
1168 buffer = init_buf(BUF_SIZE);
1169 pack_msg(&tmp_msg, buffer);
1170
1171 memset(&sib_msg, 0, sizeof(sib_msg));
1172 sib_msg.sib_msg_type = FED_JOB_CANCEL;
1173 sib_msg.data_buffer = buffer;
1174 sib_msg.data_type = tmp_msg.msg_type;
1175 sib_msg.data_version = tmp_msg.protocol_version;
1176 sib_msg.req_uid = uid;
1177
1178 slurm_msg_t_init(&req_msg);
1179 req_msg.msg_type = REQUEST_SIB_MSG;
1180 req_msg.protocol_version = tmp_msg.protocol_version;
1181 req_msg.data = &sib_msg;
1182
1183 rc = _queue_rpc(conn, &req_msg, job_id, false);
1184
1185 free_buf(buffer);
1186
1187 return rc;
1188 }
1189
1190 /*
1191 * Tell the origin cluster to requeue the job
1192 *
1193 * IN conn - sibling connection
1194 * IN job_id - the job's id
1195 * IN start_time - time the fed job started
1196 * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1197 */
_persist_fed_job_requeue(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t flags)1198 static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn,
1199 uint32_t job_id, uint32_t flags)
1200 {
1201 int rc;
1202 requeue_msg_t requeue_req;
1203 slurm_msg_t req_msg, tmp_msg;
1204 sib_msg_t sib_msg;
1205 Buf buffer;
1206
1207 xassert(conn);
1208
1209 requeue_req.job_id = job_id;
1210 requeue_req.job_id_str = NULL;
1211 requeue_req.flags = flags;
1212
1213 slurm_msg_t_init(&tmp_msg);
1214 tmp_msg.msg_type = REQUEST_JOB_REQUEUE;
1215 tmp_msg.data = &requeue_req;
1216 tmp_msg.protocol_version = conn->rpc_version;
1217
1218 buffer = init_buf(BUF_SIZE);
1219 pack_msg(&tmp_msg, buffer);
1220
1221 memset(&sib_msg, 0, sizeof(sib_msg));
1222 sib_msg.sib_msg_type = FED_JOB_REQUEUE;
1223 sib_msg.job_id = job_id;
1224 sib_msg.data_buffer = buffer;
1225 sib_msg.data_type = tmp_msg.msg_type;
1226 sib_msg.data_version = tmp_msg.protocol_version;
1227
1228 slurm_msg_t_init(&req_msg);
1229 req_msg.msg_type = REQUEST_SIB_MSG;
1230 req_msg.protocol_version = tmp_msg.protocol_version;
1231 req_msg.data = &sib_msg;
1232
1233 rc = _queue_rpc(conn, &req_msg, job_id, false);
1234
1235 free_buf(buffer);
1236
1237 return rc;
1238 }
1239
_find_sibling_by_id(void * x,void * key)1240 static int _find_sibling_by_id(void *x, void *key)
1241 {
1242 slurmdb_cluster_rec_t *object = (slurmdb_cluster_rec_t *)x;
1243 uint32_t id = *(uint32_t *)key;
1244
1245 if (object->fed.id == id)
1246 return 1;
1247
1248 return 0;
1249 }
1250
add_fed_job_info(job_record_t * job_ptr)1251 extern void add_fed_job_info(job_record_t *job_ptr)
1252 {
1253 fed_job_info_t *job_info;
1254
1255 job_info = xmalloc(sizeof(fed_job_info_t));
1256 job_info->job_id = job_ptr->job_id;
1257 job_info->siblings_active = job_ptr->fed_details->siblings_active;
1258 job_info->siblings_viable = job_ptr->fed_details->siblings_viable;
1259
1260 slurm_mutex_lock(&fed_job_list_mutex);
1261 if (fed_job_list)
1262 list_append(fed_job_list, job_info);
1263 else
1264 xfree(job_info);
1265 slurm_mutex_unlock(&fed_job_list_mutex);
1266 }
1267
_delete_fed_job_info_by_id(void * object,void * arg)1268 static int _delete_fed_job_info_by_id(void *object, void *arg)
1269 {
1270 fed_job_info_t *job_info = (fed_job_info_t *)object;
1271 uint32_t job_id = *(uint32_t *)arg;
1272
1273 if (job_info->job_id == job_id)
1274 return true;
1275
1276 return false;
1277 }
1278
fed_mgr_remove_fed_job_info(uint32_t job_id)1279 extern void fed_mgr_remove_fed_job_info(uint32_t job_id)
1280 {
1281 slurm_mutex_lock(&fed_job_list_mutex);
1282
1283 if (fed_job_list)
1284 list_delete_all(fed_job_list, _delete_fed_job_info_by_id,
1285 &job_id);
1286
1287 slurm_mutex_unlock(&fed_job_list_mutex);
1288 }
1289
_list_find_fed_job_info_by_jobid(void * x,void * key)1290 static int _list_find_fed_job_info_by_jobid(void *x, void *key)
1291 {
1292 fed_job_info_t *job_info = (fed_job_info_t *)x;
1293 uint32_t job_id = *(uint32_t *)key;
1294
1295 if (job_info->job_id == job_id)
1296 return 1;
1297
1298 return 0;
1299 }
1300
1301 /* Must have fed_job_mutex before entering */
_find_fed_job_info(uint32_t job_id)1302 static fed_job_info_t *_find_fed_job_info(uint32_t job_id)
1303 {
1304 if (!fed_job_list)
1305 return NULL;
1306 return list_find_first(fed_job_list, _list_find_fed_job_info_by_jobid,
1307 &job_id);
1308 }
1309
_destroy_fed_job_update_info(void * object)1310 static void _destroy_fed_job_update_info(void *object)
1311 {
1312 fed_job_update_info_t *job_update_info =
1313 (fed_job_update_info_t *)object;
1314
1315 if (job_update_info) {
1316 xfree(job_update_info->siblings_str);
1317 xfree(job_update_info->submit_cluster);
1318 slurm_free_job_info_msg(job_update_info->job_info_msg);
1319 slurm_free_job_step_kill_msg(job_update_info->kill_msg);
1320 slurm_free_job_desc_msg(job_update_info->submit_desc);
1321 xfree(job_update_info);
1322 }
1323 }
1324
_destroy_dep_msg(void * object)1325 static void _destroy_dep_msg(void *object)
1326 {
1327 slurm_free_dep_msg((dep_msg_t *)object);
1328 }
1329
_destroy_dep_update_msg(void * object)1330 static void _destroy_dep_update_msg(void *object)
1331 {
1332 slurm_free_dep_update_origin_msg((dep_update_origin_msg_t *)object);
1333 }
1334
_destroy_dep_job(void * object)1335 static void _destroy_dep_job(void *object)
1336 {
1337 job_record_t *job_ptr = (job_record_t *)object;
1338
1339 if (job_ptr) {
1340 xassert(job_ptr->magic == JOB_MAGIC);
1341 xfree(job_ptr->fed_details);
1342 xfree(job_ptr->name);
1343 if (job_ptr->details) {
1344 xassert(job_ptr->details->magic == DETAILS_MAGIC);
1345 xfree(job_ptr->details->dependency);
1346 FREE_NULL_LIST(job_ptr->details->depend_list);
1347 xfree(job_ptr->details);
1348 }
1349 free_null_array_recs(job_ptr);
1350 job_ptr->magic = 0;
1351 job_ptr->job_id = 0;
1352 job_ptr->user_id = 0;
1353 xfree(job_ptr);
1354 }
1355 }
1356
fed_mgr_get_cluster_by_id(uint32_t id)1357 extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_id(uint32_t id)
1358 {
1359 uint32_t key = id;
1360 return list_find_first(fed_mgr_fed_rec->cluster_list,
1361 _find_sibling_by_id, &key);
1362 }
1363
fed_mgr_get_cluster_by_name(char * sib_name)1364 extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_name(char *sib_name)
1365 {
1366 if (!fed_mgr_fed_rec)
1367 return NULL;
1368
1369 return list_find_first(fed_mgr_fed_rec->cluster_list,
1370 slurmdb_find_cluster_in_list, sib_name);
1371 }
1372
1373 /*
1374 * Revoke all sibling jobs except from cluster_id -- which the request came from
1375 *
1376 * IN job_ptr - job to revoke
1377 * IN cluster_id - cluster id of cluster that initiated call. Don're revoke
1378 * the job on this cluster -- it's the one that started the fed job.
1379 * IN revoke_sibs - bitmap of siblings to revoke.
1380 * IN start_time - time the fed job started
1381 */
_revoke_sibling_jobs(uint32_t job_id,uint32_t cluster_id,uint64_t revoke_sibs,time_t start_time)1382 static void _revoke_sibling_jobs(uint32_t job_id, uint32_t cluster_id,
1383 uint64_t revoke_sibs, time_t start_time)
1384 {
1385 int id = 1;
1386
1387 if (!fed_mgr_fed_rec) /* Not part of federation anymore */
1388 return;
1389
1390 while (revoke_sibs) {
1391 if ((revoke_sibs & 1) &&
1392 (id != fed_mgr_cluster_rec->fed.id) &&
1393 (id != cluster_id)) {
1394 slurmdb_cluster_rec_t *cluster =
1395 fed_mgr_get_cluster_by_id(id);
1396 if (!cluster) {
1397 error("couldn't find cluster rec by id %d", id);
1398 goto next_job;
1399 }
1400
1401 _persist_fed_job_revoke(cluster, job_id, JOB_CANCELLED,
1402 0, start_time);
1403 }
1404
1405 next_job:
1406 revoke_sibs >>= 1;
1407 id++;
1408 }
1409 }
1410
_remove_sibling_bit(job_record_t * job_ptr,slurmdb_cluster_rec_t * sibling)1411 static int _remove_sibling_bit(job_record_t *job_ptr,
1412 slurmdb_cluster_rec_t *sibling)
1413 {
1414 uint32_t origin_id;
1415
1416 if (!_is_fed_job(job_ptr, &origin_id))
1417 return ESLURM_JOB_NOT_FEDERATED;
1418
1419 job_ptr->fed_details->siblings_active &=
1420 ~(FED_SIBLING_BIT(sibling->fed.id));
1421 job_ptr->fed_details->siblings_viable &=
1422 ~(FED_SIBLING_BIT(sibling->fed.id));
1423
1424 if (!(job_ptr->fed_details->siblings_viable &
1425 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
1426 job_ptr->job_state |= JOB_REVOKED;
1427 else if (!job_ptr->fed_details->cluster_lock)
1428 job_ptr->job_state &= ~JOB_REVOKED;
1429
1430 update_job_fed_details(job_ptr);
1431
1432 return SLURM_SUCCESS;
1433 }
1434
1435 /*
1436 * Remove all pending federated jobs from the origin cluster.
1437 */
_cleanup_removed_origin_jobs(void)1438 static void _cleanup_removed_origin_jobs(void)
1439 {
1440 ListIterator job_itr;
1441 job_record_t *job_ptr;
1442 time_t now = time(NULL);
1443 uint32_t origin_id, sibling_id;
1444 uint64_t sibling_bit;
1445
1446 if (!fed_mgr_cluster_rec)
1447 return;
1448
1449 sibling_id = fed_mgr_cluster_rec->fed.id;
1450 sibling_bit = FED_SIBLING_BIT(sibling_id);
1451
1452 job_itr = list_iterator_create(job_list);
1453 while ((job_ptr = list_next(job_itr))) {
1454 bool running_remotely = false;
1455 uint64_t siblings_viable;
1456
1457 if (IS_JOB_COMPLETED(job_ptr))
1458 continue;
1459
1460 if (!_is_fed_job(job_ptr, &origin_id))
1461 continue;
1462
1463 siblings_viable = job_ptr->fed_details->siblings_viable;
1464
1465 if (sibling_id == origin_id &&
1466 job_ptr->fed_details->cluster_lock)
1467 running_remotely = true;
1468
1469 /* free fed_job_details so it can't call home. */
1470 free_job_fed_details(&job_ptr->fed_details);
1471
1472 /* allow running/completing jobs to finish. */
1473 if (IS_JOB_COMPLETED(job_ptr) ||
1474 IS_JOB_COMPLETING(job_ptr) ||
1475 IS_JOB_RUNNING(job_ptr))
1476 continue;
1477
1478 /* If this job is the only viable sibling then just let
1479 * it run as a non-federated job. The origin should remove the
1480 * tracking job. */
1481 if (!(siblings_viable & ~sibling_bit))
1482 continue;
1483
1484 /*
1485 * If a job is pending and not revoked on the origin cluster
1486 * leave it as a non-federated job.
1487 */
1488 if ((origin_id == sibling_id) &&
1489 IS_JOB_PENDING(job_ptr) && !IS_JOB_REVOKED(job_ptr))
1490 continue;
1491
1492 /* Free the resp_host so that the srun doesn't get
1493 * signaled about the job going away. The job could
1494 * still run on another sibling. */
1495 if (running_remotely ||
1496 (origin_id != sibling_id))
1497 xfree(job_ptr->resp_host);
1498
1499 job_ptr->job_state = JOB_CANCELLED|JOB_REVOKED;
1500 job_ptr->start_time = now;
1501 job_ptr->end_time = now;
1502 job_completion_logger(job_ptr, false);
1503 }
1504 list_iterator_destroy(job_itr);
1505
1506 /* Don't test these jobs for remote dependencies anymore */
1507 if (remote_dep_job_list) {
1508 if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
1509 info("%s: Remove all jobs in remote_dep_job_list",
1510 __func__);
1511 slurm_mutex_lock(&dep_job_list_mutex);
1512 list_flush(remote_dep_job_list);
1513 slurm_mutex_unlock(&dep_job_list_mutex);
1514 }
1515 }
1516
1517 /*
1518 * Remove all pending jobs that originated from the given cluster.
1519 */
_cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t * cluster)1520 static void _cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t *cluster)
1521 {
1522 ListIterator job_itr;
1523 job_record_t *job_ptr;
1524 time_t now = time(NULL);
1525 uint32_t origin_id, sibling_id;
1526 uint64_t origin_bit, sibling_bit;
1527
1528 if (!fed_mgr_cluster_rec)
1529 return;
1530
1531 sibling_id = fed_mgr_cluster_rec->fed.id;
1532 sibling_bit = FED_SIBLING_BIT(sibling_id);
1533
1534 job_itr = list_iterator_create(job_list);
1535 while ((job_ptr = list_next(job_itr))) {
1536 uint64_t siblings_viable;
1537
1538 if (IS_JOB_COMPLETED(job_ptr))
1539 continue;
1540
1541 if (!_is_fed_job(job_ptr, &origin_id))
1542 continue;
1543
1544 origin_bit = FED_SIBLING_BIT(origin_id);
1545 siblings_viable = job_ptr->fed_details->siblings_viable;
1546
1547 /* Remove cluster from viable,active siblings */
1548 _remove_sibling_bit(job_ptr, cluster);
1549
1550 /* Remove jobs that
1551 * 1. originated from the removed cluster
1552 * 2. origin jobs that are tracking the running job
1553 * 2. origin jobs that are tracking the pending job */
1554 if (origin_id == cluster->fed.id ||
1555
1556 (job_ptr->fed_details &&
1557 origin_id == sibling_id &&
1558 job_ptr->fed_details->cluster_lock == cluster->fed.id) ||
1559
1560 (siblings_viable & FED_SIBLING_BIT(cluster->fed.id) &&
1561 !(siblings_viable & ~FED_SIBLING_BIT(cluster->fed.id)))) {
1562
1563 /*
1564 * If this job originated from the origin (which is
1565 * being removed) and the origin is not a viable sibling
1566 * and there are more than one sibling then let the job
1567 * remain as a federated job to be scheduled amongst
1568 * it's siblings.
1569 */
1570 if (IS_JOB_PENDING(job_ptr) &&
1571 (origin_id == cluster->fed.id) &&
1572 !(siblings_viable & origin_bit) &&
1573 (siblings_viable & ~sibling_bit))
1574 continue;
1575
1576 /* free fed_job_details so it can't call home. */
1577 free_job_fed_details(&job_ptr->fed_details);
1578
1579 /*
1580 * If this job originated from the origin (which is
1581 * being removed) and this sibling is the only viable
1582 * sibling then let it run/pend as a non-federated job.
1583 */
1584 if ((origin_id == cluster->fed.id) &&
1585 !(siblings_viable & ~sibling_bit))
1586 continue;
1587
1588 if (!(IS_JOB_COMPLETED(job_ptr) ||
1589 IS_JOB_COMPLETING(job_ptr) ||
1590 IS_JOB_RUNNING(job_ptr))) {
1591
1592 /* Free the resp_host so that the srun doesn't
1593 * get signaled about the job going away. The
1594 * job could still run on another sibling. */
1595 xfree(job_ptr->resp_host);
1596
1597 job_ptr->job_state = JOB_CANCELLED|JOB_REVOKED;
1598 job_ptr->start_time = now;
1599 job_ptr->end_time = now;
1600 job_ptr->state_reason = WAIT_NO_REASON;
1601 xfree(job_ptr->state_desc);
1602 job_completion_logger(job_ptr, false);
1603 }
1604 }
1605 }
1606 list_iterator_destroy(job_itr);
1607 }
1608
_handle_removed_clusters(slurmdb_federation_rec_t * db_fed,uint64_t * removed_clusters)1609 static void _handle_removed_clusters(slurmdb_federation_rec_t *db_fed,
1610 uint64_t *removed_clusters)
1611 {
1612 ListIterator itr;
1613 slurmdb_cluster_rec_t *tmp_cluster = NULL;
1614
1615 itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
1616 while ((tmp_cluster = list_next(itr))) {
1617 if (tmp_cluster->name &&
1618 !(list_find_first(db_fed->cluster_list,
1619 slurmdb_find_cluster_in_list,
1620 tmp_cluster->name))) {
1621 info("cluster %s was removed from the federation",
1622 tmp_cluster->name);
1623 *removed_clusters |=
1624 FED_SIBLING_BIT(tmp_cluster->fed.id);
1625 _cleanup_removed_cluster_jobs(tmp_cluster);
1626 }
1627 }
1628 list_iterator_destroy(itr);
1629 }
1630
1631 /* Parse a RESPONSE_CTLD_MULT_MSG message and return a bit set for every
1632 * successful operation */
_parse_resp_ctld_mult(slurm_msg_t * resp_msg)1633 bitstr_t *_parse_resp_ctld_mult(slurm_msg_t *resp_msg)
1634 {
1635 ctld_list_msg_t *ctld_resp_msg;
1636 ListIterator iter = NULL;
1637 bitstr_t *success_bits;
1638 slurm_msg_t sub_msg;
1639 return_code_msg_t *rc_msg;
1640 Buf single_resp_buf = NULL;
1641 int resp_cnt, resp_inx = -1;
1642
1643 xassert(resp_msg->msg_type == RESPONSE_CTLD_MULT_MSG);
1644
1645 ctld_resp_msg = (ctld_list_msg_t *) resp_msg->data;
1646 if (!ctld_resp_msg->my_list) {
1647 error("%s: RESPONSE_CTLD_MULT_MSG has no list component",
1648 __func__);
1649 return NULL;
1650 }
1651
1652 resp_cnt = list_count(ctld_resp_msg->my_list);
1653 success_bits = bit_alloc(resp_cnt);
1654 iter = list_iterator_create(ctld_resp_msg->my_list);
1655 while ((single_resp_buf = list_next(iter))) {
1656 resp_inx++;
1657 slurm_msg_t_init(&sub_msg);
1658 if (unpack16(&sub_msg.msg_type, single_resp_buf) ||
1659 unpack_msg(&sub_msg, single_resp_buf)) {
1660 error("%s: Sub-message unpack error for Message Type:%s",
1661 __func__, rpc_num2string(sub_msg.msg_type));
1662 continue;
1663 }
1664
1665 if (sub_msg.msg_type != RESPONSE_SLURM_RC) {
1666 error("%s: Unexpected Message Type:%s",
1667 __func__, rpc_num2string(sub_msg.msg_type));
1668 } else {
1669 rc_msg = (return_code_msg_t *) sub_msg.data;
1670 if (rc_msg->return_code == SLURM_SUCCESS)
1671 bit_set(success_bits, resp_inx);
1672 }
1673 (void) slurm_free_msg_data(sub_msg.msg_type, sub_msg.data);
1674
1675 }
1676
1677 return success_bits;
1678 }
1679
_fed_mgr_job_allocate_sib(char * sib_name,job_desc_msg_t * job_desc,bool interactive_job)1680 static int _fed_mgr_job_allocate_sib(char *sib_name, job_desc_msg_t *job_desc,
1681 bool interactive_job)
1682 {
1683 int error_code = SLURM_SUCCESS;
1684 job_record_t *job_ptr = NULL;
1685 char *err_msg = NULL;
1686 bool reject_job = false;
1687 slurmdb_cluster_rec_t *sibling;
1688 uid_t uid = 0;
1689 slurm_msg_t response_msg;
1690 slurm_msg_t_init(&response_msg);
1691
1692 xassert(sib_name);
1693 xassert(job_desc);
1694
1695 if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
1696 error_code = ESLURM_INVALID_CLUSTER_NAME;
1697 error("Invalid sibling name");
1698 } else if ((job_desc->alloc_node == NULL) ||
1699 (job_desc->alloc_node[0] == '\0')) {
1700 error_code = ESLURM_INVALID_NODE_NAME;
1701 error("REQUEST_SUBMIT_BATCH_JOB lacks alloc_node");
1702 }
1703
1704 if (error_code == SLURM_SUCCESS)
1705 error_code = validate_job_create_req(job_desc,uid,&err_msg);
1706
1707 if (error_code) {
1708 reject_job = true;
1709 goto send_msg;
1710 }
1711
1712 /* Create new job allocation */
1713 job_desc->het_job_offset = NO_VAL;
1714 error_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
1715 interactive_job, uid, &job_ptr, &err_msg,
1716 sibling->rpc_version);
1717 if (!job_ptr ||
1718 (error_code && job_ptr->job_state == JOB_FAILED))
1719 reject_job = true;
1720
1721 if (job_desc->immediate &&
1722 (error_code != SLURM_SUCCESS))
1723 error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
1724
1725 send_msg:
1726 /* Send response back about origin jobid if an error occured. */
1727 if (reject_job)
1728 _persist_fed_job_response(sibling, job_desc->job_id, error_code);
1729 else {
1730 if (!(job_ptr->fed_details->siblings_viable &
1731 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
1732 job_ptr->job_state |= JOB_REVOKED;
1733
1734 add_fed_job_info(job_ptr);
1735 schedule_job_save(); /* Has own locks */
1736 schedule_node_save(); /* Has own locks */
1737 queue_job_scheduler();
1738 }
1739
1740 xfree(err_msg);
1741
1742 return SLURM_SUCCESS;
1743 }
1744
static void _do_fed_job_complete(job_record_t *job_ptr, uint32_t job_state,
				 uint32_t exit_code, time_t start_time)
{
	if (job_ptr->job_state & JOB_REQUEUE_FED) {
		/* Remove JOB_REQUEUE_FED and JOB_COMPLETING once
		 * sibling reports that sibling job is done. Leave other
		 * state in place. JOB_SPECIAL_EXIT may be in the
		 * states. */
		job_ptr->job_state &= ~(JOB_REQUEUE_FED | JOB_COMPLETING);
		batch_requeue_fini(job_ptr);
	} else {
		fed_mgr_job_revoke(job_ptr, true, job_state, exit_code,
				   start_time);
	}
}

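/*
 * Handle a completion notice from the origin cluster. If the job began
 * running here in the meantime, cancel it instead of marking it done.
 */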
static void _handle_fed_job_complete(fed_job_update_info_t *job_update_info)
{
	job_record_t *job_ptr;

	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_write_lock);
	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
		error("%s: failed to find job_record for fed JobId=%u",
		      __func__, job_update_info->job_id);
	} else if (!job_ptr->fed_details) {
		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
	} else if (IS_JOB_RUNNING(job_ptr)) {
		/*
		 * The job could have started between the time that the origin
		 * sent the complete message and now.
		 */
		slurm_msg_t msg;
		sib_msg_t sib_msg = {0};
		job_step_kill_msg_t *kill_req;

		/* Build and pack a kill_req msg to put in a sib_msg */
		kill_req = xmalloc(sizeof(job_step_kill_msg_t));
		kill_req->job_id = job_update_info->job_id;
		kill_req->sjob_id = NULL;
		kill_req->job_step_id = SLURM_BATCH_SCRIPT;
		kill_req->signal = SIGKILL;
		kill_req->flags = 0;

		sib_msg.data = kill_req;

		slurm_msg_t_init(&msg);
		msg.data = &sib_msg;

		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("%s: %pJ running now, just going to cancel it.",
			     __func__, job_ptr);

		_q_sib_job_cancel(&msg, job_update_info->uid);
	} else {
		_do_fed_job_complete(job_ptr, job_update_info->job_state,
				     job_update_info->return_code,
				     job_update_info->start_time);
	}

	unlock_slurmctld(job_write_lock);
}

static void _handle_fed_job_cancel(fed_job_update_info_t *job_update_info)
{
	kill_job_step(job_update_info->kill_msg, job_update_info->uid);
}

static void
_handle_fed_job_remove_active_sib_bit(fed_job_update_info_t *job_update_info)
{
	fed_job_info_t *job_info;
	job_record_t *job_ptr;
	slurmdb_cluster_rec_t *sibling;

	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_write_lock);
	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
		error("%s: failed to find job_record for fed JobId=%u",
		      __func__, job_update_info->job_id);
		unlock_slurmctld(job_write_lock);
		return;
	} else if (!job_ptr->fed_details) {
		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
		unlock_slurmctld(job_write_lock);
		return;
	}

	slurm_mutex_lock(&fed_job_list_mutex);
	if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
		error("%s: failed to find fed job info for fed JobId=%u",
		      __func__, job_update_info->job_id);
		slurm_mutex_unlock(&fed_job_list_mutex);
		unlock_slurmctld(job_write_lock);
		return;
	}

	sibling = fed_mgr_get_cluster_by_name(job_update_info->siblings_str);
	if (sibling) {
		job_info->siblings_active &=
			~(FED_SIBLING_BIT(sibling->fed.id));
		job_ptr->fed_details->siblings_active =
			job_info->siblings_active;
		update_job_fed_details(job_ptr);
	}

	slurm_mutex_unlock(&fed_job_list_mutex);
	unlock_slurmctld(job_write_lock);
}

static void _handle_fed_job_requeue(fed_job_update_info_t *job_update_info)
{
	int rc;
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_write_lock);
	if ((rc = job_requeue(job_update_info->uid, job_update_info->job_id,
			      NULL, false, job_update_info->flags)))
		error("failed to requeue fed JobId=%u - rc:%d",
		      job_update_info->job_id, rc);
	unlock_slurmctld(job_write_lock);
}

/*
 * Job has been started, revoke the sibling jobs.
 *
 * This is the common code between queued and local job starts.
 * This can be done when the job is starting on the origin cluster without
 * queueing because the cluster already has the job write lock and fed read
 * lock.
 *
 * Must have fed_job_list mutex locked and job write_lock set.
 */
static void _fed_job_start_revoke(fed_job_info_t *job_info,
				  job_record_t *job_ptr, time_t start_time)
{
	uint32_t cluster_lock;
	uint64_t old_active;
	uint64_t old_viable;

	cluster_lock = job_info->cluster_lock;
	old_active = job_info->siblings_active;
	old_viable = job_info->siblings_viable;

	job_ptr->fed_details->cluster_lock = cluster_lock;
	job_ptr->fed_details->siblings_active =
		job_info->siblings_active =
		FED_SIBLING_BIT(cluster_lock);
	update_job_fed_details(job_ptr);

	if (old_active & ~FED_SIBLING_BIT(cluster_lock)) {
		/* There are siblings that need to be removed */
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("%s: %pJ is running on cluster id %d, revoking remote siblings (active:%"PRIu64" viable:%"PRIu64")",
			     __func__, job_ptr, cluster_lock,
			     old_active, old_viable);

		_revoke_sibling_jobs(job_ptr->job_id, cluster_lock,
				     old_active, start_time);
	}
}

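/*
 * Handle notification that a sibling job started: revoke the other
 * sibling jobs and, if the job is running remotely, revoke the origin's
 * tracking job as well (leaving it pending so the record stays around).
 */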
static void _handle_fed_job_start(fed_job_update_info_t *job_update_info)
{
	fed_job_info_t *job_info;
	job_record_t *job_ptr;

	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_write_lock);
	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
		error("%s: failed to find job_record for fed JobId=%u",
		      __func__, job_update_info->job_id);
		unlock_slurmctld(job_write_lock);
		return;
	} else if (!job_ptr->fed_details) {
		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
		unlock_slurmctld(job_write_lock);
		return;
	}

	slurm_mutex_lock(&fed_job_list_mutex);
	if (!(job_info =
	      _find_fed_job_info(job_update_info->job_id))) {
		error("%s: failed to find fed job info for fed JobId=%u",
		      __func__, job_update_info->job_id);
		slurm_mutex_unlock(&fed_job_list_mutex);
		unlock_slurmctld(job_write_lock);
		return;
	}

	_fed_job_start_revoke(job_info, job_ptr, job_update_info->start_time);

	slurm_mutex_unlock(&fed_job_list_mutex);

	if (job_info->cluster_lock != fed_mgr_cluster_rec->fed.id) {
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("%s: %pJ is running remotely, revoking origin tracking job",
			     __func__, job_ptr);

		/* leave as pending so that it will stay around */
		fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
				   job_update_info->start_time);
	}

	unlock_slurmctld(job_write_lock);
}

static int _list_find_jobid(void *x, void *key)
{
	uint32_t src_jobid = *(uint32_t *)x;
	uint32_t key_jobid = *(uint32_t *)key;

	if (src_jobid == key_jobid)
		return 1;
	return 0;
}

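/*
 * Submit a sibling job sent from another cluster. Any existing record
 * with the same federated job id is unlinked first so the job can be
 * allocated again cleanly.
 */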
static void _handle_fed_job_submission(fed_job_update_info_t *job_update_info)
{
	job_record_t *job_ptr;
	bool interactive_job =
		(job_update_info->type == FED_JOB_SUBMIT_INT) ?
		true : false;

	slurmctld_lock_t job_write_lock = {
		READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("%s: submitting %s sibling JobId=%u from %s",
		     __func__, (interactive_job) ? "interactive" : "batch",
		     job_update_info->submit_desc->job_id,
		     job_update_info->submit_cluster);

	/* do this outside the job write lock */
	delete_job_desc_files(job_update_info->job_id);
	lock_slurmctld(job_write_lock);

	if ((job_ptr = find_job_record(job_update_info->job_id))) {
		debug("Found existing fed %pJ, going to requeue/unlink it",
		      job_ptr);
		/* Delete job quickly */
		job_ptr->job_state |= JOB_REVOKED;
		unlink_job_record(job_ptr);

		/*
		 * Make sure that the file delete request is purged from list
		 * -- added from purge_job_record() -- before job is allocated
		 * again.
		 */
		list_delete_all(purge_files_list, _list_find_jobid,
				&job_update_info->job_id);
	}

	_fed_mgr_job_allocate_sib(job_update_info->submit_cluster,
				  job_update_info->submit_desc,
				  interactive_job);
	unlock_slurmctld(job_write_lock);
}

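/*
 * Apply a job update requested by a sibling cluster, retrying for up to
 * db_inx_max_cnt seconds while the job is still being assigned a
 * db_index, then report the result back to the requesting sibling.
 */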
static void _handle_fed_job_update(fed_job_update_info_t *job_update_info)
{
	int rc;
	slurm_msg_t msg;
	slurm_msg_t_init(&msg);
	job_desc_msg_t *job_desc = job_update_info->submit_desc;
	int db_inx_max_cnt = 5, i = 0;
	slurmdb_cluster_rec_t *sibling;

	slurmctld_lock_t job_write_lock = {
		READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
	slurmctld_lock_t fed_read_lock = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	xassert(job_desc);

	/* scontrol always sets job_id_str */
	job_desc->job_id = job_update_info->job_id;
	msg.data = job_desc;

	rc = ESLURM_JOB_SETTING_DB_INX;
	while (rc == ESLURM_JOB_SETTING_DB_INX) {
		lock_slurmctld(job_write_lock);
		rc = update_job(&msg, job_update_info->uid, false);
		unlock_slurmctld(job_write_lock);

		if (i >= db_inx_max_cnt) {
			info("%s: can't update fed job, waited %d seconds for JobId=%u to get a db_index, but it hasn't happened yet. Giving up and letting the user know.",
			     __func__, db_inx_max_cnt,
			     job_update_info->job_id);
			break;
		}
		i++;
		debug("%s: We cannot update JobId=%u at the moment, we are setting the db index, waiting",
		      __func__, job_update_info->job_id);
		sleep(1);
	}

	lock_slurmctld(fed_read_lock);
	if (!(sibling =
	      fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
		error("Invalid sibling name");
	} else {
		_persist_update_job_resp(sibling, job_update_info->job_id, rc);
	}
	unlock_slurmctld(fed_read_lock);
}

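/*
 * Record that a sibling finished processing one of our update requests
 * by decrementing its pending-update counter for the job.
 */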
static void
_handle_fed_job_update_response(fed_job_update_info_t *job_update_info)
{
	fed_job_info_t *job_info;
	slurmdb_cluster_rec_t *sibling;

	slurmctld_lock_t fed_read_lock = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	slurm_mutex_lock(&fed_job_list_mutex);
	if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
		error("%s: failed to find fed job info for fed JobId=%u",
		      __func__, job_update_info->job_id);
		slurm_mutex_unlock(&fed_job_list_mutex);
		return;
	}

	lock_slurmctld(fed_read_lock);

	if (!(sibling =
	      fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
		error("Invalid sibling name");
		unlock_slurmctld(fed_read_lock);
		slurm_mutex_unlock(&fed_job_list_mutex);
		return;
	}

	if (job_info->updating_sibs[sibling->fed.id])
		job_info->updating_sibs[sibling->fed.id]--;
	else
		error("%s: this should never happen", __func__);

	slurm_mutex_unlock(&fed_job_list_mutex);
	unlock_slurmctld(fed_read_lock);
}

static int _handle_fed_job_sync(fed_job_update_info_t *job_update_info)
{
	int rc = SLURM_SUCCESS;

	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_write_lock);

	rc = _sync_jobs(job_update_info->submit_cluster,
			job_update_info->job_info_msg,
			job_update_info->start_time);

	unlock_slurmctld(job_write_lock);

	return rc;
}

/* Have to send the job sync from the job_update thread so that it can
 * independently get the job read lock. */
static int _handle_fed_send_job_sync(fed_job_update_info_t *job_update_info)
{
	int rc = SLURM_SUCCESS;
	List jobids;
	slurm_msg_t req_msg, job_msg;
	sib_msg_t sib_msg = {0};
	char *dump = NULL;
	int dump_size = 0;
	slurmdb_cluster_rec_t *sibling;
	Buf buffer;
	time_t sync_time = 0;
	char *sib_name = job_update_info->submit_cluster;

	slurmctld_lock_t job_read_lock = {
		READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	lock_slurmctld(job_read_lock);

	if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
		error("%s: Invalid sibling name %s", __func__, sib_name);
		unlock_slurmctld(job_read_lock);
		return SLURM_ERROR;
	}

	sync_time = time(NULL);
	jobids = _get_sync_jobid_list(sibling->fed.id, sync_time);
	pack_spec_jobs(&dump, &dump_size, jobids, SHOW_ALL,
		       slurmctld_conf.slurm_user_id, NO_VAL,
		       sibling->rpc_version);
	FREE_NULL_LIST(jobids);

	unlock_slurmctld(job_read_lock);

	slurm_msg_t_init(&job_msg);
	job_msg.protocol_version = sibling->rpc_version;
	job_msg.msg_type = RESPONSE_JOB_INFO;
	job_msg.data = dump;
	job_msg.data_size = dump_size;

	buffer = init_buf(BUF_SIZE);
	pack_msg(&job_msg, buffer);

	memset(&sib_msg, 0, sizeof(sib_msg_t));
	sib_msg.sib_msg_type = FED_JOB_SYNC;
	sib_msg.data_buffer = buffer;
	sib_msg.data_type = job_msg.msg_type;
	sib_msg.data_version = job_msg.protocol_version;
	sib_msg.start_time = sync_time;

	slurm_msg_t_init(&req_msg);
	req_msg.msg_type = REQUEST_SIB_MSG;
	req_msg.protocol_version = job_msg.protocol_version;
	req_msg.data = &sib_msg;

	sibling->fed.sync_sent = true;

	rc = _queue_rpc(sibling, &req_msg, 0, false);

	free_buf(buffer);
	xfree(dump);

	return rc;
}

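/* Dispatch a queued update to the handler matching its fed_job type. */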
static int _foreach_fed_job_update_info(fed_job_update_info_t *job_update_info)
{
	if (!fed_mgr_cluster_rec) {
		info("Not part of federation anymore, not performing fed job updates");
		return SLURM_SUCCESS;
	}

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("%s: JobId=%u type:%s",
		     __func__, job_update_info->job_id,
		     _job_update_type_str(job_update_info->type));

	switch (job_update_info->type) {
	case FED_JOB_COMPLETE:
		_handle_fed_job_complete(job_update_info);
		break;
	case FED_JOB_CANCEL:
		_handle_fed_job_cancel(job_update_info);
		break;
	case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
		_handle_fed_job_remove_active_sib_bit(job_update_info);
		break;
	case FED_JOB_REQUEUE:
		_handle_fed_job_requeue(job_update_info);
		break;
	case FED_JOB_START:
		_handle_fed_job_start(job_update_info);
		break;
	case FED_JOB_SUBMIT_BATCH:
	case FED_JOB_SUBMIT_INT:
		_handle_fed_job_submission(job_update_info);
		break;
	case FED_JOB_SYNC:
		_handle_fed_job_sync(job_update_info);
		break;
	case FED_JOB_UPDATE:
		_handle_fed_job_update(job_update_info);
		break;
	case FED_JOB_UPDATE_RESPONSE:
		_handle_fed_job_update_response(job_update_info);
		break;
	case FED_SEND_JOB_SYNC:
		_handle_fed_send_job_sync(job_update_info);
		break;
	default:
		error("Invalid fed_job type: %d JobId=%u",
		      job_update_info->type, job_update_info->job_id);
	}

	return SLURM_SUCCESS;
}

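/*
 * Send the job's current dependency list back to its origin cluster so
 * the origin can re-test the dependency there.
 */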
static void _update_origin_job_dep(job_record_t *job_ptr,
				   slurmdb_cluster_rec_t *origin)
{
	slurm_msg_t req_msg;
	dep_update_origin_msg_t dep_update_msg = { 0 };

	xassert(job_ptr);
	xassert(job_ptr->details);
	xassert(job_ptr->details->depend_list);
	xassert(origin);

	if (origin == fed_mgr_cluster_rec) {
		error("%s: Cannot send dependency update of %pJ to self - were clusters removed then re-added to the federation in a different order?",
		      __func__, job_ptr);
		return;
	}

	dep_update_msg.depend_list = job_ptr->details->depend_list;
	dep_update_msg.job_id = job_ptr->job_id;

	slurm_msg_t_init(&req_msg);
	req_msg.msg_type = REQUEST_UPDATE_ORIGIN_DEP;
	req_msg.data = &dep_update_msg;

	if (_queue_rpc(origin, &req_msg, 0, false))
		error("%s: Failed to send dependency update for %pJ",
		      __func__, job_ptr);
}

static int _find_local_dep(void *arg, void *key)
{
	depend_spec_t *dep_ptr = (depend_spec_t *) arg;
	return !(dep_ptr->depend_flags & SLURM_FLAGS_REMOTE);
}

static int _find_job_by_id(void *arg, void *key)
{
	job_record_t *job_ptr = (job_record_t *) arg;
	uint32_t job_id = *((uint32_t *) key);
	return job_ptr->job_id == job_id;
}

static void _handle_recv_remote_dep(dep_msg_t *remote_dep_info)
{
	/*
	 * update_job_dependency() will:
	 * - read the job list (need job read lock)
	 * - call fed_mgr_is_origin_job_id (need fed read lock)
	 */
	int rc, tmp;
	slurmctld_lock_t job_read_lock = { .job = READ_LOCK, .fed = READ_LOCK };
	job_record_t *job_ptr = xmalloc(sizeof *job_ptr);

	job_ptr->magic = JOB_MAGIC;
	job_ptr->details = xmalloc(sizeof *(job_ptr->details));
	job_ptr->details->magic = DETAILS_MAGIC;
	job_ptr->job_id = remote_dep_info->job_id;
	job_ptr->name = remote_dep_info->job_name;
	job_ptr->user_id = remote_dep_info->user_id;

	/* Initialize array info. */
	job_ptr->array_job_id = remote_dep_info->array_job_id;
	job_ptr->array_task_id = remote_dep_info->array_task_id;

	/*
	 * We need to allocate space for job_ptr->array_recs if the job is
	 * an array so it's recognized as an array, but nothing in it is
	 * actually used.
	 */
	if (remote_dep_info->is_array)
		job_ptr->array_recs = xmalloc(sizeof *(job_ptr->array_recs));

	/*
	 * We need to allocate space for fed_details so
	 * other places know this is a fed job, but we don't
	 * need to set anything specific in it.
	 */
	job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
		info("%s: Got job_id: %u, name: \"%s\", array_task_id: %u, dependency: \"%s\", is_array? %s, user_id: %u",
		     __func__, remote_dep_info->job_id,
		     remote_dep_info->job_name,
		     remote_dep_info->array_task_id,
		     remote_dep_info->dependency,
		     remote_dep_info->is_array ? "yes" : "no",
		     remote_dep_info->user_id);

	/* NULL string so it doesn't get free'd since it's used by job_ptr */
	remote_dep_info->job_name = NULL;

	/* Create and validate the dependency. */
	lock_slurmctld(job_read_lock);
	rc = update_job_dependency(job_ptr, remote_dep_info->dependency);
	unlock_slurmctld(job_read_lock);

	if (rc) {
		error("%s: Invalid dependency %s for %pJ: %s",
		      __func__, remote_dep_info->dependency, job_ptr,
		      slurm_strerror(rc));
		_destroy_dep_job(job_ptr);
	} else {
		job_record_t *tmp_job;
		ListIterator itr;

		/*
		 * Remove the old reference to this job from
		 * remote_dep_job_list so that we don't continue testing
		 * the old dependencies.
		 */
		slurm_mutex_lock(&dep_job_list_mutex);
		itr = list_iterator_create(remote_dep_job_list);
		while ((tmp_job = list_next(itr))) {
			if (tmp_job->job_id == job_ptr->job_id) {
				list_delete_item(itr);
				break;
			}
		}
		list_iterator_destroy(itr);

		/*
		 * If we were sent a list of 0 dependencies, that means
		 * the dependency was updated and cleared, so don't
		 * add it to the list to test. Also only add it if
		 * there are dependencies local to this cluster.
		 */
		if (list_count(job_ptr->details->depend_list) &&
		    list_find_first(job_ptr->details->depend_list,
				    _find_local_dep, &tmp))
			list_append(remote_dep_job_list, job_ptr);
		else
			_destroy_dep_job(job_ptr);

		slurm_mutex_unlock(&dep_job_list_mutex);
	}
	_destroy_dep_msg(remote_dep_info);
}

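/*
 * Drain origin_dep_update_list, merging each dependency update into the
 * matching local job and re-testing the jobs whose lists changed.
 */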
static void _handle_dep_update_origin_msgs(void)
{
	job_record_t *job_ptr;
	dep_update_origin_msg_t *dep_update_msg;
	List update_job_list = NULL;
	slurmctld_lock_t job_write_lock = {
		.conf = READ_LOCK, .job = WRITE_LOCK, .fed = READ_LOCK };

	if (!list_count(origin_dep_update_list))
		return;

	lock_slurmctld(job_write_lock);
	while ((dep_update_msg = list_pop(origin_dep_update_list))) {
		if (!(job_ptr = find_job_record(dep_update_msg->job_id))) {
			/*
			 * Maybe the job was cancelled and purged before
			 * the dependency update got here or was able
			 * to be processed. Regardless, this job doesn't
			 * exist here, so we have to throw out this
			 * dependency update message.
			 */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: Could not find job %u, cannot process dependency update. Perhaps the job was purged before we got here.",
				     __func__, dep_update_msg->job_id);
			slurm_free_dep_update_origin_msg(dep_update_msg);
			continue;
		} else if (!job_ptr->details ||
			   !job_ptr->details->depend_list) {
			/*
			 * This might happen if the job's dependencies
			 * were updated to be none before the dependency
			 * update came from the sibling cluster.
			 */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: %pJ doesn't have dependencies, cannot process dependency update",
				     __func__, job_ptr);
			slurm_free_dep_update_origin_msg(dep_update_msg);
			continue;
		}
		if (update_job_dependency_list(job_ptr,
					       dep_update_msg->depend_list)) {
			if (!update_job_list) {
				update_job_list = list_create(NULL);
				list_append(update_job_list, job_ptr);
			} else if (!list_find_first(update_job_list,
						    _find_job_by_id,
						    &job_ptr->job_id))
				list_append(update_job_list, job_ptr);
		}
		slurm_free_dep_update_origin_msg(dep_update_msg);
	}
	if (update_job_list) {
		list_for_each(update_job_list, handle_job_dependency_updates,
			      NULL);
		FREE_NULL_LIST(update_job_list);
	}
	unlock_slurmctld(job_write_lock);
}

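/*
 * Periodically test jobs with remote dependencies (at most once every
 * TEST_REMOTE_DEP_FREQ seconds) once this cluster is in a federation.
 */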
static void *_test_dep_job_thread(void *arg)
{
	time_t last_test = 0;
	time_t now;
	slurmctld_lock_t job_read_lock = {
		.job = READ_LOCK, .fed = READ_LOCK };

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "fed_test_dep", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__,
		      "fed_test_dep");
	}
#endif

	while (!slurmctld_config.shutdown_time) {
		now = time(NULL);
		/* Only test after joining a federation. */
		if (fed_mgr_fed_rec && fed_mgr_cluster_rec &&
		    ((now - last_test) > TEST_REMOTE_DEP_FREQ)) {
			last_test = now;
			lock_slurmctld(job_read_lock);
			fed_mgr_test_remote_dependencies();
			unlock_slurmctld(job_read_lock);
		}
		sleep(2);
	}
	return NULL;
}

static void *_origin_dep_update_thread(void *arg)
{
	struct timespec ts = {0, 0};

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "fed_update_dep", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__,
		      "fed_update_dep");
	}
#endif

	while (!slurmctld_config.shutdown_time) {
		slurm_mutex_lock(&origin_dep_update_mutex);
		ts.tv_sec = time(NULL) + 2;
		slurm_cond_timedwait(&origin_dep_cond,
				     &origin_dep_update_mutex, &ts);
		slurm_mutex_unlock(&origin_dep_update_mutex);

		if (slurmctld_config.shutdown_time)
			break;

		/* Wait for fed_mgr_init() */
		if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
			continue;

		_handle_dep_update_origin_msgs();
	}
	return NULL;
}

static void *_remote_dep_recv_thread(void *arg)
{
	struct timespec ts = {0, 0};
	dep_msg_t *remote_dep_info;

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "fed_remote_dep", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__,
		      "fed_remote_dep");
	}
#endif

	while (!slurmctld_config.shutdown_time) {
		slurm_mutex_lock(&remote_dep_recv_mutex);
		ts.tv_sec = time(NULL) + 2;
		slurm_cond_timedwait(&remote_dep_cond,
				     &remote_dep_recv_mutex, &ts);
		slurm_mutex_unlock(&remote_dep_recv_mutex);

		if (slurmctld_config.shutdown_time)
			break;

		/* Wait for fed_mgr_init() */
		if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
			continue;

		while ((remote_dep_info = list_pop(remote_dep_recv_list))) {
			_handle_recv_remote_dep(remote_dep_info);
		}
	}
	return NULL;
}

/* Start a thread to manage queued sibling requests */
static void *_fed_job_update_thread(void *arg)
{
	struct timespec ts = {0, 0};
	fed_job_update_info_t *job_update_info;

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "fed_jobs", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__, "fed_jobs");
	}
#endif

	while (!slurmctld_config.shutdown_time) {
		slurm_mutex_lock(&job_update_mutex);
		ts.tv_sec = time(NULL) + 2;
		slurm_cond_timedwait(&job_update_cond,
				     &job_update_mutex, &ts);
		slurm_mutex_unlock(&job_update_mutex);

		if (slurmctld_config.shutdown_time)
			break;

		while ((job_update_info = list_pop(fed_job_update_list))) {
			_foreach_fed_job_update_info(job_update_info);
			_destroy_fed_job_update_info(job_update_info);
		}
	}

	return NULL;
}

/* Start a thread to manage queued agent requests */
static void *_agent_thread(void *arg)
{
	slurmdb_cluster_rec_t *cluster;
	struct timespec ts = {0, 0};
	ListIterator cluster_iter, rpc_iter;
	agent_queue_t *rpc_rec;
	slurm_msg_t req_msg, resp_msg;
	ctld_list_msg_t ctld_req_msg;
	bitstr_t *success_bits;
	int rc, resp_inx, success_size;

	slurmctld_lock_t fed_read_lock = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "fed_agent", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__,
		      "fed_agent");
	}
#endif

	while (!slurmctld_config.shutdown_time) {
		/* Wait for new work or re-issue RPCs after 2 second wait */
		slurm_mutex_lock(&agent_mutex);
		if (!slurmctld_config.shutdown_time && !agent_queue_size) {
			ts.tv_sec = time(NULL) + 2;
			slurm_cond_timedwait(&agent_cond, &agent_mutex, &ts);
		}
		agent_queue_size = 0;
		slurm_mutex_unlock(&agent_mutex);
		if (slurmctld_config.shutdown_time)
			break;

		lock_slurmctld(fed_read_lock);
		if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) {
			unlock_slurmctld(fed_read_lock);
			continue;
		}

		/* Look for work on each cluster */
		cluster_iter = list_iterator_create(
			fed_mgr_fed_rec->cluster_list);
		while (!slurmctld_config.shutdown_time &&
		       (cluster = list_next(cluster_iter))) {
			time_t now = time(NULL);
			if ((cluster->send_rpc == NULL) ||
			    (list_count(cluster->send_rpc) == 0))
				continue;

			/* Move currently pending RPCs to new list */
			ctld_req_msg.my_list = NULL;
			rpc_iter = list_iterator_create(cluster->send_rpc);
			while ((rpc_rec = list_next(rpc_iter))) {
				if ((rpc_rec->last_try +
				     rpc_rec->last_defer) >= now)
					continue;
				if (!ctld_req_msg.my_list)
					ctld_req_msg.my_list =
						list_create(NULL);
				list_append(ctld_req_msg.my_list,
					    rpc_rec->buffer);
				rpc_rec->last_try = now;
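				/*
				 * Exponential backoff between resends:
				 * last_defer starts at 2 seconds and
				 * doubles after each attempt; warn once
				 * when it reaches 128 seconds.
				 */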
				if (rpc_rec->last_defer == 128) {
					info("%s: %s JobId=%u request to cluster %s is repeatedly failing",
					     __func__,
					     rpc_num2string(rpc_rec->msg_type),
					     rpc_rec->job_id, cluster->name);
					rpc_rec->last_defer *= 2;
				} else if (rpc_rec->last_defer)
					rpc_rec->last_defer *= 2;
				else
					rpc_rec->last_defer = 2;
			}
			list_iterator_destroy(rpc_iter);
			if (!ctld_req_msg.my_list)
				continue;

			/* Build, pack and send the combined RPC */
			slurm_msg_t_init(&req_msg);
			req_msg.msg_type = REQUEST_CTLD_MULT_MSG;
			req_msg.data = &ctld_req_msg;
			rc = _send_recv_msg(cluster, &req_msg, &resp_msg,
					    false);

			/* Process the response */
			if ((rc == SLURM_SUCCESS) &&
			    (resp_msg.msg_type == RESPONSE_CTLD_MULT_MSG)) {
				/* Remove successfully processed RPCs */
				resp_inx = 0;
				success_bits =
					_parse_resp_ctld_mult(&resp_msg);
				success_size = bit_size(success_bits);
				rpc_iter = list_iterator_create(
					cluster->send_rpc);
				while ((rpc_rec = list_next(rpc_iter))) {
					if (rpc_rec->last_try != now)
						continue;
					if (resp_inx >= success_size) {
						error("%s: bitmap too small (%d >= %d)",
						      __func__, resp_inx,
						      success_size);
						break;
					}
					if (bit_test(success_bits, resp_inx++))
						list_delete_item(rpc_iter);
				}
				list_iterator_destroy(rpc_iter);
				FREE_NULL_BITMAP(success_bits);
			} else {
				/* Failed to process combined RPC.
				 * Leave all RPCs on the queue. */
				if (rc != SLURM_SUCCESS) {
					if (_comm_fail_log(cluster)) {
						error("%s: Failed to send RPC: %s",
						      __func__,
						      slurm_strerror(rc));
					} else {
						debug("%s: Failed to send RPC: %s",
						      __func__,
						      slurm_strerror(rc));
					}
				} else if (resp_msg.msg_type ==
					   PERSIST_RC) {
					persist_rc_msg_t *msg;
					char *err_str;
					msg = resp_msg.data;
					if (msg->comment)
						err_str = msg->comment;
					else
						err_str =
							slurm_strerror(msg->rc);
					error("%s: failed to process msg: %s",
					      __func__, err_str);
				} else if (resp_msg.msg_type ==
					   RESPONSE_SLURM_RC) {
					rc = slurm_get_return_code(
						resp_msg.msg_type,
						resp_msg.data);
					error("%s: failed to process msg: %s",
					      __func__, slurm_strerror(rc));
				} else {
					error("%s: Invalid response msg_type: %u",
					      __func__, resp_msg.msg_type);
				}
			}
			(void) slurm_free_msg_data(resp_msg.msg_type,
						   resp_msg.data);

			list_destroy(ctld_req_msg.my_list);
		}
		list_iterator_destroy(cluster_iter);

		unlock_slurmctld(fed_read_lock);
	}

	/* Log the abandoned RPCs */
	lock_slurmctld(fed_read_lock);
	if (!fed_mgr_fed_rec)
		goto end_it;

	cluster_iter = list_iterator_create(fed_mgr_fed_rec->cluster_list);
	while ((cluster = list_next(cluster_iter))) {
		if (cluster->send_rpc == NULL)
			continue;

		rpc_iter = list_iterator_create(cluster->send_rpc);
		while ((rpc_rec = list_next(rpc_iter))) {
			info("%s: %s JobId=%u request to cluster %s aborted",
			     __func__, rpc_num2string(rpc_rec->msg_type),
			     rpc_rec->job_id, cluster->name);
			list_delete_item(rpc_iter);
		}
		list_iterator_destroy(rpc_iter);
		FREE_NULL_LIST(cluster->send_rpc);
	}
	list_iterator_destroy(cluster_iter);

end_it:
	unlock_slurmctld(fed_read_lock);

	return NULL;
}

static void _spawn_threads(void)
{
	slurm_mutex_lock(&agent_mutex);
	slurm_thread_create(&agent_thread_id, _agent_thread, NULL);
	slurm_mutex_unlock(&agent_mutex);

	slurm_mutex_lock(&job_update_mutex);
	slurm_thread_create(&fed_job_update_thread_id,
			    _fed_job_update_thread, NULL);
	slurm_mutex_unlock(&job_update_mutex);

	slurm_mutex_lock(&remote_dep_recv_mutex);
	slurm_thread_create(&remote_dep_thread_id,
			    _remote_dep_recv_thread, NULL);
	slurm_mutex_unlock(&remote_dep_recv_mutex);

	slurm_thread_create(&dep_job_thread_id, _test_dep_job_thread, NULL);

	slurm_mutex_lock(&origin_dep_update_mutex);
	slurm_thread_create(&origin_dep_thread_id, _origin_dep_update_thread,
			    NULL);
	slurm_mutex_unlock(&origin_dep_update_mutex);
}

static void _add_missing_fed_job_info(void)
{
	job_record_t *job_ptr;
	ListIterator job_itr;

	slurmctld_lock_t job_read_lock = { .job = READ_LOCK };

	/* Sanity check and add any missing job_info structs */
	lock_slurmctld(job_read_lock);
	job_itr = list_iterator_create(job_list);
	while ((job_ptr = list_next(job_itr))) {
		uint32_t origin_id;

		if (!_is_fed_job(job_ptr, &origin_id))
			continue;

		if (!_find_fed_job_info(job_ptr->job_id)) {
			info("adding missing fed_job_info for job %pJ",
			     job_ptr);
			add_fed_job_info(job_ptr);
		}
	}
	list_iterator_destroy(job_itr);
	unlock_slurmctld(job_read_lock);
}

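/*
 * Set up federation support: create the work lists, spawn the agent and
 * dependency threads, then load federation membership from the database
 * (or from the saved state file when the database is down) and join it.
 */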
extern int fed_mgr_init(void *db_conn)
{
	int rc = SLURM_SUCCESS;
	uint64_t tmp = 0;
	slurmdb_federation_cond_t fed_cond;
	List fed_list;
	slurmdb_federation_rec_t *fed = NULL, *state_fed = NULL;
	slurmdb_cluster_rec_t *state_cluster = NULL;

	slurm_mutex_lock(&init_mutex);

	if (inited) {
		slurm_mutex_unlock(&init_mutex);
		return SLURM_SUCCESS;
	}

	if (!association_based_accounting)
		goto end_it;

	slurm_mutex_lock(&fed_job_list_mutex);
	if (!fed_job_list)
		fed_job_list = list_create(xfree_ptr);
	slurm_mutex_unlock(&fed_job_list_mutex);

	/*
	 * fed_job_update_list should only be appended to and popped from.
	 * So rely on the list's lock. If there are ever changes to iterate the
	 * list, then a lock will be needed around the list.
	 */
	if (!fed_job_update_list)
		fed_job_update_list = list_create(_destroy_fed_job_update_info);

	/*
	 * remote_dep_recv_list should only be appended to and popped from.
	 * So rely on the list's lock. If there are ever changes to iterate the
	 * list, then a lock will be needed around the list.
	 */
	if (!remote_dep_recv_list)
		remote_dep_recv_list = list_create(_destroy_dep_msg);

	/*
	 * origin_dep_update_list should only be read from or modified with
	 * list_* functions (such as append, pop, count).
	 * So rely on the list's lock. If there are ever changes to iterate the
	 * list, then a lock will be needed around the list.
	 */
	if (!origin_dep_update_list)
		origin_dep_update_list = list_create(_destroy_dep_update_msg);

	slurm_mutex_lock(&dep_job_list_mutex);
	if (!remote_dep_job_list)
		remote_dep_job_list = list_create(_destroy_dep_job);
	slurm_mutex_unlock(&dep_job_list_mutex);

	slurm_persist_conn_recv_server_init();
	_spawn_threads();

	if (running_cache) {
		debug("Database appears down, reading federations from state file.");
		fed = _state_load(slurmctld_conf.state_save_location);
		if (!fed) {
			debug2("No federation state");
			rc = SLURM_SUCCESS;
			goto end_it;
		}
	} else {
		state_fed = _state_load(slurmctld_conf.state_save_location);
		if (state_fed)
			state_cluster = list_find_first(
				state_fed->cluster_list,
				slurmdb_find_cluster_in_list,
				slurmctld_conf.cluster_name);

		slurmdb_init_federation_cond(&fed_cond, 0);
		fed_cond.cluster_list = list_create(NULL);
		list_append(fed_cond.cluster_list, slurmctld_conf.cluster_name);

		fed_list = acct_storage_g_get_federations(
			db_conn,
			slurmctld_conf.slurm_user_id,
			&fed_cond);
		FREE_NULL_LIST(fed_cond.cluster_list);
		if (!fed_list) {
			error("failed to get a federation list");
			rc = SLURM_ERROR;
			goto end_it;
		}

		if (list_count(fed_list) == 1)
			fed = list_pop(fed_list);
		else if (list_count(fed_list) > 1) {
			error("got more federations than expected");
			rc = SLURM_ERROR;
		}
		FREE_NULL_LIST(fed_list);
	}

	if (fed) {
		slurmdb_cluster_rec_t *cluster = NULL;
		slurmctld_lock_t fedr_jobw_lock = {
			NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };

		if ((cluster = list_find_first(fed->cluster_list,
					       slurmdb_find_cluster_in_list,
					       slurmctld_conf.cluster_name))) {
			job_record_t *job_ptr;
			ListIterator itr;

			_join_federation(fed, cluster, &tmp);

			/* Find clusters that were removed from the federation
			 * since the last time we got an update */
			lock_slurmctld(fedr_jobw_lock);
			if (state_fed && state_cluster && fed_mgr_fed_rec)
				_handle_removed_clusters(state_fed, &tmp);

			/* Send remote dependencies to siblings. */
			itr = list_iterator_create(job_list);
			while ((job_ptr = list_next(itr))) {
				if (job_ptr->details &&
				    job_ptr->details->dependency &&
				    list_count(job_ptr->details->depend_list) &&
				    fed_mgr_submit_remote_dependencies(job_ptr,
								       false,
								       false))
					error("%s: Failed to send %pJ dependencies to some or all siblings",
					      __func__, job_ptr);
			}
			list_iterator_destroy(itr);
			unlock_slurmctld(fedr_jobw_lock);
		} else {
			slurmdb_destroy_federation_rec(fed);
			error("failed to get cluster from federation that we requested");
			rc = SLURM_ERROR;
		}
	} else if (state_fed && state_cluster) {
		/* cluster has been removed from federation while it was down.
		 * Need to clear up jobs */
		slurmctld_lock_t fedw_jobw_lock = {
			NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };

		info("self was removed from federation since last start");
		lock_slurmctld(fedw_jobw_lock);
		fed_mgr_cluster_rec = state_cluster;
		_cleanup_removed_origin_jobs();
		fed_mgr_cluster_rec = NULL;
		unlock_slurmctld(fedw_jobw_lock);
	}

	slurmdb_destroy_federation_rec(state_fed);

end_it:
	/* Call whether state file existed or not. */
	_add_missing_fed_job_info();

	inited = true;
	slurm_mutex_unlock(&init_mutex);

	return rc;
}

extern int fed_mgr_fini(void)
{
	slurmctld_lock_t fed_write_lock = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };

	slurm_mutex_lock(&init_mutex);
	inited = false;
	slurm_mutex_unlock(&init_mutex);

	lock_slurmctld(fed_write_lock);
	/* Call _leave_federation() before slurm_persist_conn_recv_server_fini()
	 * as this will NULL out the cluster's recv persistent connection before
	 * _server_fini() actually destroys it. That way the cluster's recv
	 * connection won't be pointing to bad memory. */
	_leave_federation();
	unlock_slurmctld(fed_write_lock);

	slurm_persist_conn_recv_server_fini();

	if (agent_thread_id)
		pthread_join(agent_thread_id, NULL);

	if (fed_job_update_thread_id)
		pthread_join(fed_job_update_thread_id, NULL);

	if (remote_dep_thread_id)
		pthread_join(remote_dep_thread_id, NULL);

	if (dep_job_thread_id)
		pthread_join(dep_job_thread_id, NULL);

	if (origin_dep_thread_id)
		pthread_join(origin_dep_thread_id, NULL);

	_remove_job_watch_thread();

	slurm_mutex_lock(&fed_job_list_mutex);
	FREE_NULL_LIST(fed_job_list);
	slurm_mutex_unlock(&fed_job_list_mutex);

	FREE_NULL_LIST(fed_job_update_list);

	return SLURM_SUCCESS;
}

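/*
 * After federation membership changes, resend singleton dependencies to
 * newly added clusters and re-test dependencies that may refer to
 * removed clusters.
 */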
static void _handle_dependencies_for_modified_fed(uint64_t added_clusters,
						  uint64_t removed_clusters)
{
	uint32_t origin_id;
	job_record_t *job_ptr;
	ListIterator itr;
	depend_spec_t find_dep = { 0 };

	xassert(verify_lock(JOB_LOCK, READ_LOCK));
	xassert(verify_lock(FED_LOCK, READ_LOCK));

	if (!fed_mgr_cluster_rec)
		return;

	find_dep.depend_type = SLURM_DEPEND_SINGLETON;
	itr = list_iterator_create(job_list);
	while ((job_ptr = list_next(itr))) {
		if (added_clusters && IS_JOB_PENDING(job_ptr) &&
		    _is_fed_job(job_ptr, &origin_id) &&
		    find_dependency(job_ptr, &find_dep))
			fed_mgr_submit_remote_dependencies(job_ptr, true,
							   false);
		/*
		 * Make sure any remote dependencies are immediately
		 * marked as invalid.
		 */
		if (removed_clusters)
			test_job_dependency(job_ptr, NULL);
	}
	list_iterator_destroy(itr);
}

extern int fed_mgr_update_feds(slurmdb_update_object_t *update)
{
	uint64_t added_clusters = 0, removed_clusters = 0;
	List feds;
	slurmdb_federation_rec_t *fed = NULL;
	slurmdb_cluster_rec_t *cluster = NULL;
	slurmctld_lock_t fedr_jobw_lock = {
		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
	slurmctld_lock_t fedw_jobw_lock = {
		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };

	if (!update->objects)
		return SLURM_SUCCESS;

	slurm_mutex_lock(&init_mutex);
	if (!inited) {
		slurm_mutex_unlock(&init_mutex);
		return SLURM_SUCCESS;	/* we haven't started the fed mgr and
					 * we can't start it from here, don't
					 * worry all will get set up later. */
	}
	slurm_mutex_unlock(&init_mutex);
	/* we only want one update happening at a time. */
	slurm_mutex_lock(&update_mutex);
	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("Got a federation update");

	feds = update->objects;

	/* find the federation that this cluster is in.
	 * if it's changed from last time then update stored information.
	 * grab other clusters in federation
	 * establish connections with each cluster in federation */

	/* what if a remote cluster is removed from federation.
	 * have to detect that and close the connection to the remote */
	while ((fed = list_pop(feds))) {
		if (fed->cluster_list &&
		    (cluster = list_find_first(fed->cluster_list,
					       slurmdb_find_cluster_in_list,
					       slurmctld_conf.cluster_name))) {
			/* Find clusters that were removed from the federation
			 * since the last time we got an update */
			lock_slurmctld(fedr_jobw_lock);
			if (fed_mgr_fed_rec)
				_handle_removed_clusters(fed,
							 &removed_clusters);
			unlock_slurmctld(fedr_jobw_lock);
			_join_federation(fed, cluster, &added_clusters);

			if (added_clusters || removed_clusters) {
				lock_slurmctld(fedr_jobw_lock);
				log_flag(DEPENDENCY, "%s: Cluster(s) added: 0x%"PRIx64"; removed: 0x%"PRIx64,
					 __func__, added_clusters,
					 removed_clusters);
				_handle_dependencies_for_modified_fed(
					added_clusters,
					removed_clusters);
				unlock_slurmctld(fedr_jobw_lock);
			}
			break;
		}
		slurmdb_destroy_federation_rec(fed);
	}

	if (!fed && fed_mgr_fed_rec) {
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("Not part of any federation");
		lock_slurmctld(fedw_jobw_lock);
		_cleanup_removed_origin_jobs();
		_leave_federation();
		unlock_slurmctld(fedw_jobw_lock);
	}
	slurm_mutex_unlock(&update_mutex);
	return SLURM_SUCCESS;
}

static void _pack_fed_job_info(fed_job_info_t *job_info, Buf buffer,
			       uint16_t protocol_version)
{
	int i;
	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
		pack32(job_info->cluster_lock, buffer);
		pack32(job_info->job_id, buffer);
		pack64(job_info->siblings_active, buffer);
		pack64(job_info->siblings_viable, buffer);

		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
			pack32(job_info->updating_sibs[i], buffer);
		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
			pack_time(job_info->updating_time[i], buffer);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
	}
}

static int _unpack_fed_job_info(fed_job_info_t **job_info_pptr, Buf buffer,
				uint16_t protocol_version)
{
	int i;
	fed_job_info_t *job_info = xmalloc(sizeof(fed_job_info_t));

	*job_info_pptr = job_info;

	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
		safe_unpack32(&job_info->cluster_lock, buffer);
		safe_unpack32(&job_info->job_id, buffer);
		safe_unpack64(&job_info->siblings_active, buffer);
		safe_unpack64(&job_info->siblings_viable, buffer);

		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
			safe_unpack32(&job_info->updating_sibs[i], buffer);
		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
			safe_unpack_time(&job_info->updating_time[i], buffer);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
		goto unpack_error;
	}

	return SLURM_SUCCESS;

unpack_error:
	xfree(job_info);
	*job_info_pptr = NULL;
	return SLURM_ERROR;
}

static void _dump_fed_job_list(Buf buffer, uint16_t protocol_version)
{
	uint32_t count = NO_VAL;
	fed_job_info_t *fed_job_info;

	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
		/*
		 * Need to be in the lock to prevent the window between getting
		 * the count and actually looping on the list.
		 */
		slurm_mutex_lock(&fed_job_list_mutex);
		if (fed_job_list)
			count = list_count(fed_job_list);
		else
			count = NO_VAL;

		pack32(count, buffer);
		if (count && (count != NO_VAL)) {
			ListIterator itr = list_iterator_create(fed_job_list);
			while ((fed_job_info = list_next(itr))) {
				_pack_fed_job_info(fed_job_info, buffer,
						   protocol_version);
			}
			list_iterator_destroy(itr);
		}
		slurm_mutex_unlock(&fed_job_list_mutex);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
	}
}

static List _load_fed_job_list(Buf buffer, uint16_t protocol_version)
{
	int i;
	uint32_t count;
	fed_job_info_t *tmp_job_info = NULL;
	List tmp_list = NULL;

	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
		safe_unpack32(&count, buffer);
		if (count > NO_VAL)
			goto unpack_error;
		if (count != NO_VAL) {
			tmp_list = list_create(xfree_ptr);

			for (i = 0; i < count; i++) {
				if (_unpack_fed_job_info(&tmp_job_info, buffer,
							 protocol_version))
					goto unpack_error;
				list_append(tmp_list, tmp_job_info);
			}
		}
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
	}

	return tmp_list;

unpack_error:
	FREE_NULL_LIST(tmp_list);
	return NULL;
}

/*
 * If this changes, then _pack_dep_msg() in slurm_protocol_pack.c probably
 * needs to change.
 */
static void _pack_remote_dep_job(job_record_t *job_ptr, Buf buffer,
				 uint16_t protocol_version)
{
	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
		pack32(job_ptr->array_job_id, buffer);
		pack32(job_ptr->array_task_id, buffer);
		pack_dep_list(job_ptr->details->depend_list, buffer,
			      protocol_version);
		packstr(job_ptr->details->dependency, buffer);
		packbool(job_ptr->array_recs ? true : false, buffer);
		pack32(job_ptr->job_id, buffer);
		packstr(job_ptr->name, buffer);
		pack32(job_ptr->user_id, buffer);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
	}
}

/*
 * If this changes, then _unpack_dep_msg() in slurm_protocol_pack.c probably
 * needs to change.
 */
static int _unpack_remote_dep_job(job_record_t **job_pptr, Buf buffer,
				  uint16_t protocol_version)
{
	uint32_t uint32_tmp;
	bool is_array;
	job_record_t *job_ptr;

	xassert(job_pptr);

	job_ptr = xmalloc(sizeof *job_ptr);
	job_ptr->magic = JOB_MAGIC;
	job_ptr->details = xmalloc(sizeof *(job_ptr->details));
	job_ptr->details->magic = DETAILS_MAGIC;
	job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));
	*job_pptr = job_ptr;

	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
		safe_unpack32(&job_ptr->array_job_id, buffer);
		safe_unpack32(&job_ptr->array_task_id, buffer);
		unpack_dep_list(&job_ptr->details->depend_list, buffer,
				protocol_version);
		safe_unpackstr_xmalloc(&job_ptr->details->dependency,
				       &uint32_tmp, buffer);
		safe_unpackbool(&is_array, buffer);
		if (is_array)
			job_ptr->array_recs =
				xmalloc(sizeof *(job_ptr->array_recs));
		safe_unpack32(&job_ptr->job_id, buffer);
		safe_unpackstr_xmalloc(&job_ptr->name, &uint32_tmp, buffer);
		safe_unpack32(&job_ptr->user_id, buffer);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
		goto unpack_error;
	}

	return SLURM_SUCCESS;

unpack_error:
	_destroy_dep_job(job_ptr);
	*job_pptr = NULL;
	return SLURM_ERROR;
}

static void _dump_remote_dep_job_list(Buf buffer, uint16_t protocol_version)
{
	uint32_t count = NO_VAL;
	job_record_t *job_ptr;

	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
		slurm_mutex_lock(&dep_job_list_mutex);
		if (remote_dep_job_list)
			count = list_count(remote_dep_job_list);
		else
			count = NO_VAL;
		pack32(count, buffer);
		if (count && (count != NO_VAL)) {
			ListIterator itr =
				list_iterator_create(remote_dep_job_list);
			while ((job_ptr = list_next(itr)))
				_pack_remote_dep_job(job_ptr, buffer,
						     protocol_version);
			list_iterator_destroy(itr);
		}
		slurm_mutex_unlock(&dep_job_list_mutex);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
	}
}

static List _load_remote_dep_job_list(Buf buffer, uint16_t protocol_version)
{
	uint32_t count, i;
	List tmp_list = NULL;
	job_record_t *job_ptr = NULL;

	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
		safe_unpack32(&count, buffer);
		if (count > NO_VAL)
			goto unpack_error;
		if (count != NO_VAL) {
			tmp_list = list_create(_destroy_dep_job);
			for (i = 0; i < count; i++) {
				if (_unpack_remote_dep_job(&job_ptr, buffer,
							   protocol_version))
					goto unpack_error;
				list_append(tmp_list, job_ptr);
			}
		}
	} else if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
		/*
		 * This function didn't exist until 20.02. Add this block
		 * to silence errors caused by upgrading an 18.08 or 19.05
		 * versioned state file.
		 */
		debug3("%s: old protocol version", __func__);
	} else {
		error("%s: protocol_version %hu not supported.",
		      __func__, protocol_version);
		goto unpack_error;
	}
	return tmp_list;

unpack_error:
	FREE_NULL_LIST(tmp_list);
	return NULL;
}

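/*
 * Write the federation record plus the fed job and remote dependency
 * lists to FED_MGR_STATE_FILE, using the usual new/old file shuffle so
 * a failed write never clobbers the previous state.
 */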
extern int fed_mgr_state_save(char *state_save_location)
{
	int error_code = 0, log_fd;
	char *old_file = NULL, *new_file = NULL, *reg_file = NULL;
	slurmctld_lock_t fed_read_lock = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };

	Buf buffer = init_buf(0);

	DEF_TIMERS;

	START_TIMER;

	/* write header: version, time */
	pack16(SLURM_PROTOCOL_VERSION, buffer);
	pack_time(time(NULL), buffer);

	lock_slurmctld(fed_read_lock);
	slurmdb_pack_federation_rec(fed_mgr_fed_rec, SLURM_PROTOCOL_VERSION,
				    buffer);
	unlock_slurmctld(fed_read_lock);

	_dump_fed_job_list(buffer, SLURM_PROTOCOL_VERSION);
	_dump_remote_dep_job_list(buffer, SLURM_PROTOCOL_VERSION);

	/* write the buffer to file */
	reg_file = xstrdup_printf("%s/%s", state_save_location,
				  FED_MGR_STATE_FILE);
	old_file = xstrdup_printf("%s.old", reg_file);
	new_file = xstrdup_printf("%s.new", reg_file);

	log_fd = creat(new_file, 0600);
	if (log_fd < 0) {
		error("Can't save state, create file %s error %m", new_file);
		error_code = errno;
	} else {
		int pos = 0, nwrite = get_buf_offset(buffer), amount;
		char *data = (char *)get_buf_data(buffer);
		while (nwrite > 0) {
			amount = write(log_fd, &data[pos], nwrite);
			if ((amount < 0) && (errno != EINTR)) {
				error("Error writing file %s, %m", new_file);
				error_code = errno;
				break;
			}
			nwrite -= amount;
			pos += amount;
		}
		fsync(log_fd);
		close(log_fd);
	}
	if (error_code)
		(void) unlink(new_file);
	else {			/* file shuffle */
		(void) unlink(old_file);
		if (link(reg_file, old_file))
			debug4("unable to create link for %s -> %s: %m",
			       reg_file, old_file);
		(void) unlink(reg_file);
		if (link(new_file, reg_file))
			debug4("unable to create link for %s -> %s: %m",
			       new_file, reg_file);
		(void) unlink(new_file);
	}
	xfree(old_file);
	xfree(reg_file);
	xfree(new_file);

	free_buf(buffer);

	END_TIMER2("fed_mgr_state_save");

	return error_code;
}

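/*
 * Read FED_MGR_STATE_FILE and return the saved federation record,
 * merging the saved fed job and remote dependency lists into the live
 * lists. Returns NULL if there is no state or it cannot be recovered.
 */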
_state_load(char * state_save_location)3426 static slurmdb_federation_rec_t *_state_load(char *state_save_location)
3427 {
3428 Buf buffer = NULL;
3429 char *state_file;
3430 time_t buf_time;
3431 uint16_t ver = 0;
3432 int error_code = SLURM_SUCCESS;
3433 slurmdb_federation_rec_t *ret_fed = NULL;
3434 List tmp_list = NULL;
3435
3436 slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
3437
3438 state_file = xstrdup_printf("%s/%s", state_save_location,
3439 FED_MGR_STATE_FILE);
3440 if (!(buffer = create_mmap_buf(state_file))) {
3441 error("No fed_mgr state file (%s) to recover", state_file);
3442 xfree(state_file);
3443 return NULL;
3444 }
3445 xfree(state_file);
3446
3447 safe_unpack16(&ver, buffer);
3448
3449 debug3("Version in fed_mgr_state header is %u", ver);
3450 if (ver > SLURM_PROTOCOL_VERSION || ver < SLURM_MIN_PROTOCOL_VERSION) {
3451 if (!ignore_state_errors)
3452 fatal("Can not recover fed_mgr state, incompatible version, got %u need > %u <= %u, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.",
3453 ver, SLURM_MIN_PROTOCOL_VERSION,
3454 SLURM_PROTOCOL_VERSION);
3455 error("***********************************************");
3456 error("Can not recover fed_mgr state, incompatible version, "
3457 "got %u need > %u <= %u", ver,
3458 SLURM_MIN_PROTOCOL_VERSION, SLURM_PROTOCOL_VERSION);
3459 error("***********************************************");
3460 free_buf(buffer);
3461 return NULL;
3462 }
3463
3464 safe_unpack_time(&buf_time, buffer);
3465
3466 error_code = slurmdb_unpack_federation_rec((void **)&ret_fed, ver,
3467 buffer);
3468 if (error_code != SLURM_SUCCESS)
3469 goto unpack_error;
3470 else if (!ret_fed || !ret_fed->name ||
3471 !list_count(ret_fed->cluster_list)) {
3472 slurmdb_destroy_federation_rec(ret_fed);
3473 ret_fed = NULL;
3474 debug("No feds to retrieve from state");
3475 } else {
3476 /* We want to free the connections here since they don't exist
3477 * anymore, but they were packed when state was saved. */
3478 slurmdb_cluster_rec_t *cluster;
3479 ListIterator itr = list_iterator_create(
3480 ret_fed->cluster_list);
3481 while ((cluster = list_next(itr))) {
3482 slurm_persist_conn_destroy(cluster->fed.recv);
3483 cluster->fed.recv = NULL;
3484 slurm_persist_conn_destroy(cluster->fed.send);
3485 cluster->fed.send = NULL;
3486 }
3487 list_iterator_destroy(itr);
3488 }
3489
3490 	/* Load the saved fed_job_list and transfer each entry to the actual
3491 	 * fed_job_list only if a matching job record still exists */
3492 if ((tmp_list = _load_fed_job_list(buffer, ver))) {
3493 fed_job_info_t *tmp_info;
3494
3495 slurm_mutex_lock(&fed_job_list_mutex);
3496 if (fed_job_list) {
3497 lock_slurmctld(job_read_lock);
3498 while ((tmp_info = list_pop(tmp_list))) {
3499 if (find_job_record(tmp_info->job_id))
3500 list_append(fed_job_list, tmp_info);
3501 else
3502 xfree(tmp_info);
3503 }
3504 unlock_slurmctld(job_read_lock);
3505 }
3506 slurm_mutex_unlock(&fed_job_list_mutex);
3507
3508 }
3509 FREE_NULL_LIST(tmp_list);
3510
3511 /*
3512 * Load in remote_dep_job_list and transfer to actual
3513 * remote_dep_job_list. If the actual list already has that job,
3514 * just throw away this one.
3515 */
3516 if ((tmp_list = _load_remote_dep_job_list(buffer, ver))) {
3517 job_record_t *tmp_dep_job;
3518 slurm_mutex_lock(&dep_job_list_mutex);
3519 while ((tmp_dep_job = list_pop(tmp_list))) {
3520 if (!remote_dep_job_list)
3521 remote_dep_job_list =
3522 list_create(_destroy_dep_job);
3523 if (!list_find_first(remote_dep_job_list,
3524 _find_job_by_id,
3525 &tmp_dep_job->job_id) &&
3526 tmp_dep_job->details->dependency) {
3527 list_append(remote_dep_job_list, tmp_dep_job);
3528 } /* else it will get free'd with FREE_NULL_LIST */
3529 }
3530 slurm_mutex_unlock(&dep_job_list_mutex);
3531 }
3532 FREE_NULL_LIST(tmp_list);
3533
3534 free_buf(buffer);
3535
3536 return ret_fed;
3537
3538 unpack_error:
3539 if (!ignore_state_errors)
3540 fatal("Incomplete fed_mgr state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
3541 error("Incomplete fed_mgr state file");
3542 free_buf(buffer);
3543
3544 return NULL;
3545 }
3546
3547 /*
3548  * Returns the federated job id: (<cluster id> << 26) + <local id>.
3549 * Bits 0-25: Local job id
3550 * Bits 26-31: Cluster id
3551 */
3552 extern uint32_t fed_mgr_get_job_id(uint32_t orig)
3553 {
3554 if (!fed_mgr_cluster_rec)
3555 return orig;
3556 return orig + (fed_mgr_cluster_rec->fed.id << FED_MGR_CLUSTER_ID_BEGIN);
3557 }
3558
3559 /*
3560 * Returns the local job id from a federated job id.
3561 */
3562 extern uint32_t fed_mgr_get_local_id(uint32_t id)
3563 {
3564 return id & MAX_JOB_ID;
3565 }
3566
3567 /*
3568 * Returns the cluster id from a federated job id.
3569 */
3570 extern uint32_t fed_mgr_get_cluster_id(uint32_t id)
3571 {
3572 return id >> FED_MGR_CLUSTER_ID_BEGIN;
3573 }
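/*
 * Worked example (illustrative; assumes the local cluster has
 * fed.id == 2): with FED_MGR_CLUSTER_ID_BEGIN == 26 and a local
 * JobId of 1234,
 *
 *	fed_mgr_get_job_id(1234)          -> (2 << 26) + 1234 == 134218962
 *	fed_mgr_get_local_id(134218962)   -> 134218962 & MAX_JOB_ID == 1234
 *	fed_mgr_get_cluster_id(134218962) -> 134218962 >> 26 == 2
 */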
3574
3575 extern int fed_mgr_add_sibling_conn(slurm_persist_conn_t *persist_conn,
3576 char **out_buffer)
3577 {
3578 slurmdb_cluster_rec_t *cluster = NULL;
3579 slurmctld_lock_t fed_read_lock = {
3580 NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
3581
3582 lock_slurmctld(fed_read_lock);
3583
3584 if (!fed_mgr_fed_rec) {
3585 unlock_slurmctld(fed_read_lock);
3586 *out_buffer = xstrdup_printf(
3587 "no fed_mgr_fed_rec on cluster %s yet.",
3588 slurmctld_conf.cluster_name);
3589 /* This really isn't an error. If the cluster doesn't know it
3590 * is in a federation this could happen on the initial
3591 * connection from a sibling that found out about the addition
3592 * before I did.
3593 */
3594 debug("%s: %s", __func__, *out_buffer);
3595 /* The other side needs to see this as an error though or the
3596 * connection won't be completely established.
3597 */
3598 return SLURM_ERROR;
3599 }
3600
3601 if (!fed_mgr_cluster_rec) {
3602 unlock_slurmctld(fed_read_lock);
3603 *out_buffer = xstrdup_printf(
3604 "no fed_mgr_cluster_rec on cluster %s? "
3605 "This should never happen",
3606 slurmctld_conf.cluster_name);
3607 error("%s: %s", __func__, *out_buffer);
3608 return SLURM_ERROR;
3609 }
3610
3611 if (!(cluster =
3612 fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
3613 unlock_slurmctld(fed_read_lock);
3614 *out_buffer = xstrdup_printf(
3615 "%s isn't a known sibling of ours, but tried to connect to cluster %s federation %s",
3616 persist_conn->cluster_name, slurmctld_conf.cluster_name,
3617 fed_mgr_fed_rec->name);
3618 error("%s: %s", __func__, *out_buffer);
3619 return SLURM_ERROR;
3620 }
3621
3622 persist_conn->callback_fini = _persist_callback_fini;
3623 persist_conn->flags |= PERSIST_FLAG_ALREADY_INITED;
3624
3625 /* If this pointer exists it will be handled by the persist_conn code,
3626 * don't free
3627 */
3628 //slurm_persist_conn_destroy(cluster->fed.recv);
3629
3630 /* Preserve the persist_conn so that the cluster can get the remote
3631 * side's hostname and port to talk back to if it doesn't have it yet.
3632 * See _open_controller_conn().
3633 	 * Don't take the cluster's lock here because a near-deadlock
3634 	 * could occur if this cluster is opening a connection to the remote
3635 	 * cluster at the same time the remote cluster is connecting to this
3636 	 * cluster, since both sides would hold the mutex in order to
3637 	 * send/recv. If that did happen, the connection would eventually
3638 	 * time out and resolve itself. */
3639 cluster->fed.recv = persist_conn;
3640
3641 slurm_persist_conn_recv_thread_init(persist_conn, -1, persist_conn);
3642 _q_send_job_sync(cluster->name);
3643
3644 unlock_slurmctld(fed_read_lock);
3645
3646 return SLURM_SUCCESS;
3647 }
3648
3649 /*
3650 * Convert comma separated list of cluster names to bitmap of cluster ids.
3651 */
3652 static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap)
3653 {
3654 int rc = SLURM_SUCCESS;
3655 uint64_t cluster_ids = 0;
3656 List cluster_names;
3657
3658 xassert(clusters);
3659
3660 if (!xstrcasecmp(clusters, "all") ||
3661 (clusters && (*clusters == '\0'))) {
3662 cluster_ids = _get_all_sibling_bits();
3663 goto end_it;
3664 }
3665
3666 cluster_names = list_create(xfree_ptr);
3667 if (slurm_addto_char_list(cluster_names, clusters)) {
3668 ListIterator itr = list_iterator_create(cluster_names);
3669 char *cluster_name;
3670 slurmdb_cluster_rec_t *sibling;
3671
3672 while ((cluster_name = list_next(itr))) {
3673 if (!(sibling =
3674 fed_mgr_get_cluster_by_name(cluster_name))) {
3675 error("didn't find requested cluster name %s in list of federated clusters",
3676 cluster_name);
3677 rc = SLURM_ERROR;
3678 break;
3679 }
3680
3681 cluster_ids |= FED_SIBLING_BIT(sibling->fed.id);
3682 }
3683 list_iterator_destroy(itr);
3684 }
3685 FREE_NULL_LIST(cluster_names);
3686
3687 end_it:
3688 if (cluster_bitmap)
3689 *cluster_bitmap = cluster_ids;
3690
3691 return rc;
3692 }
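/*
 * Worked example (illustrative; assumes siblings fedA, fedB and fedC
 * with fed.id 1, 2 and 3):
 *
 *	uint64_t bits = 0;
 *	_validate_cluster_names("fedA,fedC", &bits);
 *	// bits == FED_SIBLING_BIT(1) | FED_SIBLING_BIT(3) == 0x5
 *	_validate_cluster_names("all", &bits);
 *	// bits == 0x7 (every known sibling)
 */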
3693
3694 /* Update remote sibling job's viable_siblings bitmaps.
3695 *
3696 * IN job_id - job_id of job to update.
3697 * IN job_specs - job_specs to update job_id with.
3698 * IN viable_sibs - viable siblings bitmap to send to sibling jobs.
3699 * IN update_sibs - bitmap of siblings to update.
3700 */
3701 extern int fed_mgr_update_job(uint32_t job_id, job_desc_msg_t *job_specs,
3702 uint64_t update_sibs, uid_t uid)
3703 {
3704 ListIterator sib_itr;
3705 slurmdb_cluster_rec_t *sibling;
3706 fed_job_info_t *job_info;
3707
3708 slurm_mutex_lock(&fed_job_list_mutex);
3709 if (!(job_info = _find_fed_job_info(job_id))) {
3710 error("Didn't find JobId=%u in fed_job_list", job_id);
3711 slurm_mutex_unlock(&fed_job_list_mutex);
3712 return SLURM_ERROR;
3713 }
3714
3715 sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3716 while ((sibling = list_next(sib_itr))) {
3717 /* Local is handled outside */
3718 if (sibling == fed_mgr_cluster_rec)
3719 continue;
3720
3721 if (!(update_sibs & FED_SIBLING_BIT(sibling->fed.id)))
3722 continue;
3723
3724 if (_persist_update_job(sibling, job_id, job_specs, uid)) {
3725 error("failed to update sibling job on sibling %s",
3726 sibling->name);
3727 continue;
3728 }
3729
3730 job_info->updating_sibs[sibling->fed.id]++;
3731 job_info->updating_time[sibling->fed.id] = time(NULL);
3732 }
3733 list_iterator_destroy(sib_itr);
3734 slurm_mutex_unlock(&fed_job_list_mutex);
3735
3736 return SLURM_SUCCESS;
3737 }
3738
3739 /*
3740 * Submit sibling jobs to designated siblings.
3741 *
3742 * Will update job_desc->fed_siblings_active with the successful submissions.
3743 * Will not send to siblings if they are in
3744 * job_desc->fed_details->siblings_active.
3745 *
3746 * IN job_desc - job_desc containing job_id and fed_siblings_viable of job to be
3747 * submitted.
3748 * IN msg - contains the original job_desc buffer to send to the siblings.
3749 * IN alloc_only - true if just an allocation. false if a batch job.
3750 * IN dest_sibs - bitmap of viable siblings to submit to.
3751 * RET returns SLURM_SUCCESS if all siblings received the job successfully or
3752 * SLURM_ERROR if any siblings failed to receive the job. If a sibling
3753 * fails, then the successful siblings will be updated with the correct
3754 * sibling bitmap.
3755 */
3756 static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg,
3757 bool alloc_only, uint64_t dest_sibs)
3758 {
3759 int ret_rc = SLURM_SUCCESS;
3760 ListIterator sib_itr;
3761 sib_msg_t sib_msg = {0};
3762 slurmdb_cluster_rec_t *sibling = NULL;
3763 slurm_msg_t req_msg;
3764 uint16_t last_rpc_version = NO_VAL16;
3765 Buf buffer = NULL;
3766
3767 xassert(job_desc);
3768 xassert(msg);
3769
3770 sib_msg.data_buffer = msg->buffer;
3771 sib_msg.data_offset = msg->body_offset;
3772 sib_msg.data_type = msg->msg_type;
3773 sib_msg.data_version = msg->protocol_version;
3774 sib_msg.fed_siblings = job_desc->fed_siblings_viable;
3775 sib_msg.job_id = job_desc->job_id;
3776 sib_msg.resp_host = job_desc->resp_host;
3777 sib_msg.submit_host = job_desc->alloc_node;
3778
3779 slurm_msg_t_init(&req_msg);
3780 req_msg.msg_type = REQUEST_SIB_MSG;
3781 req_msg.data = &sib_msg;
3782
3783 sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3784 while ((sibling = list_next(sib_itr))) {
3785 int rc;
3786 if (sibling == fed_mgr_cluster_rec)
3787 continue;
3788
3789 /* Only send to specific siblings */
3790 if (!(dest_sibs & FED_SIBLING_BIT(sibling->fed.id)))
3791 continue;
3792
3793 /* skip sibling if the sibling already has a job */
3794 if (job_desc->fed_siblings_active &
3795 FED_SIBLING_BIT(sibling->fed.id))
3796 continue;
3797
3798 if (alloc_only)
3799 sib_msg.sib_msg_type = FED_JOB_SUBMIT_INT;
3800 else
3801 sib_msg.sib_msg_type = FED_JOB_SUBMIT_BATCH;
3802
3803 /* Pack message buffer according to sibling's rpc version. A
3804 * submission from a client will already have a buffer with the
3805 * packed job_desc from the client. If this controller is
3806 * submitting new sibling jobs then the buffer needs to be
3807 		 * packed according to each sibling's rpc_version. */
3808 if (!msg->buffer &&
3809 (last_rpc_version != sibling->rpc_version)) {
3810 free_buf(buffer);
3811 msg->protocol_version = sibling->rpc_version;
3812 buffer = init_buf(BUF_SIZE);
3813 pack_msg(msg, buffer);
3814 sib_msg.data_buffer = buffer;
3815 sib_msg.data_version = msg->protocol_version;
3816
3817 last_rpc_version = sibling->rpc_version;
3818 }
3819
3820 req_msg.protocol_version = sibling->rpc_version;
3821
3822 if (!(rc = _queue_rpc(sibling, &req_msg, 0, false)))
3823 job_desc->fed_siblings_active |=
3824 FED_SIBLING_BIT(sibling->fed.id);
3825 ret_rc |= rc;
3826 }
3827 list_iterator_destroy(sib_itr);
3828
3829 free_buf(buffer);
3830
3831 return ret_rc;
3832 }
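/*
 * Worked example (illustrative): with dest_sibs == 0x6 (clusters 2
 * and 3) and job_desc->fed_siblings_active == 0x2, only cluster 3
 * receives a REQUEST_SIB_MSG; on a successful _queue_rpc() its bit
 * is added, so fed_siblings_active becomes 0x6.
 */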
3833
3834 /*
3835 * Prepare and submit new sibling jobs built from an existing job.
3836 *
3837 * IN job_ptr - job to submit to remote siblings.
3838 * IN dest_sibs - bitmap of viable siblings to submit to.
3839 */
3840 static int _prepare_submit_siblings(job_record_t *job_ptr, uint64_t dest_sibs)
3841 {
3842 int rc = SLURM_SUCCESS;
3843 uint32_t origin_id;
3844 job_desc_msg_t *job_desc;
3845 slurm_msg_t msg;
3846
3847 xassert(job_ptr);
3848 xassert(job_ptr->details);
3849
3850 if (!_is_fed_job(job_ptr, &origin_id))
3851 return SLURM_SUCCESS;
3852
3853 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
3854 info("submitting new siblings for %pJ", job_ptr);
3855
3856 if (!(job_desc = copy_job_record_to_job_desc(job_ptr)))
3857 return SLURM_ERROR;
3858
3859 /*
3860 * Since job_ptr could have had defaults filled on the origin cluster,
3861 * clear these before sibling submission if default flag is set
3862 */
3863 if (job_desc->bitflags & USE_DEFAULT_ACCT)
3864 xfree(job_desc->account);
3865 if (job_desc->bitflags & USE_DEFAULT_PART)
3866 xfree(job_desc->partition);
3867 if (job_desc->bitflags & USE_DEFAULT_QOS)
3868 xfree(job_desc->qos);
3869 if (job_desc->bitflags & USE_DEFAULT_WCKEY)
3870 xfree(job_desc->wckey);
3871
3872 /* Have to pack job_desc into a buffer. _submit_sibling_jobs will pack
3873 * the job_desc according to each sibling's rpc_version. */
3874 slurm_msg_t_init(&msg);
3875 msg.msg_type = REQUEST_RESOURCE_ALLOCATION;
3876 msg.data = job_desc;
3877
3878 if (_submit_sibling_jobs(job_desc, &msg, false, dest_sibs))
3879 error("Failed to submit fed job to siblings");
3880
3881 /* mark this cluster as an active sibling */
3882 if (job_desc->fed_siblings_viable &
3883 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
3884 job_desc->fed_siblings_active |=
3885 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
3886
3887 /* Add new active jobs to siblings_active bitmap */
3888 job_ptr->fed_details->siblings_active |= job_desc->fed_siblings_active;
3889 update_job_fed_details(job_ptr);
3890
3891 /* free the environment since all strings are stored in one
3892 * xmalloced buffer */
3893 if (job_desc->environment) {
3894 xfree(job_desc->environment[0]);
3895 xfree(job_desc->environment);
3896 job_desc->env_size = 0;
3897 }
3898 slurm_free_job_desc_msg(job_desc);
3899
3900 return rc;
3901 }
3902
3903 static uint64_t _get_all_sibling_bits(void)
3904 {
3905 ListIterator itr;
3906 slurmdb_cluster_rec_t *cluster;
3907 uint64_t sib_bits = 0;
3908
3909 if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
3910 goto fini;
3911
3912 itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3913 while ((cluster = list_next(itr))) {
3914 sib_bits |= FED_SIBLING_BIT(cluster->fed.id);
3915 }
3916 list_iterator_destroy(itr);
3917
3918 fini:
3919 return sib_bits;
3920 }
3921
3922 static int _remove_inactive_sibs(void *object, void *arg)
3923 {
3924 slurmdb_cluster_rec_t *sibling = (slurmdb_cluster_rec_t *)object;
3925 uint64_t *viable_sibs = (uint64_t *)arg;
3926 uint32_t cluster_state = sibling->fed.state;
3927 int base_state = (cluster_state & CLUSTER_FED_STATE_BASE);
3928 bool drain_flag = (cluster_state & CLUSTER_FED_STATE_DRAIN);
3929
3930 if (drain_flag ||
3931 (base_state == CLUSTER_FED_STATE_INACTIVE))
3932 *viable_sibs &= ~(FED_SIBLING_BIT(sibling->fed.id));
3933
3934 return SLURM_SUCCESS;
3935 }
3936
3937 static uint64_t _get_viable_sibs(char *req_clusters, uint64_t feature_sibs,
3938 bool is_array_job, char **err_msg)
3939 {
3940 uint64_t viable_sibs = _get_all_sibling_bits();
3941 if (req_clusters)
3942 _validate_cluster_names(req_clusters, &viable_sibs);
3943 if (feature_sibs)
3944 viable_sibs &= feature_sibs;
3945
3946 /* filter out clusters that are inactive or draining */
3947 list_for_each(fed_mgr_fed_rec->cluster_list, _remove_inactive_sibs,
3948 &viable_sibs);
3949
3950 if (is_array_job) { /* lock array jobs to local cluster */
3951 		uint64_t tmp_viable = viable_sibs &
3952 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
3953 if (viable_sibs && !tmp_viable) {
3954 info("federated job arrays must run on local cluster");
3955 if (err_msg) {
3956 xfree(*err_msg);
3957 xstrfmtcat(*err_msg, "federated job arrays must run on local cluster");
3958 }
3959 }
3960 viable_sibs = tmp_viable;
3961 }
3962
3963 return viable_sibs;
3964 }
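/*
 * Worked example (illustrative; assumes three healthy siblings with
 * fed.id 1-3): _get_viable_sibs("fedB", 0, false, NULL) starts from
 * 0x7, narrows to fedB's bit (0x2) via _validate_cluster_names(),
 * and would then drop that bit again if fedB were draining or
 * inactive.
 */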
3965
3966 static void _add_remove_sibling_jobs(job_record_t *job_ptr)
3967 {
3968 fed_job_info_t *job_info;
3969 uint32_t origin_id = 0;
3970 uint64_t new_sibs = 0, old_sibs = 0, add_sibs = 0,
3971 rem_sibs = 0, feature_sibs = 0;
3972
3973 xassert(job_ptr);
3974
3975 origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
3976
3977 /* if job is not pending then remove removed siblings and add
3978 * new siblings. */
3979 old_sibs = job_ptr->fed_details->siblings_active;
3980
3981 _validate_cluster_features(job_ptr->details->cluster_features,
3982 &feature_sibs);
3983
3984 new_sibs = _get_viable_sibs(job_ptr->clusters, feature_sibs,
3985 job_ptr->array_recs ? true : false, NULL);
3986 job_ptr->fed_details->siblings_viable = new_sibs;
3987
3988 add_sibs = new_sibs & ~old_sibs;
3989 rem_sibs = ~new_sibs & old_sibs;
3990
3991 if (rem_sibs) {
3992 time_t now = time(NULL);
3993 _revoke_sibling_jobs(job_ptr->job_id,
3994 fed_mgr_cluster_rec->fed.id,
3995 rem_sibs, now);
3996 if (fed_mgr_is_origin_job(job_ptr) &&
3997 (rem_sibs & FED_SIBLING_BIT(origin_id))) {
3998 fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
3999 now);
4000 }
4001
4002 job_ptr->fed_details->siblings_active &= ~rem_sibs;
4003 }
4004
4005 	/* Don't submit new siblings if the job is held */
4006 if (job_ptr->priority != 0 && add_sibs)
4007 _prepare_submit_siblings(
4008 job_ptr,
4009 job_ptr->fed_details->siblings_viable);
4010
4011 /* unrevoke the origin job */
4012 if (fed_mgr_is_origin_job(job_ptr) &&
4013 (add_sibs & FED_SIBLING_BIT(origin_id)))
4014 job_ptr->job_state &= ~JOB_REVOKED;
4015
4016 /* Can't have the mutex while calling fed_mgr_job_revoke because it will
4017 * lock the mutex as well. */
4018 slurm_mutex_lock(&fed_job_list_mutex);
4019 if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
4020 job_info->siblings_viable =
4021 job_ptr->fed_details->siblings_viable;
4022 job_info->siblings_active =
4023 job_ptr->fed_details->siblings_active;
4024 }
4025 slurm_mutex_unlock(&fed_job_list_mutex);
4026
4027 /* Update where sibling jobs are running */
4028 update_job_fed_details(job_ptr);
4029 }
4030
4031 static bool _job_has_pending_updates(fed_job_info_t *job_info)
4032 {
4033 	int i;
4034 	static const int UPDATE_DELAY = 60;
4035 	time_t now = time(NULL);
4036 	xassert(job_info);
4037
4038 for (i = 1; i <= MAX_FED_CLUSTERS; i++) {
4039 if (job_info->updating_sibs[i]) {
4040 if (job_info->updating_time[i] > (now - UPDATE_DELAY)) {
4041 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4042 info("JobId=%u is waiting for %d update responses from cluster id %d",
4043 job_info->job_id,
4044 job_info->updating_sibs[i], i);
4045 return true;
4046 } else {
4047 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4048 				info("JobId=%u had pending updates (%d) for cluster id %d, but we haven't heard back for %ld seconds. Clearing the cluster's updating state",
4049 job_info->job_id,
4050 job_info->updating_sibs[i],
4051 i,
4052 now - job_info->updating_time[i]);
4053 job_info->updating_sibs[i] = 0;
4054 }
4055 }
4056
4057 }
4058
4059 return false;
4060 }
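/*
 * Worked example (illustrative): if cluster 2 has one outstanding
 * update sent 30 seconds ago, the job is still "busy" and a cluster
 * lock request is denied; if that update were 90 seconds old (beyond
 * UPDATE_DELAY), its pending count is cleared and the job is treated
 * as update-free once no other cluster has pending updates.
 */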
4061
4062 /*
4063 * Validate requested job cluster features against each cluster's features.
4064 *
4065 * IN spec_features - cluster features that the job requested.
4066 * OUT cluster_bitmap - bitmap of clusters that have matching features.
4067 * RET SLURM_ERROR if no cluster has any of the requested features,
4068  *     SLURM_SUCCESS otherwise.
4069 */
4070 static int _validate_cluster_features(char *spec_features,
4071 uint64_t *cluster_bitmap)
4072 {
4073 int rc = SLURM_SUCCESS;
4074 bool negative_logic = false;
4075 uint64_t feature_sibs = 0;
4076 char *feature = NULL;
4077 slurmdb_cluster_rec_t *sib;
4078 List req_features;
4079 ListIterator feature_itr, sib_itr;
4080
4081 if (!spec_features || !fed_mgr_fed_rec) {
4082 if (cluster_bitmap)
4083 *cluster_bitmap = feature_sibs;
4084 return rc;
4085 }
4086
4087 if (*spec_features == '\0') {
4088 if (cluster_bitmap)
4089 *cluster_bitmap = _get_all_sibling_bits();
4090 return rc;
4091 }
4092
4093 req_features = list_create(xfree_ptr);
4094 slurm_addto_char_list(req_features, spec_features);
4095
4096 feature_itr = list_iterator_create(req_features);
4097 sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
4098
4099 feature = list_peek(req_features);
4100 if (feature && feature[0] == '!') {
4101 feature_sibs = _get_all_sibling_bits();
4102 negative_logic = true;
4103 }
4104
4105 while ((feature = list_next(feature_itr))) {
4106 if (negative_logic && feature[0] == '!')
4107 feature++;
4108 bool found = false;
4109 while ((sib = list_next(sib_itr))) {
4110 if (sib->fed.feature_list &&
4111 list_find_first(sib->fed.feature_list,
4112 slurm_find_char_in_list, feature)) {
4113 if (negative_logic) {
4114 feature_sibs &=
4115 ~FED_SIBLING_BIT(sib->fed.id);
4116 } else {
4117 feature_sibs |=
4118 FED_SIBLING_BIT(sib->fed.id);
4119 }
4120 found = true;
4121 }
4122 }
4123
4124 if (!found) {
4125 error("didn't find at least one cluster with the feature '%s'",
4126 feature);
4127 rc = SLURM_ERROR;
4128 goto end_features;
4129 }
4130 if (negative_logic && !feature_sibs) {
4131 error("eliminated all viable clusters with constraint '%s'",
4132 feature);
4133 rc = SLURM_ERROR;
4134 goto end_features;
4135 }
4136 list_iterator_reset(sib_itr);
4137 }
4138 end_features:
4139 list_iterator_destroy(sib_itr);
4140 list_iterator_destroy(feature_itr);
4141 FREE_NULL_LIST(req_features);
4142
4143 if (cluster_bitmap)
4144 *cluster_bitmap = feature_sibs;
4145
4146 return rc;
4147 }
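/*
 * Worked example (illustrative; assumes fedA advertises feature "gpu"
 * and fedB advertises "bigmem", with fed.id 1 and 2):
 *
 *	_validate_cluster_features("gpu", &bits);	// bits == 0x1
 *	_validate_cluster_features("!gpu", &bits);	// bits == 0x2
 *	_validate_cluster_features("foo", &bits);	// SLURM_ERROR
 */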
4148
4149 extern void fed_mgr_remove_remote_dependencies(job_record_t *job_ptr)
4150 {
4151 uint32_t origin_id;
4152
4153 if (!_is_fed_job(job_ptr, &origin_id) ||
4154 !fed_mgr_is_origin_job(job_ptr) || !job_ptr->details)
4155 return;
4156
4157 fed_mgr_submit_remote_dependencies(job_ptr, false, true);
4158 }
4159
4160 static int _add_to_send_list(void *object, void *arg)
4161 {
4162 depend_spec_t *dependency = (depend_spec_t *)object;
4163 uint64_t *send_sib_bits = (uint64_t *)arg;
4164 uint32_t cluster_id;
4165
4166 if ((dependency->depend_type == SLURM_DEPEND_SINGLETON) &&
4167 !disable_remote_singleton) {
4168 *send_sib_bits |= _get_all_sibling_bits();
4169 /* Negative value short-circuits list_for_each */
4170 return -1;
4171 }
4172 if (!(dependency->depend_flags & SLURM_FLAGS_REMOTE) ||
4173 (dependency->depend_state != DEPEND_NOT_FULFILLED))
4174 return SLURM_SUCCESS;
4175 cluster_id = fed_mgr_get_cluster_id(dependency->job_id);
4176 *send_sib_bits |= FED_SIBLING_BIT(cluster_id);
4177 return SLURM_SUCCESS;
4178 }
4179
4180 /*
4181 * Send dependencies of job_ptr to siblings.
4182 *
4183 * If the dependency string is NULL, that means we're telling the siblings
4184 * to delete that dependency. Send empty string to indicate that.
4185 *
4186 * If send_all_sibs == true, then send dependencies to all siblings. Otherwise,
4187 * only send dependencies to siblings that own the remote jobs that job_ptr
4188 * depends on. I.e., if a sibling doesn't own any jobs that job_ptr depends on,
4189 * we won't send job_ptr's dependencies to that sibling.
4190 *
4191 * If clear_dependencies == true, then clear the dependencies on the siblings
4192 * where dependencies reside. In this case, use the job's dependency list to
4193 * find out which siblings to send the RPC to if the list is non-NULL. If the
4194 * list is NULL, then we have to send to all siblings.
4195 */
4196 extern int fed_mgr_submit_remote_dependencies(job_record_t *job_ptr,
4197 bool send_all_sibs,
4198 bool clear_dependencies)
4199 {
4200 int rc = SLURM_SUCCESS;
4201 uint64_t send_sib_bits = 0;
4202 ListIterator sib_itr;
4203 slurm_msg_t req_msg;
4204 dep_msg_t dep_msg = { 0 };
4205 slurmdb_cluster_rec_t *sibling;
4206 uint32_t origin_id;
4207
4208 if (!_is_fed_job(job_ptr, &origin_id))
4209 return SLURM_SUCCESS;
4210
4211 xassert(job_ptr->details);
4212
4213 dep_msg.job_id = job_ptr->job_id;
4214 dep_msg.job_name = job_ptr->name;
4215 dep_msg.array_job_id = job_ptr->array_job_id;
4216 dep_msg.array_task_id = job_ptr->array_task_id;
4217 dep_msg.is_array = job_ptr->array_recs ? true : false;
4218 dep_msg.user_id = job_ptr->user_id;
4219
4220 if (!job_ptr->details->dependency || clear_dependencies)
4221 /*
4222 * Since we have to pack these values, set dependency to empty
4223 * string and set depend_list to an empty list so we have
4224 * data to pack.
4225 */
4226 dep_msg.dependency = "";
4227 else
4228 dep_msg.dependency = job_ptr->details->dependency;
4229
4230 slurm_msg_t_init(&req_msg);
4231 req_msg.msg_type = REQUEST_SEND_DEP;
4232 req_msg.data = &dep_msg;
4233
4234 if (!job_ptr->details->depend_list)
4235 send_all_sibs = true;
4236 if (!send_all_sibs) {
4237 list_for_each(job_ptr->details->depend_list,
4238 _add_to_send_list, &send_sib_bits);
4239 }
4240
4241 sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
4242 while ((sibling = list_next(sib_itr))) {
4243 if (sibling == fed_mgr_cluster_rec)
4244 continue;
4245 /*
4246 * If we aren't sending the dependency to all siblings and
4247 * there isn't a dependency on this sibling, don't send
4248 * an RPC to this sibling.
4249 */
4250 if (!send_all_sibs &&
4251 !(send_sib_bits & FED_SIBLING_BIT(sibling->fed.id)))
4252 continue;
4253
4254 req_msg.protocol_version = sibling->rpc_version;
4255 rc |= _queue_rpc(sibling, &req_msg, 0, false);
4256 }
4257 list_iterator_destroy(sib_itr);
4258 return rc;
4259 }
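/*
 * Worked example (illustrative): a job with the unfulfilled remote
 * dependency "afterok:134218962" (a job owned by cluster 2) and
 * send_all_sibs == false has _add_to_send_list() set only cluster 2's
 * bit, so REQUEST_SEND_DEP is queued to that sibling alone; a
 * singleton dependency instead fans out to all siblings unless
 * disable_remote_singleton is set.
 */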
4260
4261 /* submit a federated job.
4262 *
4263 * IN msg - msg that contains packed job_desc msg to send to siblings.
4264 * IN job_desc - original job_desc msg.
4265 * IN alloc_only - true if requesting just an allocation (srun/salloc).
4266 * IN uid - uid of user requesting allocation.
4267 * IN protocol_version - version of the code the caller is using
4268 * OUT job_id_ptr - job_id of allocated job
4269 * OUT alloc_code - error_code returned from job_allocate
4270 * OUT err_msg - error message returned if any
4271 * RET returns SLURM_SUCCESS if the allocation was successful, SLURM_ERROR
4272 * otherwise.
4273 */
4274 extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,
4275 bool alloc_only, uid_t uid,
4276 uint16_t protocol_version,
4277 uint32_t *job_id_ptr, int *alloc_code,
4278 char **err_msg)
4279 {
4280 uint64_t feature_sibs = 0;
4281 job_record_t *job_ptr = NULL;
4282 bool job_held = false;
4283
4284 xassert(msg);
4285 xassert(job_desc);
4286 xassert(job_id_ptr);
4287 xassert(alloc_code);
4288 xassert(err_msg);
4289
4290 if (job_desc->job_id != NO_VAL) {
4291 		error("attempt by uid %u to set JobId=%u; "
4292 		      "specifying a job_id is not allowed when in a federation",
4293 uid, job_desc->job_id);
4294 *alloc_code = ESLURM_INVALID_JOB_ID;
4295 return SLURM_ERROR;
4296 }
4297
4298 if (_validate_cluster_features(job_desc->cluster_features,
4299 &feature_sibs)) {
4300 *alloc_code = ESLURM_INVALID_CLUSTER_FEATURE;
4301 return SLURM_ERROR;
4302 }
4303
4304 /* get job_id now. Can't submit job to get job_id as job_allocate will
4305 * change the job_desc. */
4306 job_desc->job_id = get_next_job_id(false);
4307
4308 /* Set viable siblings */
4309 job_desc->fed_siblings_viable =
4310 _get_viable_sibs(job_desc->clusters, feature_sibs,
4311 (job_desc->array_inx) ? true : false, err_msg);
4312 if (!job_desc->fed_siblings_viable) {
4313 *alloc_code = ESLURM_FED_NO_VALID_CLUSTERS;
4314 return SLURM_ERROR;
4315 }
4316
4317 /* ensure that fed_siblings_active is clear since this is a new job */
4318 job_desc->fed_siblings_active = 0;
4319
4320 /*
4321 * Submit local job first. Then submit to all siblings. If the local job
4322 * fails, then don't worry about sending to the siblings.
4323 */
4324 job_desc->het_job_offset = NO_VAL;
4325 *alloc_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
4326 alloc_only, uid, &job_ptr, err_msg,
4327 protocol_version);
4328
4329 if (!job_ptr || (*alloc_code && job_ptr->job_state == JOB_FAILED)) {
4330 		/* There may be an rc, but the job won't be failed; it will
4331 		 * sit in the queue */
4332 info("failed to submit federated job to local cluster");
4333 return SLURM_ERROR;
4334 }
4335
4336 /* mark this cluster as an active sibling if it's in the viable list */
4337 if (job_desc->fed_siblings_viable &
4338 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
4339 job_desc->fed_siblings_active |=
4340 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
4341
4342 /* Job is not eligible on origin cluster - mark as revoked. */
4343 if (!(job_ptr->fed_details->siblings_viable &
4344 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
4345 job_ptr->job_state |= JOB_REVOKED;
4346
4347 *job_id_ptr = job_ptr->job_id;
4348
4349 /*
4350 * Don't submit a job with dependencies to siblings - the origin will
4351 * test job dependencies and submit the job to siblings when all
4352 * dependencies are fulfilled.
4353 * job_allocate() calls job_independent() which sets the JOB_DEPENDENT
4354 * flag if the job is dependent, so check this after job_allocate().
4355 */
4356 if ((job_desc->priority == 0) || (job_ptr->bit_flags & JOB_DEPENDENT))
4357 job_held = true;
4358
4359 if (job_held) {
4360 info("Submitted held federated %pJ to %s(self)",
4361 job_ptr, fed_mgr_cluster_rec->name);
4362 } else {
4363 info("Submitted %sfederated %pJ to %s(self)",
4364 (!(job_ptr->fed_details->siblings_viable &
4365 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) ?
4366 "tracking " : ""),
4367 job_ptr, fed_mgr_cluster_rec->name);
4368 }
4369
4370 /* Update job before submitting sibling jobs so that it will show the
4371 * viable siblings and potentially active local job */
4372 job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
4373 update_job_fed_details(job_ptr);
4374
4375 if (!job_held && _submit_sibling_jobs(
4376 job_desc, msg, alloc_only,
4377 job_ptr->fed_details->siblings_viable))
4378 info("failed to submit sibling job to one or more siblings");
4379 /* Send remote dependencies to siblings */
4380 if ((job_ptr->bit_flags & JOB_DEPENDENT) &&
4381 job_ptr->details && job_ptr->details->dependency)
4382 if (fed_mgr_submit_remote_dependencies(job_ptr, false, false))
4383 error("%s: %pJ Failed to send remote dependencies to some or all siblings.",
4384 __func__, job_ptr);
4385
4386 job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
4387 update_job_fed_details(job_ptr);
4388
4389 /* Add record to fed job table */
4390 add_fed_job_info(job_ptr);
4391
4392 return SLURM_SUCCESS;
4393 }
4394
4395 /* Tests if the job is a tracker only federated job.
4396 * Tracker only job: a job that shouldn't run on the local cluster but should be
4397  * kept around to facilitate communications for its sibling jobs on other
4398 * clusters.
4399 */
4400 extern bool fed_mgr_is_tracker_only_job(job_record_t *job_ptr)
4401 {
4402 bool rc = false;
4403 uint32_t origin_id;
4404
4405 xassert(job_ptr);
4406
4407 if (!_is_fed_job(job_ptr, &origin_id))
4408 return rc;
4409
4410 if (job_ptr->fed_details &&
4411 (origin_id == fed_mgr_cluster_rec->fed.id) &&
4412 job_ptr->fed_details->siblings_active &&
4413 (!(job_ptr->fed_details->siblings_active &
4414 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))))
4415 rc = true;
4416
4417 if (job_ptr->fed_details &&
4418 job_ptr->fed_details->cluster_lock &&
4419 job_ptr->fed_details->cluster_lock != fed_mgr_cluster_rec->fed.id)
4420 rc = true;
4421
4422 return rc;
4423 }
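/*
 * Worked example (illustrative): on the origin cluster (fed.id 1), a
 * job whose siblings_active == 0x6 runs only on clusters 2 and 3; the
 * local copy is a tracker-only job kept pending purely to coordinate
 * its remote siblings.
 */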
4424
4425 /* Return the cluster name for the given cluster id.
4426 * Must xfree returned string
4427 */
4428 extern char *fed_mgr_get_cluster_name(uint32_t id)
4429 {
4430 slurmdb_cluster_rec_t *sibling;
4431 char *name = NULL;
4432
4433 if ((sibling = fed_mgr_get_cluster_by_id(id))) {
4434 name = xstrdup(sibling->name);
4435 }
4436
4437 return name;
4438 }
4439
4440 static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id)
4441 {
4442 xassert(job_ptr);
4443 xassert(origin_id);
4444
4445 if (!fed_mgr_cluster_rec)
4446 return false;
4447
4448 if ((!job_ptr->fed_details) ||
4449 (!(*origin_id = fed_mgr_get_cluster_id(job_ptr->job_id)))) {
4450 debug2("job %pJ not a federated job", job_ptr);
4451 return false;
4452 }
4453
4454 return true;
4455 }
4456
4457 static int _job_unlock_spec_sibs(job_record_t *job_ptr, uint64_t spec_sibs)
4458 {
4459 uint32_t cluster_id = fed_mgr_cluster_rec->fed.id;
4460 slurmdb_cluster_rec_t *sibling;
4461 int sib_id = 1;
4462
4463 while (spec_sibs) {
4464 if (!(spec_sibs & 1))
4465 goto next_unlock;
4466
4467 if (fed_mgr_cluster_rec->fed.id == sib_id)
4468 fed_mgr_job_lock_unset(job_ptr->job_id,
4469 cluster_id);
4470 else if ((sibling = fed_mgr_get_cluster_by_id(sib_id)))
4471 _persist_fed_job_unlock(sibling, job_ptr->job_id,
4472 cluster_id);
4473 next_unlock:
4474 spec_sibs >>= 1;
4475 sib_id++;
4476 }
4477
4478 return SLURM_SUCCESS;
4479 }
4480
4481 /*
4482 * Return SLURM_SUCCESS if all siblings give lock to job; SLURM_ERROR otherwise.
4483 */
4484 static int _job_lock_all_sibs(job_record_t *job_ptr)
4485 {
4486 slurmdb_cluster_rec_t *sibling = NULL;
4487 int sib_id = 1;
4488 bool all_said_yes = true;
4489 uint64_t replied_sibs = 0, tmp_sibs = 0;
4490 uint32_t origin_id, cluster_id;
4491
4492 xassert(job_ptr);
4493
4494 origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
4495 cluster_id = fed_mgr_cluster_rec->fed.id;
4496
4497 tmp_sibs = job_ptr->fed_details->siblings_viable &
4498 (~FED_SIBLING_BIT(origin_id));
4499 while (tmp_sibs) {
4500 if (!(tmp_sibs & 1))
4501 goto next_lock;
4502
4503 if (cluster_id == sib_id) {
4504 if (!fed_mgr_job_lock_set(job_ptr->job_id, cluster_id))
4505 replied_sibs |= FED_SIBLING_BIT(sib_id);
4506 else {
4507 all_said_yes = false;
4508 break;
4509 }
4510 } else if (!(sibling = fed_mgr_get_cluster_by_id(sib_id)) ||
4511 (!sibling->fed.send) ||
4512 			   (((slurm_persist_conn_t *)sibling->fed.send)->fd < 0)) {
4513 /*
4514 * Don't consider clusters that are down. They will sync
4515 * up later.
4516 */
4517 goto next_lock;
4518 } else if (!_persist_fed_job_lock(sibling, job_ptr->job_id,
4519 cluster_id)) {
4520 replied_sibs |= FED_SIBLING_BIT(sib_id);
4521 } else {
4522 all_said_yes = false;
4523 break;
4524 }
4525
4526 next_lock:
4527 tmp_sibs >>= 1;
4528 sib_id++;
4529 }
4530
4531 /*
4532 * Have to talk to at least one other sibling -- if there is one -- to
4533 * start the job
4534 */
4535 if (all_said_yes &&
4536 (!(job_ptr->fed_details->siblings_viable &
4537 ~FED_SIBLING_BIT(cluster_id)) ||
4538 (replied_sibs & ~(FED_SIBLING_BIT(cluster_id)))))
4539 return SLURM_SUCCESS;
4540
4541 /* have to release the lock on those that said yes */
4542 _job_unlock_spec_sibs(job_ptr, replied_sibs);
4543
4544 return SLURM_ERROR;
4545 }
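/*
 * Worked example (illustrative): for a job with siblings_viable ==
 * 0x7 submitted from origin fed.id 1, a non-origin cluster with
 * fed.id 2 asks itself (via fed_mgr_job_lock_set()) and cluster 3
 * (via _persist_fed_job_lock()); if cluster 3 refuses, the lock
 * already granted locally is rolled back with _job_unlock_spec_sibs()
 * and SLURM_ERROR is returned.
 */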
4546
4547 static int _slurmdbd_conn_active(void)
4548 {
4549 int active = 0;
4550
4551 if (acct_storage_g_get_data(acct_db_conn, ACCT_STORAGE_INFO_CONN_ACTIVE,
4552 &active) != SLURM_SUCCESS)
4553 active = 0;
4554
4555 return active;
4556 }
4557
4558 /*
4559 * Attempt to grab the job's federation cluster lock so that the requesting
4560  * cluster can attempt to start the job.
4561 *
4562 * IN job - job to lock
4563 * RET returns SLURM_SUCCESS if the lock was granted, SLURM_ERROR otherwise
4564 */
4565 extern int fed_mgr_job_lock(job_record_t *job_ptr)
4566 {
4567 int rc = SLURM_SUCCESS;
4568 uint32_t origin_id, cluster_id;
4569
4570 xassert(job_ptr);
4571
4572 if (!_is_fed_job(job_ptr, &origin_id))
4573 return SLURM_SUCCESS;
4574
4575 cluster_id = fed_mgr_cluster_rec->fed.id;
4576
4577 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4578 info("attempting fed job lock on %pJ by cluster_id %d",
4579 job_ptr, cluster_id);
4580
4581 if (origin_id != fed_mgr_cluster_rec->fed.id) {
4582 slurm_persist_conn_t *origin_conn = NULL;
4583 slurmdb_cluster_rec_t *origin_cluster;
4584 if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4585 info("Unable to find origin cluster for %pJ from origin id %d",
4586 job_ptr, origin_id);
4587 } else
4588 origin_conn = (slurm_persist_conn_t *)
4589 origin_cluster->fed.send;
4590
4591 /* Check dbd is up to make sure ctld isn't on an island. */
4592 if (acct_db_conn && _slurmdbd_conn_active() &&
4593 (!origin_conn || (origin_conn->fd < 0))) {
4594 rc = _job_lock_all_sibs(job_ptr);
4595 } else if (origin_cluster) {
4596 rc = _persist_fed_job_lock(origin_cluster,
4597 job_ptr->job_id,
4598 cluster_id);
4599 } else {
4600 rc = SLURM_ERROR;
4601 }
4602
4603 if (!rc) {
4604 job_ptr->fed_details->cluster_lock = cluster_id;
4605 fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
4606 }
4607
4608 return rc;
4609 }
4610
4611 /* origin cluster */
4612 rc = fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
4613
4614 return rc;
4615 }
4616
4617 extern int fed_mgr_job_lock_set(uint32_t job_id, uint32_t cluster_id)
4618 {
4619 int rc = SLURM_SUCCESS;
4620 fed_job_info_t *job_info;
4621
4622 slurm_mutex_lock(&fed_job_list_mutex);
4623
4624 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4625 info("%s: attempting to set fed JobId=%u lock to %u",
4626 __func__, job_id, cluster_id);
4627
4628 if (!(job_info = _find_fed_job_info(job_id))) {
4629 error("Didn't find JobId=%u in fed_job_list", job_id);
4630 rc = SLURM_ERROR;
4631 } else if (_job_has_pending_updates(job_info)) {
4632 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4633 info("%s: cluster %u can't get cluster lock for JobId=%u because it has pending updates",
4634 __func__, cluster_id, job_id);
4635 rc = SLURM_ERROR;
4636 } else if (job_info->cluster_lock &&
4637 job_info->cluster_lock != cluster_id) {
4638 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4639 info("%s: fed JobId=%u already locked by cluster %d",
4640 __func__, job_id, job_info->cluster_lock);
4641 rc = SLURM_ERROR;
4642 } else {
4643 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4644 info("%s: fed JobId=%u locked by %u",
4645 __func__, job_id, cluster_id);
4646
4647 job_info->cluster_lock = cluster_id;
4648 }
4649
4650 slurm_mutex_unlock(&fed_job_list_mutex);
4651
4652 return rc;
4653 }
4654
4655 extern bool fed_mgr_job_is_self_owned(job_record_t *job_ptr)
4656 {
4657 if (!fed_mgr_cluster_rec || !job_ptr->fed_details ||
4658 (job_ptr->fed_details->cluster_lock == fed_mgr_cluster_rec->fed.id))
4659 return true;
4660
4661 return false;
4662 }
4663
4664 extern bool fed_mgr_job_is_locked(job_record_t *job_ptr)
4665 {
4666 if (!job_ptr->fed_details ||
4667 job_ptr->fed_details->cluster_lock)
4668 return true;
4669
4670 return false;
4671 }
4672
4673 static void _q_sib_job_start(slurm_msg_t *msg)
4674 {
4675 sib_msg_t *sib_msg = msg->data;
4676 fed_job_update_info_t *job_update_info;
4677
4678 /* add todo to remove remote siblings if the origin job */
4679 job_update_info = xmalloc(sizeof(fed_job_update_info_t));
4680 job_update_info->type = FED_JOB_START;
4681 job_update_info->job_id = sib_msg->job_id;
4682 job_update_info->start_time = sib_msg->start_time;
4683 job_update_info->cluster_lock = sib_msg->cluster_id;
4684
4685 _append_job_update(job_update_info);
4686 }
4687
4688 extern int fed_mgr_job_lock_unset(uint32_t job_id, uint32_t cluster_id)
4689 {
4690 int rc = SLURM_SUCCESS;
4691 fed_job_info_t * job_info;
4692
4693 slurm_mutex_lock(&fed_job_list_mutex);
4694
4695 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4696 info("%s: attempting to unlock fed JobId=%u by cluster %u",
4697 __func__, job_id, cluster_id);
4698
4699 if (!(job_info = _find_fed_job_info(job_id))) {
4700 error("Didn't find JobId=%u in fed_job_list", job_id);
4701 rc = SLURM_ERROR;
4702 } else if (job_info->cluster_lock &&
4703 job_info->cluster_lock != cluster_id) {
4704 error("attempt to unlock sib JobId=%u by cluster %d which doesn't have job lock",
4705 job_id, cluster_id);
4706 rc = SLURM_ERROR;
4707 } else {
4708 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4709 info("%s: fed JobId=%u unlocked by %u",
4710 __func__, job_id, cluster_id);
4711 job_info->cluster_lock = 0;
4712 }
4713
4714 slurm_mutex_unlock(&fed_job_list_mutex);
4715
4716 return rc;
4717 }
4718
4719 /*
4720  * Release the job's federation cluster lock so that other clusters can try to
4721 * start the job.
4722 *
4723 * IN job - job to unlock
4724 * RET returns SLURM_SUCCESS if the lock was released, SLURM_ERROR otherwise
4725 */
4726 extern int fed_mgr_job_unlock(job_record_t *job_ptr)
4727 {
4728 int rc = SLURM_SUCCESS;
4729 uint32_t origin_id, cluster_id;
4730
4731 if (!_is_fed_job(job_ptr, &origin_id))
4732 return SLURM_SUCCESS;
4733
4734 cluster_id = fed_mgr_cluster_rec->fed.id;
4735
4736 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4737 info("releasing fed job lock on %pJ by cluster_id %d",
4738 job_ptr, cluster_id);
4739
4740 if (origin_id != fed_mgr_cluster_rec->fed.id) {
4741 slurm_persist_conn_t *origin_conn = NULL;
4742 slurmdb_cluster_rec_t *origin_cluster;
4743 if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4744 info("Unable to find origin cluster for %pJ from origin id %d",
4745 job_ptr, origin_id);
4746 } else {
4747 origin_conn = (slurm_persist_conn_t *)
4748 origin_cluster->fed.send;
4749 }
4750
4751 if (!origin_conn || (origin_conn->fd < 0)) {
4752 uint64_t tmp_sibs;
4753 tmp_sibs = job_ptr->fed_details->siblings_viable &
4754 ~FED_SIBLING_BIT(origin_id);
4755 rc = _job_unlock_spec_sibs(job_ptr, tmp_sibs);
4756 } else {
4757 rc = _persist_fed_job_unlock(origin_cluster,
4758 job_ptr->job_id,
4759 cluster_id);
4760 }
4761
4762 if (!rc) {
4763 job_ptr->fed_details->cluster_lock = 0;
4764 fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
4765 }
4766
4767 return rc;
4768 }
4769
4770 /* Origin Cluster */
4771 rc = fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
4772
4773 return rc;
4774 }
4775
4776 /*
4777 * Notify origin cluster that cluster_id started job.
4778 *
4779 * Cancels remaining sibling jobs.
4780 *
4781 * IN job_ptr - job_ptr of job to unlock
4782 * IN start_time - start_time of the job.
4783 * RET returns SLURM_SUCCESS if the lock was released, SLURM_ERROR otherwise
4784 */
4785 extern int fed_mgr_job_start(job_record_t *job_ptr, time_t start_time)
4786 {
4787 int rc = SLURM_SUCCESS;
4788 uint32_t origin_id, cluster_id;
4789 fed_job_info_t *job_info;
4790
4791 	xassert(job_ptr);
4792
4793 if (!_is_fed_job(job_ptr, &origin_id))
4794 return SLURM_SUCCESS;
4795
4796 cluster_id = fed_mgr_cluster_rec->fed.id;
4797
4798 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4799 info("start fed %pJ by cluster_id %d",
4800 job_ptr, cluster_id);
4801
4802 if (origin_id != fed_mgr_cluster_rec->fed.id) {
4803 slurm_persist_conn_t *origin_conn = NULL;
4804 slurmdb_cluster_rec_t *origin_cluster;
4805 if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4806 info("Unable to find origin cluster for %pJ from origin id %d",
4807 job_ptr, origin_id);
4808 } else {
4809 origin_conn = (slurm_persist_conn_t *)
4810 origin_cluster->fed.send;
4811 }
4812
4813 if (!origin_conn || (origin_conn->fd < 0)) {
4814 uint64_t viable_sibs;
4815 viable_sibs = job_ptr->fed_details->siblings_viable;
4816 viable_sibs &= ~FED_SIBLING_BIT(origin_id);
4817 viable_sibs &= ~FED_SIBLING_BIT(cluster_id);
4818 _revoke_sibling_jobs(job_ptr->job_id,
4819 fed_mgr_cluster_rec->fed.id,
4820 viable_sibs, job_ptr->start_time);
4821 rc = SLURM_SUCCESS;
4822 } else {
4823 rc = _persist_fed_job_start(origin_cluster,
4824 job_ptr->job_id, cluster_id,
4825 job_ptr->start_time);
4826 }
4827
4828 if (!rc) {
4829 job_ptr->fed_details->siblings_active =
4830 FED_SIBLING_BIT(cluster_id);
4831 update_job_fed_details(job_ptr);
4832 }
4833
4834 return rc;
4835
4836 }
4837
4838 /* Origin Cluster: */
4839 slurm_mutex_lock(&fed_job_list_mutex);
4840
4841 if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
4842 error("Didn't find %pJ in fed_job_list", job_ptr);
4843 rc = SLURM_ERROR;
4844 } else if (!job_info->cluster_lock) {
4845 error("attempt to start sib JobId=%u by cluster %u, but it's not locked",
4846 job_info->job_id, cluster_id);
4847 rc = SLURM_ERROR;
4848 } else if (job_info->cluster_lock &&
4849 (job_info->cluster_lock != cluster_id)) {
4850 error("attempt to start sib JobId=%u by cluster %u, which doesn't have job lock",
4851 job_info->job_id, cluster_id);
4852 rc = SLURM_ERROR;
4853 }
4854
4855 if (!rc)
4856 _fed_job_start_revoke(job_info, job_ptr, start_time);
4857
4858 slurm_mutex_unlock(&fed_job_list_mutex);
4859
4860
4861 return rc;
4862 }
4863
4864 /*
4865 * Complete the federated job. If the job ran on a cluster other than the
4866 * origin_cluster then it notifies the origin cluster that the job finished.
4867 *
4868 * Tells the origin cluster to revoke the tracking job.
4869 *
4870 * IN job_ptr - job_ptr of job to complete.
4871 * IN return_code - return code of job
4872 * IN start_time - start time of the job that actually ran.
4873 * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
4874 */
4875 extern int fed_mgr_job_complete(job_record_t *job_ptr, uint32_t return_code,
4876 time_t start_time)
4877 {
4878 uint32_t origin_id;
4879
4880 if (job_ptr->bit_flags & SIB_JOB_FLUSH)
4881 return SLURM_SUCCESS;
4882
4883 if (!_is_fed_job(job_ptr, &origin_id))
4884 return SLURM_SUCCESS;
4885
4886 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4887 info("complete fed %pJ by cluster_id %d",
4888 job_ptr, fed_mgr_cluster_rec->fed.id);
4889
4890 if (origin_id == fed_mgr_cluster_rec->fed.id) {
4891 _revoke_sibling_jobs(job_ptr->job_id,
4892 fed_mgr_cluster_rec->fed.id,
4893 job_ptr->fed_details->siblings_active,
4894 job_ptr->start_time);
4895 return SLURM_SUCCESS;
4896 }
4897
4898 slurmdb_cluster_rec_t *conn = fed_mgr_get_cluster_by_id(origin_id);
4899 if (!conn) {
4900 info("Unable to find origin cluster for %pJ from origin id %d",
4901 job_ptr, origin_id);
4902 return SLURM_ERROR;
4903 }
4904
4905 return _persist_fed_job_revoke(conn, job_ptr->job_id,
4906 job_ptr->job_state, return_code,
4907 start_time);
4908 }
4909
4910 /*
4911 * Revoke all sibling jobs.
4912 *
4913 * IN job_ptr - job to revoke sibling jobs from.
4914 * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
4915 */
4916 extern int fed_mgr_job_revoke_sibs(job_record_t *job_ptr)
4917 {
4918 uint32_t origin_id;
4919 time_t now = time(NULL);
4920
4921 xassert(verify_lock(JOB_LOCK, READ_LOCK));
4922 xassert(verify_lock(FED_LOCK, READ_LOCK));
4923
4924 if (!_is_fed_job(job_ptr, &origin_id))
4925 return SLURM_SUCCESS;
4926
4927 if (origin_id != fed_mgr_cluster_rec->fed.id)
4928 return SLURM_SUCCESS;
4929
4930 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4931 info("revoke fed %pJ's siblings", job_ptr);
4932
4933 _revoke_sibling_jobs(job_ptr->job_id, fed_mgr_cluster_rec->fed.id,
4934 job_ptr->fed_details->siblings_active, now);
4935
4936 return SLURM_SUCCESS;
4937 }
4938
4939 /*
4940 * Revokes the federated job.
4941 *
4942 * IN job_ptr - job_ptr of job to revoke.
4943 * IN job_complete - whether the job is done or not. If completed then sets the
4944 * state to JOB_REVOKED | completed_state. JOB_REVOKED otherwise.
4945 * IN completed_state - state of completed job. Only use if job_complete==true.
4946 * If job_complete==false, then this is unused.
4947 * IN exit_code - exit_code of job.
4948 * IN start_time - start time of the job that actually ran.
4949 * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
4950 */
4951 extern int fed_mgr_job_revoke(job_record_t *job_ptr, bool job_complete,
4952 uint32_t completed_state, uint32_t exit_code,
4953 time_t start_time)
4954 {
4955 uint32_t origin_id;
4956 uint32_t state = JOB_REVOKED;
4957
4958 if (IS_JOB_COMPLETED(job_ptr)) /* job already completed */
4959 return SLURM_SUCCESS;
4960
4961 if (!_is_fed_job(job_ptr, &origin_id))
4962 return SLURM_SUCCESS;
4963
4964 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4965 info("revoking fed %pJ (%s)",
4966 job_ptr, job_complete ? "REVOKED|CANCELLED" : "REVOKED");
4967
4968 /* Check if the job exited with one of the configured requeue values. */
4969 job_ptr->exit_code = exit_code;
4970 if (job_hold_requeue(job_ptr)) {
4971 batch_requeue_fini(job_ptr);
4972 return SLURM_SUCCESS;
4973 }
4974 /*
4975 * Only set to a "completed" state (i.e., state > JOB_SUSPENDED)
4976 * if job_complete is true.
4977 */
4978 if (job_complete) {
4979 if (completed_state > JOB_SUSPENDED)
4980 state |= completed_state;
4981 else
4982 state |= JOB_CANCELLED;
4983 }
4984
4985 job_ptr->job_state = state;
4986 job_ptr->start_time = start_time;
4987 job_ptr->end_time = start_time;
4988 job_ptr->state_reason = WAIT_NO_REASON;
4989 xfree(job_ptr->state_desc);
4990
4991 /*
4992 * Since the job is purged/revoked quickly on the non-origin side it's
4993 * possible that the job_start message has not been sent yet. Send it
4994 * now so that the db record gets the uid set -- which the complete
4995 * message doesn't send.
4996 */
4997 if (!job_ptr->db_index && (origin_id != fed_mgr_cluster_rec->fed.id)) {
4998 if (IS_JOB_FINISHED(job_ptr))
4999 jobacct_storage_g_job_start(acct_db_conn, job_ptr);
5000 else
5001 info("%s: %pJ isn't finished and isn't an origin job (%u != %u) and doesn't have a db_index yet. We aren't sending a start message to the database.",
5002 __func__, job_ptr, origin_id,
5003 fed_mgr_cluster_rec->fed.id);
5004 }
5005
5006 job_completion_logger(job_ptr, false);
5007
5008 /* Don't remove the origin job */
5009 if (origin_id == fed_mgr_cluster_rec->fed.id)
5010 return SLURM_SUCCESS;
5011
5012 /* Purge the revoked job -- remote only */
5013 unlink_job_record(job_ptr);
5014
5015 return SLURM_SUCCESS;
5016 }
5017
5018 /* Convert cluster ids to cluster names.
5019 *
5020 * RET: return string of comma-separated cluster names.
5021 * Must free returned string.
5022 */
5023 extern char *fed_mgr_cluster_ids_to_names(uint64_t cluster_ids)
5024 {
5025 int bit = 1;
5026 char *names = NULL;
5027
5028 if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
5029 return names;
5030
5031 while (cluster_ids) {
5032 if (cluster_ids & 1) {
5033 slurmdb_cluster_rec_t *sibling;
5034 if ((sibling = fed_mgr_get_cluster_by_id(bit))) {
5035 xstrfmtcat(names, "%s%s",
5036 (names) ? "," : "", sibling->name);
5037 } else {
5038 error("Couldn't find a sibling cluster with id %d",
5039 bit);
5040 }
5041 }
5042
5043 cluster_ids >>= 1;
5044 bit++;
5045 }
5046
5047 return names;
5048 }
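/*
 * Worked example (illustrative; assumes siblings fedA=1 and fedC=3):
 * fed_mgr_cluster_ids_to_names(0x5) walks bits 1 and 3 and returns
 * the xmalloc'd string "fedA,fedC", which the caller must xfree().
 */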
5049
5050 /*
5051 * Tests whether a federated job can be requeued.
5052 *
5053 * If called from the remote cluster (non-origin) then it will send a requeue
5054 * request to the origin to have the origin cancel this job. In this case, it
5055 * will return success and set the JOB_REQUEUE_FED flag and wait to be killed.
5056 *
5057 * If it is the origin job, it will also cancel a running remote job. New
5058 * federated sibling jobs will be submitted after the job has completed (e.g.
5059 * after epilog) in fed_mgr_job_requeue().
5060 *
5061 * IN job_ptr - job to requeue.
5062 * IN flags - flags for the requeue (e.g. JOB_RECONFIG_FAIL).
5063 * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
5064 * otherwise.
5065 */
5066 extern int fed_mgr_job_requeue_test(job_record_t *job_ptr, uint32_t flags)
5067 {
5068 uint32_t origin_id;
5069
5070 if (!_is_fed_job(job_ptr, &origin_id))
5071 return SLURM_SUCCESS;
5072
5073 if (origin_id != fed_mgr_cluster_rec->fed.id) {
5074 slurmdb_cluster_rec_t *origin_cluster;
5075 if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
5076 error("Unable to find origin cluster for %pJ from origin id %d",
5077 job_ptr, origin_id);
5078 return SLURM_ERROR;
5079 }
5080
5081 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5082 info("requeueing fed job %pJ on origin cluster %d",
5083 job_ptr, origin_id);
5084
5085 _persist_fed_job_requeue(origin_cluster, job_ptr->job_id,
5086 flags);
5087
5088 job_ptr->job_state |= JOB_REQUEUE_FED;
5089
5090 return SLURM_SUCCESS;
5091 }
5092
5093 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5094 info("requeueing fed %pJ by cluster_id %d",
5095 job_ptr, fed_mgr_cluster_rec->fed.id);
5096
5097 /* If the job is currently running locally, then cancel the running job
5098 * and set a flag that it's being requeued. Then when the epilog
5099 * complete comes in submit the siblings to the other clusters.
5100 	 * This must be checked after the origin check, otherwise the request
5101 	 * would never reach the origin. */
5102 if (IS_JOB_RUNNING(job_ptr))
5103 return SLURM_SUCCESS;
5104
5105 /* If a sibling job is running remotely, then cancel the remote job and
5106 * wait till job finishes (e.g. after long epilog) and then resubmit the
5107 * siblings in fed_mgr_job_requeue(). */
5108 if (IS_JOB_PENDING(job_ptr) && IS_JOB_REVOKED(job_ptr)) {
5109 slurmdb_cluster_rec_t *remote_cluster;
5110 if (!(remote_cluster =
5111 fed_mgr_get_cluster_by_id(
5112 job_ptr->fed_details->cluster_lock))) {
5113 error("Unable to find remote cluster for %pJ from cluster lock %d",
5114 job_ptr, job_ptr->fed_details->cluster_lock);
5115 return SLURM_ERROR;
5116 }
5117
5118 if (_persist_fed_job_cancel(remote_cluster, job_ptr->job_id,
5119 SIGKILL, KILL_FED_REQUEUE, 0)) {
5120 error("failed to kill/requeue fed %pJ",
5121 job_ptr);
5122 }
5123 }
5124
5125 return SLURM_SUCCESS;
5126 }
5127
5128 /*
5129 * Submits requeued sibling jobs.
5130 *
5131 * IN job_ptr - job to requeue.
5132 * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
5133 * otherwise.
5134 */
5135 extern int fed_mgr_job_requeue(job_record_t *job_ptr)
5136 {
5137 int rc = SLURM_SUCCESS;
5138 uint32_t origin_id;
5139 uint64_t feature_sibs = 0;
5140 fed_job_info_t *job_info;
5141
5142 xassert(job_ptr);
5143 xassert(job_ptr->details);
5144
5145 if (!_is_fed_job(job_ptr, &origin_id))
5146 return SLURM_SUCCESS;
5147
5148 if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5149 info("requeueing fed job %pJ", job_ptr);
5150
5151 /* clear where actual siblings were */
5152 job_ptr->fed_details->siblings_active = 0;
5153
5154 slurm_mutex_lock(&fed_job_list_mutex);
5155 if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
5156 error("%s: failed to find fed job info for fed %pJ",
5157 __func__, job_ptr);
5158 }
5159
5160 /* don't submit siblings for jobs that are held */
5161 if (job_ptr->priority == 0) {
5162 job_ptr->job_state &= (~JOB_REQUEUE_FED);
5163
5164 update_job_fed_details(job_ptr);
5165
5166 /* clear cluster lock */
5167 job_ptr->fed_details->cluster_lock = 0;
5168 if (job_info)
5169 job_info->cluster_lock = 0;
5170
5171 slurm_mutex_unlock(&fed_job_list_mutex);
5172 return SLURM_SUCCESS;
5173 }
5174
5175 /* Don't worry about testing which clusters can start the job the
5176 * soonest since they can't start the job for 120 seconds anyways. */
5177
5178 /* Get new viable siblings since the job might just have one viable
5179 * sibling listed if the sibling was the cluster that could start the
5180 * job the soonest. */
5181 _validate_cluster_features(job_ptr->details->cluster_features,
5182 &feature_sibs);
5183 job_ptr->fed_details->siblings_viable =
5184 _get_viable_sibs(job_ptr->clusters, feature_sibs,
5185 job_ptr->array_recs ? true : false, NULL);
5186
5187 _prepare_submit_siblings(job_ptr,
5188 job_ptr->fed_details->siblings_viable);
5189
5190 job_ptr->job_state &= (~JOB_REQUEUE_FED);
5191
5192 if (!(job_ptr->fed_details->siblings_viable &
5193 FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
5194 job_ptr->job_state |= JOB_REVOKED;
5195 else
5196 job_ptr->job_state &= ~JOB_REVOKED;
5197
5198 /* clear cluster lock */
5199 job_ptr->fed_details->cluster_lock = 0;
5200 if (job_info) {
5201 job_info->cluster_lock = 0;
5202 job_info->siblings_viable =
5203 job_ptr->fed_details->siblings_viable;
5204 job_info->siblings_active =
5205 job_ptr->fed_details->siblings_active;
5206 }
5207 slurm_mutex_unlock(&fed_job_list_mutex);
5208
5209 return rc;
5210 }

/* Cancel sibling jobs. Just send request to itself */
static int _cancel_sibling_jobs(job_record_t *job_ptr, uint16_t signal,
				uint16_t flags, uid_t uid, bool kill_viable)
{
	int id = 1;
	uint64_t tmp_sibs;
	slurm_persist_conn_t *sib_conn;

	if (kill_viable) {
		tmp_sibs = job_ptr->fed_details->siblings_viable;
		flags |= KILL_NO_SIBS;
	} else {
		tmp_sibs = job_ptr->fed_details->siblings_active;
		flags &= ~KILL_NO_SIBS;
	}

	while (tmp_sibs) {
		if ((tmp_sibs & 1) &&
		    (id != fed_mgr_cluster_rec->fed.id)) {
			slurmdb_cluster_rec_t *cluster =
				fed_mgr_get_cluster_by_id(id);
			if (!cluster) {
				error("couldn't find cluster rec by id %d", id);
				goto next_job;
			}

			/* Don't send request to siblings that are down when
			 * killing viables */
			sib_conn = (slurm_persist_conn_t *)cluster->fed.send;
			if (kill_viable && (!sib_conn || sib_conn->fd == -1))
				goto next_job;

			_persist_fed_job_cancel(cluster, job_ptr->job_id,
						signal, flags, uid);
		}

next_job:
		tmp_sibs >>= 1;
		id++;
	}

	return SLURM_SUCCESS;
}
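
/*
 * Illustration (assumed values, not live code): sibling ids are 1-based,
 * so FED_SIBLING_BIT(id) maps cluster id N to bit N-1, and the
 * shift-and-test loop above visits each set bit in ascending id order:
 *
 *	tmp_sibs = 0x5 (0b0101)	-> clusters 1 and 3
 *	iter 1: (tmp_sibs & 1) == 1, id == 1	-> cancel on cluster 1
 *	iter 2: (tmp_sibs & 1) == 0, id == 2	-> skipped
 *	iter 3: (tmp_sibs & 1) == 1, id == 3	-> cancel on cluster 3
 */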

/* Cancel sibling jobs of a federated job
 *
 * IN job_ptr - job to cancel
 * IN signal - signal to send to job
 * IN flags - KILL_.* flags
 * IN uid - uid making request
 * IN kill_viable - if true cancel viable_sibs, if false cancel active_sibs
 */
extern int fed_mgr_job_cancel(job_record_t *job_ptr, uint16_t signal,
			      uint16_t flags, uid_t uid, bool kill_viable)
{
	uint32_t origin_id;

	xassert(job_ptr);

	if (!_is_fed_job(job_ptr, &origin_id))
		return SLURM_SUCCESS;

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("cancel fed %pJ by local cluster", job_ptr);

	_cancel_sibling_jobs(job_ptr, signal, flags, uid, kill_viable);

	return SLURM_SUCCESS;
}

extern bool fed_mgr_job_started_on_sib(job_record_t *job_ptr)
{
	uint32_t origin_id;

	xassert(job_ptr);

	/*
	 * When a sibling starts the job, the job becomes revoked on the origin
	 * and the job's cluster_lock is set to that sibling's id.
	 * Don't use fed_mgr_is_origin_job() because that returns true if
	 * _is_fed_job() returns false (the job isn't federated), and that's
	 * the opposite of what we want here.
	 */
	return _is_fed_job(job_ptr, &origin_id) &&
		(fed_mgr_cluster_rec->fed.id == origin_id) &&
		IS_JOB_REVOKED(job_ptr) && job_ptr->fed_details->cluster_lock &&
		(job_ptr->fed_details->cluster_lock !=
		 fed_mgr_cluster_rec->fed.id);
}

extern bool fed_mgr_is_job_id_in_fed(uint32_t job_id)
{
	uint32_t cluster_id;

	if (!fed_mgr_cluster_rec)
		return false;

	cluster_id = fed_mgr_get_cluster_id(job_id);
	if (!cluster_id)
		return false;

	return FED_SIBLING_BIT(cluster_id) & _get_all_sibling_bits();
}
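
/*
 * For reference, a sketch of the job id layout assumed by the lookups
 * above: a federated job id carries the origin cluster's id in its upper
 * bits (above FED_MGR_CLUSTER_ID_BEGIN), so the origin can be recovered
 * from the job id alone. The variable names below are illustrative only.
 */
#if 0
	fed_job_id = local_job_id |
		     (origin_cluster_id << FED_MGR_CLUSTER_ID_BEGIN);
	origin_cluster_id = fed_job_id >> FED_MGR_CLUSTER_ID_BEGIN;
#endif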

extern int fed_mgr_is_origin_job(job_record_t *job_ptr)
{
	uint32_t origin_id;

	xassert(job_ptr);

	if (!_is_fed_job(job_ptr, &origin_id))
		return true;

	if (fed_mgr_cluster_rec->fed.id != origin_id)
		return false;

	return true;
}

/*
 * Use this instead of fed_mgr_is_origin_job if job_ptr is not available.
 */
extern bool fed_mgr_is_origin_job_id(uint32_t job_id)
{
	uint32_t origin_id = fed_mgr_get_cluster_id(job_id);

	if (!fed_mgr_cluster_rec || !origin_id) {
		debug2("%s: job %u is not a federated job", __func__, job_id);
		return true;
	}

	if (fed_mgr_cluster_rec->fed.id == origin_id)
		return true;
	return false;
}

/*
 * Check if all siblings have fulfilled the singleton dependency.
 * Return true if all clusters have checked in that they've fulfilled this
 * singleton dependency.
 *
 * IN job_ptr - job with dependency to check
 * IN dep_ptr - dependency to check. If it's not singleton, just return true.
 * IN set_cluster_bit - if true, set the bit for this cluster indicating
 *                      that this cluster has fulfilled the dependency.
 */
extern bool fed_mgr_is_singleton_satisfied(job_record_t *job_ptr,
					   depend_spec_t *dep_ptr,
					   bool set_cluster_bit)
{
	uint32_t origin_id;
	uint64_t sib_bits;

	xassert(job_ptr);
	xassert(dep_ptr);

	if (!_is_fed_job(job_ptr, &origin_id) || disable_remote_singleton)
		return true;
	if (dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) {
		error("%s: Got non-singleton dependency (type %u) for %pJ. This should never happen.",
		      __func__, dep_ptr->depend_type, job_ptr);
		return true;
	}

	/* Set the bit for this cluster indicating that it has been satisfied */
	if (set_cluster_bit)
		dep_ptr->singleton_bits |=
			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);

	if (fed_mgr_cluster_rec->fed.id != origin_id) {
		return true;
	}

	/*
	 * Only test for current siblings; if a sibling was removed but
	 * previously had passed a singleton dependency, that bit may be
	 * set in dep_ptr->singleton_bits.
	 */
	sib_bits = _get_all_sibling_bits();
	return (dep_ptr->singleton_bits & sib_bits) == sib_bits;
}
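
/*
 * Worked example (assumed values): with siblings 1-3 in the federation,
 * sib_bits == 0b0111. The dependency is only satisfied federation-wide
 * once every current sibling's bit is set:
 *
 *	singleton_bits == 0b0011: (0b0011 & 0b0111) != 0b0111 -> false
 *	singleton_bits == 0b1111: (0b1111 & 0b0111) == 0b0111 -> true
 *	(a stale bit from a since-removed sibling 4 is masked out)
 */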

/*
 * Update a job's required clusters.
 *
 * Results in siblings being removed and added.
 *
 * IN job_ptr - job to update.
 * IN spec_clusters - comma-separated list of cluster names.
 * RET return SLURM_SUCCESS on success, error code otherwise.
 */
extern int fed_mgr_update_job_clusters(job_record_t *job_ptr,
				       char *spec_clusters)
{
	int rc = SLURM_SUCCESS;
	uint32_t origin_id;

	xassert(job_ptr);
	xassert(spec_clusters);

	if (!_is_fed_job(job_ptr, &origin_id)) {
		sched_info("update_job: not a fed job");
		rc = SLURM_ERROR;
	} else if ((!IS_JOB_PENDING(job_ptr)) ||
		   job_ptr->fed_details->cluster_lock) {
		rc = ESLURM_JOB_NOT_PENDING;
	} else if (!fed_mgr_fed_rec) {
		sched_info("update_job: setting Clusters on a non-active federated cluster for %pJ",
			   job_ptr);
		rc = ESLURM_JOB_NOT_FEDERATED;
	} else if (_validate_cluster_names(spec_clusters, NULL)) {
		sched_info("update_job: invalid Clusters for %pJ: %s",
			   job_ptr, spec_clusters);
		rc = ESLURM_INVALID_CLUSTER_NAME;
	} else {
		xfree(job_ptr->clusters);
		if (spec_clusters[0] == '\0')
			sched_info("update_job: cleared Clusters for %pJ",
				   job_ptr);
		else if (*spec_clusters)
			job_ptr->clusters =
				xstrdup(spec_clusters);

		if (fed_mgr_is_origin_job(job_ptr))
			_add_remove_sibling_jobs(job_ptr);
	}

	return rc;
}

/*
 * Update a job's cluster features.
 *
 * Results in siblings being removed and added.
 *
 * IN job_ptr - job to update cluster features.
 * IN req_features - comma-separated list of feature names.
 * RET return SLURM_SUCCESS on success, error code otherwise.
 */
extern int fed_mgr_update_job_cluster_features(job_record_t *job_ptr,
					       char *req_features)
{
	int rc = SLURM_SUCCESS;
	uint32_t origin_id;

	xassert(job_ptr);
	xassert(req_features);

	if (!_is_fed_job(job_ptr, &origin_id)) {
		sched_info("update_job: not a fed job");
		rc = SLURM_ERROR;
	} else if ((!IS_JOB_PENDING(job_ptr)) ||
		   job_ptr->fed_details->cluster_lock) {
		rc = ESLURM_JOB_NOT_PENDING;
	} else if (!fed_mgr_fed_rec) {
		sched_info("update_job: setting ClusterFeatures on a non-active federated cluster for %pJ",
			   job_ptr);
		rc = ESLURM_JOB_NOT_FEDERATED;
	} else if (_validate_cluster_features(req_features, NULL)) {
		sched_info("update_job: invalid ClusterFeatures for %pJ",
			   job_ptr);
		rc = ESLURM_INVALID_CLUSTER_FEATURE;
	} else {
		xfree(job_ptr->details->cluster_features);
		if (req_features[0] == '\0')
			sched_info("update_job: cleared ClusterFeatures for %pJ",
				   job_ptr);
		else if (*req_features)
			job_ptr->details->cluster_features =
				xstrdup(req_features);

		if (fed_mgr_is_origin_job(job_ptr))
			_add_remove_sibling_jobs(job_ptr);
	}

	return rc;
}
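
/*
 * Typical trigger (illustrative): a pending federated job's cluster
 * features can be changed after submission, e.g.
 *
 *	scontrol update jobid=1234 clusterfeatures=gpu
 *
 * which re-validates the feature list and then adds/removes sibling jobs
 * to match the new viable set.
 */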

static int _reconcile_fed_job(job_record_t *job_ptr, reconcile_sib_t *rec_sib)
{
	int i;
	bool found_job = false;
	job_info_msg_t *remote_jobs_ptr = rec_sib->job_info_msg;
	uint32_t origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
	uint32_t sibling_id = rec_sib->sibling_id;
	uint64_t sibling_bit = FED_SIBLING_BIT(sibling_id);
	char *sibling_name = rec_sib->sibling_name;
	slurm_job_info_t *remote_job = NULL;
	fed_job_info_t *job_info;

	xassert(job_ptr);
	xassert(remote_jobs_ptr);

	/*
	 * Only look at jobs that:
	 * 1. originate from the remote sibling, or
	 * 2. originate from this cluster, or
	 * 3. have the sibling in the job's viable list.
	 */
	if (!job_ptr->fed_details ||
	    !job_ptr->details ||
	    (job_ptr->details->submit_time >= rec_sib->sync_time) ||
	    IS_JOB_COMPLETED(job_ptr) || IS_JOB_COMPLETING(job_ptr) ||
	    ((fed_mgr_get_cluster_id(job_ptr->job_id) != sibling_id) &&
	     (!fed_mgr_is_origin_job(job_ptr)) &&
	     (!(job_ptr->fed_details->siblings_viable & sibling_bit)))) {
		return SLURM_SUCCESS;
	}

	for (i = 0; i < remote_jobs_ptr->record_count; i++) {
		remote_job = &remote_jobs_ptr->job_array[i];
		if (job_ptr->job_id == remote_job->job_id) {
			found_job = true;
			break;
		}
	}

	/* Jobs that originated on the remote sibling */
	if (origin_id == sibling_id) {
		if (!found_job ||
		    (remote_job && IS_JOB_COMPLETED(remote_job))) {
			/* origin job is missing on remote sibling or is
			 * completed. Could have been removed from a clean
			 * start. */
			info("%s: origin %pJ is missing (or completed) from origin %s. Killing this copy of the job",
			     __func__, job_ptr, sibling_name);
			job_ptr->bit_flags |= SIB_JOB_FLUSH;
			job_signal(job_ptr, SIGKILL, KILL_NO_SIBS, 0, false);
		} else {
			info("%s: origin %s still has %pJ",
			     __func__, sibling_name, job_ptr);
		}
	/* Jobs that are shared between the two siblings -- not originating
	 * from either one */
	} else if (origin_id != fed_mgr_cluster_rec->fed.id) {
		if (!found_job) {
			/* Only care about jobs that are currently there. */
		} else if (IS_JOB_PENDING(job_ptr) &&
			   IS_JOB_CANCELLED(remote_job)) {
			info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin and sibling were down",
			     __func__, job_ptr, sibling_name);
			job_ptr->job_state = JOB_CANCELLED;
			job_ptr->start_time = remote_job->start_time;
			job_ptr->end_time = remote_job->end_time;
			job_ptr->state_reason = WAIT_NO_REASON;
			xfree(job_ptr->state_desc);
			job_completion_logger(job_ptr, false);
		} else if (IS_JOB_PENDING(job_ptr) &&
			   (IS_JOB_RUNNING(remote_job) ||
			    IS_JOB_COMPLETING(remote_job))) {
			info("%s: %pJ is running on sibling %s, must have been started while the origin and sibling were down",
			     __func__, job_ptr, sibling_name);

			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
					   remote_job->exit_code,
					   job_ptr->start_time);
			/* return now because job_ptr has been free'd */
			return SLURM_SUCCESS;
		} else if (IS_JOB_PENDING(job_ptr) &&
			   (IS_JOB_COMPLETED(remote_job))) {
			info("%s: %pJ is completed on sibling %s, must have been started and completed while the origin and sibling were down",
			     __func__, job_ptr, sibling_name);

			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
					   remote_job->exit_code,
					   job_ptr->start_time);
			/* return now because job_ptr has been free'd */
			return SLURM_SUCCESS;
		}

	/* Origin Jobs */
	} else if (!found_job) {
		info("%s: didn't find %pJ on cluster %s",
		     __func__, job_ptr, sibling_name);

		/* Remove from active siblings */
		if (!(job_ptr->fed_details->siblings_active & sibling_bit)) {
			/* The sibling is a viable sibling but the sibling is
			 * not active and there is no job there. This is ok. */
			info("%s: %s is a viable but not active sibling of %pJ. This is ok.",
			     __func__, sibling_name, job_ptr);

#if 0
		/* Don't submit new sibling jobs if they're not found on the
		 * cluster. They could have been removed while the cluster was
		 * down. */
		} else if (!job_ptr->fed_details->cluster_lock) {
			/* If the origin job isn't locked, then submit a sibling
			 * to this cluster. */
			/* Only do this if it was an active job. Could have been
			 * removed with --cancel-sibling */
			info("%s: %s is an active sibling of %pJ, attempting to submit new sibling job to the cluster.",
			     __func__, sibling_name, job_ptr);
			_prepare_submit_siblings(job_ptr, sibling_bit);
#endif
		} else if (job_ptr->fed_details->cluster_lock == sibling_id) {
			/* The origin thinks that the sibling was running the
			 * job. It could have completed while this cluster was
			 * down or the sibling removed it by clearing out jobs
			 * (e.g. slurmctld -c). */
			info("%s: origin %pJ was running on sibling %s, but it's not there. Assuming that the job completed",
			     __func__, job_ptr, sibling_name);
			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, 0,
					   job_ptr->start_time);
		} else {
			/* The origin job has a lock but it's not on the sibling
			 * being reconciled. The job could have been started by
			 * another cluster while the sibling was down. Or the
			 * original sibling job submission could have failed. Or
			 * the origin started the job on a different sibling
			 * before this sibling went down and came back up
			 * (normal situation). */
			info("%s: origin %pJ is currently locked by sibling %d, this is ok",
			     __func__, job_ptr,
			     job_ptr->fed_details->cluster_lock);
			job_ptr->fed_details->siblings_active &= ~sibling_bit;
		}
	} else if (remote_job) {
		info("%s: %pJ found on remote sibling %s state:%s",
		     __func__, job_ptr, sibling_name,
		     job_state_string(remote_job->job_state));

		if (job_ptr->fed_details->cluster_lock == sibling_id) {
			if (IS_JOB_COMPLETE(remote_job)) {
				info("%s: %pJ on sibling %s is already completed, completing the origin job",
				     __func__, job_ptr, sibling_name);
				fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
						   remote_job->exit_code,
						   job_ptr->start_time);
			} else if (IS_JOB_CANCELLED(remote_job)) {
				info("%s: %pJ on sibling %s is already cancelled, completing the origin job",
				     __func__, job_ptr, sibling_name);
				fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
						   remote_job->exit_code,
						   job_ptr->start_time);
			} else if (!IS_JOB_RUNNING(remote_job)) {
				/* The job could be pending if it was requeued
				 * due to node failure */
				info("%s: %pJ on sibling %s has job lock but job is not running (state:%s)",
				     __func__, job_ptr, sibling_name,
				     job_state_string(remote_job->job_state));
			}
		} else if (job_ptr->fed_details->cluster_lock) {
			/* The remote might have had a sibling job before it
			 * went away and the origin started another job while it
			 * was away. The remote job needs to be revoked. */
			info("%s: %pJ found on sibling %s but job is locked by cluster id %d",
			     __func__, job_ptr, sibling_name,
			     job_ptr->fed_details->cluster_lock);

			if (IS_JOB_PENDING(remote_job)) {
				info("%s: %pJ is on %s in a pending state but cluster %d has the lock on it -- revoking the remote sibling job",
				     __func__, job_ptr, sibling_name,
				     job_ptr->fed_details->cluster_lock);
				_revoke_sibling_jobs(
						job_ptr->job_id,
						fed_mgr_cluster_rec->fed.id,
						sibling_bit,
						job_ptr->start_time);
			} else {
				/* should this job get cancelled? Would have to
				 * check cluster_lock before cancelling it to
				 * make sure that it's not there. */
				info("%s: %pJ has a lock on sibling id %d, but found a non-pending job on sibling %s.",
				     __func__, job_ptr,
				     job_ptr->fed_details->cluster_lock,
				     sibling_name);

				_revoke_sibling_jobs(
						job_ptr->job_id,
						fed_mgr_cluster_rec->fed.id,
						sibling_bit,
						job_ptr->start_time);
			}
		} else {
			if (!(job_ptr->fed_details->siblings_active &
			      sibling_bit)) {
				info("%s: %pJ on sibling %s but it wasn't in the active list. Adding to active list.",
				     __func__, job_ptr, sibling_name);
				job_ptr->fed_details->siblings_active |=
					sibling_bit;
			}
			if (IS_JOB_CANCELLED(remote_job)) {
				info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin was down",
				     __func__, job_ptr, sibling_name);
				job_ptr->job_state = JOB_CANCELLED;
				job_ptr->start_time = remote_job->start_time;
				job_ptr->end_time = remote_job->end_time;
				job_ptr->state_reason = WAIT_NO_REASON;
				xfree(job_ptr->state_desc);
				job_completion_logger(job_ptr, false);

			} else if (IS_JOB_COMPLETED(remote_job)) {
				info("%s: %pJ is completed on sibling %s but the origin cluster wasn't part of starting the job, must have been started while the origin was down",
				     __func__, job_ptr, sibling_name);
				_do_fed_job_complete(job_ptr, JOB_CANCELLED,
						     remote_job->exit_code,
						     remote_job->start_time);

			} else if (IS_JOB_RUNNING(remote_job) ||
				   IS_JOB_COMPLETING(remote_job)) {
				info("%s: origin doesn't think that %pJ should be running on sibling %s but it is. %s could have started the job while this cluster was down.",
				     __func__, job_ptr, sibling_name,
				     sibling_name);
				/* Job was started while we were down. Set this
				 * job to RV and cancel other siblings */
				fed_job_info_t *job_info;
				slurm_mutex_lock(&fed_job_list_mutex);
				if ((job_info =
				     _find_fed_job_info(job_ptr->job_id))) {
					job_info->cluster_lock = sibling_id;
					job_ptr->fed_details->cluster_lock =
						sibling_id;

					/* Remove sibling jobs */
					_fed_job_start_revoke(
							job_info, job_ptr,
							remote_job->start_time);

					/* Set job as RV to track running job */
					fed_mgr_job_revoke(
							job_ptr, false,
							JOB_CANCELLED, 0,
							remote_job->start_time);
				}
				slurm_mutex_unlock(&fed_job_list_mutex);
			}
			/* else all good */
		}
	}

	/* Update job_info with updated siblings */
	slurm_mutex_lock(&fed_job_list_mutex);
	if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
		job_info->siblings_viable =
			job_ptr->fed_details->siblings_viable;
		job_info->siblings_active =
			job_ptr->fed_details->siblings_active;
	} else {
		error("%s: failed to find fed job info for fed %pJ",
		      __func__, job_ptr);
	}
	slurm_mutex_unlock(&fed_job_list_mutex);

	return SLURM_SUCCESS;
}

/*
 * Sync jobs with the given sibling name.
 *
 * IN sib_name - name of the sibling to sync with.
 */
static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
		      time_t sync_time)
{
	ListIterator itr;
	reconcile_sib_t rec_sib = {0};
	slurmdb_cluster_rec_t *sib;
	job_record_t *job_ptr;

	if (!(sib = fed_mgr_get_cluster_by_name((char *)sib_name))) {
		error("Couldn't find sibling by name '%s'", sib_name);
		return SLURM_ERROR;
	}

	rec_sib.sibling_id = sib->fed.id;
	rec_sib.sibling_name = sib->name;
	rec_sib.job_info_msg = job_info_msg;
	rec_sib.sync_time = sync_time;

	itr = list_iterator_create(job_list);
	while ((job_ptr = list_next(itr)))
		_reconcile_fed_job(job_ptr, &rec_sib);
	list_iterator_destroy(itr);

	sib->fed.sync_recvd = true;

	return SLURM_SUCCESS;
}

/*
 * Remove active sibling from the given job.
 *
 * IN job_id - job_id of job to remove active sibling from.
 * IN sib_name - name of sibling job to remove from active siblings.
 * RET SLURM_SUCCESS on success, error code on error.
 */
extern int fed_mgr_remove_active_sibling(uint32_t job_id, char *sib_name)
{
	uint32_t origin_id;
	job_record_t *job_ptr = NULL;
	slurmdb_cluster_rec_t *sibling;

	if (!(job_ptr = find_job_record(job_id)))
		return ESLURM_INVALID_JOB_ID;

	if (!_is_fed_job(job_ptr, &origin_id))
		return ESLURM_JOB_NOT_FEDERATED;

	if (job_ptr->fed_details->cluster_lock)
		return ESLURM_JOB_NOT_PENDING;

	if (!(sibling = fed_mgr_get_cluster_by_name(sib_name)))
		return ESLURM_INVALID_CLUSTER_NAME;

	if (job_ptr->fed_details->siblings_active &
	    FED_SIBLING_BIT(sibling->fed.id)) {
		time_t now = time(NULL);
		if (fed_mgr_cluster_rec == sibling)
			fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
					   now);
		else
			_revoke_sibling_jobs(job_ptr->job_id,
					     fed_mgr_cluster_rec->fed.id,
					     FED_SIBLING_BIT(sibling->fed.id),
					     now);
		job_ptr->fed_details->siblings_active &=
			~(FED_SIBLING_BIT(sibling->fed.id));
		update_job_fed_details(job_ptr);
	}

	return SLURM_SUCCESS;
}
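
/*
 * Typical trigger (illustrative): an active sibling can be removed from a
 * pending federated job with scancel, e.g.
 *
 *	scancel --sibling=cluster2 1234
 *
 * which revokes cluster2's copy of the job and clears its bit in
 * siblings_active.
 */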

static int _q_sib_job_submission(slurm_msg_t *msg, bool interactive_job)
{
	fed_job_update_info_t *job_update_info = NULL;
	sib_msg_t *sib_msg = msg->data;
	job_desc_msg_t *job_desc = sib_msg->data;
	job_desc->job_id = sib_msg->job_id;
	job_desc->fed_siblings_viable = sib_msg->fed_siblings;
	job_desc->alloc_node = sib_msg->submit_host;
	/*
	 * If the job has a dependency, it won't be submitted to siblings,
	 * or it will be revoked from siblings if it became dependent.
	 * So the sibling should ignore job_desc->dependency since it's
	 * handled by the origin cluster.
	 */
	xfree(job_desc->dependency);
	if (interactive_job)
		job_desc->resp_host = xstrdup(sib_msg->resp_host);

	/* NULL out the data pointer because we are storing the pointer on the
	 * fed job update queue to be handled later. */
	sib_msg->data = NULL;

	/* Set the protocol version to that of the client so that the job's
	 * start_protocol_version is the client's and not the calling
	 * controller's. */
	job_update_info = xmalloc(sizeof(fed_job_update_info_t));

	job_update_info->job_id = job_desc->job_id;
	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
	job_update_info->submit_desc = job_desc;
	job_update_info->submit_proto_ver = msg->protocol_version;

	if (interactive_job)
		job_update_info->type = FED_JOB_SUBMIT_INT;
	else
		job_update_info->type = FED_JOB_SUBMIT_BATCH;

	_append_job_update(job_update_info);

	return SLURM_SUCCESS;
}

static int _q_sib_submit_response(slurm_msg_t *msg)
{
	int rc = SLURM_SUCCESS;
	sib_msg_t *sib_msg;
	fed_job_update_info_t *job_update_info = NULL;

	xassert(msg);
	xassert(msg->conn);

	sib_msg = msg->data;

	/* if failure then remove from active siblings */
	if (sib_msg && sib_msg->return_code) {
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("%s: cluster %s failed to submit sibling JobId=%u. Removing from active_sibs. (error:%d)",
			     __func__, msg->conn->cluster_name,
			     sib_msg->job_id,
			     sib_msg->return_code);

		job_update_info = xmalloc(sizeof(fed_job_update_info_t));
		job_update_info->job_id = sib_msg->job_id;
		job_update_info->type = FED_JOB_REMOVE_ACTIVE_SIB_BIT;
		job_update_info->siblings_str =
			xstrdup(msg->conn->cluster_name);
		_append_job_update(job_update_info);
	}

	return rc;
}

static int _q_sib_job_update(slurm_msg_t *msg, uint32_t uid)
{
	sib_msg_t *sib_msg = msg->data;
	job_desc_msg_t *job_desc = sib_msg->data;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	/* NULL out the data pointer because we are storing the pointer on the
	 * fed job update queue to be handled later. */
	sib_msg->data = NULL;

	job_update_info->type = FED_JOB_UPDATE;
	job_update_info->submit_desc = job_desc;
	job_update_info->job_id = sib_msg->job_id;
	job_update_info->uid = uid;
	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);

	_append_job_update(job_update_info);

	return SLURM_SUCCESS;
}

static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid)
{
	int rc = SLURM_SUCCESS;
	uint32_t req_uid;
	sib_msg_t *sib_msg = msg->data;
	job_step_kill_msg_t *kill_msg = (job_step_kill_msg_t *)sib_msg->data;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	/* NULL out the data pointer because we are storing the pointer on the
	 * fed job update queue to be handled later. */
	sib_msg->data = NULL;

	if (sib_msg->req_uid)
		req_uid = sib_msg->req_uid;
	else
		req_uid = uid;

	job_update_info->type = FED_JOB_CANCEL;
	job_update_info->job_id = kill_msg->job_id;
	job_update_info->kill_msg = kill_msg;
	job_update_info->uid = req_uid;

	_append_job_update(job_update_info);

	return rc;
}

static int _q_sib_job_complete(slurm_msg_t *msg)
{
	int rc = SLURM_SUCCESS;
	sib_msg_t *sib_msg = msg->data;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	job_update_info->type = FED_JOB_COMPLETE;
	job_update_info->job_id = sib_msg->job_id;
	job_update_info->job_state = sib_msg->job_state;
	job_update_info->start_time = sib_msg->start_time;
	job_update_info->return_code = sib_msg->return_code;

	_append_job_update(job_update_info);

	return rc;
}

static int _q_sib_job_update_response(slurm_msg_t *msg)
{
	int rc = SLURM_SUCCESS;
	sib_msg_t *sib_msg = msg->data;

	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	job_update_info->type = FED_JOB_UPDATE_RESPONSE;
	job_update_info->job_id = sib_msg->job_id;
	job_update_info->return_code = sib_msg->return_code;
	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);

	_append_job_update(job_update_info);

	return rc;
}

static int _q_sib_job_requeue(slurm_msg_t *msg, uint32_t uid)
{
	int rc = SLURM_SUCCESS;
	sib_msg_t *sib_msg = msg->data;
	requeue_msg_t *req_ptr = (requeue_msg_t *)sib_msg->data;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	job_update_info->type = FED_JOB_REQUEUE;
	job_update_info->job_id = req_ptr->job_id;
	job_update_info->flags = req_ptr->flags;
	job_update_info->uid = uid;

	_append_job_update(job_update_info);

	return rc;
}

static int _q_send_job_sync(char *sib_name)
{
	int rc = SLURM_SUCCESS;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	job_update_info->type = FED_SEND_JOB_SYNC;
	job_update_info->submit_cluster = xstrdup(sib_name);

	_append_job_update(job_update_info);

	return rc;
}

static int _q_sib_job_sync(slurm_msg_t *msg)
{
	int rc = SLURM_SUCCESS;
	sib_msg_t *sib_msg = msg->data;
	job_info_msg_t *job_info_msg = (job_info_msg_t *)sib_msg->data;
	fed_job_update_info_t *job_update_info =
		xmalloc(sizeof(fed_job_update_info_t));

	/* NULL out the data pointer because we are storing the pointer on the
	 * fed job update queue to be handled later. */
	sib_msg->data = NULL;

	job_update_info->type = FED_JOB_SYNC;
	job_update_info->job_info_msg = job_info_msg;
	job_update_info->start_time = sib_msg->start_time;
	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);

	_append_job_update(job_update_info);

	return rc;
}

extern int fed_mgr_q_update_origin_dep_msg(slurm_msg_t *msg)
{
	dep_update_origin_msg_t *update_deps;
	dep_update_origin_msg_t *update_msg = msg->data;

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("%s: Got %s: Job %u",
		     __func__, rpc_num2string(msg->msg_type),
		     update_msg->job_id);

	/* update_msg will get free'd, so copy it */
	update_deps = xmalloc(sizeof *update_deps);
	update_deps->depend_list = update_msg->depend_list;
	update_deps->job_id = update_msg->job_id;
	/*
	 * NULL update_msg->depend_list so it doesn't get free'd; we're
	 * using it later.
	 */
	update_msg->depend_list = NULL;

	list_append(origin_dep_update_list, update_deps);
	slurm_mutex_lock(&origin_dep_update_mutex);
	slurm_cond_broadcast(&origin_dep_cond);
	slurm_mutex_unlock(&origin_dep_update_mutex);

	return SLURM_SUCCESS;
}

extern int fed_mgr_q_dep_msg(slurm_msg_t *msg)
{
	dep_msg_t *remote_dependency;
	dep_msg_t *dep_msg = msg->data;

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("%s: Got %s: Job %u",
		     __func__, rpc_num2string(msg->msg_type), dep_msg->job_id);

	/* dep_msg will get free'd, so copy it */
	remote_dependency = xmalloc(sizeof *remote_dependency);
	remote_dependency->job_id = dep_msg->job_id;
	remote_dependency->job_name = dep_msg->job_name;
	remote_dependency->dependency = dep_msg->dependency;
	/* NULL strings so they don't get free'd */
	dep_msg->job_name = NULL;
	dep_msg->dependency = NULL;
	remote_dependency->array_task_id = dep_msg->array_task_id;
	remote_dependency->array_job_id = dep_msg->array_job_id;
	remote_dependency->is_array = dep_msg->is_array;
	remote_dependency->user_id = dep_msg->user_id;

	list_append(remote_dep_recv_list, remote_dependency);
	slurm_mutex_lock(&remote_dep_recv_mutex);
	slurm_cond_broadcast(&remote_dep_cond);
	slurm_mutex_unlock(&remote_dep_recv_mutex);
	return SLURM_SUCCESS;
}

extern int fed_mgr_q_sib_msg(slurm_msg_t *msg, uint32_t rpc_uid)
{
	sib_msg_t *sib_msg = msg->data;

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
		info("%s: sib_msg_type:%s",
		     __func__, _job_update_type_str(sib_msg->sib_msg_type));

	switch (sib_msg->sib_msg_type) {
	case FED_JOB_CANCEL:
		_q_sib_job_cancel(msg, rpc_uid);
		break;
	case FED_JOB_COMPLETE:
		_q_sib_job_complete(msg);
		break;
	case FED_JOB_REQUEUE:
		_q_sib_job_requeue(msg, rpc_uid);
		break;
	case FED_JOB_START:
		_q_sib_job_start(msg);
		break;
	case FED_JOB_SUBMIT_BATCH:
		_q_sib_job_submission(msg, false);
		break;
	case FED_JOB_SUBMIT_INT:
		_q_sib_job_submission(msg, true);
		break;
	case FED_JOB_SUBMIT_RESP:
		_q_sib_submit_response(msg);
		break;
	case FED_JOB_SYNC:
		_q_sib_job_sync(msg);
		break;
	case FED_JOB_UPDATE:
		_q_sib_job_update(msg, rpc_uid);
		break;
	case FED_JOB_UPDATE_RESPONSE:
		_q_sib_job_update_response(msg);
		break;
	default:
		error("%s: invalid sib_msg_type: %d",
		      __func__, sib_msg->sib_msg_type);
		break;
	}

	return SLURM_SUCCESS;
}

static int _list_find_not_synced_sib(void *x, void *key)
{
	slurmdb_cluster_rec_t *sib = x;

	if (sib != fed_mgr_cluster_rec &&
	    sib->fed.send &&
	    (((slurm_persist_conn_t *)sib->fed.send)->fd >= 0) &&
	    !sib->fed.sync_recvd)
		return 1;

	return 0;
}

extern bool fed_mgr_sibs_synced(void)
{
	slurmdb_cluster_rec_t *sib;
	int dummy = 1;

	if (!fed_mgr_fed_rec)
		return true;

	if ((sib = list_find_first(fed_mgr_fed_rec->cluster_list,
				   _list_find_not_synced_sib, &dummy))) {
		debug("%s: sibling %s up but not synced yet",
		      __func__, sib->name);
		return false;
	}

	return true;
}

extern void fed_mgr_test_remote_dependencies(void)
{
	int rc;
	uint32_t origin_id;
	bool was_changed;
	job_record_t *job_ptr;
	ListIterator itr;
	slurmdb_cluster_rec_t *origin;

	xassert(verify_lock(JOB_LOCK, READ_LOCK));
	xassert(verify_lock(FED_LOCK, READ_LOCK));

	if (!list_count(remote_dep_job_list) || !fed_mgr_fed_rec ||
	    !fed_mgr_cluster_rec)
		return;

	slurm_mutex_lock(&dep_job_list_mutex);
	itr = list_iterator_create(remote_dep_job_list);
	while ((job_ptr = list_next(itr))) {
		origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
		origin = fed_mgr_get_cluster_by_id(origin_id);
		if (!origin) {
			/*
			 * The origin probably left the federation. If it comes
			 * back there's no guarantee it will have the same
			 * cluster id as before.
			 */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: Couldn't find the origin cluster (id %u); it probably left the federation. Stop testing dependency for %pJ.",
				     __func__, origin_id, job_ptr);
			list_delete_item(itr);
			continue;
		}

		rc = test_job_dependency(job_ptr, &was_changed);
		if (rc == LOCAL_DEPEND) {
			if (was_changed) {
				if (slurmctld_conf.debug_flags &
				    DEBUG_FLAG_DEPENDENCY)
					info("%s: %pJ has at least 1 local dependency left.",
					     __func__, job_ptr);
				_update_origin_job_dep(job_ptr, origin);
			}
		} else if (rc == FAIL_DEPEND) {
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: %pJ test_job_dependency() failed, dependency never satisfied.",
				     __func__, job_ptr);
			_update_origin_job_dep(job_ptr, origin);
			list_delete_item(itr);
		} else { /* ((rc == REMOTE_DEPEND) || (rc == NO_DEPEND)) */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
				info("%s: %pJ has no more dependencies left on this cluster.",
				     __func__, job_ptr);
			_update_origin_job_dep(job_ptr, origin);
			list_delete_item(itr);
		}
	}
	list_iterator_destroy(itr);
	slurm_mutex_unlock(&dep_job_list_mutex);
}