/*****************************************************************************\
 *  slurm_jobacct_gather.c - implementation-independent job accounting logging
 *  functions
 *****************************************************************************
 *  Copyright (C) 2003-2007 The Regents of the University of California.
 *  Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
 *  Written by Jay Windley <jwindley@lnxi.com>, Morris Jette <jette1@llnl.com>
 *  CODE-OCEC-09-009. All rights reserved.
 *
 *  Copyright (C) 2005 Hewlett-Packard Development Company, L.P.
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
\*****************************************************************************/

/*****************************************************************************\
 *  Modification history
 *
 *  19 Jan 2005 by Andy Riebs <andy.riebs@hp.com>
 *  	 This file is derived from the file slurm_jobcomp.c, written by
 *  	 Morris Jette, et al.
\*****************************************************************************/

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <pthread.h>
#include <signal.h>
#include <stdlib.h>
#include <string.h>

#if HAVE_SYS_PRCTL_H
#  include <sys/prctl.h>
#endif

#include "src/common/assoc_mgr.h"
#include "src/common/macros.h"
#include "src/common/pack.h"
#include "src/common/plugin.h"
#include "src/common/plugrack.h"
#include "src/common/read_config.h"
#include "src/common/slurm_acct_gather_profile.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/slurmdbd_defs.h"
#include "src/common/xmalloc.h"
#include "src/common/xstring.h"
#include "src/slurmd/slurmstepd/slurmstepd_job.h"
#include "src/slurmdbd/read_config.h"

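/* Multipliers used to convert KB and MB quantities to bytes */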
#define KB_ADJ 1024
#define MB_ADJ 1048576

/*
** Define slurm-specific aliases for use by plugins, see slurm_xlator.h
** for details.
 */
strong_alias(jobacctinfo_pack, slurm_jobacctinfo_pack);
strong_alias(jobacctinfo_unpack, slurm_jobacctinfo_unpack);
strong_alias(jobacctinfo_create, slurm_jobacctinfo_create);
strong_alias(jobacctinfo_destroy, slurm_jobacctinfo_destroy);

typedef struct slurm_jobacct_gather_ops {
	void (*poll_data) (List task_list, bool pgid_plugin, uint64_t cont_id,
			   bool profile);
	int (*endpoll)    (void);
	int (*add_task)   (pid_t pid, jobacct_id_t *jobacct_id);
} slurm_jobacct_gather_ops_t;

/*
 * These strings must be in the same order as the fields declared
 * for slurm_jobacct_gather_ops_t.
 */
static const char *syms[] = {
	"jobacct_gather_p_poll_data",
	"jobacct_gather_p_endpoll",
	"jobacct_gather_p_add_task",
};

static slurm_jobacct_gather_ops_t ops;
static plugin_context_t *g_context = NULL;
static pthread_mutex_t g_context_lock = PTHREAD_MUTEX_INITIALIZER;
static bool init_run = false;
static pthread_mutex_t init_run_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_t watch_tasks_thread_id = 0;

static int freq = 0;
static bool pgid_plugin = false;
static List task_list = NULL;
static uint64_t cont_id = NO_VAL64;
static pthread_mutex_t task_list_lock = PTHREAD_MUTEX_INITIALIZER;

static bool jobacct_shutdown = true;
static pthread_mutex_t jobacct_shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool plugin_polling = true;

static uint32_t jobacct_job_id     = 0;
static uint32_t jobacct_step_id    = 0;
static uint64_t jobacct_mem_limit  = 0;
static uint64_t jobacct_vmem_limit = 0;
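
/*
 * Timer driving the polling cadence: _watch_tasks() blocks on this
 * timer's condition variable and samples whenever the profile plugin
 * signals it.
 */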
static acct_gather_profile_timer_t *profile_timer =
	&acct_gather_profile_timer[PROFILE_TASK];

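/*
 * Initialize the per-TRES usage arrays of a jobacctinfo_t.  Usage counters
 * start at INFINITE64 (meaning "unset"); node/task ids are seeded from
 * jobacct_id when one is supplied.
 */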
static void _init_tres_usage(struct jobacctinfo *jobacct,
			     jobacct_id_t *jobacct_id,
			     uint32_t tres_cnt)
{
	int alloc_size, i;

	jobacct->tres_count = tres_cnt;

	jobacct->tres_ids = xcalloc(tres_cnt, sizeof(uint32_t));

	alloc_size = tres_cnt * sizeof(uint64_t);

	jobacct->tres_usage_in_max = xmalloc(alloc_size);
	jobacct->tres_usage_in_max_nodeid = xmalloc(alloc_size);
	jobacct->tres_usage_in_max_taskid = xmalloc(alloc_size);
	jobacct->tres_usage_in_min = xmalloc(alloc_size);
	jobacct->tres_usage_in_min_nodeid = xmalloc(alloc_size);
	jobacct->tres_usage_in_min_taskid = xmalloc(alloc_size);
	jobacct->tres_usage_in_tot = xmalloc(alloc_size);
	jobacct->tres_usage_out_max = xmalloc(alloc_size);
	jobacct->tres_usage_out_max_nodeid = xmalloc(alloc_size);
	jobacct->tres_usage_out_max_taskid = xmalloc(alloc_size);
	jobacct->tres_usage_out_min = xmalloc(alloc_size);
	jobacct->tres_usage_out_min_nodeid = xmalloc(alloc_size);
	jobacct->tres_usage_out_min_taskid = xmalloc(alloc_size);
	jobacct->tres_usage_out_tot = xmalloc(alloc_size);

	for (i = 0; i < jobacct->tres_count; i++) {
		jobacct->tres_ids[i] =
			assoc_mgr_tres_array ? assoc_mgr_tres_array[i]->id : i;

		jobacct->tres_usage_in_min[i] = INFINITE64;
		jobacct->tres_usage_in_max[i] = INFINITE64;
		jobacct->tres_usage_in_tot[i] = INFINITE64;
		jobacct->tres_usage_out_max[i] = INFINITE64;
		jobacct->tres_usage_out_min[i] = INFINITE64;
		jobacct->tres_usage_out_tot[i] = INFINITE64;

		if (jobacct_id && jobacct_id->taskid != NO_VAL) {
			jobacct->tres_usage_in_max_taskid[i] =
				(uint64_t) jobacct_id->taskid;
			jobacct->tres_usage_in_min_taskid[i] =
				(uint64_t) jobacct_id->taskid;
			jobacct->tres_usage_out_max_taskid[i] =
				(uint64_t) jobacct_id->taskid;
			jobacct->tres_usage_out_min_taskid[i] =
				(uint64_t) jobacct_id->taskid;
		} else {
			jobacct->tres_usage_in_max_taskid[i] = INFINITE64;
			jobacct->tres_usage_in_min_taskid[i] = INFINITE64;
			jobacct->tres_usage_out_max_taskid[i] = INFINITE64;
			jobacct->tres_usage_out_min_taskid[i] = INFINITE64;
		}

		if (jobacct_id && jobacct_id->nodeid != NO_VAL) {
			jobacct->tres_usage_in_max_nodeid[i] =
				(uint64_t) jobacct_id->nodeid;
			jobacct->tres_usage_in_min_nodeid[i] =
				(uint64_t) jobacct_id->nodeid;
			jobacct->tres_usage_out_max_nodeid[i] =
				(uint64_t) jobacct_id->nodeid;
			jobacct->tres_usage_out_min_nodeid[i] =
				(uint64_t) jobacct_id->nodeid;
		} else {
			jobacct->tres_usage_in_max_nodeid[i] = INFINITE64;
			jobacct->tres_usage_in_min_nodeid[i] = INFINITE64;
			jobacct->tres_usage_out_max_nodeid[i] = INFINITE64;
			jobacct->tres_usage_out_min_nodeid[i] = INFINITE64;
		}
	}
}

static void _free_tres_usage(struct jobacctinfo *jobacct)
{
	if (jobacct) {
		xfree(jobacct->tres_ids);

		if (jobacct->tres_list &&
		    (jobacct->tres_list != assoc_mgr_tres_list))
			FREE_NULL_LIST(jobacct->tres_list);

		xfree(jobacct->tres_usage_in_max);
		xfree(jobacct->tres_usage_in_max_nodeid);
		xfree(jobacct->tres_usage_in_max_taskid);
		xfree(jobacct->tres_usage_in_min);
		xfree(jobacct->tres_usage_in_min_nodeid);
		xfree(jobacct->tres_usage_in_min_taskid);
		xfree(jobacct->tres_usage_in_tot);
		xfree(jobacct->tres_usage_out_max);
		xfree(jobacct->tres_usage_out_max_nodeid);
		xfree(jobacct->tres_usage_out_max_taskid);
		xfree(jobacct->tres_usage_out_min);
		xfree(jobacct->tres_usage_out_min_nodeid);
		xfree(jobacct->tres_usage_out_min_taskid);
		xfree(jobacct->tres_usage_out_tot);
	}
}

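/*
 * Deep-copy TRES usage from source_jobacct into *dest_jobacct, allocating
 * the destination if needed.  The memcpy() below copies the scalar fields;
 * the usage arrays are then re-created and copied element by element.
 */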
static void _copy_tres_usage(jobacctinfo_t **dest_jobacct,
			     jobacctinfo_t *source_jobacct)
{
	uint32_t i = 0;

	xassert(dest_jobacct);

	if (!*dest_jobacct)
		*dest_jobacct = xmalloc(sizeof(jobacctinfo_t));
	else
		_free_tres_usage(*dest_jobacct);

	memcpy(*dest_jobacct, source_jobacct, sizeof(jobacctinfo_t));

	_init_tres_usage(*dest_jobacct, NULL, source_jobacct->tres_count);

	for (i = 0; i < source_jobacct->tres_count; i++) {
		(*dest_jobacct)->tres_usage_in_max[i] =
			source_jobacct->tres_usage_in_max[i];
		(*dest_jobacct)->tres_usage_in_max_nodeid[i] =
			source_jobacct->tres_usage_in_max_nodeid[i];
		(*dest_jobacct)->tres_usage_in_max_taskid[i] =
			source_jobacct->tres_usage_in_max_taskid[i];
		(*dest_jobacct)->tres_usage_in_min[i] =
			source_jobacct->tres_usage_in_min[i];
		(*dest_jobacct)->tres_usage_in_min_nodeid[i] =
			source_jobacct->tres_usage_in_min_nodeid[i];
		(*dest_jobacct)->tres_usage_in_min_taskid[i] =
			source_jobacct->tres_usage_in_min_taskid[i];
		(*dest_jobacct)->tres_usage_in_tot[i] =
			source_jobacct->tres_usage_in_tot[i];
		(*dest_jobacct)->tres_usage_out_max[i] =
			source_jobacct->tres_usage_out_max[i];
		(*dest_jobacct)->tres_usage_out_max_nodeid[i] =
			source_jobacct->tres_usage_out_max_nodeid[i];
		(*dest_jobacct)->tres_usage_out_max_taskid[i] =
			source_jobacct->tres_usage_out_max_taskid[i];
		(*dest_jobacct)->tres_usage_out_min[i] =
			source_jobacct->tres_usage_out_min[i];
		(*dest_jobacct)->tres_usage_out_min_nodeid[i] =
			source_jobacct->tres_usage_out_min_nodeid[i];
		(*dest_jobacct)->tres_usage_out_min_taskid[i] =
			source_jobacct->tres_usage_out_min_taskid[i];
		(*dest_jobacct)->tres_usage_out_tot[i] =
			source_jobacct->tres_usage_out_tot[i];
	}

	return;
}

/* _acct_kill_step() - issue RPCs to notify about and then kill a slurm
 * job step that has exceeded its memory limit */
static void _acct_kill_step(void)
{
	slurm_msg_t msg;
	job_step_kill_msg_t req;
	job_notify_msg_t notify_req;

	slurm_msg_t_init(&msg);
	notify_req.job_id      = jobacct_job_id;
	notify_req.job_step_id = jobacct_step_id;
	notify_req.message     = "Exceeded job memory limit";
	msg.msg_type    = REQUEST_JOB_NOTIFY;
	msg.data        = &notify_req;
	slurm_send_only_controller_msg(&msg, working_cluster_rec);

	/*
	 * Now issue the actual cancel request:
	 */
	memset(&req, 0, sizeof(job_step_kill_msg_t));
	req.job_id      = jobacct_job_id;
	req.job_step_id = jobacct_step_id;
	req.signal      = SIGKILL;
	req.flags       = 0;
	msg.msg_type    = REQUEST_CANCEL_JOB_STEP;
	msg.data        = &req;

	slurm_send_only_controller_msg(&msg, working_cluster_rec);
}

static bool _jobacct_shutdown_test(void)
{
	bool rc;
	slurm_mutex_lock(&jobacct_shutdown_mutex);
	rc = jobacct_shutdown;
	slurm_mutex_unlock(&jobacct_shutdown_mutex);
	return rc;
}

static void _poll_data(bool profile)
{
	/* Update the data */
	slurm_mutex_lock(&task_list_lock);
	if (task_list)
		(*(ops.poll_data))(task_list, pgid_plugin, cont_id, profile);
	slurm_mutex_unlock(&task_list_lock);
}

static bool _init_run_test(void)
{
	bool rc;
	slurm_mutex_lock(&init_run_mutex);
	rc = init_run;
	slurm_mutex_unlock(&init_run_mutex);
	return rc;
}

/* _watch_tasks() -- monitor slurm job tasks, polling their accounting
 * data (memory usage, CPU time, etc.) each time the profile timer fires */

static void *_watch_tasks(void *arg)
{
#if HAVE_SYS_PRCTL_H
	if (prctl(PR_SET_NAME, "acctg", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__, "acctg");
	}
#endif

	while (_init_run_test() && !_jobacct_shutdown_test() &&
	       acct_gather_profile_test()) {
		/* Do this until shutdown is requested */
		slurm_mutex_lock(&profile_timer->notify_mutex);
		slurm_cond_wait(&profile_timer->notify,
				&profile_timer->notify_mutex);
		slurm_mutex_unlock(&profile_timer->notify_mutex);

		/* shutting down, woken by jobacct_gather_fini() */
		if (!_init_run_test())
			break;

		slurm_mutex_lock(&g_context_lock);
		/* The initial poll is done after the last task is added */
		_poll_data(1);
		slurm_mutex_unlock(&g_context_lock);
	}
	return NULL;
}

static void _jobacctinfo_create_tres_usage(jobacct_id_t *jobacct_id,
					   struct jobacctinfo *jobacct)
{
	assoc_mgr_lock_t locks = { .tres = READ_LOCK };

	assoc_mgr_lock(&locks);
	_init_tres_usage(jobacct, jobacct_id, g_tres_count);
	assoc_mgr_unlock(&locks);
}

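/*
 * Fold one task's TRES usage into an aggregate: keep the max/min values
 * together with their node/task ids and sum the totals, treating
 * INFINITE64 as "no data recorded".
 */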
static void _jobacctinfo_aggregate_tres_usage(jobacctinfo_t *dest,
					      jobacctinfo_t *from)
{
	uint32_t i = 0;

	for (i = 0; i < dest->tres_count; i++) {
		if (from->tres_usage_in_max[i] != INFINITE64) {
			if ((dest->tres_usage_in_max[i] == INFINITE64) ||
			    (dest->tres_usage_in_max[i] <
			     from->tres_usage_in_max[i])) {
				dest->tres_usage_in_max[i] =
					from->tres_usage_in_max[i];
				/*
				 * At the time of writing Energy was only on a
				 * per node basis.
				 */
				if (i != TRES_ARRAY_ENERGY)
					dest->tres_usage_in_max_taskid[i] =
						from->tres_usage_in_max_taskid[i];
				dest->tres_usage_in_max_nodeid[i] =
					from->tres_usage_in_max_nodeid[i];
			}
		}

		if (from->tres_usage_in_min[i] != INFINITE64) {
			if ((dest->tres_usage_in_min[i] == INFINITE64) ||
			    (dest->tres_usage_in_min[i] >
			     from->tres_usage_in_min[i])) {
				dest->tres_usage_in_min[i] =
					from->tres_usage_in_min[i];
				/*
				 * At the time of writing Energy was only on a
				 * per node basis.
				 */
				if (i != TRES_ARRAY_ENERGY)
					dest->tres_usage_in_min_taskid[i] =
						from->tres_usage_in_min_taskid[i];
				dest->tres_usage_in_min_nodeid[i] =
					from->tres_usage_in_min_nodeid[i];
			}
		}

		if (from->tres_usage_in_tot[i] != INFINITE64) {
			if (dest->tres_usage_in_tot[i] == INFINITE64)
				dest->tres_usage_in_tot[i] =
					from->tres_usage_in_tot[i];
			else
				dest->tres_usage_in_tot[i] +=
					from->tres_usage_in_tot[i];
		}

		if (from->tres_usage_out_max[i] != INFINITE64) {
			if ((dest->tres_usage_out_max[i] == INFINITE64) ||
			    (dest->tres_usage_out_max[i] <
			     from->tres_usage_out_max[i])) {
				dest->tres_usage_out_max[i] =
					from->tres_usage_out_max[i];
				/*
				 * At the time of writing Energy was only on a
				 * per node basis.
				 */
				if (i != TRES_ARRAY_ENERGY)
					dest->tres_usage_out_max_taskid[i] =
						from->tres_usage_out_max_taskid[i];
				dest->tres_usage_out_max_nodeid[i] =
					from->tres_usage_out_max_nodeid[i];
			}
		}

		if (from->tres_usage_out_min[i] != INFINITE64) {
			if ((dest->tres_usage_out_min[i] == INFINITE64) ||
			    (dest->tres_usage_out_min[i] >
			     from->tres_usage_out_min[i])) {
				dest->tres_usage_out_min[i] =
					from->tres_usage_out_min[i];
				/*
				 * At the time of writing Energy was only on a
				 * per node basis.
				 */
				if (i != TRES_ARRAY_ENERGY)
					dest->tres_usage_out_min_taskid[i] =
						from->tres_usage_out_min_taskid[i];
				dest->tres_usage_out_min_nodeid[i] =
					from->tres_usage_out_min_nodeid[i];
			}
		}

		if (from->tres_usage_out_tot[i] != INFINITE64) {
			if (dest->tres_usage_out_tot[i] == INFINITE64)
				dest->tres_usage_out_tot[i] =
					from->tres_usage_out_tot[i];
			else
				dest->tres_usage_out_tot[i] +=
					from->tres_usage_out_tot[i];
		}
	}
}

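/*
 * Render each TRES usage array of jobacct as a printable TRES string in
 * the slurmdb_stats_t record destined for the database.
 */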
static void _jobacctinfo_2_stats_tres_usage(slurmdb_stats_t *stats,
					    jobacctinfo_t *jobacct)
{
	assoc_mgr_lock_t locks = { .tres = READ_LOCK };
	uint32_t flags = TRES_STR_FLAG_ALLOW_REAL | TRES_STR_FLAG_SIMPLE;

	assoc_mgr_lock(&locks);
	stats->tres_usage_in_ave = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_tot, flags, true);
	stats->tres_usage_in_tot = xstrdup(stats->tres_usage_in_ave);
	stats->tres_usage_in_max = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max, flags, true);
	stats->tres_usage_in_max_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max_nodeid, flags, true);
	stats->tres_usage_in_max_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max_taskid, flags, true);
	stats->tres_usage_in_min = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min, flags, true);
	stats->tres_usage_in_min_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min_nodeid, flags, true);
	stats->tres_usage_in_min_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min_taskid, flags, true);
	stats->tres_usage_out_ave = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_tot, flags, true);
	stats->tres_usage_out_tot = xstrdup(stats->tres_usage_out_ave);
	stats->tres_usage_out_max = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max, flags, true);
	stats->tres_usage_out_max_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max_taskid, flags, true);
	stats->tres_usage_out_max_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max_nodeid, flags, true);
	stats->tres_usage_out_min = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min, flags, true);
	stats->tres_usage_out_min_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min_nodeid, flags, true);
	stats->tres_usage_out_min_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min_taskid, flags, true);
	assoc_mgr_unlock(&locks);
}

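/*
 * Load the configured jobacct_gather plugin (once) and resolve its
 * symbols.  A no-op inside the slurmdbd or when the context already
 * exists; selecting jobacct_gather/none disables all polling.
 */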
extern int jobacct_gather_init(void)
{
	char	*plugin_type = "jobacct_gather";
	char	*type = NULL;
	int	retval = SLURM_SUCCESS;

	if (slurmdbd_conf || (_init_run_test() && g_context))
		return retval;

	slurm_mutex_lock(&g_context_lock);
	if (g_context)
		goto done;

	type = slurm_get_jobacct_gather_type();

	g_context = plugin_context_create(
		plugin_type, type, (void **)&ops, syms, sizeof(syms));

	if (!g_context) {
		error("cannot create %s context for %s", plugin_type, type);
		retval = SLURM_ERROR;
		goto done;
	}

	if (!xstrcasecmp(type, "jobacct_gather/none")) {
		plugin_polling = false;
		goto done;
	}

	slurm_mutex_lock(&init_run_mutex);
	init_run = true;
	slurm_mutex_unlock(&init_run_mutex);

	/* only print the WARNING messages if in the slurmctld */
	if (!running_in_slurmctld())
		goto done;

	plugin_type = type;
	type = slurm_get_proctrack_type();
	if (!xstrcasecmp(type, "proctrack/pgid")) {
		info("WARNING: We will use a much slower algorithm with "
		     "proctrack/pgid, use ProctrackType=proctrack/linuxproc "
		     "or some other proctrack when using %s",
		     plugin_type);
		pgid_plugin = true;
	}
	xfree(type);
	xfree(plugin_type);

	type = slurm_get_accounting_storage_type();
	if (!xstrcasecmp(type, ACCOUNTING_STORAGE_TYPE_NONE)) {
		error("WARNING: Even though we are collecting accounting "
		      "information, you have asked for it not to be stored "
		      "(%s). If this is not what you have in mind, you will "
		      "need to change it.", ACCOUNTING_STORAGE_TYPE_NONE);
	}

done:
	slurm_mutex_unlock(&g_context_lock);
	xfree(type);

	return retval;
}

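/*
 * Tear down the plugin context, first waking and joining the
 * _watch_tasks() polling thread if one was started.
 */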
extern int jobacct_gather_fini(void)
{
	int rc = SLURM_SUCCESS;

	slurm_mutex_lock(&g_context_lock);
	if (g_context) {
		slurm_mutex_lock(&init_run_mutex);
		init_run = false;
		slurm_mutex_unlock(&init_run_mutex);

		if (watch_tasks_thread_id) {
			slurm_mutex_unlock(&g_context_lock);
			slurm_mutex_lock(&profile_timer->notify_mutex);
			slurm_cond_signal(&profile_timer->notify);
			slurm_mutex_unlock(&profile_timer->notify_mutex);
			pthread_join(watch_tasks_thread_id, NULL);
			slurm_mutex_lock(&g_context_lock);
		}

		rc = plugin_context_destroy(g_context);
		g_context = NULL;
	}
	slurm_mutex_unlock(&g_context_lock);

	return rc;
}

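/*
 * Start polling: create the task list and, for a non-zero frequency,
 * spawn the _watch_tasks() thread.  A frequency of zero disables periodic
 * sampling while still allowing on-demand polls.
 */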
extern int jobacct_gather_startpoll(uint16_t frequency)
{
	int retval = SLURM_SUCCESS;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	if (!_jobacct_shutdown_test()) {
		error("jobacct_gather_startpoll: poll already started!");
		return retval;
	}
	slurm_mutex_lock(&jobacct_shutdown_mutex);
	jobacct_shutdown = false;
	slurm_mutex_unlock(&jobacct_shutdown_mutex);

	freq = frequency;

	task_list = list_create(jobacctinfo_destroy);
	if (frequency == 0) {	/* don't want dynamic monitoring? */
		debug2("jobacct_gather dynamic logging disabled");
		return retval;
	}

	/* create polling thread */
	slurm_thread_create(&watch_tasks_thread_id, _watch_tasks, NULL);

	debug3("jobacct_gather dynamic logging enabled");

	return retval;
}

extern int jobacct_gather_endpoll(void)
{
	int retval = SLURM_SUCCESS;

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	slurm_mutex_lock(&jobacct_shutdown_mutex);
	jobacct_shutdown = true;
	slurm_mutex_unlock(&jobacct_shutdown_mutex);
	slurm_mutex_lock(&task_list_lock);
	FREE_NULL_LIST(task_list);

	retval = (*(ops.endpoll))();

	slurm_mutex_unlock(&task_list_lock);

	return retval;
}

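/*
 * Register a pid with the gather plugin and the local task list.  When
 * poll == 1, accounting data is sampled immediately after the task is
 * added.
 */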
extern int jobacct_gather_add_task(pid_t pid, jobacct_id_t *jobacct_id,
				   int poll)
{
	struct jobacctinfo *jobacct;

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	if (_jobacct_shutdown_test())
		return SLURM_ERROR;

	jobacct = jobacctinfo_create(jobacct_id);

	slurm_mutex_lock(&task_list_lock);
	if (pid <= 0) {
		error("invalid pid given (%d) for task acct", pid);
		goto error;
	} else if (!task_list) {
		error("no task list created!");
		goto error;
	}

	jobacct->pid = pid;
	memcpy(&jobacct->id, jobacct_id, sizeof(jobacct_id_t));
	debug2("adding task %u pid %d on node %u to jobacct",
	       jobacct_id->taskid, pid, jobacct_id->nodeid);
	(*(ops.add_task))(pid, jobacct_id);
	list_push(task_list, jobacct);
	slurm_mutex_unlock(&task_list_lock);

	if (poll == 1)
		_poll_data(1);

	return SLURM_SUCCESS;
error:
	slurm_mutex_unlock(&task_list_lock);
	jobacctinfo_destroy(jobacct);
	return SLURM_ERROR;
}

extern jobacctinfo_t *jobacct_gather_stat_task(pid_t pid)
{
	if (!plugin_polling || _jobacct_shutdown_test())
		return NULL;

	_poll_data(0);

	if (pid) {
		struct jobacctinfo *jobacct = NULL;
		struct jobacctinfo *ret_jobacct = NULL;
		ListIterator itr = NULL;

		slurm_mutex_lock(&task_list_lock);
		if (!task_list) {
			error("no task list created!");
			goto error;
		}

		itr = list_iterator_create(task_list);
		while ((jobacct = list_next(itr))) {
			if (jobacct->pid == pid)
				break;
		}
		list_iterator_destroy(itr);
		if (jobacct == NULL)
			goto error;

		_copy_tres_usage(&ret_jobacct, jobacct);

	error:
		slurm_mutex_unlock(&task_list_lock);
		return ret_jobacct;
	}

	return NULL;
}

extern jobacctinfo_t *jobacct_gather_remove_task(pid_t pid)
{
	struct jobacctinfo *jobacct = NULL;
	ListIterator itr = NULL;

	if (!plugin_polling)
		return NULL;

	/* poll data one last time before removing the task,
	 * mainly to update energy consumption */
	_poll_data(1);

	if (_jobacct_shutdown_test())
		return NULL;

	slurm_mutex_lock(&task_list_lock);
	if (!task_list) {
		error("no task list created!");
		goto error;
	}

	itr = list_iterator_create(task_list);
	while ((jobacct = list_next(itr))) {
		if (jobacct->pid == pid) {
			list_remove(itr);
			break;
		}
	}
	list_iterator_destroy(itr);
	if (jobacct) {
		debug2("removing task %u pid %d from jobacct",
		       jobacct->id.taskid, jobacct->pid);
	} else {
		debug2("pid(%d) not being watched in jobacct!", pid);
	}
error:
	slurm_mutex_unlock(&task_list_lock);
	return jobacct;
}

extern int jobacct_gather_set_proctrack_container_id(uint64_t id)
{
	if (!plugin_polling || pgid_plugin)
		return SLURM_SUCCESS;

	if (cont_id != NO_VAL64)
		info("Warning: jobacct: set_proctrack_container_id: cont_id "
		     "is already set to %"PRIu64", you are setting it to "
		     "%"PRIu64, cont_id, id);
	if (id <= 0) {
		error("jobacct: set_proctrack_container_id: "
		      "was most likely given an unset cont_id %"PRIu64,
		      id);
		return SLURM_ERROR;
	}
	cont_id = id;

	return SLURM_SUCCESS;
}

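/*
 * Record the job/step limits enforced by jobacct_gather_handle_mem_limit().
 * mem_limit arrives in megabytes and is stored in bytes; the virtual
 * memory limit is scaled from it by the configured VSizeFactor percentage.
 */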
extern int jobacct_gather_set_mem_limit(uint32_t job_id,
					uint32_t step_id,
					uint64_t mem_limit)
{
	if (!plugin_polling)
		return SLURM_SUCCESS;

	if ((job_id == 0) || (mem_limit == 0)) {
		error("jobacct_gather_set_mem_limit: jobid:%u "
		      "mem_limit:%"PRIu64, job_id, mem_limit);
		return SLURM_ERROR;
	}

	jobacct_job_id      = job_id;
	jobacct_step_id     = step_id;
	jobacct_mem_limit   = mem_limit * MB_ADJ; /* MB to B */
	jobacct_vmem_limit  = jobacct_mem_limit;
	jobacct_vmem_limit *= (slurm_get_vsize_factor() / 100.0);
	return SLURM_SUCCESS;
}

extern void jobacct_gather_handle_mem_limit(uint64_t total_job_mem,
					    uint64_t total_job_vsize)
{
	if (!plugin_polling)
		return;

	if (jobacct_mem_limit) {
		if (jobacct_step_id == NO_VAL) {
			debug("Job %u memory used:%"PRIu64" limit:%"PRIu64" B",
			      jobacct_job_id, total_job_mem, jobacct_mem_limit);
		} else {
			debug("Step %u.%u memory used:%"PRIu64" "
			      "limit:%"PRIu64" B",
			      jobacct_job_id, jobacct_step_id,
			      total_job_mem, jobacct_mem_limit);
		}
	}
	if (jobacct_job_id && jobacct_mem_limit &&
	    (total_job_mem > jobacct_mem_limit)) {
		if (jobacct_step_id == NO_VAL) {
			error("Job %u exceeded memory limit "
			      "(%"PRIu64" > %"PRIu64"), being "
			      "killed", jobacct_job_id, total_job_mem,
			      jobacct_mem_limit);
		} else {
			error("Step %u.%u exceeded memory limit "
			      "(%"PRIu64" > %"PRIu64"), "
			      "being killed", jobacct_job_id, jobacct_step_id,
			      total_job_mem, jobacct_mem_limit);
		}
		_acct_kill_step();
	} else if (jobacct_job_id && jobacct_vmem_limit &&
		   (total_job_vsize > jobacct_vmem_limit)) {
		if (jobacct_step_id == NO_VAL) {
			error("Job %u exceeded virtual memory limit "
			      "(%"PRIu64" > %"PRIu64"), being killed",
			      jobacct_job_id,
			      total_job_vsize, jobacct_vmem_limit);
		} else {
			error("Step %u.%u exceeded virtual memory limit "
			      "(%"PRIu64" > %"PRIu64"), being killed",
			      jobacct_job_id,
			      jobacct_step_id, total_job_vsize,
			      jobacct_vmem_limit);
		}
		_acct_kill_step();
	}
}

/********************* jobacctinfo functions ******************************/

extern jobacctinfo_t *jobacctinfo_create(jobacct_id_t *jobacct_id)
{
	struct jobacctinfo *jobacct;
	jobacct_id_t temp_id;

	if (!plugin_polling)
		return NULL;

	jobacct = xmalloc(sizeof(struct jobacctinfo));

	if (!jobacct_id) {
		temp_id.taskid = NO_VAL;
		temp_id.nodeid = NO_VAL;
		jobacct_id = &temp_id;
	}

	jobacct->dataset_id = -1;
	jobacct->sys_cpu_sec = 0;
	jobacct->sys_cpu_usec = 0;
	jobacct->user_cpu_sec = 0;
	jobacct->user_cpu_usec = 0;

	_jobacctinfo_create_tres_usage(jobacct_id, jobacct);
	return jobacct;
}

extern void jobacctinfo_destroy(void *object)
{
	struct jobacctinfo *jobacct = (struct jobacctinfo *)object;

	_free_tres_usage(jobacct);
	xfree(jobacct);
}

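/*
 * Store data of the given type into jobacct.  For JOBACCT_DATA_PIPE the
 * record is instead packed and written, length-prefixed, to the file
 * descriptor passed in via data.
 */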
extern int jobacctinfo_setinfo(jobacctinfo_t *jobacct,
			       enum jobacct_data_type type, void *data,
			       uint16_t protocol_version)
{
	int rc = SLURM_SUCCESS;
	int *fd = (int *)data;
	struct rusage *rusage = (struct rusage *)data;
	uint64_t *uint64 = (uint64_t *) data;
	struct jobacctinfo *send = (struct jobacctinfo *) data;
	Buf buffer = NULL;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	switch (type) {
	case JOBACCT_DATA_TOTAL:
		if (!jobacct) {
			/* Avoid possible memory leak from _copy_tres_usage() */
			error("%s: 'jobacct' argument is NULL", __func__);
			rc = SLURM_ERROR;
		} else
			_copy_tres_usage(&jobacct, send);
		break;
	case JOBACCT_DATA_PIPE:
		if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
			int len;
			assoc_mgr_lock_t locks = { .tres = READ_LOCK };

			buffer = init_buf(0);

			if (jobacct) {
				assoc_mgr_lock(&locks);
				jobacct->tres_list = assoc_mgr_tres_list;
			}

			jobacctinfo_pack(jobacct, protocol_version,
					 PROTOCOL_TYPE_SLURM, buffer);

			if (jobacct) {
				assoc_mgr_unlock(&locks);
				jobacct->tres_list = NULL;
			}

			len = get_buf_offset(buffer);
			safe_write(*fd, &len, sizeof(int));
			safe_write(*fd, get_buf_data(buffer), len);
			FREE_NULL_BUFFER(buffer);
		}

		break;
	case JOBACCT_DATA_RUSAGE:
		if (rusage->ru_utime.tv_sec > jobacct->user_cpu_sec)
			jobacct->user_cpu_sec = rusage->ru_utime.tv_sec;
		jobacct->user_cpu_usec = rusage->ru_utime.tv_usec;
		if (rusage->ru_stime.tv_sec > jobacct->sys_cpu_sec)
			jobacct->sys_cpu_sec = rusage->ru_stime.tv_sec;
		jobacct->sys_cpu_usec = rusage->ru_stime.tv_usec;
		break;
	case JOBACCT_DATA_TOT_RSS:
		jobacct->tres_usage_in_tot[TRES_ARRAY_MEM] = *uint64;
		break;
	case JOBACCT_DATA_TOT_VSIZE:
		jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM] = *uint64;
		break;
	default:
		debug("%s: data_type %d invalid", __func__, type);
	}

	return rc;

rwfail:
	FREE_NULL_BUFFER(buffer);
	return SLURM_ERROR;
}

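/*
 * Extract data of the given type from jobacct.  For JOBACCT_DATA_PIPE a
 * length-prefixed record is read from the file descriptor passed in via
 * data and unpacked into jobacct.
 */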
extern int jobacctinfo_getinfo(
	jobacctinfo_t *jobacct, enum jobacct_data_type type, void *data,
	uint16_t protocol_version)
{
	int rc = SLURM_SUCCESS;
	int *fd = (int *)data;
	uint64_t *uint64 = (uint64_t *) data;
	struct rusage *rusage = (struct rusage *)data;
	struct jobacctinfo *send = (struct jobacctinfo *) data;
	char *buf = NULL;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	/* jobacct needs to be allocated before this is called. */
	xassert(jobacct);

	switch (type) {
	case JOBACCT_DATA_TOTAL:
		if (!send) {
			/* Avoid possible memory leak from _copy_tres_usage() */
			error("%s: 'data' argument is NULL", __func__);
			rc = SLURM_ERROR;
		} else
			_copy_tres_usage(&send, jobacct);
		break;
	case JOBACCT_DATA_PIPE:
		if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
			int len;
			Buf buffer;

			safe_read(*fd, &len, sizeof(int));
			buf = xmalloc(len);
			safe_read(*fd, buf, len);
			buffer = create_buf(buf, len);
			jobacctinfo_unpack(&jobacct, protocol_version,
					   PROTOCOL_TYPE_SLURM, buffer, 0);
			free_buf(buffer);
		}

		break;
	case JOBACCT_DATA_RUSAGE:
		memset(rusage, 0, sizeof(struct rusage));
		rusage->ru_utime.tv_sec = jobacct->user_cpu_sec;
		rusage->ru_utime.tv_usec = jobacct->user_cpu_usec;
		rusage->ru_stime.tv_sec = jobacct->sys_cpu_sec;
		rusage->ru_stime.tv_usec = jobacct->sys_cpu_usec;
		break;
	case JOBACCT_DATA_TOT_RSS:
		*uint64 = jobacct->tres_usage_in_tot[TRES_ARRAY_MEM];
		break;
	case JOBACCT_DATA_TOT_VSIZE:
		*uint64 = jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM];
		break;
	default:
		debug("%s: data_type %d invalid", __func__, type);
	}
	return rc;

rwfail:
	xfree(buf);
	return SLURM_ERROR;
}

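/*
 * Pack a jobacctinfo_t for the wire.  A leading uint8 flags whether a
 * record follows, so a NULL jobacct packs as a single zero byte.  A
 * typical round trip looks roughly like this (sketch only; "copy" is a
 * hypothetical destination pointer):
 *
 *	jobacctinfo_t *copy = NULL;
 *	Buf buf = init_buf(0);
 *	jobacctinfo_pack(jobacct, SLURM_PROTOCOL_VERSION,
 *			 PROTOCOL_TYPE_SLURM, buf);
 *	set_buf_offset(buf, 0);
 *	jobacctinfo_unpack(&copy, SLURM_PROTOCOL_VERSION,
 *			   PROTOCOL_TYPE_SLURM, buf, true);
 */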
extern void jobacctinfo_pack(jobacctinfo_t *jobacct,
			     uint16_t rpc_version, uint16_t protocol_type,
			     Buf buffer)
{
	bool no_pack;

	no_pack = (!plugin_polling && (protocol_type != PROTOCOL_TYPE_DBD));

	if (!jobacct || no_pack) {
		pack8((uint8_t) 0, buffer);
		return;
	}

	pack8((uint8_t) 1, buffer);

	if (rpc_version >= SLURM_MIN_PROTOCOL_VERSION) {
		pack32((uint32_t)jobacct->user_cpu_sec, buffer);
		pack32((uint32_t)jobacct->user_cpu_usec, buffer);
		pack32((uint32_t)jobacct->sys_cpu_sec, buffer);
		pack32((uint32_t)jobacct->sys_cpu_usec, buffer);
		pack32((uint32_t)jobacct->act_cpufreq, buffer);
		pack64((uint64_t)jobacct->energy.consumed_energy, buffer);

		pack32_array(jobacct->tres_ids, jobacct->tres_count, buffer);

		slurm_pack_list(jobacct->tres_list,
				slurmdb_pack_tres_rec, buffer,
				SLURM_PROTOCOL_VERSION);

		pack64_array(jobacct->tres_usage_in_max,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_max_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_max_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_tot,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_tot,
			     jobacct->tres_count, buffer);
	} else {
		info("jobacctinfo_pack version %u not supported", rpc_version);
		return;
	}
}

extern int jobacctinfo_unpack(jobacctinfo_t **jobacct,
			      uint16_t rpc_version, uint16_t protocol_type,
			      Buf buffer, bool alloc)
{
	uint32_t uint32_tmp;
	uint8_t  uint8_tmp;

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	safe_unpack8(&uint8_tmp, buffer);
	if (uint8_tmp == (uint8_t) 0)
		return SLURM_SUCCESS;

	xassert(jobacct);

	if (alloc)
		*jobacct = xmalloc(sizeof(struct jobacctinfo));
	else {
		xassert(*jobacct);
		_free_tres_usage(*jobacct);
	}

	if (rpc_version >= SLURM_MIN_PROTOCOL_VERSION) {
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->user_cpu_sec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->user_cpu_usec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->sys_cpu_sec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->sys_cpu_usec = uint32_tmp;

		safe_unpack32(&(*jobacct)->act_cpufreq, buffer);
		safe_unpack64(&(*jobacct)->energy.consumed_energy, buffer);

		safe_unpack32_array(&(*jobacct)->tres_ids,
				    &(*jobacct)->tres_count, buffer);
		slurm_unpack_list(&(*jobacct)->tres_list,
				  slurmdb_unpack_tres_rec,
				  slurmdb_destroy_tres_rec,
				  buffer, rpc_version);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_tot,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_tot,
				    &uint32_tmp, buffer);
	} else {
		info("jobacctinfo_unpack version %u not supported",
		     rpc_version);
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;

unpack_error:
	debug2("jobacctinfo_unpack: unpack_error: size_buf(buffer) %u",
	       size_buf(buffer));
	if (alloc)
		jobacctinfo_destroy(*jobacct);

	return SLURM_ERROR;
}

extern void jobacctinfo_aggregate(jobacctinfo_t *dest, jobacctinfo_t *from)
{
	if (!plugin_polling)
		return;

	xassert(dest);

	if (!from)
		return;

	dest->user_cpu_sec	+= from->user_cpu_sec;
	dest->user_cpu_usec	+= from->user_cpu_usec;
	while (dest->user_cpu_usec >= 1E6) {
		dest->user_cpu_sec++;
		dest->user_cpu_usec -= 1E6;
	}
	dest->sys_cpu_sec	+= from->sys_cpu_sec;
	dest->sys_cpu_usec	+= from->sys_cpu_usec;
	while (dest->sys_cpu_usec >= 1E6) {
		dest->sys_cpu_sec++;
		dest->sys_cpu_usec -= 1E6;
	}
	dest->act_cpufreq	+= from->act_cpufreq;
	if (dest->energy.consumed_energy != NO_VAL64) {
		if (from->energy.consumed_energy == NO_VAL64)
			dest->energy.consumed_energy = NO_VAL64;
		else
			dest->energy.consumed_energy +=
					from->energy.consumed_energy;
	}

	_jobacctinfo_aggregate_tres_usage(dest, from);
}

extern void jobacctinfo_2_stats(slurmdb_stats_t *stats, jobacctinfo_t *jobacct)
{
	xassert(jobacct);
	xassert(stats);

	stats->act_cpufreq = (double)jobacct->act_cpufreq;

	if (jobacct->energy.consumed_energy == NO_VAL64)
		stats->consumed_energy = NO_VAL64;
	else
		stats->consumed_energy =
			(double)jobacct->energy.consumed_energy;

	_jobacctinfo_2_stats_tres_usage(stats, jobacct);
}