1 /*****************************************************************************\
2  *  job_info.c - get/print the job state information of slurm
3  *****************************************************************************
4  *  Portions Copyright (C) 2010-2017 SchedMD LLC <https://www.schedmd.com>.
5  *  Copyright (C) 2002-2007 The Regents of the University of California.
6  *  Copyright (C) 2008-2010 Lawrence Livermore National Security.
7  *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
8  *  Written by Morris Jette <jette1@llnl.gov> et. al.
9  *  CODE-OCEC-09-009. All rights reserved.
10  *
11  *  This file is part of Slurm, a resource management program.
12  *  For details, see <https://slurm.schedmd.com/>.
13  *  Please also read the included file: DISCLAIMER.
14  *
15  *  Slurm is free software; you can redistribute it and/or modify it under
16  *  the terms of the GNU General Public License as published by the Free
17  *  Software Foundation; either version 2 of the License, or (at your option)
18  *  any later version.
19  *
20  *  In addition, as a special exception, the copyright holders give permission
21  *  to link the code of portions of this program with the OpenSSL library under
22  *  certain conditions as described in each individual source file, and
23  *  distribute linked combinations including the two. You must obey the GNU
24  *  General Public License in all respects for all of the code used other than
25  *  OpenSSL. If you modify file(s) with this exception, you may extend this
26  *  exception to your version of the file(s), but you are not obligated to do
27  *  so. If you do not wish to do so, delete this exception statement from your
28  *  version.  If you delete this exception statement from all source files in
29  *  the program, then also delete it here.
30  *
31  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
32  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
33  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
34  *  details.
35  *
36  *  You should have received a copy of the GNU General Public License along
37  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
38  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
39 \*****************************************************************************/
40 
41 #include <ctype.h>
42 #include <errno.h>
43 #include <grp.h>
44 #include <pwd.h>
45 #include <stdio.h>
46 #include <stdlib.h>
47 #include <string.h>
48 #include <sys/param.h>
49 #include <sys/types.h>
50 #include <sys/wait.h>
51 #include <time.h>
52 
53 #include "slurm/slurm.h"
54 #include "slurm/slurmdb.h"
55 #include "slurm/slurm_errno.h"
56 
57 #include "src/common/cpu_frequency.h"
58 #include "src/common/forward.h"
59 #include "src/common/macros.h"
60 #include "src/common/node_select.h"
61 #include "src/common/parse_time.h"
62 #include "src/common/proc_args.h"
63 #include "src/common/slurm_auth.h"
64 #include "src/common/slurm_protocol_api.h"
65 #include "src/common/strlcpy.h"
66 #include "src/common/uid.h"
67 #include "src/common/uthash/uthash.h"
68 #include "src/common/xmalloc.h"
69 #include "src/common/xstring.h"
70 
71 /* Use a hash table to identify duplicate job records across the clusters in
72  * a federation */
73 #define JOB_HASH_SIZE 1000
74 
75 /* Data structures for pthreads used to gather job information from multiple
76  * clusters in parallel */
77 typedef struct load_job_req_struct {
78 	slurmdb_cluster_rec_t *cluster;
79 	bool local_cluster;
80 	slurm_msg_t *req_msg;
81 	List resp_msg_list;
82 } load_job_req_struct_t;
83 
84 typedef struct load_job_resp_struct {
85 	job_info_msg_t *new_msg;
86 } load_job_resp_struct_t;
87 
88 typedef struct load_job_prio_resp_struct {
89 	bool local_cluster;
90 	priority_factors_response_msg_t *new_msg;
91 } load_job_prio_resp_struct_t;
92 
93 static pthread_mutex_t job_node_info_lock = PTHREAD_MUTEX_INITIALIZER;
94 static node_info_msg_t *job_node_ptr = NULL;
95 
96 /* This set of functions loads/frees node information so that we can map a job's
97  * core bitmap to its CPU IDs based upon the thread count on each node. */
98 static void _load_node_info(void)
99 {
100 	slurm_mutex_lock(&job_node_info_lock);
101 	if (!job_node_ptr)
102 		(void) slurm_load_node((time_t) NULL, &job_node_ptr, 0);
103 	slurm_mutex_unlock(&job_node_info_lock);
104 }
105 
106 static uint32_t _threads_per_core(char *host)
107 {
108 	uint32_t i, threads = 1;
109 
110 	if (!job_node_ptr || !host)
111 		return threads;
112 
113 	slurm_mutex_lock(&job_node_info_lock);
114 	for (i = 0; i < job_node_ptr->record_count; i++) {
115 		if (job_node_ptr->node_array[i].name &&
116 		    !xstrcmp(host, job_node_ptr->node_array[i].name)) {
117 			threads = job_node_ptr->node_array[i].threads;
118 			break;
119 		}
120 	}
121 	slurm_mutex_unlock(&job_node_info_lock);
122 	return threads;
123 }
124 static void _free_node_info(void)
125 {
126 #if 0
127 	slurm_mutex_lock(&job_node_info_lock);
128 	if (job_node_ptr) {
129 		slurm_free_node_info_msg(job_node_ptr);
130 		job_node_ptr = NULL;
131 	}
132 	slurm_mutex_unlock(&job_node_info_lock);
133 #endif
134 }
135 
136 /* Perform file name substitutions
137  * %A - Job array's master job allocation number.
138  * %a - Job array ID (index) number.
139  * %j - Job ID
140  * %u - User name
141  * %x - Job name
142  */
143 static void _fname_format(char *buf, int buf_size, job_info_t * job_ptr,
144 			  char *fname)
145 {
146 	char *ptr, *tmp, *tmp2 = NULL, *user;
147 
148 	tmp = xstrdup(fname);
149 	while ((ptr = strstr(tmp, "%A"))) {	/* Array job ID */
150 		ptr[0] = '\0';
151 		if (job_ptr->array_task_id == NO_VAL) {
152 			/* Not a job array */
153 			xstrfmtcat(tmp2, "%s%u%s", tmp, job_ptr->job_id, ptr+2);
154 		} else {
155 			xstrfmtcat(tmp2, "%s%u%s", tmp, job_ptr->array_job_id,
156 				   ptr+2);
157 		}
158 		xfree(tmp);	/* transfer the results */
159 		tmp = tmp2;
160 		tmp2 = NULL;
161 	}
162 	while ((ptr = strstr(tmp, "%a"))) {	/* Array task ID */
163 		ptr[0] = '\0';
164 		xstrfmtcat(tmp2, "%s%u%s", tmp, job_ptr->array_task_id, ptr+2);
165 		xfree(tmp);	/* transfer the results */
166 		tmp = tmp2;
167 		tmp2 = NULL;
168 	}
169 	while ((ptr = strstr(tmp, "%j"))) {	/* Job ID */
170 		ptr[0] = '\0';
171 		xstrfmtcat(tmp2, "%s%u%s", tmp, job_ptr->job_id, ptr+2);
172 		xfree(tmp);	/* transfer the results */
173 		tmp = tmp2;
174 		tmp2 = NULL;
175 	}
176 	while ((ptr = strstr(tmp, "%u"))) {	/* User name */
177 		ptr[0] = '\0';
178 		user = uid_to_string((uid_t) job_ptr->user_id);
179 		xstrfmtcat(tmp2, "%s%s%s", tmp, user, ptr+2);
180 		xfree(user);
181 		xfree(tmp);	/* transfer the results */
182 		tmp = tmp2;
183 		tmp2 = NULL;
184 	}
185 	xstrsubstituteall(tmp, "%x", job_ptr->name);	/* Job name */
186 
187 	if (tmp[0] == '/')
188 		snprintf(buf, buf_size, "%s", tmp);
189 	else
190 		snprintf(buf, buf_size, "%s/%s", job_ptr->work_dir, tmp);
191 	xfree(tmp);
192 }
193 
194 /* Given a job record pointer, return its stderr path in buf */
195 extern void slurm_get_job_stderr(char *buf, int buf_size, job_info_t * job_ptr)
196 {
197 	if (job_ptr == NULL)
198 		snprintf(buf, buf_size, "%s", "job pointer is NULL");
199 	else if (job_ptr->std_err)
200 		_fname_format(buf, buf_size, job_ptr, job_ptr->std_err);
201 	else if (job_ptr->batch_flag == 0)
202 		snprintf(buf, buf_size, "%s", "");
203 	else if (job_ptr->std_out)
204 		_fname_format(buf, buf_size, job_ptr, job_ptr->std_out);
205 	else if (job_ptr->array_job_id) {
206 		snprintf(buf, buf_size, "%s/slurm-%u_%u.out",
207 			 job_ptr->work_dir,
208 			 job_ptr->array_job_id, job_ptr->array_task_id);
209 	} else {
210 		snprintf(buf, buf_size, "%s/slurm-%u.out",
211 			 job_ptr->work_dir, job_ptr->job_id);
212 	}
213 }
214 
215 /* Given a job record pointer, return its stdin path in buf */
216 extern void slurm_get_job_stdin(char *buf, int buf_size, job_info_t * job_ptr)
217 {
218 	if (job_ptr == NULL)
219 		snprintf(buf, buf_size, "%s", "job pointer is NULL");
220 	else if (job_ptr->std_in)
221 		_fname_format(buf, buf_size, job_ptr, job_ptr->std_in);
222 	else if (job_ptr->batch_flag == 0)
223 		snprintf(buf, buf_size, "%s", "");
224 	else
225 		snprintf(buf, buf_size, "%s", "/dev/null");
226 }
227 
228 /* Given a job record pointer, return its stdout path in buf */
229 extern void slurm_get_job_stdout(char *buf, int buf_size, job_info_t * job_ptr)
230 {
231 	if (job_ptr == NULL)
232 		snprintf(buf, buf_size, "%s", "job pointer is NULL");
233 	else if (job_ptr->std_out)
234 		_fname_format(buf, buf_size, job_ptr, job_ptr->std_out);
235 	else if (job_ptr->batch_flag == 0)
236 		snprintf(buf, buf_size, "%s", "");
237 	else if (job_ptr->array_job_id) {
238 		snprintf(buf, buf_size, "%s/slurm-%u_%u.out",
239 			 job_ptr->work_dir,
240 			 job_ptr->array_job_id, job_ptr->array_task_id);
241 	} else {
242 		snprintf(buf, buf_size, "%s/slurm-%u.out",
243 			 job_ptr->work_dir, job_ptr->job_id);
244 	}
245 }
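
/*
 * Illustrative usage sketch (comment only, not part of this API): resolve the
 * stdout and stderr paths of a job record previously obtained with
 * slurm_load_job() or slurm_load_jobs(); "job_ptr" is assumed to point at one
 * element of the returned job_array.
 *
 *	char out_path[MAXPATHLEN], err_path[MAXPATHLEN];
 *	slurm_get_job_stdout(out_path, sizeof(out_path), job_ptr);
 *	slurm_get_job_stderr(err_path, sizeof(err_path), job_ptr);
 *	printf("StdOut=%s StdErr=%s\n", out_path, err_path);
 */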
246 
247 /*
248  * slurm_xlate_job_id - Translate a Slurm job ID string into a slurm job ID
249  *	number. If this job ID contains an array index, map this to the
250  *	equivalent Slurm job ID number (e.g. "123_2" to 124)
251  *
252  * IN job_id_str - String containing a single job ID number
253  * RET - equivalent job ID number or 0 on error
254  */
255 extern uint32_t slurm_xlate_job_id(char *job_id_str)
256 {
257 	char *next_str;
258 	uint32_t i, job_id;
259 	uint16_t array_id;
260 	job_info_msg_t *resp = NULL;
261 	slurm_job_info_t *job_ptr;
262 
263 	job_id = (uint32_t) strtol(job_id_str, &next_str, 10);
264 	if (next_str[0] == '\0')
265 		return job_id;
266 	if (next_str[0] != '_')
267 		return (uint32_t) 0;
268 	array_id = (uint16_t) strtol(next_str + 1, &next_str, 10);
269 	if (next_str[0] != '\0')
270 		return (uint32_t) 0;
271 	if ((slurm_load_job(&resp, job_id, SHOW_ALL) != 0) || (resp == NULL))
272 		return (uint32_t) 0;
273 	job_id = 0;
274 	for (i = 0, job_ptr = resp->job_array; i < resp->record_count;
275 	     i++, job_ptr++) {
276 		if (job_ptr->array_task_id == array_id) {
277 			job_id = job_ptr->job_id;
278 			break;
279 		}
280 	}
281 	slurm_free_job_info_msg(resp);
282 	return job_id;
283 }
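
/*
 * Illustrative usage sketch (comment only): translate an array-task ID string
 * such as "123_4" (a hypothetical ID) into the numeric job ID of that task.
 *
 *	uint32_t job_id = slurm_xlate_job_id("123_4");
 *	if (job_id == 0)
 *		fprintf(stderr, "unknown or malformed job ID string\n");
 */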
284 
285 /*
286  * slurm_print_job_info_msg - output information about all Slurm
287  *	jobs based upon message as loaded using slurm_load_jobs
288  * IN out - file to write to
289  * IN job_info_msg_ptr - job information message pointer
290  * IN one_liner - print as a single line if true
291  */
292 extern void
293 slurm_print_job_info_msg ( FILE* out, job_info_msg_t *jinfo, int one_liner )
294 {
295 	int i;
296 	job_info_t *job_ptr = jinfo->job_array;
297 	char time_str[32];
298 
299 	slurm_make_time_str ((time_t *)&jinfo->last_update, time_str,
300 		sizeof(time_str));
301 	fprintf( out, "Job data as of %s, record count %d\n",
302 		 time_str, jinfo->record_count);
303 
304 	for (i = 0; i < jinfo->record_count; i++)
305 		slurm_print_job_info(out, &job_ptr[i], one_liner);
306 }
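
/*
 * Illustrative usage sketch (comment only): load every visible job record,
 * print each one, then free the response message.
 *
 *	job_info_msg_t *jobs = NULL;
 *	if (slurm_load_jobs((time_t) 0, &jobs, SHOW_ALL) == SLURM_SUCCESS) {
 *		slurm_print_job_info_msg(stdout, jobs, 0);
 *		slurm_free_job_info_msg(jobs);
 *	}
 */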
307 
308 static void _sprint_range(char *str, uint32_t str_size,
309 			  uint32_t lower, uint32_t upper)
310 {
311 	if (upper > 0)
312 		snprintf(str, str_size, "%u-%u", lower, upper);
313 	else
314 		snprintf(str, str_size, "%u", lower);
315 }
316 
317 /*
318  * slurm_print_job_info - output information about a specific Slurm
319  *	job based upon message as loaded using slurm_load_jobs
320  * IN out - file to write to
321  * IN job_ptr - an individual job information record pointer
322  * IN one_liner - print as a single line if true
323  */
324 extern void
325 slurm_print_job_info ( FILE* out, job_info_t * job_ptr, int one_liner )
326 {
327 	char *print_this;
328 
329 	_load_node_info();
330 	if ((print_this = slurm_sprint_job_info(job_ptr, one_liner))) {
331 		fprintf(out, "%s", print_this);
332 		xfree(print_this);
333 	}
334 	_free_node_info();
335 }
336 
337 /*
338  * slurm_sprint_job_info - output information about a specific Slurm
339  *	job based upon message as loaded using slurm_load_jobs
340  * IN job_ptr - an individual job information record pointer
341  * IN one_liner - print as a single line if true
342  * RET out - char * containing formatted output (must be freed after call)
343  *           NULL is returned on failure.
344  */
345 extern char *
346 slurm_sprint_job_info ( job_info_t * job_ptr, int one_liner )
347 {
348 	int i, j, k;
349 	char time_str[32], *group_name, *user_name;
350 	char *gres_last = "", tmp1[128], tmp2[128];
351 	char *tmp6_ptr;
352 	char tmp_line[1024 * 128];
353 	char tmp_path[MAXPATHLEN];
354 	uint16_t exit_status = 0, term_sig = 0;
355 	job_resources_t *job_resrcs = job_ptr->job_resrcs;
356 	char *out = NULL;
357 	time_t run_time;
358 	uint32_t min_nodes, max_nodes = 0;
359 	char *nodelist = "NodeList";
360 	bitstr_t *cpu_bitmap;
361 	char *host;
362 	int sock_inx, sock_reps, last;
363 	int abs_node_inx, rel_node_inx;
364 	int64_t nice;
365 	int bit_inx, bit_reps;
366 	uint64_t *last_mem_alloc_ptr = NULL;
367 	uint64_t last_mem_alloc = NO_VAL64;
368 	char *last_hosts;
369 	hostlist_t hl, hl_last;
370 	uint32_t threads;
371 	char *line_end = (one_liner) ? " " : "\n   ";
372 
373 	if (job_ptr->job_id == 0)	/* Duplicated sibling job record */
374 		return NULL;
375 
376 	/****** Line 1 ******/
377 	xstrfmtcat(out, "JobId=%u ", job_ptr->job_id);
378 
379 	if (job_ptr->array_job_id) {
380 		if (job_ptr->array_task_str) {
381 			xstrfmtcat(out, "ArrayJobId=%u ArrayTaskId=%s ",
382 				   job_ptr->array_job_id,
383 				   job_ptr->array_task_str);
384 		} else {
385 			xstrfmtcat(out, "ArrayJobId=%u ArrayTaskId=%u ",
386 				   job_ptr->array_job_id,
387 				   job_ptr->array_task_id);
388 		}
389 		if (job_ptr->array_max_tasks) {
390 			xstrfmtcat(out, "ArrayTaskThrottle=%u ",
391 				   job_ptr->array_max_tasks);
392 		}
393 	} else if (job_ptr->het_job_id) {
394 		xstrfmtcat(out, "HetJobId=%u HetJobOffset=%u ",
395 			   job_ptr->het_job_id, job_ptr->het_job_offset);
396 	}
397 	xstrfmtcat(out, "JobName=%s", job_ptr->name);
398 	xstrcat(out, line_end);
399 
400 	/****** Line ******/
401 	if (job_ptr->het_job_id_set) {
402 		xstrfmtcat(out, "HetJobIdSet=%s", job_ptr->het_job_id_set);
403 		xstrcat(out, line_end);
404 	}
405 
406 	/****** Line ******/
407 	user_name = uid_to_string((uid_t) job_ptr->user_id);
408 	group_name = gid_to_string((gid_t) job_ptr->group_id);
409 	xstrfmtcat(out, "UserId=%s(%u) GroupId=%s(%u) MCS_label=%s",
410 		   user_name, job_ptr->user_id, group_name, job_ptr->group_id,
411 		   (job_ptr->mcs_label==NULL) ? "N/A" : job_ptr->mcs_label);
412 	xfree(user_name);
413 	xfree(group_name);
414 	xstrcat(out, line_end);
415 
416 	/****** Line 3 ******/
417 	nice = ((int64_t)job_ptr->nice) - NICE_OFFSET;
418 	xstrfmtcat(out, "Priority=%u Nice=%"PRIi64" Account=%s QOS=%s",
419 		   job_ptr->priority, nice, job_ptr->account, job_ptr->qos);
420 	if (slurm_get_track_wckey())
421 		xstrfmtcat(out, " WCKey=%s", job_ptr->wckey);
422 	xstrcat(out, line_end);
423 
424 	/****** Line 4 ******/
425 	xstrfmtcat(out, "JobState=%s ", job_state_string(job_ptr->job_state));
426 
427 	if (job_ptr->state_desc) {
428 		/* Replace white space with underscore for easier parsing */
429 		for (j=0; job_ptr->state_desc[j]; j++) {
430 			if (isspace((int)job_ptr->state_desc[j]))
431 				job_ptr->state_desc[j] = '_';
432 		}
433 		xstrfmtcat(out, "Reason=%s ", job_ptr->state_desc);
434 	} else
435 		xstrfmtcat(out, "Reason=%s ", job_reason_string(job_ptr->state_reason));
436 
437 	xstrfmtcat(out, "Dependency=%s", job_ptr->dependency);
438 	xstrcat(out, line_end);
439 
440 	/****** Line 5 ******/
441 	xstrfmtcat(out, "Requeue=%u Restarts=%u BatchFlag=%u Reboot=%u ",
442 		 job_ptr->requeue, job_ptr->restart_cnt, job_ptr->batch_flag,
443 		 job_ptr->reboot);
444 	exit_status = term_sig = 0;
445 	if (WIFSIGNALED(job_ptr->exit_code))
446 		term_sig = WTERMSIG(job_ptr->exit_code);
447 	else if (WIFEXITED(job_ptr->exit_code))
448 		exit_status = WEXITSTATUS(job_ptr->exit_code);
449 	xstrfmtcat(out, "ExitCode=%u:%u", exit_status, term_sig);
450 	xstrcat(out, line_end);
451 
452 	/****** Line 5a (optional) ******/
453 	if (job_ptr->show_flags & SHOW_DETAIL) {
454 		exit_status = term_sig = 0;
455 		if (WIFSIGNALED(job_ptr->derived_ec))
456 			term_sig = WTERMSIG(job_ptr->derived_ec);
457 		else if (WIFEXITED(job_ptr->derived_ec))
458 			exit_status = WEXITSTATUS(job_ptr->derived_ec);
459 		xstrfmtcat(out, "DerivedExitCode=%u:%u", exit_status, term_sig);
460 		xstrcat(out, line_end);
461 	}
462 
463 	/****** Line 6 ******/
464 	if (IS_JOB_PENDING(job_ptr))
465 		run_time = 0;
466 	else if (IS_JOB_SUSPENDED(job_ptr))
467 		run_time = job_ptr->pre_sus_time;
468 	else {
469 		time_t end_time;
470 		if (IS_JOB_RUNNING(job_ptr) || (job_ptr->end_time == 0))
471 			end_time = time(NULL);
472 		else
473 			end_time = job_ptr->end_time;
474 		if (job_ptr->suspend_time) {
475 			run_time = (time_t)
476 				(difftime(end_time, job_ptr->suspend_time)
477 				 + job_ptr->pre_sus_time);
478 		} else
479 			run_time = (time_t)
480 				difftime(end_time, job_ptr->start_time);
481 	}
482 	secs2time_str(run_time, time_str, sizeof(time_str));
483 	xstrfmtcat(out, "RunTime=%s ", time_str);
484 
485 	if (job_ptr->time_limit == NO_VAL)
486 		xstrcat(out, "TimeLimit=Partition_Limit ");
487 	else {
488 		mins2time_str(job_ptr->time_limit, time_str, sizeof(time_str));
489 		xstrfmtcat(out, "TimeLimit=%s ", time_str);
490 	}
491 
492 	if (job_ptr->time_min == 0)
493 		xstrcat(out, "TimeMin=N/A");
494 	else {
495 		mins2time_str(job_ptr->time_min, time_str, sizeof(time_str));
496 		xstrfmtcat(out, "TimeMin=%s", time_str);
497 	}
498 	xstrcat(out, line_end);
499 
500 	/****** Line 7 ******/
501 	slurm_make_time_str(&job_ptr->submit_time, time_str, sizeof(time_str));
502 	xstrfmtcat(out, "SubmitTime=%s ", time_str);
503 
504 	slurm_make_time_str(&job_ptr->eligible_time, time_str, sizeof(time_str));
505 	xstrfmtcat(out, "EligibleTime=%s", time_str);
506 
507 	xstrcat(out, line_end);
508 
509 	/****** Line 7.5 ******/
510 	slurm_make_time_str(&job_ptr->accrue_time, time_str, sizeof(time_str));
511 	xstrfmtcat(out, "AccrueTime=%s", time_str);
512 
513 	xstrcat(out, line_end);
514 
515 	/****** Line 8 (optional) ******/
516 	if (job_ptr->resize_time) {
517 		slurm_make_time_str(&job_ptr->resize_time, time_str, sizeof(time_str));
518 		xstrfmtcat(out, "ResizeTime=%s", time_str);
519 		xstrcat(out, line_end);
520 	}
521 
522 	/****** Line 9 ******/
523 	slurm_make_time_str(&job_ptr->start_time, time_str, sizeof(time_str));
524 	xstrfmtcat(out, "StartTime=%s ", time_str);
525 
526 	if ((job_ptr->time_limit == INFINITE) &&
527 	    (job_ptr->end_time > time(NULL)))
528 		xstrcat(out, "EndTime=Unknown ");
529 	else {
530 		slurm_make_time_str(&job_ptr->end_time, time_str, sizeof(time_str));
531 		xstrfmtcat(out, "EndTime=%s ", time_str);
532 	}
533 
534 	if (job_ptr->deadline) {
535 		slurm_make_time_str(&job_ptr->deadline, time_str, sizeof(time_str));
536 		xstrfmtcat(out, "Deadline=%s", time_str);
537 	} else {
538 		xstrcat(out, "Deadline=N/A");
539 	}
540 
541 	xstrcat(out, line_end);
542 
543 	/****** Line ******/
544 	/*
545 	 * Only print this line if preemption is enabled and the job has
546 	 * started; see src/slurmctld/job_mgr.c:pack_job, 'preemptable'.
547 	 */
548 	if (job_ptr->preemptable_time) {
549 		slurm_make_time_str(&job_ptr->preemptable_time,
550 				    time_str, sizeof(time_str));
551 		xstrfmtcat(out, "PreemptEligibleTime=%s ", time_str);
552 
553 		if (job_ptr->preempt_time == 0)
554 			xstrcat(out, "PreemptTime=None");
555 		else {
556 			slurm_make_time_str(&job_ptr->preempt_time, time_str,
557 					    sizeof(time_str));
558 			xstrfmtcat(out, "PreemptTime=%s", time_str);
559 		}
560 
561 		xstrcat(out, line_end);
562 	}
563 
564 	/****** Line 10 ******/
565 	if (job_ptr->suspend_time) {
566 		slurm_make_time_str(&job_ptr->suspend_time, time_str, sizeof(time_str));
567 		xstrfmtcat(out, "SuspendTime=%s ", time_str);
568 	} else
569 		xstrcat(out, "SuspendTime=None ");
570 
571 	xstrfmtcat(out, "SecsPreSuspend=%ld ", (long int)job_ptr->pre_sus_time);
572 
573 	slurm_make_time_str(&job_ptr->last_sched_eval, time_str,
574 			    sizeof(time_str));
575 	xstrfmtcat(out, "LastSchedEval=%s", time_str);
576 	xstrcat(out, line_end);
577 
578 	/****** Line 11 ******/
579 	xstrfmtcat(out, "Partition=%s AllocNode:Sid=%s:%u",
580 		   job_ptr->partition, job_ptr->alloc_node, job_ptr->alloc_sid);
581 	xstrcat(out, line_end);
582 
583 	/****** Line 12 ******/
584 	xstrfmtcat(out, "Req%s=%s Exc%s=%s", nodelist, job_ptr->req_nodes,
585 		   nodelist, job_ptr->exc_nodes);
586 	xstrcat(out, line_end);
587 
588 	/****** Line 13 ******/
589 	xstrfmtcat(out, "%s=%s", nodelist, job_ptr->nodes);
590 
591 	if (job_ptr->sched_nodes)
592 		xstrfmtcat(out, " Sched%s=%s", nodelist, job_ptr->sched_nodes);
593 
594 	xstrcat(out, line_end);
595 
596 	/****** Line 14 (optional) ******/
597 	if (job_ptr->batch_features)
598 		xstrfmtcat(out, "BatchFeatures=%s", job_ptr->batch_features);
599 	if (job_ptr->batch_host) {
600 		char *sep = "";
601 		if (job_ptr->batch_features)
602 			sep = " ";
603 		xstrfmtcat(out, "%sBatchHost=%s", sep, job_ptr->batch_host);
604 	}
605 	if (job_ptr->batch_features || job_ptr->batch_host)
606 		xstrcat(out, line_end);
607 
608 	/****** Line 14a (optional) ******/
609 	if (job_ptr->fed_siblings_active || job_ptr->fed_siblings_viable) {
610 		xstrfmtcat(out, "FedOrigin=%s FedViableSiblings=%s FedActiveSiblings=%s",
611 			   job_ptr->fed_origin_str,
612 			   job_ptr->fed_siblings_viable_str,
613 			   job_ptr->fed_siblings_active_str);
614 		xstrcat(out, line_end);
615 	}
616 
617 	/****** Line 15 ******/
618 	if (IS_JOB_PENDING(job_ptr)) {
619 		min_nodes = job_ptr->num_nodes;
620 		max_nodes = job_ptr->max_nodes;
621 		if (max_nodes && (max_nodes < min_nodes))
622 			min_nodes = max_nodes;
623 	} else {
624 		min_nodes = job_ptr->num_nodes;
625 		max_nodes = 0;
626 	}
627 
628 	_sprint_range(tmp_line, sizeof(tmp_line), min_nodes, max_nodes);
629 	xstrfmtcat(out, "NumNodes=%s ", tmp_line);
630 	_sprint_range(tmp_line, sizeof(tmp_line), job_ptr->num_cpus, job_ptr->max_cpus);
631 	xstrfmtcat(out, "NumCPUs=%s ", tmp_line);
632 
633 	xstrfmtcat(out, "NumTasks=%u ", job_ptr->num_tasks);
634 	if (job_ptr->cpus_per_task == NO_VAL16)
635 		xstrfmtcat(out, "CPUs/Task=N/A ");
636 	else
637 		xstrfmtcat(out, "CPUs/Task=%u ", job_ptr->cpus_per_task);
638 
639 	if (job_ptr->boards_per_node == NO_VAL16)
640 		xstrcat(out, "ReqB:S:C:T=*:");
641 	else
642 		xstrfmtcat(out, "ReqB:S:C:T=%u:", job_ptr->boards_per_node);
643 
644 	if (job_ptr->sockets_per_board == NO_VAL16)
645 		xstrcat(out, "*:");
646 	else
647 		xstrfmtcat(out, "%u:", job_ptr->sockets_per_board);
648 
649 	if (job_ptr->cores_per_socket == NO_VAL16)
650 		xstrcat(out, "*:");
651 	else
652 		xstrfmtcat(out, "%u:", job_ptr->cores_per_socket);
653 
654 	if (job_ptr->threads_per_core == NO_VAL16)
655 		xstrcat(out, "*");
656 	else
657 		xstrfmtcat(out, "%u", job_ptr->threads_per_core);
658 
659 	xstrcat(out, line_end);
660 
661 	/****** Line 16 ******/
662 	/* TRES strings should already have been converted from their simple form at this point */
663 	xstrfmtcat(out, "TRES=%s",
664 		   job_ptr->tres_alloc_str ? job_ptr->tres_alloc_str
665 					   : job_ptr->tres_req_str);
666 	xstrcat(out, line_end);
667 
668 	/****** Line 17 ******/
669 	if (job_ptr->sockets_per_node == NO_VAL16)
670 		xstrcat(out, "Socks/Node=* ");
671 	else
672 		xstrfmtcat(out, "Socks/Node=%u ", job_ptr->sockets_per_node);
673 
674 	if (job_ptr->ntasks_per_node == NO_VAL16)
675 		xstrcat(out, "NtasksPerN:B:S:C=*:");
676 	else
677 		xstrfmtcat(out, "NtasksPerN:B:S:C=%u:", job_ptr->ntasks_per_node);
678 
679 	if (job_ptr->ntasks_per_board == NO_VAL16)
680 		xstrcat(out, "*:");
681 	else
682 		xstrfmtcat(out, "%u:", job_ptr->ntasks_per_board);
683 
684 	if ((job_ptr->ntasks_per_socket == NO_VAL16) ||
685 	    (job_ptr->ntasks_per_socket == INFINITE16))
686 		xstrcat(out, "*:");
687 	else
688 		xstrfmtcat(out, "%u:", job_ptr->ntasks_per_socket);
689 
690 	if ((job_ptr->ntasks_per_core == NO_VAL16) ||
691 	    (job_ptr->ntasks_per_core == INFINITE16))
692 		xstrcat(out, "* ");
693 	else
694 		xstrfmtcat(out, "%u ", job_ptr->ntasks_per_core);
695 
696 	if (job_ptr->core_spec == NO_VAL16)
697 		xstrcat(out, "CoreSpec=*");
698 	else if (job_ptr->core_spec & CORE_SPEC_THREAD)
699 		xstrfmtcat(out, "ThreadSpec=%d",
700 			   (job_ptr->core_spec & (~CORE_SPEC_THREAD)));
701 	else
702 		xstrfmtcat(out, "CoreSpec=%u", job_ptr->core_spec);
703 
704 	xstrcat(out, line_end);
705 
706 	if (job_resrcs && job_resrcs->core_bitmap &&
707 	    ((last = bit_fls(job_resrcs->core_bitmap)) != -1)) {
708 
709 		xstrfmtcat(out, "JOB_GRES=%s", job_ptr->gres_total);
710 		xstrcat(out, line_end);
711 
712 		hl = hostlist_create(job_resrcs->nodes);
713 		if (!hl) {
714 			error("slurm_sprint_job_info: hostlist_create: %s",
715 			      job_resrcs->nodes);
716 			return NULL;
717 		}
718 		hl_last = hostlist_create(NULL);
719 		if (!hl_last) {
720 			error("slurm_sprint_job_info: hostlist_create: NULL");
721 			hostlist_destroy(hl);
722 			return NULL;
723 		}
724 
725 		bit_inx = 0;
726 		i = sock_inx = sock_reps = 0;
727 		abs_node_inx = job_ptr->node_inx[i];
728 
729 		gres_last = "";
730 		/* tmp1[] stores the current cpu(s) allocated */
731 		tmp2[0] = '\0';	/* stores last cpu(s) allocated */
732 		for (rel_node_inx=0; rel_node_inx < job_resrcs->nhosts;
733 		     rel_node_inx++) {
734 
735 			if (sock_reps >=
736 			    job_resrcs->sock_core_rep_count[sock_inx]) {
737 				sock_inx++;
738 				sock_reps = 0;
739 			}
740 			sock_reps++;
741 
742 			bit_reps = job_resrcs->sockets_per_node[sock_inx] *
743 				   job_resrcs->cores_per_socket[sock_inx];
744 			host = hostlist_shift(hl);
745 			threads = _threads_per_core(host);
746 			cpu_bitmap = bit_alloc(bit_reps * threads);
747 			for (j = 0; j < bit_reps; j++) {
748 				if (bit_test(job_resrcs->core_bitmap, bit_inx)){
749 					for (k = 0; k < threads; k++)
750 						bit_set(cpu_bitmap,
751 							(j * threads) + k);
752 				}
753 				bit_inx++;
754 			}
755 			bit_fmt(tmp1, sizeof(tmp1), cpu_bitmap);
756 			FREE_NULL_BITMAP(cpu_bitmap);
757 			/*
758 			 * If the allocation values for this host are not the
759 			 * same as the last host, print the report of the last
760 			 * group of hosts that had identical allocation values.
761 			 */
762 			if (xstrcmp(tmp1, tmp2) ||
763 			    ((rel_node_inx < job_ptr->gres_detail_cnt) &&
764 			     xstrcmp(job_ptr->gres_detail_str[rel_node_inx],
765 				     gres_last)) ||
766 			    (last_mem_alloc_ptr != job_resrcs->memory_allocated) ||
767 			    (job_resrcs->memory_allocated &&
768 			     (last_mem_alloc !=
769 			      job_resrcs->memory_allocated[rel_node_inx]))) {
770 				if (hostlist_count(hl_last)) {
771 					last_hosts =
772 						hostlist_ranged_string_xmalloc(
773 						hl_last);
774 					xstrfmtcat(out,
775 						   "  Nodes=%s CPU_IDs=%s "
776 						   "Mem=%"PRIu64" GRES=%s",
777 						   last_hosts, tmp2,
778 						   last_mem_alloc_ptr ?
779 						   last_mem_alloc : 0,
780 						   gres_last);
781 					xfree(last_hosts);
782 					xstrcat(out, line_end);
783 
784 					hostlist_destroy(hl_last);
785 					hl_last = hostlist_create(NULL);
786 				}
787 
788 				strcpy(tmp2, tmp1);
789 				if (rel_node_inx < job_ptr->gres_detail_cnt) {
790 					gres_last = job_ptr->
791 						    gres_detail_str[rel_node_inx];
792 				} else {
793 					gres_last = "";
794 				}
795 				last_mem_alloc_ptr = job_resrcs->memory_allocated;
796 				if (last_mem_alloc_ptr)
797 					last_mem_alloc = job_resrcs->
798 						memory_allocated[rel_node_inx];
799 				else
800 					last_mem_alloc = NO_VAL64;
801 			}
802 			hostlist_push_host(hl_last, host);
803 			free(host);
804 
805 			if (bit_inx > last)
806 				break;
807 
808 			if (abs_node_inx > job_ptr->node_inx[i+1]) {
809 				i += 2;
810 				abs_node_inx = job_ptr->node_inx[i];
811 			} else {
812 				abs_node_inx++;
813 			}
814 		}
815 
816 		if (hostlist_count(hl_last)) {
817 			last_hosts = hostlist_ranged_string_xmalloc(hl_last);
818 			xstrfmtcat(out, "  Nodes=%s CPU_IDs=%s Mem=%"PRIu64" GRES=%s",
819 				 last_hosts, tmp2,
820 				 last_mem_alloc_ptr ? last_mem_alloc : 0,
821 				 gres_last);
822 			xfree(last_hosts);
823 			xstrcat(out, line_end);
824 		}
825 		hostlist_destroy(hl);
826 		hostlist_destroy(hl_last);
827 	}
828 	/****** Line 18 ******/
829 	if (job_ptr->pn_min_memory & MEM_PER_CPU) {
830 		job_ptr->pn_min_memory &= (~MEM_PER_CPU);
831 		tmp6_ptr = "CPU";
832 	} else
833 		tmp6_ptr = "Node";
834 
835 	xstrfmtcat(out, "MinCPUsNode=%u ", job_ptr->pn_min_cpus);
836 
837 	convert_num_unit((float)job_ptr->pn_min_memory, tmp1, sizeof(tmp1),
838 			 UNIT_MEGA, NO_VAL, CONVERT_NUM_UNIT_EXACT);
839 	convert_num_unit((float)job_ptr->pn_min_tmp_disk, tmp2, sizeof(tmp2),
840 			 UNIT_MEGA, NO_VAL, CONVERT_NUM_UNIT_EXACT);
841 	xstrfmtcat(out, "MinMemory%s=%s MinTmpDiskNode=%s", tmp6_ptr, tmp1, tmp2);
842 	xstrcat(out, line_end);
843 
844 	/****** Line ******/
845 	secs2time_str((time_t)job_ptr->delay_boot, tmp1, sizeof(tmp1));
846 	xstrfmtcat(out, "Features=%s DelayBoot=%s", job_ptr->features, tmp1);
847 	xstrcat(out, line_end);
848 
849 	/****** Line (optional) ******/
850 	if (job_ptr->cluster_features) {
851 		xstrfmtcat(out, "ClusterFeatures=%s",
852 			   job_ptr->cluster_features);
853 		xstrcat(out, line_end);
854 	}
855 
856 	/****** Line (optional) ******/
857 	if (job_ptr->resv_name) {
858 		xstrfmtcat(out, "Reservation=%s", job_ptr->resv_name);
859 		xstrcat(out, line_end);
860 	}
861 
862 	/****** Line 20 ******/
863 	xstrfmtcat(out, "OverSubscribe=%s Contiguous=%d Licenses=%s Network=%s",
864 		   job_share_string(job_ptr->shared), job_ptr->contiguous,
865 		   job_ptr->licenses, job_ptr->network);
866 	xstrcat(out, line_end);
867 
868 	/****** Line 21 ******/
869 	xstrfmtcat(out, "Command=%s", job_ptr->command);
870 	xstrcat(out, line_end);
871 
872 	/****** Line 22 ******/
873 	xstrfmtcat(out, "WorkDir=%s", job_ptr->work_dir);
874 
875 	/****** Line (optional) ******/
876 	if (job_ptr->admin_comment) {
877 		xstrcat(out, line_end);
878 		xstrfmtcat(out, "AdminComment=%s ", job_ptr->admin_comment);
879 	}
880 
881 	/****** Line (optional) ******/
882 	if (job_ptr->system_comment) {
883 		xstrcat(out, line_end);
884 		xstrfmtcat(out, "SystemComment=%s ", job_ptr->system_comment);
885 	}
886 
887 	/****** Line (optional) ******/
888 	if (job_ptr->comment) {
889 		xstrcat(out, line_end);
890 		xstrfmtcat(out, "Comment=%s ", job_ptr->comment);
891 	}
892 
893 	/****** Line 30 (optional) ******/
894 	if (job_ptr->batch_flag) {
895 		xstrcat(out, line_end);
896 		slurm_get_job_stderr(tmp_path, sizeof(tmp_path), job_ptr);
897 		xstrfmtcat(out, "StdErr=%s", tmp_path);
898 	}
899 
900 	/****** Line 31 (optional) ******/
901 	if (job_ptr->batch_flag) {
902 		xstrcat(out, line_end);
903 		slurm_get_job_stdin(tmp_path, sizeof(tmp_path), job_ptr);
904 		xstrfmtcat(out, "StdIn=%s", tmp_path);
905 	}
906 
907 	/****** Line 32 (optional) ******/
908 	if (job_ptr->batch_flag) {
909 		xstrcat(out, line_end);
910 		slurm_get_job_stdout(tmp_path, sizeof(tmp_path), job_ptr);
911 		xstrfmtcat(out, "StdOut=%s", tmp_path);
912 	}
913 
914 	/****** Line 34 (optional) ******/
915 	if (job_ptr->req_switch) {
916 		char time_buf[32];
917 		xstrcat(out, line_end);
918 		secs2time_str((time_t) job_ptr->wait4switch, time_buf,
919 			      sizeof(time_buf));
920 		xstrfmtcat(out, "Switches=%u@%s", job_ptr->req_switch, time_buf);
921 	}
922 
923 	/****** Line 35 (optional) ******/
924 	if (job_ptr->burst_buffer) {
925 		xstrcat(out, line_end);
926 		xstrfmtcat(out, "BurstBuffer=%s", job_ptr->burst_buffer);
927 	}
928 
929 	/****** Line (optional) ******/
930 	if (job_ptr->burst_buffer_state) {
931 		xstrcat(out, line_end);
932 		xstrfmtcat(out, "BurstBufferState=%s",
933 			   job_ptr->burst_buffer_state);
934 	}
935 
936 	/****** Line 36 (optional) ******/
937 	if (cpu_freq_debug(NULL, NULL, tmp1, sizeof(tmp1),
938 			   job_ptr->cpu_freq_gov, job_ptr->cpu_freq_min,
939 			   job_ptr->cpu_freq_max, NO_VAL) != 0) {
940 		xstrcat(out, line_end);
941 		xstrcat(out, tmp1);
942 	}
943 
944 	/****** Line 37 ******/
945 	xstrcat(out, line_end);
946 	xstrfmtcat(out, "Power=%s", power_flags_str(job_ptr->power_flags));
947 
948 	/****** Line 38 (optional) ******/
949 	if (job_ptr->bitflags &
950 	    (GRES_DISABLE_BIND | GRES_ENFORCE_BIND | KILL_INV_DEP |
951 	     NO_KILL_INV_DEP | SPREAD_JOB)) {
952 		xstrcat(out, line_end);
953 		if (job_ptr->bitflags & GRES_DISABLE_BIND)
954 			xstrcat(out, "GresEnforceBind=No");
955 		if (job_ptr->bitflags & GRES_ENFORCE_BIND)
956 			xstrcat(out, "GresEnforceBind=Yes");
957 		if (job_ptr->bitflags & KILL_INV_DEP)
958 			xstrcat(out, "KillOnInvalidDependent=Yes");
959 		if (job_ptr->bitflags & NO_KILL_INV_DEP)
960 			xstrcat(out, "KillOnInvalidDependent=No");
961 		if (job_ptr->bitflags & SPREAD_JOB)
962 			xstrcat(out, "SpreadJob=Yes");
963 	}
964 
965 	/****** Line (optional) ******/
966 	if (job_ptr->cpus_per_tres) {
967 		xstrcat(out, line_end);
968 		xstrfmtcat(out, "CpusPerTres=%s", job_ptr->cpus_per_tres);
969 	}
970 
971 	/****** Line (optional) ******/
972 	if (job_ptr->mem_per_tres) {
973 		xstrcat(out, line_end);
974 		xstrfmtcat(out, "MemPerTres=%s", job_ptr->mem_per_tres);
975 	}
976 
977 	/****** Line (optional) ******/
978 	if (job_ptr->tres_bind) {
979 		xstrcat(out, line_end);
980 		xstrfmtcat(out, "TresBind=%s", job_ptr->tres_bind);
981 	}
982 
983 	/****** Line (optional) ******/
984 	if (job_ptr->tres_freq) {
985 		xstrcat(out, line_end);
986 		xstrfmtcat(out, "TresFreq=%s", job_ptr->tres_freq);
987 	}
988 
989 	/****** Line (optional) ******/
990 	if (job_ptr->tres_per_job) {
991 		xstrcat(out, line_end);
992 		xstrfmtcat(out, "TresPerJob=%s", job_ptr->tres_per_job);
993 	}
994 
995 	/****** Line (optional) ******/
996 	if (job_ptr->tres_per_node) {
997 		xstrcat(out, line_end);
998 		xstrfmtcat(out, "TresPerNode=%s", job_ptr->tres_per_node);
999 	}
1000 
1001 	/****** Line (optional) ******/
1002 	if (job_ptr->tres_per_socket) {
1003 		xstrcat(out, line_end);
1004 		xstrfmtcat(out, "TresPerSocket=%s", job_ptr->tres_per_socket);
1005 	}
1006 
1007 	/****** Line (optional) ******/
1008 	if (job_ptr->tres_per_task) {
1009 		xstrcat(out, line_end);
1010 		xstrfmtcat(out, "TresPerTask=%s", job_ptr->tres_per_task);
1011 	}
1012 
1013 	/****** Line ******/
1014 	xstrcat(out, line_end);
1015 	xstrfmtcat(out, "MailUser=%s MailType=%s",
1016 		   job_ptr->mail_user,
1017 		   print_mail_type(job_ptr->mail_type));
1018 
1019 	/****** END OF JOB RECORD ******/
1020 	if (one_liner)
1021 		xstrcat(out, "\n");
1022 	else
1023 		xstrcat(out, "\n\n");
1024 
1025 	return out;
1026 }
1027 
1028 /* Return true if the specified job id is local to a cluster
1029  * (not a federated job) */
1030 static inline bool _test_local_job(uint32_t job_id)
1031 {
1032 	if ((job_id & (~MAX_JOB_ID)) == 0)
1033 		return true;
1034 	return false;
1035 }
1036 
1037 static int
1038 _load_cluster_jobs(slurm_msg_t *req_msg, job_info_msg_t **job_info_msg_pptr,
1039 		   slurmdb_cluster_rec_t *cluster)
1040 {
1041 	slurm_msg_t resp_msg;
1042 	int rc = SLURM_SUCCESS;
1043 
1044 	slurm_msg_t_init(&resp_msg);
1045 
1046 	*job_info_msg_pptr = NULL;
1047 
1048 	if (slurm_send_recv_controller_msg(req_msg, &resp_msg, cluster) < 0)
1049 		return SLURM_ERROR;
1050 
1051 	switch (resp_msg.msg_type) {
1052 	case RESPONSE_JOB_INFO:
1053 		*job_info_msg_pptr = (job_info_msg_t *)resp_msg.data;
1054 		resp_msg.data = NULL;
1055 		break;
1056 	case RESPONSE_SLURM_RC:
1057 		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
1058 		slurm_free_return_code_msg(resp_msg.data);
1059 		break;
1060 	default:
1061 		rc = SLURM_UNEXPECTED_MSG_ERROR;
1062 		break;
1063 	}
1064 	if (rc)
1065 		slurm_seterrno(rc);
1066 
1067 	return rc;
1068 }
1069 
1070 /* Thread to read job information from some cluster */
1071 static void *_load_job_thread(void *args)
1072 {
1073 	load_job_req_struct_t *load_args = (load_job_req_struct_t *) args;
1074 	slurmdb_cluster_rec_t *cluster = load_args->cluster;
1075 	job_info_msg_t *new_msg = NULL;
1076 	int rc;
1077 
1078 	if ((rc = _load_cluster_jobs(load_args->req_msg, &new_msg, cluster)) ||
1079 	    !new_msg) {
1080 		verbose("Error reading job information from cluster %s: %s",
1081 			cluster->name, slurm_strerror(rc));
1082 	} else {
1083 		load_job_resp_struct_t *job_resp;
1084 		job_resp = xmalloc(sizeof(load_job_resp_struct_t));
1085 		job_resp->new_msg = new_msg;
1086 		list_append(load_args->resp_msg_list, job_resp);
1087 	}
1088 	xfree(args);
1089 
1090 	return (void *) NULL;
1091 }
1092 
1093 
1094 static int _sort_orig_clusters(const void *a, const void *b)
1095 {
1096 	slurm_job_info_t *job1 = (slurm_job_info_t *)a;
1097 	slurm_job_info_t *job2 = (slurm_job_info_t *)b;
1098 
1099 	if (!xstrcmp(job1->cluster, job1->fed_origin_str))
1100 		return -1;
1101 	if (!xstrcmp(job2->cluster, job2->fed_origin_str))
1102 		return 1;
1103 	return 0;
1104 }
1105 
1106 static int _load_fed_jobs(slurm_msg_t *req_msg,
1107 			  job_info_msg_t **job_info_msg_pptr,
1108 			  uint16_t show_flags, char *cluster_name,
1109 			  slurmdb_federation_rec_t *fed)
1110 {
1111 	int i, j;
1112 	load_job_resp_struct_t *job_resp;
1113 	job_info_msg_t *orig_msg = NULL, *new_msg = NULL;
1114 	uint32_t new_rec_cnt;
1115 	uint32_t hash_inx, *hash_tbl_size = NULL, **hash_job_id = NULL;
1116 	slurmdb_cluster_rec_t *cluster;
1117 	ListIterator iter;
1118 	int pthread_count = 0;
1119 	pthread_t *load_thread = 0;
1120 	load_job_req_struct_t *load_args;
1121 	List resp_msg_list;
1122 
1123 	*job_info_msg_pptr = NULL;
1124 
1125 	/* Spawn one pthread per cluster to collect job information */
1126 	resp_msg_list = list_create(NULL);
1127 	load_thread = xmalloc(sizeof(pthread_t) *
1128 			      list_count(fed->cluster_list));
1129 	iter = list_iterator_create(fed->cluster_list);
1130 	while ((cluster = (slurmdb_cluster_rec_t *) list_next(iter))) {
1131 		if ((cluster->control_host == NULL) ||
1132 		    (cluster->control_host[0] == '\0'))
1133 			continue;	/* Cluster down */
1134 
1135 		/* Only show jobs from the local cluster */
1136 		if ((show_flags & SHOW_LOCAL) &&
1137 		    xstrcmp(cluster->name, cluster_name))
1138 			continue;
1139 
1140 		load_args = xmalloc(sizeof(load_job_req_struct_t));
1141 		load_args->cluster = cluster;
1142 		load_args->req_msg = req_msg;
1143 		load_args->resp_msg_list = resp_msg_list;
1144 		slurm_thread_create(&load_thread[pthread_count],
1145 				    _load_job_thread, load_args);
1146 		pthread_count++;
1147 
1148 	}
1149 	list_iterator_destroy(iter);
1150 
1151 	/* Wait for all pthreads to complete */
1152 	for (i = 0; i < pthread_count; i++)
1153 		pthread_join(load_thread[i], NULL);
1154 	xfree(load_thread);
1155 
1156 	/* Merge the responses into a single response message */
1157 	iter = list_iterator_create(resp_msg_list);
1158 	while ((job_resp = (load_job_resp_struct_t *) list_next(iter))) {
1159 		new_msg = job_resp->new_msg;
1160 		if (!orig_msg) {
1161 			orig_msg = new_msg;
1162 			*job_info_msg_pptr = orig_msg;
1163 		} else {
1164 			/* Merge job records into a single response message */
1165 			orig_msg->last_update = MIN(orig_msg->last_update,
1166 						    new_msg->last_update);
1167 			new_rec_cnt = orig_msg->record_count +
1168 				      new_msg->record_count;
1169 			if (new_msg->record_count) {
1170 				orig_msg->job_array =
1171 					xrealloc(orig_msg->job_array,
1172 						 sizeof(slurm_job_info_t) *
1173 						 new_rec_cnt);
1174 				(void) memcpy(orig_msg->job_array +
1175 					      orig_msg->record_count,
1176 					      new_msg->job_array,
1177 					      sizeof(slurm_job_info_t) *
1178 					      new_msg->record_count);
1179 				orig_msg->record_count = new_rec_cnt;
1180 			}
1181 			xfree(new_msg->job_array);
1182 			xfree(new_msg);
1183 		}
1184 		xfree(job_resp);
1185 	}
1186 	list_iterator_destroy(iter);
1187 	FREE_NULL_LIST(resp_msg_list);
1188 
1189 	if (!orig_msg)
1190 		slurm_seterrno_ret(ESLURM_INVALID_JOB_ID);
1191 
1192 	/* Find duplicate job records and jobs local to other clusters and set
1193 	 * their job_id == 0 so they get skipped in reporting */
1194 	if ((show_flags & SHOW_SIBLING) == 0) {
1195 		hash_tbl_size = xmalloc(sizeof(uint32_t) * JOB_HASH_SIZE);
1196 		hash_job_id = xmalloc(sizeof(uint32_t *) * JOB_HASH_SIZE);
1197 		for (i = 0; i < JOB_HASH_SIZE; i++) {
1198 			hash_tbl_size[i] = 100;
1199 			hash_job_id[i] = xmalloc(sizeof(uint32_t ) *
1200 						 hash_tbl_size[i]);
1201 		}
1202 	}
1203 
1204 	/* Put the origin jobs at top and remove duplicates. */
1205 	qsort(orig_msg->job_array, orig_msg->record_count,
1206 	      sizeof(slurm_job_info_t), _sort_orig_clusters);
1207 	for (i = 0; orig_msg && i < orig_msg->record_count; i++) {
1208 		slurm_job_info_t *job_ptr = &orig_msg->job_array[i];
1209 
1210 		/*
1211 		 * Only show non-federated jobs that are local. Non-federated
1212 		 * jobs will not have a fed_origin_str.
1213 		 */
1214 		if (_test_local_job(job_ptr->job_id) &&
1215 		    !job_ptr->fed_origin_str &&
1216 		    xstrcmp(job_ptr->cluster, cluster_name)) {
1217 			job_ptr->job_id = 0;
1218 			continue;
1219 		}
1220 
1221 		if (show_flags & SHOW_SIBLING)
1222 			continue;
1223 		hash_inx = job_ptr->job_id % JOB_HASH_SIZE;
1224 		for (j = 0;
1225 		     (j < hash_tbl_size[hash_inx] && hash_job_id[hash_inx][j]);
1226 		     j++) {
1227 			if (job_ptr->job_id == hash_job_id[hash_inx][j]) {
1228 				job_ptr->job_id = 0;
1229 				break;
1230 			}
1231 		}
1232 		if (job_ptr->job_id == 0) {
1233 			continue;	/* Duplicate */
1234 		} else if (j >= hash_tbl_size[hash_inx]) {
1235 			hash_tbl_size[hash_inx] *= 2;
1236 			xrealloc(hash_job_id[hash_inx],
1237 				 sizeof(uint32_t) * hash_tbl_size[hash_inx]);
1238 		}
1239 		hash_job_id[hash_inx][j] = job_ptr->job_id;
1240 	}
1241 	if ((show_flags & SHOW_SIBLING) == 0) {
1242 		for (i = 0; i < JOB_HASH_SIZE; i++)
1243 			xfree(hash_job_id[i]);
1244 		xfree(hash_tbl_size);
1245 		xfree(hash_job_id);
1246 	}
1247 
1248 	return SLURM_SUCCESS;
1249 }
1250 
1251 /*
1252  * slurm_job_batch_script - retrieve the batch script for a given jobid
1253  * returns SLURM_SUCCESS, or appropriate error code
1254  */
1255 extern int slurm_job_batch_script(FILE *out, uint32_t jobid)
1256 {
1257 	job_id_msg_t msg;
1258 	slurm_msg_t req, resp;
1259 	int rc = SLURM_SUCCESS;
1260 
1261 	slurm_msg_t_init(&req);
1262 	slurm_msg_t_init(&resp);
1263 
1264 	memset(&msg, 0, sizeof(msg));
1265 	msg.job_id = jobid;
1266 	req.msg_type = REQUEST_BATCH_SCRIPT;
1267 	req.data = &msg;
1268 
1269 	if (slurm_send_recv_controller_msg(&req, &resp, working_cluster_rec) < 0)
1270 		return SLURM_ERROR;
1271 
1272 	if (resp.msg_type == RESPONSE_BATCH_SCRIPT) {
1273 		if (fprintf(out, "%s", (char *) resp.data) < 0)
1274 			rc = SLURM_ERROR;
1275 		xfree(resp.data);
1276 	} else if (resp.msg_type == RESPONSE_SLURM_RC) {
1277 		rc = ((return_code_msg_t *) resp.data)->return_code;
1278 		slurm_free_return_code_msg(resp.data);
1279 		if (rc)
1280 			slurm_seterrno_ret(rc);
1281 	} else {
1282 		rc = SLURM_ERROR;
1283 	}
1284 
1285 	return rc;
1286 }
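
/*
 * Illustrative usage sketch (comment only): write the batch script of job 123
 * (a hypothetical job ID) to stdout.
 *
 *	if (slurm_job_batch_script(stdout, 123) != SLURM_SUCCESS)
 *		slurm_perror("slurm_job_batch_script");
 */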
1287 
1288 /*
1289  * slurm_load_jobs - issue RPC to get all job configuration
1290  *	information if changed since update_time
1291  * IN update_time - time of current configuration data
1292  * IN/OUT job_info_msg_pptr - place to store a job configuration pointer
1293  * IN show_flags -  job filtering option: 0, SHOW_ALL, SHOW_DETAIL or SHOW_LOCAL
1294  * RET 0 or -1 on error
1295  * NOTE: free the response using slurm_free_job_info_msg
1296  */
1297 extern int
1298 slurm_load_jobs (time_t update_time, job_info_msg_t **job_info_msg_pptr,
1299 		 uint16_t show_flags)
1300 {
1301 	slurm_msg_t req_msg;
1302 	job_info_request_msg_t req;
1303 	char *cluster_name = NULL;
1304 	void *ptr = NULL;
1305 	slurmdb_federation_rec_t *fed;
1306 	int rc;
1307 
1308 	if (working_cluster_rec)
1309 		cluster_name = xstrdup(working_cluster_rec->name);
1310 	else
1311 		cluster_name = slurm_get_cluster_name();
1312 	if ((show_flags & SHOW_FEDERATION) && !(show_flags & SHOW_LOCAL) &&
1313 	    (slurm_load_federation(&ptr) == SLURM_SUCCESS) &&
1314 	    cluster_in_federation(ptr, cluster_name)) {
1315 		/* In federation. Need full info from all clusters */
1316 		update_time = (time_t) 0;
1317 		show_flags &= (~SHOW_LOCAL);
1318 	} else {
1319 		/* Report local cluster info only */
1320 		show_flags |= SHOW_LOCAL;
1321 		show_flags &= (~SHOW_FEDERATION);
1322 	}
1323 
1324 	slurm_msg_t_init(&req_msg);
1325 	memset(&req, 0, sizeof(req));
1326 	req.last_update  = update_time;
1327 	req.show_flags   = show_flags;
1328 	req_msg.msg_type = REQUEST_JOB_INFO;
1329 	req_msg.data     = &req;
1330 
1331 	if (show_flags & SHOW_FEDERATION) {
1332 		fed = (slurmdb_federation_rec_t *) ptr;
1333 		rc = _load_fed_jobs(&req_msg, job_info_msg_pptr, show_flags,
1334 				    cluster_name, fed);
1335 	} else {
1336 		rc = _load_cluster_jobs(&req_msg, job_info_msg_pptr,
1337 					working_cluster_rec);
1338 	}
1339 
1340 	if (ptr)
1341 		slurm_destroy_federation_rec(ptr);
1342 	xfree(cluster_name);
1343 
1344 	return rc;
1345 }
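
/*
 * Illustrative usage sketch (comment only): count the pending jobs in a
 * freshly loaded job table (error handling kept minimal).
 *
 *	job_info_msg_t *jobs = NULL;
 *	uint32_t i, pending = 0;
 *	if (slurm_load_jobs((time_t) 0, &jobs, SHOW_ALL) == SLURM_SUCCESS) {
 *		for (i = 0; i < jobs->record_count; i++) {
 *			if (IS_JOB_PENDING(&jobs->job_array[i]))
 *				pending++;
 *		}
 *		slurm_free_job_info_msg(jobs);
 *	}
 */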
1346 
1347 /*
1348  * slurm_load_job_user - issue RPC to get slurm information about all jobs
1349  *	to be run as the specified user
1350  * IN/OUT job_info_msg_pptr - place to store a job configuration pointer
1351  * IN user_id - ID of user we want information for
1352  * IN show_flags - job filtering options
1353  * RET 0 or -1 on error
1354  * NOTE: free the response using slurm_free_job_info_msg
1355  */
1356 extern int slurm_load_job_user (job_info_msg_t **job_info_msg_pptr,
1357 				uint32_t user_id,
1358 				uint16_t show_flags)
1359 {
1360 	slurm_msg_t req_msg;
1361 	job_user_id_msg_t req;
1362 	char *cluster_name = NULL;
1363 	void *ptr = NULL;
1364 	slurmdb_federation_rec_t *fed;
1365 	int rc;
1366 
1367 	cluster_name = slurm_get_cluster_name();
1368 	if ((show_flags & SHOW_LOCAL) == 0) {
1369 		if (slurm_load_federation(&ptr) ||
1370 		    !cluster_in_federation(ptr, cluster_name)) {
1371 			/* Not in federation */
1372 			show_flags |= SHOW_LOCAL;
1373 		}
1374 	}
1375 
1376 	slurm_msg_t_init(&req_msg);
1377 	memset(&req, 0, sizeof(req));
1378 	req.show_flags   = show_flags;
1379 	req.user_id      = user_id;
1380 	req_msg.msg_type = REQUEST_JOB_USER_INFO;
1381 	req_msg.data     = &req;
1382 
1383 	/* With the -M option, working_cluster_rec is set and we only get
1384 	 * information for that cluster */
1385 	if (working_cluster_rec || !ptr || (show_flags & SHOW_LOCAL)) {
1386 		rc = _load_cluster_jobs(&req_msg, job_info_msg_pptr,
1387 					working_cluster_rec);
1388 	} else {
1389 		fed = (slurmdb_federation_rec_t *) ptr;
1390 		rc = _load_fed_jobs(&req_msg, job_info_msg_pptr, show_flags,
1391 				    cluster_name, fed);
1392 	}
1393 
1394 	if (ptr)
1395 		slurm_destroy_federation_rec(ptr);
1396 	xfree(cluster_name);
1397 
1398 	return rc;
1399 }
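
/*
 * Illustrative usage sketch (comment only): load only the calling user's jobs
 * from the local cluster and print them one job per line (assumes <unistd.h>
 * for getuid()).
 *
 *	job_info_msg_t *jobs = NULL;
 *	if (slurm_load_job_user(&jobs, getuid(), SHOW_LOCAL) == SLURM_SUCCESS) {
 *		slurm_print_job_info_msg(stdout, jobs, 1);
 *		slurm_free_job_info_msg(jobs);
 *	}
 */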
1400 
1401 /*
1402  * slurm_load_job - issue RPC to get job information for one job ID
1403  * IN job_info_msg_pptr - place to store a job configuration pointer
1404  * IN job_id -  ID of job we want information about
1405  * IN show_flags -  job filtering option: 0, SHOW_ALL or SHOW_DETAIL
1406  * RET 0 or -1 on error
1407  * NOTE: free the response using slurm_free_job_info_msg
1408  */
1409 extern int
1410 slurm_load_job (job_info_msg_t **job_info_msg_pptr, uint32_t job_id,
1411 		uint16_t show_flags)
1412 {
1413 	slurm_msg_t req_msg;
1414 	job_id_msg_t req;
1415 	char *cluster_name = NULL;
1416 	void *ptr = NULL;
1417 	slurmdb_federation_rec_t *fed;
1418 	int rc;
1419 
1420 	cluster_name = slurm_get_cluster_name();
1421 	if ((show_flags & SHOW_LOCAL) == 0) {
1422 		if (slurm_load_federation(&ptr) ||
1423 		    !cluster_in_federation(ptr, cluster_name)) {
1424 			/* Not in federation */
1425 			show_flags |= SHOW_LOCAL;
1426 		}
1427 	}
1428 
1429 	memset(&req, 0, sizeof(req));
1430 	slurm_msg_t_init(&req_msg);
1431 	req.job_id       = job_id;
1432 	req.show_flags   = show_flags;
1433 	req_msg.msg_type = REQUEST_JOB_INFO_SINGLE;
1434 	req_msg.data     = &req;
1435 
1436 	/* With the -M option, working_cluster_rec is set and we only get
1437 	 * information for that cluster */
1438 	if (working_cluster_rec || !ptr || (show_flags & SHOW_LOCAL)) {
1439 		rc = _load_cluster_jobs(&req_msg, job_info_msg_pptr,
1440 					working_cluster_rec);
1441 	} else {
1442 		fed = (slurmdb_federation_rec_t *) ptr;
1443 		rc = _load_fed_jobs(&req_msg, job_info_msg_pptr, show_flags,
1444 				    cluster_name, fed);
1445 	}
1446 
1447 	if (ptr)
1448 		slurm_destroy_federation_rec(ptr);
1449 	xfree(cluster_name);
1450 
1451 	return rc;
1452 }
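
/*
 * Illustrative usage sketch (comment only): load a single job record (job ID
 * 123 is hypothetical) and print it.
 *
 *	job_info_msg_t *resp = NULL;
 *	if (slurm_load_job(&resp, 123, SHOW_DETAIL) == SLURM_SUCCESS) {
 *		if (resp->record_count)
 *			slurm_print_job_info(stdout, &resp->job_array[0], 0);
 *		slurm_free_job_info_msg(resp);
 *	}
 */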
1453 
1454 /*
1455  * slurm_pid2jobid - issue RPC to get the slurm job_id given a process_id
1456  *	on this machine
1457  * IN job_pid     - process_id of interest on this machine
1458  * OUT job_id_ptr - place to store a slurm job_id
1459  * RET 0 or -1 on error
1460  */
1461 extern int
1462 slurm_pid2jobid (pid_t job_pid, uint32_t *jobid)
1463 {
1464 	int rc;
1465 	slurm_msg_t req_msg;
1466 	slurm_msg_t resp_msg;
1467 	job_id_request_msg_t req;
1468 	uint32_t cluster_flags = slurmdb_setup_cluster_flags();
1469 	char *this_addr;
1470 
1471 	slurm_msg_t_init(&req_msg);
1472 	slurm_msg_t_init(&resp_msg);
1473 
1474 	if (cluster_flags & CLUSTER_FLAG_MULTSD) {
1475 		if ((this_addr = getenv("SLURMD_NODENAME"))) {
1476 			slurm_conf_get_addr(this_addr, &req_msg.address,
1477 					    req_msg.flags);
1478 		} else {
1479 			this_addr = "localhost";
1480 			slurm_set_addr(&req_msg.address,
1481 				       (uint16_t)slurm_get_slurmd_port(),
1482 				       this_addr);
1483 		}
1484 	} else {
1485 		char this_host[256];
1486 		/*
1487 		 *  Set request message address to slurmd on localhost
1488 		 */
1489 		gethostname_short(this_host, sizeof(this_host));
1490 		this_addr = slurm_conf_get_nodeaddr(this_host);
1491 		if (this_addr == NULL)
1492 			this_addr = xstrdup("localhost");
1493 		slurm_set_addr(&req_msg.address,
1494 			       (uint16_t)slurm_get_slurmd_port(),
1495 			       this_addr);
1496 		xfree(this_addr);
1497 	}
1498 
1499 	memset(&req, 0, sizeof(req));
1500 	req.job_pid      = job_pid;
1501 	req_msg.msg_type = REQUEST_JOB_ID;
1502 	req_msg.data     = &req;
1503 
1504 	rc = slurm_send_recv_node_msg(&req_msg, &resp_msg, 0);
1505 
1506 	if ((rc != 0) || !resp_msg.auth_cred) {
1507 		error("slurm_pid2jobid: %m");
1508 		if (resp_msg.auth_cred)
1509 			g_slurm_auth_destroy(resp_msg.auth_cred);
1510 		return SLURM_ERROR;
1511 	}
1512 	if (resp_msg.auth_cred)
1513 		g_slurm_auth_destroy(resp_msg.auth_cred);
1514 	switch (resp_msg.msg_type) {
1515 	case RESPONSE_JOB_ID:
1516 		*jobid = ((job_id_response_msg_t *) resp_msg.data)->job_id;
1517 		slurm_free_job_id_response_msg(resp_msg.data);
1518 		break;
1519 	case RESPONSE_SLURM_RC:
1520 		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
1521 		slurm_free_return_code_msg(resp_msg.data);
1522 		if (rc)
1523 			slurm_seterrno_ret(rc);
1524 		break;
1525 	default:
1526 		slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
1527 		break;
1528 	}
1529 
1530 	return SLURM_SUCCESS;
1531 }
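
/*
 * Illustrative usage sketch (comment only): map the current process to its
 * Slurm job ID when run from within a job on a compute node (assumes
 * <unistd.h> for getpid()).
 *
 *	uint32_t job_id = 0;
 *	if (slurm_pid2jobid(getpid(), &job_id) == SLURM_SUCCESS)
 *		printf("pid %d belongs to job %u\n", (int) getpid(), job_id);
 */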
1532 
1533 /*
1534  * slurm_get_rem_time - get the expected time remaining for a given job
1535  * IN jobid     - slurm job id
1536  * RET remaining time in seconds or -1 on error
1537  */
1538 extern long slurm_get_rem_time(uint32_t jobid)
1539 {
1540 	time_t now = time(NULL);
1541 	time_t end_time = 0;
1542 	long rc;
1543 
1544 	if (slurm_get_end_time(jobid, &end_time) != SLURM_SUCCESS)
1545 		return -1L;
1546 
1547 	rc = difftime(end_time, now);
1548 	if (rc < 0)
1549 		rc = 0L;
1550 	return rc;
1551 }
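
/*
 * Illustrative usage sketch (comment only): decide whether enough walltime
 * remains for another unit of work; the job_id variable, the 300-second
 * threshold and the checkpoint_and_exit() hook are hypothetical application
 * details.
 *
 *	long remaining = slurm_get_rem_time(job_id);
 *	if ((remaining >= 0) && (remaining < 300))
 *		checkpoint_and_exit();
 */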
1552 
1553 /* FORTRAN VERSIONS OF slurm_get_rem_time */
1554 extern int32_t islurm_get_rem_time__(uint32_t *jobid)
1555 {
1556 	time_t now = time(NULL);
1557 	time_t end_time = 0;
1558 	int32_t rc;
1559 
1560 	if ((jobid == NULL)
1561 	    ||  (slurm_get_end_time(*jobid, &end_time)
1562 		 != SLURM_SUCCESS))
1563 		return 0;
1564 
1565 	rc = difftime(end_time, now);
1566 	if (rc < 0)
1567 		rc = 0;
1568 	return rc;
1569 }
1570 extern int32_t islurm_get_rem_time2__()
1571 {
1572 	uint32_t jobid;
1573 	char *slurm_job_id = getenv("SLURM_JOB_ID");
1574 
1575 	if (slurm_job_id == NULL)
1576 		return 0;
1577 	jobid = atol(slurm_job_id);
1578 	return islurm_get_rem_time__(&jobid);
1579 }
1580 
1581 
1582 /*
1583  * slurm_get_end_time - get the expected end time for a given slurm job
1584  * IN jobid     - slurm job id
1585  * OUT end_time_ptr - location in which to store the scheduled end time for the job
1586  * RET 0 or -1 on error
1587  */
1588 extern int
1589 slurm_get_end_time(uint32_t jobid, time_t *end_time_ptr)
1590 {
1591 	int rc;
1592 	slurm_msg_t resp_msg;
1593 	slurm_msg_t req_msg;
1594 	job_alloc_info_msg_t job_msg;
1595 	srun_timeout_msg_t *timeout_msg;
1596 	time_t now = time(NULL);
1597 	static uint32_t jobid_cache = 0;
1598 	static uint32_t jobid_env = 0;
1599 	static time_t endtime_cache = 0;
1600 	static time_t last_test_time = 0;
1601 
1602 	slurm_msg_t_init(&req_msg);
1603 	slurm_msg_t_init(&resp_msg);
1604 
1605 	if (!end_time_ptr)
1606 		slurm_seterrno_ret(EINVAL);
1607 
1608 	if (jobid == 0) {
1609 		if (jobid_env) {
1610 			jobid = jobid_env;
1611 		} else {
1612 			char *env = getenv("SLURM_JOB_ID");
1613 			if (env) {
1614 				jobid = (uint32_t) atol(env);
1615 				jobid_env = jobid;
1616 			}
1617 		}
1618 		if (jobid == 0) {
1619 			slurm_seterrno(ESLURM_INVALID_JOB_ID);
1620 			return SLURM_ERROR;
1621 		}
1622 	}
1623 
1624 	/* Just use cached data if the data is less than 60 seconds old */
1625 	if ((jobid == jobid_cache)
1626 	&&  (difftime(now, last_test_time) < 60)) {
1627 		*end_time_ptr  = endtime_cache;
1628 		return SLURM_SUCCESS;
1629 	}
1630 
1631 	memset(&job_msg, 0, sizeof(job_msg));
1632 	job_msg.job_id     = jobid;
1633 	req_msg.msg_type   = REQUEST_JOB_END_TIME;
1634 	req_msg.data       = &job_msg;
1635 
1636 	if (slurm_send_recv_controller_msg(&req_msg, &resp_msg,
1637 					   working_cluster_rec) < 0)
1638 		return SLURM_ERROR;
1639 
1640 	switch (resp_msg.msg_type) {
1641 	case SRUN_TIMEOUT:
1642 		timeout_msg = (srun_timeout_msg_t *) resp_msg.data;
1643 		last_test_time = time(NULL);
1644 		jobid_cache    = jobid;
1645 		endtime_cache  = timeout_msg->timeout;
1646 		*end_time_ptr  = endtime_cache;
1647 		slurm_free_srun_timeout_msg(resp_msg.data);
1648 		break;
1649 	case RESPONSE_SLURM_RC:
1650 		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
1651 		slurm_free_return_code_msg(resp_msg.data);
1652 		if (endtime_cache)
1653 			*end_time_ptr  = endtime_cache;
1654 		else if (rc)
1655 			slurm_seterrno_ret(rc);
1656 		break;
1657 	default:
1658 		if (endtime_cache)
1659 			*end_time_ptr  = endtime_cache;
1660 		else
1661 			slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
1662 		break;
1663 	}
1664 
1665 	return SLURM_SUCCESS;
1666 }
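
/*
 * Illustrative usage sketch (comment only): query the scheduled end time of
 * the current job; passing jobid 0 makes the call fall back to the
 * SLURM_JOB_ID environment variable as described above.
 *
 *	time_t end_time = 0;
 *	if (slurm_get_end_time(0, &end_time) == SLURM_SUCCESS)
 *		printf("job ends at %s", ctime(&end_time));
 */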
1667 
1668 /*
1669  * slurm_job_node_ready - report if nodes are ready for job to execute now
1670  * IN job_id - slurm job id
1671  * RET: READY_* values as defined in slurm.h
1672  */
1673 extern int slurm_job_node_ready(uint32_t job_id)
1674 {
1675 	slurm_msg_t req, resp;
1676 	job_id_msg_t msg;
1677 	int rc;
1678 
1679 	slurm_msg_t_init(&req);
1680 	slurm_msg_t_init(&resp);
1681 
1682 	memset(&msg, 0, sizeof(msg));
1683 	msg.job_id   = job_id;
1684 	req.msg_type = REQUEST_JOB_READY;
1685 	req.data     = &msg;
1686 
1687 	if (slurm_send_recv_controller_msg(&req, &resp, working_cluster_rec) <0)
1688 		return READY_JOB_ERROR;
1689 
1690 	if (resp.msg_type == RESPONSE_JOB_READY) {
1691 		rc = ((return_code_msg_t *) resp.data)->return_code;
1692 		slurm_free_return_code_msg(resp.data);
1693 	} else if (resp.msg_type == RESPONSE_SLURM_RC) {
1694 		int job_rc = ((return_code_msg_t *) resp.data)->return_code;
1696 		if ((job_rc == ESLURM_INVALID_PARTITION_NAME) ||
1697 		    (job_rc == ESLURM_INVALID_JOB_ID))
1698 			rc = READY_JOB_FATAL;
1699 		else	/* EAGAIN */
1700 			rc = READY_JOB_ERROR;
1701 		slurm_free_return_code_msg(resp.data);
1702 	} else if (resp.msg_type == RESPONSE_PROLOG_EXECUTING) {
1703 		rc = READY_JOB_ERROR;
1704 	} else {
1705 		rc = READY_JOB_ERROR;
1706 	}
1707 
1708 	return rc;
1709 }
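
/*
 * Usage sketch (illustrative): poll until the job's allocated nodes have
 * booted, much as client commands do before launching a step.
 * READY_NODE_STATE and the READY_JOB_* codes come from slurm.h.
 *
 *	#include <unistd.h>
 *	#include "slurm/slurm.h"
 *
 *	static int wait_nodes_ready(uint32_t job_id)
 *	{
 *		while (1) {
 *			int rc = slurm_job_node_ready(job_id);
 *			if (rc == READY_JOB_FATAL)
 *				return -1;	// job or partition is gone
 *			if ((rc != READY_JOB_ERROR) &&
 *			    (rc & READY_NODE_STATE))
 *				return 0;	// nodes are ready
 *			sleep(10);		// transient error, retry
 *		}
 *	}
 */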
1710 
1711 extern int slurm_job_cpus_allocated_on_node_id(
1712 	job_resources_t *job_resrcs_ptr, int node_id)
1713 {
1714 	int i;
1715 	int start_node = -1; /* start at -1 so the running total of
1716 			      * cpu_array_reps[] lines up with node_id */
1717 
1718 	if (!job_resrcs_ptr || node_id < 0)
1719 		slurm_seterrno_ret(EINVAL);
1720 
1721 	for (i = 0; i < job_resrcs_ptr->cpu_array_cnt; i++) {
1722 		start_node += job_resrcs_ptr->cpu_array_reps[i];
1723 		if (start_node >= node_id)
1724 			break;
1725 	}
1726 
1727 	if (i >= job_resrcs_ptr->cpu_array_cnt)
1728 		return (0); /* nodeid not in this job */
1729 
1730 	return job_resrcs_ptr->cpu_array_value[i];
1731 }
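
/*
 * Note on the lookup above: the cpu_array_* members of job_resources_t are a
 * run-length encoding of per-node CPU counts.  For example, an allocation
 * with 4 CPUs on each of its first two nodes and 8 CPUs on the third would
 * look roughly like:
 *
 *	cpu_array_cnt   = 2
 *	cpu_array_value = { 4, 8 }
 *	cpu_array_reps  = { 2, 1 }
 *
 * Walking cpu_array_reps[] until the running total reaches node_id therefore
 * yields the CPU count for that node.
 */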
1732 
1733 extern int slurm_job_cpus_allocated_on_node(job_resources_t *job_resrcs_ptr,
1734 					    const char *node)
1735 {
1736 	hostlist_t node_hl;
1737 	int node_id;
1738 
1739 	if (!job_resrcs_ptr || !node || !job_resrcs_ptr->nodes)
1740 		slurm_seterrno_ret(EINVAL);
1741 
1742 	node_hl = hostlist_create(job_resrcs_ptr->nodes);
1743 	node_id = hostlist_find(node_hl, node);
1744 	hostlist_destroy(node_hl);
1745 	if (node_id == -1)
1746 		return (0); /* No cpus allocated on this node */
1747 
1748 	return slurm_job_cpus_allocated_on_node_id(job_resrcs_ptr, node_id);
1749 }
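
/*
 * Usage sketch (illustrative): CPUs allocated to the job on a named node,
 * e.g. the node this process is running on (SLURMD_NODENAME is set by slurmd
 * in job step environments):
 *
 *	int ncpus = slurm_job_cpus_allocated_on_node(job_resrcs_ptr,
 *					getenv("SLURMD_NODENAME"));
 */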
1750 
1751 int slurm_job_cpus_allocated_str_on_node_id(char *cpus,
1752 					    size_t cpus_len,
1753 					    job_resources_t *job_resrcs_ptr,
1754 					    int node_id)
1755 {
1756 	uint32_t threads = 1;
1757 	int inx = 0;
1758 	bitstr_t *cpu_bitmap;
1759 	int j, k, bit_inx, bit_reps, hi;
1760 
1761 	if (!job_resrcs_ptr || node_id < 0)
1762 		slurm_seterrno_ret(EINVAL);
1763 
1764 	/* find the index into sock_core_rep_count[] and the first
1765 	 * core_bitmap bit index for this node id */
1766 	bit_inx = 0;
1767 	hi = node_id + 1;    /* change from 0-origin to 1-origin */
1768 	for (inx = 0; hi; inx++) {
1769 		if (hi > job_resrcs_ptr->sock_core_rep_count[inx]) {
1770 			bit_inx += job_resrcs_ptr->sockets_per_node[inx] *
1771 				   job_resrcs_ptr->cores_per_socket[inx] *
1772 				   job_resrcs_ptr->sock_core_rep_count[inx];
1773 			hi -= job_resrcs_ptr->sock_core_rep_count[inx];
1774 		} else {
1775 			bit_inx += job_resrcs_ptr->sockets_per_node[inx] *
1776 				   job_resrcs_ptr->cores_per_socket[inx] *
1777 				   (hi - 1);
1778 			break;
1779 		}
1780 	}
1781 
1782 	bit_reps = job_resrcs_ptr->sockets_per_node[inx] *
1783 		   job_resrcs_ptr->cores_per_socket[inx];
1784 
1785 	/* get the number of threads per core on this node */
1787 	if (job_node_ptr)
1788 		threads = job_node_ptr->node_array[node_id].threads;
1789 	cpu_bitmap = bit_alloc(bit_reps * threads);
1790 	for (j = 0; j < bit_reps; j++) {
1791 		if (bit_test(job_resrcs_ptr->core_bitmap, bit_inx)){
1792 			for (k = 0; k < threads; k++)
1793 				bit_set(cpu_bitmap,
1794 					(j * threads) + k);
1795 		}
1796 		bit_inx++;
1797 	}
1798 	bit_fmt(cpus, cpus_len, cpu_bitmap);
1799 	FREE_NULL_BITMAP(cpu_bitmap);
1800 
1801 	return SLURM_SUCCESS;
1802 }
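
/*
 * Usage sketch (illustrative): format the CPU ids allocated on one node of a
 * job into a caller-supplied buffer.  The job_resources_t would typically
 * come from a slurm_load_job() response (job_info_t's job_resrcs member).
 *
 *	char cpus[128];
 *
 *	if (slurm_job_cpus_allocated_str_on_node_id(cpus, sizeof(cpus),
 *						    job_resrcs_ptr, 0)
 *	    == SLURM_SUCCESS)
 *		printf("CPUs on first node: %s\n", cpus);	// e.g. "0-3"
 */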
1803 
1804 int slurm_job_cpus_allocated_str_on_node(char *cpus,
1805 					 size_t cpus_len,
1806 					 job_resources_t *job_resrcs_ptr,
1807 					 const char *node)
1808 {
1809 	hostlist_t node_hl;
1810 	int node_id;
1811 
1812 	if (!job_resrcs_ptr || !node || !job_resrcs_ptr->nodes)
1813 		slurm_seterrno_ret(EINVAL);
1814 
1815 	node_hl = hostlist_create(job_resrcs_ptr->nodes);
1816 	node_id = hostlist_find(node_hl, node);
1817 	hostlist_destroy(node_hl);
1818 	if (node_id == -1)
1819 		return SLURM_ERROR;
1820 
1821 	return slurm_job_cpus_allocated_str_on_node_id(cpus,
1822 						       cpus_len,
1823 						       job_resrcs_ptr,
1824 						       node_id);
1825 }
1826 
1827 /*
1828  * slurm_network_callerid - issue RPC to get the job id of a job from a remote
1829  * slurmd based upon network socket information.
1830  *
1831  * IN req - Information about network connection in question
1832  * OUT job_id -  ID of the job or NO_VAL
1833  * OUT node_name - name of the remote slurmd
1834  * IN node_name_size - size of the node_name buffer
1835  * RET SLURM_SUCCESS or SLURM_ERROR on error
1836  */
1837 extern int
1838 slurm_network_callerid (network_callerid_msg_t req, uint32_t *job_id,
1839 	char *node_name, int node_name_size)
1840 {
1841 	int rc;
1842 	slurm_msg_t resp_msg;
1843 	slurm_msg_t req_msg;
1844 	network_callerid_resp_t *resp;
1845 	struct sockaddr_in addr;
1846 	uint32_t target_slurmd; /* change for IPv6 support */
1847 
1848 	debug("slurm_network_callerid RPC: start");
1849 
1850 	slurm_msg_t_init(&req_msg);
1851 	slurm_msg_t_init(&resp_msg);
1852 
1853 	/* ip_src is the IP we want to talk to. Hopefully there's a slurmd
1854 	 * listening there */
1855 	memset(&addr, 0, sizeof(addr));
1856 	addr.sin_family = req.af;
1857 
1858 	/* TODO: until IPv6 support is added to Slurm, we must hope that the
1859 	 * other end is IPv4 */
1860 	if (req.af == AF_INET6) {
1861 		error("IPv6 is not yet supported in Slurm");
1862 		/* For testing IPv6 callerid prior to Slurm IPv6 RPC support,
1863 		 * set a sane target, uncomment the following and comment out
1864 		 * the return code:
1865 		addr.sin_family = AF_INET;
1866 		target_slurmd = inet_addr("127.0.0.1"); //choose a test target
1867 		*/
1868 		return SLURM_ERROR;
1869 	} else
1870 		memcpy(&target_slurmd, req.ip_src, 4);
1871 
1872 	addr.sin_addr.s_addr = target_slurmd;
1873 	addr.sin_port = htons(slurm_get_slurmd_port());
1874 	req_msg.address = addr;
1875 
1876 	req_msg.msg_type = REQUEST_NETWORK_CALLERID;
1877 	req_msg.data     = &req;
1878 
1879 	if (slurm_send_recv_node_msg(&req_msg, &resp_msg, 0) < 0)
1880 		return SLURM_ERROR;
1881 
1882 	switch (resp_msg.msg_type) {
1883 		case RESPONSE_NETWORK_CALLERID:
1884 			resp = (network_callerid_resp_t*)resp_msg.data;
1885 			*job_id = resp->job_id;
1886 			strlcpy(node_name, resp->node_name, node_name_size);
1887 			break;
1888 		case RESPONSE_SLURM_RC:
1889 			rc = ((return_code_msg_t *) resp_msg.data)->return_code;
1890 			if (rc)
1891 				slurm_seterrno_ret(rc);
1892 			break;
1893 		default:
1894 			slurm_seterrno_ret(SLURM_UNEXPECTED_MSG_ERROR);
1895 			break;
1896 	}
1897 
1898 	slurm_free_network_callerid_msg(resp_msg.data);
1899 	return SLURM_SUCCESS;
1900 }
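
/*
 * Usage sketch (illustrative; only the fields referenced above are shown,
 * the remaining connection fields of network_callerid_msg_t are elided):
 *
 *	network_callerid_msg_t req;
 *	uint32_t job_id = NO_VAL;
 *	char node[64];
 *
 *	memset(&req, 0, sizeof(req));
 *	req.af = AF_INET;
 *	// ... fill in the source address/port of the connection here ...
 *
 *	if (slurm_network_callerid(req, &job_id, node, sizeof(node)) ==
 *	    SLURM_SUCCESS)
 *		printf("connection belongs to job %u on node %s\n",
 *		       job_id, node);
 */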
1901 
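/*
 * _load_cluster_job_prio - send REQUEST_PRIORITY_FACTORS to one cluster's
 *	controller and return its response
 * IN req_msg - initialized request message (shared by all clusters)
 * OUT factors_resp - priority factors returned by that cluster
 * IN cluster - cluster to query (NULL for the local controller)
 * RET SLURM_SUCCESS or an error code (also set as the errno value)
 */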
1902 static int
1903 _load_cluster_job_prio(slurm_msg_t *req_msg,
1904 		       priority_factors_response_msg_t **factors_resp,
1905 		       slurmdb_cluster_rec_t *cluster)
1906 {
1907 	slurm_msg_t resp_msg;
1908 	int rc = SLURM_SUCCESS;
1909 
1910 	slurm_msg_t_init(&resp_msg);
1911 
1912 	if (slurm_send_recv_controller_msg(req_msg, &resp_msg, cluster) < 0)
1913 		return SLURM_ERROR;
1914 
1915 	switch (resp_msg.msg_type) {
1916 	case RESPONSE_PRIORITY_FACTORS:
1917 		*factors_resp =
1918 			(priority_factors_response_msg_t *) resp_msg.data;
1919 		resp_msg.data = NULL;
1920 		break;
1921 	case RESPONSE_SLURM_RC:
1922 		rc = ((return_code_msg_t *) resp_msg.data)->return_code;
1923 		slurm_free_return_code_msg(resp_msg.data);
1924 		break;
1925 	default:
1926 		rc = SLURM_UNEXPECTED_MSG_ERROR;
1927 		break;
1928 	}
1929 	if (rc)
1930 		slurm_seterrno(rc);
1931 
1932 	return rc;
1933 }
1934 
1935 /* Sort responses so local cluster response is first */
1936 static int _local_resp_first_prio(void *x, void *y)
1937 {
1938 	load_job_prio_resp_struct_t *resp_x = *(load_job_prio_resp_struct_t **)x;
1939 	load_job_prio_resp_struct_t *resp_y = *(load_job_prio_resp_struct_t **)y;
1940 
1941 	if (resp_x->local_cluster)
1942 		return -1;
1943 	if (resp_y->local_cluster)
1944 		return 1;
1945 	return 0;
1946 }
1947 
1948 /* Add cluster_name to job priority info records */
1949 static void _add_cluster_name(priority_factors_response_msg_t *new_msg,
1950 			      char *cluster_name)
1951 {
1952 	priority_factors_object_t *prio_obj;
1953 	ListIterator iter;
1954 
1955 	if (!new_msg || !new_msg->priority_factors_list)
1956 		return;
1957 
1958 	iter = list_iterator_create(new_msg->priority_factors_list);
1959 	while ((prio_obj = (priority_factors_object_t *) list_next(iter)))
1960 		prio_obj->cluster_name = xstrdup(cluster_name);
1961 	list_iterator_destroy(iter);
1962 }
1963 
1964 /* Thread to read job priority factor information from some cluster */
1965 static void *_load_job_prio_thread(void *args)
1966 {
1967 	load_job_req_struct_t *load_args = (load_job_req_struct_t *) args;
1968 	slurmdb_cluster_rec_t *cluster = load_args->cluster;
1969 	priority_factors_response_msg_t *new_msg = NULL;
1970 	int rc;
1971 
1972 	if ((rc = _load_cluster_job_prio(load_args->req_msg, &new_msg,
1973 					 cluster)) || !new_msg) {
1974 		verbose("Error reading job information from cluster %s: %s",
1975 			cluster->name, slurm_strerror(rc));
1976 	} else {
1977 		load_job_prio_resp_struct_t *job_resp;
1978 		_add_cluster_name(new_msg, cluster->name);
1979 
1981 		job_resp = xmalloc(sizeof(load_job_prio_resp_struct_t));
1982 		job_resp->local_cluster = load_args->local_cluster;
1983 		job_resp->new_msg = new_msg;
1984 		list_append(load_args->resp_msg_list, job_resp);
1985 	}
1986 	xfree(args);
1987 
1988 	return (void *) NULL;
1989 }
1990 
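/*
 * _load_fed_job_prio - gather job priority factors from every active cluster
 *	in a federation (one thread per cluster), merge the responses into a
 *	single message with the local cluster's records first, and unless
 *	SHOW_SIBLING is set drop duplicate records and jobs that are local to
 *	other clusters
 * IN req_msg - initialized REQUEST_PRIORITY_FACTORS message
 * OUT factors_resp - merged response, never NULL on SLURM_SUCCESS
 * IN show_flags - SHOW_LOCAL and/or SHOW_SIBLING filtering options
 * IN cluster_name - name of the local cluster
 * IN fed - federation description from slurm_load_federation()
 * RET SLURM_SUCCESS or an error code
 */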
1991 static int _load_fed_job_prio(slurm_msg_t *req_msg,
1992 			      priority_factors_response_msg_t **factors_resp,
1993 			      uint16_t show_flags, char *cluster_name,
1994 			      slurmdb_federation_rec_t *fed)
1995 {
1996 	int i, j;
1997 	int local_job_cnt = 0;
1998 	load_job_prio_resp_struct_t *job_resp;
1999 	priority_factors_response_msg_t *orig_msg = NULL, *new_msg = NULL;
2000 	priority_factors_object_t *prio_obj;
2001 	uint32_t hash_job_inx, *hash_tbl_size = NULL, **hash_job_id = NULL;
2002 	uint32_t hash_part_inx, **hash_part_id = NULL;
2003 	slurmdb_cluster_rec_t *cluster;
2004 	ListIterator iter;
2005 	int pthread_count = 0;
2006 	pthread_t *load_thread = 0;
2007 	load_job_req_struct_t *load_args;
2008 	List resp_msg_list;
2009 
2010 	*factors_resp = NULL;
2011 
2012 	/* Spawn one pthread per cluster to collect job information */
2013 	resp_msg_list = list_create(NULL);
2014 	load_thread = xmalloc(sizeof(pthread_t) *
2015 			      list_count(fed->cluster_list));
2016 	iter = list_iterator_create(fed->cluster_list);
2017 	while ((cluster = (slurmdb_cluster_rec_t *) list_next(iter))) {
2018 		bool local_cluster = false;
2019 		if ((cluster->control_host == NULL) ||
2020 		    (cluster->control_host[0] == '\0'))
2021 			continue;	/* Cluster down */
2022 
2023 		if (!xstrcmp(cluster->name, cluster_name))
2024 			local_cluster = true;
2025 		if ((show_flags & SHOW_LOCAL) && !local_cluster)
2026 			continue;
2027 
2028 		load_args = xmalloc(sizeof(load_job_req_struct_t));
2029 		load_args->cluster = cluster;
2030 		load_args->local_cluster = local_cluster;
2031 		load_args->req_msg = req_msg;
2032 		load_args->resp_msg_list = resp_msg_list;
2033 		slurm_thread_create(&load_thread[pthread_count],
2034 				    _load_job_prio_thread, load_args);
2035 		pthread_count++;
2036 
2037 	}
2038 	list_iterator_destroy(iter);
2039 
2040 	/* Wait for all pthreads to complete */
2041 	for (i = 0; i < pthread_count; i++)
2042 		pthread_join(load_thread[i], NULL);
2043 	xfree(load_thread);
2044 
2045 	/* Move the response from the local cluster (if any) to top of list */
2046 	list_sort(resp_msg_list, _local_resp_first_prio);
2047 
2048 	/* Merge the responses into a single response message */
2049 	iter = list_iterator_create(resp_msg_list);
2050 	while ((job_resp = (load_job_prio_resp_struct_t *) list_next(iter))) {
2051 		new_msg = job_resp->new_msg;
2052 		if (!new_msg->priority_factors_list) {
2053 			/* Just skip this one. */
2054 		} else if (!orig_msg) {
2055 			orig_msg = new_msg;
2056 			if (job_resp->local_cluster) {
2057 				local_job_cnt = list_count(
2058 						new_msg->priority_factors_list);
2059 			}
2060 			*factors_resp = orig_msg;
2061 		} else {
2062 			/* Merge prio records into a single response message */
2063 			list_transfer(orig_msg->priority_factors_list,
2064 				      new_msg->priority_factors_list);
2065 			FREE_NULL_LIST(new_msg->priority_factors_list);
2066 			xfree(new_msg);
2067 		}
2068 		xfree(job_resp);
2069 	}
2070 	list_iterator_destroy(iter);
2071 	FREE_NULL_LIST(resp_msg_list);
2072 
2073 	/* In a single-cluster scenario with no jobs, priority_factors_list
2074 	 * will be NULL; sprio handles that case. If the user requested
2075 	 * specific job ids the corresponding error is reported, otherwise
2076 	 * the header is printed and no job records are listed. */
2077 	if (!*factors_resp) {
2078 		*factors_resp =
2079 			xmalloc(sizeof(priority_factors_response_msg_t));
2080 		return SLURM_SUCCESS;
2081 	}
2082 
2083 	/* Find duplicate job records and jobs local to other clusters and
2084 	 * remove them so each job is only reported once */
2085 	if ((show_flags & SHOW_SIBLING) == 0) {
2086 		hash_tbl_size = xmalloc(sizeof(uint32_t)   * JOB_HASH_SIZE);
2087 		hash_job_id   = xmalloc(sizeof(uint32_t *) * JOB_HASH_SIZE);
2088 		hash_part_id  = xmalloc(sizeof(uint32_t *) * JOB_HASH_SIZE);
2089 		for (i = 0; i < JOB_HASH_SIZE; i++) {
2090 			hash_tbl_size[i] = 100;
2091 			hash_job_id[i]  = xmalloc(sizeof(uint32_t) *
2092 						  hash_tbl_size[i]);
2093 			hash_part_id[i] = xmalloc(sizeof(uint32_t) *
2094 						  hash_tbl_size[i]);
2095 		}
2096 	}
2097 	iter = list_iterator_create(orig_msg->priority_factors_list);
2098 	i = 0;
2099 	while ((prio_obj = (priority_factors_object_t *) list_next(iter))) {
2100 		bool found_job = false, local_cluster = false;
2101 		if (i++ < local_job_cnt) {
2102 			local_cluster = true;
2103 		} else if (_test_local_job(prio_obj->job_id)) {
2104 			list_delete_item(iter);
2105 			continue;
2106 		}
2107 
2108 		if (show_flags & SHOW_SIBLING)
2109 			continue;
2110 		hash_job_inx = prio_obj->job_id % JOB_HASH_SIZE;
2111 		if (prio_obj->partition) {
2112 			HASH_FCN(prio_obj->partition,
2113 				 strlen(prio_obj->partition), hash_part_inx);
2114 		} else {
2115 			hash_part_inx = 0;
2116 		}
2117 		for (j = 0;
2118 		     ((j < hash_tbl_size[hash_job_inx]) &&
2119 		      hash_job_id[hash_job_inx][j]); j++) {
2120 			if ((prio_obj->job_id ==
2121 			     hash_job_id[hash_job_inx][j]) &&
2122 			    (hash_part_inx == hash_part_id[hash_job_inx][j])) {
2123 				found_job = true;
2124 				break;
2125 			}
2126 		}
2127 		if (found_job && local_cluster) {
2128 			/* Local job in multiple partitions */
2129 			continue;
2130 		} else if (found_job) {
2131 			/* Duplicate remote job,
2132 			 * possible in multiple partitions */
2133 			list_delete_item(iter);
2134 			continue;
2135 		} else if (j >= hash_tbl_size[hash_job_inx]) {
2136 			hash_tbl_size[hash_job_inx] *= 2;
2137 			xrealloc(hash_job_id[hash_job_inx],
2138 				 sizeof(uint32_t) * hash_tbl_size[hash_job_inx]);
			/* Grow hash_part_id[] in step so the store into
			 * hash_part_id[hash_job_inx][j] below stays in
			 * bounds */
			xrealloc(hash_part_id[hash_job_inx],
				 sizeof(uint32_t) * hash_tbl_size[hash_job_inx]);
2139 		}
2140 		hash_job_id[hash_job_inx][j]  = prio_obj->job_id;
2141 		hash_part_id[hash_job_inx][j] = hash_part_inx;
2142 	}
2143 	list_iterator_destroy(iter);
2144 	if ((show_flags & SHOW_SIBLING) == 0) {
2145 		for (i = 0; i < JOB_HASH_SIZE; i++) {
2146 			xfree(hash_job_id[i]);
2147 			xfree(hash_part_id[i]);
2148 		}
2149 		xfree(hash_tbl_size);
2150 		xfree(hash_job_id);
2151 		xfree(hash_part_id);
2152 	}
2153 
2154 	return SLURM_SUCCESS;
2155 }
2156 
2157 /*
2158  * slurm_load_job_prio - issue RPC to get job priority information for
2159  *	jobs which pass filter test
2160  * OUT factors_resp - job priority factors
2161  * IN job_id_list - list of job IDs to be reported
2162  * IN partitions - comma delimited list of partition names to be reported
2163  * IN uid_list - list of user IDs to be reported
2164  * IN show_flags -  job filtering option: 0, SHOW_LOCAL and/or SHOW_SIBLING
2165  * RET 0 or -1 on error
2166  * NOTE: free the response using slurm_free_priority_factors_response_msg()
2167  */
2168 extern int
2169 slurm_load_job_prio(priority_factors_response_msg_t **factors_resp,
2170 		    List job_id_list, char *partitions, List uid_list,
2171 		    uint16_t show_flags)
2172 {
2173 	slurm_msg_t req_msg;
2174 	priority_factors_request_msg_t factors_req;
2175 	char *cluster_name = NULL;
2176 	void *ptr = NULL;
2177 	slurmdb_federation_rec_t *fed;
2178 	int rc;
2179 
2180 	cluster_name = slurm_get_cluster_name();
2181 	if ((show_flags & SHOW_FEDERATION) && !(show_flags & SHOW_LOCAL) &&
2182 	    (slurm_load_federation(&ptr) == SLURM_SUCCESS) &&
2183 	    cluster_in_federation(ptr, cluster_name)) {
2184 		/* In federation. Need full info from all clusters */
2185 		show_flags &= (~SHOW_LOCAL);
2186 	} else {
2187 		/* Report local cluster info only */
2188 		show_flags |= SHOW_LOCAL;
2189 		show_flags &= (~SHOW_FEDERATION);
2190 	}
2191 
2192 	memset(&factors_req, 0, sizeof(factors_req));
2193 	factors_req.job_id_list = job_id_list;
2194 	factors_req.partitions  = partitions;
2195 	factors_req.uid_list    = uid_list;
2196 
2197 	slurm_msg_t_init(&req_msg);
2198 	req_msg.msg_type = REQUEST_PRIORITY_FACTORS;
2199 	req_msg.data     = &factors_req;
2200 
2201 	/* With the -M option, working_cluster_rec is set and we only get
2202 	 * information for that cluster */
2203 	if (show_flags & SHOW_FEDERATION) {
2204 		fed = (slurmdb_federation_rec_t *) ptr;
2205 		rc = _load_fed_job_prio(&req_msg, factors_resp, show_flags,
2206 					cluster_name, fed);
2207 	} else {
2208 		rc = _load_cluster_job_prio(&req_msg, factors_resp,
2209 					    working_cluster_rec);
2210 	}
2211 
2212 	if (ptr)
2213 		slurm_destroy_federation_rec(ptr);
2214 	xfree(cluster_name);
2215 
2216 	return rc;
2217 }
2218
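/*
 * Usage sketch (illustrative, roughly what sprio does): load the priority
 * factors for all jobs visible from the local cluster and free the response
 * with the function named in the comment above.
 *
 *	#include "slurm/slurm.h"
 *
 *	static void load_and_free_job_prio(void)
 *	{
 *		priority_factors_response_msg_t *resp = NULL;
 *
 *		if (slurm_load_job_prio(&resp, NULL, NULL, NULL, SHOW_LOCAL)
 *		    != SLURM_SUCCESS) {
 *			slurm_perror("slurm_load_job_prio");
 *			return;
 *		}
 *		// resp->priority_factors_list may be NULL when no jobs exist
 *		slurm_free_priority_factors_response_msg(resp);
 *	}
 */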