1 /*****************************************************************************\
2  *  agent.c - parallel background communication functions. This is where
3  *	logic could be placed for broadcast communications.
4  *****************************************************************************
5  *  Copyright (C) 2002-2007 The Regents of the University of California.
6  *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
7  *  Portions Copyright (C) 2010-2015 SchedMD LLC <https://www.schedmd.com>.
8  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
9  *  Written by Morris Jette <jette1@llnl.gov>, et. al.
10  *  Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
11  *  CODE-OCEC-09-009. All rights reserved.
12  *
13  *  This file is part of Slurm, a resource management program.
14  *  For details, see <https://slurm.schedmd.com/>.
15  *  Please also read the included file: DISCLAIMER.
16  *
17  *  Slurm is free software; you can redistribute it and/or modify it under
18  *  the terms of the GNU General Public License as published by the Free
19  *  Software Foundation; either version 2 of the License, or (at your option)
20  *  any later version.
21  *
22  *  In addition, as a special exception, the copyright holders give permission
23  *  to link the code of portions of this program with the OpenSSL library under
24  *  certain conditions as described in each individual source file, and
25  *  distribute linked combinations including the two. You must obey the GNU
26  *  General Public License in all respects for all of the code used other than
27  *  OpenSSL. If you modify file(s) with this exception, you may extend this
28  *  exception to your version of the file(s), but you are not obligated to do
29  *  so. If you do not wish to do so, delete this exception statement from your
30  *  version.  If you delete this exception statement from all source files in
31  *  the program, then also delete it here.
32  *
33  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
34  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
35  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
36  *  details.
37  *
38  *  You should have received a copy of the GNU General Public License along
39  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
40  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
41  *****************************************************************************
42  *  Theory of operation:
43  *
 *  The functions below permit Slurm to initiate parallel tasks as a
 *	detached thread and ensure that the work gets done.
 *  For example, when a job's time limit is to be changed, slurmctld needs
 *  to notify the slurmd on every node to which the job was allocated.
 *  We don't want to block slurmctld's primary function (the job update RPC)
 *  while performing this work, so it just initiates an agent to do it.
 *  The agent is passed all details required to perform the work, so it would
 *  be possible to execute the agent as a pthread, process, or even a daemon
 *  on some other computer.
 *
 *  The main agent thread creates a separate thread for each node to be
 *  communicated with, up to AGENT_THREAD_COUNT. A special watchdog thread
 *  sends SIGUSR1 to any threads that have been active (in DSH_ACTIVE state)
 *  for more than MessageTimeout seconds.
 *  The agent responds to slurmctld via a function call or an RPC as required,
 *  for example to inform slurmctld that some node is not responding.
 *
 *  All the state for each thread is maintained in a thd_t struct, which is
62  *  used by the watchdog thread as well as the communication threads.
63 \*****************************************************************************/
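
/*
 * Illustrative sketch (not part of this file): a typical caller inside
 * slurmctld hands work to the agent by filling in an agent_arg_t and
 * queuing it, for example:
 *
 *	agent_arg_t *agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
 *	agent_arg_ptr->msg_type   = REQUEST_PING;
 *	agent_arg_ptr->hostlist   = hostlist_create(node_names);
 *	agent_arg_ptr->node_count = hostlist_count(agent_arg_ptr->hostlist);
 *	agent_arg_ptr->retry      = 1;
 *	agent_queue_request(agent_arg_ptr);
 *
 * The msg_type, hostlist contents and retry setting above are only an
 * example ("node_names" is hypothetical); see agent.h for the full
 * agent_arg_t definition and the agent_queue_request()/agent() entry points.
 */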
64 
65 #include "config.h"
66 
67 #if HAVE_SYS_PRCTL_H
68 #include <sys/prctl.h>
69 #endif
70 
71 #include <errno.h>
72 #include <pthread.h>
73 #include <pwd.h>
74 #include <signal.h>
75 #include <stdlib.h>
76 #include <string.h>
77 #include <sys/types.h>
78 #include <sys/wait.h>
79 #include <unistd.h>
80 
81 #include "src/common/forward.h"
82 #include "src/common/list.h"
83 #include "src/common/log.h"
84 #include "src/common/macros.h"
85 #include "src/common/node_select.h"
86 #include "src/common/parse_time.h"
87 #include "src/common/slurm_protocol_api.h"
88 #include "src/common/slurm_protocol_interface.h"
89 #include "src/common/uid.h"
90 #include "src/common/xsignal.h"
91 #include "src/common/xassert.h"
92 #include "src/common/xmalloc.h"
93 #include "src/common/xstring.h"
94 #include "src/slurmctld/agent.h"
95 #include "src/slurmctld/front_end.h"
96 #include "src/slurmctld/job_scheduler.h"
97 #include "src/slurmctld/locks.h"
98 #include "src/slurmctld/ping_nodes.h"
99 #include "src/slurmctld/slurmctld.h"
100 #include "src/slurmctld/state_save.h"
101 #include "src/slurmctld/srun_comm.h"
102 
103 #define MAX_RETRIES		100
104 #define MAX_RPC_PACK_CNT	100
105 #define RPC_PACK_MAX_AGE	30	/* Rebuild data over 30 seconds old */
106 #define DUMP_RPC_COUNT 		25
107 #define HOSTLIST_MAX_SIZE 	80
108 
109 typedef enum {
110 	DSH_NEW,        /* Request not yet started */
111 	DSH_ACTIVE,     /* Request in progress */
112 	DSH_DONE,       /* Request completed normally */
113 	DSH_NO_RESP,    /* Request timed out */
114 	DSH_FAILED,     /* Request resulted in error */
115 	DSH_DUP_JOBID	/* Request resulted in duplicate job ID error */
116 } state_t;
117 
118 typedef struct thd_complete {
119 	bool work_done; 	/* assume all threads complete */
120 	int fail_cnt;		/* assume no threads failures */
121 	int no_resp_cnt;	/* assume all threads respond */
122 	int retry_cnt;		/* assume no required retries */
123 	int max_delay;
124 	time_t now;
125 } thd_complete_t;
126 
127 typedef struct thd {
128 	pthread_t thread;		/* thread ID */
129 	state_t state;			/* thread state */
130 	time_t start_time;		/* start time */
131 	time_t end_time;		/* end time or delta time
132 					 * upon termination */
133 	slurm_addr_t *addr;		/* specific addr to send to
134 					 * will not do nodelist if set */
135 	char *nodelist;			/* list of nodes to send to */
136 	List ret_list;
137 } thd_t;
138 
139 typedef struct agent_info {
140 	pthread_mutex_t thread_mutex;	/* agent specific mutex */
141 	pthread_cond_t thread_cond;	/* agent specific condition */
	uint32_t thread_count;		/* number of thread records */
143 	uint32_t threads_active;	/* currently active threads */
144 	uint16_t retry;			/* if set, keep trying */
145 	thd_t *thread_struct;		/* thread structures */
146 	bool get_reply;			/* flag if reply expected */
147 	slurm_msg_type_t msg_type;	/* RPC to be issued */
148 	void **msg_args_pptr;		/* RPC data to be used */
149 	uint16_t protocol_version;	/* if set, use this version */
150 } agent_info_t;
151 
152 typedef struct task_info {
153 	pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
154 					    * mutex */
155 	pthread_cond_t *thread_cond_ptr;/* pointer to agent specific
156 					 * condition */
157 	uint32_t *threads_active_ptr;	/* currently active thread ptr */
158 	thd_t *thread_struct_ptr;	/* thread structures ptr */
159 	bool get_reply;			/* flag if reply expected */
160 	slurm_msg_type_t msg_type;	/* RPC to be issued */
161 	void *msg_args_ptr;		/* ptr to RPC data to be used */
162 	uint16_t protocol_version;	/* if set, use this version */
163 } task_info_t;
164 
165 typedef struct queued_request {
166 	agent_arg_t* agent_arg_ptr;	/* The queued request */
167 	time_t       first_attempt;	/* Time of first check for batch
168 					 * launch RPC *only* */
169 	time_t       last_attempt;	/* Time of last xmit attempt */
170 } queued_request_t;
171 
172 typedef struct mail_info {
173 	char *user_name;
174 	char *message;
175 } mail_info_t;
176 
177 static void _agent_defer(void);
178 static void _agent_retry(int min_wait, bool wait_too);
179 static int  _batch_launch_defer(queued_request_t *queued_req_ptr);
180 static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr);
181 static int  _signal_defer(queued_request_t *queued_req_ptr);
182 static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type);
183 static void _list_delete_retry(void *retry_entry);
184 static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
185 static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
186 static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
187 static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
188 		int no_resp_cnt, int retry_cnt);
189 static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
190 static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count);
191 static int  _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
192 			   int *count, int *spot);
193 static void _sig_handler(int dummy);
194 static void *_thread_per_group_rpc(void *args);
195 static int   _valid_agent_arg(agent_arg_t *agent_arg_ptr);
196 static void *_wdog(void *args);
197 
198 static mail_info_t *_mail_alloc(void);
199 static void  _mail_free(void *arg);
200 static void *_mail_proc(void *arg);
201 static char *_mail_type_str(uint16_t mail_type);
202 static char **_build_mail_env(void);
203 
204 static pthread_mutex_t defer_mutex = PTHREAD_MUTEX_INITIALIZER;
205 static pthread_mutex_t mail_mutex  = PTHREAD_MUTEX_INITIALIZER;
206 static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
207 static List defer_list = NULL;		/* agent_arg_t list for requests
208 					 * requiring job write lock */
209 static List mail_list = NULL;		/* pending e-mail requests */
210 static List retry_list = NULL;		/* agent_arg_t list for retry */
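
/*
 * Rough life cycle of a queued request (a sketch based on the functions in
 * this file): requests that cannot run immediately sit on defer_list or
 * retry_list; the detached _agent_init() thread wakes periodically (or when
 * agent_trigger() signals it), calls _agent_defer() to move now-runnable
 * deferred requests onto retry_list, then _agent_retry() to replay them.
 */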
211 
212 
213 static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
214 static pthread_cond_t  agent_cnt_cond  = PTHREAD_COND_INITIALIZER;
215 static int agent_cnt = 0;
216 static int agent_thread_cnt = 0;
217 static int mail_thread_cnt = 0;
218 static uint16_t message_timeout = NO_VAL16;
219 
220 static pthread_mutex_t pending_mutex = PTHREAD_MUTEX_INITIALIZER;
221 static pthread_cond_t  pending_cond = PTHREAD_COND_INITIALIZER;
222 static int pending_wait_time = NO_VAL16;
223 static bool pending_mail = false;
224 static bool pending_thread_running = false;
225 
226 static bool run_scheduler    = false;
227 
228 static uint32_t *rpc_stat_counts = NULL, *rpc_stat_types = NULL;
229 static uint32_t stat_type_count = 0;
230 static uint32_t rpc_count = 0;
231 static uint32_t *rpc_type_list;
232 static char **rpc_host_list = NULL;
233 static time_t cache_build_time = 0;
234 
235 /*
 * agent - party responsible for transmitting a common RPC in parallel
 *	across a set of nodes. Use agent_queue_request() if immediate
 *	execution is not essential.
 * IN pointer to agent_arg_t, which is xfree'd (including hostlist
 *	and msg_args) upon completion
 * RET always NULL (function format just for use as a pthread)
242  */
void *agent(void *args)
244 {
245 	int i, delay;
246 	pthread_t thread_wdog = 0;
247 	agent_arg_t *agent_arg_ptr = args;
248 	agent_info_t *agent_info_ptr = NULL;
249 	thd_t *thread_ptr;
250 	task_info_t *task_specific_ptr;
251 	time_t begin_time;
252 	bool spawn_retry_agent = false;
253 	int rpc_thread_cnt;
254 	static time_t sched_update = 0;
255 	static bool reboot_from_ctld = false;
256 
257 #if HAVE_SYS_PRCTL_H
258 	if (prctl(PR_SET_NAME, "agent", NULL, NULL, NULL) < 0) {
259 		error("%s: cannot set my name to %s %m", __func__, "agent");
260 	}
261 #endif
262 
263 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
264 		info("%s: Agent_cnt=%d agent_thread_cnt=%d with msg_type=%s retry_list_size=%d",
265 		     __func__, agent_cnt, agent_thread_cnt,
266 		     rpc_num2string(agent_arg_ptr->msg_type),
267 		     retry_list_size());
268 	}
269 	slurm_mutex_lock(&agent_cnt_mutex);
270 
271 	if (sched_update != slurmctld_conf.last_update) {
272 #ifdef HAVE_NATIVE_CRAY
273 		reboot_from_ctld = true;
274 #else
275 		char *ctld_params = slurm_get_slurmctld_params();
276 
277 		reboot_from_ctld = false;
278 		if (xstrcasestr(ctld_params, "reboot_from_controller"))
279 			reboot_from_ctld = true;
280 		xfree(ctld_params);
281 #endif
282 		sched_update = slurmctld_conf.last_update;
283 	}
284 
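	/*
	 * Thread head-room this agent will consume: the agent thread itself
	 * and the _wdog() watchdog (the constant 2 below), plus up to
	 * AGENT_THREAD_COUNT per-node RPC threads.
	 */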
285 	rpc_thread_cnt = 2 + MIN(agent_arg_ptr->node_count, AGENT_THREAD_COUNT);
286 	while (1) {
287 		if (slurmctld_config.shutdown_time ||
288 		    ((agent_thread_cnt+rpc_thread_cnt) <= MAX_SERVER_THREADS)) {
289 			agent_cnt++;
290 			agent_thread_cnt += rpc_thread_cnt;
291 			break;
292 		} else {	/* wait for state change and retry */
293 			slurm_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
294 		}
295 	}
296 	slurm_mutex_unlock(&agent_cnt_mutex);
297 	if (slurmctld_config.shutdown_time)
298 		goto cleanup;
299 
300 	/* basic argument value tests */
301 	begin_time = time(NULL);
302 	if (_valid_agent_arg(agent_arg_ptr))
303 		goto cleanup;
304 
305 	if (reboot_from_ctld &&
306 	    (agent_arg_ptr->msg_type == REQUEST_REBOOT_NODES)) {
307 		_reboot_from_ctld(agent_arg_ptr);
308 		goto cleanup;
309 	}
310 
311 	/* initialize the agent data structures */
312 	agent_info_ptr = _make_agent_info(agent_arg_ptr);
313 	thread_ptr = agent_info_ptr->thread_struct;
314 
315 	/* start the watchdog thread */
316 	slurm_thread_create(&thread_wdog, _wdog, agent_info_ptr);
317 
318 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
319 		info("%s: New agent thread_count:%d threads_active:%d retry:%c get_reply:%c msg_type:%s protocol_version:%hu",
320 		     __func__, agent_info_ptr->thread_count,
321 		     agent_info_ptr->threads_active,
322 		     agent_info_ptr->retry ? 'T' : 'F',
323 		     agent_info_ptr->get_reply ? 'T' : 'F',
324 		     rpc_num2string(agent_arg_ptr->msg_type),
325 		     agent_info_ptr->protocol_version);
326 	}
327 	/* start all the other threads (up to AGENT_THREAD_COUNT active) */
328 	for (i = 0; i < agent_info_ptr->thread_count; i++) {
329 		/* wait until "room" for another thread */
330 		slurm_mutex_lock(&agent_info_ptr->thread_mutex);
331 		while (agent_info_ptr->threads_active >=
332 		       AGENT_THREAD_COUNT) {
333 			slurm_cond_wait(&agent_info_ptr->thread_cond,
334 					&agent_info_ptr->thread_mutex);
335 		}
336 
337 		/*
338 		 * create thread specific data,
339 		 * NOTE: freed from _thread_per_group_rpc()
340 		 */
341 		task_specific_ptr = _make_task_data(agent_info_ptr, i);
342 
343 		slurm_thread_create_detached(&thread_ptr[i].thread,
344 					     _thread_per_group_rpc,
345 					     task_specific_ptr);
346 		agent_info_ptr->threads_active++;
347 		slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
348 	}
349 
350 	/* Wait for termination of remaining threads */
351 	pthread_join(thread_wdog, NULL);
352 	delay = (int) difftime(time(NULL), begin_time);
353 	if (delay > (slurm_get_msg_timeout() * 2)) {
354 		info("agent msg_type=%u ran for %d seconds",
355 			agent_arg_ptr->msg_type,  delay);
356 	}
357 	slurm_mutex_lock(&agent_info_ptr->thread_mutex);
358 	while (agent_info_ptr->threads_active != 0) {
359 		slurm_cond_wait(&agent_info_ptr->thread_cond,
360 				&agent_info_ptr->thread_mutex);
361 	}
362 	slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
363 
364 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
365 		info("%s: end agent thread_count:%d threads_active:%d retry:%c get_reply:%c msg_type:%s protocol_version:%hu",
366 		     __func__, agent_info_ptr->thread_count,
367 		     agent_info_ptr->threads_active,
368 		     agent_info_ptr->retry ? 'T' : 'F',
369 		     agent_info_ptr->get_reply ? 'T' : 'F',
370 		     rpc_num2string(agent_arg_ptr->msg_type),
371 		     agent_info_ptr->protocol_version);
372 	}
373 
374 cleanup:
375 	_purge_agent_args(agent_arg_ptr);
376 
377 	if (agent_info_ptr) {
378 		xfree(agent_info_ptr->thread_struct);
379 		xfree(agent_info_ptr);
380 	}
381 	slurm_mutex_lock(&agent_cnt_mutex);
382 
383 	if (agent_cnt > 0) {
384 		agent_cnt--;
385 	} else {
386 		error("agent_cnt underflow");
387 		agent_cnt = 0;
388 	}
389 	if (agent_thread_cnt >= rpc_thread_cnt) {
390 		agent_thread_cnt -= rpc_thread_cnt;
391 	} else {
392 		error("agent_thread_cnt underflow");
393 		agent_thread_cnt = 0;
394 	}
395 
396 	if ((agent_thread_cnt + AGENT_THREAD_COUNT + 2) < MAX_SERVER_THREADS)
397 		spawn_retry_agent = true;
398 
399 	slurm_cond_broadcast(&agent_cnt_cond);
400 	slurm_mutex_unlock(&agent_cnt_mutex);
401 
402 	if (spawn_retry_agent)
403 		agent_trigger(RPC_RETRY_INTERVAL, true);
404 
405 	return NULL;
406 }
407 
408 /* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
410 {
411 	int hostlist_cnt;
412 
413 	xassert(agent_arg_ptr);
414 	xassert(agent_arg_ptr->hostlist);
415 
416 	if (agent_arg_ptr->node_count == 0)
417 		return SLURM_ERROR;	/* no messages to be sent */
418 	hostlist_cnt = hostlist_count(agent_arg_ptr->hostlist);
419 	if (agent_arg_ptr->node_count != hostlist_cnt) {
420 		error("%s: node_count RPC different from hosts listed (%d!=%d)",
421 		     __func__, agent_arg_ptr->node_count, hostlist_cnt);
422 		return SLURM_ERROR;	/* no messages to be sent */
423 	}
424 	return SLURM_SUCCESS;
425 }
426 
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
428 {
429 	int i = 0, j = 0;
430 	agent_info_t *agent_info_ptr = NULL;
431 	thd_t *thread_ptr = NULL;
432 	int *span = NULL;
433 	int thr_count = 0;
434 	hostlist_t hl = NULL;
435 	char *name = NULL;
436 
437 	agent_info_ptr = xmalloc(sizeof(agent_info_t));
438 	slurm_mutex_init(&agent_info_ptr->thread_mutex);
439 	slurm_cond_init(&agent_info_ptr->thread_cond, NULL);
440 	agent_info_ptr->thread_count   = agent_arg_ptr->node_count;
441 	agent_info_ptr->retry          = agent_arg_ptr->retry;
442 	agent_info_ptr->threads_active = 0;
443 	thread_ptr = xcalloc(agent_info_ptr->thread_count, sizeof(thd_t));
444 	memset(thread_ptr, 0, (agent_info_ptr->thread_count * sizeof(thd_t)));
445 	agent_info_ptr->thread_struct  = thread_ptr;
446 	agent_info_ptr->msg_type       = agent_arg_ptr->msg_type;
447 	agent_info_ptr->msg_args_pptr  = &agent_arg_ptr->msg_args;
448 	agent_info_ptr->protocol_version = agent_arg_ptr->protocol_version;
449 
450 	if ((agent_arg_ptr->msg_type != REQUEST_JOB_NOTIFY)	&&
451 	    (agent_arg_ptr->msg_type != REQUEST_REBOOT_NODES)	&&
452 	    (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE)	&&
453 	    (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE_WITH_CONFIG) &&
454 	    (agent_arg_ptr->msg_type != REQUEST_SHUTDOWN)	&&
455 	    (agent_arg_ptr->msg_type != SRUN_EXEC)		&&
456 	    (agent_arg_ptr->msg_type != SRUN_TIMEOUT)		&&
457 	    (agent_arg_ptr->msg_type != SRUN_NODE_FAIL)		&&
458 	    (agent_arg_ptr->msg_type != SRUN_REQUEST_SUSPEND)	&&
459 	    (agent_arg_ptr->msg_type != SRUN_USER_MSG)		&&
460 	    (agent_arg_ptr->msg_type != SRUN_STEP_MISSING)	&&
461 	    (agent_arg_ptr->msg_type != SRUN_STEP_SIGNAL)	&&
462 	    (agent_arg_ptr->msg_type != SRUN_JOB_COMPLETE)) {
463 #ifdef HAVE_FRONT_END
464 		span = set_span(agent_arg_ptr->node_count,
465 				agent_arg_ptr->node_count);
466 #else
467 		/* Sending message to a possibly large number of slurmd.
468 		 * Push all message forwarding to slurmd in order to
469 		 * offload as much work from slurmctld as possible. */
470 		span = set_span(agent_arg_ptr->node_count, 1);
471 #endif
472 		agent_info_ptr->get_reply = true;
473 	} else {
474 		/* Message is going to one node (for srun) or we want
475 		 * it to get processed ASAP (SHUTDOWN or RECONFIGURE).
476 		 * Send the message directly to each node. */
477 		span = set_span(agent_arg_ptr->node_count,
478 				agent_arg_ptr->node_count);
479 	}
480 	i = 0;
481 	while (i < agent_info_ptr->thread_count) {
482 		thread_ptr[thr_count].state      = DSH_NEW;
483 		thread_ptr[thr_count].addr = agent_arg_ptr->addr;
484 		name = hostlist_shift(agent_arg_ptr->hostlist);
485 		if (!name) {
486 			debug3("no more nodes to send to");
487 			break;
488 		}
489 		hl = hostlist_create(name);
490 		if (thread_ptr[thr_count].addr && span[thr_count]) {
491 			debug("warning: you will only be sending this to %s",
492 			      name);
493 			span[thr_count] = 0;
494 		}
495 		free(name);
496 		i++;
497 		for (j = 0; j < span[thr_count]; j++) {
498 			name = hostlist_shift(agent_arg_ptr->hostlist);
499 			if (!name)
500 				break;
501 			hostlist_push_host(hl, name);
502 			free(name);
503 			i++;
504 		}
505 		hostlist_uniq(hl);
506 		thread_ptr[thr_count].nodelist =
507 			hostlist_ranged_string_xmalloc(hl);
508 		hostlist_destroy(hl);
509 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
510 			info("%s: sending msg_type %s to nodes %s",
511 			     __func__, rpc_num2string(agent_arg_ptr->msg_type),
512 			     thread_ptr[thr_count].nodelist);
513 
514 		}
515 		thr_count++;
516 	}
517 	xfree(span);
518 	agent_info_ptr->thread_count = thr_count;
519 	return agent_info_ptr;
520 }
521 
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
523 {
524 	task_info_t *task_info_ptr;
525 	task_info_ptr = xmalloc(sizeof(task_info_t));
526 
527 	task_info_ptr->thread_mutex_ptr  = &agent_info_ptr->thread_mutex;
528 	task_info_ptr->thread_cond_ptr   = &agent_info_ptr->thread_cond;
529 	task_info_ptr->threads_active_ptr= &agent_info_ptr->threads_active;
530 	task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
531 	task_info_ptr->get_reply         = agent_info_ptr->get_reply;
532 	task_info_ptr->msg_type          = agent_info_ptr->msg_type;
533 	task_info_ptr->msg_args_ptr      = *agent_info_ptr->msg_args_pptr;
534 	task_info_ptr->protocol_version  = agent_info_ptr->protocol_version;
535 
536 	return task_info_ptr;
537 }
538 
static void _update_wdog_state(thd_t *thread_ptr,
540 			       state_t *state,
541 			       thd_complete_t *thd_comp)
542 {
543 	switch (*state) {
544 	case DSH_ACTIVE:
545 		thd_comp->work_done = false;
546 		if (thread_ptr->end_time <= thd_comp->now) {
547 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
548 				info("%s: agent thread %lu timed out", __func__,
549 				     (unsigned long) thread_ptr->thread);
550 			}
551 			if (pthread_kill(thread_ptr->thread, SIGUSR1) == ESRCH)
552 				*state = DSH_NO_RESP;
553 			else
554 				thread_ptr->end_time += message_timeout;
555 		}
556 		break;
557 	case DSH_NEW:
558 		thd_comp->work_done = false;
559 		break;
560 	case DSH_DONE:
561 		if (thd_comp->max_delay < (int)thread_ptr->end_time)
562 			thd_comp->max_delay = (int)thread_ptr->end_time;
563 		break;
564 	case DSH_NO_RESP:
565 		thd_comp->no_resp_cnt++;
566 		thd_comp->retry_cnt++;
567 		break;
568 	case DSH_FAILED:
569 	case DSH_DUP_JOBID:
570 		thd_comp->fail_cnt++;
571 		break;
572 	}
573 }
574 
575 /*
576  * _wdog - Watchdog thread. Send SIGUSR1 to threads which have been active
577  *	for too long.
578  * IN args - pointer to agent_info_t with info on threads to watch
579  * Sleep between polls with exponential times (from 0.005 to 1.0 second)
580  */
static void *_wdog(void *args)
582 {
583 	bool srun_agent = false;
584 	int i;
585 	agent_info_t *agent_ptr = (agent_info_t *) args;
586 	thd_t *thread_ptr = agent_ptr->thread_struct;
587 	unsigned long usec = 5000;
588 	ListIterator itr;
589 	thd_complete_t thd_comp;
590 	ret_data_info_t *ret_data_info = NULL;
591 
592 	if ( (agent_ptr->msg_type == SRUN_JOB_COMPLETE)			||
593 	     (agent_ptr->msg_type == SRUN_REQUEST_SUSPEND)		||
594 	     (agent_ptr->msg_type == SRUN_STEP_MISSING)			||
595 	     (agent_ptr->msg_type == SRUN_STEP_SIGNAL)			||
596 	     (agent_ptr->msg_type == SRUN_EXEC)				||
597 	     (agent_ptr->msg_type == SRUN_NODE_FAIL)			||
598 	     (agent_ptr->msg_type == SRUN_PING)				||
599 	     (agent_ptr->msg_type == SRUN_TIMEOUT)			||
600 	     (agent_ptr->msg_type == SRUN_USER_MSG)			||
601 	     (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION)	||
602 	     (agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION) )
603 		srun_agent = true;
604 
605 	thd_comp.max_delay = 0;
606 
607 	while (1) {
608 		thd_comp.work_done   = true;/* assume all threads complete */
609 		thd_comp.fail_cnt    = 0;   /* assume no threads failures */
610 		thd_comp.no_resp_cnt = 0;   /* assume all threads respond */
611 		thd_comp.retry_cnt   = 0;   /* assume no required retries */
612 		thd_comp.now         = time(NULL);
613 
614 		usleep(usec);
615 		usec = MIN((usec * 2), 1000000);
616 
617 		slurm_mutex_lock(&agent_ptr->thread_mutex);
618 		for (i = 0; i < agent_ptr->thread_count; i++) {
619 			//info("thread name %s",thread_ptr[i].node_name);
620 			if (!thread_ptr[i].ret_list) {
621 				_update_wdog_state(&thread_ptr[i],
622 						   &thread_ptr[i].state,
623 						   &thd_comp);
624 			} else {
625 				itr = list_iterator_create(
626 					thread_ptr[i].ret_list);
627 				while ((ret_data_info = list_next(itr))) {
628 					_update_wdog_state(&thread_ptr[i],
629 							   &ret_data_info->err,
630 							   &thd_comp);
631 				}
632 				list_iterator_destroy(itr);
633 			}
634 		}
635 		if (thd_comp.work_done)
636 			break;
637 
638 		slurm_mutex_unlock(&agent_ptr->thread_mutex);
639 	}
640 
641 	if (srun_agent) {
642 		_notify_slurmctld_jobs(agent_ptr);
643 	} else {
644 		_notify_slurmctld_nodes(agent_ptr,
645 					thd_comp.no_resp_cnt,
646 					thd_comp.retry_cnt);
647 	}
648 
649 	for (i = 0; i < agent_ptr->thread_count; i++) {
650 		FREE_NULL_LIST(thread_ptr[i].ret_list);
651 		xfree(thread_ptr[i].nodelist);
652 	}
653 
654 	if (thd_comp.max_delay &&
655 	    (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT)) {
656 		info("%s: agent maximum delay %d seconds", __func__,
657 		     thd_comp.max_delay);
658 	}
659 
660 	slurm_mutex_unlock(&agent_ptr->thread_mutex);
661 	return (void *) NULL;
662 }
663 
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
665 {
666 	/* Locks: Write job */
667 	slurmctld_lock_t job_write_lock =
668 	    { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
669 	uint32_t job_id = 0, step_id = 0;
670 	thd_t *thread_ptr = agent_ptr->thread_struct;
671 
672 	if        (agent_ptr->msg_type == SRUN_PING) {
673 		srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
674 		job_id  = msg->job_id;
675 		step_id = msg->step_id;
676 	} else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
677 		srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
678 		job_id  = msg->job_id;
679 		step_id = msg->step_id;
680 	} else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
681 		resource_allocation_response_msg_t *msg =
682 			*agent_ptr->msg_args_pptr;
683 		job_id  = msg->job_id;
684 		step_id = NO_VAL;
685 	} else if (agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION) {
686 		List het_alloc_list = *agent_ptr->msg_args_pptr;
687 		resource_allocation_response_msg_t *msg;
688 		if (!het_alloc_list || (list_count(het_alloc_list) == 0))
689 			return;
690 		msg = list_peek(het_alloc_list);
691 		job_id  = msg->job_id;
692 		step_id = NO_VAL;
693 	} else if ((agent_ptr->msg_type == SRUN_JOB_COMPLETE)		||
694 		   (agent_ptr->msg_type == SRUN_REQUEST_SUSPEND)	||
695 		   (agent_ptr->msg_type == SRUN_STEP_MISSING)		||
696 		   (agent_ptr->msg_type == SRUN_STEP_SIGNAL)		||
697 		   (agent_ptr->msg_type == SRUN_EXEC)			||
698 		   (agent_ptr->msg_type == SRUN_USER_MSG)) {
699 		return;		/* no need to note srun response */
700 	} else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
701 		return;		/* no need to note srun response */
702 	} else {
703 		error("%s: invalid msg_type %u", __func__, agent_ptr->msg_type);
704 		return;
705 	}
706 	lock_slurmctld(job_write_lock);
707 	if  (thread_ptr[0].state == DSH_DONE) {
708 		srun_response(job_id, step_id);
709 	}
710 
711 	unlock_slurmctld(job_write_lock);
712 }
713 
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
715 				    int no_resp_cnt, int retry_cnt)
716 {
717 	ListIterator itr = NULL;
718 	ret_data_info_t *ret_data_info = NULL;
719 	state_t state;
720 	int is_ret_list = 1;
721 	/* Locks: Read config, write node */
722 	slurmctld_lock_t node_write_lock =
723 		{ .conf = READ_LOCK, .node = WRITE_LOCK };
724 	thd_t *thread_ptr = agent_ptr->thread_struct;
725 	int i;
726 
727 	/* Notify slurmctld of non-responding nodes */
728 	if (no_resp_cnt) {
729 		/* Update node table data for non-responding nodes */
730 		if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
731 			/* Requeue the request */
732 			batch_job_launch_msg_t *launch_msg_ptr =
733 					*agent_ptr->msg_args_pptr;
734 			uint32_t job_id = launch_msg_ptr->job_id;
735 			/* Locks: Write job, write node, read federation */
736 			slurmctld_lock_t job_write_lock =
737 				{ .job  = WRITE_LOCK,
738 				  .node = WRITE_LOCK,
739 				  .fed  = READ_LOCK };
740 
741 			lock_slurmctld(job_write_lock);
742 			job_complete(job_id, slurmctld_conf.slurm_user_id,
743 				     true, false, 0);
744 			unlock_slurmctld(job_write_lock);
745 		}
746 	}
747 	if (retry_cnt && agent_ptr->retry)
748 		_queue_agent_retry(agent_ptr, retry_cnt);
749 
750 	/* Update last_response on responding nodes */
751 	lock_slurmctld(node_write_lock);
752 	for (i = 0; i < agent_ptr->thread_count; i++) {
753 		char *down_msg, *node_names;
754 		slurm_msg_type_t resp_type = RESPONSE_SLURM_RC;
755 
756 		if (!thread_ptr[i].ret_list) {
757 			state = thread_ptr[i].state;
758 			is_ret_list = 0;
759 			goto switch_on_state;
760 		}
761 		is_ret_list = 1;
762 
763 		itr = list_iterator_create(thread_ptr[i].ret_list);
764 		while ((ret_data_info = list_next(itr))) {
765 			state = ret_data_info->err;
766 		switch_on_state:
767 			if (is_ret_list) {
768 				node_names = ret_data_info->node_name;
769 				resp_type = ret_data_info->type;
770 			} else
771 				node_names = thread_ptr[i].nodelist;
772 
773 			switch (state) {
774 			case DSH_NO_RESP:
775 				node_not_resp(node_names,
776 					      thread_ptr[i].start_time,
777 					      resp_type);
778 				break;
779 			case DSH_FAILED:
780 #ifdef HAVE_FRONT_END
781 				down_msg = "";
782 #else
783 				drain_nodes(node_names,
784 					    "Prolog/Epilog failure",
785 					    slurmctld_conf.slurm_user_id);
786 				down_msg = ", set to state DRAIN";
787 #endif
788 				error("Prolog/Epilog failure on nodes %s%s",
789 				      node_names, down_msg);
790 				break;
791 			case DSH_DUP_JOBID:
792 #ifdef HAVE_FRONT_END
793 				down_msg = "";
794 #else
795 				drain_nodes(node_names,
796 					    "Duplicate jobid",
797 					    slurmctld_conf.slurm_user_id);
798 				down_msg = ", set to state DRAIN";
799 #endif
800 				error("Duplicate jobid on nodes %s%s",
801 				      node_names, down_msg);
802 				break;
803 			case DSH_DONE:
804 				node_did_resp(node_names);
805 				break;
806 			default:
807 				error("unknown state returned for %s",
808 				      node_names);
809 				break;
810 			}
811 			if (!is_ret_list)
812 				goto finished;
813 		}
814 		list_iterator_destroy(itr);
815 finished:	;
816 	}
817 	unlock_slurmctld(node_write_lock);
818 	if (run_scheduler) {
819 		run_scheduler = false;
820 		/* below functions all have their own locking */
821 		if (schedule(0))	{
822 			schedule_job_save();
823 			schedule_node_save();
824 		}
825 	}
826 	if ((agent_ptr->msg_type == REQUEST_PING) ||
827 	    (agent_ptr->msg_type == REQUEST_HEALTH_CHECK) ||
828 	    (agent_ptr->msg_type == REQUEST_ACCT_GATHER_UPDATE) ||
829 	    (agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
830 		ping_end();
831 }
832 
833 /* Report a communications error for specified node
834  * This also gets logged as a non-responsive node */
static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type)
836 {
837 	int rc = 1;
838 
839 	if ((rc = is_node_resp (node_name)))
840 		verbose("agent/is_node_resp: node:%s RPC:%s : %m",
841 			node_name, rpc_num2string(msg_type));
842 	return rc;
843 }
844 
845 /* return a value for which WEXITSTATUS() returns 1 */
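/* (On most systems the exit status occupies the high-order byte of the wait
 * status, so this typically returns 0x100; the bit scan below avoids
 * hard-coding that layout.) */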
static int _wif_status(void)
847 {
848 	static int rc = 0;
849 	int i;
850 
851 	if (rc)
852 		return rc;
853 
854 	rc = 1;
855 	for (i=0; i<64; i++) {
856 		if (WEXITSTATUS(rc))
857 			return rc;
858 		rc = rc << 1;
859 	}
860 	error("Could not identify WEXITSTATUS");
861 	rc = 1;
862 	return rc;
863 }
864 
865 /*
866  * _thread_per_group_rpc - thread to issue an RPC for a group of nodes
 *                         sending the message to one node and forwarding it to
868  *                         others if necessary.
869  * IN/OUT args - pointer to task_info_t, xfree'd on completion
870  */
static void *_thread_per_group_rpc(void *args)
872 {
873 	int rc = SLURM_SUCCESS;
874 	slurm_msg_t msg;
875 	task_info_t *task_ptr = (task_info_t *) args;
876 	/* we cache some pointers from task_info_t because we need
877 	 * to xfree args before being finished with their use. xfree
878 	 * is required for timely termination of this pthread because
879 	 * xfree could lock it at the end, preventing a timely
880 	 * thread_exit */
881 	pthread_mutex_t *thread_mutex_ptr   = task_ptr->thread_mutex_ptr;
882 	pthread_cond_t  *thread_cond_ptr    = task_ptr->thread_cond_ptr;
883 	uint32_t        *threads_active_ptr = task_ptr->threads_active_ptr;
884 	thd_t           *thread_ptr         = task_ptr->thread_struct_ptr;
885 	state_t thread_state = DSH_NO_RESP;
886 	slurm_msg_type_t msg_type = task_ptr->msg_type;
887 	bool is_kill_msg, srun_agent;
888 	List ret_list = NULL;
889 	ListIterator itr;
890 	ret_data_info_t *ret_data_info = NULL;
891 	int sig_array[2] = {SIGUSR1, 0};
	/* Locks: Write job, write node, read federation */
893 	slurmctld_lock_t job_write_lock = {
894 		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
895 	/* Lock: Read node */
896 	slurmctld_lock_t node_read_lock = {
897 		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
898 	/* Lock: Write node */
899 	slurmctld_lock_t node_write_lock = {
900 		NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
901 	uint32_t job_id;
902 
903 	xassert(args != NULL);
904 	xsignal(SIGUSR1, _sig_handler);
905 	xsignal_unblock(sig_array);
906 	is_kill_msg = (	(msg_type == REQUEST_KILL_TIMELIMIT)	||
907 			(msg_type == REQUEST_KILL_PREEMPTED)	||
908 			(msg_type == REQUEST_TERMINATE_JOB) );
909 	srun_agent = (	(msg_type == SRUN_PING)			||
910 			(msg_type == SRUN_EXEC)			||
911 			(msg_type == SRUN_JOB_COMPLETE)		||
912 			(msg_type == SRUN_STEP_MISSING)		||
913 			(msg_type == SRUN_STEP_SIGNAL)		||
914 			(msg_type == SRUN_TIMEOUT)		||
915 			(msg_type == SRUN_USER_MSG)		||
916 			(msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
917 			(msg_type == SRUN_NODE_FAIL) );
918 
919 	thread_ptr->start_time = time(NULL);
920 
921 	slurm_mutex_lock(thread_mutex_ptr);
922 	thread_ptr->state = DSH_ACTIVE;
923 	thread_ptr->end_time = thread_ptr->start_time + message_timeout;
924 	slurm_mutex_unlock(thread_mutex_ptr);
925 
926 	/* send request message */
927 	slurm_msg_t_init(&msg);
928 
929 	if (task_ptr->protocol_version)
930 		msg.protocol_version = task_ptr->protocol_version;
931 
932 	msg.msg_type = msg_type;
933 	msg.data     = task_ptr->msg_args_ptr;
934 
935 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
936 		info("%s: sending %s to %s", __func__, rpc_num2string(msg_type),
937 		     thread_ptr->nodelist);
938 	}
939 
940 	if (task_ptr->get_reply) {
941 		if (thread_ptr->addr) {
942 			msg.address = *thread_ptr->addr;
943 
944 			if (!(ret_list = slurm_send_addr_recv_msgs(
945 				     &msg, thread_ptr->nodelist, 0))) {
946 				error("%s: no ret_list given", __func__);
947 				goto cleanup;
948 			}
949 		} else {
950 			if (!(ret_list = slurm_send_recv_msgs(
951 				     thread_ptr->nodelist, &msg, 0))) {
952 				error("%s: no ret_list given", __func__);
953 				goto cleanup;
954 			}
955 		}
956 	} else {
957 		if (thread_ptr->addr) {
958 			//info("got the address");
959 			msg.address = *thread_ptr->addr;
960 		} else {
961 			//info("no address given");
962 			if (slurm_conf_get_addr(thread_ptr->nodelist,
963 					        &msg.address, msg.flags)
964 			    == SLURM_ERROR) {
965 				error("%s: can't find address for host %s, check slurm.conf",
966 				      __func__, thread_ptr->nodelist);
967 				goto cleanup;
968 			}
969 		}
970 		//info("sending %u to %s", msg_type, thread_ptr->nodelist);
971 		if (msg_type == SRUN_JOB_COMPLETE) {
972 			/*
973 			 * The srun runs as a single thread, while the kernel
974 			 * listen() may be queuing messages for further
975 			 * processing. If we get our SYN in the listen queue
976 			 * at the same time the last MESSAGE_TASK_EXIT is being
977 			 * processed, srun may exit meaning this message is
978 			 * never received, leading to a series of error
979 			 * messages from slurm_send_only_node_msg().
980 			 * So, we use this different function that blindly
981 			 * flings the message out and disregards any
982 			 * communication problems that may arise.
983 			 */
984 			slurm_send_msg_maybe(&msg);
985 			thread_state = DSH_DONE;
986 		} else if (slurm_send_only_node_msg(&msg) == SLURM_SUCCESS) {
987 			thread_state = DSH_DONE;
988 		} else {
989 			if (!srun_agent) {
990 				lock_slurmctld(node_read_lock);
991 				_comm_err(thread_ptr->nodelist, msg_type);
992 				unlock_slurmctld(node_read_lock);
993 			}
994 		}
995 		goto cleanup;
996 	}
997 
998 	//info("got %d messages back", list_count(ret_list));
999 	itr = list_iterator_create(ret_list);
1000 	while ((ret_data_info = list_next(itr))) {
1001 		rc = slurm_get_return_code(ret_data_info->type,
1002 					   ret_data_info->data);
1003 		/* SPECIAL CASE: Record node's CPU load */
1004 		if (ret_data_info->type == RESPONSE_PING_SLURMD) {
1005 			ping_slurmd_resp_msg_t *ping_resp;
1006 			ping_resp = (ping_slurmd_resp_msg_t *)
1007 				    ret_data_info->data;
1008 			lock_slurmctld(node_write_lock);
1009 			reset_node_load(ret_data_info->node_name,
1010 					ping_resp->cpu_load);
1011 			reset_node_free_mem(ret_data_info->node_name,
1012 					    ping_resp->free_mem);
1013 			unlock_slurmctld(node_write_lock);
1014 		}
1015 		/* SPECIAL CASE: Mark node as IDLE if job already complete */
1016 		if (is_kill_msg &&
1017 		    (rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
1018 			kill_job_msg_t *kill_job;
1019 			kill_job = (kill_job_msg_t *)
1020 				task_ptr->msg_args_ptr;
1021 			rc = SLURM_SUCCESS;
1022 			lock_slurmctld(job_write_lock);
1023 			if (job_epilog_complete(kill_job->job_id,
1024 						ret_data_info->node_name, rc))
1025 				run_scheduler = true;
1026 			unlock_slurmctld(job_write_lock);
1027 		}
1028 
1029 		/* SPECIAL CASE: Record node's CPU load */
1030 		if (ret_data_info->type == RESPONSE_ACCT_GATHER_UPDATE) {
1031 			lock_slurmctld(node_write_lock);
1032 			update_node_record_acct_gather_data(
1033 				ret_data_info->data);
1034 			unlock_slurmctld(node_write_lock);
1035 		}
1036 
1037 		/* SPECIAL CASE: Requeue/hold non-startable batch job,
1038 		 * Requeue job prolog failure or duplicate job ID */
1039 		if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
1040 		    (rc != SLURM_SUCCESS) && (rc != ESLURMD_PROLOG_FAILED) &&
1041 		    (rc != ESLURM_DUPLICATE_JOB_ID) &&
1042 		    (ret_data_info->type != RESPONSE_FORWARD_FAILED)) {
1043 			batch_job_launch_msg_t *launch_msg_ptr =
1044 				task_ptr->msg_args_ptr;
1045 			job_id = launch_msg_ptr->job_id;
1046 			info("Killing non-startable batch JobId=%u: %s",
1047 			     job_id, slurm_strerror(rc));
1048 			thread_state = DSH_DONE;
1049 			ret_data_info->err = thread_state;
1050 			lock_slurmctld(job_write_lock);
1051 			job_complete(job_id, slurmctld_conf.slurm_user_id,
1052 				     false, false, _wif_status());
1053 			unlock_slurmctld(job_write_lock);
1054 			continue;
1055 		} else if ((msg_type == RESPONSE_RESOURCE_ALLOCATION) &&
1056 			   (rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
1057 			/* Communication issue to srun that launched the job
1058 			 * Cancel rather than leave a stray-but-empty job
1059 			 * behind on the allocated nodes. */
1060 			resource_allocation_response_msg_t *msg_ptr =
1061 				task_ptr->msg_args_ptr;
1062 			job_id = msg_ptr->job_id;
1063 			info("Killing interactive JobId=%u: %s",
1064 			     job_id, slurm_strerror(rc));
1065 			thread_state = DSH_FAILED;
1066 			lock_slurmctld(job_write_lock);
1067 			job_complete(job_id, slurmctld_conf.slurm_user_id,
1068 				     false, false, _wif_status());
1069 			unlock_slurmctld(job_write_lock);
1070 			continue;
1071 		} else if ((msg_type == RESPONSE_HET_JOB_ALLOCATION) &&
1072 			   (rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
1073 			/* Communication issue to srun that launched the job
1074 			 * Cancel rather than leave a stray-but-empty job
1075 			 * behind on the allocated nodes. */
1076 			List het_alloc_list = task_ptr->msg_args_ptr;
1077 			resource_allocation_response_msg_t *msg_ptr;
1078 			if (!het_alloc_list ||
1079 			    (list_count(het_alloc_list) == 0))
1080 				continue;
1081 			msg_ptr = list_peek(het_alloc_list);
1082 			job_id = msg_ptr->job_id;
1083 			info("Killing interactive JobId=%u: %s",
1084 			     job_id, slurm_strerror(rc));
1085 			thread_state = DSH_FAILED;
1086 			lock_slurmctld(job_write_lock);
1087 			job_complete(job_id, slurmctld_conf.slurm_user_id,
1088 				     false, false, _wif_status());
1089 			unlock_slurmctld(job_write_lock);
1090 			continue;
1091 		}
1092 
1093 		if (msg_type == REQUEST_SIGNAL_TASKS) {
1094 			job_record_t *job_ptr;
1095 			signal_tasks_msg_t *msg_ptr =
1096 				task_ptr->msg_args_ptr;
1097 
1098 			if ((msg_ptr->signal == SIGCONT) ||
1099 			    (msg_ptr->signal == SIGSTOP)) {
1100 				job_id = msg_ptr->job_id;
1101 				lock_slurmctld(job_write_lock);
1102 				job_ptr = find_job_record(job_id);
1103 				if (job_ptr == NULL) {
1104 					info("%s: invalid JobId=%u",
1105 					     __func__, job_id);
1106 				} else if (rc == SLURM_SUCCESS) {
1107 					if (msg_ptr->signal == SIGSTOP) {
1108 						job_ptr->job_state |=
1109 							JOB_STOPPED;
1110 					} else { // SIGCONT
1111 						job_ptr->job_state &=
1112 							~JOB_STOPPED;
1113 					}
1114 				}
1115 
1116 				if (job_ptr)
1117 					job_ptr->job_state &= ~JOB_SIGNALING;
1118 
1119 				unlock_slurmctld(job_write_lock);
1120 			}
1121 		}
1122 
1123 		if (((msg_type == REQUEST_SIGNAL_TASKS) ||
1124 		     (msg_type == REQUEST_TERMINATE_TASKS)) &&
1125 		     (rc == ESRCH)) {
1126 			/* process is already dead, not a real error */
1127 			rc = SLURM_SUCCESS;
1128 		}
1129 
1130 		switch (rc) {
1131 		case SLURM_SUCCESS:
1132 			/* debug("agent processed RPC to node %s", */
1133 			/*       ret_data_info->node_name); */
1134 			thread_state = DSH_DONE;
1135 			break;
1136 		case SLURM_UNKNOWN_FORWARD_ADDR:
1137 			error("We were unable to forward message to '%s'.  "
1138 			      "Make sure the slurm.conf for each slurmd "
1139 			      "contain all other nodes in your system.",
1140 			      ret_data_info->node_name);
1141 			thread_state = DSH_NO_RESP;
1142 			break;
1143 		case ESLURMD_EPILOG_FAILED:
1144 			error("Epilog failure on host %s, "
1145 			      "setting DOWN",
1146 			      ret_data_info->node_name);
1147 
1148 			thread_state = DSH_FAILED;
1149 			break;
1150 		case ESLURMD_PROLOG_FAILED:
1151 			thread_state = DSH_FAILED;
1152 			break;
1153 		case ESLURM_DUPLICATE_JOB_ID:
1154 			thread_state = DSH_DUP_JOBID;
1155 			break;
1156 		case ESLURM_INVALID_JOB_ID:
1157 			/* Not indicative of a real error */
1158 		case ESLURMD_JOB_NOTRUNNING:
1159 			/* Not indicative of a real error */
1160 			if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
1161 				info("%s: RPC to node %s failed, job not running",
1162 				     __func__, ret_data_info->node_name);
1163 			}
1164 			thread_state = DSH_DONE;
1165 			break;
1166 		default:
1167 			if (!srun_agent) {
1168 				if (ret_data_info->err)
1169 					errno = ret_data_info->err;
1170 				else
1171 					errno = rc;
1172 				lock_slurmctld(node_read_lock);
1173 				rc = _comm_err(ret_data_info->node_name,
1174 					       msg_type);
1175 				unlock_slurmctld(node_read_lock);
1176 			}
1177 
1178 			if (srun_agent)
1179 				thread_state = DSH_FAILED;
1180 			else if (rc || (ret_data_info->type ==
1181 					RESPONSE_FORWARD_FAILED))
1182 				/* check if a forward failed */
1183 				thread_state = DSH_NO_RESP;
1184 			else {	/* some will fail that don't mean anything went
1185 				 * bad like a job term request on a job that is
1186 				 * already finished, we will just exit on those
1187 				 * cases */
1188 				thread_state = DSH_DONE;
1189 			}
1190 		}
1191 		ret_data_info->err = thread_state;
1192 	}
1193 	list_iterator_destroy(itr);
1194 
1195 cleanup:
	if (!ret_list && (msg_type == REQUEST_SIGNAL_TASKS)) {
		job_record_t *job_ptr;
		signal_tasks_msg_t *msg_ptr =
			task_ptr->msg_args_ptr;
		if ((msg_ptr->signal == SIGCONT) ||
		    (msg_ptr->signal == SIGSTOP)) {
			job_id = msg_ptr->job_id;
			lock_slurmctld(job_write_lock);
			job_ptr = find_job_record(job_id);
			if (job_ptr)
				job_ptr->job_state &= ~JOB_SIGNALING;
			unlock_slurmctld(job_write_lock);
		}
	}
	xfree(args);
1211 	/* handled at end of thread just in case resend is needed */
1212 	destroy_forward(&msg.forward);
1213 	slurm_mutex_lock(thread_mutex_ptr);
1214 	thread_ptr->ret_list = ret_list;
1215 	thread_ptr->state = thread_state;
1216 	thread_ptr->end_time = (time_t) difftime(time(NULL),
1217 						 thread_ptr->start_time);
1218 	/* Signal completion so another thread can replace us */
1219 	(*threads_active_ptr)--;
1220 	slurm_cond_signal(thread_cond_ptr);
1221 	slurm_mutex_unlock(thread_mutex_ptr);
1222 	return (void *) NULL;
1223 }
1224 
1225 /*
 * Signal handler. We are really interested in interrupting hung communications
1227  * and causing them to return EINTR. Multiple interrupts might be required.
1228  */
static void _sig_handler(int dummy)
1230 {
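	/* Intentionally empty: the handler only needs to exist so that the
	 * SIGUSR1 sent from _wdog() interrupts a blocked I/O call with EINTR. */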
1231 }
1232 
static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
1234 			  int *count, int *spot)
1235 {
1236 #ifdef HAVE_FRONT_END
1237 	front_end_record_t *node_ptr;
1238 #else
1239 	node_record_t *node_ptr;
1240 #endif
1241 	ret_data_info_t *ret_data_info = NULL;
1242 	ListIterator itr;
1243 	int rc = 0;
1244 
1245 	itr = list_iterator_create(thread_ptr->ret_list);
1246 	while ((ret_data_info = list_next(itr))) {
1247 		if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT)
1248 			info("%s: got err of %d", __func__, ret_data_info->err);
1249 		if (ret_data_info->err != DSH_NO_RESP)
1250 			continue;
1251 
1252 #ifdef HAVE_FRONT_END
1253 		node_ptr = find_front_end_record(ret_data_info->node_name);
1254 #else
1255 		node_ptr = find_node_record(ret_data_info->node_name);
1256 #endif
1257 		if (node_ptr &&
1258 		    (IS_NODE_DOWN(node_ptr) || IS_NODE_POWER_SAVE(node_ptr))) {
1259 			--(*count);
1260 		} else if (agent_arg_ptr) {
1261 			debug("%s: got the name %s to resend out of %d",
1262 			      __func__, ret_data_info->node_name, *count);
1263 			hostlist_push_host(agent_arg_ptr->hostlist,
1264 				      ret_data_info->node_name);
1265 			++(*spot);
1266 		}
1267 
1268 		if (*spot == *count) {
1269 			rc = 1;
1270 			break;
1271 		}
1272 	}
1273 	list_iterator_destroy(itr);
1274 
1275 	return rc;
1276 }
1277 
1278 /*
1279  * _queue_agent_retry - Queue any failed RPCs for later replay
1280  * IN agent_info_ptr - pointer to info on completed agent requests
1281  * IN count - number of agent requests which failed, count to requeue
1282  */
static void _queue_agent_retry(agent_info_t * agent_info_ptr, int count)
1284 {
1285 #ifdef HAVE_FRONT_END
1286 	front_end_record_t *node_ptr;
1287 #else
1288 	node_record_t *node_ptr;
1289 #endif
1290 	agent_arg_t *agent_arg_ptr;
1291 	queued_request_t *queued_req_ptr = NULL;
1292 	thd_t *thread_ptr = agent_info_ptr->thread_struct;
1293 	int i, j;
1294 
1295 	if (count == 0)
1296 		return;
1297 
1298 	/* build agent argument with just the RPCs to retry */
1299 	agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
1300 	agent_arg_ptr->node_count = count;
1301 	agent_arg_ptr->retry = 1;
1302 	agent_arg_ptr->hostlist = hostlist_create(NULL);
1303 	agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
1304 	agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
1305 	*(agent_info_ptr->msg_args_pptr) = NULL;
1306 
1307 	j = 0;
1308 	for (i = 0; i < agent_info_ptr->thread_count; i++) {
1309 		if (!thread_ptr[i].ret_list) {
1310 			if (thread_ptr[i].state != DSH_NO_RESP)
1311 				continue;
1312 
1313 			debug("got the name %s to resend",
1314 			      thread_ptr[i].nodelist);
1315 #ifdef HAVE_FRONT_END
1316 			node_ptr = find_front_end_record(
1317 						thread_ptr[i].nodelist);
1318 #else
1319 			node_ptr = find_node_record(thread_ptr[i].nodelist);
1320 #endif
1321 			if (node_ptr &&
1322 			    (IS_NODE_DOWN(node_ptr) ||
1323 			     IS_NODE_POWER_SAVE(node_ptr))) {
1324 				/* Do not re-send RPC to DOWN node */
1325 				if (count)
1326 					count--;
1327 			} else {
1328 				hostlist_push_host(agent_arg_ptr->hostlist,
1329 						   thread_ptr[i].nodelist);
1330 				j++;
1331 			}
1332 			if (j == count)
1333 				break;
1334 		} else {
1335 			if (_setup_requeue(agent_arg_ptr, &thread_ptr[i],
1336 					   &count, &j))
1337 				break;
1338 		}
1339 	}
1340 	if (count == 0) {
1341 		/* All non-responding nodes are DOWN.
1342 		 * Do not requeue, but discard this RPC */
1343 		hostlist_destroy(agent_arg_ptr->hostlist);
1344 		*(agent_info_ptr->msg_args_pptr) = agent_arg_ptr->msg_args;
1345 		xfree(agent_arg_ptr);
1346 		return;
1347 	}
1348 	if (count != j) {
1349 		error("agent: Retry count (%d) != actual count (%d)",
1350 			count, j);
1351 		agent_arg_ptr->node_count = j;
1352 	}
1353 	debug2("Queue RPC msg_type=%u, nodes=%d for retry",
1354 	       agent_arg_ptr->msg_type, j);
1355 
	/* add the request to a list */
1357 	queued_req_ptr = xmalloc(sizeof(queued_request_t));
1358 	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
1359 	queued_req_ptr->last_attempt  = time(NULL);
1360 	slurm_mutex_lock(&retry_mutex);
1361 	if (retry_list == NULL)
1362 		retry_list = list_create(_list_delete_retry);
1363 	(void) list_append(retry_list, (void *) queued_req_ptr);
1364 	slurm_mutex_unlock(&retry_mutex);
1365 }
1366 
1367 /*
1368  * _list_delete_retry - delete an entry from the retry list,
1369  *	see common/list.h for documentation
1370  */
static void _list_delete_retry(void *retry_entry)
1372 {
1373 	queued_request_t *queued_req_ptr;
1374 
1375 	if (! retry_entry)
1376 		return;
1377 
1378 	queued_req_ptr = (queued_request_t *) retry_entry;
1379 	_purge_agent_args(queued_req_ptr->agent_arg_ptr);
1380 	xfree(queued_req_ptr);
1381 }
1382 
1383 /* Start a thread to manage queued agent requests */
static void *_agent_init(void *arg)
1385 {
1386 	int min_wait;
1387 	bool mail_too;
1388 	struct timespec ts = {0, 0};
1389 	time_t last_defer_attempt = (time_t) 0;
1390 
1391 	while (true) {
1392 		slurm_mutex_lock(&pending_mutex);
1393 		while (!slurmctld_config.shutdown_time &&
1394 		       !pending_mail && (pending_wait_time == NO_VAL16)) {
1395 			ts.tv_sec  = time(NULL) + 2;
1396 			slurm_cond_timedwait(&pending_cond, &pending_mutex,
1397 					     &ts);
1398 		}
1399 		if (slurmctld_config.shutdown_time) {
1400 			slurm_mutex_unlock(&pending_mutex);
1401 			break;
1402 		}
1403 		mail_too = pending_mail;
1404 		min_wait = pending_wait_time;
1405 		pending_mail = false;
1406 		pending_wait_time = NO_VAL16;
1407 		slurm_mutex_unlock(&pending_mutex);
1408 
1409 		if (last_defer_attempt + 2 < last_job_update) {
1410 			last_defer_attempt = time(NULL);
1411 			_agent_defer();
1412 		}
1413 
1414 		_agent_retry(min_wait, mail_too);
1415 	}
1416 
1417 	slurm_mutex_lock(&pending_mutex);
1418 	pending_thread_running = false;
1419 	slurm_mutex_unlock(&pending_mutex);
1420 	return NULL;
1421 }
1422 
extern void agent_init(void)
1424 {
1425 	slurm_mutex_lock(&pending_mutex);
1426 	if (pending_thread_running) {
1427 		error("%s: thread already running", __func__);
1428 		slurm_mutex_unlock(&pending_mutex);
1429 		return;
1430 	}
1431 
1432 	slurm_thread_create_detached(NULL, _agent_init, NULL);
1433 	pending_thread_running = true;
1434 	slurm_mutex_unlock(&pending_mutex);
1435 }
1436 
1437 /*
1438  * agent_trigger - Request processing of pending RPCs
1439  * IN min_wait - Minimum wait time between re-issue of a pending RPC
 * IN mail_too - Send pending email too; note this is performed using a
1441  *	fork/waitpid, so it can take longer than just creating a pthread
1442  *	to send RPCs
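 *
 * As one example from this file, agent() calls
 * agent_trigger(RPC_RETRY_INTERVAL, true) on completion when sufficient
 * thread head-room remains, giving deferred RPCs and pending mail a prompt
 * retry.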
1443  */
extern void agent_trigger(int min_wait, bool mail_too)
1445 {
1446 	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
1447 		info("%s: pending_wait_time=%d->%d mail_too=%c->%c Agent_cnt=%d agent_thread_cnt=%d retry_list_size=%d",
1448 		     __func__, pending_wait_time, min_wait,
1449 		     mail_too ?  'T' : 'F',
1450 		     pending_mail ? 'T' : 'F',
1451 		     agent_cnt, agent_thread_cnt,
1452 		     retry_list_size());
1453 	}
1454 
1455 	slurm_mutex_lock(&pending_mutex);
1456 	if ((pending_wait_time == NO_VAL16) ||
1457 	    (pending_wait_time >  min_wait))
1458 		pending_wait_time = min_wait;
1459 	if (mail_too)
1460 		pending_mail = mail_too;
1461 	slurm_cond_broadcast(&pending_cond);
1462 	slurm_mutex_unlock(&pending_mutex);
1463 }
1464 
1465 /* agent_pack_pending_rpc_stats - pack counts of pending RPCs into a buffer */
extern void agent_pack_pending_rpc_stats(Buf buffer)
1467 {
1468 	time_t now;
1469 	int i;
1470 	queued_request_t *queued_req_ptr = NULL;
1471 	agent_arg_t *agent_arg_ptr = NULL;
1472 	ListIterator list_iter;
1473 
1474 	now = time(NULL);
1475 	if (difftime(now, cache_build_time) <= RPC_PACK_MAX_AGE)
1476 		goto pack_it;	/* Send cached data */
1477 	cache_build_time = now;
1478 
1479 	if (rpc_stat_counts) {	/* Clear existing data */
1480 		stat_type_count = 0;
1481 		memset(rpc_stat_counts, 0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);
1482 		memset(rpc_stat_types,  0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);
1483 
1484 		rpc_count = 0;
1485 		/* the other variables need not be cleared */
1486 	} else {		/* Allocate buffers for data */
1487 		stat_type_count = 0;
1488 		rpc_stat_counts = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));
1489 		rpc_stat_types  = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));
1490 
1491 		rpc_count = 0;
1492 		rpc_host_list = xcalloc(DUMP_RPC_COUNT, sizeof(char *));
1493 		for (i = 0; i < DUMP_RPC_COUNT; i++) {
1494 			rpc_host_list[i] = xmalloc(HOSTLIST_MAX_SIZE);
1495 		}
1496 		rpc_type_list = xcalloc(DUMP_RPC_COUNT, sizeof(uint32_t));
1497 	}
1498 
1499 	slurm_mutex_lock(&retry_mutex);
1500 	if (retry_list) {
1501 		list_iter = list_iterator_create(retry_list);
1502 		/* iterate through list, find type slot or make a new one */
1503 		while ((queued_req_ptr = list_next(list_iter))) {
1504 			agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
1505 			if (rpc_count < DUMP_RPC_COUNT) {
1506 				rpc_type_list[rpc_count] =
1507 						agent_arg_ptr->msg_type;
1508 				hostlist_ranged_string(agent_arg_ptr->hostlist,
1509 						HOSTLIST_MAX_SIZE,
1510 						rpc_host_list[rpc_count]);
1511 				rpc_count++;
1512 			}
1513 			for (i = 0; i < MAX_RPC_PACK_CNT; i++) {
1514 				if (rpc_stat_types[i] == 0) {
1515 					rpc_stat_types[i] =
1516 						agent_arg_ptr->msg_type;
1517 					stat_type_count++;
1518 				} else if (rpc_stat_types[i] !=
1519 					   agent_arg_ptr->msg_type)
1520 					continue;
1521 				rpc_stat_counts[i]++;
1522 				break;
1523 			}
1524 		}
1525 		list_iterator_destroy(list_iter);
1526 	}
1527 	slurm_mutex_unlock(&retry_mutex);
1528 
1529 pack_it:
1530 	pack32_array(rpc_stat_types,  stat_type_count, buffer);
1531 	pack32_array(rpc_stat_counts, stat_type_count, buffer);
1532 
1533 	pack32_array(rpc_type_list, rpc_count, buffer);
1534 	packstr_array(rpc_host_list, rpc_count, buffer);
1535 }
1536 
static void _agent_defer(void)
1538 {
1539 	int rc = -1;
1540 	queued_request_t *queued_req_ptr = NULL;
1541 	agent_arg_t *agent_arg_ptr = NULL;
1542 	/* Write lock on jobs */
1543 	slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };
1544 
1545 	lock_slurmctld(job_write_lock);
1546 	slurm_mutex_lock(&defer_mutex);
1547 	if (defer_list) {
1548 		List tmp_list = NULL;
1549 		/* first try to find a new (never tried) record */
1550 		while ((queued_req_ptr = list_pop(defer_list))) {
1551 			agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
1552 			if (agent_arg_ptr->msg_type ==
1553 			    REQUEST_BATCH_JOB_LAUNCH)
1554 				rc = _batch_launch_defer(queued_req_ptr);
1555 			else if (agent_arg_ptr->msg_type ==
1556 				 REQUEST_SIGNAL_TASKS)
1557 				rc = _signal_defer(queued_req_ptr);
1558 			else
1559 				fatal("%s: Invalid message type (%u)",
1560 				      __func__, agent_arg_ptr->msg_type);
1561 
1562 			if (rc == -1) {   /* abort request */
1563 				_purge_agent_args(
1564 					queued_req_ptr->agent_arg_ptr);
1565 				xfree(queued_req_ptr);
1566 			} else if (rc == 0) {
1567 				/* ready to process now, move to retry_list */
1568 				slurm_mutex_lock(&retry_mutex);
1569 				if (!retry_list)
1570 					retry_list =
1571 						list_create(_list_delete_retry);
1572 				list_append(retry_list, queued_req_ptr);
1573 				slurm_mutex_unlock(&retry_mutex);
1574 			} else if (rc == 1) {
1575 				if (!tmp_list)
1576 					tmp_list =
1577 						list_create(_list_delete_retry);
1578 				list_append(tmp_list, (void *)queued_req_ptr);
1579 			}
1580 		}
1581 
1582 		if (tmp_list) {
1583 			list_transfer(defer_list, tmp_list);
1584 			FREE_NULL_LIST(tmp_list);
1585 		}
1586 	}
1587 
1588 	slurm_mutex_unlock(&defer_mutex);
1589 	unlock_slurmctld(job_write_lock);
1590 
1591 	return;
1592 }

/* Do the work requested by agent_retry (retry pending RPCs).
 * This is a separate thread so the job records can be locked */
static void _agent_retry(int min_wait, bool mail_too)
{
	time_t now = time(NULL);
	queued_request_t *queued_req_ptr = NULL;
	agent_arg_t *agent_arg_ptr = NULL;
	ListIterator retry_iter;
	mail_info_t *mi = NULL;

	slurm_mutex_lock(&retry_mutex);
	if (retry_list) {
		static time_t last_msg_time = (time_t) 0;
		uint32_t msg_type[5] = {0, 0, 0, 0, 0};
		int i = 0, list_size = list_count(retry_list);
		if (((list_size > 100) &&
		     (difftime(now, last_msg_time) > 300)) ||
		    ((list_size > 0) &&
		     (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT))) {
			/* Note sizable backlog (retry_list_size()) of work */
			retry_iter = list_iterator_create(retry_list);
			while ((queued_req_ptr = list_next(retry_iter))) {
				agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
				msg_type[i++] = agent_arg_ptr->msg_type;
				if (i == 5)
					break;
			}
			list_iterator_destroy(retry_iter);
			info("   retry_list retry_list_size:%d msg_type=%s,%s,%s,%s,%s",
			     list_size, rpc_num2string(msg_type[0]),
			     rpc_num2string(msg_type[1]),
			     rpc_num2string(msg_type[2]),
			     rpc_num2string(msg_type[3]),
			     rpc_num2string(msg_type[4]));
			last_msg_time = now;
		}
	}

	slurm_mutex_lock(&agent_cnt_mutex);
	if (agent_thread_cnt + AGENT_THREAD_COUNT + 2 > MAX_SERVER_THREADS) {
		/* too much work already */
		slurm_mutex_unlock(&agent_cnt_mutex);
		slurm_mutex_unlock(&retry_mutex);
		return;
	}
	slurm_mutex_unlock(&agent_cnt_mutex);

	if (retry_list) {
		/* first try to find a new (never tried) record */
		retry_iter = list_iterator_create(retry_list);
		while ((queued_req_ptr = list_next(retry_iter))) {
			if (queued_req_ptr->last_attempt == 0) {
				list_remove(retry_iter);
				break;		/* Process this request now */
			}
		}
		list_iterator_destroy(retry_iter);
	}

	if (retry_list && (queued_req_ptr == NULL)) {
		/* now try to find a requeue request that is
		 * relatively old */
		double age = 0;

		retry_iter = list_iterator_create(retry_list);
		/* next try to find an older record to retry */
		while ((queued_req_ptr = list_next(retry_iter))) {
			age = difftime(now, queued_req_ptr->last_attempt);
			if (age > min_wait) {
				list_remove(retry_iter);
				break;
			}
		}
		list_iterator_destroy(retry_iter);
	}
	slurm_mutex_unlock(&retry_mutex);

	if (queued_req_ptr) {
		agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
		xfree(queued_req_ptr);
		if (agent_arg_ptr) {
			debug2("Spawning RPC agent for msg_type %s",
			       rpc_num2string(agent_arg_ptr->msg_type));
			slurm_thread_create_detached(NULL, agent, agent_arg_ptr);
		} else
			error("agent_retry found record with no agent_args");
	} else if (mail_too) {
		slurm_mutex_lock(&agent_cnt_mutex);
		slurm_mutex_lock(&mail_mutex);
		while (mail_list && (agent_thread_cnt < MAX_SERVER_THREADS) &&
		       (mail_thread_cnt < MAX_MAIL_THREADS)) {
			mi = (mail_info_t *) list_dequeue(mail_list);
			if (!mi)
				break;

			mail_thread_cnt++;
			agent_thread_cnt++;
			slurm_thread_create_detached(NULL, _mail_proc, mi);
		}
		slurm_mutex_unlock(&mail_mutex);
		slurm_mutex_unlock(&agent_cnt_mutex);
	}

	return;
}


/*
 * agent_queue_request - put a new request on the queue for execution or
 * 	execute now if not too busy
 * IN agent_arg_ptr - the request to enqueue
 */
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
	queued_request_t *queued_req_ptr = NULL;

	if ((AGENT_THREAD_COUNT + 2) >= MAX_SERVER_THREADS)
		fatal("AGENT_THREAD_COUNT value is too high relative to MAX_SERVER_THREADS");

	if (message_timeout == NO_VAL16) {
		message_timeout = MAX(slurm_get_msg_timeout(), 30);
	}

	if (agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) {
		/* execute now */
		slurm_thread_create_detached(NULL, agent, agent_arg_ptr);
		/* give agent a chance to start */
		usleep(10000);
		return;
	}

	queued_req_ptr = xmalloc(sizeof(queued_request_t));
	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
/*	queued_req_ptr->last_attempt  = 0; Implicit */

	if (((agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
	     (_batch_launch_defer(queued_req_ptr) != 0)) ||
	    ((agent_arg_ptr->msg_type == REQUEST_SIGNAL_TASKS) &&
	     (_signal_defer(queued_req_ptr) != 0))) {
		slurm_mutex_lock(&defer_mutex);
		if (defer_list == NULL)
			defer_list = list_create(_list_delete_retry);
		list_append(defer_list, (void *)queued_req_ptr);
		slurm_mutex_unlock(&defer_mutex);
	} else {
		slurm_mutex_lock(&retry_mutex);
		if (retry_list == NULL)
			retry_list = list_create(_list_delete_retry);
		list_append(retry_list, (void *)queued_req_ptr);
		slurm_mutex_unlock(&retry_mutex);
	}
	/* now process the request in a separate pthread
	 * (if we can create another pthread to do so) */
	agent_trigger(999, false);
}


/* agent_purge - purge all pending RPC requests */
extern void agent_purge(void)
{
	int i;

	if (retry_list) {
		slurm_mutex_lock(&retry_mutex);
		FREE_NULL_LIST(retry_list);
		slurm_mutex_unlock(&retry_mutex);
	}
	if (defer_list) {
		slurm_mutex_lock(&defer_mutex);
		FREE_NULL_LIST(defer_list);
		slurm_mutex_unlock(&defer_mutex);
	}
	if (mail_list) {
		slurm_mutex_lock(&mail_mutex);
		FREE_NULL_LIST(mail_list);
		slurm_mutex_unlock(&mail_mutex);
	}

	xfree(rpc_stat_counts);
	xfree(rpc_stat_types);
	xfree(rpc_type_list);
	if (rpc_host_list) {
		for (i = 0; i < DUMP_RPC_COUNT; i++)
			xfree(rpc_host_list[i]);
		xfree(rpc_host_list);
	}
}

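/* Return the current number of active agents (agent_cnt) */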
extern int get_agent_count(void)
{
	int cnt;

	slurm_mutex_lock(&agent_cnt_mutex);
	cnt = agent_cnt;
	slurm_mutex_unlock(&agent_cnt_mutex);

	return cnt;
}

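/* Return the current number of active agent threads (agent_thread_cnt) */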
extern int get_agent_thread_count(void)
{
	int cnt;

	slurm_mutex_lock(&agent_cnt_mutex);
	cnt = agent_thread_cnt;
	slurm_mutex_unlock(&agent_cnt_mutex);

	return cnt;
}

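/* Free an agent_arg_t structure, including its hostlist, address array and
 * type-specific message payload */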
static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
{
	if (agent_arg_ptr == NULL)
		return;

	hostlist_destroy(agent_arg_ptr->hostlist);
	xfree(agent_arg_ptr->addr);
	if (agent_arg_ptr->msg_args) {
		if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
			slurm_free_job_launch_msg(agent_arg_ptr->msg_args);
		} else if (agent_arg_ptr->msg_type ==
				RESPONSE_RESOURCE_ALLOCATION) {
			resource_allocation_response_msg_t *alloc_msg =
				agent_arg_ptr->msg_args;
			/* NULL out working_cluster_rec because it's pointing to
			 * the actual cluster_rec. */
			alloc_msg->working_cluster_rec = NULL;
			slurm_free_resource_allocation_response_msg(
					agent_arg_ptr->msg_args);
		} else if (agent_arg_ptr->msg_type ==
				RESPONSE_HET_JOB_ALLOCATION) {
			List alloc_list = agent_arg_ptr->msg_args;
			FREE_NULL_LIST(alloc_list);
		} else if ((agent_arg_ptr->msg_type == REQUEST_ABORT_JOB)    ||
			 (agent_arg_ptr->msg_type == REQUEST_TERMINATE_JOB)  ||
			 (agent_arg_ptr->msg_type == REQUEST_KILL_PREEMPTED) ||
			 (agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT))
			slurm_free_kill_job_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_USER_MSG)
			slurm_free_srun_user_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_EXEC)
			slurm_free_srun_exec_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_NODE_FAIL)
			slurm_free_srun_node_fail_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_STEP_MISSING)
			slurm_free_srun_step_missing_msg(
				agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_STEP_SIGNAL)
			slurm_free_job_step_kill_msg(
				agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_JOB_NOTIFY)
			slurm_free_job_notify_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_SUSPEND_INT)
			slurm_free_suspend_int_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_LAUNCH_PROLOG)
			slurm_free_prolog_launch_msg(agent_arg_ptr->msg_args);
		else
			xfree(agent_arg_ptr->msg_args);
	}
	xfree(agent_arg_ptr);
}

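/* Allocate a new mail_info_t record */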
static mail_info_t *_mail_alloc(void)
{
	return xmalloc(sizeof(mail_info_t));
}

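/* Free a mail_info_t record (list destructor for mail_list) */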
static void _mail_free(void *arg)
{
	mail_info_t *mi = (mail_info_t *) arg;

	if (mi) {
		xfree(mi->user_name);
		xfree(mi->message);
		xfree(mi);
	}
}

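/* Build a minimal environment for MailProg, currently just
 * SLURM_CLUSTER_NAME */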
static char **_build_mail_env(void)
{
	char **my_env;

	my_env = xmalloc(sizeof(char *));
	my_env[0] = NULL;
	if (slurmctld_conf.cluster_name) {
		setenvf(&my_env, "SLURM_CLUSTER_NAME", "%s",
			slurmctld_conf.cluster_name);
	}

	return my_env;
}

/* process an email request and free the record */
static void *_mail_proc(void *arg)
{
	mail_info_t *mi = (mail_info_t *) arg;
	pid_t pid;

	pid = fork();
	if (pid < 0) {		/* error */
		error("fork(): %m");
	} else if (pid == 0) {	/* child */
		int fd_0, fd_1, fd_2, i;
		char **my_env = NULL;
		my_env = _build_mail_env();
		for (i = 0; i < 1024; i++)
			(void) close(i);
		if ((fd_0 = open("/dev/null", O_RDWR)) == -1)	// fd = 0
			error("Couldn't open /dev/null: %m");
		if ((fd_1 = dup(fd_0)) == -1)			// fd = 1
			error("Couldn't do a dup on fd 1: %m");
		if ((fd_2 = dup(fd_0)) == -1)			// fd = 2
			error("Couldn't do a dup on fd 2: %m");
		execle(slurmctld_conf.mail_prog, "mail",
			"-s", mi->message, mi->user_name,
			NULL, my_env);
		error("Failed to exec %s: %m",
			slurmctld_conf.mail_prog);
		_exit(1);
	} else {		/* parent */
		waitpid(pid, NULL, 0);
	}
	_mail_free(mi);
	slurm_mutex_lock(&agent_cnt_mutex);
	slurm_mutex_lock(&mail_mutex);
	if (agent_thread_cnt)
		agent_thread_cnt--;
	else
		error("agent_thread_cnt underflow");
	if (mail_thread_cnt)
		mail_thread_cnt--;
	else
		error("mail_thread_cnt underflow");
	slurm_mutex_unlock(&mail_mutex);
	slurm_mutex_unlock(&agent_cnt_mutex);

	return (void *) NULL;
}

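/* Translate a MAIL_JOB_* mail type into a human-readable string */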
static char *_mail_type_str(uint16_t mail_type)
{
	if (mail_type == MAIL_JOB_BEGIN)
		return "Began";
	if (mail_type == MAIL_JOB_END)
		return "Ended";
	if (mail_type == MAIL_JOB_FAIL)
		return "Failed";
	if (mail_type == MAIL_JOB_REQUEUE)
		return "Requeued";
	if (mail_type == MAIL_JOB_STAGE_OUT)
		return "StageOut/Teardown";
	if (mail_type == MAIL_JOB_TIME100)
		return "Reached time limit";
	if (mail_type == MAIL_JOB_TIME90)
		return "Reached 90% of time limit";
	if (mail_type == MAIL_JOB_TIME80)
		return "Reached 80% of time limit";
	if (mail_type == MAIL_JOB_TIME50)
		return "Reached 50% of time limit";
	return "unknown";
}

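/* Build a string describing the job's queued or run time for inclusion in
 * the e-mail message, based upon the mail type */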
static void _set_job_time(job_record_t *job_ptr, uint16_t mail_type,
			  char *buf, int buf_len)
{
	time_t interval = NO_VAL;

	buf[0] = '\0';
	if ((mail_type == MAIL_JOB_BEGIN) && job_ptr->start_time &&
	    job_ptr->details && job_ptr->details->submit_time) {
		interval = job_ptr->start_time - job_ptr->details->submit_time;
		snprintf(buf, buf_len, ", Queued time ");
		secs2time_str(interval, buf+14, buf_len-14);
		return;
	}

	if (((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL) ||
	     (mail_type == MAIL_JOB_REQUEUE)) &&
	    (job_ptr->start_time && job_ptr->end_time)) {
		if (job_ptr->suspend_time) {
			interval  = job_ptr->end_time - job_ptr->suspend_time;
			interval += job_ptr->pre_sus_time;
		} else
			interval = job_ptr->end_time - job_ptr->start_time;
		snprintf(buf, buf_len, ", Run time ");
		secs2time_str(interval, buf+11, buf_len-11);
		return;
	}

	if (((mail_type == MAIL_JOB_TIME100) ||
	     (mail_type == MAIL_JOB_TIME90)  ||
	     (mail_type == MAIL_JOB_TIME80)  ||
	     (mail_type == MAIL_JOB_TIME50)) && job_ptr->start_time) {
		if (job_ptr->suspend_time) {
			interval  = time(NULL) - job_ptr->suspend_time;
			interval += job_ptr->pre_sus_time;
		} else
			interval = time(NULL) - job_ptr->start_time;
		snprintf(buf, buf_len, ", Run time ");
		secs2time_str(interval, buf+11, buf_len-11);
		return;
	}

	if ((mail_type == MAIL_JOB_STAGE_OUT) && job_ptr->end_time) {
		interval = time(NULL) - job_ptr->end_time;
		snprintf(buf, buf_len, " time ");
		secs2time_str(interval, buf + 6, buf_len - 6);
		return;
	}
}

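/* Build a string describing the job's termination state and exit code(s),
 * summarizing array tasks when the mail applies to the whole array */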
static void _set_job_term_info(job_record_t *job_ptr, uint16_t mail_type,
			       char *buf, int buf_len)
{
	buf[0] = '\0';

	if ((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL)) {
		uint16_t base_state;
		uint32_t exit_status_min, exit_status_max;
		int exit_code_min, exit_code_max;

		base_state = job_ptr->job_state & JOB_STATE_BASE;
		if (job_ptr->array_recs &&
		    !(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
			/* Summarize array tasks. */
			exit_status_min = job_ptr->array_recs->min_exit_code;
			exit_status_max = job_ptr->array_recs->max_exit_code;
			if (WIFEXITED(exit_status_min) &&
			    WIFEXITED(exit_status_max)) {
				char *state_string;
				exit_code_min = WEXITSTATUS(exit_status_min);
				exit_code_max = WEXITSTATUS(exit_status_max);
				if ((exit_code_min == 0) && (exit_code_max > 0))
					state_string = "Mixed";
				else {
					state_string =
						job_state_string(base_state);
				}
				snprintf(buf, buf_len, ", %s, ExitCode [%d-%d]",
					 state_string, exit_code_min,
					 exit_code_max);
			} else if (WIFSIGNALED(exit_status_max)) {
				exit_code_max = WTERMSIG(exit_status_max);
				snprintf(buf, buf_len, ", %s, MaxSignal [%d]",
					 "Mixed", exit_code_max);
			} else if (WIFEXITED(exit_status_max)) {
				exit_code_max = WEXITSTATUS(exit_status_max);
				snprintf(buf, buf_len, ", %s, MaxExitCode [%d]",
					 "Mixed", exit_code_max);
			} else {
				snprintf(buf, buf_len, ", %s",
					 job_state_string(base_state));
			}

			if (job_ptr->array_recs->array_flags &
			    ARRAY_TASK_REQUEUED)
				strncat(buf, ", with requeued tasks",
					buf_len - strlen(buf) - 1);
		} else {
			exit_status_max = job_ptr->exit_code;
			if (WIFEXITED(exit_status_max)) {
				exit_code_max = WEXITSTATUS(exit_status_max);
				snprintf(buf, buf_len, ", %s, ExitCode %d",
					 job_state_string(base_state),
					 exit_code_max);
			} else {
				snprintf(buf, buf_len, ", %s",
					 job_state_string(base_state));
			}
		}
	} else if (buf_len > 0) {
		buf[0] = '\0';
	}
}


/*
 * mail_job_info - Send e-mail notice of job state change
 * IN job_ptr - job identification
 * IN mail_type - job transition type, see MAIL_JOB_* in slurm.h
 */
extern void mail_job_info(job_record_t *job_ptr, uint16_t mail_type)
{
	char job_time[128], term_msg[128];
	mail_info_t *mi;

	/*
	 * Send mail only for first component (leader) of a hetjob,
	 * not the individual job components.
	 */
	if (job_ptr->het_job_id && (job_ptr->het_job_offset != 0))
		return;

	mi = _mail_alloc();
	mi->user_name = xstrdup(job_ptr->mail_user);

	/* Use job array master record, if available */
	if (!(job_ptr->mail_type & MAIL_ARRAY_TASKS) &&
	    (job_ptr->array_task_id != NO_VAL) && !job_ptr->array_recs) {
		job_record_t *master_job_ptr;
		master_job_ptr = find_job_record(job_ptr->array_job_id);
		if (master_job_ptr && master_job_ptr->array_recs)
			job_ptr = master_job_ptr;
	}

	_set_job_time(job_ptr, mail_type, job_time, sizeof(job_time));
	_set_job_term_info(job_ptr, mail_type, term_msg, sizeof(term_msg));
	if (job_ptr->array_recs && !(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
		mi->message = xstrdup_printf("Slurm Array Summary Job_id=%u_* (%u) Name=%s "
					     "%s%s",
					     job_ptr->array_job_id,
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     term_msg);
	} else if (job_ptr->array_task_id != NO_VAL) {
		mi->message = xstrdup_printf("Slurm Array Task Job_id=%u_%u (%u) Name=%s "
					     "%s%s%s",
					     job_ptr->array_job_id,
					     job_ptr->array_task_id,
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     job_time, term_msg);
	} else {
		mi->message = xstrdup_printf("Slurm Job_id=%u Name=%s %s%s%s",
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     job_time, term_msg);
	}
	info("email msg to %s: %s", mi->user_name, mi->message);

	slurm_mutex_lock(&mail_mutex);
	if (!mail_list)
		mail_list = list_create(_mail_free);
	(void) list_enqueue(mail_list, (void *) mi);
	slurm_mutex_unlock(&mail_mutex);
	return;
}

/* Test if a batch launch request should be deferred
 * RET -1: abort the request, pending job cancelled
 *      0: execute the request now
 *      1: defer the request
 */
static int _batch_launch_defer(queued_request_t *queued_req_ptr)
{
	agent_arg_t *agent_arg_ptr;
	batch_job_launch_msg_t *launch_msg_ptr;
	time_t now = time(NULL);
	job_record_t *job_ptr;
	int nodes_ready = 0, tmp = 0;

	agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
	if (difftime(now, queued_req_ptr->last_attempt) < 10) {
		/* Reduce overhead by only testing once every 10 secs */
		return 1;
	}

	launch_msg_ptr = (batch_job_launch_msg_t *)agent_arg_ptr->msg_args;
	job_ptr = find_job_record(launch_msg_ptr->job_id);
	if ((job_ptr == NULL) ||
	    (!IS_JOB_RUNNING(job_ptr) && !IS_JOB_SUSPENDED(job_ptr))) {
		info("agent(batch_launch): removed pending request for cancelled JobId=%u",
		     launch_msg_ptr->job_id);
		return -1;	/* job cancelled while waiting */
	}

	if (job_ptr->details && job_ptr->details->prolog_running) {
		debug2("%s: JobId=%u still waiting on %u prologs",
		       __func__, job_ptr->job_id,
		       job_ptr->details->prolog_running);
		return 1;
	}

	if (job_ptr->wait_all_nodes) {
		(void) job_node_ready(launch_msg_ptr->job_id, &tmp);
		if (tmp == (READY_JOB_STATE | READY_NODE_STATE)) {
			nodes_ready = 1;
			if (launch_msg_ptr->alias_list &&
			    !xstrcmp(launch_msg_ptr->alias_list, "TBD")) {
				/* Update launch RPC with correct node
				 * aliases */
				xfree(launch_msg_ptr->alias_list);
				launch_msg_ptr->alias_list = xstrdup(job_ptr->
								     alias_list);
			}
		}
	} else {
#ifdef HAVE_FRONT_END
		nodes_ready = 1;
#else
		node_record_t *node_ptr;
		char *hostname;

		hostname = hostlist_deranged_string_xmalloc(
					agent_arg_ptr->hostlist);
		node_ptr = find_node_record(hostname);
		if (node_ptr == NULL) {
			error("agent(batch_launch) removed pending request for JobId=%u, missing node %s",
			      launch_msg_ptr->job_id, hostname);
			xfree(hostname);
			return -1;	/* invalid request?? */
		}
		xfree(hostname);
		if (!IS_NODE_POWER_SAVE(node_ptr) &&
		    !IS_NODE_NO_RESPOND(node_ptr)) {
			nodes_ready = 1;
		}
#endif
	}

	if (nodes_ready) {
		if (IS_JOB_CONFIGURING(job_ptr))
			job_config_fini(job_ptr);
		queued_req_ptr->last_attempt = (time_t) 0;
		return 0;
	}

	if (queued_req_ptr->last_attempt == 0) {
		queued_req_ptr->first_attempt = now;
		queued_req_ptr->last_attempt  = now;
	} else if (difftime(now, queued_req_ptr->first_attempt) >=
				 slurm_get_resume_timeout()) {
		/* Nodes will get marked DOWN and job requeued, if possible */
		error("agent waited too long for nodes to respond, abort launch of JobId=%u",
		      job_ptr->job_id);
		return -1;
	}

	queued_req_ptr->last_attempt  = now;
	return 1;
}

/* Test if a job signal request should be deferred
 * RET -1: abort the request
 *      0: execute the request now
 *      1: defer the request
 */
static int _signal_defer(queued_request_t *queued_req_ptr)
{
	agent_arg_t *agent_arg_ptr;
	signal_tasks_msg_t *signal_msg_ptr;
	time_t now = time(NULL);
	job_record_t *job_ptr;

	agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
	signal_msg_ptr = (signal_tasks_msg_t *)agent_arg_ptr->msg_args;
	job_ptr = find_job_record(signal_msg_ptr->job_id);

	if (job_ptr == NULL) {
		info("agent(signal_task): removed pending request for cancelled JobId=%u",
		     signal_msg_ptr->job_id);
		return -1;	/* job cancelled while waiting */
	}

	if (job_ptr->state_reason != WAIT_PROLOG)
		return 0;

	if (queued_req_ptr->first_attempt == 0) {
		queued_req_ptr->first_attempt = now;
	} else if (difftime(now, queued_req_ptr->first_attempt) >=
				 2 * slurm_get_batch_start_timeout()) {
		error("agent waited too long for nodes to respond, abort signal of JobId=%u",
		      job_ptr->job_id);
		return -1;
	}

	return 1;
}

/* Return length of agent's retry_list */
extern int retry_list_size(void)
{
	if (retry_list == NULL)
		return 0;
	return list_count(retry_list);
}

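/* Run RebootProgram from slurmctld to reboot the nodes named in the
 * request's hostlist */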
static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr)
{
	char *argv[3], *pname;
	pid_t child;
	int i, rc, status = 0;

	if (!agent_arg_ptr->hostlist) {
		error("%s: hostlist is NULL", __func__);
		return;
	}
	if (!slurmctld_conf.reboot_program) {
		error("%s: RebootProgram is NULL", __func__);
		return;
	}

	pname = strrchr(slurmctld_conf.reboot_program, '/');
	if (pname)
		argv[0] = pname + 1;
	else
		argv[0] = slurmctld_conf.reboot_program;
	argv[1] = hostlist_deranged_string_xmalloc(agent_arg_ptr->hostlist);
	argv[2] = NULL;

	child = fork();
	if (child == 0) {
		for (i = 0; i < 1024; i++)
			(void) close(i);
		(void) setpgid(0, 0);
		(void) execv(slurmctld_conf.reboot_program, argv);
		_exit(1);
	} else if (child < 0) {
		error("fork: %m");
	} else {
		(void) waitpid(child, &status, 0);
		if (WIFEXITED(status)) {
			rc = WEXITSTATUS(status);
			if (rc != 0) {
				error("RebootProgram exit status of %d",
				      rc);
			}
		} else if (WIFSIGNALED(status)) {
			error("RebootProgram signaled: %s",
			      strsignal(WTERMSIG(status)));
		}
	}
	xfree(argv[1]);
}