1 /*****************************************************************************\
2 * slurm_jobacct_gather.c - implementation-independent job accounting logging
3 * functions
4 *****************************************************************************
 *  Copyright (C) 2003-2007, The Regents of the University of California.
6 * Produced at Lawrence Livermore National Laboratory (cf, DISCLAIMER).
7 * Written by Jay Windley <jwindley@lnxi.com>, Morris Jette <jette1@llnl.com>
8 * CODE-OCEC-09-009. All rights reserved.
9 *
10 * Copyright (C) 2005 Hewlett-Packard Development Company, L.P.
11 *
12 * This file is part of Slurm, a resource management program.
13 * For details, see <https://slurm.schedmd.com/>.
14 * Please also read the included file: DISCLAIMER.
15 *
16 * Slurm is free software; you can redistribute it and/or modify it under
17 * the terms of the GNU General Public License as published by the Free
18 * Software Foundation; either version 2 of the License, or (at your option)
19 * any later version.
20 *
21 * In addition, as a special exception, the copyright holders give permission
22 * to link the code of portions of this program with the OpenSSL library under
23 * certain conditions as described in each individual source file, and
24 * distribute linked combinations including the two. You must obey the GNU
25 * General Public License in all respects for all of the code used other than
26 * OpenSSL. If you modify file(s) with this exception, you may extend this
27 * exception to your version of the file(s), but you are not obligated to do
28 * so. If you do not wish to do so, delete this exception statement from your
29 * version. If you delete this exception statement from all source files in
30 * the program, then also delete it here.
31 *
32 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
33 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
34 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
35 * details.
36 *
37 * You should have received a copy of the GNU General Public License along
38 * with Slurm; if not, write to the Free Software Foundation, Inc.,
39 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
40 \*****************************************************************************/
41
42 /*****************************************************************************\
43 * Modification history
44 *
45 * 19 Jan 2005 by Andy Riebs <andy.riebs@hp.com>
46 * This file is derived from the file slurm_jobcomp.c, written by
47 * Morris Jette, et al.
48 \*****************************************************************************/
49
50 #ifdef HAVE_CONFIG_H
51 # include "config.h"
52 #endif
53
54 #include <pthread.h>
55 #include <signal.h>
56 #include <stdlib.h>
57 #include <string.h>
58
59 #if HAVE_SYS_PRCTL_H
60 # include <sys/prctl.h>
61 #endif
62
63 #include "src/common/assoc_mgr.h"
64 #include "src/common/macros.h"
65 #include "src/common/pack.h"
66 #include "src/common/plugin.h"
67 #include "src/common/plugrack.h"
68 #include "src/common/read_config.h"
69 #include "src/common/slurm_acct_gather_profile.h"
70 #include "src/common/slurm_jobacct_gather.h"
71 #include "src/common/slurmdbd_defs.h"
72 #include "src/common/xmalloc.h"
73 #include "src/common/xstring.h"
74 #include "src/slurmd/slurmstepd/slurmstepd_job.h"
75 #include "src/slurmdbd/read_config.h"
76
77 #define KB_ADJ 1024
78 #define MB_ADJ 1048576
79
80 /*
81 ** Define slurm-specific aliases for use by plugins, see slurm_xlator.h
82 ** for details.
83 */
84 strong_alias(jobacctinfo_pack, slurm_jobacctinfo_pack);
85 strong_alias(jobacctinfo_unpack, slurm_jobacctinfo_unpack);
86 strong_alias(jobacctinfo_create, slurm_jobacctinfo_create);
87 strong_alias(jobacctinfo_destroy, slurm_jobacctinfo_destroy);
88
89 typedef struct slurm_jobacct_gather_ops {
90 void (*poll_data) (List task_list, bool pgid_plugin, uint64_t cont_id,
91 bool profile);
92 int (*endpoll) ();
93 int (*add_task) (pid_t pid, jobacct_id_t *jobacct_id);
94 } slurm_jobacct_gather_ops_t;
95
96 /*
97 * These strings must be in the same order as the fields declared
98 * for slurm_jobacct_gather_ops_t.
99 */
100 static const char *syms[] = {
101 "jobacct_gather_p_poll_data",
102 "jobacct_gather_p_endpoll",
103 "jobacct_gather_p_add_task",
104 };
105
106 static slurm_jobacct_gather_ops_t ops;
107 static plugin_context_t *g_context = NULL;
108 static pthread_mutex_t g_context_lock = PTHREAD_MUTEX_INITIALIZER;
109 static bool init_run = false;
110 static pthread_mutex_t init_run_mutex = PTHREAD_MUTEX_INITIALIZER;
111 static pthread_t watch_tasks_thread_id = 0;
112
113 static int freq = 0;
114 static bool pgid_plugin = false;
115 static List task_list = NULL;
116 static uint64_t cont_id = NO_VAL64;
117 static pthread_mutex_t task_list_lock = PTHREAD_MUTEX_INITIALIZER;
118
119 static bool jobacct_shutdown = true;
120 static pthread_mutex_t jobacct_shutdown_mutex = PTHREAD_MUTEX_INITIALIZER;
121 static bool plugin_polling = true;
122
123 static uint32_t jobacct_job_id = 0;
124 static uint32_t jobacct_step_id = 0;
125 static uint64_t jobacct_mem_limit = 0;
126 static uint64_t jobacct_vmem_limit = 0;
127 static acct_gather_profile_timer_t *profile_timer =
128 &acct_gather_profile_timer[PROFILE_TASK];
129
/*
 * Size and initialize every per-TRES array in "jobacct" for tres_cnt
 * entries.  Min/max/tot values start at INFINITE64 ("unset"); the task
 * and node id arrays are seeded from jobacct_id when it is supplied and
 * valid, otherwise they are also left as INFINITE64.
 */
static void _init_tres_usage(struct jobacctinfo *jobacct,
			     jobacct_id_t *jobacct_id,
			     uint32_t tres_cnt)
{
	uint64_t task_seed, node_seed;
	int bytes, idx;

	jobacct->tres_count = tres_cnt;

	jobacct->tres_ids = xcalloc(tres_cnt, sizeof(uint32_t));

	bytes = tres_cnt * sizeof(uint64_t);

	jobacct->tres_usage_in_max = xmalloc(bytes);
	jobacct->tres_usage_in_max_nodeid = xmalloc(bytes);
	jobacct->tres_usage_in_max_taskid = xmalloc(bytes);
	jobacct->tres_usage_in_min = xmalloc(bytes);
	jobacct->tres_usage_in_min_nodeid = xmalloc(bytes);
	jobacct->tres_usage_in_min_taskid = xmalloc(bytes);
	jobacct->tres_usage_in_tot = xmalloc(bytes);
	jobacct->tres_usage_out_max = xmalloc(bytes);
	jobacct->tres_usage_out_max_nodeid = xmalloc(bytes);
	jobacct->tres_usage_out_max_taskid = xmalloc(bytes);
	jobacct->tres_usage_out_min = xmalloc(bytes);
	jobacct->tres_usage_out_min_nodeid = xmalloc(bytes);
	jobacct->tres_usage_out_min_taskid = xmalloc(bytes);
	jobacct->tres_usage_out_tot = xmalloc(bytes);

	/*
	 * The seed values do not vary with the TRES index, so work them
	 * out once instead of re-testing jobacct_id on every iteration.
	 */
	task_seed = (jobacct_id && (jobacct_id->taskid != NO_VAL)) ?
		(uint64_t) jobacct_id->taskid : INFINITE64;
	node_seed = (jobacct_id && (jobacct_id->nodeid != NO_VAL)) ?
		(uint64_t) jobacct_id->nodeid : INFINITE64;

	for (idx = 0; idx < jobacct->tres_count; idx++) {
		/* Without the assoc_mgr table, fall back to positional ids */
		jobacct->tres_ids[idx] = assoc_mgr_tres_array ?
			assoc_mgr_tres_array[idx]->id : idx;

		jobacct->tres_usage_in_min[idx] = INFINITE64;
		jobacct->tres_usage_in_max[idx] = INFINITE64;
		jobacct->tres_usage_in_tot[idx] = INFINITE64;
		jobacct->tres_usage_out_max[idx] = INFINITE64;
		jobacct->tres_usage_out_min[idx] = INFINITE64;
		jobacct->tres_usage_out_tot[idx] = INFINITE64;

		jobacct->tres_usage_in_max_taskid[idx] = task_seed;
		jobacct->tres_usage_in_min_taskid[idx] = task_seed;
		jobacct->tres_usage_out_max_taskid[idx] = task_seed;
		jobacct->tres_usage_out_min_taskid[idx] = task_seed;

		jobacct->tres_usage_in_max_nodeid[idx] = node_seed;
		jobacct->tres_usage_in_min_nodeid[idx] = node_seed;
		jobacct->tres_usage_out_max_nodeid[idx] = node_seed;
		jobacct->tres_usage_out_min_nodeid[idx] = node_seed;
	}
}
201
_free_tres_usage(struct jobacctinfo * jobacct)202 static void _free_tres_usage(struct jobacctinfo *jobacct)
203 {
204
205 if (jobacct) {
206 xfree(jobacct->tres_ids);
207
208 if (jobacct->tres_list &&
209 (jobacct->tres_list != assoc_mgr_tres_list))
210 FREE_NULL_LIST(jobacct->tres_list);
211
212 xfree(jobacct->tres_usage_in_max);
213 xfree(jobacct->tres_usage_in_max_nodeid);
214 xfree(jobacct->tres_usage_in_max_taskid);
215 xfree(jobacct->tres_usage_in_min);
216 xfree(jobacct->tres_usage_in_min_nodeid);
217 xfree(jobacct->tres_usage_in_min_taskid);
218 xfree(jobacct->tres_usage_in_tot);
219 xfree(jobacct->tres_usage_out_max);
220 xfree(jobacct->tres_usage_out_max_nodeid);
221 xfree(jobacct->tres_usage_out_max_taskid);
222 xfree(jobacct->tres_usage_out_min);
223 xfree(jobacct->tres_usage_out_min_nodeid);
224 xfree(jobacct->tres_usage_out_min_taskid);
225 xfree(jobacct->tres_usage_out_tot);
226 }
227 }
228
/*
 * Deep-copy the TRES usage of source_jobacct into *dest_jobacct,
 * allocating the destination when the caller passes a NULL slot.
 * The scalar members are copied wholesale; the per-TRES arrays are
 * re-allocated so the destination owns its own storage.
 */
static void _copy_tres_usage(jobacctinfo_t **dest_jobacct,
			     jobacctinfo_t *source_jobacct)
{
	jobacctinfo_t *dest;
	size_t bytes;

	xassert(dest_jobacct);

	if (*dest_jobacct)
		_free_tres_usage(*dest_jobacct);
	else
		*dest_jobacct = xmalloc(sizeof(jobacctinfo_t));

	dest = *dest_jobacct;

	/* Copy scalars; the array pointers are replaced just below. */
	memcpy(dest, source_jobacct, sizeof(jobacctinfo_t));

	/* Give the destination freshly-owned per-TRES arrays. */
	_init_tres_usage(dest, NULL, source_jobacct->tres_count);

	/* All usage arrays are flat uint64_t[tres_count]: bulk-copy them. */
	bytes = source_jobacct->tres_count * sizeof(uint64_t);
	memcpy(dest->tres_usage_in_max,
	       source_jobacct->tres_usage_in_max, bytes);
	memcpy(dest->tres_usage_in_max_nodeid,
	       source_jobacct->tres_usage_in_max_nodeid, bytes);
	memcpy(dest->tres_usage_in_max_taskid,
	       source_jobacct->tres_usage_in_max_taskid, bytes);
	memcpy(dest->tres_usage_in_min,
	       source_jobacct->tres_usage_in_min, bytes);
	memcpy(dest->tres_usage_in_min_nodeid,
	       source_jobacct->tres_usage_in_min_nodeid, bytes);
	memcpy(dest->tres_usage_in_min_taskid,
	       source_jobacct->tres_usage_in_min_taskid, bytes);
	memcpy(dest->tres_usage_in_tot,
	       source_jobacct->tres_usage_in_tot, bytes);
	memcpy(dest->tres_usage_out_max,
	       source_jobacct->tres_usage_out_max, bytes);
	memcpy(dest->tres_usage_out_max_nodeid,
	       source_jobacct->tres_usage_out_max_nodeid, bytes);
	memcpy(dest->tres_usage_out_max_taskid,
	       source_jobacct->tres_usage_out_max_taskid, bytes);
	memcpy(dest->tres_usage_out_min,
	       source_jobacct->tres_usage_out_min, bytes);
	memcpy(dest->tres_usage_out_min_nodeid,
	       source_jobacct->tres_usage_out_min_nodeid, bytes);
	memcpy(dest->tres_usage_out_min_taskid,
	       source_jobacct->tres_usage_out_min_taskid, bytes);
	memcpy(dest->tres_usage_out_tot,
	       source_jobacct->tres_usage_out_tot, bytes);
}
278
279 /* _acct_kill_step() issue RPC to kill a slurm job step */
_acct_kill_step(void)280 static void _acct_kill_step(void)
281 {
282 slurm_msg_t msg;
283 job_step_kill_msg_t req;
284 job_notify_msg_t notify_req;
285
286 slurm_msg_t_init(&msg);
287 notify_req.job_id = jobacct_job_id;
288 notify_req.job_step_id = jobacct_step_id;
289 notify_req.message = "Exceeded job memory limit";
290 msg.msg_type = REQUEST_JOB_NOTIFY;
291 msg.data = ¬ify_req;
292 slurm_send_only_controller_msg(&msg, working_cluster_rec);
293
294 /*
295 * Request message:
296 */
297 memset(&req, 0, sizeof(job_step_kill_msg_t));
298 req.job_id = jobacct_job_id;
299 req.job_step_id = jobacct_step_id;
300 req.signal = SIGKILL;
301 req.flags = 0;
302 msg.msg_type = REQUEST_CANCEL_JOB_STEP;
303 msg.data = &req;
304
305 slurm_send_only_controller_msg(&msg, working_cluster_rec);
306 }
307
_jobacct_shutdown_test(void)308 static bool _jobacct_shutdown_test(void)
309 {
310 bool rc;
311 slurm_mutex_lock(&jobacct_shutdown_mutex);
312 rc = jobacct_shutdown;
313 slurm_mutex_unlock(&jobacct_shutdown_mutex);
314 return rc;
315 }
316
_poll_data(bool profile)317 static void _poll_data(bool profile)
318 {
319 /* Update the data */
320 slurm_mutex_lock(&task_list_lock);
321 if (task_list)
322 (*(ops.poll_data))(task_list, pgid_plugin, cont_id, profile);
323 slurm_mutex_unlock(&task_list_lock);
324 }
325
_init_run_test(void)326 static bool _init_run_test(void)
327 {
328 bool rc;
329 slurm_mutex_lock(&init_run_mutex);
330 rc = init_run;
331 slurm_mutex_unlock(&init_run_mutex);
332 return rc;
333 }
334
/*
 * _watch_tasks() -- monitor slurm jobs and track their memory usage.
 *
 * Body of the polling thread created by jobacct_gather_startpoll().
 * Each pass blocks on the PROFILE_TASK timer's condition variable, so
 * the polling cadence is driven by acct_gather_profile rather than a
 * sleep here.  The loop ends when init_run is cleared, shutdown is
 * flagged, or acct_gather_profile_test() fails.
 */
static void *_watch_tasks(void *arg)
{
#if HAVE_SYS_PRCTL_H
	/* Name the thread for ps/top visibility; failure is harmless. */
	if (prctl(PR_SET_NAME, "acctg", NULL, NULL, NULL) < 0) {
		error("%s: cannot set my name to %s %m", __func__, "acctg");
	}
#endif

	while (_init_run_test() && !_jobacct_shutdown_test() &&
	       acct_gather_profile_test()) {
		/* Do this until shutdown is requested */
		slurm_mutex_lock(&profile_timer->notify_mutex);
		slurm_cond_wait(&profile_timer->notify,
				&profile_timer->notify_mutex);
		slurm_mutex_unlock(&profile_timer->notify_mutex);

		/* shutting down, woken by jobacct_gather_fini() */
		if (!_init_run_test())
			break;

		/* g_context_lock keeps the plugin loaded while we poll */
		slurm_mutex_lock(&g_context_lock);
		/* The initial poll is done after the last task is added */
		_poll_data(1);
		slurm_mutex_unlock(&g_context_lock);

	}
	return NULL;
}
366
/*
 * Initialize the TRES usage arrays of "jobacct" while holding the
 * assoc_mgr TRES read lock, so g_tres_count and assoc_mgr_tres_array
 * stay consistent for the duration of _init_tres_usage().
 */
static void _jobacctinfo_create_tres_usage(jobacct_id_t *jobacct_id,
					   struct jobacctinfo *jobacct)
{
	assoc_mgr_lock_t tres_read_lock = { .tres = READ_LOCK };

	assoc_mgr_lock(&tres_read_lock);
	_init_tres_usage(jobacct, jobacct_id, g_tres_count);
	assoc_mgr_unlock(&tres_read_lock);
}
376
_jobacctinfo_aggregate_tres_usage(jobacctinfo_t * dest,jobacctinfo_t * from)377 static void _jobacctinfo_aggregate_tres_usage(jobacctinfo_t *dest,
378 jobacctinfo_t *from)
379 {
380 uint32_t i = 0;
381
382 for (i = 0; i < dest->tres_count; i++) {
383 if (from->tres_usage_in_max[i] != INFINITE64) {
384 if ((dest->tres_usage_in_max[i] == INFINITE64) ||
385 (dest->tres_usage_in_max[i] <
386 from->tres_usage_in_max[i])) {
387 dest->tres_usage_in_max[i] =
388 from->tres_usage_in_max[i];
389 /*
390 * At the time of writing Energy was only on a
391 * per node basis.
392 */
393 if (i != TRES_ARRAY_ENERGY)
394 dest->tres_usage_in_max_taskid[i] =
395 from->
396 tres_usage_in_max_taskid[i];
397 dest->tres_usage_in_max_nodeid[i] =
398 from->tres_usage_in_max_nodeid[i];
399 }
400 }
401
402 if (from->tres_usage_in_min[i] != INFINITE64) {
403 if ((dest->tres_usage_in_min[i] == INFINITE64) ||
404 (dest->tres_usage_in_min[i] >
405 from->tres_usage_in_min[i])) {
406 dest->tres_usage_in_min[i] =
407 from->tres_usage_in_min[i];
408 /*
409 * At the time of writing Energy was only on a
410 * per node basis.
411 */
412 if (i != TRES_ARRAY_ENERGY)
413 dest->tres_usage_in_min_taskid[i] =
414 from->
415 tres_usage_in_min_taskid[i];
416 dest->tres_usage_in_min_nodeid[i] =
417 from->tres_usage_in_min_nodeid[i];
418 }
419 }
420
421 if (from->tres_usage_in_tot[i] != INFINITE64) {
422 if (dest->tres_usage_in_tot[i] == INFINITE64)
423 dest->tres_usage_in_tot[i] =
424 from->tres_usage_in_tot[i];
425 else
426 dest->tres_usage_in_tot[i] +=
427 from->tres_usage_in_tot[i];
428 }
429
430 if (from->tres_usage_out_max[i] != INFINITE64) {
431 if ((dest->tres_usage_out_max[i] == INFINITE64) ||
432 (dest->tres_usage_out_max[i] <
433 from->tres_usage_out_max[i])) {
434 dest->tres_usage_out_max[i] =
435 from->tres_usage_out_max[i];
436 /*
437 * At the time of writing Energy was only on a
438 * per node basis.
439 */
440 if (i != TRES_ARRAY_ENERGY)
441 dest->tres_usage_out_max_taskid[i] =
442 from->
443 tres_usage_out_max_taskid[i];
444 dest->tres_usage_out_max_nodeid[i] =
445 from->tres_usage_out_max_nodeid[i];
446 }
447 }
448
449 if (from->tres_usage_out_min[i] != INFINITE64) {
450 if ((dest->tres_usage_out_min[i] == INFINITE64) ||
451 (dest->tres_usage_out_min[i] >
452 from->tres_usage_out_min[i])) {
453 dest->tres_usage_out_min[i] =
454 from->tres_usage_out_min[i];
455 /*
456 * At the time of writing Energy was only on a
457 * per node basis.
458 */
459 if (i != TRES_ARRAY_ENERGY)
460 dest->tres_usage_out_min_taskid[i] =
461 from->
462 tres_usage_out_min_taskid[i];
463 dest->tres_usage_out_min_nodeid[i] =
464 from->tres_usage_out_min_nodeid[i];
465 }
466 }
467
468 if (from->tres_usage_out_tot[i] != INFINITE64) {
469 if (dest->tres_usage_out_tot[i] == INFINITE64)
470 dest->tres_usage_out_tot[i] =
471 from->tres_usage_out_tot[i];
472 else
473 dest->tres_usage_out_tot[i] +=
474 from->tres_usage_out_tot[i];
475 }
476 }
477 }
478
/*
 * _jobacctinfo_2_stats_tres_usage() - render the per-TRES usage arrays
 * of "jobacct" into the allocated TRES-string members of "stats".
 *
 * The assoc_mgr TRES read lock is held while converting so the id/name
 * mapping cannot change mid-way.  Every string is xstrdup/xmalloc'd by
 * assoc_mgr_make_tres_str_from_array(); the caller owns them.
 *
 * NOTE(review): the *_ave fields are filled from the *_tot arrays here;
 * presumably the averaging over task count happens in a later consumer
 * -- TODO confirm.
 */
static void _jobacctinfo_2_stats_tres_usage(slurmdb_stats_t *stats,
					    jobacctinfo_t *jobacct)
{
	assoc_mgr_lock_t locks = { .tres = READ_LOCK };
	uint32_t flags = TRES_STR_FLAG_ALLOW_REAL | TRES_STR_FLAG_SIMPLE;
	assoc_mgr_lock(&locks);

	stats->tres_usage_in_ave = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_tot, flags, true);
	stats->tres_usage_in_tot = xstrdup(stats->tres_usage_in_ave);
	stats->tres_usage_in_max = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max, flags, true);
	stats->tres_usage_in_max_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max_nodeid, flags, true);
	stats->tres_usage_in_max_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_max_taskid, flags, true);
	stats->tres_usage_in_min = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min, flags, true);
	stats->tres_usage_in_min_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min_nodeid, flags, true);
	stats->tres_usage_in_min_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_in_min_taskid, flags, true);
	stats->tres_usage_out_ave = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_tot, flags, true);
	stats->tres_usage_out_tot = xstrdup(stats->tres_usage_out_ave);
	stats->tres_usage_out_max = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max, flags, true);
	stats->tres_usage_out_max_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max_taskid, flags, true);
	stats->tres_usage_out_max_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_max_nodeid, flags, true);
	stats->tres_usage_out_min = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min, flags, true);
	stats->tres_usage_out_min_nodeid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min_nodeid, flags, true);
	stats->tres_usage_out_min_taskid = assoc_mgr_make_tres_str_from_array(
		jobacct->tres_usage_out_min_taskid, flags, true);
	assoc_mgr_unlock(&locks);
}
518
/*
 * jobacct_gather_init() - load and initialize the configured
 * jobacct_gather plugin.
 *
 * Idempotent: returns immediately once the plugin context exists.  Not
 * used inside slurmdbd (slurmdbd_conf set).  RET SLURM_SUCCESS, or
 * SLURM_ERROR if the plugin context could not be created.
 */
extern int jobacct_gather_init(void)
{
	char *plugin_type = "jobacct_gather";
	char *type = NULL;
	int retval=SLURM_SUCCESS;

	/* Nothing to do in slurmdbd, or if the plugin is already loaded */
	if (slurmdbd_conf || (_init_run_test() && g_context))
		return retval;

	slurm_mutex_lock(&g_context_lock);
	if (g_context)	/* another thread finished init while we waited */
		goto done;

	type = slurm_get_jobacct_gather_type();

	g_context = plugin_context_create(
		plugin_type, type, (void **)&ops, syms, sizeof(syms));

	if (!g_context) {
		error("cannot create %s context for %s", plugin_type, type);
		retval = SLURM_ERROR;
		goto done;
	}

	/* jobacct_gather/none collects nothing: disable all polling */
	if (!xstrcasecmp(type, "jobacct_gather/none")) {
		plugin_polling = false;
		goto done;
	}

	slurm_mutex_lock(&init_run_mutex);
	init_run = true;
	slurm_mutex_unlock(&init_run_mutex);

	/* only print the WARNING messages if in the slurmctld */
	if (!running_in_slurmctld())
		goto done;

	/*
	 * From here on plugin_type aliases the heap string "type" so the
	 * gather type can appear in the warning; both are freed below.
	 */
	plugin_type = type;
	type = slurm_get_proctrack_type();
	if (!xstrcasecmp(type, "proctrack/pgid")) {
		info("WARNING: We will use a much slower algorithm with "
		     "proctrack/pgid, use Proctracktype=proctrack/linuxproc "
		     "or some other proctrack when using %s",
		     plugin_type);
		pgid_plugin = true;
	}
	xfree(type);
	xfree(plugin_type);

	type = slurm_get_accounting_storage_type();
	if (!xstrcasecmp(type, ACCOUNTING_STORAGE_TYPE_NONE)) {
		error("WARNING: Even though we are collecting accounting "
		      "information you have asked for it not to be stored "
		      "(%s) if this is not what you have in mind you will "
		      "need to change it.", ACCOUNTING_STORAGE_TYPE_NONE);
	}

done:
	slurm_mutex_unlock(&g_context_lock);
	xfree(type);

	return(retval);
}
582
/*
 * jobacct_gather_fini() - shut down the jobacct_gather plugin.
 *
 * Stops the _watch_tasks() polling thread (when one was started) and
 * destroys the plugin context.  RET SLURM_SUCCESS or the result of
 * plugin_context_destroy().
 */
extern int jobacct_gather_fini(void)
{
	int rc = SLURM_SUCCESS;

	slurm_mutex_lock(&g_context_lock);
	if (g_context) {
		/* Tell the polling loop to exit on its next wakeup */
		slurm_mutex_lock(&init_run_mutex);
		init_run = false;
		slurm_mutex_unlock(&init_run_mutex);

		if (watch_tasks_thread_id) {
			/*
			 * Drop g_context_lock while joining:
			 * _watch_tasks() takes it around each poll, so
			 * holding it here would deadlock.  Signal the
			 * timer condvar so the thread wakes and sees
			 * init_run == false.
			 */
			slurm_mutex_unlock(&g_context_lock);
			slurm_mutex_lock(&profile_timer->notify_mutex);
			slurm_cond_signal(&profile_timer->notify);
			slurm_mutex_unlock(&profile_timer->notify_mutex);
			pthread_join(watch_tasks_thread_id, NULL);
			slurm_mutex_lock(&g_context_lock);
		}

		rc = plugin_context_destroy(g_context);
		g_context = NULL;
	}
	slurm_mutex_unlock(&g_context_lock);

	return rc;
}
609
jobacct_gather_startpoll(uint16_t frequency)610 extern int jobacct_gather_startpoll(uint16_t frequency)
611 {
612 int retval = SLURM_SUCCESS;
613
614 if (!plugin_polling)
615 return SLURM_SUCCESS;
616
617 if (jobacct_gather_init() < 0)
618 return SLURM_ERROR;
619
620 if (!_jobacct_shutdown_test()) {
621 error("jobacct_gather_startpoll: poll already started!");
622 return retval;
623 }
624 slurm_mutex_lock(&jobacct_shutdown_mutex);
625 jobacct_shutdown = false;
626 slurm_mutex_unlock(&jobacct_shutdown_mutex);
627
628 freq = frequency;
629
630 task_list = list_create(jobacctinfo_destroy);
631 if (frequency == 0) { /* don't want dynamic monitoring? */
632 debug2("jobacct_gather dynamic logging disabled");
633 return retval;
634 }
635
636 /* create polling thread */
637 slurm_thread_create(&watch_tasks_thread_id, _watch_tasks, NULL);
638
639 debug3("jobacct_gather dynamic logging enabled");
640
641 return retval;
642 }
643
jobacct_gather_endpoll(void)644 extern int jobacct_gather_endpoll(void)
645 {
646 int retval = SLURM_SUCCESS;
647
648 if (jobacct_gather_init() < 0)
649 return SLURM_ERROR;
650
651 slurm_mutex_lock(&jobacct_shutdown_mutex);
652 jobacct_shutdown = true;
653 slurm_mutex_unlock(&jobacct_shutdown_mutex);
654 slurm_mutex_lock(&task_list_lock);
655 FREE_NULL_LIST(task_list);
656
657 retval = (*(ops.endpoll))();
658
659 slurm_mutex_unlock(&task_list_lock);
660
661 return retval;
662 }
663
/*
 * jobacct_gather_add_task() - start accounting for a new task.
 *
 * IN pid        - process id of the task (must be > 0)
 * IN jobacct_id - node/task identifiers for the task
 * IN poll       - if 1, poll all tasks immediately after the add
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
extern int jobacct_gather_add_task(pid_t pid, jobacct_id_t *jobacct_id,
				   int poll)
{
	struct jobacctinfo *jobacct;

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	/* With jobacct_gather/none there is nothing to track */
	if (!plugin_polling)
		return SLURM_SUCCESS;

	/* No new tasks once shutdown has begun */
	if (_jobacct_shutdown_test())
		return SLURM_ERROR;

	jobacct = jobacctinfo_create(jobacct_id);

	slurm_mutex_lock(&task_list_lock);
	if (pid <= 0) {
		error("invalid pid given (%d) for task acct", pid);
		goto error;
	} else if (!task_list) {
		error("no task list created!");
		goto error;
	}

	jobacct->pid = pid;
	memcpy(&jobacct->id, jobacct_id, sizeof(jobacct_id_t));
	debug2("adding task %u pid %d on node %u to jobacct",
	       jobacct_id->taskid, pid, jobacct_id->nodeid);
	/* Let the plugin start watching the pid before listing it */
	(*(ops.add_task))(pid, jobacct_id);
	list_push(task_list, jobacct);
	slurm_mutex_unlock(&task_list_lock);

	if (poll == 1)
		_poll_data(1);

	return SLURM_SUCCESS;
error:
	/* Drop the lock before freeing the unused jobacct record */
	slurm_mutex_unlock(&task_list_lock);
	jobacctinfo_destroy(jobacct);
	return SLURM_ERROR;
}
706
jobacct_gather_stat_task(pid_t pid)707 extern jobacctinfo_t *jobacct_gather_stat_task(pid_t pid)
708 {
709 if (!plugin_polling || _jobacct_shutdown_test())
710 return NULL;
711
712 _poll_data(0);
713
714 if (pid) {
715 struct jobacctinfo *jobacct = NULL;
716 struct jobacctinfo *ret_jobacct = NULL;
717 ListIterator itr = NULL;
718
719 slurm_mutex_lock(&task_list_lock);
720 if (!task_list) {
721 error("no task list created!");
722 goto error;
723 }
724
725 itr = list_iterator_create(task_list);
726 while ((jobacct = list_next(itr))) {
727 if (jobacct->pid == pid)
728 break;
729 }
730 list_iterator_destroy(itr);
731 if (jobacct == NULL)
732 goto error;
733
734 _copy_tres_usage(&ret_jobacct, jobacct);
735
736 error:
737 slurm_mutex_unlock(&task_list_lock);
738 return ret_jobacct;
739 }
740
741 return NULL;
742 }
743
jobacct_gather_remove_task(pid_t pid)744 extern jobacctinfo_t *jobacct_gather_remove_task(pid_t pid)
745 {
746 struct jobacctinfo *jobacct = NULL;
747 ListIterator itr = NULL;
748
749 if (!plugin_polling)
750 return NULL;
751
752 /* poll data one last time before removing task
753 * mainly for updating energy consumption */
754 _poll_data(1);
755
756 if (_jobacct_shutdown_test())
757 return NULL;
758
759 slurm_mutex_lock(&task_list_lock);
760 if (!task_list) {
761 error("no task list created!");
762 goto error;
763 }
764
765 itr = list_iterator_create(task_list);
766 while((jobacct = list_next(itr))) {
767 if (jobacct->pid == pid) {
768 list_remove(itr);
769 break;
770 }
771 }
772 list_iterator_destroy(itr);
773 if (jobacct) {
774 debug2("removing task %u pid %d from jobacct",
775 jobacct->id.taskid, jobacct->pid);
776 } else {
777 debug2("pid(%d) not being watched in jobacct!", pid);
778 }
779 error:
780 slurm_mutex_unlock(&task_list_lock);
781 return jobacct;
782 }
783
jobacct_gather_set_proctrack_container_id(uint64_t id)784 extern int jobacct_gather_set_proctrack_container_id(uint64_t id)
785 {
786 if (!plugin_polling || pgid_plugin)
787 return SLURM_SUCCESS;
788
789 if (cont_id != NO_VAL64)
790 info("Warning: jobacct: set_proctrack_container_id: cont_id "
791 "is already set to %"PRIu64" you are setting it to "
792 "%"PRIu64"", cont_id, id);
793 if (id <= 0) {
794 error("jobacct: set_proctrack_container_id: "
795 "I was given most likely an unset cont_id %"PRIu64"",
796 id);
797 return SLURM_ERROR;
798 }
799 cont_id = id;
800
801 return SLURM_SUCCESS;
802 }
803
jobacct_gather_set_mem_limit(uint32_t job_id,uint32_t step_id,uint64_t mem_limit)804 extern int jobacct_gather_set_mem_limit(uint32_t job_id,
805 uint32_t step_id,
806 uint64_t mem_limit)
807 {
808 if (!plugin_polling)
809 return SLURM_SUCCESS;
810
811 if ((job_id == 0) || (mem_limit == 0)) {
812 error("jobacct_gather_set_mem_limit: jobid:%u "
813 "mem_limit:%"PRIu64"", job_id, mem_limit);
814 return SLURM_ERROR;
815 }
816
817 jobacct_job_id = job_id;
818 jobacct_step_id = step_id;
819 jobacct_mem_limit = mem_limit * 1048576; /* MB to B */
820 jobacct_vmem_limit = jobacct_mem_limit;
821 jobacct_vmem_limit *= (slurm_get_vsize_factor() / 100.0);
822 return SLURM_SUCCESS;
823 }
824
/*
 * jobacct_gather_handle_mem_limit() - compare the step's current RSS
 * and virtual memory usage against the limits recorded by
 * jobacct_gather_set_mem_limit() and kill the step (via
 * _acct_kill_step()) when either is exceeded.
 *
 * IN total_job_mem   - current RSS usage, in bytes
 * IN total_job_vsize - current virtual memory usage, in bytes
 */
extern void jobacct_gather_handle_mem_limit(uint64_t total_job_mem,
					    uint64_t total_job_vsize)
{
	if (!plugin_polling)
		return;

	/* Always log usage vs. limit when a limit is configured */
	if (jobacct_mem_limit) {
		/* step_id == NO_VAL means a whole-job limit */
		if (jobacct_step_id == NO_VAL) {
			debug("Job %u memory used:%"PRIu64" limit:%"PRIu64" B",
			      jobacct_job_id, total_job_mem, jobacct_mem_limit);
		} else {
			debug("Step %u.%u memory used:%"PRIu64" "
			      "limit:%"PRIu64" B",
			      jobacct_job_id, jobacct_step_id,
			      total_job_mem, jobacct_mem_limit);
		}
	}
	if (jobacct_job_id && jobacct_mem_limit &&
	    (total_job_mem > jobacct_mem_limit)) {
		if (jobacct_step_id == NO_VAL) {
			error("Job %u exceeded memory limit "
			      "(%"PRIu64" > %"PRIu64"), being "
			      "killed", jobacct_job_id, total_job_mem,
			      jobacct_mem_limit);
		} else {
			error("Step %u.%u exceeded memory limit "
			      "(%"PRIu64" > %"PRIu64"), "
			      "being killed", jobacct_job_id, jobacct_step_id,
			      total_job_mem, jobacct_mem_limit);
		}
		_acct_kill_step();
	} else if (jobacct_job_id && jobacct_vmem_limit &&
		   (total_job_vsize > jobacct_vmem_limit)) {
		if (jobacct_step_id == NO_VAL) {
			error("Job %u exceeded virtual memory limit "
			      "(%"PRIu64" > %"PRIu64"), being killed",
			      jobacct_job_id,
			      total_job_vsize, jobacct_vmem_limit);
		} else {
			error("Step %u.%u exceeded virtual memory limit "
			      "(%"PRIu64" > %"PRIu64"), being killed",
			      jobacct_job_id,
			      jobacct_step_id, total_job_vsize,
			      jobacct_vmem_limit);
		}
		_acct_kill_step();
	}
}
873
874 /********************* jobacctinfo functions ******************************/
875
jobacctinfo_create(jobacct_id_t * jobacct_id)876 extern jobacctinfo_t *jobacctinfo_create(jobacct_id_t *jobacct_id)
877 {
878 struct jobacctinfo *jobacct;
879 jobacct_id_t temp_id;
880
881 if (!plugin_polling)
882 return NULL;
883
884 jobacct = xmalloc(sizeof(struct jobacctinfo));
885
886 if (!jobacct_id) {
887 temp_id.taskid = NO_VAL;
888 temp_id.nodeid = NO_VAL;
889 jobacct_id = &temp_id;
890 }
891
892 jobacct->dataset_id = -1;
893 jobacct->sys_cpu_sec = 0;
894 jobacct->sys_cpu_usec = 0;
895 jobacct->user_cpu_sec = 0;
896 jobacct->user_cpu_usec = 0;
897
898 _jobacctinfo_create_tres_usage(jobacct_id, jobacct);
899 return jobacct;
900 }
901
/*
 * Destructor for a jobacctinfo record (List-compatible signature):
 * releases the per-TRES arrays, then the struct itself.
 */
extern void jobacctinfo_destroy(void *object)
{
	struct jobacctinfo *info = (struct jobacctinfo *) object;

	_free_tres_usage(info);
	xfree(info);
}
909
/*
 * jobacctinfo_setinfo() - store data of the given type into "jobacct",
 * or, for JOBACCT_DATA_PIPE, serialize jobacct onto a pipe.
 *
 * IN jobacct - structure to update (may be NULL for DATA_PIPE)
 * IN type    - selects how "data" is interpreted:
 *	JOBACCT_DATA_TOTAL     - jobacctinfo_t * to copy from
 *	JOBACCT_DATA_PIPE      - int * file descriptor to write to
 *	JOBACCT_DATA_RUSAGE    - struct rusage * with CPU times
 *	JOBACCT_DATA_TOT_RSS   - uint64_t * total RSS in bytes
 *	JOBACCT_DATA_TOT_VSIZE - uint64_t * total vsize in bytes
 * IN protocol_version - version used when packing onto the pipe
 * RET SLURM_SUCCESS, or SLURM_ERROR on bad arguments / pipe failure
 */
extern int jobacctinfo_setinfo(jobacctinfo_t *jobacct,
			       enum jobacct_data_type type, void *data,
			       uint16_t protocol_version)
{
	int rc = SLURM_SUCCESS;
	/* "data" is interpreted per the switch below */
	int *fd = (int *)data;
	struct rusage *rusage = (struct rusage *)data;
	uint64_t *uint64 = (uint64_t *) data;
	struct jobacctinfo *send = (struct jobacctinfo *) data;
	Buf buffer = NULL;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	switch (type) {
	case JOBACCT_DATA_TOTAL:
		if (!jobacct) {
			/* Avoid possible memory leak from _copy_tres_usage() */
			error("%s: \'jobacct\' argument is NULL", __func__);
			rc = SLURM_ERROR;
		} else
			_copy_tres_usage(&jobacct, send);
		break;
	case JOBACCT_DATA_PIPE:
		if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
			int len;
			assoc_mgr_lock_t locks = { .tres = READ_LOCK };

			buffer = init_buf(0);

			/*
			 * Temporarily attach the global TRES list so the
			 * pack routine can resolve TRES; detached again
			 * after packing.
			 */
			if (jobacct) {
				assoc_mgr_lock(&locks);
				jobacct->tres_list = assoc_mgr_tres_list;
			}

			jobacctinfo_pack(jobacct, protocol_version,
					 PROTOCOL_TYPE_SLURM, buffer);

			if (jobacct) {
				assoc_mgr_unlock(&locks);
				jobacct->tres_list = NULL;
			}

			/* Write length first, then the packed payload */
			len = get_buf_offset(buffer);
			safe_write(*fd, &len, sizeof(int));
			safe_write(*fd, get_buf_data(buffer), len);
			FREE_NULL_BUFFER(buffer);
		}

		break;
	case JOBACCT_DATA_RUSAGE:
		/* Keep the max CPU seconds seen; usec mirror the latest */
		if (rusage->ru_utime.tv_sec > jobacct->user_cpu_sec)
			jobacct->user_cpu_sec = rusage->ru_utime.tv_sec;
		jobacct->user_cpu_usec = rusage->ru_utime.tv_usec;
		if (rusage->ru_stime.tv_sec > jobacct->sys_cpu_sec)
			jobacct->sys_cpu_sec = rusage->ru_stime.tv_sec;
		jobacct->sys_cpu_usec = rusage->ru_stime.tv_usec;
		break;
	case JOBACCT_DATA_TOT_RSS:
		jobacct->tres_usage_in_tot[TRES_ARRAY_MEM] = *uint64;
		break;
	case JOBACCT_DATA_TOT_VSIZE:
		jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM] = *uint64;
		break;
	default:
		debug("%s: data_type %d invalid", __func__, type);
	}

	return rc;

	/* safe_write() jumps here on a failed pipe write */
rwfail:
	FREE_NULL_BUFFER(buffer);
	return SLURM_ERROR;
}
984
/*
 * jobacctinfo_getinfo - retrieve a datum of the given type from a
 *	jobacctinfo structure, or (for JOBACCT_DATA_PIPE) read a packed
 *	structure from a file descriptor into 'jobacct'.
 * IN/OUT jobacct - accounting structure to read from (or fill, for the
 *	pipe case); must be allocated by the caller
 * IN type - selects how 'data' is interpreted (see switch below)
 * OUT data - destination; its real type depends on 'type'
 * IN protocol_version - protocol version used when unpacking from the pipe
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
extern int jobacctinfo_getinfo(
	jobacctinfo_t *jobacct, enum jobacct_data_type type, void *data,
	uint16_t protocol_version)
{
	int rc = SLURM_SUCCESS;
	/* 'data' acts as a discriminated union, selected by 'type'. */
	int *fd = (int *)data;
	uint64_t *uint64 = (uint64_t *) data;
	struct rusage *rusage = (struct rusage *)data;
	struct jobacctinfo *send = (struct jobacctinfo *) data;
	char *buf = NULL;

	if (!plugin_polling)
		return SLURM_SUCCESS;

	/* jobacct needs to be allocated before this is called. */
	xassert(jobacct);

	switch (type) {
	case JOBACCT_DATA_TOTAL:
		if (!send) {
			/* Avoid possible memory leak from _copy_tres_usage() */
			error("%s: \'data\' argument is NULL", __func__);
			rc = SLURM_ERROR;
		} else
			_copy_tres_usage(&send, jobacct);
		break;
	case JOBACCT_DATA_PIPE:
		/* Read the length word, then the packed image, written by
		 * jobacctinfo_setinfo(JOBACCT_DATA_PIPE). */
		if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
			int len;
			Buf buffer;

			safe_read(*fd, &len, sizeof(int));
			/* Guard against a corrupt/negative length before
			 * handing it to xmalloc() */
			if (len < 0)
				goto rwfail;
			buf = xmalloc(len);
			safe_read(*fd, buf, len);
			buffer = create_buf(buf, len);
			/*
			 * Propagate unpack failures to the caller instead of
			 * silently discarding them as the previous code did.
			 */
			rc = jobacctinfo_unpack(&jobacct, protocol_version,
						PROTOCOL_TYPE_SLURM, buffer,
						0);
			free_buf(buffer);	/* also frees 'buf' */
		}

		break;
	case JOBACCT_DATA_RUSAGE:
		memset(rusage, 0, sizeof(struct rusage));
		rusage->ru_utime.tv_sec = jobacct->user_cpu_sec;
		rusage->ru_utime.tv_usec = jobacct->user_cpu_usec;
		rusage->ru_stime.tv_sec = jobacct->sys_cpu_sec;
		rusage->ru_stime.tv_usec = jobacct->sys_cpu_usec;
		break;
	case JOBACCT_DATA_TOT_RSS:
		*uint64 = jobacct->tres_usage_in_tot[TRES_ARRAY_MEM];
		break;
	case JOBACCT_DATA_TOT_VSIZE:
		*uint64 = jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM];
		break;
	default:
		debug("%s: data_type %d invalid", __func__, type);
	}
	return rc;

rwfail:
	/* Reached via safe_read() failure or a bad length word */
	xfree(buf);
	return SLURM_ERROR;
}
1048
/*
 * jobacctinfo_pack - serialize a jobacctinfo structure into 'buffer'.
 *	A leading uint8 flag records whether a structure follows
 *	(0 = nothing packed, 1 = full record).
 * IN jobacct - structure to pack, or NULL
 * IN rpc_version - protocol version of the destination
 * IN protocol_type - PROTOCOL_TYPE_SLURM or PROTOCOL_TYPE_DBD
 * IN/OUT buffer - buffer to append to
 */
extern void jobacctinfo_pack(jobacctinfo_t *jobacct,
			     uint16_t rpc_version, uint16_t protocol_type,
			     Buf buffer)
{
	bool no_pack;

	/* DBD traffic is packed even when local polling is disabled */
	no_pack = (!plugin_polling && (protocol_type != PROTOCOL_TYPE_DBD));

	if (!jobacct || no_pack) {
		pack8((uint8_t) 0, buffer);
		return;
	}

	pack8((uint8_t) 1, buffer);

	if (rpc_version >= SLURM_MIN_PROTOCOL_VERSION) {
		pack32((uint32_t)jobacct->user_cpu_sec, buffer);
		pack32((uint32_t)jobacct->user_cpu_usec, buffer);
		pack32((uint32_t)jobacct->sys_cpu_sec, buffer);
		pack32((uint32_t)jobacct->sys_cpu_usec, buffer);
		pack32((uint32_t)jobacct->act_cpufreq, buffer);
		pack64((uint64_t)jobacct->energy.consumed_energy, buffer);

		pack32_array(jobacct->tres_ids, jobacct->tres_count, buffer);

		/*
		 * Pack the TRES list with rpc_version (not
		 * SLURM_PROTOCOL_VERSION) so it matches the version
		 * jobacctinfo_unpack() uses for slurm_unpack_list();
		 * otherwise a peer on an older protocol version would
		 * misparse the TRES records whenever tres_list is non-NULL.
		 */
		slurm_pack_list(jobacct->tres_list,
				slurmdb_pack_tres_rec, buffer,
				rpc_version);

		/* NOTE: array order must mirror jobacctinfo_unpack() */
		pack64_array(jobacct->tres_usage_in_max,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_max_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_max_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_min_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_in_tot,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_max_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min_nodeid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_min_taskid,
			     jobacct->tres_count, buffer);
		pack64_array(jobacct->tres_usage_out_tot,
			     jobacct->tres_count, buffer);
	} else {
		info("jobacctinfo_pack version %u not supported", rpc_version);
		return;
	}
}
1111
/*
 * jobacctinfo_unpack - deserialize a jobacctinfo structure packed by
 *	jobacctinfo_pack().
 * OUT jobacct - destination; if 'alloc', a new structure is xmalloc'd
 *	(and *jobacct is set to NULL on error), otherwise *jobacct must
 *	already exist and its TRES usage arrays are replaced
 * IN rpc_version - protocol version the data was packed with
 * IN protocol_type - PROTOCOL_TYPE_SLURM or PROTOCOL_TYPE_DBD
 * IN/OUT buffer - buffer to read from
 * IN alloc - true to allocate a new structure
 * RET SLURM_SUCCESS or SLURM_ERROR
 */
extern int jobacctinfo_unpack(jobacctinfo_t **jobacct,
			      uint16_t rpc_version, uint16_t protocol_type,
			      Buf buffer, bool alloc)
{
	uint32_t uint32_tmp;
	uint8_t uint8_tmp;
	bool allocated = false;	/* true once *this call* owns *jobacct */

	if (jobacct_gather_init() < 0)
		return SLURM_ERROR;

	/* Leading flag: 0 means nothing was packed */
	safe_unpack8(&uint8_tmp, buffer);
	if (uint8_tmp == (uint8_t) 0)
		return SLURM_SUCCESS;

	xassert(jobacct);

	if (alloc) {
		*jobacct = xmalloc(sizeof(struct jobacctinfo));
		allocated = true;
	} else {
		xassert(*jobacct);
		_free_tres_usage(*jobacct);
	}

	if (rpc_version >= SLURM_MIN_PROTOCOL_VERSION) {
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->user_cpu_sec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->user_cpu_usec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->sys_cpu_sec = uint32_tmp;
		safe_unpack32(&uint32_tmp, buffer);
		(*jobacct)->sys_cpu_usec = uint32_tmp;

		safe_unpack32(&(*jobacct)->act_cpufreq, buffer);
		safe_unpack64(&(*jobacct)->energy.consumed_energy, buffer);

		safe_unpack32_array(&(*jobacct)->tres_ids,
				    &(*jobacct)->tres_count, buffer);
		slurm_unpack_list(&(*jobacct)->tres_list,
				  slurmdb_unpack_tres_rec,
				  slurmdb_destroy_tres_rec,
				  buffer, rpc_version);
		/* NOTE: array order must mirror jobacctinfo_pack() */
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_max_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_min_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_in_tot,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_max_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min_nodeid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_min_taskid,
				    &uint32_tmp, buffer);
		safe_unpack64_array(&(*jobacct)->tres_usage_out_tot,
				    &uint32_tmp, buffer);
	} else {
		info("jobacctinfo_unpack version %u not supported",
		     rpc_version);
		/* Don't leak the structure we just allocated */
		if (allocated) {
			jobacctinfo_destroy(*jobacct);
			*jobacct = NULL;
		}
		return SLURM_ERROR;
	}

	return SLURM_SUCCESS;

unpack_error:
	debug2("jobacctinfo_unpack: unpack_error: size_buf(buffer) %u",
	       size_buf(buffer));
	/*
	 * Only destroy memory this call allocated. Keying off 'alloc' alone
	 * (as before) would destroy a stale caller pointer if the failure
	 * happened before the allocation, and left *jobacct dangling after
	 * the destroy in any case.
	 */
	if (allocated) {
		jobacctinfo_destroy(*jobacct);
		*jobacct = NULL;
	}

	return SLURM_ERROR;
}
1198
/*
 * jobacctinfo_aggregate - fold the accounting totals of 'from' into 'dest'.
 *	'from' is left untouched; a NULL 'from' is a no-op.
 */
extern void jobacctinfo_aggregate(jobacctinfo_t *dest, jobacctinfo_t *from)
{
	if (!plugin_polling)
		return;

	xassert(dest);

	if (!from)
		return;

	/* Accumulate CPU times, carrying whole seconds out of the
	 * microsecond fields so they stay below one second. */
	dest->user_cpu_sec += from->user_cpu_sec;
	dest->user_cpu_usec += from->user_cpu_usec;
	for (; dest->user_cpu_usec >= 1000000; dest->user_cpu_usec -= 1000000)
		dest->user_cpu_sec++;

	dest->sys_cpu_sec += from->sys_cpu_sec;
	dest->sys_cpu_usec += from->sys_cpu_usec;
	for (; dest->sys_cpu_usec >= 1000000; dest->sys_cpu_usec -= 1000000)
		dest->sys_cpu_sec++;

	dest->act_cpufreq += from->act_cpufreq;

	/* Energy is additive only while both readings are valid; a
	 * NO_VAL64 on either side poisons the total. */
	if (dest->energy.consumed_energy != NO_VAL64) {
		if (from->energy.consumed_energy == NO_VAL64)
			dest->energy.consumed_energy = NO_VAL64;
		else
			dest->energy.consumed_energy +=
				from->energy.consumed_energy;
	}

	_jobacctinfo_aggregate_tres_usage(dest, from);
}
1232
/*
 * jobacctinfo_2_stats - translate a jobacctinfo structure into the
 *	slurmdb_stats_t representation used by the accounting storage layer.
 *	Both pointers must be non-NULL.
 */
extern void jobacctinfo_2_stats(slurmdb_stats_t *stats, jobacctinfo_t *jobacct)
{
	xassert(jobacct);
	xassert(stats);

	stats->act_cpufreq = (double)jobacct->act_cpufreq;

	/* Preserve the "no reading" sentinel; otherwise store as double */
	stats->consumed_energy =
		(jobacct->energy.consumed_energy == NO_VAL64) ?
		NO_VAL64 : (double)jobacct->energy.consumed_energy;

	_jobacctinfo_2_stats_tres_usage(stats, jobacct);
}
1248