1 /*****************************************************************************\
2  *  fed_mgr.c - functions for federations
3  *****************************************************************************
4  *  Copyright (C) 2016 SchedMD LLC.
5  *  Written by Brian Christiansen <brian@schedmd.com>
6  *
7  *  This file is part of Slurm, a resource management program.
8  *  For details, see <https://slurm.schedmd.com/>.
9  *  Please also read the included file: DISCLAIMER.
10  *
11  *  Slurm is free software; you can redistribute it and/or modify it under
12  *  the terms of the GNU General Public License as published by the Free
13  *  Software Foundation; either version 2 of the License, or (at your option)
14  *  any later version.
15  *
16  *  In addition, as a special exception, the copyright holders give permission
17  *  to link the code of portions of this program with the OpenSSL library under
18  *  certain conditions as described in each individual source file, and
19  *  distribute linked combinations including the two. You must obey the GNU
20  *  General Public License in all respects for all of the code used other than
21  *  OpenSSL. If you modify file(s) with this exception, you may extend this
22  *  exception to your version of the file(s), but you are not obligated to do
23  *  so. If you do not wish to do so, delete this exception statement from your
24  *  version.  If you delete this exception statement from all source files in
25  *  the program, then also delete it here.
26  *
27  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
28  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
29  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
30  *  details.
31  *
32  *  You should have received a copy of the GNU General Public License along
33  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
34  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
35 \*****************************************************************************/
36 
37 #include "config.h"
38 
39 #include <pthread.h>
40 #include <signal.h>
41 
42 #if HAVE_SYS_PRCTL_H
43 #  include <sys/prctl.h>
44 #endif
45 
46 #include "src/common/list.h"
47 #include "src/common/macros.h"
48 #include "src/common/parse_time.h"
49 #include "src/common/slurm_protocol_api.h"
50 #include "src/common/slurmdbd_defs.h"
51 #include "src/common/xmalloc.h"
52 #include "src/common/xstring.h"
53 #include "src/slurmctld/fed_mgr.h"
54 #include "src/slurmctld/job_scheduler.h"
55 #include "src/slurmctld/locks.h"
56 #include "src/slurmctld/proc_req.h"
57 #include "src/slurmctld/slurmctld.h"
58 #include "src/slurmctld/srun_comm.h"
59 #include "src/slurmctld/state_save.h"
60 #include "src/slurmdbd/read_config.h"
61 
62 #define FED_MGR_STATE_FILE       "fed_mgr_state"
63 #define FED_MGR_CLUSTER_ID_BEGIN 26
64 #define TEST_REMOTE_DEP_FREQ 30 /* seconds */
65 
66 #define FED_SIBLING_BIT(x) ((uint64_t)1 << (x - 1))
67 
68 slurmdb_federation_rec_t *fed_mgr_fed_rec     = NULL;
69 slurmdb_cluster_rec_t    *fed_mgr_cluster_rec = NULL;
70 
71 static pthread_cond_t  agent_cond = PTHREAD_COND_INITIALIZER;
72 static pthread_mutex_t agent_mutex = PTHREAD_MUTEX_INITIALIZER;
73 static pthread_t       agent_thread_id = (pthread_t) 0;
74 static int             agent_queue_size = 0;
75 
76 static pthread_cond_t  job_watch_cond = PTHREAD_COND_INITIALIZER;
77 static pthread_mutex_t job_watch_mutex = PTHREAD_MUTEX_INITIALIZER;
78 static bool            job_watch_thread_running = false;
79 static bool            stop_job_watch_thread = false;
80 
81 static bool inited = false;
82 static pthread_mutex_t open_send_mutex = PTHREAD_MUTEX_INITIALIZER;
83 static pthread_mutex_t init_mutex = PTHREAD_MUTEX_INITIALIZER;
84 static pthread_mutex_t update_mutex = PTHREAD_MUTEX_INITIALIZER;
85 
86 static List fed_job_list        = NULL;
87 static List fed_job_update_list = NULL;
88 static pthread_t       fed_job_update_thread_id = (pthread_t) 0;
89 static pthread_mutex_t fed_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
90 static pthread_cond_t  job_update_cond    = PTHREAD_COND_INITIALIZER;
91 static pthread_mutex_t job_update_mutex   = PTHREAD_MUTEX_INITIALIZER;
92 
93 static List remote_dep_recv_list = NULL;
94 static pthread_t remote_dep_thread_id = (pthread_t) 0;
95 static pthread_cond_t remote_dep_cond = PTHREAD_COND_INITIALIZER;
96 static pthread_mutex_t remote_dep_recv_mutex = PTHREAD_MUTEX_INITIALIZER;
97 
98 static List remote_dep_job_list = NULL;
99 static pthread_t dep_job_thread_id = (pthread_t) 0;
100 static pthread_mutex_t dep_job_list_mutex = PTHREAD_MUTEX_INITIALIZER;
101 
102 static List origin_dep_update_list = NULL;
103 static pthread_t origin_dep_thread_id = (pthread_t) 0;
104 static pthread_cond_t origin_dep_cond = PTHREAD_COND_INITIALIZER;
105 static pthread_mutex_t origin_dep_update_mutex = PTHREAD_MUTEX_INITIALIZER;
106 
107 typedef struct {
108 	Buf        buffer;
109 	uint32_t   job_id;
110 	time_t     last_try;
111 	int        last_defer;
112 	uint16_t   msg_type;
113 } agent_queue_t;
114 
115 enum fed_job_update_type {
116 	FED_JOB_NONE = 0,
117 	FED_JOB_CANCEL,
118 	FED_JOB_COMPLETE,
119 	FED_JOB_REMOVE_ACTIVE_SIB_BIT,
120 	FED_JOB_REQUEUE,
121 	FED_JOB_START,
122 	FED_JOB_SUBMIT_BATCH,
123 	FED_JOB_SUBMIT_INT,
124 	FED_JOB_SUBMIT_RESP,
125 	FED_JOB_SYNC,
126 	FED_JOB_UPDATE,
127 	FED_JOB_UPDATE_RESPONSE,
128 	FED_SEND_JOB_SYNC,
129 };
130 
131 typedef struct {
132 	uint32_t        cluster_lock;
133 	uint32_t        flags;
134 	uint32_t        job_id;
135 	job_info_msg_t *job_info_msg;
136 	uint32_t        job_state;
137 	job_step_kill_msg_t *kill_msg;
138 	bool            requeue;
139 	uint32_t        return_code;
140 	uint64_t        siblings_active;
141 	uint64_t        siblings_viable;
142 	char           *siblings_str;
143 	time_t          start_time;
144 	char           *submit_cluster;
145 	job_desc_msg_t *submit_desc;
146 	uint16_t        submit_proto_ver;
147 	uint32_t        type;
148 	uid_t           uid;
149 } fed_job_update_info_t;
150 
151 typedef struct {
152 	uint32_t        cluster_lock;
153 	uint32_t        job_id;
154 	uint64_t        siblings_active;
155 	uint64_t        siblings_viable;
156 	uint32_t        updating_sibs[MAX_FED_CLUSTERS + 1];
157 	time_t          updating_time[MAX_FED_CLUSTERS + 1];
158 } fed_job_info_t;
159 
160 /* Local Structs */
161 typedef struct {
162 	job_info_msg_t *job_info_msg;
163 	uint32_t        sibling_id;
164 	char           *sibling_name;
165 	time_t          sync_time;
166 } reconcile_sib_t;
167 
168 /* Local Prototypes */
169 static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id);
170 static uint64_t _get_all_sibling_bits();
171 static int _validate_cluster_features(char *spec_features,
172 				      uint64_t *cluster_bitmap);
173 static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap);
174 static void _leave_federation(void);
175 static int _q_send_job_sync(char *sib_name);
176 static slurmdb_federation_rec_t *_state_load(char *state_save_location);
177 static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
178 		      time_t sync_time);
179 static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid);
180 
_job_update_type_str(enum fed_job_update_type type)181 static char *_job_update_type_str(enum fed_job_update_type type)
182 {
183 	switch (type) {
184 	case FED_JOB_COMPLETE:
185 		return "FED_JOB_COMPLETE";
186 	case FED_JOB_CANCEL:
187 		return "FED_JOB_CANCEL";
188 	case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
189 		return "FED_JOB_REMOVE_ACTIVE_SIB_BIT";
190 	case FED_JOB_REQUEUE:
191 		return "FED_JOB_REQUEUE";
192 	case FED_JOB_START:
193 		return "FED_JOB_START";
194 	case FED_JOB_SUBMIT_BATCH:
195 		return "FED_JOB_SUBMIT_BATCH";
196 	case FED_JOB_SUBMIT_INT:
197 		return "FED_JOB_SUBMIT_INT";
198 	case FED_JOB_SUBMIT_RESP:
199 		return "FED_JOB_SUBMIT_RESP";
200 	case FED_JOB_SYNC:
201 		return "FED_JOB_SYNC";
202 	case FED_JOB_UPDATE:
203 		return "FED_JOB_UPDATE";
204 	case FED_JOB_UPDATE_RESPONSE:
205 		return "FED_JOB_UPDATE_RESPONSE";
206 	case FED_SEND_JOB_SYNC:
207 		return "FED_SEND_JOB_SYNC";
208 	default:
209 		return "?";
210 	}
211 }
212 
_append_job_update(fed_job_update_info_t * job_update_info)213 static void _append_job_update(fed_job_update_info_t *job_update_info)
214 {
215 	list_append(fed_job_update_list, job_update_info);
216 
217 	slurm_mutex_lock(&job_update_mutex);
218 	slurm_cond_broadcast(&job_update_cond);
219 	slurm_mutex_unlock(&job_update_mutex);
220 }
221 
222 /* Return true if communication failure should be logged. Only log failures
223  * every 10 minutes to avoid filling logs */
_comm_fail_log(slurmdb_cluster_rec_t * cluster)224 static bool _comm_fail_log(slurmdb_cluster_rec_t *cluster)
225 {
226 	time_t now = time(NULL);
227 	time_t old = now - 600;	/* Log failures once every 10 mins */
228 
229 	if (cluster->comm_fail_time < old) {
230 		cluster->comm_fail_time = now;
231 		return true;
232 	}
233 	return false;
234 }
235 
_close_controller_conn(slurmdb_cluster_rec_t * cluster)236 static int _close_controller_conn(slurmdb_cluster_rec_t *cluster)
237 {
238 	int rc = SLURM_SUCCESS;
239 //	slurm_persist_conn_t *persist_conn = NULL;
240 
241 	xassert(cluster);
242 	slurm_mutex_lock(&cluster->lock);
243 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
244 		info("closing sibling conn to %s", cluster->name);
245 
246 	/* The recv free of this is handled directly in the persist_conn code,
247 	 * don't free it here */
248 //	slurm_persist_conn_destroy(cluster->fed.recv);
249 	cluster->fed.recv = NULL;
250 	slurm_persist_conn_destroy(cluster->fed.send);
251 	cluster->fed.send = NULL;
252 	xfree(cluster->control_host);
253 	cluster->control_port = 0;
254 
255 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
256 		info("closed sibling conn to %s", cluster->name);
257 	slurm_mutex_unlock(&cluster->lock);
258 
259 	return rc;
260 }
261 
262 /* Get list of jobs that originated from this cluster and the remote sibling and
263  * that are viable between the two siblings.
264  * Originating here: so that the remote can determine if the tracker job is gone
265  * Originating sib: so that the remote verify jobs are where they're supposed to
266  * be. If the sibling doesn't find a job, the sibling can resubmit the job or
267  * take other actions.
268  * Viable sib: because the origin might be down and the job was started or
269  * cancelled while the origin was down.
270  *
271  * Only get jobs that were submitted prior to sync_time
272  */
_get_sync_jobid_list(uint32_t sib_id,time_t sync_time)273 static List _get_sync_jobid_list(uint32_t sib_id, time_t sync_time)
274 {
275 	List jobids = NULL;
276 	ListIterator job_itr;
277 	job_record_t *job_ptr;
278 
279 	jobids = list_create(xfree_ptr);
280 
281 	/*
282 	 * Only look at jobs that:
283 	 * 1. originate from the remote sibling
284 	 * 2. originate from this cluster
285 	 * 3. if the sibling is in the job's viable list.
286 	 */
287 	job_itr = list_iterator_create(job_list);
288 	while ((job_ptr = list_next(job_itr))) {
289 		uint32_t cluster_id = fed_mgr_get_cluster_id(job_ptr->job_id);
290 		if (job_ptr->fed_details &&
291 		    (job_ptr->details &&
292 		     (job_ptr->details->submit_time < sync_time)) &&
293 		    ((cluster_id == sib_id) ||
294 		     (cluster_id == fed_mgr_cluster_rec->fed.id) ||
295 		     (job_ptr->fed_details->siblings_viable &
296 		      FED_SIBLING_BIT(sib_id)))) {
297 
298 		    uint32_t *tmp = xmalloc(sizeof(uint32_t));
299 		    *tmp = job_ptr->job_id;
300 		    list_append(jobids, tmp);
301 		}
302 	}
303 	list_iterator_destroy(job_itr);
304 
305 	return jobids;
306 }
307 
_open_controller_conn(slurmdb_cluster_rec_t * cluster,bool locked)308 static int _open_controller_conn(slurmdb_cluster_rec_t *cluster, bool locked)
309 {
310 	int rc;
311 	slurm_persist_conn_t *persist_conn = NULL;
312 	static int timeout = -1;
313 
314 	if (timeout < 0)
315 		timeout = slurm_get_msg_timeout() * 1000;
316 
317 	if (cluster == fed_mgr_cluster_rec) {
318 		info("%s: hey! how did we get here with ourselves?", __func__);
319 		return SLURM_ERROR;
320 	}
321 
322 	if (!locked)
323 		slurm_mutex_lock(&cluster->lock);
324 
325 	if (!cluster->control_host || !cluster->control_host[0] ||
326 	    !cluster->control_port) {
327 		if (cluster->fed.recv) {
328 			persist_conn = cluster->fed.recv;
329 			cluster->control_port = persist_conn->rem_port;
330 			xfree(cluster->control_host);
331 			cluster->control_host = xstrdup(persist_conn->rem_host);
332 		} else {
333 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
334 				info("%s: Sibling cluster %s doesn't appear to be up yet, skipping",
335 				     __func__, cluster->name);
336 			if (!locked)
337 				slurm_mutex_unlock(&cluster->lock);
338 			return SLURM_ERROR;
339 		}
340 	}
341 
342 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
343 		info("opening sibling conn to %s", cluster->name);
344 
345 	if (!cluster->fed.send) {
346 		persist_conn = xmalloc(sizeof(slurm_persist_conn_t));
347 
348 		cluster->fed.send = persist_conn;
349 
350 		/* Since this connection is coming from us, make it so ;) */
351 		persist_conn->cluster_name = xstrdup(slurmctld_conf.cluster_name);
352 		persist_conn->persist_type = PERSIST_TYPE_FED;
353 		persist_conn->my_port = slurmctld_conf.slurmctld_port;
354 		persist_conn->rem_host = xstrdup(cluster->control_host);
355 		persist_conn->rem_port = cluster->control_port;
356 		persist_conn->version  = cluster->rpc_version;
357 		persist_conn->shutdown = &slurmctld_config.shutdown_time;
358 		persist_conn->timeout = timeout; /* don't put this as 0 it
359 						  * could cause deadlock */
360 	} else {
361 		persist_conn = cluster->fed.send;
362 
363 		/* Perhaps a backup came up, so don't assume it was the same
364 		 * host or port we had before.
365 		 */
366 		xfree(persist_conn->rem_host);
367 		persist_conn->rem_host = xstrdup(cluster->control_host);
368 		persist_conn->rem_port = cluster->control_port;
369 	}
370 
371 	rc = slurm_persist_conn_open(persist_conn);
372 	if (rc != SLURM_SUCCESS) {
373 		if (_comm_fail_log(cluster)) {
374 			error("fed_mgr: Unable to open connection to cluster %s using host %s(%u)",
375 			      cluster->name,
376 			      persist_conn->rem_host, persist_conn->rem_port);
377 		}
378 	} else if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR) {
379 		info("opened sibling conn to %s:%d",
380 		     cluster->name, persist_conn->fd);
381 	}
382 
383 	if (!locked)
384 		slurm_mutex_unlock(&cluster->lock);
385 
386 	return rc;
387 }
388 
389 /* The cluster->lock should be locked before this is called */
_check_send(slurmdb_cluster_rec_t * cluster)390 static int _check_send(slurmdb_cluster_rec_t *cluster)
391 {
392 	slurm_persist_conn_t *send = cluster->fed.send;
393 
394 	if (!send || send->fd == -1) {
395 		return _open_controller_conn(cluster, true);
396 	}
397 
398 	return SLURM_SUCCESS;
399 }
400 
401 /* fed_mgr read lock needs to be set before coming in here,
402  * not the write lock */
_open_persist_sends(void)403 static void _open_persist_sends(void)
404 {
405 	ListIterator itr;
406 	slurmdb_cluster_rec_t *cluster = NULL;
407 	slurm_persist_conn_t *send = NULL;
408 
409 	if (!fed_mgr_fed_rec || ! fed_mgr_fed_rec->cluster_list)
410 		return;
411 
412 	/* This open_send_mutex will make this like a write lock since at the
413 	 * same time we are sending out these open requests the other slurmctlds
414 	 * will be replying and needing to get to the structures.  If we just
415 	 * used the fed_mgr write lock it would cause deadlock.
416 	 */
417 	slurm_mutex_lock(&open_send_mutex);
418 	itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
419 	while ((cluster = list_next(itr))) {
420 		if (cluster == fed_mgr_cluster_rec)
421 			continue;
422 
423 		send = cluster->fed.send;
424 		if (!send || send->fd == -1)
425 			_open_controller_conn(cluster, false);
426 	}
427 	list_iterator_destroy(itr);
428 	slurm_mutex_unlock(&open_send_mutex);
429 }
430 
_send_recv_msg(slurmdb_cluster_rec_t * cluster,slurm_msg_t * req,slurm_msg_t * resp,bool locked)431 static int _send_recv_msg(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
432 			  slurm_msg_t *resp, bool locked)
433 {
434 	int rc;
435 
436 	xassert(cluster);
437 	xassert(req);
438 	xassert(resp);
439 
440 	slurm_msg_t_init(resp);
441 
442 	if (!locked)
443 		slurm_mutex_lock(&cluster->lock);
444 
445 	rc = _check_send(cluster);
446 	if ((rc == SLURM_SUCCESS) && cluster->fed.send) {
447 		resp->conn = req->conn = cluster->fed.send;
448 		rc = slurm_send_recv_msg(req->conn->fd, req, resp, 0);
449 	}
450 	if (!locked)
451 		slurm_mutex_unlock(&cluster->lock);
452 
453 	return rc;
454 }
455 
456 /* Free Buf record from a list */
_ctld_free_list_msg(void * x)457 static void _ctld_free_list_msg(void *x)
458 {
459 	agent_queue_t *agent_queue_ptr = (agent_queue_t *) x;
460 	if (agent_queue_ptr) {
461 		FREE_NULL_BUFFER(agent_queue_ptr->buffer);
462 		xfree(agent_queue_ptr);
463 	}
464 }
465 
_queue_rpc(slurmdb_cluster_rec_t * cluster,slurm_msg_t * req,uint32_t job_id,bool locked)466 static int _queue_rpc(slurmdb_cluster_rec_t *cluster, slurm_msg_t *req,
467 		      uint32_t job_id, bool locked)
468 {
469 	agent_queue_t *agent_rec;
470 	Buf buf;
471 
472 	if (!cluster->send_rpc)
473 		cluster->send_rpc = list_create(_ctld_free_list_msg);
474 
475 	buf = init_buf(1024);
476 	pack16(req->msg_type, buf);
477 	if (pack_msg(req, buf) != SLURM_SUCCESS) {
478 		error("%s: failed to pack msg_type:%u",
479 		      __func__, req->msg_type);
480 		FREE_NULL_BUFFER(buf);
481 		return SLURM_ERROR;
482 	}
483 
484 	/* Queue the RPC and notify the agent of new work */
485 	agent_rec = xmalloc(sizeof(agent_queue_t));
486 	agent_rec->buffer = buf;
487 	agent_rec->job_id = job_id;
488 	agent_rec->msg_type = req->msg_type;
489 	list_append(cluster->send_rpc, agent_rec);
490 	slurm_mutex_lock(&agent_mutex);
491 	agent_queue_size++;
492 	slurm_cond_broadcast(&agent_cond);
493 	slurm_mutex_unlock(&agent_mutex);
494 
495 	return SLURM_SUCCESS;
496 }
497 
498 /*
499  * close all sibling conns
500  * must lock before entering.
501  */
_close_sibling_conns(void)502 static int _close_sibling_conns(void)
503 {
504 	ListIterator itr;
505 	slurmdb_cluster_rec_t *cluster;
506 
507 	if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
508 		goto fini;
509 
510 	itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
511 	while ((cluster = list_next(itr))) {
512 		if (cluster == fed_mgr_cluster_rec)
513 			continue;
514 		_close_controller_conn(cluster);
515 	}
516 	list_iterator_destroy(itr);
517 
518 fini:
519 	return SLURM_SUCCESS;
520 }
521 
_mark_self_as_drained(void)522 static void _mark_self_as_drained(void)
523 {
524 	List ret_list;
525 	slurmdb_cluster_cond_t cluster_cond;
526 	slurmdb_cluster_rec_t  cluster_rec;
527 
528 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
529 		info("%s: setting cluster fedstate to DRAINED", __func__);
530 
531 	slurmdb_init_cluster_cond(&cluster_cond, false);
532 	slurmdb_init_cluster_rec(&cluster_rec, false);
533 
534 	cluster_cond.cluster_list = list_create(NULL);
535 	list_append(cluster_cond.cluster_list, fed_mgr_cluster_rec->name);
536 
537 	cluster_rec.fed.state = CLUSTER_FED_STATE_INACTIVE |
538 		(fed_mgr_cluster_rec->fed.state & ~CLUSTER_FED_STATE_BASE);
539 
540 	ret_list = acct_storage_g_modify_clusters(acct_db_conn,
541 						  slurmctld_conf.slurm_user_id,
542 						  &cluster_cond, &cluster_rec);
543 	if (!ret_list || !list_count(ret_list)) {
544 		error("Failed to set cluster state to drained");
545 	}
546 
547 	FREE_NULL_LIST(cluster_cond.cluster_list);
548 	FREE_NULL_LIST(ret_list);
549 }
550 
_remove_self_from_federation(void)551 static void _remove_self_from_federation(void)
552 {
553 	List ret_list;
554 	slurmdb_federation_cond_t fed_cond;
555 	slurmdb_federation_rec_t  fed_rec;
556 	slurmdb_cluster_rec_t     cluster_rec;
557 
558 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
559 		info("%s: removing self from federation %s",
560 		     __func__, fed_mgr_fed_rec->name);
561 
562 	slurmdb_init_federation_cond(&fed_cond, false);
563 	slurmdb_init_federation_rec(&fed_rec, false);
564 	slurmdb_init_cluster_rec(&cluster_rec, false);
565 
566 	fed_cond.federation_list = list_create(NULL);
567 	list_append(fed_cond.federation_list, fed_mgr_fed_rec->name);
568 
569 	cluster_rec.name = xstrdup_printf("-%s", fed_mgr_cluster_rec->name);
570 	fed_rec.cluster_list = list_create(NULL);
571 	list_append(fed_rec.cluster_list, &cluster_rec);
572 
573 	ret_list = acct_storage_g_modify_federations(
574 					acct_db_conn,
575 					slurmctld_conf.slurm_user_id, &fed_cond,
576 					&fed_rec);
577 	if (!ret_list || !list_count(ret_list)) {
578 		error("Failed to remove federation from list");
579 	}
580 
581 	FREE_NULL_LIST(ret_list);
582 	FREE_NULL_LIST(fed_cond.federation_list);
583 	FREE_NULL_LIST(fed_rec.cluster_list);
584 	xfree(cluster_rec.name);
585 
586 	slurmctld_config.scheduling_disabled = false;
587 	slurmctld_config.submissions_disabled = false;
588 
589 	_leave_federation();
590 }
591 
_foreach_job_completed(void * object,void * arg)592 static int _foreach_job_completed(void *object, void *arg)
593 {
594 	job_record_t *job_ptr = (job_record_t *) object;
595 
596 	if (IS_JOB_COMPLETED(job_ptr))
597 		return SLURM_SUCCESS;
598 
599 	return SLURM_ERROR;
600 }
601 
_foreach_job_no_requeue(void * object,void * arg)602 static int _foreach_job_no_requeue(void *object, void *arg)
603 {
604 	job_record_t *job_ptr = (job_record_t *) object;
605 
606 	if (job_ptr->details)
607 		job_ptr->details->requeue = 0;
608 
609 	return SLURM_SUCCESS;
610 }
611 
_job_watch_thread(void * arg)612 static void *_job_watch_thread(void *arg)
613 {
614 	struct timespec ts = {0, 0};
615 	slurmctld_lock_t job_write_fed_write_lock = {
616 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
617 
618 #if HAVE_SYS_PRCTL_H
619 	if (prctl(PR_SET_NAME, "fed_jobw", NULL, NULL, NULL) < 0) {
620 		error("%s: cannot set my name to %s %m", __func__, "fed_jobw");
621 	}
622 #endif
623 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
624 		info("%s: started job_watch thread", __func__);
625 
626 	while (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
627 		int tot_jobs, comp_jobs;
628 
629 		slurm_mutex_lock(&job_watch_mutex);
630 		if (!slurmctld_config.shutdown_time && !stop_job_watch_thread) {
631 			ts.tv_sec  = time(NULL) + 5;
632 			slurm_cond_timedwait(&job_watch_cond,
633 					     &job_watch_mutex, &ts);
634 		}
635 		slurm_mutex_unlock(&job_watch_mutex);
636 
637 		if (slurmctld_config.shutdown_time || stop_job_watch_thread)
638 			break;
639 
640 		lock_slurmctld(job_write_fed_write_lock);
641 
642 		if (!fed_mgr_cluster_rec) {
643 			/* not part of the federation anymore */
644 			unlock_slurmctld(job_write_fed_write_lock);
645 			break;
646 		}
647 
648 		if ((tot_jobs = list_count(job_list)) !=
649 		    (comp_jobs = list_for_each(job_list, _foreach_job_completed,
650 					       NULL))) {
651 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR) {
652 				/* list_for_each negates the count if failed. */
653 				int remaining_jobs = tot_jobs + comp_jobs + 1;
654 				info("%s: at least %d remaining jobs before being drained and/or removed from the federation",
655 				     __func__, remaining_jobs);
656 			}
657 		} else {
658 			if (fed_mgr_cluster_rec->fed.state &
659 			    CLUSTER_FED_STATE_REMOVE) {
660 				/* prevent federated jobs from being requeued */
661 				list_for_each(job_list, _foreach_job_no_requeue,
662 					      NULL);
663 				_remove_self_from_federation();
664 			} else if (fed_mgr_cluster_rec->fed.state &
665 				 CLUSTER_FED_STATE_DRAIN) {
666 				_mark_self_as_drained();
667 			}
668 
669 			unlock_slurmctld(job_write_fed_write_lock);
670 
671 			break;
672 		}
673 
674 		unlock_slurmctld(job_write_fed_write_lock);
675 	}
676 
677 	job_watch_thread_running = false;
678 
679 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
680 		info("%s: exiting job watch thread", __func__);
681 
682 	return NULL;
683 }
684 
_spawn_job_watch_thread()685 static void _spawn_job_watch_thread()
686 {
687 	if (!job_watch_thread_running) {
688 		/* Detach the thread since it will exit once the cluster is
689 		 * drained or removed. */
690 		slurm_mutex_lock(&job_watch_mutex);
691 		stop_job_watch_thread = false;
692 		job_watch_thread_running = true;
693 		slurm_thread_create_detached(NULL, _job_watch_thread, NULL);
694 		slurm_mutex_unlock(&job_watch_mutex);
695 	} else {
696 		info("a job_watch_thread already exists");
697 	}
698 }
699 
_remove_job_watch_thread()700 static void _remove_job_watch_thread()
701 {
702 	if (job_watch_thread_running) {
703 		slurm_mutex_lock(&job_watch_mutex);
704 		stop_job_watch_thread = true;
705 		slurm_cond_broadcast(&job_watch_cond);
706 		slurm_mutex_unlock(&job_watch_mutex);
707 	}
708 }
709 
_clear_recv_conns(void * object,void * arg)710 static int _clear_recv_conns(void *object, void *arg)
711 {
712 	slurmdb_cluster_rec_t *cluster = (slurmdb_cluster_rec_t *)object;
713 	cluster->fed.recv = NULL;
714 
715 	return SLURM_SUCCESS;
716 }
717 
718 /*
719  * Must have FED unlocked prior to entering
720  */
_fed_mgr_ptr_init(slurmdb_federation_rec_t * db_fed,slurmdb_cluster_rec_t * cluster,uint64_t * added_clusters)721 static void _fed_mgr_ptr_init(slurmdb_federation_rec_t *db_fed,
722 			      slurmdb_cluster_rec_t *cluster,
723 			      uint64_t *added_clusters)
724 {
725 	ListIterator c_itr;
726 	slurmdb_cluster_rec_t *tmp_cluster, *db_cluster;
727 	uint32_t cluster_state;
728 	int  base_state;
729 	bool drain_flag;
730 
731 	slurmctld_lock_t fed_write_lock = {
732 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
733 
734 	xassert(cluster);
735 
736 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
737 		info("Joining federation %s", db_fed->name);
738 
739 	lock_slurmctld(fed_write_lock);
740 	if (fed_mgr_fed_rec) {
741 		/* we are already part of a federation, preserve existing
742 		 * conenctions */
743 		c_itr = list_iterator_create(db_fed->cluster_list);
744 		while ((db_cluster = list_next(c_itr))) {
745 			if (!xstrcmp(db_cluster->name,
746 				     slurmctld_conf.cluster_name)) {
747 				fed_mgr_cluster_rec = db_cluster;
748 				continue;
749 			}
750 			if (!(tmp_cluster =
751 			      fed_mgr_get_cluster_by_name(db_cluster->name))) {
752 				*added_clusters |=
753 					FED_SIBLING_BIT(db_cluster->fed.id);
754 				/* don't worry about destroying the connection
755 				 * here.  It will happen below when we free
756 				 * fed_mgr_fed_rec (automagically).
757 				 */
758 				continue;
759 			}
760 			slurm_mutex_lock(&tmp_cluster->lock);
761 			/* transfer over the connections we already have */
762 			db_cluster->fed.send = tmp_cluster->fed.send;
763 			tmp_cluster->fed.send = NULL;
764 			db_cluster->fed.recv = tmp_cluster->fed.recv;
765 			tmp_cluster->fed.recv = NULL;
766 			db_cluster->send_rpc = tmp_cluster->send_rpc;
767 			tmp_cluster->send_rpc = NULL;
768 			db_cluster->fed.sync_sent =
769 				tmp_cluster->fed.sync_sent;
770 			db_cluster->fed.sync_recvd =
771 				tmp_cluster->fed.sync_recvd;
772 			slurm_mutex_unlock(&tmp_cluster->lock);
773 
774 			list_delete_all(fed_mgr_fed_rec->cluster_list,
775 					slurmdb_find_cluster_in_list,
776 					db_cluster->name);
777 		}
778 		list_iterator_destroy(c_itr);
779 
780 		/* Remove any existing clusters that were part of the federation
781 		 * before and are not now. Don't free the recv connection now,
782 		 * it will get destroyed when the recv thread exits. */
783 		list_for_each(fed_mgr_fed_rec->cluster_list, _clear_recv_conns,
784 			      NULL);
785 		slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
786 	} else
787 		fed_mgr_cluster_rec = cluster;
788 
789 	fed_mgr_fed_rec = db_fed;
790 
791 	/* Set scheduling and submissions states */
792 	cluster_state = fed_mgr_cluster_rec->fed.state;
793 	base_state  = (cluster_state & CLUSTER_FED_STATE_BASE);
794 	drain_flag  = (cluster_state & CLUSTER_FED_STATE_DRAIN);
795 
796 	unlock_slurmctld(fed_write_lock);
797 
798 	if (drain_flag) {
799 		slurmctld_config.scheduling_disabled = false;
800 		slurmctld_config.submissions_disabled = true;
801 
802 		/* INACTIVE + DRAIN == DRAINED (already) */
803 		if (base_state == CLUSTER_FED_STATE_ACTIVE)
804 			_spawn_job_watch_thread();
805 
806 	} else if (base_state == CLUSTER_FED_STATE_ACTIVE) {
807 		slurmctld_config.scheduling_disabled = false;
808 		slurmctld_config.submissions_disabled = false;
809 	} else if (base_state == CLUSTER_FED_STATE_INACTIVE) {
810 		slurmctld_config.scheduling_disabled = true;
811 		slurmctld_config.submissions_disabled = true;
812 	}
813 	if (!drain_flag)
814 		_remove_job_watch_thread();
815 }
816 
817 /*
818  * Must have FED write lock prior to entering
819  */
_leave_federation(void)820 static void _leave_federation(void)
821 {
822 	if (!fed_mgr_fed_rec)
823 		return;
824 
825 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
826 		info("Leaving federation %s", fed_mgr_fed_rec->name);
827 
828 	_close_sibling_conns();
829 	_remove_job_watch_thread();
830 	slurmdb_destroy_federation_rec(fed_mgr_fed_rec);
831 	fed_mgr_fed_rec = NULL;
832 	fed_mgr_cluster_rec = NULL;
833 }
834 
_persist_callback_fini(void * arg)835 static void _persist_callback_fini(void *arg)
836 {
837 	slurm_persist_conn_t *persist_conn = arg;
838 	slurmdb_cluster_rec_t *cluster;
839 	slurmctld_lock_t fed_write_lock = {
840 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
841 
842 	/* If we are shutting down just return or you will get deadlock since
843 	 * all these locks are already locked.
844 	 */
845 	if (!persist_conn || *persist_conn->shutdown)
846 		return;
847 	lock_slurmctld(fed_write_lock);
848 
849 	/* shuting down */
850 	if (!fed_mgr_fed_rec) {
851 		unlock_slurmctld(fed_write_lock);
852 		return;
853 	}
854 
855 	if (!(cluster =
856 	      fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
857 		info("Couldn't find cluster %s?",
858 		     persist_conn->cluster_name);
859 		unlock_slurmctld(fed_write_lock);
860 		return;
861 	}
862 
863 	slurm_mutex_lock(&cluster->lock);
864 
865 	/* This will get handled at the end of the thread, don't free it here */
866 	cluster->fed.recv = NULL;
867 //	persist_conn = cluster->fed.recv;
868 //	slurm_persist_conn_close(persist_conn);
869 
870 	persist_conn = cluster->fed.send;
871 	if (persist_conn) {
872 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
873 			info("Closing send to sibling cluster %s",
874 			     cluster->name);
875 		slurm_persist_conn_destroy(persist_conn);
876 		cluster->fed.send = NULL;
877 		xfree(cluster->control_host);
878 		cluster->control_port = 0;
879 	}
880 	cluster->fed.sync_recvd = false;
881 	cluster->fed.sync_sent  = false;
882 
883 	slurm_mutex_unlock(&cluster->lock);
884 	unlock_slurmctld(fed_write_lock);
885 }
886 
_join_federation(slurmdb_federation_rec_t * fed,slurmdb_cluster_rec_t * cluster,uint64_t * added_clusters)887 static void _join_federation(slurmdb_federation_rec_t *fed,
888 			     slurmdb_cluster_rec_t *cluster,
889 			     uint64_t *added_clusters)
890 {
891 	slurmctld_lock_t fed_read_lock = {
892 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
893 
894 	_fed_mgr_ptr_init(fed, cluster, added_clusters);
895 
896 	/* We must open the connections after we get out of the
897 	 * write_lock or we will end up in deadlock.
898 	 */
899 	lock_slurmctld(fed_read_lock);
900 	_open_persist_sends();
901 	unlock_slurmctld(fed_read_lock);
902 }
903 
_persist_update_job(slurmdb_cluster_rec_t * conn,uint32_t job_id,job_desc_msg_t * data,uid_t uid)904 static int _persist_update_job(slurmdb_cluster_rec_t *conn, uint32_t job_id,
905 			       job_desc_msg_t *data, uid_t uid)
906 {
907 	int rc;
908 	slurm_msg_t req_msg, tmp_msg;
909 	sib_msg_t   sib_msg;
910 	Buf buffer;
911 
912 	slurm_msg_t_init(&tmp_msg);
913 	tmp_msg.msg_type         = REQUEST_UPDATE_JOB;
914 	tmp_msg.data             = data;
915 	tmp_msg.protocol_version = conn->rpc_version;
916 
917 	buffer = init_buf(BUF_SIZE);
918 	pack_msg(&tmp_msg, buffer);
919 
920 	memset(&sib_msg, 0, sizeof(sib_msg));
921 	sib_msg.sib_msg_type = FED_JOB_UPDATE;
922 	sib_msg.data_buffer  = buffer;
923 	sib_msg.data_type    = tmp_msg.msg_type;
924 	sib_msg.data_version = tmp_msg.protocol_version;
925 	sib_msg.req_uid      = uid;
926 	sib_msg.job_id       = job_id;
927 
928 	slurm_msg_t_init(&req_msg);
929 	req_msg.msg_type         = REQUEST_SIB_MSG;
930 	req_msg.protocol_version = tmp_msg.protocol_version;
931 	req_msg.data             = &sib_msg;
932 
933 	rc = _queue_rpc(conn, &req_msg, 0, false);
934 
935 	free_buf(buffer);
936 
937 	return rc;
938 }
939 
_persist_update_job_resp(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t return_code)940 static int _persist_update_job_resp(slurmdb_cluster_rec_t *conn,
941 				    uint32_t job_id, uint32_t return_code)
942 {
943 	int rc;
944 	slurm_msg_t req_msg;
945 	sib_msg_t   sib_msg;
946 
947 	slurm_msg_t_init(&req_msg);
948 
949 	memset(&sib_msg, 0, sizeof(sib_msg));
950 	sib_msg.sib_msg_type = FED_JOB_UPDATE_RESPONSE;
951 	sib_msg.job_id       = job_id;
952 	sib_msg.return_code  = return_code;
953 
954 	slurm_msg_t_init(&req_msg);
955 	req_msg.msg_type         = REQUEST_SIB_MSG;
956 	req_msg.protocol_version = conn->rpc_version;
957 	req_msg.data             = &sib_msg;
958 
959 	rc = _queue_rpc(conn, &req_msg, job_id, false);
960 
961 	return rc;
962 }
963 
964 /*
965  * Remove a sibling job that won't be scheduled
966  *
967  * IN conn        - sibling connection
968  * IN job_id      - the job's id
969  * IN job_state   - state of job.
970  * IN return_code - return code of job.
971  * IN start_time  - time the fed job started
972  * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
973  */
_persist_fed_job_revoke(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t job_state,uint32_t return_code,time_t start_time)974 static int _persist_fed_job_revoke(slurmdb_cluster_rec_t *conn, uint32_t job_id,
975 				   uint32_t job_state, uint32_t return_code,
976 				   time_t start_time)
977 {
978 	int rc;
979 	slurm_msg_t req_msg;
980 	sib_msg_t   sib_msg;
981 
982 	if (!conn->fed.send ||
983 	    (((slurm_persist_conn_t *)conn->fed.send)->fd == -1))
984 		return SLURM_SUCCESS;
985 
986 	slurm_msg_t_init(&req_msg);
987 
988 	memset(&sib_msg, 0, sizeof(sib_msg));
989 	sib_msg.sib_msg_type = FED_JOB_COMPLETE;
990 	sib_msg.job_id       = job_id;
991 	sib_msg.job_state    = job_state;
992 	sib_msg.start_time   = start_time;
993 	sib_msg.return_code  = return_code;
994 
995 	slurm_msg_t_init(&req_msg);
996 	req_msg.msg_type         = REQUEST_SIB_MSG;
997 	req_msg.protocol_version = conn->rpc_version;
998 	req_msg.data             = &sib_msg;
999 
1000 	rc = _queue_rpc(conn, &req_msg, job_id, false);
1001 
1002 	return rc;
1003 }
1004 
_persist_fed_job_response(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t return_code)1005 static int _persist_fed_job_response(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1006 				     uint32_t return_code)
1007 {
1008 	int rc;
1009 	slurm_msg_t req_msg;
1010 	sib_msg_t   sib_msg;
1011 
1012 	slurm_msg_t_init(&req_msg);
1013 
1014 	memset(&sib_msg, 0, sizeof(sib_msg));
1015 	sib_msg.sib_msg_type = FED_JOB_SUBMIT_RESP;
1016 	sib_msg.job_id       = job_id;
1017 	sib_msg.return_code  = return_code;
1018 
1019 	slurm_msg_t_init(&req_msg);
1020 	req_msg.msg_type         = REQUEST_SIB_MSG;
1021 	req_msg.protocol_version = conn->rpc_version;
1022 	req_msg.data             = &sib_msg;
1023 
1024 	rc = _queue_rpc(conn, &req_msg, job_id, false);
1025 
1026 	return rc;
1027 }
1028 
1029 /*
1030  * Grab the fed lock on the sibling job.
1031  *
1032  * This message doesn't need to be queued because the other side just locks the
1033  * fed_job_list, checks and gets out -- doesn't need the internal locks.
1034  *
1035  * IN conn       - sibling connection
1036  * IN job_id     - the job's id
1037  * IN cluster_id - cluster id of the cluster locking
1038  * IN do_lock    - true == lock, false == unlock
1039  * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1040  */
_persist_fed_job_lock_bool(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id,bool do_lock)1041 static int _persist_fed_job_lock_bool(slurmdb_cluster_rec_t *conn,
1042 				      uint32_t job_id, uint32_t cluster_id,
1043 				      bool do_lock)
1044 {
1045 	int rc;
1046 	slurm_msg_t req_msg, resp_msg;
1047 
1048 	slurm_msg_t_init(&req_msg);
1049 
1050 	sib_msg_t sib_msg;
1051 	memset(&sib_msg, 0, sizeof(sib_msg_t));
1052 	sib_msg.job_id     = job_id;
1053 	sib_msg.cluster_id = cluster_id;
1054 
1055 	if (do_lock)
1056 		req_msg.msg_type = REQUEST_SIB_JOB_LOCK;
1057 	else
1058 		req_msg.msg_type = REQUEST_SIB_JOB_UNLOCK;
1059 
1060 	req_msg.protocol_version = conn->rpc_version;
1061 	req_msg.data             = &sib_msg;
1062 
1063 	if (_send_recv_msg(conn, &req_msg, &resp_msg, false)) {
1064 		rc = SLURM_ERROR;
1065 		goto end_it;
1066 	}
1067 
1068 	switch (resp_msg.msg_type) {
1069 	case RESPONSE_SLURM_RC:
1070 		if ((rc = slurm_get_return_code(resp_msg.msg_type,
1071 						resp_msg.data))) {
1072 			slurm_seterrno(rc);
1073 			rc = SLURM_ERROR;
1074 		}
1075 		break;
1076 	default:
1077 		slurm_seterrno(SLURM_UNEXPECTED_MSG_ERROR);
1078 		rc = SLURM_ERROR;
1079 		break;
1080 	}
1081 
1082 end_it:
1083 	slurm_free_msg_members(&resp_msg);
1084 
1085 	return rc;
1086 }
1087 
_persist_fed_job_lock(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id)1088 static int _persist_fed_job_lock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1089 				 uint32_t cluster_id)
1090 {
1091 	return _persist_fed_job_lock_bool(conn, job_id, cluster_id, true);
1092 }
1093 
_persist_fed_job_unlock(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id)1094 static int _persist_fed_job_unlock(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1095 				   uint32_t cluster_id)
1096 {
1097 	return _persist_fed_job_lock_bool(conn, job_id, cluster_id, false);
1098 }
1099 
1100 /*
1101  * Tell the origin cluster that the job was started
1102  *
1103  * This message is queued up as an rpc and a fed_job_update so that it can
1104  * cancel the siblings without holding up the internal locks.
1105  *
1106  * IN conn       - sibling connection
1107  * IN job_id     - the job's id
1108  * IN cluster_id - cluster id of the cluster that started the job
1109  * IN start_time - time the fed job started
1110  * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1111  */
_persist_fed_job_start(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t cluster_id,time_t start_time)1112 static int _persist_fed_job_start(slurmdb_cluster_rec_t *conn,
1113 				  uint32_t job_id, uint32_t cluster_id,
1114 				  time_t start_time)
1115 {
1116 	int rc;
1117 	slurm_msg_t req_msg;
1118 
1119 	slurm_msg_t_init(&req_msg);
1120 
1121 	sib_msg_t sib_msg;
1122 	memset(&sib_msg, 0, sizeof(sib_msg_t));
1123 	sib_msg.sib_msg_type = FED_JOB_START;
1124 	sib_msg.job_id       = job_id;
1125 	sib_msg.cluster_id   = cluster_id;
1126 	sib_msg.start_time   = start_time;
1127 
1128 	req_msg.msg_type         = REQUEST_SIB_MSG;
1129 	req_msg.protocol_version = conn->rpc_version;
1130 	req_msg.data             = &sib_msg;
1131 
1132 	rc = _queue_rpc(conn, &req_msg, job_id, false);
1133 
1134 	return rc;
1135 }
1136 
1137 /*
1138  * Send the specified signal to all steps of an existing job
1139  * IN job_id - the job's id
1140  * IN signal - signal number
1141  * IN flags  - see KILL_JOB_* flags above
1142  * IN uid    - uid of user making the request.
1143  * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
1144  */
_persist_fed_job_cancel(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint16_t signal,uint16_t flags,uid_t uid)1145 static int _persist_fed_job_cancel(slurmdb_cluster_rec_t *conn, uint32_t job_id,
1146 				   uint16_t signal, uint16_t flags,
1147 				   uid_t uid)
1148 {
1149 	int rc = SLURM_SUCCESS;
1150 	slurm_msg_t req_msg, tmp_msg;
1151 	sib_msg_t   sib_msg;
1152 	job_step_kill_msg_t kill_req;
1153 	Buf buffer;
1154 
1155 	/* Build and pack a kill_req msg to put in a sib_msg */
1156 	memset(&kill_req, 0, sizeof(job_step_kill_msg_t));
1157 	kill_req.job_id      = job_id;
1158 	kill_req.sjob_id     = NULL;
1159 	kill_req.job_step_id = SLURM_BATCH_SCRIPT;
1160 	kill_req.signal      = signal;
1161 	kill_req.flags       = flags;
1162 
1163 	slurm_msg_t_init(&tmp_msg);
1164 	tmp_msg.msg_type         = REQUEST_CANCEL_JOB_STEP;
1165 	tmp_msg.data             = &kill_req;
1166 	tmp_msg.protocol_version = conn->rpc_version;
1167 
1168 	buffer = init_buf(BUF_SIZE);
1169 	pack_msg(&tmp_msg, buffer);
1170 
1171 	memset(&sib_msg, 0, sizeof(sib_msg));
1172 	sib_msg.sib_msg_type = FED_JOB_CANCEL;
1173 	sib_msg.data_buffer  = buffer;
1174 	sib_msg.data_type    = tmp_msg.msg_type;
1175 	sib_msg.data_version = tmp_msg.protocol_version;
1176 	sib_msg.req_uid      = uid;
1177 
1178 	slurm_msg_t_init(&req_msg);
1179 	req_msg.msg_type         = REQUEST_SIB_MSG;
1180 	req_msg.protocol_version = tmp_msg.protocol_version;
1181 	req_msg.data             = &sib_msg;
1182 
1183 	rc = _queue_rpc(conn, &req_msg, job_id, false);
1184 
1185 	free_buf(buffer);
1186 
1187 	return rc;
1188 }
1189 
1190 /*
1191  * Tell the origin cluster to requeue the job
1192  *
1193  * IN conn       - sibling connection
1194  * IN job_id     - the job's id
1195  * IN start_time - time the fed job started
1196  * RET SLURM_SUCCESS on success, otherwise return SLURM_ERROR with errno set
1197  */
_persist_fed_job_requeue(slurmdb_cluster_rec_t * conn,uint32_t job_id,uint32_t flags)1198 static int _persist_fed_job_requeue(slurmdb_cluster_rec_t *conn,
1199 				    uint32_t job_id, uint32_t flags)
1200 {
1201 	int rc;
1202 	requeue_msg_t requeue_req;
1203 	slurm_msg_t   req_msg, tmp_msg;
1204 	sib_msg_t     sib_msg;
1205 	Buf buffer;
1206 
1207 	xassert(conn);
1208 
1209 	requeue_req.job_id     = job_id;
1210 	requeue_req.job_id_str = NULL;
1211 	requeue_req.flags      = flags;
1212 
1213 	slurm_msg_t_init(&tmp_msg);
1214 	tmp_msg.msg_type         = REQUEST_JOB_REQUEUE;
1215 	tmp_msg.data             = &requeue_req;
1216 	tmp_msg.protocol_version = conn->rpc_version;
1217 
1218 	buffer = init_buf(BUF_SIZE);
1219 	pack_msg(&tmp_msg, buffer);
1220 
1221 	memset(&sib_msg, 0, sizeof(sib_msg));
1222 	sib_msg.sib_msg_type = FED_JOB_REQUEUE;
1223 	sib_msg.job_id       = job_id;
1224 	sib_msg.data_buffer  = buffer;
1225 	sib_msg.data_type    = tmp_msg.msg_type;
1226 	sib_msg.data_version = tmp_msg.protocol_version;
1227 
1228 	slurm_msg_t_init(&req_msg);
1229 	req_msg.msg_type         = REQUEST_SIB_MSG;
1230 	req_msg.protocol_version = tmp_msg.protocol_version;
1231 	req_msg.data             = &sib_msg;
1232 
1233 	rc = _queue_rpc(conn, &req_msg, job_id, false);
1234 
1235 	free_buf(buffer);
1236 
1237 	return rc;
1238 }
1239 
_find_sibling_by_id(void * x,void * key)1240 static int _find_sibling_by_id(void *x, void *key)
1241 {
1242 	slurmdb_cluster_rec_t *object = (slurmdb_cluster_rec_t *)x;
1243 	uint32_t id = *(uint32_t *)key;
1244 
1245 	if (object->fed.id == id)
1246 		return 1;
1247 
1248 	return 0;
1249 }
1250 
add_fed_job_info(job_record_t * job_ptr)1251 extern void add_fed_job_info(job_record_t *job_ptr)
1252 {
1253 	fed_job_info_t *job_info;
1254 
1255 	job_info = xmalloc(sizeof(fed_job_info_t));
1256 	job_info->job_id          = job_ptr->job_id;
1257 	job_info->siblings_active = job_ptr->fed_details->siblings_active;
1258 	job_info->siblings_viable = job_ptr->fed_details->siblings_viable;
1259 
1260 	slurm_mutex_lock(&fed_job_list_mutex);
1261 	if (fed_job_list)
1262 		list_append(fed_job_list, job_info);
1263 	else
1264 		xfree(job_info);
1265 	slurm_mutex_unlock(&fed_job_list_mutex);
1266 }
1267 
_delete_fed_job_info_by_id(void * object,void * arg)1268 static int _delete_fed_job_info_by_id(void *object, void *arg)
1269 {
1270 	fed_job_info_t *job_info = (fed_job_info_t *)object;
1271 	uint32_t job_id          = *(uint32_t *)arg;
1272 
1273 	if (job_info->job_id == job_id)
1274 		return true;
1275 
1276 	return false;
1277 }
1278 
fed_mgr_remove_fed_job_info(uint32_t job_id)1279 extern void fed_mgr_remove_fed_job_info(uint32_t job_id)
1280 {
1281 	slurm_mutex_lock(&fed_job_list_mutex);
1282 
1283 	if (fed_job_list)
1284 		list_delete_all(fed_job_list, _delete_fed_job_info_by_id,
1285 				&job_id);
1286 
1287 	slurm_mutex_unlock(&fed_job_list_mutex);
1288 }
1289 
_list_find_fed_job_info_by_jobid(void * x,void * key)1290 static int _list_find_fed_job_info_by_jobid(void *x, void *key)
1291 {
1292 	fed_job_info_t *job_info = (fed_job_info_t *)x;
1293 	uint32_t        job_id   = *(uint32_t *)key;
1294 
1295 	if (job_info->job_id == job_id)
1296 		return 1;
1297 
1298 	return 0;
1299 }
1300 
1301 /* Must have fed_job_mutex before entering */
_find_fed_job_info(uint32_t job_id)1302 static fed_job_info_t *_find_fed_job_info(uint32_t job_id)
1303 {
1304 	if (!fed_job_list)
1305 		return NULL;
1306 	return list_find_first(fed_job_list, _list_find_fed_job_info_by_jobid,
1307 			       &job_id);
1308 }
1309 
_destroy_fed_job_update_info(void * object)1310 static void _destroy_fed_job_update_info(void *object)
1311 {
1312 	fed_job_update_info_t *job_update_info =
1313 		(fed_job_update_info_t *)object;
1314 
1315 	if (job_update_info) {
1316 		xfree(job_update_info->siblings_str);
1317 		xfree(job_update_info->submit_cluster);
1318 		slurm_free_job_info_msg(job_update_info->job_info_msg);
1319 		slurm_free_job_step_kill_msg(job_update_info->kill_msg);
1320 		slurm_free_job_desc_msg(job_update_info->submit_desc);
1321 		xfree(job_update_info);
1322 	}
1323 }
1324 
_destroy_dep_msg(void * object)1325 static void _destroy_dep_msg(void *object)
1326 {
1327 	slurm_free_dep_msg((dep_msg_t *)object);
1328 }
1329 
_destroy_dep_update_msg(void * object)1330 static void _destroy_dep_update_msg(void *object)
1331 {
1332 	slurm_free_dep_update_origin_msg((dep_update_origin_msg_t *)object);
1333 }
1334 
_destroy_dep_job(void * object)1335 static void _destroy_dep_job(void *object)
1336 {
1337 	job_record_t *job_ptr = (job_record_t *)object;
1338 
1339 	if (job_ptr) {
1340 		xassert(job_ptr->magic == JOB_MAGIC);
1341 		xfree(job_ptr->fed_details);
1342 		xfree(job_ptr->name);
1343 		if (job_ptr->details) {
1344 			xassert(job_ptr->details->magic == DETAILS_MAGIC);
1345 			xfree(job_ptr->details->dependency);
1346 			FREE_NULL_LIST(job_ptr->details->depend_list);
1347 			xfree(job_ptr->details);
1348 		}
1349 		free_null_array_recs(job_ptr);
1350 		job_ptr->magic = 0;
1351 		job_ptr->job_id = 0;
1352 		job_ptr->user_id = 0;
1353 		xfree(job_ptr);
1354 	}
1355 }
1356 
fed_mgr_get_cluster_by_id(uint32_t id)1357 extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_id(uint32_t id)
1358 {
1359 	uint32_t key = id;
1360 	return list_find_first(fed_mgr_fed_rec->cluster_list,
1361 			       _find_sibling_by_id, &key);
1362 }
1363 
fed_mgr_get_cluster_by_name(char * sib_name)1364 extern slurmdb_cluster_rec_t *fed_mgr_get_cluster_by_name(char *sib_name)
1365 {
1366 	if (!fed_mgr_fed_rec)
1367 		return NULL;
1368 
1369 	return list_find_first(fed_mgr_fed_rec->cluster_list,
1370 			       slurmdb_find_cluster_in_list, sib_name);
1371 }
1372 
1373 /*
1374  * Revoke all sibling jobs except from cluster_id -- which the request came from
1375  *
1376  * IN job_ptr     - job to revoke
1377  * IN cluster_id  - cluster id of cluster that initiated call. Don're revoke
1378  * 	the job on this cluster -- it's the one that started the fed job.
1379  * IN revoke_sibs - bitmap of siblings to revoke.
1380  * IN start_time  - time the fed job started
1381  */
_revoke_sibling_jobs(uint32_t job_id,uint32_t cluster_id,uint64_t revoke_sibs,time_t start_time)1382 static void _revoke_sibling_jobs(uint32_t job_id, uint32_t cluster_id,
1383 				 uint64_t revoke_sibs, time_t start_time)
1384 {
1385 	int id = 1;
1386 
1387 	if (!fed_mgr_fed_rec) /* Not part of federation anymore */
1388 		return;
1389 
1390 	while (revoke_sibs) {
1391 		if ((revoke_sibs & 1) &&
1392 		    (id != fed_mgr_cluster_rec->fed.id) &&
1393 		    (id != cluster_id)) {
1394 			slurmdb_cluster_rec_t *cluster =
1395 				fed_mgr_get_cluster_by_id(id);
1396 			if (!cluster) {
1397 				error("couldn't find cluster rec by id %d", id);
1398 				goto next_job;
1399 			}
1400 
1401 			_persist_fed_job_revoke(cluster, job_id, JOB_CANCELLED,
1402 						0, start_time);
1403 		}
1404 
1405 next_job:
1406 		revoke_sibs >>= 1;
1407 		id++;
1408 	}
1409 }
1410 
_remove_sibling_bit(job_record_t * job_ptr,slurmdb_cluster_rec_t * sibling)1411 static int _remove_sibling_bit(job_record_t *job_ptr,
1412 			       slurmdb_cluster_rec_t *sibling)
1413 {
1414 	uint32_t origin_id;
1415 
1416 	if (!_is_fed_job(job_ptr, &origin_id))
1417 		return ESLURM_JOB_NOT_FEDERATED;
1418 
1419 	job_ptr->fed_details->siblings_active &=
1420 		~(FED_SIBLING_BIT(sibling->fed.id));
1421 	job_ptr->fed_details->siblings_viable &=
1422 		~(FED_SIBLING_BIT(sibling->fed.id));
1423 
1424 	if (!(job_ptr->fed_details->siblings_viable &
1425 	      FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
1426 		job_ptr->job_state |= JOB_REVOKED;
1427 	else if (!job_ptr->fed_details->cluster_lock)
1428 		job_ptr->job_state &= ~JOB_REVOKED;
1429 
1430 	update_job_fed_details(job_ptr);
1431 
1432 	return SLURM_SUCCESS;
1433 }
1434 
1435 /*
1436  * Remove all pending federated jobs from the origin cluster.
1437  */
_cleanup_removed_origin_jobs(void)1438 static void _cleanup_removed_origin_jobs(void)
1439 {
1440 	ListIterator job_itr;
1441 	job_record_t *job_ptr;
1442 	time_t now = time(NULL);
1443 	uint32_t origin_id, sibling_id;
1444 	uint64_t sibling_bit;
1445 
1446 	if (!fed_mgr_cluster_rec)
1447 		return;
1448 
1449 	sibling_id  = fed_mgr_cluster_rec->fed.id;
1450 	sibling_bit = FED_SIBLING_BIT(sibling_id);
1451 
1452 	job_itr = list_iterator_create(job_list);
1453 	while ((job_ptr = list_next(job_itr))) {
1454 		bool running_remotely = false;
1455 		uint64_t siblings_viable;
1456 
1457 		if (IS_JOB_COMPLETED(job_ptr))
1458 			continue;
1459 
1460 		if (!_is_fed_job(job_ptr, &origin_id))
1461 			continue;
1462 
1463 		siblings_viable = job_ptr->fed_details->siblings_viable;
1464 
1465 		if (sibling_id == origin_id &&
1466 		    job_ptr->fed_details->cluster_lock)
1467 			running_remotely = true;
1468 
1469 		/* free fed_job_details so it can't call home. */
1470 		free_job_fed_details(&job_ptr->fed_details);
1471 
1472 		/* allow running/completing jobs to finish. */
1473 		if (IS_JOB_COMPLETED(job_ptr) ||
1474 		    IS_JOB_COMPLETING(job_ptr) ||
1475 		    IS_JOB_RUNNING(job_ptr))
1476 			continue;
1477 
1478 		/* If this job is the only viable sibling then just let
1479 		 * it run as a non-federated job. The origin should remove the
1480 		 * tracking job. */
1481 		if (!(siblings_viable & ~sibling_bit))
1482 			continue;
1483 
1484 		/*
1485 		 * If a job is pending and not revoked on the origin cluster
1486 		 * leave it as a non-federated job.
1487 		 */
1488 		if ((origin_id == sibling_id) &&
1489 		    IS_JOB_PENDING(job_ptr) && !IS_JOB_REVOKED(job_ptr))
1490 			continue;
1491 
1492 		/* Free the resp_host so that the srun doesn't get
1493 		 * signaled about the job going away. The job could
1494 		 * still run on another sibling. */
1495 		if (running_remotely ||
1496 		    (origin_id != sibling_id))
1497 			xfree(job_ptr->resp_host);
1498 
1499 		job_ptr->job_state  = JOB_CANCELLED|JOB_REVOKED;
1500 		job_ptr->start_time = now;
1501 		job_ptr->end_time   = now;
1502 		job_completion_logger(job_ptr, false);
1503 	}
1504 	list_iterator_destroy(job_itr);
1505 
1506 	/* Don't test these jobs for remote dependencies anymore */
1507 	if (remote_dep_job_list) {
1508 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
1509 			info("%s: Remove all jobs in remote_dep_job_list",
1510 			     __func__);
1511 		slurm_mutex_lock(&dep_job_list_mutex);
1512 		list_flush(remote_dep_job_list);
1513 		slurm_mutex_unlock(&dep_job_list_mutex);
1514 	}
1515 }
1516 
1517 /*
1518  * Remove all pending jobs that originated from the given cluster.
1519  */
_cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t * cluster)1520 static void _cleanup_removed_cluster_jobs(slurmdb_cluster_rec_t *cluster)
1521 {
1522 	ListIterator job_itr;
1523 	job_record_t *job_ptr;
1524 	time_t now = time(NULL);
1525 	uint32_t origin_id, sibling_id;
1526 	uint64_t origin_bit, sibling_bit;
1527 
1528 	if (!fed_mgr_cluster_rec)
1529 		return;
1530 
1531 	sibling_id  = fed_mgr_cluster_rec->fed.id;
1532 	sibling_bit = FED_SIBLING_BIT(sibling_id);
1533 
1534 	job_itr = list_iterator_create(job_list);
1535 	while ((job_ptr = list_next(job_itr))) {
1536 		uint64_t siblings_viable;
1537 
1538 		if (IS_JOB_COMPLETED(job_ptr))
1539 			continue;
1540 
1541 		if (!_is_fed_job(job_ptr, &origin_id))
1542 			continue;
1543 
1544 		origin_bit = FED_SIBLING_BIT(origin_id);
1545 		siblings_viable = job_ptr->fed_details->siblings_viable;
1546 
1547 		/* Remove cluster from viable,active siblings */
1548 		_remove_sibling_bit(job_ptr, cluster);
1549 
1550 		/* Remove jobs that
1551 		 * 1. originated from the removed cluster
1552 		 * 2. origin jobs that are tracking the running job
1553 		 * 2. origin jobs that are tracking the pending job */
1554 		if (origin_id == cluster->fed.id ||
1555 
1556 		    (job_ptr->fed_details &&
1557 		     origin_id == sibling_id &&
1558 		     job_ptr->fed_details->cluster_lock == cluster->fed.id) ||
1559 
1560 		    (siblings_viable & FED_SIBLING_BIT(cluster->fed.id) &&
1561 		     !(siblings_viable & ~FED_SIBLING_BIT(cluster->fed.id)))) {
1562 
1563 			/*
1564 			 * If this job originated from the origin (which is
1565 			 * being removed) and the origin is not a viable sibling
1566 			 * and there are more than one sibling then let the job
1567 			 * remain as a federated job to be scheduled amongst
1568 			 * it's siblings.
1569 			 */
1570 			if (IS_JOB_PENDING(job_ptr) &&
1571 			    (origin_id == cluster->fed.id) &&
1572 			    !(siblings_viable & origin_bit) &&
1573 			    (siblings_viable & ~sibling_bit))
1574 				continue;
1575 
1576 			/* free fed_job_details so it can't call home. */
1577 			free_job_fed_details(&job_ptr->fed_details);
1578 
1579 			/*
1580 			 * If this job originated from the origin (which is
1581 			 * being removed) and this sibling is the only viable
1582 			 * sibling then let it run/pend as a non-federated job.
1583 			 */
1584 			if ((origin_id == cluster->fed.id) &&
1585 			    !(siblings_viable & ~sibling_bit))
1586 				continue;
1587 
1588 			if (!(IS_JOB_COMPLETED(job_ptr) ||
1589 			      IS_JOB_COMPLETING(job_ptr) ||
1590 			      IS_JOB_RUNNING(job_ptr))) {
1591 
1592 				/* Free the resp_host so that the srun doesn't
1593 				 * get signaled about the job going away. The
1594 				 * job could still run on another sibling. */
1595 				xfree(job_ptr->resp_host);
1596 
1597 				job_ptr->job_state  = JOB_CANCELLED|JOB_REVOKED;
1598 				job_ptr->start_time = now;
1599 				job_ptr->end_time   = now;
1600 				job_ptr->state_reason = WAIT_NO_REASON;
1601 				xfree(job_ptr->state_desc);
1602 				job_completion_logger(job_ptr, false);
1603 			}
1604 		}
1605 	}
1606 	list_iterator_destroy(job_itr);
1607 }
1608 
_handle_removed_clusters(slurmdb_federation_rec_t * db_fed,uint64_t * removed_clusters)1609 static void _handle_removed_clusters(slurmdb_federation_rec_t *db_fed,
1610 				     uint64_t *removed_clusters)
1611 {
1612 	ListIterator itr;
1613 	slurmdb_cluster_rec_t *tmp_cluster = NULL;
1614 
1615 	itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
1616 	while ((tmp_cluster = list_next(itr))) {
1617 		if (tmp_cluster->name &&
1618 		    !(list_find_first(db_fed->cluster_list,
1619 				      slurmdb_find_cluster_in_list,
1620 				      tmp_cluster->name))) {
1621 			info("cluster %s was removed from the federation",
1622 			     tmp_cluster->name);
1623 			*removed_clusters |=
1624 				FED_SIBLING_BIT(tmp_cluster->fed.id);
1625 			_cleanup_removed_cluster_jobs(tmp_cluster);
1626 		}
1627 	}
1628 	list_iterator_destroy(itr);
1629 }
1630 
1631 /* Parse a RESPONSE_CTLD_MULT_MSG message and return a bit set for every
1632  * successful operation */
_parse_resp_ctld_mult(slurm_msg_t * resp_msg)1633 bitstr_t *_parse_resp_ctld_mult(slurm_msg_t *resp_msg)
1634 {
1635 	ctld_list_msg_t *ctld_resp_msg;
1636 	ListIterator iter = NULL;
1637 	bitstr_t *success_bits;
1638 	slurm_msg_t sub_msg;
1639 	return_code_msg_t *rc_msg;
1640 	Buf single_resp_buf = NULL;
1641 	int resp_cnt, resp_inx = -1;
1642 
1643 	xassert(resp_msg->msg_type == RESPONSE_CTLD_MULT_MSG);
1644 
1645 	ctld_resp_msg = (ctld_list_msg_t *) resp_msg->data;
1646 	if (!ctld_resp_msg->my_list) {
1647 		error("%s: RESPONSE_CTLD_MULT_MSG has no list component",
1648 		      __func__);
1649 		return NULL;
1650 	}
1651 
1652 	resp_cnt = list_count(ctld_resp_msg->my_list);
1653 	success_bits = bit_alloc(resp_cnt);
1654 	iter = list_iterator_create(ctld_resp_msg->my_list);
1655 	while ((single_resp_buf = list_next(iter))) {
1656 		resp_inx++;
1657 		slurm_msg_t_init(&sub_msg);
1658 		if (unpack16(&sub_msg.msg_type, single_resp_buf) ||
1659 		    unpack_msg(&sub_msg, single_resp_buf)) {
1660 			error("%s: Sub-message unpack error for Message Type:%s",
1661 			      __func__, rpc_num2string(sub_msg.msg_type));
1662 			continue;
1663 		}
1664 
1665 		if (sub_msg.msg_type != RESPONSE_SLURM_RC) {
1666 			error("%s: Unexpected Message Type:%s",
1667 			      __func__, rpc_num2string(sub_msg.msg_type));
1668 		} else {
1669 			rc_msg = (return_code_msg_t *) sub_msg.data;
1670 			if (rc_msg->return_code == SLURM_SUCCESS)
1671 				bit_set(success_bits, resp_inx);
1672 		}
1673 		(void) slurm_free_msg_data(sub_msg.msg_type, sub_msg.data);
1674 
1675 	}
1676 
1677 	return success_bits;
1678 }
1679 
1680 static int _fed_mgr_job_allocate_sib(char *sib_name, job_desc_msg_t *job_desc,
1681 				     bool interactive_job)
1682 {
1683 	int error_code = SLURM_SUCCESS;
1684 	job_record_t *job_ptr = NULL;
1685 	char *err_msg = NULL;
1686 	bool reject_job = false;
1687 	slurmdb_cluster_rec_t *sibling;
1688 	uid_t uid = 0;
1689 	slurm_msg_t response_msg;
1690 	slurm_msg_t_init(&response_msg);
1691 
1692 	xassert(sib_name);
1693 	xassert(job_desc);
1694 
1695 	if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
1696 		error_code = ESLURM_INVALID_CLUSTER_NAME;
1697 		error("Invalid sibling name");
1698 	} else if ((job_desc->alloc_node == NULL) ||
1699 	    (job_desc->alloc_node[0] == '\0')) {
1700 		error_code = ESLURM_INVALID_NODE_NAME;
1701 		error("REQUEST_SUBMIT_BATCH_JOB lacks alloc_node");
1702 	}
1703 
1704 	if (error_code == SLURM_SUCCESS)
1705 		error_code = validate_job_create_req(job_desc, uid, &err_msg);
1706 
1707 	if (error_code) {
1708 		reject_job = true;
1709 		goto send_msg;
1710 	}
1711 
1712 	/* Create new job allocation */
1713 	job_desc->het_job_offset = NO_VAL;
1714 	error_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
1715 				  interactive_job, uid, &job_ptr, &err_msg,
1716 				  sibling->rpc_version);
1717 	if (!job_ptr ||
1718 	    (error_code && job_ptr->job_state == JOB_FAILED))
1719 		reject_job = true;
1720 
1721 	if (job_desc->immediate &&
1722 	    (error_code != SLURM_SUCCESS))
1723 		error_code = ESLURM_CAN_NOT_START_IMMEDIATELY;
1724 
1725 send_msg:
1726 	/* Send response back about origin jobid if an error occurred. */
1727 	if (reject_job)
1728 		_persist_fed_job_response(sibling, job_desc->job_id, error_code);
1729 	else {
1730 		if (!(job_ptr->fed_details->siblings_viable &
1731 		      FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
1732 			job_ptr->job_state |= JOB_REVOKED;
1733 
1734 		add_fed_job_info(job_ptr);
1735 		schedule_job_save();	/* Has own locks */
1736 		schedule_node_save();	/* Has own locks */
1737 		queue_job_scheduler();
1738 	}
1739 
1740 	xfree(err_msg);
1741 
1742 	return SLURM_SUCCESS;
1743 }
1744 
1745 static void _do_fed_job_complete(job_record_t *job_ptr, uint32_t job_state,
1746 				 uint32_t exit_code, time_t start_time)
1747 {
1748 	if (job_ptr->job_state & JOB_REQUEUE_FED) {
1749 		/* Remove JOB_REQUEUE_FED and JOB_COMPLETING once
1750 		 * sibling reports that sibling job is done. Leave other
1751 		 * state in place. JOB_SPECIAL_EXIT may be in the
1752 		 * states. */
1753 		job_ptr->job_state &= ~(JOB_PENDING | JOB_COMPLETING);
1754 		batch_requeue_fini(job_ptr);
1755 	} else {
1756 		fed_mgr_job_revoke(job_ptr, true, job_state, exit_code,
1757 				   start_time);
1758 	}
1759 }
1760 
1761 static void _handle_fed_job_complete(fed_job_update_info_t *job_update_info)
1762 {
1763 	job_record_t *job_ptr;
1764 
1765 	slurmctld_lock_t job_write_lock = {
1766 		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
1767 
1768 	lock_slurmctld(job_write_lock);
1769 	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
1770 		error("%s: failed to find job_record for fed JobId=%u",
1771 		      __func__, job_update_info->job_id);
1772 	} else if (!job_ptr->fed_details) {
1773 		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
1774 	} else if (IS_JOB_RUNNING(job_ptr)) {
1775 		/*
1776 		 * The job could have started between the time that the origin
1777 		 * sent the complete message and now.
1778 		 */
1779 		slurm_msg_t msg;
1780 		sib_msg_t sib_msg = {0};
1781 		job_step_kill_msg_t *kill_req;
1782 
1783 		/* Build and pack a kill_req msg to put in a sib_msg */
1784 		kill_req = xmalloc(sizeof(job_step_kill_msg_t));
1785 		kill_req->job_id      = job_update_info->job_id;
1786 		kill_req->sjob_id     = NULL;
1787 		kill_req->job_step_id = SLURM_BATCH_SCRIPT;
1788 		kill_req->signal      = SIGKILL;
1789 		kill_req->flags       = 0;
1790 
1791 		sib_msg.data = kill_req;
1792 
1793 		slurm_msg_t_init(&msg);
1794 		msg.data = &sib_msg;
1795 
1796 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
1797 			info("%s: %pJ running now, just going to cancel it.",
1798 			     __func__, job_ptr);
1799 
1800 		_q_sib_job_cancel(&msg, job_update_info->uid);
1801 	} else {
1802 		_do_fed_job_complete(job_ptr, job_update_info->job_state,
1803 				     job_update_info->return_code,
1804 				     job_update_info->start_time);
1805 	}
1806 
1807 	unlock_slurmctld(job_write_lock);
1808 }
1809 
1810 static void _handle_fed_job_cancel(fed_job_update_info_t *job_update_info)
1811 {
1812 	kill_job_step(job_update_info->kill_msg, job_update_info->uid);
1813 }
1814 
1815 static void
1816 _handle_fed_job_remove_active_sib_bit(fed_job_update_info_t *job_update_info)
1817 {
1818 	fed_job_info_t *job_info;
1819 	job_record_t *job_ptr;
1820 	slurmdb_cluster_rec_t *sibling;
1821 
1822 	slurmctld_lock_t job_write_lock = {
1823 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
1824 
1825 	lock_slurmctld(job_write_lock);
1826 	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
1827 		error("%s: failed to find job_record for fed JobId=%u",
1828 		      __func__, job_update_info->job_id);
1829 		unlock_slurmctld(job_write_lock);
1830 		return;
1831 	} else if (!job_ptr->fed_details) {
1832 		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
1833 		unlock_slurmctld(job_write_lock);
1834 		return;
1835 	}
1836 
1837 	slurm_mutex_lock(&fed_job_list_mutex);
1838 	if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
1839 		error("%s: failed to find fed job info for fed JobId=%u",
1840 		      __func__, job_update_info->job_id);
1841 		slurm_mutex_unlock(&fed_job_list_mutex);
1842 		unlock_slurmctld(job_write_lock);
1843 		return;
1844 	}
1845 
1846 	sibling = fed_mgr_get_cluster_by_name(job_update_info->siblings_str);
1847 	if (sibling) {
1848 		job_info->siblings_active &=
1849 			~(FED_SIBLING_BIT(sibling->fed.id));
1850 		job_ptr->fed_details->siblings_active =
1851 			job_info->siblings_active;
1852 		update_job_fed_details(job_ptr);
1853 	}
1854 
1855 	slurm_mutex_unlock(&fed_job_list_mutex);
1856 	unlock_slurmctld(job_write_lock);
1857 }
1858 
1859 static void _handle_fed_job_requeue(fed_job_update_info_t *job_update_info)
1860 {
1861 	int rc;
1862 	slurmctld_lock_t job_write_lock = {
1863 		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
1864 
1865 	lock_slurmctld(job_write_lock);
1866 	if ((rc = job_requeue(job_update_info->uid, job_update_info->job_id,
1867 			      NULL, false, job_update_info->flags)))
1868 		error("failed to requeue fed JobId=%u - rc:%d",
1869 		      job_update_info->job_id, rc);
1870 	unlock_slurmctld(job_write_lock);
1871 }
1872 
1873 /*
1874  * Job has been started, revoke the sibling jobs.
1875  *
1876  * This is the common code between queued and local job starts.
1877  * This can be done when the job is starting on the origin cluster without
1878  * queueing because the cluster already has the job write lock and fed read
1879  * lock.
1880  *
1881  * Must have fed_job_list mutex locked and job write_lock set.
1882  */
1883 static void _fed_job_start_revoke(fed_job_info_t *job_info,
1884 				  job_record_t *job_ptr, time_t start_time)
1885 {
1886 	uint32_t cluster_lock;
1887 	uint64_t old_active;
1888 	uint64_t old_viable;
1889 
1890 	cluster_lock = job_info->cluster_lock;
1891 	old_active   = job_info->siblings_active;
1892 	old_viable   = job_info->siblings_viable;
1893 
1894 	job_ptr->fed_details->cluster_lock = cluster_lock;
1895 	job_ptr->fed_details->siblings_active =
1896 		job_info->siblings_active =
1897 		FED_SIBLING_BIT(cluster_lock);
1898 	update_job_fed_details(job_ptr);
1899 
1900 	if (old_active & ~FED_SIBLING_BIT(cluster_lock)) {
1901 		/* There are siblings that need to be removed */
1902 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
1903 			info("%s: %pJ is running on cluster id %d, revoking remote siblings (active:%"PRIu64" viable:%"PRIu64")",
1904 			     __func__, job_ptr, cluster_lock,
1905 			     old_active, old_viable);
1906 
1907 		_revoke_sibling_jobs(job_ptr->job_id, cluster_lock,
1908 				     old_active, start_time);
1909 	}
1910 }
1911 
1912 static void _handle_fed_job_start(fed_job_update_info_t *job_update_info)
1913 {
1914 	fed_job_info_t *job_info;
1915 	job_record_t *job_ptr;
1916 
1917 	slurmctld_lock_t job_write_lock = {
1918 		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
1919 
1920 	lock_slurmctld(job_write_lock);
1921 	if (!(job_ptr = find_job_record(job_update_info->job_id))) {
1922 		error("%s: failed to find job_record for fed JobId=%u",
1923 		      __func__, job_update_info->job_id);
1924 		unlock_slurmctld(job_write_lock);
1925 		return;
1926 	} else if (!job_ptr->fed_details) {
1927 		debug2("%s: %pJ not federated anymore", __func__, job_ptr);
1928 		unlock_slurmctld(job_write_lock);
1929 		return;
1930 	}
1931 
1932 	slurm_mutex_lock(&fed_job_list_mutex);
1933 	if (!(job_info =
1934 	      _find_fed_job_info(job_update_info->job_id))) {
1935 		error("%s: failed to find fed job info for fed JobId=%u",
1936 		      __func__, job_update_info->job_id);
1937 		slurm_mutex_unlock(&fed_job_list_mutex);
1938 		unlock_slurmctld(job_write_lock);
1939 		return;
1940 	}
1941 
1942 	_fed_job_start_revoke(job_info, job_ptr, job_update_info->start_time);
1943 
1944 	slurm_mutex_unlock(&fed_job_list_mutex);
1945 
1946 	if (job_info->cluster_lock != fed_mgr_cluster_rec->fed.id) {
1947 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
1948 			info("%s: %pJ is running remotely, revoking origin tracking job",
1949 			     __func__, job_ptr);
1950 
1951 		/* leave as pending so that it will stay around */
1952 		fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
1953 				   job_update_info->start_time);
1954 	}
1955 
1956 	unlock_slurmctld(job_write_lock);
1957 }
1958 
1959 static int _list_find_jobid(void *x, void *key)
1960 {
1961 	uint32_t src_jobid = *(uint32_t *)x;
1962 	uint32_t key_jobid = *(uint32_t *)key;
1963 
1964 	if (src_jobid == key_jobid)
1965 		return 1;
1966 	return 0;
1967 }
1968 
1969 static void _handle_fed_job_submission(fed_job_update_info_t *job_update_info)
1970 {
1971 	job_record_t *job_ptr;
1972 	bool interactive_job =
1973 		(job_update_info->type == FED_JOB_SUBMIT_INT) ?
1974 		true : false;
1975 
1976 	slurmctld_lock_t job_write_lock = {
1977 		READ_LOCK, WRITE_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK };
1978 
1979 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
1980 		info("%s: submitting %s sibling JobId=%u from %s",
1981 		     __func__, (interactive_job) ? "interactive" : "batch",
1982 		     job_update_info->submit_desc->job_id,
1983 		     job_update_info->submit_cluster);
1984 
1985 
1986 	/* do this outside the job write lock */
1987 	delete_job_desc_files(job_update_info->job_id);
1988 	lock_slurmctld(job_write_lock);
1989 
1990 	if ((job_ptr = find_job_record(job_update_info->job_id))) {
1991 		debug("Found existing fed %pJ, going to requeue/unlink it",
1992 		      job_ptr);
1993 		/* Delete job quickly */
1994 		job_ptr->job_state |= JOB_REVOKED;
1995 		unlink_job_record(job_ptr);
1996 
1997 		/*
1998 		 * Make sure that the file delete request is purged from list
1999 		 * -- added from purge_job_record() -- before job is allocated
2000 		 *  again.
2001 		 */
2002 		list_delete_all(purge_files_list, _list_find_jobid,
2003 				&job_update_info->job_id);
2004 
2005 	}
2006 
2007 	_fed_mgr_job_allocate_sib(job_update_info->submit_cluster,
2008 				  job_update_info->submit_desc,
2009 				  interactive_job);
2010 	unlock_slurmctld(job_write_lock);
2011 }
2012 
2013 static void _handle_fed_job_update(fed_job_update_info_t *job_update_info)
2014 {
2015 	int rc;
2016 	slurm_msg_t msg;
2017 	slurm_msg_t_init(&msg);
2018 	job_desc_msg_t *job_desc = job_update_info->submit_desc;
2019 	int db_inx_max_cnt = 5, i=0;
2020 	slurmdb_cluster_rec_t *sibling;
2021 
2022 	slurmctld_lock_t job_write_lock = {
2023 		READ_LOCK, WRITE_LOCK, READ_LOCK, READ_LOCK, READ_LOCK };
2024 	slurmctld_lock_t fed_read_lock = {
2025 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
2026 
2027 	xassert(job_desc);
2028 
2029 	/* scontrol always sets job_id_str */
2030 	job_desc->job_id = job_update_info->job_id;
2031 	msg.data = job_desc;
2032 
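	/*
	 * update_job() reports ESLURM_JOB_SETTING_DB_INX while the job is
	 * still waiting on its database index, so retry roughly once per
	 * second and give up after db_inx_max_cnt attempts.
	 */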
2033 	rc = ESLURM_JOB_SETTING_DB_INX;
2034 	while (rc == ESLURM_JOB_SETTING_DB_INX) {
2035 		lock_slurmctld(job_write_lock);
2036 		rc = update_job(&msg, job_update_info->uid, false);
2037 		unlock_slurmctld(job_write_lock);
2038 
2039 		if (i >= db_inx_max_cnt) {
2040 			info("%s: can't update fed job, waited %d seconds for JobId=%u to get a db_index, but it hasn't happened yet. Giving up and letting the user know.",
2041 			     __func__, db_inx_max_cnt,
2042 			     job_update_info->job_id);
2043 			break;
2044 		}
2045 		i++;
2046 		debug("%s: We cannot update JobId=%u at the moment, we are setting the db index, waiting",
2047 		      __func__, job_update_info->job_id);
2048 		sleep(1);
2049 	}
2050 
2051 	lock_slurmctld(fed_read_lock);
2052 	if (!(sibling =
2053 	      fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
2054 		error("Invalid sibling name");
2055 	} else {
2056 		_persist_update_job_resp(sibling, job_update_info->job_id, rc);
2057 	}
2058 	unlock_slurmctld(fed_read_lock);
2059 }
2060 
2061 static void
2062 _handle_fed_job_update_response(fed_job_update_info_t *job_update_info)
2063 {
2064 	fed_job_info_t *job_info;
2065 	slurmdb_cluster_rec_t *sibling;
2066 
2067 	slurmctld_lock_t fed_read_lock = {
2068 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
2069 
2070 	slurm_mutex_lock(&fed_job_list_mutex);
2071 	if (!(job_info = _find_fed_job_info(job_update_info->job_id))) {
2072 		error("%s: failed to find fed job info for fed JobId=%u",
2073 		      __func__, job_update_info->job_id);
2074 		slurm_mutex_unlock(&fed_job_list_mutex);
2075 		return;
2076 	}
2077 
2078 	lock_slurmctld(fed_read_lock);
2079 
2080 	if (!(sibling =
2081 	      fed_mgr_get_cluster_by_name(job_update_info->submit_cluster))) {
2082 		error("Invalid sibling name");
2083 		unlock_slurmctld(fed_read_lock);
2084 		slurm_mutex_unlock(&fed_job_list_mutex);
2085 		return;
2086 	}
2087 
2088 	if (job_info->updating_sibs[sibling->fed.id])
2089 		job_info->updating_sibs[sibling->fed.id]--;
2090 	else
2091 		error("%s: this should never happen", __func__);
2092 
2093 	slurm_mutex_unlock(&fed_job_list_mutex);
2094 	unlock_slurmctld(fed_read_lock);
2095 }
2096 
2097 extern int _handle_fed_job_sync(fed_job_update_info_t *job_update_info)
2098 {
2099 	int rc = SLURM_SUCCESS;
2100 
2101 	slurmctld_lock_t job_write_lock = {
2102 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
2103 
2104 	lock_slurmctld(job_write_lock);
2105 
2106 	rc = _sync_jobs(job_update_info->submit_cluster,
2107 			job_update_info->job_info_msg,
2108 			job_update_info->start_time);
2109 
2110 	unlock_slurmctld(job_write_lock);
2111 
2112 	return rc;
2113 }
2114 
2115 /* Have to send the job sync from the job_update thread so that it can
2116  * independently get the job read lock. */
2117 extern int _handle_fed_send_job_sync(fed_job_update_info_t *job_update_info)
2118 {
2119 	int rc = SLURM_SUCCESS;
2120 	List jobids;
2121 	slurm_msg_t req_msg, job_msg;
2122 	sib_msg_t sib_msg = {0};
2123 	char *dump = NULL;
2124 	int dump_size = 0;
2125 	slurmdb_cluster_rec_t *sibling;
2126 	Buf buffer;
2127 	time_t sync_time = 0;
2128 	char *sib_name = job_update_info->submit_cluster;
2129 
2130 	slurmctld_lock_t job_read_lock = {
2131 		READ_LOCK, READ_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
2132 
2133 	lock_slurmctld(job_read_lock);
2134 
2135 	if (!(sibling = fed_mgr_get_cluster_by_name(sib_name))) {
2136 		error("%s: Invalid sibling name %s", __func__, sib_name);
2137 		unlock_slurmctld(job_read_lock);
2138 		return SLURM_ERROR;
2139 	}
2140 
2141 	sync_time = time(NULL);
2142 	jobids = _get_sync_jobid_list(sibling->fed.id, sync_time);
2143 	pack_spec_jobs(&dump, &dump_size, jobids, SHOW_ALL,
2144 		       slurmctld_conf.slurm_user_id, NO_VAL,
2145 		       sibling->rpc_version);
2146 	FREE_NULL_LIST(jobids);
2147 
2148 	unlock_slurmctld(job_read_lock);
2149 
2150 	slurm_msg_t_init(&job_msg);
2151 	job_msg.protocol_version = sibling->rpc_version;
2152 	job_msg.msg_type         = RESPONSE_JOB_INFO;
2153 	job_msg.data             = dump;
2154 	job_msg.data_size        = dump_size;
2155 
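	/*
	 * Pack the RESPONSE_JOB_INFO message into its own buffer; it rides
	 * inside the sib_msg as a FED_JOB_SYNC payload so the sibling can
	 * unpack the job info at the agreed protocol version.
	 */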
2156 	buffer = init_buf(BUF_SIZE);
2157 	pack_msg(&job_msg, buffer);
2158 
2159 	memset(&sib_msg, 0, sizeof(sib_msg_t));
2160 	sib_msg.sib_msg_type = FED_JOB_SYNC;
2161 	sib_msg.data_buffer  = buffer;
2162 	sib_msg.data_type    = job_msg.msg_type;
2163 	sib_msg.data_version = job_msg.protocol_version;
2164 	sib_msg.start_time   = sync_time;
2165 
2166 	slurm_msg_t_init(&req_msg);
2167 	req_msg.msg_type         = REQUEST_SIB_MSG;
2168 	req_msg.protocol_version = job_msg.protocol_version;
2169 	req_msg.data             = &sib_msg;
2170 
2171 	sibling->fed.sync_sent = true;
2172 
2173 	rc = _queue_rpc(sibling, &req_msg, 0, false);
2174 
2175 	free_buf(buffer);
2176 	xfree(dump);
2177 
2178 	return rc;
2179 }
2180 
2181 static int _foreach_fed_job_update_info(fed_job_update_info_t *job_update_info)
2182 {
2183 	if (!fed_mgr_cluster_rec) {
2184 		info("Not part of federation anymore, not performing fed job updates");
2185 		return SLURM_SUCCESS;
2186 	}
2187 
2188 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
2189 		info("%s: JobId=%u type:%s",
2190 		     __func__, job_update_info->job_id,
2191 		     _job_update_type_str(job_update_info->type));
2192 
2193 	switch (job_update_info->type) {
2194 	case FED_JOB_COMPLETE:
2195 		_handle_fed_job_complete(job_update_info);
2196 		break;
2197 	case FED_JOB_CANCEL:
2198 		_handle_fed_job_cancel(job_update_info);
2199 		break;
2200 	case FED_JOB_REMOVE_ACTIVE_SIB_BIT:
2201 		_handle_fed_job_remove_active_sib_bit(job_update_info);
2202 		break;
2203 	case FED_JOB_REQUEUE:
2204 		_handle_fed_job_requeue(job_update_info);
2205 		break;
2206 	case FED_JOB_START:
2207 		_handle_fed_job_start(job_update_info);
2208 		break;
2209 	case FED_JOB_SUBMIT_BATCH:
2210 	case FED_JOB_SUBMIT_INT:
2211 		_handle_fed_job_submission(job_update_info);
2212 		break;
2213 	case FED_JOB_SYNC:
2214 		_handle_fed_job_sync(job_update_info);
2215 		break;
2216 	case FED_JOB_UPDATE:
2217 		_handle_fed_job_update(job_update_info);
2218 		break;
2219 	case FED_JOB_UPDATE_RESPONSE:
2220 		_handle_fed_job_update_response(job_update_info);
2221 		break;
2222 	case FED_SEND_JOB_SYNC:
2223 		_handle_fed_send_job_sync(job_update_info);
2224 		break;
2225 	default:
2226 		error("Invalid fed_job type: %d JobId=%u",
2227 		      job_update_info->type, job_update_info->job_id);
2228 	}
2229 
2230 	return SLURM_SUCCESS;
2231 }
2232 
2233 static void _update_origin_job_dep(job_record_t *job_ptr,
2234 				   slurmdb_cluster_rec_t *origin)
2235 {
2236 	slurm_msg_t req_msg;
2237 	dep_update_origin_msg_t dep_update_msg = { 0 };
2238 
2239 	xassert(job_ptr);
2240 	xassert(job_ptr->details);
2241 	xassert(job_ptr->details->depend_list);
2242 	xassert(origin);
2243 
2244 	if (origin == fed_mgr_cluster_rec) {
2245 		error("%s: Cannot send dependency update of %pJ to self - were clusters removed then re-added to the federation in a different order?",
2246 		      __func__, job_ptr);
2247 		return;
2248 	}
2249 
2250 	dep_update_msg.depend_list = job_ptr->details->depend_list;
2251 	dep_update_msg.job_id = job_ptr->job_id;
2252 
2253 	slurm_msg_t_init(&req_msg);
2254 	req_msg.msg_type = REQUEST_UPDATE_ORIGIN_DEP;
2255 	req_msg.data = &dep_update_msg;
2256 
2257 	if (_queue_rpc(origin, &req_msg, 0, false))
2258 		error("%s: Failed to send dependency update for %pJ",
2259 		      __func__, job_ptr);
2260 }
2261 
2262 static int _find_local_dep(void *arg, void *key)
2263 {
2264 	depend_spec_t *dep_ptr = (depend_spec_t *) arg;
2265 	return !(dep_ptr->depend_flags & SLURM_FLAGS_REMOTE);
2266 }
2267 
2268 static int _find_job_by_id(void *arg, void *key)
2269 {
2270 	job_record_t *job_ptr = (job_record_t *) arg;
2271 	uint32_t job_id = *((uint32_t *) key);
2272 	return job_ptr->job_id == job_id;
2273 }
2274 
2275 static void _handle_recv_remote_dep(dep_msg_t *remote_dep_info)
2276 {
2277 	/*
2278 	 * update_job_dependency() will:
2279 	 * - read the job list (need job read lock)
2280 	 * - call fed_mgr_is_origin_job_id (need fed read lock)
2281 	 */
2282 	int rc, tmp;
2283 	slurmctld_lock_t job_read_lock = { .job = READ_LOCK, .fed = READ_LOCK };
2284 	job_record_t *job_ptr = xmalloc(sizeof *job_ptr);
2285 
2286 	job_ptr->magic = JOB_MAGIC;
2287 	job_ptr->details = xmalloc(sizeof *(job_ptr->details));
2288 	job_ptr->details->magic = DETAILS_MAGIC;
2289 	job_ptr->job_id = remote_dep_info->job_id;
2290 	job_ptr->name = remote_dep_info->job_name;
2291 	job_ptr->user_id = remote_dep_info->user_id;
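	/*
	 * This is a lightweight stand-in job record used only for testing the
	 * remote dependency; it is tracked in remote_dep_job_list below and
	 * is never added to the main job_list.
	 */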
2292 
2293 	/*
2294 	 * Initialize array info. Allocate space for job_ptr->array_recs if
2295 	 * the job is an array so it's recognized as an array, but it's not used
2296 	 * anywhere.
2297 	 */
2298 	job_ptr->array_job_id = remote_dep_info->array_job_id;
2299 	job_ptr->array_task_id = remote_dep_info->array_task_id;
2300 
2301 	/*
2302 	 * We need to allocate space for job_ptr->array_recs if
2303 	 * it's an array job, but we don't use anything in it.
2304 	 */
2305 	if (remote_dep_info->is_array)
2306 		job_ptr->array_recs = xmalloc(sizeof *(job_ptr->array_recs));
2307 
2308 	/*
2309 	 * We need to allocate space for fed_details so
2310 	 * other places know this is a fed job, but we don't
2311 	 * need to set anything specific in it.
2312 	 */
2313 	job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));
2314 
2315 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
2316 		info("%s: Got job_id: %u, name: \"%s\", array_task_id: %u, dependency: \"%s\", is_array? %s, user_id: %u",
2317 		     __func__, remote_dep_info->job_id, remote_dep_info->job_name,
2318 		     remote_dep_info->array_task_id, remote_dep_info->dependency,
2319 		     remote_dep_info->is_array ? "yes" : "no",
2320 		     remote_dep_info->user_id);
2321 
2322 	/* NULL the string so it doesn't get freed since it's used by job_ptr */
2323 	remote_dep_info->job_name = NULL;
2324 
2325 	/* Create and validate the dependency. */
2326 	lock_slurmctld(job_read_lock);
2327 	rc = update_job_dependency(job_ptr, remote_dep_info->dependency);
2328 	unlock_slurmctld(job_read_lock);
2329 
2330 	if (rc) {
2331 		error("%s: Invalid dependency %s for %pJ: %s",
2332 		      __func__, remote_dep_info->dependency, job_ptr,
2333 		      slurm_strerror(rc));
2334 		_destroy_dep_job(job_ptr);
2335 	} else {
2336 		job_record_t *tmp_job;
2337 		ListIterator itr;
2338 
2339 		/*
2340 		 * Remove the old reference to this job from remote_dep_job_list
2341 		 * so that we don't continue testing the old dependencies.
2342 		 */
2343 		slurm_mutex_lock(&dep_job_list_mutex);
2344 		itr = list_iterator_create(remote_dep_job_list);
2345 		while ((tmp_job = list_next(itr))) {
2346 			if (tmp_job->job_id == job_ptr->job_id) {
2347 				list_delete_item(itr);
2348 				break;
2349 			}
2350 		}
2351 		list_iterator_destroy(itr);
2352 
2353 		/*
2354 		 * If we were sent a list of 0 dependencies, that means
2355 		 * the dependency was updated and cleared, so don't
2356 		 * add it to the list to test. Also only add it if
2357 		 * there are dependencies local to this cluster.
2358 		 */
2359 		if (list_count(job_ptr->details->depend_list) &&
2360 		    list_find_first(job_ptr->details->depend_list,
2361 				    _find_local_dep, &tmp))
2362 			list_append(remote_dep_job_list, job_ptr);
2363 		else
2364 			_destroy_dep_job(job_ptr);
2365 
2366 		slurm_mutex_unlock(&dep_job_list_mutex);
2367 	}
2368 	_destroy_dep_msg(remote_dep_info);
2369 }
2370 
2371 static void _handle_dep_update_origin_msgs(void)
2372 {
2373 	job_record_t *job_ptr;
2374 	dep_update_origin_msg_t *dep_update_msg;
2375 	List update_job_list = NULL;
2376 	slurmctld_lock_t job_write_lock = {
2377 		.conf = READ_LOCK, .job = WRITE_LOCK, .fed = READ_LOCK };
2378 
2379 	if (!list_count(origin_dep_update_list))
2380 		return;
2381 
2382 	lock_slurmctld(job_write_lock);
2383 	while ((dep_update_msg = list_pop(origin_dep_update_list))) {
2384 		if (!(job_ptr = find_job_record(dep_update_msg->job_id))) {
2385 			/*
2386 			 * Maybe the job was cancelled and purged before
2387 			 * the dependency update got here or was able
2388 			 * to be processed. Regardless, this job doesn't
2389 			 * exist here, so we have to throw out this
2390 			 * dependency update message.
2391 			 */
2392 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
2393 				info("%s: Could not find job %u, cannot process dependency update. Perhaps the job was purged before we got here.",
2394 				     __func__, dep_update_msg->job_id);
2395 			slurm_free_dep_update_origin_msg(dep_update_msg);
2396 			continue;
2397 		} else if (!job_ptr->details ||
2398 			   !job_ptr->details->depend_list) {
2399 			/*
2400 			 * This might happen if the job's dependencies
2401 			 * were updated to be none before the dependency
2402 			 * update came from the sibling cluster.
2403 			 */
2404 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
2405 				info("%s: %pJ doesn't have dependencies, cannot process dependency update",
2406 				     __func__, job_ptr);
2407 			slurm_free_dep_update_origin_msg(dep_update_msg);
2408 			continue;
2409 		}
2410 		if (update_job_dependency_list(job_ptr,
2411 					       dep_update_msg->depend_list)) {
2412 			if (!update_job_list) {
2413 				update_job_list = list_create(NULL);
2414 				list_append(update_job_list, job_ptr);
2415 			} else if (!list_find_first(update_job_list,
2416 						    _find_job_by_id,
2417 						    &job_ptr->job_id))
2418 				list_append(update_job_list, job_ptr);
2419 		}
2420 		slurm_free_dep_update_origin_msg(dep_update_msg);
2421 	}
2422 	if (update_job_list) {
2423 		list_for_each(update_job_list, handle_job_dependency_updates,
2424 			      NULL);
2425 		FREE_NULL_LIST(update_job_list);
2426 	}
2427 	unlock_slurmctld(job_write_lock);
2428 }
2429 
2430 static void *_test_dep_job_thread(void *arg)
2431 {
2432 	time_t last_test = 0;
2433 	time_t now;
2434 	slurmctld_lock_t job_read_lock = {
2435 		.job = READ_LOCK, .fed = READ_LOCK };
2436 
2437 #if HAVE_SYS_PRCTL_H
2438 	if (prctl(PR_SET_NAME, "fed_test_dep", NULL, NULL, NULL) < 0) {
2439 		error("%s: cannot set my name to %s %m", __func__,
2440 		      "fed_test_dep");
2441 	}
2442 #endif
2443 
2444 	while (!slurmctld_config.shutdown_time) {
2445 		now = time(NULL);
2446 		/* Only test after joining a federation. */
2447 		if (fed_mgr_fed_rec && fed_mgr_cluster_rec &&
2448 		    ((now - last_test) > TEST_REMOTE_DEP_FREQ)) {
2449 			last_test = now;
2450 			lock_slurmctld(job_read_lock);
2451 			fed_mgr_test_remote_dependencies();
2452 			unlock_slurmctld(job_read_lock);
2453 		}
2454 		sleep(2);
2455 	}
2456 	return NULL;
2457 }
2458 
2459 static void *_origin_dep_update_thread(void *arg)
2460 {
2461 	struct timespec ts = {0, 0};
2462 
2463 #if HAVE_SYS_PRCTL_H
2464 	if (prctl(PR_SET_NAME, "fed_update_dep", NULL, NULL, NULL) < 0) {
2465 		error("%s: cannot set my name to %s %m", __func__,
2466 		      "fed_update_dep");
2467 	}
2468 #endif
2469 
2470 	while (!slurmctld_config.shutdown_time) {
2471 		slurm_mutex_lock(&origin_dep_update_mutex);
2472 		ts.tv_sec = time(NULL) + 2;
2473 		slurm_cond_timedwait(&origin_dep_cond,
2474 				     &origin_dep_update_mutex, &ts);
2475 		slurm_mutex_unlock(&origin_dep_update_mutex);
2476 
2477 		if (slurmctld_config.shutdown_time)
2478 			break;
2479 
2480 		/* Wait for fed_mgr_init() */
2481 		if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
2482 			continue;
2483 
2484 		_handle_dep_update_origin_msgs();
2485 	}
2486 	return NULL;
2487 }
2488 
2489 static void *_remote_dep_recv_thread(void *arg)
2490 {
2491 	struct timespec ts = {0, 0};
2492 	dep_msg_t *remote_dep_info;
2493 
2494 #if HAVE_SYS_PRCTL_H
2495 	if (prctl(PR_SET_NAME, "fed_remote_dep", NULL, NULL, NULL) < 0) {
2496 		error("%s: cannot set my name to %s %m", __func__,
2497 		      "fed_remote_dep");
2498 	}
2499 #endif
2500 
2501 	while (!slurmctld_config.shutdown_time) {
2502 		slurm_mutex_lock(&remote_dep_recv_mutex);
2503 		ts.tv_sec = time(NULL) + 2;
2504 		slurm_cond_timedwait(&remote_dep_cond,
2505 				     &remote_dep_recv_mutex, &ts);
2506 		slurm_mutex_unlock(&remote_dep_recv_mutex);
2507 
2508 		if (slurmctld_config.shutdown_time)
2509 			break;
2510 
2511 		/* Wait for fed_mgr_init() */
2512 		if (!fed_mgr_fed_rec || !fed_mgr_cluster_rec)
2513 			continue;
2514 
2515 		while ((remote_dep_info = list_pop(remote_dep_recv_list))) {
2516 			_handle_recv_remote_dep(remote_dep_info);
2517 		}
2518 	}
2519 	return NULL;
2520 }
2521 
2522 /* Thread to process queued fed job update requests from siblings */
2523 static void *_fed_job_update_thread(void *arg)
2524 {
2525 	struct timespec ts = {0, 0};
2526 	fed_job_update_info_t *job_update_info;
2527 
2528 #if HAVE_SYS_PRCTL_H
2529 	if (prctl(PR_SET_NAME, "fed_jobs", NULL, NULL, NULL) < 0) {
2530 		error("%s: cannot set my name to %s %m", __func__, "fed_jobs");
2531 	}
2532 #endif
2533 
2534 	while (!slurmctld_config.shutdown_time) {
2535 		slurm_mutex_lock(&job_update_mutex);
2536 		ts.tv_sec  = time(NULL) + 2;
2537 		slurm_cond_timedwait(&job_update_cond,
2538 				     &job_update_mutex, &ts);
2539 		slurm_mutex_unlock(&job_update_mutex);
2540 
2541 		if (slurmctld_config.shutdown_time)
2542 			break;
2543 
2544 		while ((job_update_info = list_pop(fed_job_update_list))) {
2545 			_foreach_fed_job_update_info(job_update_info);
2546 			_destroy_fed_job_update_info(job_update_info);
2547 		}
2548 	}
2549 
2550 	return NULL;
2551 }
2552 
2553 /* Thread to process queued RPC requests destined for sibling clusters */
2554 static void *_agent_thread(void *arg)
2555 {
2556 	slurmdb_cluster_rec_t *cluster;
2557 	struct timespec ts = {0, 0};
2558 	ListIterator cluster_iter, rpc_iter;
2559 	agent_queue_t *rpc_rec;
2560 	slurm_msg_t req_msg, resp_msg;
2561 	ctld_list_msg_t ctld_req_msg;
2562 	bitstr_t *success_bits;
2563 	int rc, resp_inx, success_size;
2564 
2565 	slurmctld_lock_t fed_read_lock = {
2566 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
2567 
2568 #if HAVE_SYS_PRCTL_H
2569 	if (prctl(PR_SET_NAME, "fed_agent", NULL, NULL, NULL) < 0) {
2570 		error("%s: cannot set my name to %s %m", __func__, "fed_agent");
2571 	}
2572 #endif
2573 
2574 	while (!slurmctld_config.shutdown_time) {
2575 		/* Wait for new work or re-issue RPCs after 2 second wait */
2576 		slurm_mutex_lock(&agent_mutex);
2577 		if (!slurmctld_config.shutdown_time && !agent_queue_size) {
2578 			ts.tv_sec  = time(NULL) + 2;
2579 			slurm_cond_timedwait(&agent_cond, &agent_mutex, &ts);
2580 		}
2581 		agent_queue_size = 0;
2582 		slurm_mutex_unlock(&agent_mutex);
2583 		if (slurmctld_config.shutdown_time)
2584 			break;
2585 
2586 		lock_slurmctld(fed_read_lock);
2587 		if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list) {
2588 			unlock_slurmctld(fed_read_lock);
2589 			continue;
2590 		}
2591 
2592 		/* Look for work on each cluster */
2593 		cluster_iter = list_iterator_create(
2594 					fed_mgr_fed_rec->cluster_list);
2595 		while (!slurmctld_config.shutdown_time &&
2596 		       (cluster = list_next(cluster_iter))) {
2597 			time_t now = time(NULL);
2598 			if ((cluster->send_rpc == NULL) ||
2599 			   (list_count(cluster->send_rpc) == 0))
2600 				continue;
2601 
2602 			/* Move currently pending RPCs to new list */
2603 			ctld_req_msg.my_list = NULL;
2604 			rpc_iter = list_iterator_create(cluster->send_rpc);
2605 			while ((rpc_rec = list_next(rpc_iter))) {
2606 				if ((rpc_rec->last_try + rpc_rec->last_defer) >=
2607 				    now)
2608 					continue;
2609 				if (!ctld_req_msg.my_list)
2610 					ctld_req_msg.my_list =list_create(NULL);
2611 				list_append(ctld_req_msg.my_list,
2612 					    rpc_rec->buffer);
2613 				rpc_rec->last_try = now;
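				/*
				 * Exponential backoff: the defer interval
				 * doubles on each retry (2, 4, 8, ... sec);
				 * log once it reaches 128 seconds to flag a
				 * persistently failing sibling.
				 */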
2614 				if (rpc_rec->last_defer == 128) {
2615 					info("%s: %s JobId=%u request to cluster %s is repeatedly failing",
2616 					     __func__,
2617 					     rpc_num2string(rpc_rec->msg_type),
2618 					     rpc_rec->job_id, cluster->name);
2619 					rpc_rec->last_defer *= 2;
2620 				} else if (rpc_rec->last_defer)
2621 					rpc_rec->last_defer *= 2;
2622 				else
2623 					rpc_rec->last_defer = 2;
2624 			}
2625 			list_iterator_destroy(rpc_iter);
2626 			if (!ctld_req_msg.my_list)
2627 				continue;
2628 
2629 			/* Build, pack and send the combined RPC */
2630 			slurm_msg_t_init(&req_msg);
2631 			req_msg.msg_type = REQUEST_CTLD_MULT_MSG;
2632 			req_msg.data     = &ctld_req_msg;
2633 			rc = _send_recv_msg(cluster, &req_msg, &resp_msg,
2634 					    false);
2635 
2636 			/* Process the response */
2637 			if ((rc == SLURM_SUCCESS) &&
2638 			    (resp_msg.msg_type == RESPONSE_CTLD_MULT_MSG)) {
2639 				/* Remove successfully processed RPCs */
2640 				resp_inx = 0;
2641 				success_bits = _parse_resp_ctld_mult(&resp_msg);
2642 				success_size = bit_size(success_bits);
2643 				rpc_iter = list_iterator_create(cluster->
2644 								send_rpc);
2645 				while ((rpc_rec = list_next(rpc_iter))) {
2646 					if (rpc_rec->last_try != now)
2647 						continue;
2648 					if (resp_inx >= success_size) {
2649 						error("%s: bitmap too small (%d >= %d)",
2650 						      __func__, resp_inx,
2651 						      success_size);
2652 						break;
2653 					}
2654 					if (bit_test(success_bits, resp_inx++))
2655 						list_delete_item(rpc_iter);
2656 				}
2657 				list_iterator_destroy(rpc_iter);
2658 				FREE_NULL_BITMAP(success_bits);
2659 			} else {
2660 				/* Failed to process combined RPC.
2661 				 * Leave all RPCs on the queue. */
2662 				if (rc != SLURM_SUCCESS) {
2663 					if (_comm_fail_log(cluster)) {
2664 						error("%s: Failed to send RPC: %s",
2665 						      __func__,
2666 						      slurm_strerror(rc));
2667 					} else {
2668 						debug("%s: Failed to send RPC: %s",
2669 						      __func__,
2670 						      slurm_strerror(rc));
2671 					}
2672 				} else if (resp_msg.msg_type ==
2673 					   PERSIST_RC) {
2674 					persist_rc_msg_t *msg;
2675 					char *err_str;
2676 					msg = resp_msg.data;
2677 					if (msg->comment)
2678 						err_str = msg->comment;
2679 					else
2680 						err_str=slurm_strerror(msg->rc);
2681 					error("%s: failed to process msg: %s",
2682 					      __func__, err_str);
2683 				} else if (resp_msg.msg_type ==
2684 					   RESPONSE_SLURM_RC) {
2685 					rc = slurm_get_return_code(
2686 						resp_msg.msg_type,
2687 						resp_msg.data);
2688 					error("%s: failed to process msg: %s",
2689 					      __func__, slurm_strerror(rc));
2690 				} else {
2691 					error("%s: Invalid response msg_type: %u",
2692 					      __func__, resp_msg.msg_type);
2693 				}
2694 			}
2695 			(void) slurm_free_msg_data(resp_msg.msg_type,
2696 						   resp_msg.data);
2697 
2698 			list_destroy(ctld_req_msg.my_list);
2699 		}
2700 		list_iterator_destroy(cluster_iter);
2701 
2702 		unlock_slurmctld(fed_read_lock);
2703 	}
2704 
2705 	/* Log the abandoned RPCs */
2706 	lock_slurmctld(fed_read_lock);
2707 	if (!fed_mgr_fed_rec)
2708 		goto end_it;
2709 
2710 	cluster_iter = list_iterator_create(fed_mgr_fed_rec->cluster_list);
2711 	while ((cluster = list_next(cluster_iter))) {
2712 		if (cluster->send_rpc == NULL)
2713 			continue;
2714 
2715 		rpc_iter = list_iterator_create(cluster->send_rpc);
2716 		while ((rpc_rec = list_next(rpc_iter))) {
2717 			info("%s: %s JobId=%u request to cluster %s aborted",
2718 			     __func__, rpc_num2string(rpc_rec->msg_type),
2719 			     rpc_rec->job_id, cluster->name);
2720 			list_delete_item(rpc_iter);
2721 		}
2722 		list_iterator_destroy(rpc_iter);
2723 		FREE_NULL_LIST(cluster->send_rpc);
2724 	}
2725 	list_iterator_destroy(cluster_iter);
2726 
2727 end_it:
2728 	unlock_slurmctld(fed_read_lock);
2729 
2730 	return NULL;
2731 }
2732 
2733 static void _spawn_threads(void)
2734 {
2735 	slurm_mutex_lock(&agent_mutex);
2736 	slurm_thread_create(&agent_thread_id, _agent_thread, NULL);
2737 	slurm_mutex_unlock(&agent_mutex);
2738 
2739 	slurm_mutex_lock(&job_update_mutex);
2740 	slurm_thread_create(&fed_job_update_thread_id,
2741 			    _fed_job_update_thread, NULL);
2742 	slurm_mutex_unlock(&job_update_mutex);
2743 
2744 	slurm_mutex_lock(&remote_dep_recv_mutex);
2745 	slurm_thread_create(&remote_dep_thread_id,
2746 			    _remote_dep_recv_thread, NULL);
2747 	slurm_mutex_unlock(&remote_dep_recv_mutex);
2748 
2749 	slurm_thread_create(&dep_job_thread_id, _test_dep_job_thread, NULL);
2750 
2751 	slurm_mutex_lock(&origin_dep_update_mutex);
2752 	slurm_thread_create(&origin_dep_thread_id, _origin_dep_update_thread,
2753 			    NULL);
2754 	slurm_mutex_unlock(&origin_dep_update_mutex);
2755 }
2756 
2757 static void _add_missing_fed_job_info()
2758 {
2759 	job_record_t *job_ptr;
2760 	ListIterator job_itr;
2761 
2762 	slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
2763 
2764 	/* Sanity check and add any missing job_info structs */
2765 	lock_slurmctld(job_read_lock);
2766 	job_itr = list_iterator_create(job_list);
2767 	while ((job_ptr = list_next(job_itr))) {
2768 		uint32_t origin_id;
2769 
2770 		if (!_is_fed_job(job_ptr, &origin_id))
2771 			continue;
2772 
2773 		if (!_find_fed_job_info(job_ptr->job_id)) {
2774 			info("adding missing fed_job_info for job %pJ",
2775 			     job_ptr);
2776 			add_fed_job_info(job_ptr);
2777 		}
2778 	}
2779 	list_iterator_destroy(job_itr);
2780 	unlock_slurmctld(job_read_lock);
2781 }
2782 
2783 extern int fed_mgr_init(void *db_conn)
2784 {
2785 	int rc = SLURM_SUCCESS;
2786 	uint64_t tmp = 0;
2787 	slurmdb_federation_cond_t fed_cond;
2788 	List fed_list;
2789 	slurmdb_federation_rec_t *fed = NULL, *state_fed = NULL;
2790 	slurmdb_cluster_rec_t *state_cluster = NULL;
2791 
2792 	slurm_mutex_lock(&init_mutex);
2793 
2794 	if (inited) {
2795 		slurm_mutex_unlock(&init_mutex);
2796 		return SLURM_SUCCESS;
2797 	}
2798 
2799 	if (!association_based_accounting)
2800 		goto end_it;
2801 
2802 	slurm_mutex_lock(&fed_job_list_mutex);
2803 	if (!fed_job_list)
2804 		fed_job_list = list_create(xfree_ptr);
2805 	slurm_mutex_unlock(&fed_job_list_mutex);
2806 
2807 	/*
2808 	 * fed_job_update_list should only be appended to and popped from.
2809 	 * So rely on the list's internal lock. If the code ever changes to
2810 	 * iterate over the list, then a lock will be needed around the list.
2811 	 */
2812 	if (!fed_job_update_list)
2813 		fed_job_update_list = list_create(_destroy_fed_job_update_info);
2814 
2815 	/*
2816 	 * remote_dep_recv_list should only be appended to and popped from.
2817 	 * So rely on the list's internal lock. If the code ever changes to
2818 	 * iterate over the list, then a lock will be needed around the list.
2819 	 */
2820 	if (!remote_dep_recv_list)
2821 		remote_dep_recv_list = list_create(_destroy_dep_msg);
2822 
2823 	/*
2824 	 * origin_dep_update_list should only be read from or modified with
2825 	 * list_* functions (such as append, pop, count).
2826 	 * So rely on the list's internal lock. If the code ever changes to
2827 	 * iterate over the list, then a lock will be needed around the list.
2828 	 */
2829 	if (!origin_dep_update_list)
2830 		origin_dep_update_list = list_create(_destroy_dep_update_msg);
2831 
2832 	slurm_mutex_lock(&dep_job_list_mutex);
2833 	if (!remote_dep_job_list)
2834 		remote_dep_job_list = list_create(_destroy_dep_job);
2835 	slurm_mutex_unlock(&dep_job_list_mutex);
2836 
2837 	slurm_persist_conn_recv_server_init();
2838 	_spawn_threads();
2839 
2840 	if (running_cache) {
2841 		debug("Database appears down, reading federations from state file.");
2842 		fed = _state_load(slurmctld_conf.state_save_location);
2843 		if (!fed) {
2844 			debug2("No federation state");
2845 			rc = SLURM_SUCCESS;
2846 			goto end_it;
2847 		}
2848 	} else {
2849 		state_fed = _state_load(slurmctld_conf.state_save_location);
2850 		if (state_fed)
2851 			state_cluster = list_find_first(
2852 						state_fed->cluster_list,
2853 						slurmdb_find_cluster_in_list,
2854 						slurmctld_conf.cluster_name);
2855 
2856 		slurmdb_init_federation_cond(&fed_cond, 0);
2857 		fed_cond.cluster_list = list_create(NULL);
2858 		list_append(fed_cond.cluster_list, slurmctld_conf.cluster_name);
2859 
2860 		fed_list = acct_storage_g_get_federations(
2861 						db_conn,
2862 						slurmctld_conf.slurm_user_id,
2863 						&fed_cond);
2864 		FREE_NULL_LIST(fed_cond.cluster_list);
2865 		if (!fed_list) {
2866 			error("failed to get a federation list");
2867 			rc = SLURM_ERROR;
2868 			goto end_it;
2869 		}
2870 
2871 		if (list_count(fed_list) == 1)
2872 			fed = list_pop(fed_list);
2873 		else if (list_count(fed_list) > 1) {
2874 			error("got more federations than expected");
2875 			rc = SLURM_ERROR;
2876 		}
2877 		FREE_NULL_LIST(fed_list);
2878 	}
2879 
2880 	if (fed) {
2881 		slurmdb_cluster_rec_t *cluster = NULL;
2882 		slurmctld_lock_t fedr_jobw_lock = {
2883 			NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
2884 
2885 		if ((cluster = list_find_first(fed->cluster_list,
2886 					       slurmdb_find_cluster_in_list,
2887 					       slurmctld_conf.cluster_name))) {
2888 			job_record_t *job_ptr;
2889 			ListIterator itr;
2890 
2891 			_join_federation(fed, cluster, &tmp);
2892 
2893 			/* Find clusters that were removed from the federation
2894 			 * since the last time we got an update */
2895 			lock_slurmctld(fedr_jobw_lock);
2896 			if (state_fed && state_cluster && fed_mgr_fed_rec)
2897 				_handle_removed_clusters(state_fed, &tmp);
2898 
2899 			/* Send remote dependencies to siblings. */
2900 			itr = list_iterator_create(job_list);
2901 			while ((job_ptr = list_next(itr))) {
2902 				if (job_ptr->details &&
2903 				    job_ptr->details->dependency &&
2904 				    list_count(job_ptr->details->depend_list) &&
2905 				    fed_mgr_submit_remote_dependencies(job_ptr,
2906 								       false,
2907 								       false))
2908 					error("%s: Failed to send %pJ dependencies to some or all siblings",
2909 					      __func__, job_ptr);
2910 			}
2911 			list_iterator_destroy(itr);
2912 			unlock_slurmctld(fedr_jobw_lock);
2913 		} else {
2914 			slurmdb_destroy_federation_rec(fed);
2915 			error("failed to get cluster from federation that we requested");
2916 			rc = SLURM_ERROR;
2917 		}
2918 	} else if (state_fed && state_cluster) {
2919 		/* cluster has been removed from federation while it was down.
2920 		 * Need to clean up its federated jobs. */
2921 		slurmctld_lock_t fedw_jobw_lock = {
2922 			NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
2923 
2924 		info("self was removed from federation since last start");
2925 		lock_slurmctld(fedw_jobw_lock);
2926 		fed_mgr_cluster_rec = state_cluster;
2927 		_cleanup_removed_origin_jobs();
2928 		fed_mgr_cluster_rec = NULL;
2929 		unlock_slurmctld(fedw_jobw_lock);
2930 	}
2931 
2932 	slurmdb_destroy_federation_rec(state_fed);
2933 
2934 end_it:
2935 	/* Call whether state file existed or not. */
2936 	_add_missing_fed_job_info();
2937 
2938 	inited = true;
2939 	slurm_mutex_unlock(&init_mutex);
2940 
2941 	return rc;
2942 }
2943 
2944 extern int fed_mgr_fini(void)
2945 {
2946 	slurmctld_lock_t fed_write_lock = {
2947 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
2948 
2949 	slurm_mutex_lock(&init_mutex);
2950 	inited = false;
2951 	slurm_mutex_unlock(&init_mutex);
2952 
2953 	lock_slurmctld(fed_write_lock);
2954 	/* Call _leave_federation() before slurm_persist_conn_recv_server_fini()
2955 	 * as this will NULL out the cluster's recv persistent connection before
2956 	 * _server_fini() actually destroys it. That way the cluster's recv
2957 	 * connection won't be pointing to bad memory. */
2958 	_leave_federation();
2959 	unlock_slurmctld(fed_write_lock);
2960 
2961 	slurm_persist_conn_recv_server_fini();
2962 
2963 	if (agent_thread_id)
2964 		pthread_join(agent_thread_id, NULL);
2965 
2966 	if (fed_job_update_thread_id)
2967 		pthread_join(fed_job_update_thread_id, NULL);
2968 
2969 	if (remote_dep_thread_id)
2970 		pthread_join(remote_dep_thread_id, NULL);
2971 
2972 	if (dep_job_thread_id)
2973 		pthread_join(dep_job_thread_id, NULL);
2974 
2975 	if (origin_dep_thread_id)
2976 		pthread_join(origin_dep_thread_id, NULL);
2977 
2978 	_remove_job_watch_thread();
2979 
2980 	slurm_mutex_lock(&fed_job_list_mutex);
2981 	FREE_NULL_LIST(fed_job_list);
2982 	slurm_mutex_unlock(&fed_job_list_mutex);
2983 
2984 	FREE_NULL_LIST(fed_job_update_list);
2985 
2986 	return SLURM_SUCCESS;
2987 }
2988 
2989 static void _handle_dependencies_for_modified_fed(uint64_t added_clusters,
2990 						  uint64_t removed_clusters)
2991 {
2992 	uint32_t origin_id;
2993 	job_record_t *job_ptr;
2994 	ListIterator itr;
2995 	depend_spec_t find_dep = { 0 };
2996 
2997 	xassert(verify_lock(JOB_LOCK, READ_LOCK));
2998 	xassert(verify_lock(FED_LOCK, READ_LOCK));
2999 
3000 	if (!fed_mgr_cluster_rec)
3001 		return;
3002 
3003 	find_dep.depend_type = SLURM_DEPEND_SINGLETON;
3004 	itr = list_iterator_create(job_list);
3005 	while ((job_ptr = list_next(itr))) {
3006 		if (added_clusters && IS_JOB_PENDING(job_ptr) &&
3007 		    _is_fed_job(job_ptr, &origin_id) &&
3008 		    find_dependency(job_ptr, &find_dep))
3009 			fed_mgr_submit_remote_dependencies(job_ptr, true,
3010 							   false);
3011 		/*
3012 		 * Make sure any remote dependencies are immediately
3013 		 * marked as invalid.
3014 		 */
3015 		if (removed_clusters)
3016 			test_job_dependency(job_ptr, NULL);
3017 	}
3018 	list_iterator_destroy(itr);
3019 }
3020 
3021 extern int fed_mgr_update_feds(slurmdb_update_object_t *update)
3022 {
3023 	uint64_t added_clusters = 0, removed_clusters = 0;
3024 	List feds;
3025 	slurmdb_federation_rec_t *fed   = NULL;
3026 	slurmdb_cluster_rec_t *cluster  = NULL;
3027 	slurmctld_lock_t fedr_jobw_lock = {
3028 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
3029 	slurmctld_lock_t fedw_jobw_lock    = {
3030 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, WRITE_LOCK };
3031 
3032 	if (!update->objects)
3033 		return SLURM_SUCCESS;
3034 
3035 	slurm_mutex_lock(&init_mutex);
3036 	if (!inited) {
3037 		slurm_mutex_unlock(&init_mutex);
3038 		return SLURM_SUCCESS; /* we haven't started the fed mgr and we
3039 				       * can't start it from here, don't worry
3040 				       * all will get set up later. */
3041 	}
3042 	slurm_mutex_unlock(&init_mutex);
3043 	/* we only want one update happening at a time. */
3044 	slurm_mutex_lock(&update_mutex);
3045 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
3046 		info("Got a federation update");
3047 
3048 	feds = update->objects;
3049 
3050 	/* find the federation that this cluster is in.
3051 	 * if it's changed from last time then update stored information.
3052 	 * grab other clusters in federation
3053 	 * establish connections with each cluster in federation */
3054 
3055 	/* what if a remote cluster is removed from federation.
3056 	 * have to detect that and close the connection to the remote */
3057 	while ((fed = list_pop(feds))) {
3058 		if (fed->cluster_list &&
3059 		    (cluster = list_find_first(fed->cluster_list,
3060 					       slurmdb_find_cluster_in_list,
3061 					       slurmctld_conf.cluster_name))) {
3062 			/* Find clusters that were removed from the federation
3063 			 * since the last time we got an update */
3064 			lock_slurmctld(fedr_jobw_lock);
3065 			if (fed_mgr_fed_rec)
3066 				_handle_removed_clusters(fed,
3067 							 &removed_clusters);
3068 			unlock_slurmctld(fedr_jobw_lock);
3069 			_join_federation(fed, cluster, &added_clusters);
3070 
3071 			if (added_clusters || removed_clusters) {
3072 				lock_slurmctld(fedr_jobw_lock);
3073 				log_flag(DEPENDENCY, "%s: Cluster(s) added: 0x%"PRIx64"; removed: 0x%"PRIx64,
3074 					 __func__, added_clusters,
3075 					 removed_clusters);
3076 				_handle_dependencies_for_modified_fed(
3077 							added_clusters,
3078 							removed_clusters);
3079 				unlock_slurmctld(fedr_jobw_lock);
3080 			}
3081 			break;
3082 		}
3083 		slurmdb_destroy_federation_rec(fed);
3084 	}
3085 
3086 	if (!fed && fed_mgr_fed_rec) {
3087 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
3088 			info("Not part of any federation");
3089 		lock_slurmctld(fedw_jobw_lock);
3090 		_cleanup_removed_origin_jobs();
3091 		_leave_federation();
3092 		unlock_slurmctld(fedw_jobw_lock);
3093 	}
3094 	slurm_mutex_unlock(&update_mutex);
3095 	return SLURM_SUCCESS;
3096 }
3097 
3098 static void _pack_fed_job_info(fed_job_info_t *job_info, Buf buffer,
3099 			       uint16_t protocol_version)
3100 {
3101 	int i;
3102 	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
3103 		pack32(job_info->cluster_lock, buffer);
3104 		pack32(job_info->job_id, buffer);
3105 		pack64(job_info->siblings_active, buffer);
3106 		pack64(job_info->siblings_viable, buffer);
3107 
3108 		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
3109 			pack32(job_info->updating_sibs[i], buffer);
3110 		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
3111 			pack_time(job_info->updating_time[i], buffer);
3112 	} else {
3113 		error("%s: protocol_version %hu not supported.",
3114 		      __func__, protocol_version);
3115 	}
3116 }
3117 
3118 static int _unpack_fed_job_info(fed_job_info_t **job_info_pptr, Buf buffer,
3119 				 uint16_t protocol_version)
3120 {
3121 	int i;
3122 	fed_job_info_t *job_info = xmalloc(sizeof(fed_job_info_t));
3123 
3124 	*job_info_pptr = job_info;
3125 
3126 	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
3127 		safe_unpack32(&job_info->cluster_lock, buffer);
3128 		safe_unpack32(&job_info->job_id, buffer);
3129 		safe_unpack64(&job_info->siblings_active, buffer);
3130 		safe_unpack64(&job_info->siblings_viable, buffer);
3131 
3132 		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
3133 			safe_unpack32(&job_info->updating_sibs[i], buffer);
3134 		for (i = 0; i <= MAX_FED_CLUSTERS; i++)
3135 			safe_unpack_time(&job_info->updating_time[i], buffer);
3136 	} else {
3137 		error("%s: protocol_version %hu not supported.",
3138 		      __func__, protocol_version);
3139 		goto unpack_error;
3140 	}
3141 
3142 	return SLURM_SUCCESS;
3143 
3144 unpack_error:
3145 	xfree(job_info);
3146 	*job_info_pptr = NULL;
3147 	return SLURM_ERROR;
3148 }
3149 
3150 static void _dump_fed_job_list(Buf buffer, uint16_t protocol_version)
3151 {
3152 	uint32_t count = NO_VAL;
3153 	fed_job_info_t *fed_job_info;
3154 
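	/*
	 * A count of NO_VAL means fed_job_list did not exist when the state
	 * was dumped; _load_fed_job_list() below honors the same sentinel.
	 */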
3155 	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
3156 		/*
3157 		 * Need to be in the lock to prevent the window between getting
3158 		 * the count and actually looping on the list.
3159 		 */
3160 		slurm_mutex_lock(&fed_job_list_mutex);
3161 		if (fed_job_list)
3162 			count = list_count(fed_job_list);
3163 		else
3164 			count = NO_VAL;
3165 
3166 		pack32(count, buffer);
3167 		if (count && (count != NO_VAL)) {
3168 			ListIterator itr = list_iterator_create(fed_job_list);
3169 			while ((fed_job_info = list_next(itr))) {
3170 				_pack_fed_job_info(fed_job_info, buffer,
3171 						   protocol_version);
3172 			}
3173 			list_iterator_destroy(itr);
3174 		}
3175 		slurm_mutex_unlock(&fed_job_list_mutex);
3176 	} else {
3177 		error("%s: protocol_version %hu not supported.",
3178 		      __func__, protocol_version);
3179 	}
3180 }
3181 
3182 static List _load_fed_job_list(Buf buffer, uint16_t protocol_version)
3183 {
3184 	int i;
3185 	uint32_t count;
3186 	fed_job_info_t *tmp_job_info = NULL;
3187 	List tmp_list = NULL;
3188 
3189 	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
3190 		safe_unpack32(&count, buffer);
3191 		if (count > NO_VAL)
3192 			goto unpack_error;
3193 		if (count != NO_VAL) {
3194 			tmp_list = list_create(xfree_ptr);
3195 
3196 			for (i = 0; i < count; i++) {
3197 				if (_unpack_fed_job_info(&tmp_job_info, buffer,
3198 						     protocol_version))
3199 					goto unpack_error;
3200 				list_append(tmp_list, tmp_job_info);
3201 			}
3202 		}
3203 	} else {
3204 		error("%s: protocol_version %hu not supported.",
3205 		      __func__, protocol_version);
3206 	}
3207 
3208 	return tmp_list;
3209 
3210 unpack_error:
3211 	FREE_NULL_LIST(tmp_list);
3212 	return NULL;
3213 }
3214 
3215 /*
3216  * If this changes, then _pack_dep_msg() in slurm_protocol_pack.c probably
3217  * needs to change.
3218  */
3219 static void _pack_remote_dep_job(job_record_t *job_ptr, Buf buffer,
3220 				 uint16_t protocol_version)
3221 {
3222 	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
3223 		pack32(job_ptr->array_job_id, buffer);
3224 		pack32(job_ptr->array_task_id, buffer);
3225 		pack_dep_list(job_ptr->details->depend_list, buffer,
3226 			      protocol_version);
3227 		packstr(job_ptr->details->dependency, buffer);
3228 		packbool(job_ptr->array_recs ? true : false, buffer);
3229 		pack32(job_ptr->job_id, buffer);
3230 		packstr(job_ptr->name, buffer);
3231 		pack32(job_ptr->user_id, buffer);
3232 	} else {
3233 		error("%s: protocol_version %hu not supported.",
3234 		      __func__, protocol_version);
3235 	}
3236 }
3237 
3238 /*
3239  * If this changes, then _unpack_dep_msg() in slurm_protocol_pack.c probably
3240  * needs to change.
3241  */
3242 static int _unpack_remote_dep_job(job_record_t **job_pptr, Buf buffer,
3243 				  uint16_t protocol_version)
3244 {
3245 	uint32_t uint32_tmp;
3246 	bool is_array;
3247 	job_record_t *job_ptr;
3248 
3249 	xassert(job_pptr);
3250 
3251 	job_ptr = xmalloc(sizeof *job_ptr);
3252 	job_ptr->magic = JOB_MAGIC;
3253 	job_ptr->details = xmalloc(sizeof *(job_ptr->details));
3254 	job_ptr->details->magic = DETAILS_MAGIC;
3255 	job_ptr->fed_details = xmalloc(sizeof *(job_ptr->fed_details));
3256 	*job_pptr = job_ptr;
3257 
3258 	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
3259 		safe_unpack32(&job_ptr->array_job_id, buffer);
3260 		safe_unpack32(&job_ptr->array_task_id, buffer);
3261 		unpack_dep_list(&job_ptr->details->depend_list, buffer,
3262 				protocol_version);
3263 		safe_unpackstr_xmalloc(&job_ptr->details->dependency,
3264 				       &uint32_tmp, buffer);
3265 		safe_unpackbool(&is_array, buffer);
3266 		if (is_array)
3267 			job_ptr->array_recs =
3268 				xmalloc(sizeof *(job_ptr->array_recs));
3269 		safe_unpack32(&job_ptr->job_id, buffer);
3270 		safe_unpackstr_xmalloc(&job_ptr->name, &uint32_tmp, buffer);
3271 		safe_unpack32(&job_ptr->user_id, buffer);
3272 	} else {
3273 		error("%s: protocol_version %hu not supported.",
3274 		      __func__, protocol_version);
3275 		goto unpack_error;
3276 	}
3277 
3278 	return SLURM_SUCCESS;
3279 
3280 unpack_error:
3281 	_destroy_dep_job(job_ptr);
3282 	*job_pptr = NULL;
3283 	return SLURM_ERROR;
3284 }
3285 
static void _dump_remote_dep_job_list(Buf buffer, uint16_t protocol_version)
3287 {
3288 	uint32_t count = NO_VAL;
3289 	job_record_t *job_ptr;
3290 
3291 	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
3292 		slurm_mutex_lock(&dep_job_list_mutex);
3293 		if (remote_dep_job_list)
3294 			count = list_count(remote_dep_job_list);
3295 		else
3296 			count = NO_VAL;
3297 		pack32(count, buffer);
3298 		if (count && (count != NO_VAL)) {
3299 			ListIterator itr =
3300 				list_iterator_create(remote_dep_job_list);
3301 			while ((job_ptr = list_next(itr)))
3302 				_pack_remote_dep_job(job_ptr, buffer,
3303 						     protocol_version);
3304 			list_iterator_destroy(itr);
3305 		}
3306 		slurm_mutex_unlock(&dep_job_list_mutex);
3307 	} else {
3308 		error("%s: protocol_version %hu not supported.",
3309 		      __func__, protocol_version);
3310 	}
3311 }
3312 
static List _load_remote_dep_job_list(Buf buffer, uint16_t protocol_version)
3314 {
3315 	uint32_t count, i;
3316 	List tmp_list = NULL;
3317 	job_record_t *job_ptr = NULL;
3318 
3319 	if (protocol_version >= SLURM_20_02_PROTOCOL_VERSION) {
3320 		safe_unpack32(&count, buffer);
3321 		if (count > NO_VAL)
3322 			goto unpack_error;
3323 		if (count != NO_VAL) {
3324 			tmp_list = list_create(_destroy_dep_job);
3325 			for (i = 0; i < count; i++) {
3326 				if (_unpack_remote_dep_job(&job_ptr, buffer,
3327 							   protocol_version))
3328 					goto unpack_error;
3329 				list_append(tmp_list, job_ptr);
3330 			}
3331 		}
3332 	} else if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
3333 		/*
3334 		 * This function didn't exist until 20.02. Add this block
3335 		 * to silence errors caused by upgrading an 18.08 or 19.05
3336 		 * versioned state file.
3337 		 */
3338 		debug3("%s: old protocol version", __func__);
3339 	} else {
3340 		error("%s: protocol_version %hu not supported.",
3341 		      __func__, protocol_version);
3342 		goto unpack_error;
3343 	}
3344 	return tmp_list;
3345 
3346 unpack_error:
3347 	FREE_NULL_LIST(tmp_list);
3348 	return NULL;
3349 }
3350 
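/*
 * Save the federation state (federation record, fed job list and remote
 * dependency job list) to FED_MGR_STATE_FILE under state_save_location.
 *
 * IN state_save_location - directory to write the state file into.
 * RET 0 on success, errno from the failed file operation otherwise.
 */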
extern int fed_mgr_state_save(char *state_save_location)
3352 {
3353 	int error_code = 0, log_fd;
3354 	char *old_file = NULL, *new_file = NULL, *reg_file = NULL;
3355 	slurmctld_lock_t fed_read_lock = {
3356 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
3357 
3358 	Buf buffer = init_buf(0);
3359 
3360 	DEF_TIMERS;
3361 
3362 	START_TIMER;
3363 
3364 	/* write header: version, time */
3365 	pack16(SLURM_PROTOCOL_VERSION, buffer);
3366 	pack_time(time(NULL), buffer);
3367 
3368 	lock_slurmctld(fed_read_lock);
3369 	slurmdb_pack_federation_rec(fed_mgr_fed_rec, SLURM_PROTOCOL_VERSION,
3370 				    buffer);
3371 	unlock_slurmctld(fed_read_lock);
3372 
3373 	_dump_fed_job_list(buffer, SLURM_PROTOCOL_VERSION);
3374 	_dump_remote_dep_job_list(buffer, SLURM_PROTOCOL_VERSION);
3375 
3376 	/* write the buffer to file */
3377 	reg_file = xstrdup_printf("%s/%s", state_save_location,
3378 				  FED_MGR_STATE_FILE);
3379 	old_file = xstrdup_printf("%s.old", reg_file);
3380 	new_file = xstrdup_printf("%s.new", reg_file);
3381 
3382 	log_fd = creat(new_file, 0600);
3383 	if (log_fd < 0) {
3384 		error("Can't save state, create file %s error %m", new_file);
3385 		error_code = errno;
3386 	} else {
3387 		int pos = 0, nwrite = get_buf_offset(buffer), amount;
3388 		char *data = (char *)get_buf_data(buffer);
3389 		while (nwrite > 0) {
3390 			amount = write(log_fd, &data[pos], nwrite);
3391 			if ((amount < 0) && (errno != EINTR)) {
3392 				error("Error writing file %s, %m", new_file);
3393 				error_code = errno;
3394 				break;
3395 			}
3396 			nwrite -= amount;
3397 			pos    += amount;
3398 		}
3399 		fsync(log_fd);
3400 		close(log_fd);
3401 	}
3402 	if (error_code)
3403 		(void) unlink(new_file);
3404 	else {			/* file shuffle */
3405 		(void) unlink(old_file);
3406 		if (link(reg_file, old_file))
3407 			debug4("unable to create link for %s -> %s: %m",
3408 			       reg_file, old_file);
3409 		(void) unlink(reg_file);
3410 		if (link(new_file, reg_file))
3411 			debug4("unable to create link for %s -> %s: %m",
3412 			       new_file, reg_file);
3413 		(void) unlink(new_file);
3414 	}
3415 	xfree(old_file);
3416 	xfree(reg_file);
3417 	xfree(new_file);
3418 
3419 	free_buf(buffer);
3420 
3421 	END_TIMER2("fed_mgr_state_save");
3422 
3423 	return error_code;
3424 }
3425 
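/*
 * Load federation state from FED_MGR_STATE_FILE under state_save_location.
 * Restores fed_job_list entries that still have a matching job record and
 * merges saved remote dependency jobs into remote_dep_job_list.
 *
 * RET the unpacked slurmdb_federation_rec_t on success, NULL otherwise.
 */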
static slurmdb_federation_rec_t *_state_load(char *state_save_location)
3427 {
3428 	Buf buffer = NULL;
3429 	char *state_file;
3430 	time_t buf_time;
3431 	uint16_t ver = 0;
3432 	int error_code = SLURM_SUCCESS;
3433 	slurmdb_federation_rec_t *ret_fed = NULL;
3434 	List tmp_list = NULL;
3435 
3436 	slurmctld_lock_t job_read_lock = { .job = READ_LOCK };
3437 
3438 	state_file = xstrdup_printf("%s/%s", state_save_location,
3439 				    FED_MGR_STATE_FILE);
3440 	if (!(buffer = create_mmap_buf(state_file))) {
3441 		error("No fed_mgr state file (%s) to recover", state_file);
3442 		xfree(state_file);
3443 		return NULL;
3444 	}
3445 	xfree(state_file);
3446 
3447 	safe_unpack16(&ver, buffer);
3448 
3449 	debug3("Version in fed_mgr_state header is %u", ver);
3450 	if (ver > SLURM_PROTOCOL_VERSION || ver < SLURM_MIN_PROTOCOL_VERSION) {
3451 		if (!ignore_state_errors)
3452 			fatal("Can not recover fed_mgr state, incompatible version, got %u need > %u <= %u, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.",
3453 			      ver, SLURM_MIN_PROTOCOL_VERSION,
3454 			      SLURM_PROTOCOL_VERSION);
3455 		error("***********************************************");
3456 		error("Can not recover fed_mgr state, incompatible version, "
3457 		      "got %u need > %u <= %u", ver,
3458 		      SLURM_MIN_PROTOCOL_VERSION, SLURM_PROTOCOL_VERSION);
3459 		error("***********************************************");
3460 		free_buf(buffer);
3461 		return NULL;
3462 	}
3463 
3464 	safe_unpack_time(&buf_time, buffer);
3465 
3466 	error_code = slurmdb_unpack_federation_rec((void **)&ret_fed, ver,
3467 						   buffer);
3468 	if (error_code != SLURM_SUCCESS)
3469 		goto unpack_error;
3470 	else if (!ret_fed || !ret_fed->name ||
3471 		 !list_count(ret_fed->cluster_list)) {
3472 		slurmdb_destroy_federation_rec(ret_fed);
3473 		ret_fed = NULL;
3474 		debug("No feds to retrieve from state");
3475 	} else {
3476 		/* We want to free the connections here since they don't exist
3477 		 * anymore, but they were packed when state was saved. */
3478 		slurmdb_cluster_rec_t *cluster;
3479 		ListIterator itr = list_iterator_create(
3480 			ret_fed->cluster_list);
3481 		while ((cluster = list_next(itr))) {
3482 			slurm_persist_conn_destroy(cluster->fed.recv);
3483 			cluster->fed.recv = NULL;
3484 			slurm_persist_conn_destroy(cluster->fed.send);
3485 			cluster->fed.send = NULL;
3486 		}
3487 		list_iterator_destroy(itr);
3488 	}
3489 
	/* Load in fed_job_list and transfer objects to the actual fed_job_list
	 * only if a matching job record exists for the job */
3492 	if ((tmp_list = _load_fed_job_list(buffer, ver))) {
3493 		fed_job_info_t *tmp_info;
3494 
3495 		slurm_mutex_lock(&fed_job_list_mutex);
3496 		if (fed_job_list) {
3497 			lock_slurmctld(job_read_lock);
3498 			while ((tmp_info = list_pop(tmp_list))) {
3499 				if (find_job_record(tmp_info->job_id))
3500 					list_append(fed_job_list, tmp_info);
3501 				else
3502 					xfree(tmp_info);
3503 			}
3504 			unlock_slurmctld(job_read_lock);
3505 		}
3506 		slurm_mutex_unlock(&fed_job_list_mutex);
3507 
3508 	}
3509 	FREE_NULL_LIST(tmp_list);
3510 
3511 	/*
3512 	 * Load in remote_dep_job_list and transfer to actual
3513 	 * remote_dep_job_list. If the actual list already has that job,
3514 	 * just throw away this one.
3515 	 */
3516 	if ((tmp_list = _load_remote_dep_job_list(buffer, ver))) {
3517 		job_record_t *tmp_dep_job;
3518 		slurm_mutex_lock(&dep_job_list_mutex);
3519 		while ((tmp_dep_job = list_pop(tmp_list))) {
3520 			if (!remote_dep_job_list)
3521 				remote_dep_job_list =
3522 					list_create(_destroy_dep_job);
3523 			if (!list_find_first(remote_dep_job_list,
3524 					    _find_job_by_id,
3525 					    &tmp_dep_job->job_id) &&
3526 			    tmp_dep_job->details->dependency) {
3527 				list_append(remote_dep_job_list, tmp_dep_job);
3528 			} /* else it will get free'd with FREE_NULL_LIST */
3529 		}
3530 		slurm_mutex_unlock(&dep_job_list_mutex);
3531 	}
3532 	FREE_NULL_LIST(tmp_list);
3533 
3534 	free_buf(buffer);
3535 
3536 	return ret_fed;
3537 
3538 unpack_error:
3539 	if (!ignore_state_errors)
3540 		fatal("Incomplete fed_mgr state file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
3541 	error("Incomplete fed_mgr state file");
3542 	free_buf(buffer);
3543 
3544 	return NULL;
3545 }
3546 
/*
 * Returns the federated job id (<local id> + <cluster id>).
 * Bits  0-25: Local job id
 * Bits 26-31: Cluster id
 */
extern uint32_t fed_mgr_get_job_id(uint32_t orig)
3553 {
3554 	if (!fed_mgr_cluster_rec)
3555 		return orig;
3556 	return orig + (fed_mgr_cluster_rec->fed.id << FED_MGR_CLUSTER_ID_BEGIN);
3557 }
3558 
3559 /*
3560  * Returns the local job id from a federated job id.
3561  */
extern uint32_t fed_mgr_get_local_id(uint32_t id)
3563 {
3564 	return id & MAX_JOB_ID;
3565 }
3566 
3567 /*
3568  * Returns the cluster id from a federated job id.
3569  */
extern uint32_t fed_mgr_get_cluster_id(uint32_t id)
3571 {
3572 	return id >> FED_MGR_CLUSTER_ID_BEGIN;
3573 }
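
/*
 * Worked example for the three helpers above (illustration only): with
 * cluster id 3, local JobId 65 maps to a federated id of
 * (3 << FED_MGR_CLUSTER_ID_BEGIN) + 65 = 201326657. Masking off the low
 * 26 bits recovers the local id 65, and shifting right by 26 recovers the
 * cluster id 3.
 */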
3574 
extern int fed_mgr_add_sibling_conn(slurm_persist_conn_t *persist_conn,
				    char **out_buffer)
3577 {
3578 	slurmdb_cluster_rec_t *cluster = NULL;
3579 	slurmctld_lock_t fed_read_lock = {
3580 		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
3581 
3582 	lock_slurmctld(fed_read_lock);
3583 
3584 	if (!fed_mgr_fed_rec) {
3585 		unlock_slurmctld(fed_read_lock);
3586 		*out_buffer = xstrdup_printf(
3587 			"no fed_mgr_fed_rec on cluster %s yet.",
3588 			slurmctld_conf.cluster_name);
3589 		/* This really isn't an error.  If the cluster doesn't know it
3590 		 * is in a federation this could happen on the initial
3591 		 * connection from a sibling that found out about the addition
3592 		 * before I did.
3593 		 */
3594 		debug("%s: %s", __func__, *out_buffer);
3595 		/* The other side needs to see this as an error though or the
3596 		 * connection won't be completely established.
3597 		 */
3598 		return SLURM_ERROR;
3599 	}
3600 
3601 	if (!fed_mgr_cluster_rec) {
3602 		unlock_slurmctld(fed_read_lock);
3603 		*out_buffer = xstrdup_printf(
3604 			"no fed_mgr_cluster_rec on cluster %s?  "
3605 			"This should never happen",
3606 			slurmctld_conf.cluster_name);
3607 		error("%s: %s", __func__, *out_buffer);
3608 		return SLURM_ERROR;
3609 	}
3610 
3611 	if (!(cluster =
3612 	      fed_mgr_get_cluster_by_name(persist_conn->cluster_name))) {
3613 		unlock_slurmctld(fed_read_lock);
3614 		*out_buffer = xstrdup_printf(
3615 			"%s isn't a known sibling of ours, but tried to connect to cluster %s federation %s",
3616 			persist_conn->cluster_name, slurmctld_conf.cluster_name,
3617 			fed_mgr_fed_rec->name);
3618 		error("%s: %s", __func__, *out_buffer);
3619 		return SLURM_ERROR;
3620 	}
3621 
3622 	persist_conn->callback_fini = _persist_callback_fini;
3623 	persist_conn->flags |= PERSIST_FLAG_ALREADY_INITED;
3624 
3625 	/* If this pointer exists it will be handled by the persist_conn code,
3626 	 * don't free
3627 	 */
3628 	//slurm_persist_conn_destroy(cluster->fed.recv);
3629 
3630 	/* Preserve the persist_conn so that the cluster can get the remote
3631 	 * side's hostname and port to talk back to if it doesn't have it yet.
3632 	 * See _open_controller_conn().
	 * Don't take the cluster's lock here because an (almost) deadlock
	 * could occur if this cluster is opening a connection to the remote
	 * cluster at the same time the remote cluster is connecting to this
	 * cluster, since both sides would hold the mutex in order to
	 * send/recv. If that did happen, the connection would eventually
	 * time out and resolve itself. */
3639 	cluster->fed.recv = persist_conn;
3640 
3641 	slurm_persist_conn_recv_thread_init(persist_conn, -1, persist_conn);
3642 	_q_send_job_sync(cluster->name);
3643 
3644 	unlock_slurmctld(fed_read_lock);
3645 
3646 	return SLURM_SUCCESS;
3647 }
3648 
3649 /*
3650  * Convert comma separated list of cluster names to bitmap of cluster ids.
3651  */
static int _validate_cluster_names(char *clusters, uint64_t *cluster_bitmap)
3653 {
3654 	int rc = SLURM_SUCCESS;
3655 	uint64_t cluster_ids = 0;
3656 	List cluster_names;
3657 
3658 	xassert(clusters);
3659 
3660 	if (!xstrcasecmp(clusters, "all") ||
3661 	    (clusters && (*clusters == '\0'))) {
3662 		cluster_ids = _get_all_sibling_bits();
3663 		goto end_it;
3664 	}
3665 
3666 	cluster_names = list_create(xfree_ptr);
3667 	if (slurm_addto_char_list(cluster_names, clusters)) {
3668 		ListIterator itr = list_iterator_create(cluster_names);
3669 		char *cluster_name;
3670 		slurmdb_cluster_rec_t *sibling;
3671 
3672 		while ((cluster_name = list_next(itr))) {
3673 			if (!(sibling =
3674 			     fed_mgr_get_cluster_by_name(cluster_name))) {
3675 				error("didn't find requested cluster name %s in list of federated clusters",
3676 				      cluster_name);
3677 				rc = SLURM_ERROR;
3678 				break;
3679 			}
3680 
3681 			cluster_ids |= FED_SIBLING_BIT(sibling->fed.id);
3682 		}
3683 		list_iterator_destroy(itr);
3684 	}
3685 	FREE_NULL_LIST(cluster_names);
3686 
3687 end_it:
3688 	if (cluster_bitmap)
3689 		*cluster_bitmap = cluster_ids;
3690 
3691 	return rc;
3692 }
3693 
/* Update remote sibling jobs' viable_siblings bitmaps.
3695  *
3696  * IN job_id      - job_id of job to update.
3697  * IN job_specs   - job_specs to update job_id with.
3698  * IN viable_sibs - viable siblings bitmap to send to sibling jobs.
3699  * IN update_sibs - bitmap of siblings to update.
3700  */
extern int fed_mgr_update_job(uint32_t job_id, job_desc_msg_t *job_specs,
			      uint64_t update_sibs, uid_t uid)
3703 {
3704 	ListIterator sib_itr;
3705 	slurmdb_cluster_rec_t *sibling;
3706 	fed_job_info_t *job_info;
3707 
3708 	slurm_mutex_lock(&fed_job_list_mutex);
3709 	if (!(job_info = _find_fed_job_info(job_id))) {
3710 		error("Didn't find JobId=%u in fed_job_list", job_id);
3711 		slurm_mutex_unlock(&fed_job_list_mutex);
3712 		return SLURM_ERROR;
3713 	}
3714 
3715 	sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3716 	while ((sibling = list_next(sib_itr))) {
3717 		/* Local is handled outside */
3718 		if (sibling == fed_mgr_cluster_rec)
3719 			continue;
3720 
3721 		if (!(update_sibs & FED_SIBLING_BIT(sibling->fed.id)))
3722 			continue;
3723 
3724 		if (_persist_update_job(sibling, job_id, job_specs, uid)) {
3725 			error("failed to update sibling job on sibling %s",
3726 			      sibling->name);
3727 			continue;
3728 		}
3729 
3730 		job_info->updating_sibs[sibling->fed.id]++;
3731 		job_info->updating_time[sibling->fed.id] = time(NULL);
3732 	}
3733 	list_iterator_destroy(sib_itr);
3734 	slurm_mutex_unlock(&fed_job_list_mutex);
3735 
3736 	return SLURM_SUCCESS;
3737 }
3738 
3739 /*
3740  * Submit sibling jobs to designated siblings.
3741  *
3742  * Will update job_desc->fed_siblings_active with the successful submissions.
 * Will not send to siblings that are already set in
 * job_desc->fed_siblings_active.
3745  *
3746  * IN job_desc - job_desc containing job_id and fed_siblings_viable of job to be
3747  * 	submitted.
3748  * IN msg - contains the original job_desc buffer to send to the siblings.
3749  * IN alloc_only - true if just an allocation. false if a batch job.
3750  * IN dest_sibs - bitmap of viable siblings to submit to.
3751  * RET returns SLURM_SUCCESS if all siblings received the job successfully or
3752  * 	SLURM_ERROR if any siblings failed to receive the job. If a sibling
3753  * 	fails, then the successful siblings will be updated with the correct
3754  * 	sibling bitmap.
3755  */
static int _submit_sibling_jobs(job_desc_msg_t *job_desc, slurm_msg_t *msg,
				bool alloc_only, uint64_t dest_sibs)
3758 {
3759 	int ret_rc = SLURM_SUCCESS;
3760 	ListIterator sib_itr;
3761 	sib_msg_t sib_msg = {0};
3762 	slurmdb_cluster_rec_t *sibling = NULL;
	slurm_msg_t req_msg;
3764 	uint16_t last_rpc_version = NO_VAL16;
3765 	Buf buffer = NULL;
3766 
3767 	xassert(job_desc);
3768 	xassert(msg);
3769 
3770 	sib_msg.data_buffer  = msg->buffer;
3771 	sib_msg.data_offset  = msg->body_offset;
3772 	sib_msg.data_type    = msg->msg_type;
3773 	sib_msg.data_version = msg->protocol_version;
3774 	sib_msg.fed_siblings = job_desc->fed_siblings_viable;
3775 	sib_msg.job_id       = job_desc->job_id;
3776 	sib_msg.resp_host    = job_desc->resp_host;
3777 	sib_msg.submit_host  = job_desc->alloc_node;
3778 
3779 	slurm_msg_t_init(&req_msg);
3780 	req_msg.msg_type = REQUEST_SIB_MSG;
3781 	req_msg.data     = &sib_msg;
3782 
3783 	sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3784 	while ((sibling = list_next(sib_itr))) {
3785 		int rc;
3786 		if (sibling == fed_mgr_cluster_rec)
3787 			continue;
3788 
3789 		/* Only send to specific siblings */
3790 		if (!(dest_sibs & FED_SIBLING_BIT(sibling->fed.id)))
3791 			continue;
3792 
3793 		/* skip sibling if the sibling already has a job */
3794 		if (job_desc->fed_siblings_active &
3795 		    FED_SIBLING_BIT(sibling->fed.id))
3796 			continue;
3797 
3798 		if (alloc_only)
3799 			sib_msg.sib_msg_type = FED_JOB_SUBMIT_INT;
3800 		else
3801 			sib_msg.sib_msg_type = FED_JOB_SUBMIT_BATCH;
3802 
		/* Pack message buffer according to the sibling's rpc version.
		 * A submission from a client will already have a buffer with
		 * the packed job_desc from the client. If this controller is
		 * submitting new sibling jobs then the buffer needs to be
		 * packed according to each sibling's rpc_version. */
3808 		if (!msg->buffer &&
3809 		    (last_rpc_version != sibling->rpc_version)) {
3810 			free_buf(buffer);
3811 			msg->protocol_version = sibling->rpc_version;
3812 			buffer = init_buf(BUF_SIZE);
3813 			pack_msg(msg, buffer);
3814 			sib_msg.data_buffer  = buffer;
3815 			sib_msg.data_version = msg->protocol_version;
3816 
3817 			last_rpc_version = sibling->rpc_version;
3818 		}
3819 
3820 		req_msg.protocol_version = sibling->rpc_version;
3821 
3822 		if (!(rc = _queue_rpc(sibling, &req_msg, 0, false)))
3823 			job_desc->fed_siblings_active |=
3824 				FED_SIBLING_BIT(sibling->fed.id);
3825 		ret_rc |= rc;
3826 	}
3827 	list_iterator_destroy(sib_itr);
3828 
3829 	free_buf(buffer);
3830 
3831 	return ret_rc;
3832 }
3833 
3834 /*
3835  * Prepare and submit new sibling jobs built from an existing job.
3836  *
3837  * IN job_ptr - job to submit to remote siblings.
3838  * IN dest_sibs - bitmap of viable siblings to submit to.
3839  */
static int _prepare_submit_siblings(job_record_t *job_ptr, uint64_t dest_sibs)
3841 {
3842 	int rc = SLURM_SUCCESS;
3843 	uint32_t origin_id;
3844 	job_desc_msg_t *job_desc;
3845 	slurm_msg_t msg;
3846 
3847 	xassert(job_ptr);
3848 	xassert(job_ptr->details);
3849 
3850 	if (!_is_fed_job(job_ptr, &origin_id))
3851 		return SLURM_SUCCESS;
3852 
3853 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
3854 		info("submitting new siblings for %pJ", job_ptr);
3855 
3856 	if (!(job_desc = copy_job_record_to_job_desc(job_ptr)))
3857 		return SLURM_ERROR;
3858 
3859 	/*
3860 	 * Since job_ptr could have had defaults filled on the origin cluster,
3861 	 * clear these before sibling submission if default flag is set
3862 	 */
3863 	if (job_desc->bitflags & USE_DEFAULT_ACCT)
3864 		xfree(job_desc->account);
3865 	if (job_desc->bitflags & USE_DEFAULT_PART)
3866 		xfree(job_desc->partition);
3867 	if (job_desc->bitflags & USE_DEFAULT_QOS)
3868 		xfree(job_desc->qos);
3869 	if (job_desc->bitflags & USE_DEFAULT_WCKEY)
3870 		xfree(job_desc->wckey);
3871 
3872 	/* Have to pack job_desc into a buffer. _submit_sibling_jobs will pack
3873 	 * the job_desc according to each sibling's rpc_version. */
3874 	slurm_msg_t_init(&msg);
3875 	msg.msg_type         = REQUEST_RESOURCE_ALLOCATION;
3876 	msg.data             = job_desc;
3877 
3878 	if (_submit_sibling_jobs(job_desc, &msg, false, dest_sibs))
3879 		error("Failed to submit fed job to siblings");
3880 
3881 	/* mark this cluster as an active sibling */
3882 	if (job_desc->fed_siblings_viable &
3883 	    FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
3884 		job_desc->fed_siblings_active |=
3885 			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
3886 
3887 	/* Add new active jobs to siblings_active bitmap */
3888 	job_ptr->fed_details->siblings_active |= job_desc->fed_siblings_active;
3889 	update_job_fed_details(job_ptr);
3890 
3891 	/* free the environment since all strings are stored in one
3892 	 * xmalloced buffer */
3893 	if (job_desc->environment) {
3894 		xfree(job_desc->environment[0]);
3895 		xfree(job_desc->environment);
3896 		job_desc->env_size = 0;
3897 	}
3898 	slurm_free_job_desc_msg(job_desc);
3899 
3900 	return rc;
3901 }
3902 
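/* Return a bitmap with a FED_SIBLING_BIT set for every cluster in the
 * federation, or 0 if the federation record isn't available. */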
static uint64_t _get_all_sibling_bits()
3904 {
3905 	ListIterator itr;
3906 	slurmdb_cluster_rec_t *cluster;
3907 	uint64_t sib_bits = 0;
3908 
3909 	if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
3910 		goto fini;
3911 
3912 	itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
3913 	while ((cluster = list_next(itr))) {
3914 		sib_bits |= FED_SIBLING_BIT(cluster->fed.id);
3915 	}
3916 	list_iterator_destroy(itr);
3917 
3918 fini:
3919 	return sib_bits;
3920 }
3921 
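/*
 * list_for_each() callback: clear the viable-sibling bit of any cluster that
 * is draining or in the INACTIVE base state.
 *
 * IN object     - slurmdb_cluster_rec_t of the sibling being examined.
 * IN/OUT arg    - pointer to the viable siblings bitmap to prune.
 */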
static int _remove_inactive_sibs(void *object, void *arg)
3923 {
3924 	slurmdb_cluster_rec_t *sibling = (slurmdb_cluster_rec_t *)object;
3925 	uint64_t *viable_sibs = (uint64_t *)arg;
3926 	uint32_t cluster_state = sibling->fed.state;
3927 	int  base_state  = (cluster_state & CLUSTER_FED_STATE_BASE);
3928 	bool drain_flag  = (cluster_state & CLUSTER_FED_STATE_DRAIN);
3929 
3930 	if (drain_flag ||
3931 	    (base_state == CLUSTER_FED_STATE_INACTIVE))
3932 		*viable_sibs &= ~(FED_SIBLING_BIT(sibling->fed.id));
3933 
3934 	return SLURM_SUCCESS;
3935 }
3936 
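/*
 * Build the bitmap of siblings a job may run on: start from all siblings,
 * restrict to any requested clusters and cluster features, drop inactive or
 * draining siblings, and lock array jobs to the local cluster.
 */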
static uint64_t _get_viable_sibs(char *req_clusters, uint64_t feature_sibs,
				 bool is_array_job, char **err_msg)
3939 {
3940 	uint64_t viable_sibs = _get_all_sibling_bits();
3941 	if (req_clusters)
3942 		_validate_cluster_names(req_clusters, &viable_sibs);
3943 	if (feature_sibs)
3944 		viable_sibs &= feature_sibs;
3945 
3946 	/* filter out clusters that are inactive or draining */
3947 	list_for_each(fed_mgr_fed_rec->cluster_list, _remove_inactive_sibs,
3948 		      &viable_sibs);
3949 
3950 	if (is_array_job) { /* lock array jobs to local cluster */
		uint64_t tmp_viable = viable_sibs &
			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
3953 		if (viable_sibs && !tmp_viable) {
3954 			info("federated job arrays must run on local cluster");
3955 			if (err_msg) {
3956 				xfree(*err_msg);
3957 				xstrfmtcat(*err_msg, "federated job arrays must run on local cluster");
3958 			}
3959 		}
3960 		viable_sibs = tmp_viable;
3961 	}
3962 
3963 	return viable_sibs;
3964 }
3965 
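/*
 * Recompute a job's viable siblings and reconcile its active sibling jobs:
 * revoke sibling jobs on clusters that are no longer viable and submit new
 * sibling jobs to clusters that became viable (unless the job is held).
 */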
static void _add_remove_sibling_jobs(job_record_t *job_ptr)
3967 {
3968 	fed_job_info_t *job_info;
3969 	uint32_t origin_id = 0;
3970 	uint64_t new_sibs = 0, old_sibs = 0, add_sibs = 0,
3971 		 rem_sibs = 0, feature_sibs = 0;
3972 
3973 	xassert(job_ptr);
3974 
3975 	origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
3976 
	/* Revoke sibling jobs on clusters that are no longer requested and
	 * submit sibling jobs to newly added clusters. */
3979 	old_sibs = job_ptr->fed_details->siblings_active;
3980 
3981 	_validate_cluster_features(job_ptr->details->cluster_features,
3982 				   &feature_sibs);
3983 
3984 	new_sibs = _get_viable_sibs(job_ptr->clusters, feature_sibs,
3985 				    job_ptr->array_recs ? true : false, NULL);
3986 	job_ptr->fed_details->siblings_viable = new_sibs;
3987 
3988 	add_sibs =  new_sibs & ~old_sibs;
3989 	rem_sibs = ~new_sibs &  old_sibs;
3990 
3991 	if (rem_sibs) {
3992 		time_t now = time(NULL);
3993 		_revoke_sibling_jobs(job_ptr->job_id,
3994 				     fed_mgr_cluster_rec->fed.id,
3995 				     rem_sibs, now);
3996 		if (fed_mgr_is_origin_job(job_ptr) &&
3997 		    (rem_sibs & FED_SIBLING_BIT(origin_id))) {
3998 			fed_mgr_job_revoke(job_ptr, false, JOB_CANCELLED, 0,
3999 					   now);
4000 		}
4001 
4002 		job_ptr->fed_details->siblings_active &= ~rem_sibs;
4003 	}
4004 
	/* Don't submit new siblings if the job is held */
4006 	if (job_ptr->priority != 0 && add_sibs)
4007 		_prepare_submit_siblings(
4008 				job_ptr,
4009 				job_ptr->fed_details->siblings_viable);
4010 
4011 	/* unrevoke the origin job */
4012 	if (fed_mgr_is_origin_job(job_ptr) &&
4013 	    (add_sibs & FED_SIBLING_BIT(origin_id)))
4014 		job_ptr->job_state &= ~JOB_REVOKED;
4015 
4016 	/* Can't have the mutex while calling fed_mgr_job_revoke because it will
4017 	 * lock the mutex as well. */
4018 	slurm_mutex_lock(&fed_job_list_mutex);
4019 	if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
4020 		job_info->siblings_viable =
4021 			job_ptr->fed_details->siblings_viable;
4022 		job_info->siblings_active =
4023 			job_ptr->fed_details->siblings_active;
4024 	}
4025 	slurm_mutex_unlock(&fed_job_list_mutex);
4026 
4027 	/* Update where sibling jobs are running */
4028 	update_job_fed_details(job_ptr);
4029 }
4030 
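/*
 * Return true if any sibling still has an outstanding update for this job.
 * Updates older than UPDATE_DELAY seconds are treated as lost and that
 * sibling's updating count is cleared.
 *
 * Callers must hold fed_job_list_mutex.
 */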
static bool _job_has_pending_updates(fed_job_info_t *job_info)
4032 {
	static const int UPDATE_DELAY = 60;
	int i;
	time_t now = time(NULL);

	xassert(job_info);
4037 
	for (i = 1; i <= MAX_FED_CLUSTERS; i++) {
		if (!job_info->updating_sibs[i])
			continue;

		if (job_info->updating_time[i] > (now - UPDATE_DELAY)) {
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
				info("JobId=%u is waiting for %d update responses from cluster id %d",
				     job_info->job_id,
				     job_info->updating_sibs[i], i);
			return true;
		}

		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
			info("JobId=%u had pending updates (%d) for cluster id %d, but hasn't heard back from it for %ld seconds. Clearing the cluster's updating state",
			     job_info->job_id,
			     job_info->updating_sibs[i],
			     i,
			     now - job_info->updating_time[i]);
		job_info->updating_sibs[i] = 0;
	}
4058 
4059 	return false;
4060 }
4061 
4062 /*
4063  * Validate requested job cluster features against each cluster's features.
4064  *
4065  * IN  spec_features  - cluster features that the job requested.
4066  * OUT cluster_bitmap - bitmap of clusters that have matching features.
4067  * RET SLURM_ERROR if no cluster has any of the requested features,
 *     SLURM_SUCCESS otherwise.
4069  */
static int _validate_cluster_features(char *spec_features,
				      uint64_t *cluster_bitmap)
4072 {
4073 	int rc = SLURM_SUCCESS;
4074 	bool negative_logic = false;
4075 	uint64_t feature_sibs = 0;
4076 	char *feature = NULL;
4077 	slurmdb_cluster_rec_t *sib;
4078 	List req_features;
4079 	ListIterator feature_itr, sib_itr;
4080 
4081 	if (!spec_features || !fed_mgr_fed_rec) {
4082 		if (cluster_bitmap)
4083 			*cluster_bitmap = feature_sibs;
4084 		return rc;
4085 	}
4086 
4087 	if (*spec_features == '\0') {
4088 		if (cluster_bitmap)
4089 			*cluster_bitmap = _get_all_sibling_bits();
4090 		return rc;
4091 	}
4092 
4093 	req_features = list_create(xfree_ptr);
4094 	slurm_addto_char_list(req_features, spec_features);
4095 
4096 	feature_itr = list_iterator_create(req_features);
4097 	sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
4098 
4099 	feature = list_peek(req_features);
4100 	if (feature && feature[0] == '!') {
4101 		feature_sibs = _get_all_sibling_bits();
4102 		negative_logic = true;
4103 	}
4104 
4105 	while ((feature = list_next(feature_itr))) {
4106 		if (negative_logic && feature[0] == '!')
4107 			feature++;
4108 		bool found = false;
4109 		while ((sib = list_next(sib_itr))) {
4110 			if (sib->fed.feature_list &&
4111 			    list_find_first(sib->fed.feature_list,
4112 					    slurm_find_char_in_list, feature)) {
4113 				if (negative_logic) {
4114 					feature_sibs &=
4115 						~FED_SIBLING_BIT(sib->fed.id);
4116 				} else {
4117 					feature_sibs |=
4118 						FED_SIBLING_BIT(sib->fed.id);
4119 				}
4120 				found = true;
4121 			}
4122 		}
4123 
4124 		if (!found) {
4125 			error("didn't find at least one cluster with the feature '%s'",
4126 			      feature);
4127 			rc = SLURM_ERROR;
4128 			goto end_features;
4129 		}
4130 		if (negative_logic && !feature_sibs) {
4131 			error("eliminated all viable clusters with constraint '%s'",
4132 			      feature);
4133 			rc = SLURM_ERROR;
4134 			goto end_features;
4135 		}
4136 		list_iterator_reset(sib_itr);
4137 	}
4138 end_features:
4139 	list_iterator_destroy(sib_itr);
4140 	list_iterator_destroy(feature_itr);
4141 	FREE_NULL_LIST(req_features);
4142 
4143 	if (cluster_bitmap)
4144 		*cluster_bitmap = feature_sibs;
4145 
4146 	return rc;
4147 }
4148 
extern void fed_mgr_remove_remote_dependencies(job_record_t *job_ptr)
4150 {
4151 	uint32_t origin_id;
4152 
4153 	if (!_is_fed_job(job_ptr, &origin_id) ||
4154 	    !fed_mgr_is_origin_job(job_ptr) || !job_ptr->details)
4155 		return;
4156 
4157 	fed_mgr_submit_remote_dependencies(job_ptr, false, true);
4158 }
4159 
static int _add_to_send_list(void *object, void *arg)
4161 {
4162 	depend_spec_t *dependency = (depend_spec_t *)object;
4163 	uint64_t *send_sib_bits = (uint64_t *)arg;
4164 	uint32_t cluster_id;
4165 
4166 	if ((dependency->depend_type == SLURM_DEPEND_SINGLETON) &&
4167 	    !disable_remote_singleton) {
4168 		*send_sib_bits |= _get_all_sibling_bits();
4169 		/* Negative value short-circuits list_for_each */
4170 		return -1;
4171 	}
4172 	if (!(dependency->depend_flags & SLURM_FLAGS_REMOTE) ||
4173 	    (dependency->depend_state != DEPEND_NOT_FULFILLED))
4174 		return SLURM_SUCCESS;
4175 	cluster_id = fed_mgr_get_cluster_id(dependency->job_id);
4176 	*send_sib_bits |= FED_SIBLING_BIT(cluster_id);
4177 	return SLURM_SUCCESS;
4178 }
4179 
4180 /*
4181  * Send dependencies of job_ptr to siblings.
4182  *
4183  * If the dependency string is NULL, that means we're telling the siblings
4184  * to delete that dependency. Send empty string to indicate that.
4185  *
4186  * If send_all_sibs == true, then send dependencies to all siblings. Otherwise,
4187  * only send dependencies to siblings that own the remote jobs that job_ptr
4188  * depends on. I.e., if a sibling doesn't own any jobs that job_ptr depends on,
4189  * we won't send job_ptr's dependencies to that sibling.
4190  *
4191  * If clear_dependencies == true, then clear the dependencies on the siblings
4192  * where dependencies reside. In this case, use the job's dependency list to
4193  * find out which siblings to send the RPC to if the list is non-NULL. If the
4194  * list is NULL, then we have to send to all siblings.
4195  */
extern int fed_mgr_submit_remote_dependencies(job_record_t *job_ptr,
					      bool send_all_sibs,
					      bool clear_dependencies)
4199 {
4200 	int rc = SLURM_SUCCESS;
4201 	uint64_t send_sib_bits = 0;
4202 	ListIterator sib_itr;
4203 	slurm_msg_t req_msg;
4204 	dep_msg_t dep_msg = { 0 };
4205 	slurmdb_cluster_rec_t *sibling;
4206 	uint32_t origin_id;
4207 
4208 	if (!_is_fed_job(job_ptr, &origin_id))
4209 		return SLURM_SUCCESS;
4210 
4211 	xassert(job_ptr->details);
4212 
4213 	dep_msg.job_id = job_ptr->job_id;
4214 	dep_msg.job_name = job_ptr->name;
4215 	dep_msg.array_job_id = job_ptr->array_job_id;
4216 	dep_msg.array_task_id = job_ptr->array_task_id;
4217 	dep_msg.is_array = job_ptr->array_recs ? true : false;
4218 	dep_msg.user_id = job_ptr->user_id;
4219 
4220 	if (!job_ptr->details->dependency || clear_dependencies)
4221 		/*
4222 		 * Since we have to pack these values, set dependency to empty
4223 		 * string and set depend_list to an empty list so we have
4224 		 * data to pack.
4225 		 */
4226 		dep_msg.dependency = "";
4227 	else
4228 		dep_msg.dependency = job_ptr->details->dependency;
4229 
4230 	slurm_msg_t_init(&req_msg);
4231 	req_msg.msg_type = REQUEST_SEND_DEP;
4232 	req_msg.data = &dep_msg;
4233 
4234 	if (!job_ptr->details->depend_list)
4235 		send_all_sibs = true;
4236 	if (!send_all_sibs) {
4237 		list_for_each(job_ptr->details->depend_list,
4238 			      _add_to_send_list, &send_sib_bits);
4239 	}
4240 
4241 	sib_itr = list_iterator_create(fed_mgr_fed_rec->cluster_list);
4242 	while ((sibling = list_next(sib_itr))) {
4243 		if (sibling == fed_mgr_cluster_rec)
4244 			continue;
4245 		/*
4246 		 * If we aren't sending the dependency to all siblings and
4247 		 * there isn't a dependency on this sibling, don't send
4248 		 * an RPC to this sibling.
4249 		 */
4250 		if (!send_all_sibs &&
4251 		    !(send_sib_bits & FED_SIBLING_BIT(sibling->fed.id)))
4252 			continue;
4253 
4254 		req_msg.protocol_version = sibling->rpc_version;
4255 		rc |= _queue_rpc(sibling, &req_msg, 0, false);
4256 	}
4257 	list_iterator_destroy(sib_itr);
4258 	return rc;
4259 }
4260 
4261 /* submit a federated job.
4262  *
4263  * IN msg - msg that contains packed job_desc msg to send to siblings.
4264  * IN job_desc - original job_desc msg.
4265  * IN alloc_only - true if requesting just an allocation (srun/salloc).
4266  * IN uid - uid of user requesting allocation.
4267  * IN protocol_version - version of the code the caller is using
4268  * OUT job_id_ptr - job_id of allocated job
4269  * OUT alloc_code - error_code returned from job_allocate
4270  * OUT err_msg - error message returned if any
4271  * RET returns SLURM_SUCCESS if the allocation was successful, SLURM_ERROR
4272  * 	otherwise.
4273  */
extern int fed_mgr_job_allocate(slurm_msg_t *msg, job_desc_msg_t *job_desc,
				bool alloc_only, uid_t uid,
				uint16_t protocol_version,
				uint32_t *job_id_ptr, int *alloc_code,
				char **err_msg)
4279 {
4280 	uint64_t feature_sibs = 0;
4281 	job_record_t *job_ptr = NULL;
4282 	bool job_held = false;
4283 
4284 	xassert(msg);
4285 	xassert(job_desc);
4286 	xassert(job_id_ptr);
4287 	xassert(alloc_code);
4288 	xassert(err_msg);
4289 
4290 	if (job_desc->job_id != NO_VAL) {
4291 		error("attempt by uid %u to set JobId=%u. "
4292 		      "specifying a job_id is not allowed when in a federation",
4293 		      uid, job_desc->job_id);
4294 		*alloc_code = ESLURM_INVALID_JOB_ID;
4295 		return SLURM_ERROR;
4296 	}
4297 
4298 	if (_validate_cluster_features(job_desc->cluster_features,
4299 				       &feature_sibs)) {
4300 		*alloc_code = ESLURM_INVALID_CLUSTER_FEATURE;
4301 		return SLURM_ERROR;
4302 	}
4303 
4304 	/* get job_id now. Can't submit job to get job_id as job_allocate will
4305 	 * change the job_desc. */
4306 	job_desc->job_id = get_next_job_id(false);
4307 
4308 	/* Set viable siblings */
4309 	job_desc->fed_siblings_viable =
4310 		_get_viable_sibs(job_desc->clusters, feature_sibs,
4311 				 (job_desc->array_inx) ? true : false, err_msg);
4312 	if (!job_desc->fed_siblings_viable) {
4313 		*alloc_code = ESLURM_FED_NO_VALID_CLUSTERS;
4314 		return SLURM_ERROR;
4315 	}
4316 
4317 	/* ensure that fed_siblings_active is clear since this is a new job */
4318 	job_desc->fed_siblings_active = 0;
4319 
4320 	/*
4321 	 * Submit local job first. Then submit to all siblings. If the local job
4322 	 * fails, then don't worry about sending to the siblings.
4323 	 */
4324 	job_desc->het_job_offset = NO_VAL;
4325 	*alloc_code = job_allocate(job_desc, job_desc->immediate, false, NULL,
4326 				   alloc_only, uid, &job_ptr, err_msg,
4327 				   protocol_version);
4328 
4329 	if (!job_ptr || (*alloc_code && job_ptr->job_state == JOB_FAILED)) {
		/* There may be an rc but the job won't be failed. Will sit in
		 * queue */
4332 		info("failed to submit federated job to local cluster");
4333 		return SLURM_ERROR;
4334 	}
4335 
4336 	/* mark this cluster as an active sibling if it's in the viable list */
4337 	if (job_desc->fed_siblings_viable &
4338 	    FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))
4339 		job_desc->fed_siblings_active |=
4340 			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
4341 
4342 	/* Job is not eligible on origin cluster - mark as revoked. */
4343 	if (!(job_ptr->fed_details->siblings_viable &
4344 	      FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
4345 		job_ptr->job_state |= JOB_REVOKED;
4346 
4347 	*job_id_ptr = job_ptr->job_id;
4348 
4349 	/*
4350 	 * Don't submit a job with dependencies to siblings - the origin will
4351 	 * test job dependencies and submit the job to siblings when all
4352 	 * dependencies are fulfilled.
4353 	 * job_allocate() calls job_independent() which sets the JOB_DEPENDENT
4354 	 * flag if the job is dependent, so check this after job_allocate().
4355 	 */
4356 	if ((job_desc->priority == 0) || (job_ptr->bit_flags & JOB_DEPENDENT))
4357 		job_held = true;
4358 
4359 	if (job_held) {
4360 		info("Submitted held federated %pJ to %s(self)",
4361 		     job_ptr, fed_mgr_cluster_rec->name);
4362 	} else {
4363 		info("Submitted %sfederated %pJ to %s(self)",
4364 		     (!(job_ptr->fed_details->siblings_viable &
4365 			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)) ?
4366 		      "tracking " : ""),
4367 		     job_ptr, fed_mgr_cluster_rec->name);
4368 	}
4369 
4370 	/* Update job before submitting sibling jobs so that it will show the
4371 	 * viable siblings and potentially active local job */
4372 	job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
4373 	update_job_fed_details(job_ptr);
4374 
4375 	if (!job_held && _submit_sibling_jobs(
4376 				job_desc, msg, alloc_only,
4377 				job_ptr->fed_details->siblings_viable))
4378 		info("failed to submit sibling job to one or more siblings");
4379 	/* Send remote dependencies to siblings */
4380 	if ((job_ptr->bit_flags & JOB_DEPENDENT) &&
4381 	    job_ptr->details && job_ptr->details->dependency)
4382 		if (fed_mgr_submit_remote_dependencies(job_ptr, false, false))
4383 			error("%s: %pJ Failed to send remote dependencies to some or all siblings.",
4384 			      __func__, job_ptr);
4385 
4386 	job_ptr->fed_details->siblings_active = job_desc->fed_siblings_active;
4387 	update_job_fed_details(job_ptr);
4388 
4389 	/* Add record to fed job table */
4390 	add_fed_job_info(job_ptr);
4391 
4392 	return SLURM_SUCCESS;
4393 }
4394 
4395 /* Tests if the job is a tracker only federated job.
4396  * Tracker only job: a job that shouldn't run on the local cluster but should be
 * kept around to facilitate communications for its sibling jobs on other
4398  * clusters.
4399  */
extern bool fed_mgr_is_tracker_only_job(job_record_t *job_ptr)
4401 {
4402 	bool rc = false;
4403 	uint32_t origin_id;
4404 
4405 	xassert(job_ptr);
4406 
4407 	if (!_is_fed_job(job_ptr, &origin_id))
4408 		return rc;
4409 
4410 	if (job_ptr->fed_details &&
4411 	    (origin_id == fed_mgr_cluster_rec->fed.id) &&
4412 	    job_ptr->fed_details->siblings_active &&
4413 	    (!(job_ptr->fed_details->siblings_active &
4414 	      FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id))))
4415 		rc = true;
4416 
4417 	if (job_ptr->fed_details &&
4418 	    job_ptr->fed_details->cluster_lock &&
4419 	    job_ptr->fed_details->cluster_lock != fed_mgr_cluster_rec->fed.id)
4420 		rc = true;
4421 
4422 	return rc;
4423 }
4424 
4425 /* Return the cluster name for the given cluster id.
4426  * Must xfree returned string
4427  */
extern char *fed_mgr_get_cluster_name(uint32_t id)
4429 {
4430 	slurmdb_cluster_rec_t *sibling;
4431 	char *name = NULL;
4432 
4433 	if ((sibling = fed_mgr_get_cluster_by_id(id))) {
4434 		name = xstrdup(sibling->name);
4435 	}
4436 
4437 	return name;
4438 }
4439 
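/*
 * Return true if job_ptr is a federated job and set *origin_id to the
 * cluster id encoded in its job id; return false for non-federated jobs or
 * when this cluster isn't part of a federation.
 */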
static int _is_fed_job(job_record_t *job_ptr, uint32_t *origin_id)
4441 {
4442 	xassert(job_ptr);
4443 	xassert(origin_id);
4444 
4445 	if (!fed_mgr_cluster_rec)
4446 		return false;
4447 
4448 	if ((!job_ptr->fed_details) ||
4449 	    (!(*origin_id = fed_mgr_get_cluster_id(job_ptr->job_id)))) {
4450 		debug2("job %pJ not a federated job", job_ptr);
4451 		return false;
4452 	}
4453 
4454 	return true;
4455 }
4456 
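/*
 * Release this cluster's federation job lock on each sibling whose bit is
 * set in spec_sibs, handling the local cluster directly and remote siblings
 * through their persistent connections.
 */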
static int _job_unlock_spec_sibs(job_record_t *job_ptr, uint64_t spec_sibs)
4458 {
4459 	uint32_t cluster_id = fed_mgr_cluster_rec->fed.id;
4460 	slurmdb_cluster_rec_t *sibling;
4461 	int sib_id = 1;
4462 
4463 	while (spec_sibs) {
4464 		if (!(spec_sibs & 1))
4465 			goto next_unlock;
4466 
4467 		if (fed_mgr_cluster_rec->fed.id == sib_id)
4468 			fed_mgr_job_lock_unset(job_ptr->job_id,
4469 					       cluster_id);
4470 		else if ((sibling = fed_mgr_get_cluster_by_id(sib_id)))
4471 			_persist_fed_job_unlock(sibling, job_ptr->job_id,
4472 						cluster_id);
4473 next_unlock:
4474 		spec_sibs >>= 1;
4475 		sib_id++;
4476 	}
4477 
4478 	return SLURM_SUCCESS;
4479 }
4480 
4481 /*
4482  * Return SLURM_SUCCESS if all siblings give lock to job; SLURM_ERROR otherwise.
4483  */
static int _job_lock_all_sibs(job_record_t *job_ptr)
4485 {
4486 	slurmdb_cluster_rec_t *sibling = NULL;
4487 	int sib_id = 1;
4488 	bool all_said_yes = true;
4489 	uint64_t replied_sibs = 0, tmp_sibs = 0;
4490 	uint32_t origin_id, cluster_id;
4491 
4492 	xassert(job_ptr);
4493 
4494 	origin_id  = fed_mgr_get_cluster_id(job_ptr->job_id);
4495 	cluster_id = fed_mgr_cluster_rec->fed.id;
4496 
4497 	tmp_sibs = job_ptr->fed_details->siblings_viable &
4498 		   (~FED_SIBLING_BIT(origin_id));
4499 	while (tmp_sibs) {
4500 		if (!(tmp_sibs & 1))
4501 			goto next_lock;
4502 
4503 		if (cluster_id == sib_id) {
4504 			if (!fed_mgr_job_lock_set(job_ptr->job_id, cluster_id))
4505 				replied_sibs |= FED_SIBLING_BIT(sib_id);
4506 			else {
4507 				all_said_yes = false;
4508 				break;
4509 			}
		} else if (!(sibling = fed_mgr_get_cluster_by_id(sib_id)) ||
			   (!sibling->fed.send) ||
			   (((slurm_persist_conn_t *)sibling->fed.send)->fd < 0)) {
4513 			/*
4514 			 * Don't consider clusters that are down. They will sync
4515 			 * up later.
4516 			 */
4517 			goto next_lock;
4518 		} else if (!_persist_fed_job_lock(sibling, job_ptr->job_id,
4519 						  cluster_id)) {
4520 			replied_sibs |= FED_SIBLING_BIT(sib_id);
4521 		} else {
4522 			all_said_yes = false;
4523 			break;
4524 		}
4525 
4526 next_lock:
4527 		tmp_sibs >>= 1;
4528 		sib_id++;
4529 	}
4530 
4531 	/*
4532 	 * Have to talk to at least one other sibling -- if there is one -- to
4533 	 * start the job
4534 	 */
4535 	if (all_said_yes &&
4536 	    (!(job_ptr->fed_details->siblings_viable &
4537 	       ~FED_SIBLING_BIT(cluster_id)) ||
4538 	     (replied_sibs & ~(FED_SIBLING_BIT(cluster_id)))))
4539 		return SLURM_SUCCESS;
4540 
4541 	/* have to release the lock on those that said yes */
4542 	_job_unlock_spec_sibs(job_ptr, replied_sibs);
4543 
4544 	return SLURM_ERROR;
4545 }
4546 
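/* Return non-zero if the connection to the slurmdbd is currently active. */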
static int _slurmdbd_conn_active()
4548 {
4549 	int active = 0;
4550 
4551 	if (acct_storage_g_get_data(acct_db_conn, ACCT_STORAGE_INFO_CONN_ACTIVE,
4552 				    &active) != SLURM_SUCCESS)
4553 		active = 0;
4554 
4555 	return active;
4556 }
4557 
4558 /*
4559  * Attempt to grab the job's federation cluster lock so that the requesting
 * cluster can attempt to start the job.
4561  *
4562  * IN job - job to lock
4563  * RET returns SLURM_SUCCESS if the lock was granted, SLURM_ERROR otherwise
4564  */
extern int fed_mgr_job_lock(job_record_t *job_ptr)
4566 {
4567 	int rc = SLURM_SUCCESS;
4568 	uint32_t origin_id, cluster_id;
4569 
4570 	xassert(job_ptr);
4571 
4572 	if (!_is_fed_job(job_ptr, &origin_id))
4573 		return SLURM_SUCCESS;
4574 
4575 	cluster_id = fed_mgr_cluster_rec->fed.id;
4576 
4577 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4578 		info("attempting fed job lock on %pJ by cluster_id %d",
4579 		     job_ptr, cluster_id);
4580 
4581 	if (origin_id != fed_mgr_cluster_rec->fed.id) {
4582 		slurm_persist_conn_t *origin_conn = NULL;
4583 		slurmdb_cluster_rec_t *origin_cluster;
4584 		if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4585 			info("Unable to find origin cluster for %pJ from origin id %d",
4586 			     job_ptr, origin_id);
4587 		} else
4588 			origin_conn = (slurm_persist_conn_t *)
4589 				       origin_cluster->fed.send;
4590 
4591 		/* Check dbd is up to make sure ctld isn't on an island. */
4592 		if (acct_db_conn && _slurmdbd_conn_active() &&
4593 		    (!origin_conn || (origin_conn->fd < 0))) {
4594 			rc = _job_lock_all_sibs(job_ptr);
4595 		} else if (origin_cluster) {
4596 			rc = _persist_fed_job_lock(origin_cluster,
4597 						   job_ptr->job_id,
4598 						   cluster_id);
4599 		} else {
4600 			rc = SLURM_ERROR;
4601 		}
4602 
4603 		if (!rc) {
4604 			job_ptr->fed_details->cluster_lock = cluster_id;
4605 			fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
4606 		}
4607 
4608 		return rc;
4609 	}
4610 
4611 	/* origin cluster */
4612 	rc = fed_mgr_job_lock_set(job_ptr->job_id, cluster_id);
4613 
4614 	return rc;
4615 }
4616 
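/*
 * Record that cluster_id holds the federation lock for job_id in the local
 * fed_job_list. Fails if the job has pending sibling updates or is already
 * locked by another cluster.
 */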
extern int fed_mgr_job_lock_set(uint32_t job_id, uint32_t cluster_id)
4618 {
4619 	int rc = SLURM_SUCCESS;
4620 	fed_job_info_t *job_info;
4621 
4622 	slurm_mutex_lock(&fed_job_list_mutex);
4623 
4624 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4625 		info("%s: attempting to set fed JobId=%u lock to %u",
4626 		     __func__, job_id, cluster_id);
4627 
4628 	if (!(job_info = _find_fed_job_info(job_id))) {
4629 		error("Didn't find JobId=%u in fed_job_list", job_id);
4630 		rc = SLURM_ERROR;
4631 	} else if (_job_has_pending_updates(job_info)) {
4632 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4633 			info("%s: cluster %u can't get cluster lock for JobId=%u because it has pending updates",
4634 			     __func__, cluster_id, job_id);
4635 		rc = SLURM_ERROR;
4636 	} else if (job_info->cluster_lock &&
4637 		   job_info->cluster_lock != cluster_id) {
4638 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4639 			info("%s: fed JobId=%u already locked by cluster %d",
4640 			     __func__, job_id, job_info->cluster_lock);
4641 		rc = SLURM_ERROR;
4642 	} else {
4643 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4644 			info("%s: fed JobId=%u locked by %u",
4645 			     __func__, job_id, cluster_id);
4646 
4647 		job_info->cluster_lock = cluster_id;
4648 	}
4649 
4650 	slurm_mutex_unlock(&fed_job_list_mutex);
4651 
4652 	return rc;
4653 }
4654 
extern bool fed_mgr_job_is_self_owned(job_record_t *job_ptr)
4656 {
4657 	if (!fed_mgr_cluster_rec || !job_ptr->fed_details ||
4658 	    (job_ptr->fed_details->cluster_lock == fed_mgr_cluster_rec->fed.id))
4659 		return true;
4660 
4661 	return false;
4662 }
4663 
extern bool fed_mgr_job_is_locked(job_record_t *job_ptr)
4665 {
4666 	if (!job_ptr->fed_details ||
4667 	    job_ptr->fed_details->cluster_lock)
4668 		return true;
4669 
4670 	return false;
4671 }
4672 
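/* Queue a FED_JOB_START update built from a sibling's job-start message so
 * it can be processed asynchronously outside of this RPC handler. */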
static void _q_sib_job_start(slurm_msg_t *msg)
4674 {
4675 	sib_msg_t *sib_msg = msg->data;
4676 	fed_job_update_info_t *job_update_info;
4677 
4678 	/* add todo to remove remote siblings if the origin job */
4679 	job_update_info = xmalloc(sizeof(fed_job_update_info_t));
4680 	job_update_info->type         = FED_JOB_START;
4681 	job_update_info->job_id       = sib_msg->job_id;
4682 	job_update_info->start_time   = sib_msg->start_time;
4683 	job_update_info->cluster_lock = sib_msg->cluster_id;
4684 
4685 	_append_job_update(job_update_info);
4686 }
4687 
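/*
 * Clear the federation lock on job_id in the local fed_job_list if it is
 * held by cluster_id. Fails if another cluster holds the lock or the job
 * isn't in fed_job_list.
 */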
extern int fed_mgr_job_lock_unset(uint32_t job_id, uint32_t cluster_id)
4689 {
4690 	int rc = SLURM_SUCCESS;
4691 	fed_job_info_t * job_info;
4692 
4693 	slurm_mutex_lock(&fed_job_list_mutex);
4694 
4695 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4696 		info("%s: attempting to unlock fed JobId=%u by cluster %u",
4697 		     __func__, job_id, cluster_id);
4698 
4699 	if (!(job_info = _find_fed_job_info(job_id))) {
4700 		error("Didn't find JobId=%u in fed_job_list", job_id);
4701 		rc = SLURM_ERROR;
4702 	} else if (job_info->cluster_lock &&
4703 		   job_info->cluster_lock != cluster_id) {
4704 		error("attempt to unlock sib JobId=%u by cluster %d which doesn't have job lock",
4705 		      job_id, cluster_id);
4706 		rc = SLURM_ERROR;
4707 	} else {
4708 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4709 			info("%s: fed JobId=%u unlocked by %u",
4710 			     __func__, job_id, cluster_id);
4711 		job_info->cluster_lock = 0;
4712 	}
4713 
4714 	slurm_mutex_unlock(&fed_job_list_mutex);
4715 
4716 	return rc;
4717 }
4718 
4719 /*
 * Release the job's federation cluster lock so that other clusters can try to
4721  * start the job.
4722  *
4723  * IN job        - job to unlock
4724  * RET returns SLURM_SUCCESS if the lock was released, SLURM_ERROR otherwise
4725  */
extern int fed_mgr_job_unlock(job_record_t *job_ptr)
4727 {
4728 	int rc = SLURM_SUCCESS;
4729 	uint32_t origin_id, cluster_id;
4730 
4731 	if (!_is_fed_job(job_ptr, &origin_id))
4732 		return SLURM_SUCCESS;
4733 
4734 	cluster_id = fed_mgr_cluster_rec->fed.id;
4735 
4736 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4737 		info("releasing fed job lock on %pJ by cluster_id %d",
4738 		     job_ptr, cluster_id);
4739 
4740 	if (origin_id != fed_mgr_cluster_rec->fed.id) {
4741 		slurm_persist_conn_t *origin_conn = NULL;
4742 		slurmdb_cluster_rec_t *origin_cluster;
4743 		if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4744 			info("Unable to find origin cluster for %pJ from origin id %d",
4745 			     job_ptr, origin_id);
4746 		} else {
4747 			origin_conn = (slurm_persist_conn_t *)
4748 				origin_cluster->fed.send;
4749 		}
4750 
4751 		if (!origin_conn || (origin_conn->fd < 0)) {
4752 			uint64_t tmp_sibs;
4753 			tmp_sibs = job_ptr->fed_details->siblings_viable &
4754 				   ~FED_SIBLING_BIT(origin_id);
4755 			rc = _job_unlock_spec_sibs(job_ptr, tmp_sibs);
4756 		} else {
4757 			rc = _persist_fed_job_unlock(origin_cluster,
4758 						     job_ptr->job_id,
4759 						     cluster_id);
4760 		}
4761 
4762 		if (!rc) {
4763 			job_ptr->fed_details->cluster_lock = 0;
4764 			fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
4765 		}
4766 
4767 		return rc;
4768 	}
4769 
4770 	/* Origin Cluster */
4771 	rc = fed_mgr_job_lock_unset(job_ptr->job_id, cluster_id);
4772 
4773 	return rc;
4774 }
4775 
4776 /*
4777  * Notify origin cluster that cluster_id started job.
4778  *
4779  * Cancels remaining sibling jobs.
4780  *
 * IN job_ptr    - job_ptr of the job that started.
 * IN start_time - start time of the job.
 * RET returns SLURM_SUCCESS if the origin was notified, SLURM_ERROR otherwise
4784  */
extern int fed_mgr_job_start(job_record_t *job_ptr, time_t start_time)
4786 {
4787 	int rc = SLURM_SUCCESS;
4788 	uint32_t origin_id, cluster_id;
4789 	fed_job_info_t *job_info;
4790 
	xassert(job_ptr);
4792 
4793 	if (!_is_fed_job(job_ptr, &origin_id))
4794 		return SLURM_SUCCESS;
4795 
4796 	cluster_id = fed_mgr_cluster_rec->fed.id;
4797 
4798 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4799 		info("start fed %pJ by cluster_id %d",
4800 		     job_ptr, cluster_id);
4801 
4802 	if (origin_id != fed_mgr_cluster_rec->fed.id) {
4803 		slurm_persist_conn_t *origin_conn = NULL;
4804 		slurmdb_cluster_rec_t *origin_cluster;
4805 		if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
4806 			info("Unable to find origin cluster for %pJ from origin id %d",
4807 			     job_ptr, origin_id);
4808 		} else {
4809 			origin_conn = (slurm_persist_conn_t *)
4810 				origin_cluster->fed.send;
4811 		}
4812 
4813 		if (!origin_conn || (origin_conn->fd < 0)) {
4814 			uint64_t viable_sibs;
4815 			viable_sibs = job_ptr->fed_details->siblings_viable;
4816 			viable_sibs &= ~FED_SIBLING_BIT(origin_id);
4817 			viable_sibs &= ~FED_SIBLING_BIT(cluster_id);
4818 			_revoke_sibling_jobs(job_ptr->job_id,
4819 					     fed_mgr_cluster_rec->fed.id,
4820 					     viable_sibs, job_ptr->start_time);
4821 			rc = SLURM_SUCCESS;
4822 		} else {
4823 			rc = _persist_fed_job_start(origin_cluster,
4824 						    job_ptr->job_id, cluster_id,
4825 						    job_ptr->start_time);
4826 		}
4827 
4828 		if (!rc) {
4829 			job_ptr->fed_details->siblings_active =
4830 				FED_SIBLING_BIT(cluster_id);
4831 			update_job_fed_details(job_ptr);
4832 		}
4833 
4834 		return rc;
4835 
4836 	}
4837 
4838 	/* Origin Cluster: */
4839 	slurm_mutex_lock(&fed_job_list_mutex);
4840 
4841 	if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
4842 		error("Didn't find %pJ in fed_job_list", job_ptr);
4843 		rc = SLURM_ERROR;
4844 	} else if (!job_info->cluster_lock) {
4845 		error("attempt to start sib JobId=%u by cluster %u, but it's not locked",
4846 		      job_info->job_id, cluster_id);
4847 		rc = SLURM_ERROR;
4848 	} else if (job_info->cluster_lock &&
4849 		   (job_info->cluster_lock != cluster_id)) {
4850 		error("attempt to start sib JobId=%u by cluster %u, which doesn't have job lock",
4851 		     job_info->job_id, cluster_id);
4852 		rc = SLURM_ERROR;
4853 	}
4854 
4855 	if (!rc)
4856 		_fed_job_start_revoke(job_info, job_ptr, start_time);
4857 
4858 	slurm_mutex_unlock(&fed_job_list_mutex);
4859 
4860 
4861 	return rc;
4862 }
4863 
4864 /*
4865  * Complete the federated job. If the job ran on a cluster other than the
4866  * origin_cluster then it notifies the origin cluster that the job finished.
4867  *
4868  * Tells the origin cluster to revoke the tracking job.
4869  *
4870  * IN job_ptr     - job_ptr of job to complete.
4871  * IN return_code - return code of job
4872  * IN start_time  - start time of the job that actually ran.
4873  * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
4874  */
4875 extern int fed_mgr_job_complete(job_record_t *job_ptr, uint32_t return_code,
4876 				time_t start_time)
4877 {
4878 	uint32_t origin_id;
4879 
4880 	if (job_ptr->bit_flags & SIB_JOB_FLUSH)
4881 		return SLURM_SUCCESS;
4882 
4883 	if (!_is_fed_job(job_ptr, &origin_id))
4884 		return SLURM_SUCCESS;
4885 
4886 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4887 		info("complete fed %pJ by cluster_id %d",
4888 		     job_ptr, fed_mgr_cluster_rec->fed.id);
4889 
4890 	if (origin_id == fed_mgr_cluster_rec->fed.id) {
4891 		_revoke_sibling_jobs(job_ptr->job_id,
4892 				     fed_mgr_cluster_rec->fed.id,
4893 				     job_ptr->fed_details->siblings_active,
4894 				     job_ptr->start_time);
4895 		return SLURM_SUCCESS;
4896 	}
4897 
4898 	slurmdb_cluster_rec_t *conn = fed_mgr_get_cluster_by_id(origin_id);
4899 	if (!conn) {
4900 		info("Unable to find origin cluster for %pJ from origin id %d",
4901 		     job_ptr, origin_id);
4902 		return SLURM_ERROR;
4903 	}
4904 
4905 	return _persist_fed_job_revoke(conn, job_ptr->job_id,
4906 				       job_ptr->job_state, return_code,
4907 				       start_time);
4908 }
4909 
4910 /*
4911  * Revoke all sibling jobs.
4912  *
4913  * IN job_ptr - job to revoke sibling jobs from.
4914  * RET SLURM_SUCCESS on success, SLURM_ERROR otherwise.
4915  */
4916 extern int fed_mgr_job_revoke_sibs(job_record_t *job_ptr)
4917 {
4918 	uint32_t origin_id;
4919 	time_t now = time(NULL);
4920 
4921 	xassert(verify_lock(JOB_LOCK, READ_LOCK));
4922 	xassert(verify_lock(FED_LOCK, READ_LOCK));
4923 
4924 	if (!_is_fed_job(job_ptr, &origin_id))
4925 		return SLURM_SUCCESS;
4926 
4927 	if (origin_id != fed_mgr_cluster_rec->fed.id)
4928 		return SLURM_SUCCESS;
4929 
4930 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4931 		info("revoke fed %pJ's siblings", job_ptr);
4932 
4933 	_revoke_sibling_jobs(job_ptr->job_id, fed_mgr_cluster_rec->fed.id,
4934 			     job_ptr->fed_details->siblings_active, now);
4935 
4936 	return SLURM_SUCCESS;
4937 }
4938 
4939 /*
4940  * Revokes the federated job.
4941  *
4942  * IN job_ptr      - job_ptr of job to revoke.
4943  * IN job_complete - whether the job is done or not. If completed then sets the
4944  * 	state to JOB_REVOKED | completed_state. JOB_REVOKED otherwise.
4945  * IN completed_state - state of completed job. Only use if job_complete==true.
4946  *      If job_complete==false, then this is unused.
4947  * IN exit_code    - exit_code of job.
4948  * IN start_time   - start time of the job that actually ran.
4949  * RET returns SLURM_SUCCESS if fed job was completed, SLURM_ERROR otherwise
4950  */
4951 extern int fed_mgr_job_revoke(job_record_t *job_ptr, bool job_complete,
4952 			      uint32_t completed_state, uint32_t exit_code,
4953 			      time_t start_time)
4954 {
4955 	uint32_t origin_id;
4956 	uint32_t state = JOB_REVOKED;
4957 
4958 	if (IS_JOB_COMPLETED(job_ptr)) /* job already completed */
4959 		return SLURM_SUCCESS;
4960 
4961 	if (!_is_fed_job(job_ptr, &origin_id))
4962 		return SLURM_SUCCESS;
4963 
4964 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
4965 		info("revoking fed %pJ (%s)",
4966 		     job_ptr, job_complete ? "REVOKED|CANCELLED" : "REVOKED");
4967 
4968 	/* Check if the job exited with one of the configured requeue values. */
4969 	job_ptr->exit_code = exit_code;
4970 	if (job_hold_requeue(job_ptr)) {
4971 		batch_requeue_fini(job_ptr);
4972 		return SLURM_SUCCESS;
4973 	}
4974 	/*
4975 	 * Only set to a "completed" state (i.e., state > JOB_SUSPENDED)
4976 	 * if job_complete is true.
4977 	 */
4978 	if (job_complete) {
4979 		if (completed_state > JOB_SUSPENDED)
4980 			state |= completed_state;
4981 		else
4982 			state |= JOB_CANCELLED;
4983 	}
4984 
4985 	job_ptr->job_state  = state;
4986 	job_ptr->start_time = start_time;
4987 	job_ptr->end_time   = start_time;
4988 	job_ptr->state_reason = WAIT_NO_REASON;
4989 	xfree(job_ptr->state_desc);
4990 
4991 	/*
4992 	 * Since the job is purged/revoked quickly on the non-origin side it's
4993 	 * possible that the job_start message has not been sent yet. Send it
4994 	 * now so that the db record gets the uid set -- which the complete
4995 	 * message doesn't send.
4996 	 */
4997 	if (!job_ptr->db_index && (origin_id != fed_mgr_cluster_rec->fed.id)) {
4998 		if (IS_JOB_FINISHED(job_ptr))
4999 			jobacct_storage_g_job_start(acct_db_conn, job_ptr);
5000 		else
5001 			info("%s: %pJ isn't finished and isn't an origin job (%u != %u) and doesn't have a db_index yet. We aren't sending a start message to the database.",
5002 			     __func__, job_ptr, origin_id,
5003 			     fed_mgr_cluster_rec->fed.id);
5004 	}
5005 
5006 	job_completion_logger(job_ptr, false);
5007 
5008 	/* Don't remove the origin job */
5009 	if (origin_id == fed_mgr_cluster_rec->fed.id)
5010 		return SLURM_SUCCESS;
5011 
5012 	/* Purge the revoked job -- remote only */
5013 	unlink_job_record(job_ptr);
5014 
5015 	return SLURM_SUCCESS;
5016 }
5017 
5018 /* Convert cluster ids to cluster names.
5019  *
5020  * RET: return string of comma-separated cluster names.
5021  *      Must free returned string.
5022  */
5023 extern char *fed_mgr_cluster_ids_to_names(uint64_t cluster_ids)
5024 {
5025 	int bit = 1;
5026 	char *names = NULL;
5027 
5028 	if (!fed_mgr_fed_rec || !fed_mgr_fed_rec->cluster_list)
5029 		return names;
5030 
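	/*
	 * Walk the id bitmap: bit n (1-based) maps to the sibling whose fed
	 * id is n, e.g. FED_SIBLING_BIT(3) == 0x4.
	 */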
5031 	while (cluster_ids) {
5032 		if (cluster_ids & 1) {
5033 			slurmdb_cluster_rec_t *sibling;
5034 			if ((sibling = fed_mgr_get_cluster_by_id(bit))) {
5035 				xstrfmtcat(names, "%s%s",
5036 					   (names) ? "," : "", sibling->name);
5037 			} else {
5038 				error("Couldn't find a sibling cluster with id %d",
5039 				      bit);
5040 			}
5041 		}
5042 
5043 		cluster_ids >>= 1;
5044 		bit++;
5045 	}
5046 
5047 	return names;
5048 }
5049 
5050 /*
5051  * Tests whether a federated job can be requeued.
5052  *
5053  * If called from the remote cluster (non-origin) then it will send a requeue
5054  * request to the origin to have the origin cancel this job. In this case, it
5055  * will return success and set the JOB_REQUEUE_FED flag and wait to be killed.
5056  *
5057  * If it is the origin job, it will also cancel a running remote job. New
5058  * federated sibling jobs will be submitted after the job has completed (e.g.
5059  * after epilog) in fed_mgr_job_requeue().
5060  *
5061  * IN job_ptr - job to requeue.
5062  * IN flags   - flags for the requeue (e.g. JOB_RECONFIG_FAIL).
5063  * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
5064  * 	otherwise.
5065  */
5066 extern int fed_mgr_job_requeue_test(job_record_t *job_ptr, uint32_t flags)
5067 {
5068 	uint32_t origin_id;
5069 
5070 	if (!_is_fed_job(job_ptr, &origin_id))
5071 		return SLURM_SUCCESS;
5072 
5073 	if (origin_id != fed_mgr_cluster_rec->fed.id) {
5074 		slurmdb_cluster_rec_t *origin_cluster;
5075 		if (!(origin_cluster = fed_mgr_get_cluster_by_id(origin_id))) {
5076 			error("Unable to find origin cluster for %pJ from origin id %d",
5077 			      job_ptr, origin_id);
5078 			return SLURM_ERROR;
5079 		}
5080 
5081 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5082 			info("requeueing fed job %pJ on origin cluster %d",
5083 			     job_ptr, origin_id);
5084 
5085 		_persist_fed_job_requeue(origin_cluster, job_ptr->job_id,
5086 					 flags);
5087 
5088 		job_ptr->job_state |= JOB_REQUEUE_FED;
5089 
5090 		return SLURM_SUCCESS;
5091 	}
5092 
5093 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5094 		info("requeueing fed %pJ by cluster_id %d",
5095 		     job_ptr, fed_mgr_cluster_rec->fed.id);
5096 
5097 	/* If the job is currently running locally, then cancel the running job
5098 	 * and set a flag that it's being requeued. Then, when the epilog
5099 	 * complete comes in, submit the siblings to the other clusters.
5100 	 * This check must come after the origin check, otherwise the request
5101 	 * would never reach the origin. */
5102 	if (IS_JOB_RUNNING(job_ptr))
5103 		return SLURM_SUCCESS;
5104 
5105 	/* If a sibling job is running remotely, then cancel the remote job and
5106 	 * wait till job finishes (e.g. after long epilog) and then resubmit the
5107 	 * siblings in fed_mgr_job_requeue(). */
5108 	if (IS_JOB_PENDING(job_ptr) && IS_JOB_REVOKED(job_ptr)) {
5109 		slurmdb_cluster_rec_t *remote_cluster;
5110 		if (!(remote_cluster =
5111 		      fed_mgr_get_cluster_by_id(
5112 					job_ptr->fed_details->cluster_lock))) {
5113 			error("Unable to find remote cluster for %pJ from cluster lock %d",
5114 			      job_ptr, job_ptr->fed_details->cluster_lock);
5115 			return SLURM_ERROR;
5116 		}
5117 
5118 		if (_persist_fed_job_cancel(remote_cluster, job_ptr->job_id,
5119 					    SIGKILL, KILL_FED_REQUEUE, 0)) {
5120 			error("failed to kill/requeue fed %pJ",
5121 			      job_ptr);
5122 		}
5123 	}
5124 
5125 	return SLURM_SUCCESS;
5126 }
5127 
5128 /*
5129  * Submits requeued sibling jobs.
5130  *
5131  * IN job_ptr - job to requeue.
5132  * RET returns SLURM_SUCCESS if siblings submitted successfully, SLURM_ERROR
5133  * 	otherwise.
5134  */
5135 extern int fed_mgr_job_requeue(job_record_t *job_ptr)
5136 {
5137 	int rc = SLURM_SUCCESS;
5138 	uint32_t origin_id;
5139 	uint64_t feature_sibs = 0;
5140 	fed_job_info_t *job_info;
5141 
5142 	xassert(job_ptr);
5143 	xassert(job_ptr->details);
5144 
5145 	if (!_is_fed_job(job_ptr, &origin_id))
5146 		return SLURM_SUCCESS;
5147 
5148 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5149 		info("requeueing fed job %pJ", job_ptr);
5150 
5151 	/* clear where actual siblings were */
5152 	job_ptr->fed_details->siblings_active = 0;
5153 
5154 	slurm_mutex_lock(&fed_job_list_mutex);
5155 	if (!(job_info = _find_fed_job_info(job_ptr->job_id))) {
5156 		error("%s: failed to find fed job info for fed %pJ",
5157 		      __func__, job_ptr);
5158 	}
5159 
5160 	/* don't submit siblings for jobs that are held */
5161 	if (job_ptr->priority == 0) {
5162 		job_ptr->job_state &= (~JOB_REQUEUE_FED);
5163 
5164 		update_job_fed_details(job_ptr);
5165 
5166 		/* clear cluster lock */
5167 		job_ptr->fed_details->cluster_lock = 0;
5168 		if (job_info)
5169 			job_info->cluster_lock = 0;
5170 
5171 		slurm_mutex_unlock(&fed_job_list_mutex);
5172 		return SLURM_SUCCESS;
5173 	}
5174 
5175 	/* Don't worry about testing which clusters can start the job the
5176 	 * soonest since they can't start the job for 120 seconds anyways. */
5177 
5178 	/* Get new viable siblings since the job might just have one viable
5179 	 * sibling listed if the sibling was the cluster that could start the
5180 	 * job the soonest. */
5181 	_validate_cluster_features(job_ptr->details->cluster_features,
5182 				   &feature_sibs);
5183 	job_ptr->fed_details->siblings_viable =
5184 		_get_viable_sibs(job_ptr->clusters, feature_sibs,
5185 				 job_ptr->array_recs ? true : false, NULL);
5186 
5187 	_prepare_submit_siblings(job_ptr,
5188 				 job_ptr->fed_details->siblings_viable);
5189 
5190 	job_ptr->job_state &= (~JOB_REQUEUE_FED);
5191 
5192 	if (!(job_ptr->fed_details->siblings_viable &
5193 	      FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id)))
5194 		job_ptr->job_state |= JOB_REVOKED;
5195 	else
5196 		job_ptr->job_state &= ~JOB_REVOKED;
5197 
5198 	/* clear cluster lock */
5199 	job_ptr->fed_details->cluster_lock = 0;
5200 	if (job_info) {
5201 		job_info->cluster_lock = 0;
5202 		job_info->siblings_viable =
5203 			job_ptr->fed_details->siblings_viable;
5204 		job_info->siblings_active =
5205 			job_ptr->fed_details->siblings_active;
5206 	}
5207 	slurm_mutex_unlock(&fed_job_list_mutex);
5208 
5209 	return rc;
5210 }
5211 
5212 /* Cancel sibling jobs. Just send request to itself */
5213 static int _cancel_sibling_jobs(job_record_t *job_ptr, uint16_t signal,
5214 				uint16_t flags, uid_t uid, bool kill_viable)
5215 {
5216 	int id = 1;
5217 	uint64_t tmp_sibs;
5218 	slurm_persist_conn_t *sib_conn;
5219 
5220 	if (kill_viable) {
5221 		tmp_sibs = job_ptr->fed_details->siblings_viable;
5222 		flags |= KILL_NO_SIBS;
5223 	} else {
5224 		tmp_sibs = job_ptr->fed_details->siblings_active;
5225 		flags &= ~KILL_NO_SIBS;
5226 	}
5227 
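	/* Iterate the sibling bitmap, skipping this cluster's own id. */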
5228 	while (tmp_sibs) {
5229 		if ((tmp_sibs & 1) &&
5230 		    (id != fed_mgr_cluster_rec->fed.id)) {
5231 			slurmdb_cluster_rec_t *cluster =
5232 				fed_mgr_get_cluster_by_id(id);
5233 			if (!cluster) {
5234 				error("couldn't find cluster rec by id %d", id);
5235 				goto next_job;
5236 			}
5237 
5238 			/* Don't send request to siblings that are down when
5239 			 * killing viables */
5240 			sib_conn = (slurm_persist_conn_t *)cluster->fed.send;
5241 			if (kill_viable && (!sib_conn || sib_conn->fd == -1))
5242 				goto next_job;
5243 
5244 			_persist_fed_job_cancel(cluster, job_ptr->job_id,
5245 						signal, flags, uid);
5246 		}
5247 
5248 next_job:
5249 		tmp_sibs >>= 1;
5250 		id++;
5251 	}
5252 
5253 	return SLURM_SUCCESS;
5254 }
5255 
5256 /* Cancel sibling jobs of a federated job
5257  *
5258  * IN job_ptr - job to cancel
5259  * IN signal  - signal to send to job
5260  * IN flags   - KILL_.* flags
5261  * IN uid     - uid making request
5262  * IN kill_viable - if true cancel viable_sibs, if false cancel active_sibs
5263  */
5264 extern int fed_mgr_job_cancel(job_record_t *job_ptr, uint16_t signal,
5265 			      uint16_t flags, uid_t uid, bool kill_viable)
5266 {
5267 	uint32_t origin_id;
5268 
5269 	xassert(job_ptr);
5270 
5271 	if (!_is_fed_job(job_ptr, &origin_id))
5272 		return SLURM_SUCCESS;
5273 
5274 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5275 		info("cancel fed %pJ by local cluster", job_ptr);
5276 
5277 	_cancel_sibling_jobs(job_ptr, signal, flags, uid, kill_viable);
5278 
5279 	return SLURM_SUCCESS;
5280 }
5281 
5282 extern bool fed_mgr_job_started_on_sib(job_record_t *job_ptr)
5283 {
5284 	uint32_t origin_id;
5285 
5286 	xassert(job_ptr);
5287 
5288 	/*
5289 	 * When a sibling starts the job, the job becomes revoked on the origin
5290 	 * and the job's cluster_lock is set to that sibling's id.
5291 	 * Don't use fed_mgr_is_origin_job() because that returns true if
5292 	 * _is_fed_job() returns false (the job isn't federated), and that's
5293 	 * the opposite of what we want here.
5294 	 */
5295 	return _is_fed_job(job_ptr, &origin_id) &&
5296 		(fed_mgr_cluster_rec->fed.id == origin_id) &&
5297 		IS_JOB_REVOKED(job_ptr) && job_ptr->fed_details->cluster_lock &&
5298 		(job_ptr->fed_details->cluster_lock !=
5299 		 fed_mgr_cluster_rec->fed.id);
5300 }
5301 
5302 extern bool fed_mgr_is_job_id_in_fed(uint32_t job_id)
5303 {
5304 	uint32_t cluster_id;
5305 
5306 	if (!fed_mgr_cluster_rec)
5307 		return false;
5308 
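	/*
	 * The originating cluster's fed id is encoded in the upper bits of
	 * the job id (see FED_MGR_CLUSTER_ID_BEGIN); an id of 0 means the
	 * job id was not generated by a federation member.
	 */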
5309 	cluster_id = fed_mgr_get_cluster_id(job_id);
5310 	if (!cluster_id)
5311 		return false;
5312 
5313 	return FED_SIBLING_BIT(cluster_id) & _get_all_sibling_bits();
5314 }
5315 
5316 extern int fed_mgr_is_origin_job(job_record_t *job_ptr)
5317 {
5318 	uint32_t origin_id;
5319 
5320 	xassert(job_ptr);
5321 
5322 	if (!_is_fed_job(job_ptr, &origin_id))
5323 		return true;
5324 
5325 	if (fed_mgr_cluster_rec->fed.id != origin_id)
5326 		return false;
5327 
5328 	return true;
5329 }
5330 
5331 /*
5332  * Use this instead of fed_mgr_is_origin_job if job_ptr is not available.
5333  */
5334 extern bool fed_mgr_is_origin_job_id(uint32_t job_id)
5335 {
5336 	uint32_t origin_id = fed_mgr_get_cluster_id(job_id);
5337 
5338 	if (!fed_mgr_cluster_rec || !origin_id) {
5339 		debug2("%s: job %u is not a federated job", __func__, job_id);
5340 		return true;
5341 	}
5342 
5343 	if (fed_mgr_cluster_rec->fed.id == origin_id)
5344 		return true;
5345 	return false;
5346 }
5347 
5348 /*
5349  * Check if all siblings have fulfilled the singleton dependency.
5350  * Return true if all clusters have checked in that they've fulfilled this
5351  * singleton dependency.
5352  *
5353  * IN job_ptr - job with dependency to check
5354  * IN dep_ptr - dependency to check. If it's not singleton, just return true.
5355  * IN set_cluster_bit - if true, set the bit for this cluster indicating
5356  *                      that this cluster has fulfilled the dependency.
5357  */
5358 extern bool fed_mgr_is_singleton_satisfied(job_record_t *job_ptr,
5359 					   depend_spec_t *dep_ptr,
5360 					   bool set_cluster_bit)
5361 {
5362 	uint32_t origin_id;
5363 	uint64_t sib_bits;
5364 
5365 	xassert(job_ptr);
5366 	xassert(dep_ptr);
5367 
5368 	if (!_is_fed_job(job_ptr, &origin_id) || disable_remote_singleton)
5369 		return true;
5370 	if (dep_ptr->depend_type != SLURM_DEPEND_SINGLETON) {
5371 		error("%s: Got non-singleton dependency (type %u) for %pJ. This should never happen.",
5372 		      __func__, dep_ptr->depend_type, job_ptr);
5373 		return true;
5374 	}
5375 
5376 	/* Set the bit for this cluster indicating that it has been satisfied */
5377 	if (set_cluster_bit)
5378 		dep_ptr->singleton_bits |=
5379 			FED_SIBLING_BIT(fed_mgr_cluster_rec->fed.id);
5380 
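	/*
	 * Remote (non-origin) clusters just record their own bit; only the
	 * origin checks below whether every sibling has reported in.
	 */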
5381 	if (fed_mgr_cluster_rec->fed.id != origin_id) {
5382 		return true;
5383 	}
5384 
5385 	/*
5386 	 * Only test for current siblings; if a sibling was removed but
5387 	 * previously had passed a singleton dependency, that bit may be
5388 	 * set in dep_ptr->singleton_bits.
5389 	 */
5390 	sib_bits = _get_all_sibling_bits();
5391 	return (dep_ptr->singleton_bits & sib_bits) == sib_bits;
5392 }
5393 
5394 /*
5395  * Update a job's required clusters.
5396  *
5397  * Results in siblings being removed and added.
5398  *
5399  * IN job_ptr       - job to update.
5400  * IN spec_clusters - comma-separated list of cluster names.
5401  * RET return SLURM_SUCCESS on success, error code otherwise.
5402  */
5403 extern int fed_mgr_update_job_clusters(job_record_t *job_ptr,
5404 				       char *spec_clusters)
5405 {
5406 	int rc = SLURM_SUCCESS;
5407 	uint32_t origin_id;
5408 
5409 	xassert(job_ptr);
5410 	xassert(spec_clusters);
5411 
5412 	if (!_is_fed_job(job_ptr, &origin_id)) {
5413 		sched_info("update_job: not a fed job");
5414 		rc = SLURM_ERROR;
5415 	} else if ((!IS_JOB_PENDING(job_ptr)) ||
5416 		   job_ptr->fed_details->cluster_lock) {
5417 		rc = ESLURM_JOB_NOT_PENDING;
5418 	} else if (!fed_mgr_fed_rec) {
5419 		sched_info("update_job: setting Clusters on a non-active federated cluster for %pJ",
5420 			   job_ptr);
5421 		rc = ESLURM_JOB_NOT_FEDERATED;
5422 	} else if (_validate_cluster_names(spec_clusters, NULL)) {
5423 		sched_info("update_job: invalid Clusters for %pJ: %s",
5424 			   job_ptr, spec_clusters);
5425 		rc = ESLURM_INVALID_CLUSTER_NAME;
5426 	} else {
5427 		xfree(job_ptr->clusters);
5428 		if (spec_clusters[0] == '\0')
5429 			sched_info("update_job: cleared Clusters for %pJ",
5430 				   job_ptr);
5431 		else if (*spec_clusters)
5432 			job_ptr->clusters =
5433 				xstrdup(spec_clusters);
5434 
5435 		if (fed_mgr_is_origin_job(job_ptr))
5436 			_add_remove_sibling_jobs(job_ptr);
5437 	}
5438 
5439 	return rc;
5440 }
5441 
5442 /*
5443  * Update a job's cluster features.
5444  *
5445  * Results in siblings being removed and added.
5446  *
5447  * IN job_ptr      - job to update cluster features.
5448  * IN req_features - comma-separated list of feature names.
5449  * RET return SLURM_SUCCESS on success, error code otherwise.
5450  */
5451 extern int fed_mgr_update_job_cluster_features(job_record_t *job_ptr,
5452 					       char *req_features)
5453 {
5454 	int rc = SLURM_SUCCESS;
5455 	uint32_t origin_id;
5456 
5457 	xassert(job_ptr);
5458 	xassert(req_features);
5459 
5460 	if (!_is_fed_job(job_ptr, &origin_id)) {
5461 		sched_info("update_job: not a fed job");
5462 		rc = SLURM_ERROR;
5463 	} else if ((!IS_JOB_PENDING(job_ptr)) ||
5464 		   job_ptr->fed_details->cluster_lock) {
5465 		rc = ESLURM_JOB_NOT_PENDING;
5466 	} else if (!fed_mgr_fed_rec) {
5467 		sched_info("update_job: setting ClusterFeatures on a non-active federated cluster for %pJ",
5468 			   job_ptr);
5469 		rc = ESLURM_JOB_NOT_FEDERATED;
5470 	} else if (_validate_cluster_features(req_features, NULL)) {
5471 		sched_info("update_job: invalid ClusterFeatures for %pJ",
5472 			   job_ptr);
5473 		rc = ESLURM_INVALID_CLUSTER_FEATURE;
5474 	} else {
5475 		xfree(job_ptr->details->cluster_features);
5476 		if (req_features[0] == '\0')
5477 			sched_info("update_job: cleared ClusterFeatures for %pJ",
5478 				   job_ptr);
5479 		else if (*req_features)
5480 			job_ptr->details->cluster_features =
5481 				xstrdup(req_features);
5482 
5483 		if (fed_mgr_is_origin_job(job_ptr))
5484 			_add_remove_sibling_jobs(job_ptr);
5485 	}
5486 
5487 	return rc;
5488 }
5489 
5490 static int _reconcile_fed_job(job_record_t *job_ptr, reconcile_sib_t *rec_sib)
5491 {
5492 	int i;
5493 	bool found_job = false;
5494 	job_info_msg_t *remote_jobs_ptr = rec_sib->job_info_msg;
5495 	uint32_t origin_id    = fed_mgr_get_cluster_id(job_ptr->job_id);
5496 	uint32_t sibling_id   = rec_sib->sibling_id;
5497 	uint64_t sibling_bit  = FED_SIBLING_BIT(sibling_id);
5498 	char    *sibling_name = rec_sib->sibling_name;
5499 	slurm_job_info_t *remote_job  = NULL;
5500 	fed_job_info_t *job_info;
5501 
5502 	xassert(job_ptr);
5503 	xassert(remote_jobs_ptr);
5504 
5505 	/*
5506 	 * Only look at jobs that:
5507 	 * 1. originate from the remote sibling
5508 	 * 2. originate from this cluster
5509 	 * 3. if the sibling is in the job's viable list.
5510 	 */
5511 	if (!job_ptr->fed_details ||
5512 	    !job_ptr->details ||
5513 	    (job_ptr->details->submit_time >= rec_sib->sync_time) ||
5514 	    IS_JOB_COMPLETED(job_ptr) || IS_JOB_COMPLETING(job_ptr) ||
5515 	    ((fed_mgr_get_cluster_id(job_ptr->job_id) != sibling_id) &&
5516 	     (!fed_mgr_is_origin_job(job_ptr)) &&
5517 	     (!(job_ptr->fed_details->siblings_viable & sibling_bit)))) {
5518 		return SLURM_SUCCESS;
5519 	}
5520 
5521 	for (i = 0; i < remote_jobs_ptr->record_count; i++) {
5522 		remote_job = &remote_jobs_ptr->job_array[i];
5523 		if (job_ptr->job_id == remote_job->job_id) {
5524 			found_job = true;
5525 			break;
5526 		}
5527 	}
5528 
5529 	/* Jobs that originated on the remote sibling */
5530 	if (origin_id == sibling_id) {
5531 		if (!found_job ||
5532 		    (remote_job && IS_JOB_COMPLETED(remote_job))) {
5533 			/* origin job is missing on remote sibling or is
5534 			 * completed. Could have been removed from a clean
5535 			 * start. */
5536 			info("%s: origin %pJ is missing (or completed) from origin %s. Killing this copy of the job",
5537 			     __func__, job_ptr, sibling_name);
5538 			job_ptr->bit_flags |= SIB_JOB_FLUSH;
5539 			job_signal(job_ptr, SIGKILL, KILL_NO_SIBS, 0, false);
5540 		} else {
5541 			info("%s: origin %s still has %pJ",
5542 			     __func__, sibling_name, job_ptr);
5543 		}
5544 	/* Jobs that are shared between the two siblings -- not originating from
5545 	 * either one */
5546 	} else if (origin_id != fed_mgr_cluster_rec->fed.id) {
5547 		if (!found_job) {
5548 			/* Only care about jobs that are currently there. */
5549 		} else if (IS_JOB_PENDING(job_ptr) && IS_JOB_CANCELLED(remote_job)) {
5550 			info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin and sibling were down",
5551 			     __func__, job_ptr, sibling_name);
5552 			job_ptr->job_state  = JOB_CANCELLED;
5553 			job_ptr->start_time = remote_job->start_time;
5554 			job_ptr->end_time   = remote_job->end_time;
5555 			job_ptr->state_reason = WAIT_NO_REASON;
5556 			xfree(job_ptr->state_desc);
5557 			job_completion_logger(job_ptr, false);
5558 		} else if (IS_JOB_PENDING(job_ptr) &&
5559 			   (IS_JOB_RUNNING(remote_job) ||
5560 			    IS_JOB_COMPLETING(remote_job))) {
5561 			info("%s: %pJ is running on sibling %s, must have been started while the origin and sibling were down",
5562 			     __func__, job_ptr, sibling_name);
5563 
5564 			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
5565 					   remote_job->exit_code,
5566 					   job_ptr->start_time);
5567 			/* return now because job_ptr has been free'd */
5568 			return SLURM_SUCCESS;
5569 		} else if (IS_JOB_PENDING(job_ptr) &&
5570 			   (IS_JOB_COMPLETED(remote_job))) {
5571 			info("%s: %pJ is completed on sibling %s, must have been started and completed while the origin and sibling were down",
5572 			     __func__, job_ptr, sibling_name);
5573 
5574 			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
5575 					   remote_job->exit_code,
5576 					   job_ptr->start_time);
5577 			/* return now because job_ptr has been free'd */
5578 			return SLURM_SUCCESS;
5579 		}
5580 
5581 	/* Origin Jobs */
5582 	} else if (!found_job) {
5583 		info("%s: didn't find %pJ on cluster %s",
5584 		     __func__, job_ptr, sibling_name);
5585 
5586 		/* Remove from active siblings */
5587 		if (!(job_ptr->fed_details->siblings_active & sibling_bit)) {
5588 			/* The sibling is a viable sibling but the sibling is
5589 			 * not active and there is no job there. This is ok. */
5590 			info("%s: %s is a viable but not active sibling of %pJ. This is ok.",
5591 			     __func__, sibling_name, job_ptr);
5592 
5593 #if 0
5594 /* Don't submit new sibling jobs if they're not found on the cluster. They could
5595  * have been removed while the cluster was down. */
5596 		} else if (!job_ptr->fed_details->cluster_lock) {
5597 			/* If the origin job isn't locked, then submit a sibling
5598 			 * to this cluster. */
5599 			/* Only do this if it was an active job. Could have been
5600 			 * removed with --cancel-sibling */
5601 			info("%s: %s is an active sibling of %pJ, attempting to submit new sibling job to the cluster.",
5602 			     __func__, sibling_name, job_ptr);
5603 			_prepare_submit_siblings(job_ptr, sibling_bit);
5604 #endif
5605 		} else if (job_ptr->fed_details->cluster_lock == sibling_id) {
5606 			/* The origin thinks that the sibling was running the
5607 			 * job. It could have completed while this cluster was
5608 			 * down or the sibling removed it by clearing out jobs
5609 			 * (e.g. slurmctld -c). */
5610 			info("%s: origin %pJ was running on sibling %s, but it's not there. Assuming that the job completed",
5611 			     __func__, job_ptr, sibling_name);
5612 			fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED, 0,
5613 					   job_ptr->start_time);
5614 		} else {
5615 			/* The origin job has a lock but it's not on the sibling
5616 			 * being reconciled. The job could have been started by
5617 			 * another cluster while the sibling was down. Or the
5618 			 * original sibling job submission could have failed. Or
5619 			 * the origin started the job on a different sibling
5620 			 * before this sibling went down and came back up
5621 			 * (normal situation). */
5622 			info("%s: origin %pJ is currently locked by sibling %d, this is ok",
5623 			     __func__, job_ptr,
5624 			     job_ptr->fed_details->cluster_lock);
5625 			job_ptr->fed_details->siblings_active &= ~sibling_bit;
5626 		}
5627 	} else if (remote_job) {
5628 		info("%s: %pJ found on remote sibling %s state:%s",
5629 		     __func__, job_ptr, sibling_name,
5630 		     job_state_string(remote_job->job_state));
5631 
5632 		if (job_ptr->fed_details->cluster_lock == sibling_id) {
5633 			if (IS_JOB_COMPLETE(remote_job)) {
5634 				info("%s: %pJ on sibling %s is already completed, completing the origin job",
5635 				     __func__, job_ptr, sibling_name);
5636 				fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
5637 						   remote_job->exit_code,
5638 						   job_ptr->start_time);
5639 			} else if (IS_JOB_CANCELLED(remote_job)) {
5640 				info("%s: %pJ on sibling %s is already cancelled, completing the origin job",
5641 				     __func__, job_ptr, sibling_name);
5642 				fed_mgr_job_revoke(job_ptr, true, JOB_CANCELLED,
5643 						   remote_job->exit_code,
5644 						   job_ptr->start_time);
5645 			} else if (!IS_JOB_RUNNING(remote_job)) {
5646 				/* The job could be pending if it was requeued
5647 				 * due to node failure */
5648 				info("%s: %pJ on sibling %s has job lock but job is not running (state:%s)",
5649 				     __func__, job_ptr, sibling_name,
5650 				     job_state_string(remote_job->job_state));
5651 			}
5652 		} else if (job_ptr->fed_details->cluster_lock) {
5653 			/* The remote might have had a sibling job before it
5654 			 * went away and the origin started another job while it
5655 			 * was away. The remote job needs to be revoked. */
5656 			info("%s: %pJ found on sibling %s but job is locked by cluster id %d",
5657 			     __func__, job_ptr, sibling_name,
5658 			     job_ptr->fed_details->cluster_lock);
5659 
5660 			if (IS_JOB_PENDING(remote_job)) {
5661 				info("%s: %pJ is on %s in a pending state but cluster %d has the lock on it -- revoking the remote sibling job",
5662 				     __func__, job_ptr, sibling_name,
5663 				     job_ptr->fed_details->cluster_lock);
5664 				_revoke_sibling_jobs(
5665 						job_ptr->job_id,
5666 						fed_mgr_cluster_rec->fed.id,
5667 						sibling_bit,
5668 						job_ptr->start_time);
5669 			} else {
5670 				/* should this job get cancelled? Would have to
5671 				 * check cluster_lock before cancelling it to
5672 				 * make sure that it's not there. */
5673 				info("%s: %pJ has a lock on sibling id %d, but found a non-pending job on sibling %s.",
5674 				     __func__, job_ptr,
5675 				     job_ptr->fed_details->cluster_lock,
5676 				     sibling_name);
5677 
5678 				_revoke_sibling_jobs(
5679 						job_ptr->job_id,
5680 						fed_mgr_cluster_rec->fed.id,
5681 						sibling_bit,
5682 						job_ptr->start_time);
5683 			}
5684 		} else {
5685 			if (!(job_ptr->fed_details->siblings_active &
5686 			      sibling_bit)) {
5687 				info("%s: %pJ on sibling %s but it wasn't in the active list. Adding to active list.",
5688 				     __func__, job_ptr, sibling_name);
5689 				job_ptr->fed_details->siblings_active |=
5690 					sibling_bit;
5691 			}
5692 			if (IS_JOB_CANCELLED(remote_job)) {
5693 				info("%s: %pJ is cancelled on sibling %s, must have been cancelled while the origin was down",
5694 				     __func__, job_ptr, sibling_name);
5695 				job_ptr->job_state  = JOB_CANCELLED;
5696 				job_ptr->start_time = remote_job->start_time;
5697 				job_ptr->end_time   = remote_job->end_time;
5698 				job_ptr->state_reason = WAIT_NO_REASON;
5699 				xfree(job_ptr->state_desc);
5700 				job_completion_logger(job_ptr, false);
5701 
5702 			} else if (IS_JOB_COMPLETED(remote_job)) {
5703 				info("%s: %pJ is completed on sibling %s but the origin cluster wasn't part of starting the job, must have been started while the origin was down",
5704 				     __func__, job_ptr, sibling_name);
5705 				_do_fed_job_complete(job_ptr, JOB_CANCELLED,
5706 						     remote_job->exit_code,
5707 						     remote_job->start_time);
5708 
5709 			} else if (IS_JOB_RUNNING(remote_job) ||
5710 				   IS_JOB_COMPLETING(remote_job)) {
5711 				info("%s: origin doesn't think that %pJ should be running on sibling %s but it is. %s could have started the job while this cluster was down.",
5712 				     __func__, job_ptr, sibling_name,
5713 				     sibling_name);
5714 				/* Job was started while we were down. Set this
5715 				 * job to RV and cancel other siblings */
5716 				fed_job_info_t *job_info;
5717 				slurm_mutex_lock(&fed_job_list_mutex);
5718 				if ((job_info =
5719 				     _find_fed_job_info(job_ptr->job_id))) {
5720 					job_info->cluster_lock = sibling_id;
5721 					job_ptr->fed_details->cluster_lock =
5722 						sibling_id;
5723 
5724 					/* Remove sibling jobs */
5725 					_fed_job_start_revoke(
5726 							job_info, job_ptr,
5727 							remote_job->start_time);
5728 
5729 					/* Set job as RV to track running job */
5730 					fed_mgr_job_revoke(
5731 							job_ptr, false,
5732 							JOB_CANCELLED, 0,
5733 							remote_job->start_time);
5734 				}
5735 				slurm_mutex_unlock(&fed_job_list_mutex);
5736 			}
5737 			/* else all good */
5738 		}
5739 	}
5740 
5741 	/* Update job_info with updated siblings */
5742 	slurm_mutex_lock(&fed_job_list_mutex);
5743 	if ((job_info = _find_fed_job_info(job_ptr->job_id))) {
5744 		job_info->siblings_viable =
5745 			job_ptr->fed_details->siblings_viable;
5746 		job_info->siblings_active =
5747 			job_ptr->fed_details->siblings_active;
5748 	} else {
5749 		error("%s: failed to find fed job info for fed %pJ",
5750 		      __func__, job_ptr);
5751 	}
5752 	slurm_mutex_unlock(&fed_job_list_mutex);
5753 
5754 	return SLURM_SUCCESS;
5755 }
5756 
5757 /*
5758  * Sync jobs with the given sibling name.
5759  *
5760  * IN sib_name - name of the sibling to sync with.
5761  */
5762 static int _sync_jobs(const char *sib_name, job_info_msg_t *job_info_msg,
5763 		      time_t sync_time)
5764 {
5765 	ListIterator itr;
5766 	reconcile_sib_t rec_sib = {0};
5767 	slurmdb_cluster_rec_t *sib;
5768 	job_record_t *job_ptr;
5769 
5770 	if (!(sib = fed_mgr_get_cluster_by_name((char *)sib_name))) {
5771 		error("Couldn't find sibling by name '%s'", sib_name);
5772 		return SLURM_ERROR;
5773 	}
5774 
5775 	rec_sib.sibling_id   = sib->fed.id;
5776 	rec_sib.sibling_name = sib->name;
5777 	rec_sib.job_info_msg = job_info_msg;
5778 	rec_sib.sync_time    = sync_time;
5779 
5780 	itr = list_iterator_create(job_list);
5781 	while ((job_ptr = list_next(itr)))
5782 		_reconcile_fed_job(job_ptr, &rec_sib);
5783 	list_iterator_destroy(itr);
5784 
5785 	sib->fed.sync_recvd = true;
5786 
5787 	return SLURM_SUCCESS;
5788 }
5789 
5790 /*
5791  * Remove active sibling from the given job.
5792  *
5793  * IN job_id   - job_id of job to remove active sibling from.
5794  * IN sib_name - name of sibling job to remove from active siblings.
5795  * RET SLURM_SUCCESS on success, error code on error.
5796  */
5797 extern int fed_mgr_remove_active_sibling(uint32_t job_id, char *sib_name)
5798 {
5799 	uint32_t origin_id;
5800 	job_record_t *job_ptr = NULL;
5801 	slurmdb_cluster_rec_t *sibling;
5802 
5803 	if (!(job_ptr = find_job_record(job_id)))
5804 		return ESLURM_INVALID_JOB_ID;
5805 
5806 	if (!_is_fed_job(job_ptr, &origin_id))
5807 		return ESLURM_JOB_NOT_FEDERATED;
5808 
5809 	if (job_ptr->fed_details->cluster_lock)
5810 		return ESLURM_JOB_NOT_PENDING;
5811 
5812 	if (!(sibling = fed_mgr_get_cluster_by_name(sib_name)))
5813 		return ESLURM_INVALID_CLUSTER_NAME;
5814 
5815 	if (job_ptr->fed_details->siblings_active &
5816 	    FED_SIBLING_BIT(sibling->fed.id)) {
5817 		time_t now = time(NULL);
5818 		if (fed_mgr_cluster_rec == sibling)
5819 			fed_mgr_job_revoke(job_ptr, false, 0, JOB_CANCELLED,
5820 					   now);
5821 		else
5822 			_revoke_sibling_jobs(job_ptr->job_id,
5823 					     fed_mgr_cluster_rec->fed.id,
5824 					     FED_SIBLING_BIT(sibling->fed.id),
5825 					     now);
5826 		job_ptr->fed_details->siblings_active &=
5827 			~(FED_SIBLING_BIT(sibling->fed.id));
5828 		update_job_fed_details(job_ptr);
5829 	}
5830 
5831 	return SLURM_SUCCESS;
5832 }
5833 
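/*
 * Queue a sibling job submission for asynchronous handling by the fed job
 * update queue. Ownership of the embedded job_desc is taken from the message.
 */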
5834 static int _q_sib_job_submission(slurm_msg_t *msg, bool interactive_job)
5835 {
5836 	fed_job_update_info_t *job_update_info = NULL;
5837 	sib_msg_t *sib_msg            = msg->data;
5838 	job_desc_msg_t *job_desc      = sib_msg->data;
5839 	job_desc->job_id              = sib_msg->job_id;
5840 	job_desc->fed_siblings_viable = sib_msg->fed_siblings;
5841 	job_desc->alloc_node          = sib_msg->submit_host;
5842 	/*
5843 	 * If the job has a dependency, it won't be submitted to siblings
5844 	 * or it will be revoked from siblings if it became dependent.
5845 	 * So, the sibling should ignore job_desc->dependency since the origin handles it.
5846 	 */
5847 	xfree(job_desc->dependency);
5848 	if (interactive_job)
5849 		job_desc->resp_host = xstrdup(sib_msg->resp_host);
5850 
5851 	/* NULL out the data pointer because we are storing the pointer on the
5852 	 * fed job update queue to be handled later. */
5853 	sib_msg->data = NULL;
5854 
5855 	/* set protocol version to that of the client's version so that
5856 	 * the job's start_protocol_version is that of the client's and
5857 	 * not the calling controller's. */
5858 	job_update_info = xmalloc(sizeof(fed_job_update_info_t));
5859 
5860 	job_update_info->job_id           = job_desc->job_id;
5861 	job_update_info->submit_cluster   = xstrdup(msg->conn->cluster_name);
5862 	job_update_info->submit_desc      = job_desc;
5863 	job_update_info->submit_proto_ver = msg->protocol_version;
5864 
5865 	if (interactive_job)
5866 		job_update_info->type     = FED_JOB_SUBMIT_INT;
5867 	else
5868 		job_update_info->type     = FED_JOB_SUBMIT_BATCH;
5869 
5870 	_append_job_update(job_update_info);
5871 
5872 	return SLURM_SUCCESS;
5873 }
5874 
5875 static int _q_sib_submit_response(slurm_msg_t *msg)
5876 {
5877 	int rc = SLURM_SUCCESS;
5878 	sib_msg_t *sib_msg;
5879 	fed_job_update_info_t *job_update_info = NULL;
5880 
5881 	xassert(msg);
5882 	xassert(msg->conn);
5883 
5884 	sib_msg = msg->data;
5885 
5886 	/* if failure then remove from active siblings */
5887 	if (sib_msg && sib_msg->return_code) {
5888 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
5889 			info("%s: cluster %s failed to submit sibling JobId=%u. Removing from active_sibs. (error:%d)",
5890 			     __func__, msg->conn->cluster_name,
5891 			     sib_msg->job_id,
5892 			     sib_msg->return_code);
5893 
5894 		job_update_info = xmalloc(sizeof(fed_job_update_info_t));
5895 		job_update_info->job_id       = sib_msg->job_id;
5896 		job_update_info->type         = FED_JOB_REMOVE_ACTIVE_SIB_BIT;
5897 		job_update_info->siblings_str =
5898 			xstrdup(msg->conn->cluster_name);
5899 		_append_job_update(job_update_info);
5900 	}
5901 
5902 	return rc;
5903 }
5904 
5905 static int _q_sib_job_update(slurm_msg_t *msg, uint32_t uid)
5906 {
5907 	sib_msg_t *sib_msg = msg->data;
5908 	job_desc_msg_t *job_desc = sib_msg->data;
5909 	fed_job_update_info_t *job_update_info =
5910 		xmalloc(sizeof(fed_job_update_info_t));
5911 
5912 	/* NULL out the data pointer because we are storing the pointer on the
5913 	 * fed job update queue to be handled later. */
5914 	sib_msg->data = NULL;
5915 
5916 	job_update_info->type           = FED_JOB_UPDATE;
5917 	job_update_info->submit_desc    = job_desc;
5918 	job_update_info->job_id         = sib_msg->job_id;
5919 	job_update_info->uid            = uid;
5920 	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
5921 
5922 	_append_job_update(job_update_info);
5923 
5924 	return SLURM_SUCCESS;
5925 }
5926 
5927 static int _q_sib_job_cancel(slurm_msg_t *msg, uint32_t uid)
5928 {
5929 	int rc = SLURM_SUCCESS;
5930 	uint32_t req_uid;
5931 	sib_msg_t *sib_msg = msg->data;
5932 	job_step_kill_msg_t *kill_msg = (job_step_kill_msg_t *)sib_msg->data;
5933 	fed_job_update_info_t *job_update_info =
5934 		xmalloc(sizeof(fed_job_update_info_t));
5935 
5936 	/* NULL out the data pointer because we are storing the pointer on the
5937 	 * fed job update queue to be handled later. */
5938 	sib_msg->data = NULL;
5939 
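	/* Prefer the requesting uid carried in the sibling message (the
	 * original requester) over the uid of the connection delivering it. */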
5940 	if (sib_msg->req_uid)
5941 		req_uid = sib_msg->req_uid;
5942 	else
5943 		req_uid = uid;
5944 
5945 	job_update_info->type     = FED_JOB_CANCEL;
5946 	job_update_info->job_id   = kill_msg->job_id;
5947 	job_update_info->kill_msg = kill_msg;
5948 	job_update_info->uid      = req_uid;
5949 
5950 	_append_job_update(job_update_info);
5951 
5952 	return rc;
5953 }
5954 
5955 static int _q_sib_job_complete(slurm_msg_t *msg)
5956 {
5957 	int rc = SLURM_SUCCESS;
5958 	sib_msg_t *sib_msg = msg->data;
5959 	fed_job_update_info_t *job_update_info =
5960 		xmalloc(sizeof(fed_job_update_info_t));
5961 
5962 	job_update_info->type        = FED_JOB_COMPLETE;
5963 	job_update_info->job_id      = sib_msg->job_id;
5964 	job_update_info->job_state   = sib_msg->job_state;
5965 	job_update_info->start_time  = sib_msg->start_time;
5966 	job_update_info->return_code = sib_msg->return_code;
5967 
5968 	_append_job_update(job_update_info);
5969 
5970 	return rc;
5971 }
5972 
5973 static int _q_sib_job_update_response(slurm_msg_t *msg)
5974 {
5975 	int rc = SLURM_SUCCESS;
5976 	sib_msg_t *sib_msg = msg->data;
5977 
5978 	fed_job_update_info_t *job_update_info =
5979 		xmalloc(sizeof(fed_job_update_info_t));
5980 
5981 	job_update_info->type           = FED_JOB_UPDATE_RESPONSE;
5982 	job_update_info->job_id         = sib_msg->job_id;
5983 	job_update_info->return_code    = sib_msg->return_code;
5984 	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
5985 
5986 	_append_job_update(job_update_info);
5987 
5988 	return rc;
5989 }
5990 
5991 static int _q_sib_job_requeue(slurm_msg_t *msg, uint32_t uid)
5992 {
5993 	int rc = SLURM_SUCCESS;
5994 	sib_msg_t *sib_msg     = msg->data;
5995 	requeue_msg_t *req_ptr = (requeue_msg_t *)sib_msg->data;
5996 	fed_job_update_info_t *job_update_info =
5997 		xmalloc(sizeof(fed_job_update_info_t));
5998 
5999 	job_update_info->type   = FED_JOB_REQUEUE;
6000 	job_update_info->job_id = req_ptr->job_id;
6001 	job_update_info->flags  = req_ptr->flags;
6002 	job_update_info->uid    = uid;
6003 
6004 	_append_job_update(job_update_info);
6005 
6006 	return rc;
6007 }
6008 
6009 static int _q_send_job_sync(char *sib_name)
6010 {
6011 	int rc = SLURM_SUCCESS;
6012 	fed_job_update_info_t *job_update_info =
6013 		xmalloc(sizeof(fed_job_update_info_t));
6014 
6015 	job_update_info->type           = FED_SEND_JOB_SYNC;
6016 	job_update_info->submit_cluster = xstrdup(sib_name);
6017 
6018 	_append_job_update(job_update_info);
6019 
6020 	return rc;
6021 }
6022 
6023 static int _q_sib_job_sync(slurm_msg_t *msg)
6024 {
6025 	int rc = SLURM_SUCCESS;
6026 	sib_msg_t *sib_msg = msg->data;
6027 	job_info_msg_t *job_info_msg = (job_info_msg_t *)sib_msg->data;
6028 	fed_job_update_info_t *job_update_info =
6029 		xmalloc(sizeof(fed_job_update_info_t));
6030 
6031 	/* NULL out the data pointer because we are storing the pointer on the
6032 	 * fed job update queue to be handled later. */
6033 	sib_msg->data = NULL;
6034 
6035 	job_update_info->type           = FED_JOB_SYNC;
6036 	job_update_info->job_info_msg   = job_info_msg;
6037 	job_update_info->start_time     = sib_msg->start_time;
6038 	job_update_info->submit_cluster = xstrdup(msg->conn->cluster_name);
6039 
6040 	_append_job_update(job_update_info);
6041 
6042 	return rc;
6043 }
6044 
6045 extern int fed_mgr_q_update_origin_dep_msg(slurm_msg_t *msg)
6046 {
6047 	dep_update_origin_msg_t *update_deps;
6048 	dep_update_origin_msg_t *update_msg = msg->data;
6049 
6050 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
6051 		info("%s: Got %s: Job %u",
6052 		     __func__, rpc_num2string(msg->msg_type),
6053 		     update_msg->job_id);
6054 
6055 	/* update_msg will get free'd, so copy it */
6056 	update_deps = xmalloc(sizeof *update_deps);
6057 	update_deps->depend_list = update_msg->depend_list;
6058 	update_deps->job_id = update_msg->job_id;
6059 	/*
6060 	 * NULL update_msg->depend_list so it doesn't get free'd; we're
6061 	 * using it later.
6062 	 */
6063 	update_msg->depend_list = NULL;
6064 
6065 	list_append(origin_dep_update_list, update_deps);
6066 	slurm_mutex_lock(&origin_dep_update_mutex);
6067 	slurm_cond_broadcast(&origin_dep_cond);
6068 	slurm_mutex_unlock(&origin_dep_update_mutex);
6069 
6070 	return SLURM_SUCCESS;
6071 }
6072 
6073 extern int fed_mgr_q_dep_msg(slurm_msg_t *msg)
6074 {
6075 	dep_msg_t *remote_dependency;
6076 	dep_msg_t *dep_msg = msg->data;
6077 
6078 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
6079 		info("%s: Got %s: Job %u",
6080 		     __func__, rpc_num2string(msg->msg_type), dep_msg->job_id);
6081 
6082 	/* dep_msg will get free'd, so copy it */
6083 	remote_dependency = xmalloc(sizeof *remote_dependency);
6084 	remote_dependency->job_id = dep_msg->job_id;
6085 	remote_dependency->job_name = dep_msg->job_name;
6086 	remote_dependency->dependency = dep_msg->dependency;
6087 	/* NULL strings so they don't get free'd */
6088 	dep_msg->job_name = NULL;
6089 	dep_msg->dependency = NULL;
6090 	remote_dependency->array_task_id = dep_msg->array_task_id;
6091 	remote_dependency->array_job_id = dep_msg->array_job_id;
6092 	remote_dependency->is_array = dep_msg->is_array;
6093 	remote_dependency->user_id = dep_msg->user_id;
6094 
6095 	list_append(remote_dep_recv_list, remote_dependency);
6096 	slurm_mutex_lock(&remote_dep_recv_mutex);
6097 	slurm_cond_broadcast(&remote_dep_cond);
6098 	slurm_mutex_unlock(&remote_dep_recv_mutex);
6099 	return SLURM_SUCCESS;
6100 }
6101 
6102 extern int fed_mgr_q_sib_msg(slurm_msg_t *msg, uint32_t rpc_uid)
6103 {
6104 	sib_msg_t *sib_msg = msg->data;
6105 
6106 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_FEDR)
6107 		info("%s: sib_msg_type:%s",
6108 		     __func__, _job_update_type_str(sib_msg->sib_msg_type));
6109 
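	/* Dispatch on the sibling message type; each handler queues a fed job
	 * update to be processed later, taking ownership of any message data
	 * it keeps. */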
6110 	switch (sib_msg->sib_msg_type) {
6111 	case FED_JOB_CANCEL:
6112 		_q_sib_job_cancel(msg, rpc_uid);
6113 		break;
6114 	case FED_JOB_COMPLETE:
6115 		_q_sib_job_complete(msg);
6116 		break;
6117 	case FED_JOB_REQUEUE:
6118 		_q_sib_job_requeue(msg, rpc_uid);
6119 		break;
6120 	case FED_JOB_START:
6121 		_q_sib_job_start(msg);
6122 		break;
6123 	case FED_JOB_SUBMIT_BATCH:
6124 		_q_sib_job_submission(msg, false);
6125 		break;
6126 	case FED_JOB_SUBMIT_INT:
6127 		_q_sib_job_submission(msg, true);
6128 		break;
6129 	case FED_JOB_SUBMIT_RESP:
6130 		_q_sib_submit_response(msg);
6131 		break;
6132 	case FED_JOB_SYNC:
6133 		_q_sib_job_sync(msg);
6134 		break;
6135 	case FED_JOB_UPDATE:
6136 		_q_sib_job_update(msg, rpc_uid);
6137 		break;
6138 	case FED_JOB_UPDATE_RESPONSE:
6139 		_q_sib_job_update_response(msg);
6140 		break;
6141 	default:
6142 		error("%s: invalid sib_msg_type: %d",
6143 		      __func__, sib_msg->sib_msg_type);
6144 		break;
6145 	}
6146 
6147 	return SLURM_SUCCESS;
6148 }
6149 
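/* List find callback: match an up (connected) sibling, other than this
 * cluster, that has not yet sent its job sync. */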
6150 static int _list_find_not_synced_sib(void *x, void *key)
6151 {
6152 	slurmdb_cluster_rec_t *sib = x;
6153 
6154 	if (sib != fed_mgr_cluster_rec &&
6155 	    sib->fed.send &&
6156 	    (((slurm_persist_conn_t *)sib->fed.send)->fd >= 0) &&
6157 	    !sib->fed.sync_recvd)
6158 		return 1;
6159 
6160 	return 0;
6161 }
6162 
6163 extern bool fed_mgr_sibs_synced()
6164 {
6165 	slurmdb_cluster_rec_t *sib;
6166 	int dummy = 1;
6167 
6168 	if (!fed_mgr_fed_rec)
6169 		return true;
6170 
6171 	if ((sib = list_find_first(fed_mgr_fed_rec->cluster_list,
6172 				   _list_find_not_synced_sib, &dummy))) {
6173 		debug("%s: sibling %s up but not synced yet",
6174 		      __func__, sib->name);
6175 		return false;
6176 	}
6177 
6178 	return true;
6179 }
6180 
6181 extern void fed_mgr_test_remote_dependencies(void)
6182 {
6183 	int rc;
6184 	uint32_t origin_id;
6185 	bool was_changed;
6186 	job_record_t *job_ptr;
6187 	ListIterator itr;
6188 	slurmdb_cluster_rec_t *origin;
6189 
6190 	xassert(verify_lock(JOB_LOCK, READ_LOCK));
6191 	xassert(verify_lock(FED_LOCK, READ_LOCK));
6192 
6193 	if (!list_count(remote_dep_job_list) || !fed_mgr_fed_rec ||
6194 	    !fed_mgr_cluster_rec)
6195 		return;
6196 
6197 	slurm_mutex_lock(&dep_job_list_mutex);
6198 	itr = list_iterator_create(remote_dep_job_list);
6199 	while ((job_ptr = list_next(itr))) {
6200 		origin_id = fed_mgr_get_cluster_id(job_ptr->job_id);
6201 		origin = fed_mgr_get_cluster_by_id(origin_id);
6202 		if (!origin) {
6203 			/*
6204 			 * The origin probably left the federation. If it comes
6205 			 * back there's no guarantee it will have the same
6206 			 * cluster id as before.
6207 			 */
6208 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
6209 				info("%s: Couldn't find the origin cluster (id %u); it probably left the federation. Stop testing dependency for %pJ.",
6210 				     __func__, origin_id, job_ptr);
6211 			list_delete_item(itr);
6212 			continue;
6213 		}
6214 
6215 		rc = test_job_dependency(job_ptr, &was_changed);
6216 		if (rc == LOCAL_DEPEND) {
6217 			if (was_changed) {
6218 				if (slurmctld_conf.debug_flags &
6219 				    DEBUG_FLAG_DEPENDENCY)
6220 					info("%s: %pJ has at least 1 local dependency left.",
6221 					     __func__, job_ptr);
6222 				_update_origin_job_dep(job_ptr, origin);
6223 			}
6224 		} else if (rc == FAIL_DEPEND) {
6225 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
6226 				info("%s: %pJ test_job_dependency() failed, dependency never satisfied.",
6227 				     __func__, job_ptr);
6228 			_update_origin_job_dep(job_ptr, origin);
6229 			list_delete_item(itr);
6230 		} else { /* ((rc == REMOTE_DEPEND) || (rc == NO_DEPEND)) */
6231 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_DEPENDENCY)
6232 				info("%s: %pJ has no more dependencies left on this cluster.",
6233 				     __func__, job_ptr);
6234 			_update_origin_job_dep(job_ptr, origin);
6235 			list_delete_item(itr);
6236 		}
6237 	}
6238 	list_iterator_destroy(itr);
6239 	slurm_mutex_unlock(&dep_job_list_mutex);
6240 }
6241