/*****************************************************************************\
 *  agent.c - parallel background communication functions. This is where
 *	logic could be placed for broadcast communications.
 *****************************************************************************
 *  Copyright (C) 2002-2007 The Regents of the University of California.
 *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
 *  Portions Copyright (C) 2010-2015 SchedMD LLC <https://www.schedmd.com>.
 *  Produced at Lawrence Livermore National Laboratory (cf. DISCLAIMER).
 *  Written by Morris Jette <jette1@llnl.gov>, et al.
 *  Derived from pdsh written by Jim Garlick <garlick1@llnl.gov>
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version. If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 *****************************************************************************
 *  Theory of operation:
 *
 *  The functions below permit slurmctld to initiate parallel work as a
 *  detached thread, while these functions ensure the work actually happens.
 *  For example, when a job's time limit is to be changed, slurmctld needs
 *  to notify the slurmd on every node to which the job was allocated.
 *  We don't want to hang slurmctld's primary function (the job update RPC)
 *  while performing this work, so it just initiates an agent to perform
 *  the work. The agent is passed all details required to perform the work,
 *  so it would be possible to execute the agent as a pthread, process, or
 *  even a daemon on some other computer.
 *
 *  The main agent thread creates a separate thread for each node to be
 *  communicated with, up to AGENT_THREAD_COUNT. A special watchdog thread
 *  sends SIGUSR1 to any threads that have been active (in DSH_ACTIVE state)
 *  for more than MessageTimeout seconds.
 *  The agent responds to slurmctld via a function call or an RPC as
 *  required, for example, informing slurmctld that some node is not
 *  responding.
 *
 *  All the state for each thread is maintained in a thd_t struct, which is
 *  used by the watchdog thread as well as the communication threads.
\*****************************************************************************/
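
/*
 * Illustrative sketch of typical agent usage (assumption: the caller runs
 * inside slurmctld with an already-built hostlist; REQUEST_PING with a NULL
 * payload is only an example, and _notify_nodes_example is a hypothetical
 * helper). Not compiled into the build:
 */
#if 0
static void _notify_nodes_example(hostlist_t hosts, int node_cnt)
{
	agent_arg_t *agent_arg_ptr = xmalloc(sizeof(agent_arg_t));

	agent_arg_ptr->node_count = node_cnt;
	agent_arg_ptr->retry = 1;		/* requeue on failure */
	agent_arg_ptr->hostlist = hosts;	/* consumed by the agent */
	agent_arg_ptr->msg_type = REQUEST_PING;
	agent_arg_ptr->msg_args = NULL;		/* RPC-specific payload */

	/* Queue the RPC; agent() transmits it in parallel threads and
	 * eventually xfree's agent_arg_ptr, hostlist and msg_args. */
	agent_queue_request(agent_arg_ptr);
}
#endif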

#include "config.h"

#if HAVE_SYS_PRCTL_H
#include <sys/prctl.h>
#endif

#include <errno.h>
#include <pthread.h>
#include <pwd.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

#include "src/common/forward.h"
#include "src/common/list.h"
#include "src/common/log.h"
#include "src/common/macros.h"
#include "src/common/node_select.h"
#include "src/common/parse_time.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_interface.h"
#include "src/common/uid.h"
#include "src/common/xsignal.h"
#include "src/common/xassert.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmctld/agent.h"
#include "src/slurmctld/front_end.h"
#include "src/slurmctld/job_scheduler.h"
#include "src/slurmctld/locks.h"
#include "src/slurmctld/ping_nodes.h"
#include "src/slurmctld/slurmctld.h"
#include "src/slurmctld/state_save.h"
#include "src/slurmctld/srun_comm.h"

#define MAX_RETRIES		100
#define MAX_RPC_PACK_CNT	100
#define RPC_PACK_MAX_AGE	30	/* Rebuild data over 30 seconds old */
#define DUMP_RPC_COUNT		25
#define HOSTLIST_MAX_SIZE	80

typedef enum {
	DSH_NEW,	/* Request not yet started */
	DSH_ACTIVE,	/* Request in progress */
	DSH_DONE,	/* Request completed normally */
	DSH_NO_RESP,	/* Request timed out */
	DSH_FAILED,	/* Request resulted in error */
	DSH_DUP_JOBID	/* Request resulted in duplicate job ID error */
} state_t;

typedef struct thd_complete {
	bool work_done;		/* assume all threads complete */
	int fail_cnt;		/* assume no thread failures */
	int no_resp_cnt;	/* assume all threads respond */
	int retry_cnt;		/* assume no retries required */
	int max_delay;
	time_t now;
} thd_complete_t;

typedef struct thd {
	pthread_t thread;		/* thread ID */
	state_t state;			/* thread state */
	time_t start_time;		/* start time */
	time_t end_time;		/* end time or delta time
					 * upon termination */
	slurm_addr_t *addr;		/* specific addr to send to;
					 * will not do nodelist if set */
	char *nodelist;			/* list of nodes to send to */
	List ret_list;
} thd_t;

typedef struct agent_info {
	pthread_mutex_t thread_mutex;	/* agent specific mutex */
	pthread_cond_t thread_cond;	/* agent specific condition */
	uint32_t thread_count;		/* number of thread records */
	uint32_t threads_active;	/* currently active threads */
	uint16_t retry;			/* if set, keep trying */
	thd_t *thread_struct;		/* thread structures */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void **msg_args_pptr;		/* RPC data to be used */
	uint16_t protocol_version;	/* if set, use this version */
} agent_info_t;

typedef struct task_info {
	pthread_mutex_t *thread_mutex_ptr; /* pointer to agent specific
					    * mutex */
	pthread_cond_t *thread_cond_ptr; /* pointer to agent specific
					  * condition */
	uint32_t *threads_active_ptr;	/* currently active thread ptr */
	thd_t *thread_struct_ptr;	/* thread structures ptr */
	bool get_reply;			/* flag if reply expected */
	slurm_msg_type_t msg_type;	/* RPC to be issued */
	void *msg_args_ptr;		/* ptr to RPC data to be used */
	uint16_t protocol_version;	/* if set, use this version */
} task_info_t;

typedef struct queued_request {
	agent_arg_t *agent_arg_ptr;	/* The queued request */
	time_t first_attempt;		/* Time of first check for batch
					 * launch RPC *only* */
	time_t last_attempt;		/* Time of last xmit attempt */
} queued_request_t;

typedef struct mail_info {
	char *user_name;
	char *message;
} mail_info_t;

static void _agent_defer(void);
static void _agent_retry(int min_wait, bool mail_too);
static int  _batch_launch_defer(queued_request_t *queued_req_ptr);
static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr);
static int  _signal_defer(queued_request_t *queued_req_ptr);
static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type);
static void _list_delete_retry(void *retry_entry);
static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr);
static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx);
static void _notify_slurmctld_jobs(agent_info_t *agent_ptr);
static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
				    int no_resp_cnt, int retry_cnt);
static void _purge_agent_args(agent_arg_t *agent_arg_ptr);
static void _queue_agent_retry(agent_info_t *agent_info_ptr, int count);
static int  _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
			   int *count, int *spot);
static void _sig_handler(int dummy);
static void *_thread_per_group_rpc(void *args);
static int  _valid_agent_arg(agent_arg_t *agent_arg_ptr);
static void *_wdog(void *args);

static mail_info_t *_mail_alloc(void);
static void  _mail_free(void *arg);
static void *_mail_proc(void *arg);
static char *_mail_type_str(uint16_t mail_type);
static char **_build_mail_env(void);

static pthread_mutex_t defer_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t mail_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t retry_mutex = PTHREAD_MUTEX_INITIALIZER;
static List defer_list = NULL;		/* agent_arg_t list for requests
					 * requiring job write lock */
static List mail_list = NULL;		/* pending e-mail requests */
static List retry_list = NULL;		/* agent_arg_t list for retry */


static pthread_mutex_t agent_cnt_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t agent_cnt_cond = PTHREAD_COND_INITIALIZER;
static int agent_cnt = 0;
static int agent_thread_cnt = 0;
static int mail_thread_cnt = 0;
static uint16_t message_timeout = NO_VAL16;

static pthread_mutex_t pending_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t pending_cond = PTHREAD_COND_INITIALIZER;
static int pending_wait_time = NO_VAL16;
static bool pending_mail = false;
static bool pending_thread_running = false;

static bool run_scheduler = false;

static uint32_t *rpc_stat_counts = NULL, *rpc_stat_types = NULL;
static uint32_t stat_type_count = 0;
static uint32_t rpc_count = 0;
static uint32_t *rpc_type_list;
static char **rpc_host_list = NULL;
static time_t cache_build_time = 0;

/*
 * agent - party responsible for transmitting a common RPC in parallel
 *	across a set of nodes. Use agent_queue_request() if immediate
 *	execution is not essential.
 * IN pointer to agent_arg_t, which is xfree'd (including hostlist
 *	and msg_args) upon completion
 * RET always NULL (signature conforms to a pthread start routine)
 */
void *agent(void *args)
{
	int i, delay;
	pthread_t thread_wdog = 0;
	agent_arg_t *agent_arg_ptr = args;
	agent_info_t *agent_info_ptr = NULL;
	thd_t *thread_ptr;
	task_info_t *task_specific_ptr;
	time_t begin_time;
	bool spawn_retry_agent = false;
	int rpc_thread_cnt;
	static time_t sched_update = 0;
	static bool reboot_from_ctld = false;

#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "agent", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__, "agent");
	}
#endif

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
		info("%s: Agent_cnt=%d agent_thread_cnt=%d with msg_type=%s retry_list_size=%d",
		     __func__, agent_cnt, agent_thread_cnt,
		     rpc_num2string(agent_arg_ptr->msg_type),
		     retry_list_size());
	}
	slurm_mutex_lock(&agent_cnt_mutex);

	if (sched_update != slurmctld_conf.last_update) {
#ifdef HAVE_NATIVE_CRAY
		reboot_from_ctld = true;
#else
		char *ctld_params = slurm_get_slurmctld_params();

		reboot_from_ctld = false;
		if (xstrcasestr(ctld_params, "reboot_from_controller"))
			reboot_from_ctld = true;
		xfree(ctld_params);
#endif
		sched_update = slurmctld_conf.last_update;
	}

	rpc_thread_cnt = 2 + MIN(agent_arg_ptr->node_count, AGENT_THREAD_COUNT);
	while (1) {
		if (slurmctld_config.shutdown_time ||
		    ((agent_thread_cnt + rpc_thread_cnt) <=
		     MAX_SERVER_THREADS)) {
			agent_cnt++;
			agent_thread_cnt += rpc_thread_cnt;
			break;
		} else {	/* wait for state change and retry */
			slurm_cond_wait(&agent_cnt_cond, &agent_cnt_mutex);
		}
	}
	slurm_mutex_unlock(&agent_cnt_mutex);
	if (slurmctld_config.shutdown_time)
		goto cleanup;

	/* basic argument value tests */
	begin_time = time(NULL);
	if (_valid_agent_arg(agent_arg_ptr))
		goto cleanup;

	if (reboot_from_ctld &&
	    (agent_arg_ptr->msg_type == REQUEST_REBOOT_NODES)) {
		_reboot_from_ctld(agent_arg_ptr);
		goto cleanup;
	}

	/* initialize the agent data structures */
	agent_info_ptr = _make_agent_info(agent_arg_ptr);
	thread_ptr = agent_info_ptr->thread_struct;

	/* start the watchdog thread */
	slurm_thread_create(&thread_wdog, _wdog, agent_info_ptr);

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
		info("%s: New agent thread_count:%d threads_active:%d retry:%c get_reply:%c msg_type:%s protocol_version:%hu",
		     __func__, agent_info_ptr->thread_count,
		     agent_info_ptr->threads_active,
		     agent_info_ptr->retry ? 'T' : 'F',
		     agent_info_ptr->get_reply ? 'T' : 'F',
		     rpc_num2string(agent_arg_ptr->msg_type),
		     agent_info_ptr->protocol_version);
	}
	/* start all the other threads (up to AGENT_THREAD_COUNT active) */
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		/* wait until "room" for another thread */
		slurm_mutex_lock(&agent_info_ptr->thread_mutex);
		while (agent_info_ptr->threads_active >=
		       AGENT_THREAD_COUNT) {
			slurm_cond_wait(&agent_info_ptr->thread_cond,
					&agent_info_ptr->thread_mutex);
		}

		/*
		 * create thread specific data,
		 * NOTE: freed from _thread_per_group_rpc()
		 */
		task_specific_ptr = _make_task_data(agent_info_ptr, i);

		slurm_thread_create_detached(&thread_ptr[i].thread,
					     _thread_per_group_rpc,
					     task_specific_ptr);
		agent_info_ptr->threads_active++;
		slurm_mutex_unlock(&agent_info_ptr->thread_mutex);
	}

	/* Wait for termination of remaining threads */
	pthread_join(thread_wdog, NULL);
	delay = (int) difftime(time(NULL), begin_time);
	if (delay > (slurm_get_msg_timeout() * 2)) {
		info("agent msg_type=%u ran for %d seconds",
		     agent_arg_ptr->msg_type, delay);
	}
	slurm_mutex_lock(&agent_info_ptr->thread_mutex);
	while (agent_info_ptr->threads_active != 0) {
		slurm_cond_wait(&agent_info_ptr->thread_cond,
				&agent_info_ptr->thread_mutex);
	}
	slurm_mutex_unlock(&agent_info_ptr->thread_mutex);

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
		info("%s: end agent thread_count:%d threads_active:%d retry:%c get_reply:%c msg_type:%s protocol_version:%hu",
		     __func__, agent_info_ptr->thread_count,
		     agent_info_ptr->threads_active,
		     agent_info_ptr->retry ? 'T' : 'F',
		     agent_info_ptr->get_reply ? 'T' : 'F',
		     rpc_num2string(agent_arg_ptr->msg_type),
		     agent_info_ptr->protocol_version);
	}

cleanup:
	_purge_agent_args(agent_arg_ptr);

	if (agent_info_ptr) {
		xfree(agent_info_ptr->thread_struct);
		xfree(agent_info_ptr);
	}
	slurm_mutex_lock(&agent_cnt_mutex);

	if (agent_cnt > 0) {
		agent_cnt--;
	} else {
		error("agent_cnt underflow");
		agent_cnt = 0;
	}
	if (agent_thread_cnt >= rpc_thread_cnt) {
		agent_thread_cnt -= rpc_thread_cnt;
	} else {
		error("agent_thread_cnt underflow");
		agent_thread_cnt = 0;
	}

	if ((agent_thread_cnt + AGENT_THREAD_COUNT + 2) < MAX_SERVER_THREADS)
		spawn_retry_agent = true;

	slurm_cond_broadcast(&agent_cnt_cond);
	slurm_mutex_unlock(&agent_cnt_mutex);

	if (spawn_retry_agent)
		agent_trigger(RPC_RETRY_INTERVAL, true);

	return NULL;
}

/* Basic validity test of agent argument */
static int _valid_agent_arg(agent_arg_t *agent_arg_ptr)
{
	int hostlist_cnt;

	xassert(agent_arg_ptr);
	xassert(agent_arg_ptr->hostlist);

	if (agent_arg_ptr->node_count == 0)
		return SLURM_ERROR;	/* no messages to be sent */
	hostlist_cnt = hostlist_count(agent_arg_ptr->hostlist);
	if (agent_arg_ptr->node_count != hostlist_cnt) {
		error("%s: node_count RPC different from hosts listed (%d!=%d)",
		      __func__, agent_arg_ptr->node_count, hostlist_cnt);
		return SLURM_ERROR;	/* no messages to be sent */
	}
	return SLURM_SUCCESS;
}

static agent_info_t *_make_agent_info(agent_arg_t *agent_arg_ptr)
{
	int i = 0, j = 0;
	agent_info_t *agent_info_ptr = NULL;
	thd_t *thread_ptr = NULL;
	int *span = NULL;
	int thr_count = 0;
	hostlist_t hl = NULL;
	char *name = NULL;

	agent_info_ptr = xmalloc(sizeof(agent_info_t));
	slurm_mutex_init(&agent_info_ptr->thread_mutex);
	slurm_cond_init(&agent_info_ptr->thread_cond, NULL);
	agent_info_ptr->thread_count = agent_arg_ptr->node_count;
	agent_info_ptr->retry = agent_arg_ptr->retry;
	agent_info_ptr->threads_active = 0;
	thread_ptr = xcalloc(agent_info_ptr->thread_count, sizeof(thd_t));
	agent_info_ptr->thread_struct = thread_ptr;
	agent_info_ptr->msg_type = agent_arg_ptr->msg_type;
	agent_info_ptr->msg_args_pptr = &agent_arg_ptr->msg_args;
	agent_info_ptr->protocol_version = agent_arg_ptr->protocol_version;

	if ((agent_arg_ptr->msg_type != REQUEST_JOB_NOTIFY) &&
	    (agent_arg_ptr->msg_type != REQUEST_REBOOT_NODES) &&
	    (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE) &&
	    (agent_arg_ptr->msg_type != REQUEST_RECONFIGURE_WITH_CONFIG) &&
	    (agent_arg_ptr->msg_type != REQUEST_SHUTDOWN) &&
	    (agent_arg_ptr->msg_type != SRUN_EXEC) &&
	    (agent_arg_ptr->msg_type != SRUN_TIMEOUT) &&
	    (agent_arg_ptr->msg_type != SRUN_NODE_FAIL) &&
	    (agent_arg_ptr->msg_type != SRUN_REQUEST_SUSPEND) &&
	    (agent_arg_ptr->msg_type != SRUN_USER_MSG) &&
	    (agent_arg_ptr->msg_type != SRUN_STEP_MISSING) &&
	    (agent_arg_ptr->msg_type != SRUN_STEP_SIGNAL) &&
	    (agent_arg_ptr->msg_type != SRUN_JOB_COMPLETE)) {
#ifdef HAVE_FRONT_END
		span = set_span(agent_arg_ptr->node_count,
				agent_arg_ptr->node_count);
#else
		/* Sending message to a possibly large number of slurmd.
		 * Push all message forwarding to slurmd in order to
		 * offload as much work from slurmctld as possible. */
		span = set_span(agent_arg_ptr->node_count, 1);
#endif
		agent_info_ptr->get_reply = true;
	} else {
		/* Message is going to one node (for srun) or we want
		 * it to get processed ASAP (SHUTDOWN or RECONFIGURE).
		 * Send the message directly to each node. */
		span = set_span(agent_arg_ptr->node_count,
				agent_arg_ptr->node_count);
	}
	i = 0;
	while (i < agent_info_ptr->thread_count) {
		thread_ptr[thr_count].state = DSH_NEW;
		thread_ptr[thr_count].addr = agent_arg_ptr->addr;
		name = hostlist_shift(agent_arg_ptr->hostlist);
		if (!name) {
			debug3("no more nodes to send to");
			break;
		}
		hl = hostlist_create(name);
		if (thread_ptr[thr_count].addr && span[thr_count]) {
			debug("warning: you will only be sending this to %s",
			      name);
			span[thr_count] = 0;
		}
		free(name);
		i++;
		for (j = 0; j < span[thr_count]; j++) {
			name = hostlist_shift(agent_arg_ptr->hostlist);
			if (!name)
				break;
			hostlist_push_host(hl, name);
			free(name);
			i++;
		}
		hostlist_uniq(hl);
		thread_ptr[thr_count].nodelist =
			hostlist_ranged_string_xmalloc(hl);
		hostlist_destroy(hl);
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
			info("%s: sending msg_type %s to nodes %s",
			     __func__, rpc_num2string(agent_arg_ptr->msg_type),
			     thread_ptr[thr_count].nodelist);
		}
		thr_count++;
	}
	xfree(span);
	agent_info_ptr->thread_count = thr_count;
	return agent_info_ptr;
}

static task_info_t *_make_task_data(agent_info_t *agent_info_ptr, int inx)
{
	task_info_t *task_info_ptr;
	task_info_ptr = xmalloc(sizeof(task_info_t));

	task_info_ptr->thread_mutex_ptr = &agent_info_ptr->thread_mutex;
	task_info_ptr->thread_cond_ptr = &agent_info_ptr->thread_cond;
	task_info_ptr->threads_active_ptr = &agent_info_ptr->threads_active;
	task_info_ptr->thread_struct_ptr = &agent_info_ptr->thread_struct[inx];
	task_info_ptr->get_reply = agent_info_ptr->get_reply;
	task_info_ptr->msg_type = agent_info_ptr->msg_type;
	task_info_ptr->msg_args_ptr = *agent_info_ptr->msg_args_pptr;
	task_info_ptr->protocol_version = agent_info_ptr->protocol_version;

	return task_info_ptr;
}

static void _update_wdog_state(thd_t *thread_ptr,
			       state_t *state,
			       thd_complete_t *thd_comp)
{
	switch (*state) {
	case DSH_ACTIVE:
		thd_comp->work_done = false;
		if (thread_ptr->end_time <= thd_comp->now) {
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
				info("%s: agent thread %lu timed out",
				     __func__,
				     (unsigned long) thread_ptr->thread);
			}
			if (pthread_kill(thread_ptr->thread, SIGUSR1) == ESRCH)
				*state = DSH_NO_RESP;
			else
				thread_ptr->end_time += message_timeout;
		}
		break;
	case DSH_NEW:
		thd_comp->work_done = false;
		break;
	case DSH_DONE:
		if (thd_comp->max_delay < (int) thread_ptr->end_time)
			thd_comp->max_delay = (int) thread_ptr->end_time;
		break;
	case DSH_NO_RESP:
		thd_comp->no_resp_cnt++;
		thd_comp->retry_cnt++;
		break;
	case DSH_FAILED:
	case DSH_DUP_JOBID:
		thd_comp->fail_cnt++;
		break;
	}
}

/*
 * _wdog - Watchdog thread. Send SIGUSR1 to threads which have been active
 *	for too long.
 * IN args - pointer to agent_info_t with info on threads to watch
 * Sleep between polls with an exponentially increasing interval
 *	(from 0.005 to 1.0 second)
 */
static void *_wdog(void *args)
{
	bool srun_agent = false;
	int i;
	agent_info_t *agent_ptr = (agent_info_t *) args;
	thd_t *thread_ptr = agent_ptr->thread_struct;
	unsigned long usec = 5000;
	ListIterator itr;
	thd_complete_t thd_comp;
	ret_data_info_t *ret_data_info = NULL;

	if ((agent_ptr->msg_type == SRUN_JOB_COMPLETE) ||
	    (agent_ptr->msg_type == SRUN_REQUEST_SUSPEND) ||
	    (agent_ptr->msg_type == SRUN_STEP_MISSING) ||
	    (agent_ptr->msg_type == SRUN_STEP_SIGNAL) ||
	    (agent_ptr->msg_type == SRUN_EXEC) ||
	    (agent_ptr->msg_type == SRUN_NODE_FAIL) ||
	    (agent_ptr->msg_type == SRUN_PING) ||
	    (agent_ptr->msg_type == SRUN_TIMEOUT) ||
	    (agent_ptr->msg_type == SRUN_USER_MSG) ||
	    (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
	    (agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION))
		srun_agent = true;

	thd_comp.max_delay = 0;

	while (1) {
		thd_comp.work_done = true;	/* assume all threads complete */
		thd_comp.fail_cnt = 0;		/* assume no thread failures */
		thd_comp.no_resp_cnt = 0;	/* assume all threads respond */
		thd_comp.retry_cnt = 0;		/* assume no retries required */
		thd_comp.now = time(NULL);

		usleep(usec);
		usec = MIN((usec * 2), 1000000);

		slurm_mutex_lock(&agent_ptr->thread_mutex);
		for (i = 0; i < agent_ptr->thread_count; i++) {
			//info("thread name %s", thread_ptr[i].node_name);
			if (!thread_ptr[i].ret_list) {
				_update_wdog_state(&thread_ptr[i],
						   &thread_ptr[i].state,
						   &thd_comp);
			} else {
				itr = list_iterator_create(
					thread_ptr[i].ret_list);
				while ((ret_data_info = list_next(itr))) {
					_update_wdog_state(&thread_ptr[i],
							   &ret_data_info->err,
							   &thd_comp);
				}
				list_iterator_destroy(itr);
			}
		}
		if (thd_comp.work_done)
			break;

		slurm_mutex_unlock(&agent_ptr->thread_mutex);
	}

	if (srun_agent) {
		_notify_slurmctld_jobs(agent_ptr);
	} else {
		_notify_slurmctld_nodes(agent_ptr,
					thd_comp.no_resp_cnt,
					thd_comp.retry_cnt);
	}

	for (i = 0; i < agent_ptr->thread_count; i++) {
		FREE_NULL_LIST(thread_ptr[i].ret_list);
		xfree(thread_ptr[i].nodelist);
	}

	if (thd_comp.max_delay &&
	    (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT)) {
		info("%s: agent maximum delay %d seconds", __func__,
		     thd_comp.max_delay);
	}

	slurm_mutex_unlock(&agent_ptr->thread_mutex);
	return (void *) NULL;
}

static void _notify_slurmctld_jobs(agent_info_t *agent_ptr)
{
	/* Locks: Write job */
	slurmctld_lock_t job_write_lock =
		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
	uint32_t job_id = 0, step_id = 0;
	thd_t *thread_ptr = agent_ptr->thread_struct;

	if (agent_ptr->msg_type == SRUN_PING) {
		srun_ping_msg_t *msg = *agent_ptr->msg_args_pptr;
		job_id = msg->job_id;
		step_id = msg->step_id;
	} else if (agent_ptr->msg_type == SRUN_TIMEOUT) {
		srun_timeout_msg_t *msg = *agent_ptr->msg_args_pptr;
		job_id = msg->job_id;
		step_id = msg->step_id;
	} else if (agent_ptr->msg_type == RESPONSE_RESOURCE_ALLOCATION) {
		resource_allocation_response_msg_t *msg =
			*agent_ptr->msg_args_pptr;
		job_id = msg->job_id;
		step_id = NO_VAL;
	} else if (agent_ptr->msg_type == RESPONSE_HET_JOB_ALLOCATION) {
		List het_alloc_list = *agent_ptr->msg_args_pptr;
		resource_allocation_response_msg_t *msg;
		if (!het_alloc_list || (list_count(het_alloc_list) == 0))
			return;
		msg = list_peek(het_alloc_list);
		job_id = msg->job_id;
		step_id = NO_VAL;
	} else if ((agent_ptr->msg_type == SRUN_JOB_COMPLETE) ||
		   (agent_ptr->msg_type == SRUN_REQUEST_SUSPEND) ||
		   (agent_ptr->msg_type == SRUN_STEP_MISSING) ||
		   (agent_ptr->msg_type == SRUN_STEP_SIGNAL) ||
		   (agent_ptr->msg_type == SRUN_EXEC) ||
		   (agent_ptr->msg_type == SRUN_USER_MSG)) {
		return;		/* no need to note srun response */
	} else if (agent_ptr->msg_type == SRUN_NODE_FAIL) {
		return;		/* no need to note srun response */
	} else {
		error("%s: invalid msg_type %u", __func__,
		      agent_ptr->msg_type);
		return;
	}
	lock_slurmctld(job_write_lock);
	if (thread_ptr[0].state == DSH_DONE) {
		srun_response(job_id, step_id);
	}

	unlock_slurmctld(job_write_lock);
}

static void _notify_slurmctld_nodes(agent_info_t *agent_ptr,
				    int no_resp_cnt, int retry_cnt)
{
	ListIterator itr = NULL;
	ret_data_info_t *ret_data_info = NULL;
	state_t state;
	int is_ret_list = 1;
	/* Locks: Read config, write node */
	slurmctld_lock_t node_write_lock =
		{ .conf = READ_LOCK, .node = WRITE_LOCK };
	thd_t *thread_ptr = agent_ptr->thread_struct;
	int i;

	/* Notify slurmctld of non-responding nodes */
	if (no_resp_cnt) {
		/* Update node table data for non-responding nodes */
		if (agent_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
			/* Requeue the request */
			batch_job_launch_msg_t *launch_msg_ptr =
				*agent_ptr->msg_args_pptr;
			uint32_t job_id = launch_msg_ptr->job_id;
			/* Locks: Write job, write node, read federation */
			slurmctld_lock_t job_write_lock =
				{ .job = WRITE_LOCK,
				  .node = WRITE_LOCK,
				  .fed = READ_LOCK };

			lock_slurmctld(job_write_lock);
			job_complete(job_id, slurmctld_conf.slurm_user_id,
				     true, false, 0);
			unlock_slurmctld(job_write_lock);
		}
	}
	if (retry_cnt && agent_ptr->retry)
		_queue_agent_retry(agent_ptr, retry_cnt);

	/* Update last_response on responding nodes */
	lock_slurmctld(node_write_lock);
	for (i = 0; i < agent_ptr->thread_count; i++) {
		char *down_msg, *node_names;
		slurm_msg_type_t resp_type = RESPONSE_SLURM_RC;

		if (!thread_ptr[i].ret_list) {
			state = thread_ptr[i].state;
			is_ret_list = 0;
			goto switch_on_state;
		}
		is_ret_list = 1;

		itr = list_iterator_create(thread_ptr[i].ret_list);
		while ((ret_data_info = list_next(itr))) {
			state = ret_data_info->err;
		switch_on_state:
			if (is_ret_list) {
				node_names = ret_data_info->node_name;
				resp_type = ret_data_info->type;
			} else
				node_names = thread_ptr[i].nodelist;

			switch (state) {
			case DSH_NO_RESP:
				node_not_resp(node_names,
					      thread_ptr[i].start_time,
					      resp_type);
				break;
			case DSH_FAILED:
#ifdef HAVE_FRONT_END
				down_msg = "";
#else
				drain_nodes(node_names,
					    "Prolog/Epilog failure",
					    slurmctld_conf.slurm_user_id);
				down_msg = ", set to state DRAIN";
#endif
				error("Prolog/Epilog failure on nodes %s%s",
				      node_names, down_msg);
				break;
			case DSH_DUP_JOBID:
#ifdef HAVE_FRONT_END
				down_msg = "";
#else
				drain_nodes(node_names,
					    "Duplicate jobid",
					    slurmctld_conf.slurm_user_id);
				down_msg = ", set to state DRAIN";
#endif
				error("Duplicate jobid on nodes %s%s",
				      node_names, down_msg);
				break;
			case DSH_DONE:
				node_did_resp(node_names);
				break;
			default:
				error("unknown state returned for %s",
				      node_names);
				break;
			}
			if (!is_ret_list)
				goto finished;
		}
		list_iterator_destroy(itr);
finished:	;
	}
	unlock_slurmctld(node_write_lock);
	if (run_scheduler) {
		run_scheduler = false;
		/* below functions all have their own locking */
		if (schedule(0)) {
			schedule_job_save();
			schedule_node_save();
		}
	}
	if ((agent_ptr->msg_type == REQUEST_PING) ||
	    (agent_ptr->msg_type == REQUEST_HEALTH_CHECK) ||
	    (agent_ptr->msg_type == REQUEST_ACCT_GATHER_UPDATE) ||
	    (agent_ptr->msg_type == REQUEST_NODE_REGISTRATION_STATUS))
		ping_end();
}

/* Report a communications error for the specified node.
 * This also gets logged as a non-responsive node. */
static inline int _comm_err(char *node_name, slurm_msg_type_t msg_type)
{
	int rc = 1;

	if ((rc = is_node_resp(node_name)))
		verbose("agent/is_node_resp: node:%s RPC:%s : %m",
			node_name, rpc_num2string(msg_type));
	return rc;
}

/* return a value for which WEXITSTATUS() returns 1 */
static int _wif_status(void)
{
	static int rc = 0;
	int i;

	if (rc)
		return rc;

	rc = 1;
	for (i = 0; i < 64; i++) {
		if (WEXITSTATUS(rc))
			return rc;
		rc = rc << 1;
	}
	error("Could not identify WEXITSTATUS");
	rc = 1;
	return rc;
}
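
/*
 * Illustrative sketch of what _wif_status() computes (assumption: a
 * glibc-style wait status encoding where WEXITSTATUS(x) == (x >> 8) & 0xff;
 * on such systems the loop above stops at rc == 0x100, i.e. 256, since
 * WEXITSTATUS(0x100) == 1). Not compiled into the build:
 */
#if 0
static void _wif_status_demo(void)
{
	/* Whatever the platform's wait-status encoding, the value from
	 * _wif_status() decodes to exit status 1, suitable for use as
	 * job_complete()'s exit code argument. */
	xassert(WEXITSTATUS(_wif_status()) == 1);
}
#endif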

/*
 * _thread_per_group_rpc - thread to issue an RPC for a group of nodes,
 *	sending the message out to one node and forwarding it to
 *	others if necessary.
 * IN/OUT args - pointer to task_info_t, xfree'd on completion
 */
static void *_thread_per_group_rpc(void *args)
{
	int rc = SLURM_SUCCESS;
	slurm_msg_t msg;
	task_info_t *task_ptr = (task_info_t *) args;
	/* We cache some pointers from task_info_t because we need to
	 * xfree args before being finished with their use. The early
	 * xfree is required for timely termination of this pthread,
	 * since an xfree at the end could block on a lock and prevent
	 * a timely thread exit. */
	pthread_mutex_t *thread_mutex_ptr = task_ptr->thread_mutex_ptr;
	pthread_cond_t *thread_cond_ptr = task_ptr->thread_cond_ptr;
	uint32_t *threads_active_ptr = task_ptr->threads_active_ptr;
	thd_t *thread_ptr = task_ptr->thread_struct_ptr;
	state_t thread_state = DSH_NO_RESP;
	slurm_msg_type_t msg_type = task_ptr->msg_type;
	bool is_kill_msg, srun_agent;
	List ret_list = NULL;
	ListIterator itr;
	ret_data_info_t *ret_data_info = NULL;
	int sig_array[2] = {SIGUSR1, 0};
	/* Locks: Write job, write node, read federation */
	slurmctld_lock_t job_write_lock = {
		NO_LOCK, WRITE_LOCK, WRITE_LOCK, NO_LOCK, READ_LOCK };
	/* Lock: Read node */
	slurmctld_lock_t node_read_lock = {
		NO_LOCK, NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
	/* Lock: Write node */
	slurmctld_lock_t node_write_lock = {
		NO_LOCK, NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK };
	uint32_t job_id;

	xassert(args != NULL);
	xsignal(SIGUSR1, _sig_handler);
	xsignal_unblock(sig_array);
	is_kill_msg = ((msg_type == REQUEST_KILL_TIMELIMIT) ||
		       (msg_type == REQUEST_KILL_PREEMPTED) ||
		       (msg_type == REQUEST_TERMINATE_JOB));
	srun_agent = ((msg_type == SRUN_PING) ||
		      (msg_type == SRUN_EXEC) ||
		      (msg_type == SRUN_JOB_COMPLETE) ||
		      (msg_type == SRUN_STEP_MISSING) ||
		      (msg_type == SRUN_STEP_SIGNAL) ||
		      (msg_type == SRUN_TIMEOUT) ||
		      (msg_type == SRUN_USER_MSG) ||
		      (msg_type == RESPONSE_RESOURCE_ALLOCATION) ||
		      (msg_type == SRUN_NODE_FAIL));

	thread_ptr->start_time = time(NULL);

	slurm_mutex_lock(thread_mutex_ptr);
	thread_ptr->state = DSH_ACTIVE;
	thread_ptr->end_time = thread_ptr->start_time + message_timeout;
	slurm_mutex_unlock(thread_mutex_ptr);

	/* send request message */
	slurm_msg_t_init(&msg);

	if (task_ptr->protocol_version)
		msg.protocol_version = task_ptr->protocol_version;

	msg.msg_type = msg_type;
	msg.data = task_ptr->msg_args_ptr;

	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
		info("%s: sending %s to %s", __func__,
		     rpc_num2string(msg_type), thread_ptr->nodelist);
	}

	if (task_ptr->get_reply) {
		if (thread_ptr->addr) {
			msg.address = *thread_ptr->addr;

			if (!(ret_list = slurm_send_addr_recv_msgs(
				      &msg, thread_ptr->nodelist, 0))) {
				error("%s: no ret_list given", __func__);
				goto cleanup;
			}
		} else {
			if (!(ret_list = slurm_send_recv_msgs(
				      thread_ptr->nodelist, &msg, 0))) {
				error("%s: no ret_list given", __func__);
				goto cleanup;
			}
		}
	} else {
		if (thread_ptr->addr) {
			//info("got the address");
			msg.address = *thread_ptr->addr;
		} else {
			//info("no address given");
			if (slurm_conf_get_addr(thread_ptr->nodelist,
						&msg.address, msg.flags)
			    == SLURM_ERROR) {
				error("%s: can't find address for host %s, check slurm.conf",
				      __func__, thread_ptr->nodelist);
				goto cleanup;
			}
		}
		//info("sending %u to %s", msg_type, thread_ptr->nodelist);
		if (msg_type == SRUN_JOB_COMPLETE) {
			/*
			 * The srun runs as a single thread, while the kernel
			 * listen() may be queuing messages for further
			 * processing. If we get our SYN in the listen queue
			 * at the same time the last MESSAGE_TASK_EXIT is being
			 * processed, srun may exit, meaning this message would
			 * never be received, leading to a series of error
			 * messages from slurm_send_only_node_msg().
			 * So, we use this different function that blindly
			 * flings the message out and disregards any
			 * communication problems that may arise.
			 */
			slurm_send_msg_maybe(&msg);
			thread_state = DSH_DONE;
		} else if (slurm_send_only_node_msg(&msg) == SLURM_SUCCESS) {
			thread_state = DSH_DONE;
		} else {
			if (!srun_agent) {
				lock_slurmctld(node_read_lock);
				_comm_err(thread_ptr->nodelist, msg_type);
				unlock_slurmctld(node_read_lock);
			}
		}
		goto cleanup;
	}

	//info("got %d messages back", list_count(ret_list));
	itr = list_iterator_create(ret_list);
	while ((ret_data_info = list_next(itr))) {
		rc = slurm_get_return_code(ret_data_info->type,
					   ret_data_info->data);
		/* SPECIAL CASE: Record node's CPU load */
		if (ret_data_info->type == RESPONSE_PING_SLURMD) {
			ping_slurmd_resp_msg_t *ping_resp;
			ping_resp = (ping_slurmd_resp_msg_t *)
				    ret_data_info->data;
			lock_slurmctld(node_write_lock);
			reset_node_load(ret_data_info->node_name,
					ping_resp->cpu_load);
			reset_node_free_mem(ret_data_info->node_name,
					    ping_resp->free_mem);
			unlock_slurmctld(node_write_lock);
		}
		/* SPECIAL CASE: Mark node as IDLE if job already complete */
		if (is_kill_msg &&
		    (rc == ESLURMD_KILL_JOB_ALREADY_COMPLETE)) {
			kill_job_msg_t *kill_job;
			kill_job = (kill_job_msg_t *)
				   task_ptr->msg_args_ptr;
			rc = SLURM_SUCCESS;
			lock_slurmctld(job_write_lock);
			if (job_epilog_complete(kill_job->job_id,
						ret_data_info->node_name, rc))
				run_scheduler = true;
			unlock_slurmctld(job_write_lock);
		}

		/* SPECIAL CASE: Record node's accounting-gather data */
		if (ret_data_info->type == RESPONSE_ACCT_GATHER_UPDATE) {
			lock_slurmctld(node_write_lock);
			update_node_record_acct_gather_data(
				ret_data_info->data);
			unlock_slurmctld(node_write_lock);
		}

		/* SPECIAL CASE: Requeue/hold non-startable batch job,
		 * Requeue job prolog failure or duplicate job ID */
		if ((msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
		    (rc != SLURM_SUCCESS) && (rc != ESLURMD_PROLOG_FAILED) &&
		    (rc != ESLURM_DUPLICATE_JOB_ID) &&
		    (ret_data_info->type != RESPONSE_FORWARD_FAILED)) {
			batch_job_launch_msg_t *launch_msg_ptr =
				task_ptr->msg_args_ptr;
			job_id = launch_msg_ptr->job_id;
			info("Killing non-startable batch JobId=%u: %s",
			     job_id, slurm_strerror(rc));
			thread_state = DSH_DONE;
			ret_data_info->err = thread_state;
			lock_slurmctld(job_write_lock);
			job_complete(job_id, slurmctld_conf.slurm_user_id,
				     false, false, _wif_status());
			unlock_slurmctld(job_write_lock);
			continue;
		} else if ((msg_type == RESPONSE_RESOURCE_ALLOCATION) &&
			   (rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
			/* Communication issue to srun that launched the job.
			 * Cancel rather than leave a stray-but-empty job
			 * behind on the allocated nodes. */
			resource_allocation_response_msg_t *msg_ptr =
				task_ptr->msg_args_ptr;
			job_id = msg_ptr->job_id;
			info("Killing interactive JobId=%u: %s",
			     job_id, slurm_strerror(rc));
			thread_state = DSH_FAILED;
			lock_slurmctld(job_write_lock);
			job_complete(job_id, slurmctld_conf.slurm_user_id,
				     false, false, _wif_status());
			unlock_slurmctld(job_write_lock);
			continue;
		} else if ((msg_type == RESPONSE_HET_JOB_ALLOCATION) &&
			   (rc == SLURM_COMMUNICATIONS_CONNECTION_ERROR)) {
			/* Communication issue to srun that launched the job.
			 * Cancel rather than leave a stray-but-empty job
			 * behind on the allocated nodes. */
			List het_alloc_list = task_ptr->msg_args_ptr;
			resource_allocation_response_msg_t *msg_ptr;
			if (!het_alloc_list ||
			    (list_count(het_alloc_list) == 0))
				continue;
			msg_ptr = list_peek(het_alloc_list);
			job_id = msg_ptr->job_id;
			info("Killing interactive JobId=%u: %s",
			     job_id, slurm_strerror(rc));
			thread_state = DSH_FAILED;
			lock_slurmctld(job_write_lock);
			job_complete(job_id, slurmctld_conf.slurm_user_id,
				     false, false, _wif_status());
			unlock_slurmctld(job_write_lock);
			continue;
		}

		if (msg_type == REQUEST_SIGNAL_TASKS) {
			job_record_t *job_ptr;
			signal_tasks_msg_t *msg_ptr =
				task_ptr->msg_args_ptr;

			if ((msg_ptr->signal == SIGCONT) ||
			    (msg_ptr->signal == SIGSTOP)) {
				job_id = msg_ptr->job_id;
				lock_slurmctld(job_write_lock);
				job_ptr = find_job_record(job_id);
				if (job_ptr == NULL) {
					info("%s: invalid JobId=%u",
					     __func__, job_id);
				} else if (rc == SLURM_SUCCESS) {
					if (msg_ptr->signal == SIGSTOP) {
						job_ptr->job_state |=
							JOB_STOPPED;
					} else { // SIGCONT
						job_ptr->job_state &=
							~JOB_STOPPED;
					}
				}

				if (job_ptr)
					job_ptr->job_state &= ~JOB_SIGNALING;

				unlock_slurmctld(job_write_lock);
			}
		}

		if (((msg_type == REQUEST_SIGNAL_TASKS) ||
		     (msg_type == REQUEST_TERMINATE_TASKS)) &&
		    (rc == ESRCH)) {
			/* process is already dead, not a real error */
			rc = SLURM_SUCCESS;
		}

		switch (rc) {
		case SLURM_SUCCESS:
			/* debug("agent processed RPC to node %s", */
			/*       ret_data_info->node_name); */
			thread_state = DSH_DONE;
			break;
		case SLURM_UNKNOWN_FORWARD_ADDR:
			error("We were unable to forward message to '%s'. "
			      "Make sure the slurm.conf for each slurmd "
			      "contains all other nodes in your system.",
			      ret_data_info->node_name);
			thread_state = DSH_NO_RESP;
			break;
		case ESLURMD_EPILOG_FAILED:
			error("Epilog failure on host %s, setting DOWN",
			      ret_data_info->node_name);
			thread_state = DSH_FAILED;
			break;
		case ESLURMD_PROLOG_FAILED:
			thread_state = DSH_FAILED;
			break;
		case ESLURM_DUPLICATE_JOB_ID:
			thread_state = DSH_DUP_JOBID;
			break;
		case ESLURM_INVALID_JOB_ID:
			/* Not indicative of a real error */
		case ESLURMD_JOB_NOTRUNNING:
			/* Not indicative of a real error */
			if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
				info("%s: RPC to node %s failed, job not running",
				     __func__, ret_data_info->node_name);
			}
			thread_state = DSH_DONE;
			break;
		default:
			if (!srun_agent) {
				if (ret_data_info->err)
					errno = ret_data_info->err;
				else
					errno = rc;
				lock_slurmctld(node_read_lock);
				rc = _comm_err(ret_data_info->node_name,
					       msg_type);
				unlock_slurmctld(node_read_lock);
			}

			if (srun_agent)
				thread_state = DSH_FAILED;
			else if (rc || (ret_data_info->type ==
					RESPONSE_FORWARD_FAILED))
				/* check if a forward failed */
				thread_state = DSH_NO_RESP;
			else {
				/* Some requests will fail without anything
				 * really being wrong, e.g. a job termination
				 * request for a job that has already
				 * finished; treat those cases as done. */
				thread_state = DSH_DONE;
			}
		}
		ret_data_info->err = thread_state;
	}
	list_iterator_destroy(itr);

cleanup:
	if (!ret_list && (msg_type == REQUEST_SIGNAL_TASKS)) {
		job_record_t *job_ptr;
		signal_tasks_msg_t *msg_ptr =
			task_ptr->msg_args_ptr;
		if ((msg_ptr->signal == SIGCONT) ||
		    (msg_ptr->signal == SIGSTOP)) {
			job_id = msg_ptr->job_id;
			lock_slurmctld(job_write_lock);
			job_ptr = find_job_record(job_id);
			if (job_ptr)
				job_ptr->job_state &= ~JOB_SIGNALING;
			unlock_slurmctld(job_write_lock);
		}
	}
	/* task_ptr aliases args, so free only after its last use above */
	xfree(args);
	/* handled at end of thread just in case resend is needed */
	destroy_forward(&msg.forward);
	slurm_mutex_lock(thread_mutex_ptr);
	thread_ptr->ret_list = ret_list;
	thread_ptr->state = thread_state;
	thread_ptr->end_time = (time_t) difftime(time(NULL),
						 thread_ptr->start_time);
	/* Signal completion so another thread can replace us */
	(*threads_active_ptr)--;
	slurm_cond_signal(thread_cond_ptr);
	slurm_mutex_unlock(thread_mutex_ptr);
	return (void *) NULL;
}

/*
 * Signal handler. We are really interested in interrupting hung
 * communications and causing them to return EINTR. Multiple interrupts
 * might be required.
 */
static void _sig_handler(int dummy)
{
}

static int _setup_requeue(agent_arg_t *agent_arg_ptr, thd_t *thread_ptr,
			  int *count, int *spot)
{
#ifdef HAVE_FRONT_END
	front_end_record_t *node_ptr;
#else
	node_record_t *node_ptr;
#endif
	ret_data_info_t *ret_data_info = NULL;
	ListIterator itr;
	int rc = 0;

	itr = list_iterator_create(thread_ptr->ret_list);
	while ((ret_data_info = list_next(itr))) {
		if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT)
			info("%s: got err of %d", __func__,
			     ret_data_info->err);
		if (ret_data_info->err != DSH_NO_RESP)
			continue;

#ifdef HAVE_FRONT_END
		node_ptr = find_front_end_record(ret_data_info->node_name);
#else
		node_ptr = find_node_record(ret_data_info->node_name);
#endif
		if (node_ptr &&
		    (IS_NODE_DOWN(node_ptr) || IS_NODE_POWER_SAVE(node_ptr))) {
			--(*count);
		} else if (agent_arg_ptr) {
			debug("%s: got the name %s to resend out of %d",
			      __func__, ret_data_info->node_name, *count);
			hostlist_push_host(agent_arg_ptr->hostlist,
					   ret_data_info->node_name);
			++(*spot);
		}

		if (*spot == *count) {
			rc = 1;
			break;
		}
	}
	list_iterator_destroy(itr);

	return rc;
}

/*
 * _queue_agent_retry - Queue any failed RPCs for later replay
 * IN agent_info_ptr - pointer to info on completed agent requests
 * IN count - number of agent requests which failed, count to requeue
 */
static void _queue_agent_retry(agent_info_t *agent_info_ptr, int count)
{
#ifdef HAVE_FRONT_END
	front_end_record_t *node_ptr;
#else
	node_record_t *node_ptr;
#endif
	agent_arg_t *agent_arg_ptr;
	queued_request_t *queued_req_ptr = NULL;
	thd_t *thread_ptr = agent_info_ptr->thread_struct;
	int i, j;

	if (count == 0)
		return;

	/* build agent argument with just the RPCs to retry */
	agent_arg_ptr = xmalloc(sizeof(agent_arg_t));
	agent_arg_ptr->node_count = count;
	agent_arg_ptr->retry = 1;
	agent_arg_ptr->hostlist = hostlist_create(NULL);
	agent_arg_ptr->msg_type = agent_info_ptr->msg_type;
	agent_arg_ptr->msg_args = *(agent_info_ptr->msg_args_pptr);
	*(agent_info_ptr->msg_args_pptr) = NULL;

	j = 0;
	for (i = 0; i < agent_info_ptr->thread_count; i++) {
		if (!thread_ptr[i].ret_list) {
			if (thread_ptr[i].state != DSH_NO_RESP)
				continue;

			debug("got the name %s to resend",
			      thread_ptr[i].nodelist);
#ifdef HAVE_FRONT_END
			node_ptr = find_front_end_record(
				thread_ptr[i].nodelist);
#else
			node_ptr = find_node_record(thread_ptr[i].nodelist);
#endif
			if (node_ptr &&
			    (IS_NODE_DOWN(node_ptr) ||
			     IS_NODE_POWER_SAVE(node_ptr))) {
				/* Do not re-send RPC to DOWN node */
				if (count)
					count--;
			} else {
				hostlist_push_host(agent_arg_ptr->hostlist,
						   thread_ptr[i].nodelist);
				j++;
			}
			if (j == count)
				break;
		} else {
			if (_setup_requeue(agent_arg_ptr, &thread_ptr[i],
					   &count, &j))
				break;
		}
	}
	if (count == 0) {
		/* All non-responding nodes are DOWN.
		 * Do not requeue, but discard this RPC */
		hostlist_destroy(agent_arg_ptr->hostlist);
		*(agent_info_ptr->msg_args_pptr) = agent_arg_ptr->msg_args;
		xfree(agent_arg_ptr);
		return;
	}
	if (count != j) {
		error("agent: Retry count (%d) != actual count (%d)",
		      count, j);
		agent_arg_ptr->node_count = j;
	}
	debug2("Queue RPC msg_type=%u, nodes=%d for retry",
	       agent_arg_ptr->msg_type, j);

	/* add the request to a list */
	queued_req_ptr = xmalloc(sizeof(queued_request_t));
	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
	queued_req_ptr->last_attempt = time(NULL);
	slurm_mutex_lock(&retry_mutex);
	if (retry_list == NULL)
		retry_list = list_create(_list_delete_retry);
	(void) list_append(retry_list, (void *) queued_req_ptr);
	slurm_mutex_unlock(&retry_mutex);
}

/*
 * _list_delete_retry - delete an entry from the retry list,
 *	see common/list.h for documentation
 */
static void _list_delete_retry(void *retry_entry)
{
	queued_request_t *queued_req_ptr;

	if (!retry_entry)
		return;

	queued_req_ptr = (queued_request_t *) retry_entry;
	_purge_agent_args(queued_req_ptr->agent_arg_ptr);
	xfree(queued_req_ptr);
}

/* Start a thread to manage queued agent requests */
static void *_agent_init(void *arg)
{
	int min_wait;
	bool mail_too;
	struct timespec ts = {0, 0};
	time_t last_defer_attempt = (time_t) 0;

	while (true) {
		slurm_mutex_lock(&pending_mutex);
		while (!slurmctld_config.shutdown_time &&
		       !pending_mail && (pending_wait_time == NO_VAL16)) {
			ts.tv_sec = time(NULL) + 2;
			slurm_cond_timedwait(&pending_cond, &pending_mutex,
					     &ts);
		}
		if (slurmctld_config.shutdown_time) {
			slurm_mutex_unlock(&pending_mutex);
			break;
		}
		mail_too = pending_mail;
		min_wait = pending_wait_time;
		pending_mail = false;
		pending_wait_time = NO_VAL16;
		slurm_mutex_unlock(&pending_mutex);

		if (last_defer_attempt + 2 < last_job_update) {
			last_defer_attempt = time(NULL);
			_agent_defer();
		}

		_agent_retry(min_wait, mail_too);
	}

	slurm_mutex_lock(&pending_mutex);
	pending_thread_running = false;
	slurm_mutex_unlock(&pending_mutex);
	return NULL;
}

extern void agent_init(void)
{
	slurm_mutex_lock(&pending_mutex);
	if (pending_thread_running) {
		error("%s: thread already running", __func__);
		slurm_mutex_unlock(&pending_mutex);
		return;
	}

	slurm_thread_create_detached(NULL, _agent_init, NULL);
	pending_thread_running = true;
	slurm_mutex_unlock(&pending_mutex);
}

/*
 * agent_trigger - Request processing of pending RPCs
 * IN min_wait - minimum wait time between re-issue of a pending RPC
 * IN mail_too - also send pending email; note this is performed using
 *	fork/waitpid, so it can take longer than just creating a pthread
 *	to send RPCs
 */
extern void agent_trigger(int min_wait, bool mail_too)
{
	if (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT) {
		info("%s: pending_wait_time=%d->%d mail_too=%c->%c Agent_cnt=%d agent_thread_cnt=%d retry_list_size=%d",
		     __func__, pending_wait_time, min_wait,
		     mail_too ? 'T' : 'F',
		     pending_mail ? 'T' : 'F',
		     agent_cnt, agent_thread_cnt,
		     retry_list_size());
	}

	slurm_mutex_lock(&pending_mutex);
	if ((pending_wait_time == NO_VAL16) ||
	    (pending_wait_time > min_wait))
		pending_wait_time = min_wait;
	if (mail_too)
		pending_mail = mail_too;
	slurm_cond_broadcast(&pending_cond);
	slurm_mutex_unlock(&pending_mutex);
}
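
/*
 * Illustrative sketch (assumption: slurmctld caller context; the 30-second
 * minimum wait is an arbitrary example value). Nudge the agent thread to
 * replay any queued RPC whose last attempt is at least 30 seconds old,
 * without processing pending mail. Not compiled into the build:
 */
#if 0
	agent_trigger(30, false);
#endif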

/* agent_pack_pending_rpc_stats - pack counts of pending RPCs into a buffer */
extern void agent_pack_pending_rpc_stats(Buf buffer)
{
	time_t now;
	int i;
	queued_request_t *queued_req_ptr = NULL;
	agent_arg_t *agent_arg_ptr = NULL;
	ListIterator list_iter;

	now = time(NULL);
	if (difftime(now, cache_build_time) <= RPC_PACK_MAX_AGE)
		goto pack_it;	/* Send cached data */
	cache_build_time = now;

	if (rpc_stat_counts) {	/* Clear existing data */
		stat_type_count = 0;
		memset(rpc_stat_counts, 0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);
		memset(rpc_stat_types, 0, sizeof(uint32_t) * MAX_RPC_PACK_CNT);

		rpc_count = 0;
		/* the other variables need not be cleared */
	} else {		/* Allocate buffers for data */
		stat_type_count = 0;
		rpc_stat_counts = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));
		rpc_stat_types = xcalloc(MAX_RPC_PACK_CNT, sizeof(uint32_t));

		rpc_count = 0;
		rpc_host_list = xcalloc(DUMP_RPC_COUNT, sizeof(char *));
		for (i = 0; i < DUMP_RPC_COUNT; i++) {
			rpc_host_list[i] = xmalloc(HOSTLIST_MAX_SIZE);
		}
		rpc_type_list = xcalloc(DUMP_RPC_COUNT, sizeof(uint32_t));
	}

	slurm_mutex_lock(&retry_mutex);
	if (retry_list) {
		list_iter = list_iterator_create(retry_list);
		/* iterate through list, find type slot or make a new one */
		while ((queued_req_ptr = list_next(list_iter))) {
			agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
			if (rpc_count < DUMP_RPC_COUNT) {
				rpc_type_list[rpc_count] =
					agent_arg_ptr->msg_type;
				hostlist_ranged_string(agent_arg_ptr->hostlist,
						       HOSTLIST_MAX_SIZE,
						       rpc_host_list[rpc_count]);
				rpc_count++;
			}
			for (i = 0; i < MAX_RPC_PACK_CNT; i++) {
				if (rpc_stat_types[i] == 0) {
					rpc_stat_types[i] =
						agent_arg_ptr->msg_type;
					stat_type_count++;
				} else if (rpc_stat_types[i] !=
					   agent_arg_ptr->msg_type)
					continue;
				rpc_stat_counts[i]++;
				break;
			}
		}
		list_iterator_destroy(list_iter);
	}
	slurm_mutex_unlock(&retry_mutex);

pack_it:
	pack32_array(rpc_stat_types, stat_type_count, buffer);
	pack32_array(rpc_stat_counts, stat_type_count, buffer);

	pack32_array(rpc_type_list, rpc_count, buffer);
	packstr_array(rpc_host_list, rpc_count, buffer);
}

static void _agent_defer(void)
{
	int rc = -1;
	queued_request_t *queued_req_ptr = NULL;
	agent_arg_t *agent_arg_ptr = NULL;
	/* Locks: Write job */
	slurmctld_lock_t job_write_lock = { .job = WRITE_LOCK };

	lock_slurmctld(job_write_lock);
	slurm_mutex_lock(&defer_mutex);
	if (defer_list) {
		List tmp_list = NULL;
		/* first try to find a new (never tried) record */
		while ((queued_req_ptr = list_pop(defer_list))) {
			agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
			if (agent_arg_ptr->msg_type ==
			    REQUEST_BATCH_JOB_LAUNCH)
				rc = _batch_launch_defer(queued_req_ptr);
			else if (agent_arg_ptr->msg_type ==
				 REQUEST_SIGNAL_TASKS)
				rc = _signal_defer(queued_req_ptr);
			else
				fatal("%s: Invalid message type (%u)",
				      __func__, agent_arg_ptr->msg_type);

			if (rc == -1) {		/* abort request */
				_purge_agent_args(
					queued_req_ptr->agent_arg_ptr);
				xfree(queued_req_ptr);
			} else if (rc == 0) {
				/* ready to process now, move to retry_list */
				slurm_mutex_lock(&retry_mutex);
				if (!retry_list)
					retry_list =
						list_create(_list_delete_retry);
				list_append(retry_list, queued_req_ptr);
				slurm_mutex_unlock(&retry_mutex);
			} else if (rc == 1) {
				if (!tmp_list)
					tmp_list =
						list_create(_list_delete_retry);
				list_append(tmp_list, (void *) queued_req_ptr);
			}
		}

		if (tmp_list) {
			list_transfer(defer_list, tmp_list);
			FREE_NULL_LIST(tmp_list);
		}
	}

	slurm_mutex_unlock(&defer_mutex);
	unlock_slurmctld(job_write_lock);

	return;
}

/* Do the work requested by agent_retry (retry pending RPCs).
 * This is a separate thread so the job records can be locked */
static void _agent_retry(int min_wait, bool mail_too)
{
	time_t now = time(NULL);
	queued_request_t *queued_req_ptr = NULL;
	agent_arg_t *agent_arg_ptr = NULL;
	ListIterator retry_iter;
	mail_info_t *mi = NULL;

	slurm_mutex_lock(&retry_mutex);
	if (retry_list) {
		static time_t last_msg_time = (time_t) 0;
		uint32_t msg_type[5] = {0, 0, 0, 0, 0};
		int i = 0, list_size = list_count(retry_list);
		if (((list_size > 100) &&
		     (difftime(now, last_msg_time) > 300)) ||
		    ((list_size > 0) &&
		     (slurmctld_conf.debug_flags & DEBUG_FLAG_AGENT))) {
			/* Note sizable backlog (retry_list_size()) of work */
			retry_iter = list_iterator_create(retry_list);
			while ((queued_req_ptr = list_next(retry_iter))) {
				agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
				msg_type[i++] = agent_arg_ptr->msg_type;
				if (i == 5)
					break;
			}
			list_iterator_destroy(retry_iter);
			info(" retry_list retry_list_size:%d msg_type=%s,%s,%s,%s,%s",
			     list_size, rpc_num2string(msg_type[0]),
			     rpc_num2string(msg_type[1]),
			     rpc_num2string(msg_type[2]),
			     rpc_num2string(msg_type[3]),
			     rpc_num2string(msg_type[4]));
			last_msg_time = now;
		}
	}

	slurm_mutex_lock(&agent_cnt_mutex);
	if (agent_thread_cnt + AGENT_THREAD_COUNT + 2 > MAX_SERVER_THREADS) {
		/* too much work already */
		slurm_mutex_unlock(&agent_cnt_mutex);
		slurm_mutex_unlock(&retry_mutex);
		return;
	}
	slurm_mutex_unlock(&agent_cnt_mutex);

	if (retry_list) {
		/* first try to find a new (never tried) record */
		retry_iter = list_iterator_create(retry_list);
		while ((queued_req_ptr = list_next(retry_iter))) {
			if (queued_req_ptr->last_attempt == 0) {
				list_remove(retry_iter);
				break;	/* Process this request now */
			}
		}
		list_iterator_destroy(retry_iter);
	}

	if (retry_list && (queued_req_ptr == NULL)) {
		/* now try to find a requeue request that is
		 * relatively old */
		double age = 0;

		retry_iter = list_iterator_create(retry_list);
		/* next try to find an older record to retry */
		while ((queued_req_ptr = list_next(retry_iter))) {
			age = difftime(now, queued_req_ptr->last_attempt);
			if (age > min_wait) {
				list_remove(retry_iter);
				break;
			}
		}
		list_iterator_destroy(retry_iter);
	}
	slurm_mutex_unlock(&retry_mutex);

	if (queued_req_ptr) {
		agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
		xfree(queued_req_ptr);
		if (agent_arg_ptr) {
			debug2("Spawning RPC agent for msg_type %s",
			       rpc_num2string(agent_arg_ptr->msg_type));
			slurm_thread_create_detached(NULL, agent,
						     agent_arg_ptr);
		} else
			error("agent_retry found record with no agent_args");
	} else if (mail_too) {
		slurm_mutex_lock(&agent_cnt_mutex);
		slurm_mutex_lock(&mail_mutex);
		while (mail_list && (agent_thread_cnt < MAX_SERVER_THREADS) &&
		       (mail_thread_cnt < MAX_MAIL_THREADS)) {
			mi = (mail_info_t *) list_dequeue(mail_list);
			if (!mi)
				break;

			mail_thread_cnt++;
			agent_thread_cnt++;
			slurm_thread_create_detached(NULL, _mail_proc, mi);
		}
		slurm_mutex_unlock(&mail_mutex);
		slurm_mutex_unlock(&agent_cnt_mutex);
	}

	return;
}

1699
/*
 * agent_queue_request - put a new request on the queue for execution or
 *	execute now if not too busy
 * IN agent_arg_ptr - the request to enqueue
 */
void agent_queue_request(agent_arg_t *agent_arg_ptr)
{
	queued_request_t *queued_req_ptr = NULL;

	if ((AGENT_THREAD_COUNT + 2) >= MAX_SERVER_THREADS)
		fatal("AGENT_THREAD_COUNT value is too high relative to MAX_SERVER_THREADS");

	if (message_timeout == NO_VAL16) {
		message_timeout = MAX(slurm_get_msg_timeout(), 30);
	}

	if (agent_arg_ptr->msg_type == REQUEST_SHUTDOWN) {
		/* execute now */
		slurm_thread_create_detached(NULL, agent, agent_arg_ptr);
		/* give agent a chance to start */
		usleep(10000);
		return;
	}

	queued_req_ptr = xmalloc(sizeof(queued_request_t));
	queued_req_ptr->agent_arg_ptr = agent_arg_ptr;
	/* queued_req_ptr->last_attempt = 0; Implicit */

	if (((agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) &&
	     (_batch_launch_defer(queued_req_ptr) != 0)) ||
	    ((agent_arg_ptr->msg_type == REQUEST_SIGNAL_TASKS) &&
	     (_signal_defer(queued_req_ptr) != 0))) {
		slurm_mutex_lock(&defer_mutex);
		if (defer_list == NULL)
			defer_list = list_create(_list_delete_retry);
		list_append(defer_list, (void *) queued_req_ptr);
		slurm_mutex_unlock(&defer_mutex);
	} else {
		slurm_mutex_lock(&retry_mutex);
		if (retry_list == NULL)
			retry_list = list_create(_list_delete_retry);
		list_append(retry_list, (void *) queued_req_ptr);
		slurm_mutex_unlock(&retry_mutex);
	}
	/* now process the request in a separate pthread
	 * (if we can create another pthread to do so) */
	agent_trigger(999, false);
}

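/*
 * A minimal caller sketch (not from this file): how slurmctld code
 * typically hands work to the agent. The host names and node count are
 * illustrative only; real callers build the hostlist from job or node
 * records. Ownership of the agent_arg_t (including its hostlist and
 * msg_args) passes to the agent, which frees them when done.
 *
 *	agent_arg_t *args = xmalloc(sizeof(agent_arg_t));
 *	args->msg_type = REQUEST_PING;		// RPC to issue
 *	args->retry = 0;			// no automatic retry
 *	args->hostlist = hostlist_create("tux[0-15]");
 *	args->node_count = 16;
 *	agent_queue_request(args);		// queue, or run now if idle
 */
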
/* agent_purge - purge all pending RPC requests */
extern void agent_purge(void)
{
	int i;

	if (retry_list) {
		slurm_mutex_lock(&retry_mutex);
		FREE_NULL_LIST(retry_list);
		slurm_mutex_unlock(&retry_mutex);
	}
	if (defer_list) {
		slurm_mutex_lock(&defer_mutex);
		FREE_NULL_LIST(defer_list);
		slurm_mutex_unlock(&defer_mutex);
	}
	if (mail_list) {
		slurm_mutex_lock(&mail_mutex);
		FREE_NULL_LIST(mail_list);
		slurm_mutex_unlock(&mail_mutex);
	}

	xfree(rpc_stat_counts);
	xfree(rpc_stat_types);
	xfree(rpc_type_list);
	if (rpc_host_list) {
		for (i = 0; i < DUMP_RPC_COUNT; i++)
			xfree(rpc_host_list[i]);
		xfree(rpc_host_list);
	}
}

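/* Return the current count of active agents */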
extern int get_agent_count(void)
{
	int cnt;

	slurm_mutex_lock(&agent_cnt_mutex);
	cnt = agent_cnt;
	slurm_mutex_unlock(&agent_cnt_mutex);

	return cnt;
}

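/* Return the current count of active agent threads */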
extern int get_agent_thread_count(void)
{
	int cnt;

	slurm_mutex_lock(&agent_cnt_mutex);
	cnt = agent_thread_cnt;
	slurm_mutex_unlock(&agent_cnt_mutex);

	return cnt;
}

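/* Free an agent_arg_t, releasing its hostlist, address array, and
 * message payload using the free function matching the message type */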
static void _purge_agent_args(agent_arg_t *agent_arg_ptr)
{
	if (agent_arg_ptr == NULL)
		return;

	hostlist_destroy(agent_arg_ptr->hostlist);
	xfree(agent_arg_ptr->addr);
	if (agent_arg_ptr->msg_args) {
		if (agent_arg_ptr->msg_type == REQUEST_BATCH_JOB_LAUNCH) {
			slurm_free_job_launch_msg(agent_arg_ptr->msg_args);
		} else if (agent_arg_ptr->msg_type ==
			   RESPONSE_RESOURCE_ALLOCATION) {
			resource_allocation_response_msg_t *alloc_msg =
				agent_arg_ptr->msg_args;
			/* NULL out working_cluster_rec because it's pointing
			 * to the actual cluster_rec. */
			alloc_msg->working_cluster_rec = NULL;
			slurm_free_resource_allocation_response_msg(
				agent_arg_ptr->msg_args);
		} else if (agent_arg_ptr->msg_type ==
			   RESPONSE_HET_JOB_ALLOCATION) {
			List alloc_list = agent_arg_ptr->msg_args;
			FREE_NULL_LIST(alloc_list);
		} else if ((agent_arg_ptr->msg_type == REQUEST_ABORT_JOB) ||
			   (agent_arg_ptr->msg_type == REQUEST_TERMINATE_JOB) ||
			   (agent_arg_ptr->msg_type == REQUEST_KILL_PREEMPTED) ||
			   (agent_arg_ptr->msg_type == REQUEST_KILL_TIMELIMIT))
			slurm_free_kill_job_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_USER_MSG)
			slurm_free_srun_user_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_EXEC)
			slurm_free_srun_exec_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_NODE_FAIL)
			slurm_free_srun_node_fail_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_STEP_MISSING)
			slurm_free_srun_step_missing_msg(
				agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == SRUN_STEP_SIGNAL)
			slurm_free_job_step_kill_msg(
				agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_JOB_NOTIFY)
			slurm_free_job_notify_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_SUSPEND_INT)
			slurm_free_suspend_int_msg(agent_arg_ptr->msg_args);
		else if (agent_arg_ptr->msg_type == REQUEST_LAUNCH_PROLOG)
			slurm_free_prolog_launch_msg(agent_arg_ptr->msg_args);
		else
			xfree(agent_arg_ptr->msg_args);
	}
	xfree(agent_arg_ptr);
}

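/* Allocate a zero-filled mail_info_t record */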
static mail_info_t *_mail_alloc(void)
{
	return xmalloc(sizeof(mail_info_t));
}

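/* Destructor for mail_list entries: free a mail_info_t and its strings */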
static void _mail_free(void *arg)
{
	mail_info_t *mi = (mail_info_t *) arg;

	if (mi) {
		xfree(mi->user_name);
		xfree(mi->message);
		xfree(mi);
	}
}

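/* Build a NULL-terminated environment for the mail program, currently
 * just SLURM_CLUSTER_NAME if a cluster name is configured */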
static char **_build_mail_env(void)
{
	char **my_env;

	my_env = xmalloc(sizeof(char *));
	my_env[0] = NULL;
	if (slurmctld_conf.cluster_name) {
		setenvf(&my_env, "SLURM_CLUSTER_NAME", "%s",
			slurmctld_conf.cluster_name);
	}

	return my_env;
}

/* Process an e-mail request, forking to exec MailProg, then free the record */
static void *_mail_proc(void *arg)
{
	mail_info_t *mi = (mail_info_t *) arg;
	pid_t pid;

	pid = fork();
	if (pid < 0) {		/* error */
		error("fork(): %m");
	} else if (pid == 0) {	/* child */
		int fd_0, fd_1, fd_2, i;
		char **my_env = NULL;
		my_env = _build_mail_env();
		for (i = 0; i < 1024; i++)
			(void) close(i);
		if ((fd_0 = open("/dev/null", O_RDWR)) == -1)	// fd = 0
			error("Couldn't open /dev/null: %m");
		if ((fd_1 = dup(fd_0)) == -1)			// fd = 1
			error("Couldn't do a dup on fd 1: %m");
		if ((fd_2 = dup(fd_0)) == -1)			// fd = 2
			error("Couldn't do a dup on fd 2: %m");
		execle(slurmctld_conf.mail_prog, "mail",
		       "-s", mi->message, mi->user_name,
		       NULL, my_env);
		error("Failed to exec %s: %m",
		      slurmctld_conf.mail_prog);
		_exit(1);
	} else {		/* parent */
		waitpid(pid, NULL, 0);
	}
	_mail_free(mi);
	slurm_mutex_lock(&agent_cnt_mutex);
	slurm_mutex_lock(&mail_mutex);
	if (agent_thread_cnt)
		agent_thread_cnt--;
	else
		error("agent_thread_cnt underflow");
	if (mail_thread_cnt)
		mail_thread_cnt--;
	else
		error("mail_thread_cnt underflow");
	slurm_mutex_unlock(&mail_mutex);
	slurm_mutex_unlock(&agent_cnt_mutex);

	return (void *) NULL;
}

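/* Map a MAIL_JOB_* event type to the phrase used in the message subject */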
static char *_mail_type_str(uint16_t mail_type)
{
	if (mail_type == MAIL_JOB_BEGIN)
		return "Began";
	if (mail_type == MAIL_JOB_END)
		return "Ended";
	if (mail_type == MAIL_JOB_FAIL)
		return "Failed";
	if (mail_type == MAIL_JOB_REQUEUE)
		return "Requeued";
	if (mail_type == MAIL_JOB_STAGE_OUT)
		return "StageOut/Teardown";
	if (mail_type == MAIL_JOB_TIME100)
		return "Reached time limit";
	if (mail_type == MAIL_JOB_TIME90)
		return "Reached 90% of time limit";
	if (mail_type == MAIL_JOB_TIME80)
		return "Reached 80% of time limit";
	if (mail_type == MAIL_JOB_TIME50)
		return "Reached 50% of time limit";
	return "unknown";
}

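/* Write a time description for the notification into buf: queued time
 * for job begin, run time for end/fail/requeue and time-limit events,
 * and elapsed time since job end for stage-out */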
static void _set_job_time(job_record_t *job_ptr, uint16_t mail_type,
			  char *buf, int buf_len)
{
	time_t interval = NO_VAL;

	buf[0] = '\0';
	if ((mail_type == MAIL_JOB_BEGIN) && job_ptr->start_time &&
	    job_ptr->details && job_ptr->details->submit_time) {
		interval = job_ptr->start_time - job_ptr->details->submit_time;
		snprintf(buf, buf_len, ", Queued time ");
		/* write the formatted time after the 14-char prefix */
		secs2time_str(interval, buf+14, buf_len-14);
		return;
	}

	if (((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL) ||
	     (mail_type == MAIL_JOB_REQUEUE)) &&
	    (job_ptr->start_time && job_ptr->end_time)) {
		if (job_ptr->suspend_time) {
			interval = job_ptr->end_time - job_ptr->suspend_time;
			interval += job_ptr->pre_sus_time;
		} else
			interval = job_ptr->end_time - job_ptr->start_time;
		snprintf(buf, buf_len, ", Run time ");
		/* write the formatted time after the 11-char prefix */
		secs2time_str(interval, buf+11, buf_len-11);
		return;
	}

	if (((mail_type == MAIL_JOB_TIME100) ||
	     (mail_type == MAIL_JOB_TIME90) ||
	     (mail_type == MAIL_JOB_TIME80) ||
	     (mail_type == MAIL_JOB_TIME50)) && job_ptr->start_time) {
		if (job_ptr->suspend_time) {
			interval = time(NULL) - job_ptr->suspend_time;
			interval += job_ptr->pre_sus_time;
		} else
			interval = time(NULL) - job_ptr->start_time;
		snprintf(buf, buf_len, ", Run time ");
		/* write the formatted time after the 11-char prefix */
		secs2time_str(interval, buf+11, buf_len-11);
		return;
	}

	if ((mail_type == MAIL_JOB_STAGE_OUT) && job_ptr->end_time) {
		interval = time(NULL) - job_ptr->end_time;
		snprintf(buf, buf_len, " time ");
		/* write the formatted time after the 6-char prefix */
		secs2time_str(interval, buf + 6, buf_len - 6);
		return;
	}
}

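/* For job end/fail notifications, append the job's terminal state and
 * exit code (or a summary across array tasks) to buf; otherwise leave
 * buf empty */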
static void _set_job_term_info(job_record_t *job_ptr, uint16_t mail_type,
			       char *buf, int buf_len)
{
	buf[0] = '\0';

	if ((mail_type == MAIL_JOB_END) || (mail_type == MAIL_JOB_FAIL)) {
		uint16_t base_state;
		uint32_t exit_status_min, exit_status_max;
		int exit_code_min, exit_code_max;

		base_state = job_ptr->job_state & JOB_STATE_BASE;
		if (job_ptr->array_recs &&
		    !(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
			/* Summarize array tasks. */
			exit_status_min = job_ptr->array_recs->min_exit_code;
			exit_status_max = job_ptr->array_recs->max_exit_code;
			if (WIFEXITED(exit_status_min) &&
			    WIFEXITED(exit_status_max)) {
				char *state_string;
				exit_code_min = WEXITSTATUS(exit_status_min);
				exit_code_max = WEXITSTATUS(exit_status_max);
				if ((exit_code_min == 0) && (exit_code_max > 0))
					state_string = "Mixed";
				else {
					state_string =
						job_state_string(base_state);
				}
				snprintf(buf, buf_len, ", %s, ExitCode [%d-%d]",
					 state_string, exit_code_min,
					 exit_code_max);
			} else if (WIFSIGNALED(exit_status_max)) {
				exit_code_max = WTERMSIG(exit_status_max);
				snprintf(buf, buf_len, ", %s, MaxSignal [%d]",
					 "Mixed", exit_code_max);
			} else if (WIFEXITED(exit_status_max)) {
				exit_code_max = WEXITSTATUS(exit_status_max);
				snprintf(buf, buf_len, ", %s, MaxExitCode [%d]",
					 "Mixed", exit_code_max);
			} else {
				snprintf(buf, buf_len, ", %s",
					 job_state_string(base_state));
			}

			if (job_ptr->array_recs->array_flags &
			    ARRAY_TASK_REQUEUED)
				strncat(buf, ", with requeued tasks",
					buf_len - strlen(buf) - 1);
		} else {
			exit_status_max = job_ptr->exit_code;
			if (WIFEXITED(exit_status_max)) {
				exit_code_max = WEXITSTATUS(exit_status_max);
				snprintf(buf, buf_len, ", %s, ExitCode %d",
					 job_state_string(base_state),
					 exit_code_max);
			} else {
				snprintf(buf, buf_len, ", %s",
					 job_state_string(base_state));
			}
		}
	} else if (buf_len > 0) {
		buf[0] = '\0';
	}
}

/*
 * mail_job_info - Send e-mail notice of job state change
 * IN job_ptr - job identification
 * IN mail_type - job transition type, see MAIL_JOB in slurm.h
 */
extern void mail_job_info(job_record_t *job_ptr, uint16_t mail_type)
{
	char job_time[128], term_msg[128];
	mail_info_t *mi;

	/*
	 * Send mail only for the first component (leader) of a hetjob,
	 * not the individual job components.
	 */
	if (job_ptr->het_job_id && (job_ptr->het_job_offset != 0))
		return;

	mi = _mail_alloc();
	mi->user_name = xstrdup(job_ptr->mail_user);

	/* Use job array master record, if available */
	if (!(job_ptr->mail_type & MAIL_ARRAY_TASKS) &&
	    (job_ptr->array_task_id != NO_VAL) && !job_ptr->array_recs) {
		job_record_t *master_job_ptr;
		master_job_ptr = find_job_record(job_ptr->array_job_id);
		if (master_job_ptr && master_job_ptr->array_recs)
			job_ptr = master_job_ptr;
	}

	_set_job_time(job_ptr, mail_type, job_time, sizeof(job_time));
	_set_job_term_info(job_ptr, mail_type, term_msg, sizeof(term_msg));
	if (job_ptr->array_recs && !(job_ptr->mail_type & MAIL_ARRAY_TASKS)) {
		mi->message = xstrdup_printf("Slurm Array Summary Job_id=%u_* (%u) Name=%s %s%s",
					     job_ptr->array_job_id,
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     term_msg);
	} else if (job_ptr->array_task_id != NO_VAL) {
		mi->message = xstrdup_printf("Slurm Array Task Job_id=%u_%u (%u) Name=%s %s%s%s",
					     job_ptr->array_job_id,
					     job_ptr->array_task_id,
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     job_time, term_msg);
	} else {
		mi->message = xstrdup_printf("Slurm Job_id=%u Name=%s %s%s%s",
					     job_ptr->job_id, job_ptr->name,
					     _mail_type_str(mail_type),
					     job_time, term_msg);
	}
	info("email msg to %s: %s", mi->user_name, mi->message);

	slurm_mutex_lock(&mail_mutex);
	if (!mail_list)
		mail_list = list_create(_mail_free);
	(void) list_enqueue(mail_list, (void *) mi);
	slurm_mutex_unlock(&mail_mutex);
	return;
}

/* Test if a batch launch request should be deferred
 * RET -1: abort the request, pending job cancelled
 *	0: execute the request now
 *	1: defer the request
 */
static int _batch_launch_defer(queued_request_t *queued_req_ptr)
{
	agent_arg_t *agent_arg_ptr;
	batch_job_launch_msg_t *launch_msg_ptr;
	time_t now = time(NULL);
	job_record_t *job_ptr;
	int nodes_ready = 0, tmp = 0;

	agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
	if (difftime(now, queued_req_ptr->last_attempt) < 10) {
		/* Reduce overhead by only testing once every 10 secs */
		return 1;
	}

	launch_msg_ptr = (batch_job_launch_msg_t *) agent_arg_ptr->msg_args;
	job_ptr = find_job_record(launch_msg_ptr->job_id);
	if ((job_ptr == NULL) ||
	    (!IS_JOB_RUNNING(job_ptr) && !IS_JOB_SUSPENDED(job_ptr))) {
		info("agent(batch_launch): removed pending request for cancelled JobId=%u",
		     launch_msg_ptr->job_id);
		return -1;	/* job cancelled while waiting */
	}

	if (job_ptr->details && job_ptr->details->prolog_running) {
		debug2("%s: JobId=%u still waiting on %u prologs",
		       __func__, job_ptr->job_id,
		       job_ptr->details->prolog_running);
		return 1;
	}

	if (job_ptr->wait_all_nodes) {
		(void) job_node_ready(launch_msg_ptr->job_id, &tmp);
		if (tmp == (READY_JOB_STATE | READY_NODE_STATE)) {
			nodes_ready = 1;
			if (launch_msg_ptr->alias_list &&
			    !xstrcmp(launch_msg_ptr->alias_list, "TBD")) {
				/* Update launch RPC with correct node
				 * aliases */
				xfree(launch_msg_ptr->alias_list);
				launch_msg_ptr->alias_list =
					xstrdup(job_ptr->alias_list);
			}
		}
	} else {
#ifdef HAVE_FRONT_END
		nodes_ready = 1;
#else
		node_record_t *node_ptr;
		char *hostname;

		hostname = hostlist_deranged_string_xmalloc(
			agent_arg_ptr->hostlist);
		node_ptr = find_node_record(hostname);
		if (node_ptr == NULL) {
			error("agent(batch_launch) removed pending request for JobId=%u, missing node %s",
			      launch_msg_ptr->job_id, hostname);
			xfree(hostname);
			return -1;	/* invalid request?? */
		}
		xfree(hostname);
		if (!IS_NODE_POWER_SAVE(node_ptr) &&
		    !IS_NODE_NO_RESPOND(node_ptr)) {
			nodes_ready = 1;
		}
#endif
	}

	if (nodes_ready) {
		if (IS_JOB_CONFIGURING(job_ptr))
			job_config_fini(job_ptr);
		queued_req_ptr->last_attempt = (time_t) 0;
		return 0;
	}

	if (queued_req_ptr->last_attempt == 0) {
		queued_req_ptr->first_attempt = now;
		queued_req_ptr->last_attempt = now;
	} else if (difftime(now, queued_req_ptr->first_attempt) >=
		   slurm_get_resume_timeout()) {
		/* Nodes will get marked DOWN and job requeued, if possible */
		error("agent waited too long for nodes to respond, abort launch of JobId=%u",
		      job_ptr->job_id);
		return -1;
	}

	queued_req_ptr->last_attempt = now;
	return 1;
}

/* Test if a job signal request should be deferred
 * RET -1: abort the request
 *	0: execute the request now
 *	1: defer the request
 */
static int _signal_defer(queued_request_t *queued_req_ptr)
{
	agent_arg_t *agent_arg_ptr;
	signal_tasks_msg_t *signal_msg_ptr;
	time_t now = time(NULL);
	job_record_t *job_ptr;

	agent_arg_ptr = queued_req_ptr->agent_arg_ptr;
	signal_msg_ptr = (signal_tasks_msg_t *) agent_arg_ptr->msg_args;
	job_ptr = find_job_record(signal_msg_ptr->job_id);

	if (job_ptr == NULL) {
		info("agent(signal_task): removed pending request for cancelled JobId=%u",
		     signal_msg_ptr->job_id);
		return -1;	/* job cancelled while waiting */
	}

	if (job_ptr->state_reason != WAIT_PROLOG)
		return 0;

	if (queued_req_ptr->first_attempt == 0) {
		queued_req_ptr->first_attempt = now;
	} else if (difftime(now, queued_req_ptr->first_attempt) >=
		   2 * slurm_get_batch_start_timeout()) {
		error("agent waited too long for nodes to respond, abort signal of JobId=%u",
		      job_ptr->job_id);
		return -1;
	}

	return 1;
}

/* Return length of agent's retry_list */
extern int retry_list_size(void)
{
	if (retry_list == NULL)
		return 0;
	return list_count(retry_list);
}

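/* Run the configured RebootProgram directly from slurmctld, passing the
 * target node names as its single argument, and log any failure */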
static void _reboot_from_ctld(agent_arg_t *agent_arg_ptr)
{
	char *argv[3], *pname;
	pid_t child;
	int i, rc, status = 0;

	if (!agent_arg_ptr->hostlist) {
		error("%s: hostlist is NULL", __func__);
		return;
	}
	if (!slurmctld_conf.reboot_program) {
		error("%s: RebootProgram is NULL", __func__);
		return;
	}

	pname = strrchr(slurmctld_conf.reboot_program, '/');
	if (pname)
		argv[0] = pname + 1;
	else
		argv[0] = slurmctld_conf.reboot_program;
	argv[1] = hostlist_deranged_string_xmalloc(agent_arg_ptr->hostlist);
	argv[2] = NULL;

	child = fork();
	if (child == 0) {
		for (i = 0; i < 1024; i++)
			(void) close(i);
		(void) setpgid(0, 0);
		(void) execv(slurmctld_conf.reboot_program, argv);
		_exit(1);
	} else if (child < 0) {
		error("fork: %m");
	} else {
		(void) waitpid(child, &status, 0);
		if (WIFEXITED(status)) {
			rc = WEXITSTATUS(status);
			if (rc != 0)
				error("RebootProgram exit status of %d", rc);
		} else if (WIFSIGNALED(status)) {
			error("RebootProgram signaled: %s",
			      strsignal(WTERMSIG(status)));
		}
	}
	xfree(argv[1]);
}
