1 /*****************************************************************************\
2 * common_jag.c - slurm job accounting gather common plugin functions.
3 *****************************************************************************
4 * Copyright (C) 2013 SchedMD LLC
5 * Written by Danny Auble <da@schedmd.com>, who borrowed heavily
6 * from the original code in jobacct_gather/linux
7 *
8 * This file is part of Slurm, a resource management program.
9 * For details, see <https://slurm.schedmd.com/>.
10 * Please also read the included file: DISCLAIMER.
11 *
12 * Slurm is free software; you can redistribute it and/or modify it under
13 * the terms of the GNU General Public License as published by the Free
14 * Software Foundation; either version 2 of the License, or (at your option)
15 * any later version.
16 *
17 * In addition, as a special exception, the copyright holders give permission
18 * to link the code of portions of this program with the OpenSSL library under
19 * certain conditions as described in each individual source file, and
20 * distribute linked combinations including the two. You must obey the GNU
21 * General Public License in all respects for all of the code used other than
22 * OpenSSL. If you modify file(s) with this exception, you may extend this
23 * exception to your version of the file(s), but you are not obligated to do
24 * so. If you do not wish to do so, delete this exception statement from your
25 * version. If you delete this exception statement from all source files in
26 * the program, then also delete it here.
27 *
28 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
29 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
30 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
31 * details.
32 *
33 * You should have received a copy of the GNU General Public License along
34 * with Slurm; if not, write to the Free Software Foundation, Inc.,
35 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
36 *
37 * This file is patterned after jobcomp_linux.c, written by Morris Jette and
38 * Copyright (C) 2002 The Regents of the University of California.
39 \*****************************************************************************/
40
#include <ctype.h>
#include <dirent.h>
#include <fcntl.h>
#include <inttypes.h>
#include <signal.h>
#include <time.h>
46
47 #include "src/common/slurm_xlator.h"
48 #include "src/common/assoc_mgr.h"
49 #include "src/common/slurm_jobacct_gather.h"
50 #include "src/common/slurm_protocol_api.h"
51 #include "src/common/slurm_protocol_defs.h"
52 #include "src/common/slurm_acct_gather_energy.h"
53 #include "src/common/slurm_acct_gather_filesystem.h"
54 #include "src/common/slurm_acct_gather_interconnect.h"
55 #include "src/common/xstring.h"
56 #include "src/slurmd/common/proctrack.h"
57
58 #include "common_jag.h"
59
/* These are defined here so when we link with something other than
 * the slurmstepd we will have these symbols defined. They will get
 * overwritten when linking with the slurmstepd.
 *
 * On Apple toolchains they are declared as weak imports instead; elsewhere
 * they are tentative definitions (no initializer).
 * NOTE(review): the "overwritten" behavior presumably relies on common-symbol
 * linkage (-fcommon); toolchains defaulting to -fno-common (e.g. GCC >= 10)
 * may report multiple definitions - confirm against the build flags.
 */
#if defined (__APPLE__)
extern uint32_t g_tres_count __attribute__((weak_import));
extern char **assoc_mgr_tres_name_array __attribute__((weak_import));
#else
uint32_t g_tres_count;
char **assoc_mgr_tres_name_array;
#endif


/* Static CPU frequency in MHz parsed from /proc/cpuinfo by _get_freq().
 * Non-zero means the per-CPU cpufreq sysfs files are no longer consulted. */
static int cpunfo_frequency = 0;
/* Clock ticks per second, used to convert utime/stime tick counts to
 * seconds; set by jag_common_init(). */
static long hertz = 0;

/* System page size in bytes; converts page counts read from /proc into
 * bytes. Set by jag_common_init(). */
static int my_pagesize = 0;
/* Handle on the /proc directory, kept open and rewound between scans in
 * _get_precs() and closed by jag_common_fini(). */
static DIR *slash_proc = NULL;
/* Energy data type passed to acct_gather_energy_g_get_sum(); switched to
 * ENERGY_DATA_NODE_ENERGY by jag_common_init() when energy profiling is
 * running. */
static int energy_profile = ENERGY_DATA_NODE_ENERGY_UP;
/* Result of slurm_get_debug_flags(), read once in jag_common_init(). */
static uint64_t debug_flags = 0;
80
_find_prec(void * x,void * key)81 static int _find_prec(void *x, void *key)
82 {
83 jag_prec_t *prec = (jag_prec_t *) x;
84 struct jobacctinfo *jobacct = (struct jobacctinfo *) key;
85
86 if (prec->pid == jobacct->pid)
87 return 1;
88
89 return 0;
90 }
91
92 /* return weighted frequency in mhz */
_update_weighted_freq(struct jobacctinfo * jobacct,char * sbuf)93 static uint32_t _update_weighted_freq(struct jobacctinfo *jobacct,
94 char * sbuf)
95 {
96 uint32_t tot_cpu;
97 int thisfreq = 0;
98
99 if (cpunfo_frequency)
100 /* scaling not enabled */
101 thisfreq = cpunfo_frequency;
102 else
103 sscanf(sbuf, "%d", &thisfreq);
104
105 jobacct->current_weighted_freq =
106 jobacct->current_weighted_freq +
107 (uint32_t)jobacct->this_sampled_cputime * thisfreq;
108 tot_cpu = (uint32_t) jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];
109 if (tot_cpu) {
110 return (uint32_t) (jobacct->current_weighted_freq / tot_cpu);
111 } else
112 return thisfreq;
113 }
114
115 /* Parse /proc/cpuinfo file for CPU frequency.
116 * Store the value in global variable cpunfo_frequency
117 * RET: True if read valid CPU frequency */
_get_freq(char * str)118 inline static bool _get_freq(char *str)
119 {
120 char *sep = NULL;
121 double cpufreq_value;
122 int cpu_mult;
123
124 if (strstr(str, "MHz"))
125 cpu_mult = 1;
126 else if (strstr(str, "GHz"))
127 cpu_mult = 1000; /* Scale to MHz */
128 else
129 return false;
130
131 sep = strchr(str, ':');
132 if (!sep)
133 return false;
134
135 if (sscanf(sep + 2, "%lf", &cpufreq_value) < 1)
136 return false;
137
138 cpunfo_frequency = cpufreq_value * cpu_mult;
139 debug2("cpunfo_frequency=%d", cpunfo_frequency);
140
141 return true;
142 }
143
144 /*
145 * collects the Pss value from /proc/<pid>/smaps
146 */
_get_pss(char * proc_smaps_file,jag_prec_t * prec)147 static int _get_pss(char *proc_smaps_file, jag_prec_t *prec)
148 {
149 uint64_t pss;
150 uint64_t p;
151 char line[128];
152 FILE *fp;
153 int i;
154
155 fp = fopen(proc_smaps_file, "r");
156 if (!fp) {
157 return -1;
158 }
159
160 if (fcntl(fileno(fp), F_SETFD, FD_CLOEXEC) == -1)
161 error("%s: fcntl(%s): %m", __func__, proc_smaps_file);
162 pss = 0;
163
164 while (fgets(line,sizeof(line),fp)) {
165
166 if (xstrncmp(line, "Pss:", 4) != 0) {
167 continue;
168 }
169
170 for (i = 4; i < sizeof(line); i++) {
171
172 if (!isdigit(line[i])) {
173 continue;
174 }
175 if (sscanf(&line[i],"%"PRIu64"", &p) == 1) {
176 pss += p;
177 }
178 break;
179 }
180 }
181
182 /* Check for error
183 */
184 if (ferror(fp)) {
185 fclose(fp);
186 return -1;
187 }
188
189 fclose(fp);
190 /* Sanity checks */
191
192 if (pss > 0 && prec->tres_data[TRES_ARRAY_MEM].size_read > pss) {
193 pss *= 1024; /* Scale KB to B */
194 prec->tres_data[TRES_ARRAY_MEM].size_read = pss;
195 }
196
197 debug3("%s: read pss %"PRIu64" for process %s",
198 __func__, pss, proc_smaps_file);
199
200 return 0;
201 }
202
_get_sys_interface_freq_line(uint32_t cpu,char * filename,char * sbuf)203 static int _get_sys_interface_freq_line(uint32_t cpu, char *filename,
204 char * sbuf)
205 {
206 int num_read, fd;
207 FILE *sys_fp = NULL;
208 char freq_file[80];
209 char cpunfo_line [128];
210
211 if (cpunfo_frequency)
212 /* scaling not enabled, static freq obtained */
213 return 1;
214
215 snprintf(freq_file, 79,
216 "/sys/devices/system/cpu/cpu%d/cpufreq/%s",
217 cpu, filename);
218 debug2("_get_sys_interface_freq_line: filename = %s ", freq_file);
219 if ((sys_fp = fopen(freq_file, "r"))!= NULL) {
220 /* frequency scaling enabled */
221 fd = fileno(sys_fp);
222 if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
223 error("%s: fcntl(%s): %m", __func__, freq_file);
224 num_read = read(fd, sbuf, (sizeof(sbuf) - 1));
225 if (num_read > 0) {
226 sbuf[num_read] = '\0';
227 debug2(" cpu %d freq= %s", cpu, sbuf);
228 }
229 fclose(sys_fp);
230 } else {
231 /* frequency scaling not enabled */
232 if (!cpunfo_frequency) {
233 snprintf(freq_file, 14, "/proc/cpuinfo");
234 debug2("_get_sys_interface_freq_line: filename = %s ",
235 freq_file);
236 if ((sys_fp = fopen(freq_file, "r")) != NULL) {
237 while (fgets(cpunfo_line, sizeof(cpunfo_line),
238 sys_fp) != NULL) {
239 if (_get_freq(cpunfo_line))
240 break;
241 }
242 fclose(sys_fp);
243 }
244 }
245 return 1;
246 }
247 return 0;
248 }
249
250 /*
251 * Check for lightweight processes (POSIX threads)
252 * Should be rewritten for FreeBSD so it doesn't depend on /proc
253 */
254
_is_a_lwp(uint32_t pid)255 static int _is_a_lwp(uint32_t pid)
256 {
257 char *filename = NULL;
258 char bf[4096];
259 int fd, attempts = 1;
260 ssize_t n;
261 char *tgids = NULL;
262 pid_t tgid = -1;
263
264 // Disable check for now, this will only skew process accounting
265 // slightly by including threads
266 return 0;
267
268 xstrfmtcat(filename, "/proc/%u/status", pid);
269
270 fd = open(filename, O_RDONLY);
271 if (fd < 0) {
272 xfree(filename);
273 return SLURM_ERROR;
274 }
275
276 again:
277 n = read(fd, bf, sizeof(bf) - 1);
278 if (n == -1 && (errno == EINTR || errno == EAGAIN) && attempts < 100) {
279 attempts++;
280 goto again;
281 }
282 if (n <= 0) {
283 close(fd);
284 xfree(filename);
285 return SLURM_ERROR;
286 }
287 bf[n] = '\0';
288 close(fd);
289 xfree(filename);
290
291 tgids = xstrstr(bf, "Tgid:");
292
293 if (tgids) {
294 tgids += 5; /* strlen("Tgid:") */
295 tgid = atoi(tgids);
296 } else
297 error("%s: Tgid: string not found for pid=%u", __func__, pid);
298
299 if (pid != (uint32_t)tgid) {
300 debug3("%s: pid=%u != tgid=%u is a lightweight process",
301 __func__, pid, tgid);
302 return 1;
303 } else {
304 debug3("%s: pid=%u == tgid=%u is the leader LWP",
305 __func__, pid, tgid);
306 return 0;
307 }
308 }
309
310 /* _get_process_data_line() - get line of data from /proc/<pid>/stat
311 *
312 * IN: in - input file descriptor
313 * OUT: prec - the destination for the data
314 *
315 * RETVAL: ==0 - no valid data
316 * !=0 - data are valid
317 *
318 * Based upon stat2proc() from the ps command. It can handle arbitrary
319 * executable file basenames for `cmd', i.e. those with embedded whitespace or
320 * embedded ')'s. Such names confuse %s (see scanf(3)), so the string is split
321 * and %39c is used instead. (except for embedded ')' "(%[^)]c)" would work.
322 */
_get_process_data_line(int in,jag_prec_t * prec)323 static int _get_process_data_line(int in, jag_prec_t *prec) {
324 char sbuf[512], *tmp;
325 int num_read, nvals;
326 char cmd[40], state[1];
327 int ppid, pgrp, session, tty_nr, tpgid;
328 long unsigned flags, minflt, cminflt, majflt, cmajflt;
329 long unsigned utime, stime, starttime, vsize;
330 long int cutime, cstime, priority, nice, timeout, itrealvalue, rss;
331 long unsigned f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13;
332 int exit_signal, last_cpu;
333
334 num_read = read(in, sbuf, (sizeof(sbuf) - 1));
335 if (num_read <= 0)
336 return 0;
337 sbuf[num_read] = '\0';
338
339 /*
340 * split into "PID (cmd" and "<rest>" replace trailing ')' with NULL
341 */
342 tmp = strrchr(sbuf, ')');
343 if (!tmp)
344 return 0;
345 *tmp = '\0';
346
347 /* parse these two strings separately, skipping the leading "(". */
348 nvals = sscanf(sbuf, "%d (%39c", &prec->pid, cmd);
349 if (nvals < 2)
350 return 0;
351
352 nvals = sscanf(tmp + 2, /* skip space after ')' too */
353 "%c %d %d %d %d %d "
354 "%lu %lu %lu %lu %lu "
355 "%lu %lu %ld %ld %ld %ld "
356 "%ld %ld %lu %lu %ld "
357 "%lu %lu %lu %lu %lu "
358 "%lu %lu %lu %lu %lu "
359 "%lu %lu %lu %d %d ",
360 state, &ppid, &pgrp, &session, &tty_nr, &tpgid,
361 &flags, &minflt, &cminflt, &majflt, &cmajflt,
362 &utime, &stime, &cutime, &cstime, &priority, &nice,
363 &timeout, &itrealvalue, &starttime, &vsize, &rss,
364 &f1, &f2, &f3, &f4, &f5 ,&f6, &f7, &f8, &f9, &f10, &f11,
365 &f12, &f13, &exit_signal, &last_cpu);
366 /* There are some additional fields, which we do not scan or use */
367 if ((nvals < 37) || (rss < 0))
368 return 0;
369
370 /*
371 * If current pid corresponds to a Light Weight Process (Thread POSIX)
372 * or there was an error, skip it, we will only account the original
373 * process (pid==tgid).
374 */
375 if (_is_a_lwp(prec->pid))
376 return 0;
377
378 /* Copy the values that slurm records into our data structure */
379 prec->ppid = ppid;
380
381 prec->tres_data[TRES_ARRAY_PAGES].size_read = majflt;
382 prec->tres_data[TRES_ARRAY_VMEM].size_read = vsize;
383 prec->tres_data[TRES_ARRAY_MEM].size_read = rss * my_pagesize;
384
385 /*
386 * Store unnormalized times, we will normalize in when
387 * transfering to a struct jobacctinfo in job_common_poll_data()
388 */
389 prec->usec = (double)utime;
390 prec->ssec = (double)stime;
391 prec->last_cpu = last_cpu;
392 return 1;
393 }
394
395 /* _get_process_memory_line() - get line of data from /proc/<pid>/statm
396 *
397 * IN: in - input file descriptor
398 * OUT: prec - the destination for the data
399 *
400 * RETVAL: ==0 - no valid data
401 * !=0 - data are valid
402 *
403 * The *prec will mostly be filled in. We need to simply subtract the
404 * amount of shared memory used by the process (in KB) from *prec->rss
405 * and return the updated struct.
406 *
407 */
_get_process_memory_line(int in,jag_prec_t * prec)408 static int _get_process_memory_line(int in, jag_prec_t *prec)
409 {
410 char sbuf[256];
411 int num_read, nvals;
412 long int size, rss, share, text, lib, data, dt;
413
414 num_read = read(in, sbuf, (sizeof(sbuf) - 1));
415 if (num_read <= 0)
416 return 0;
417 sbuf[num_read] = '\0';
418
419 nvals = sscanf(sbuf,
420 "%ld %ld %ld %ld %ld %ld %ld",
421 &size, &rss, &share, &text, &lib, &data, &dt);
422 /* There are some additional fields, which we do not scan or use */
423 if (nvals != 7)
424 return 0;
425
426 /* If shared > rss then there is a problem, give up... */
427 if (share > rss) {
428 debug("jobacct_gather_linux: share > rss - bail!");
429 return 0;
430 }
431
432 /* Copy the values that slurm records into our data structure */
433 prec->tres_data[TRES_ARRAY_MEM].size_read =
434 (rss - share) * my_pagesize;;
435
436 return 1;
437 }
438
/*
 * Replace the memory value in prec with resident minus shared memory, read
 * from /proc/<pid>/statm (the path is proc_stat_file with "m" appended).
 *
 * IN proc_stat_file - path of /proc/<pid>/stat
 * IN/OUT prec - process record to update
 * RET non-zero on success, 0 if statm could not be opened or parsed
 */
static int _remove_share_data(char *proc_stat_file, jag_prec_t *prec)
{
	char proc_statm_file[256];	/* Allow ~20x extra length */
	FILE *statm_fp;
	int rc;

	snprintf(proc_statm_file, sizeof(proc_statm_file), "%sm",
		 proc_stat_file);

	statm_fp = fopen(proc_statm_file, "r");
	if (statm_fp == NULL)
		return 0;	/* Assume the process went away */

	if (fcntl(fileno(statm_fp), F_SETFD, FD_CLOEXEC) == -1)
		error("%s: fcntl(%s): %m", __func__, proc_statm_file);

	rc = _get_process_memory_line(fileno(statm_fp), prec);
	fclose(statm_fp);

	return rc;
}
456
457 /* _get_process_io_data_line() - get line of data from /proc/<pid>/io
458 *
459 * IN: in - input file descriptor
460 * OUT: prec - the destination for the data
461 *
462 * RETVAL: ==0 - no valid data
463 * !=0 - data are valid
464 *
465 * /proc/<pid>/io content format is:
466 * rchar: <# of characters read>
467 * wrchar: <# of characters written>
468 * . . .
469 */
_get_process_io_data_line(int in,jag_prec_t * prec)470 static int _get_process_io_data_line(int in, jag_prec_t *prec) {
471 char sbuf[256];
472 char f1[7], f3[7];
473 int num_read, nvals;
474 uint64_t rchar, wchar;
475
476 num_read = read(in, sbuf, (sizeof(sbuf) - 1));
477 if (num_read <= 0)
478 return 0;
479 sbuf[num_read] = '\0';
480
481 nvals = sscanf(sbuf, "%s %"PRIu64" %s %"PRIu64"",
482 f1, &rchar, f3, &wchar);
483 if (nvals < 4)
484 return 0;
485
486 if (_is_a_lwp(prec->pid))
487 return 0;
488
489 /* keep real value here since we aren't doubles */
490 prec->tres_data[TRES_ARRAY_FS_DISK].size_read = rchar;
491 prec->tres_data[TRES_ARRAY_FS_DISK].size_write = wchar;
492
493 return 1;
494 }
495
/*
 * Build a jag_prec_t for one process from its /proc files and append it to
 * prec_list. The process is skipped (nothing appended) when its stat file
 * cannot be opened or parsed, or when an enabled optional step fails.
 *
 * IN/OUT prec_list - list the new record is appended to
 * IN proc_stat_file - path of /proc/<pid>/stat
 * IN proc_io_file - path of /proc/<pid>/io; ignored if it cannot be opened
 * IN proc_smaps_file - path of /proc/<pid>/smaps; only read with "UsePss"
 * IN callbacks - not used here
 * IN tres_count - number of TRES slots to allocate; 0 means g_tres_count
 *
 * The job accounting gather parameters are read once per process:
 * "NoShare" subtracts shared memory from RSS, "UsePss" uses PSS instead of
 * RSS when it is smaller.
 */
static void _handle_stats(List prec_list, char *proc_stat_file,
			  char *proc_io_file, char *proc_smaps_file,
			  jag_callbacks_t *callbacks,
			  int tres_count)
{
	static int no_share_data = -1;
	static int use_pss = -1;
	FILE *stat_fp = NULL;
	FILE *io_fp = NULL;
	int fd, fd2, i, rc;
	jag_prec_t *prec = NULL;

	if (no_share_data == -1) {
		char *acct_params = slurm_get_jobacct_gather_params();
		if (acct_params && xstrcasestr(acct_params, "NoShare"))
			no_share_data = 1;
		else
			no_share_data = 0;

		if (acct_params && xstrcasestr(acct_params, "UsePss"))
			use_pss = 1;
		else
			use_pss = 0;
		xfree(acct_params);
	}

	if (!(stat_fp = fopen(proc_stat_file, "r")))
		return; /* Assume the process went away */
	/*
	 * Close the file on exec() of user tasks.
	 *
	 * NOTE: If we fork() slurmstepd after the
	 * fopen() above and before the fcntl() below,
	 * then the user task may have this extra file
	 * open, which can cause problems for
	 * checkpoint/restart, but this should be a very rare
	 * problem in practice.
	 */
	fd = fileno(stat_fp);
	if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
		error("%s: fcntl(%s): %m", __func__, proc_stat_file);

	prec = xmalloc(sizeof(jag_prec_t));

	if (!tres_count) {
		assoc_mgr_lock_t locks = {
			NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK,
			READ_LOCK, NO_LOCK, NO_LOCK };
		assoc_mgr_lock(&locks);
		tres_count = g_tres_count;
		assoc_mgr_unlock(&locks);
	}

	prec->tres_count = tres_count;
	prec->tres_data = xmalloc(prec->tres_count *
				  sizeof(acct_gather_data_t));

	/* Initialize read/writes: INFINITE64 marks "no data for this TRES" */
	for (i = 0; i < prec->tres_count; i++) {
		prec->tres_data[i].num_reads = INFINITE64;
		prec->tres_data[i].num_writes = INFINITE64;
		prec->tres_data[i].size_read = INFINITE64;
		prec->tres_data[i].size_write = INFINITE64;
	}

	rc = _get_process_data_line(fd, prec);
	fclose(stat_fp);
	if (!rc)
		goto bail_out;

	if (acct_gather_filesystem_g_get_data(prec->tres_data) < 0) {
		debug2("problem retrieving filesystem data");
	}

	if (acct_gather_interconnect_g_get_data(prec->tres_data) < 0) {
		debug2("problem retrieving interconnect data");
	}

	/* Remove shared data from rss */
	if (no_share_data && !_remove_share_data(proc_stat_file, prec))
		goto bail_out;

	/* Use PSS instead of RSS */
	if (use_pss && _get_pss(proc_smaps_file, prec) == -1)
		goto bail_out;

	if ((io_fp = fopen(proc_io_file, "r"))) {
		fd2 = fileno(io_fp);
		/* Name the file, like the other fcntl() errors in this file */
		if (fcntl(fd2, F_SETFD, FD_CLOEXEC) == -1)
			error("%s: fcntl(%s): %m", __func__, proc_io_file);
		rc = _get_process_io_data_line(fd2, prec);
		fclose(io_fp);
		if (!rc)
			goto bail_out;
	}

	list_append(prec_list, prec);
	return;

bail_out:
	xfree(prec->tres_data);
	xfree(prec);
}
603
/*
 * Return true if name is non-empty and consists only of the characters
 * '0'-'9', i.e. it looks like a pid directory entry of /proc.
 */
static bool _is_pid_name(const char *name)
{
	if (*name == '\0')
		return false;

	for (; *name; name++) {
		if ((*name < '0') || (*name > '9'))
			return false;
	}

	return true;
}

/*
 * Build the list of process records (jag_prec_t) to account.
 *
 * IN task_list - list of struct jobacctinfo; its first entry supplies the
 *                TRES count and, when the container holds no pids, gets its
 *                energy values refreshed
 * IN pgid_plugin - false: only the pids of proctrack container cont_id;
 *                  true: every numeric (pid) entry of /proc
 * IN cont_id - proctrack container id (used when !pgid_plugin)
 * IN callbacks - passed through to _handle_stats()
 * RET new list of jag_prec_t (possibly empty); ownership passes to caller
 */
static List _get_precs(List task_list, bool pgid_plugin, uint64_t cont_id,
		       jag_callbacks_t *callbacks)
{
	List prec_list = list_create(destroy_jag_prec);
	char proc_stat_file[256];	/* Allow ~20x extra length */
	char proc_io_file[256];		/* Allow ~20x extra length */
	char proc_smaps_file[256];	/* Allow ~20x extra length */
	int i;
	struct jobacctinfo *jobacct = NULL;

	xassert(task_list);

	jobacct = list_peek(task_list);

	if (!pgid_plugin) {
		pid_t *pids = NULL;
		int npids = 0;
		/* get only the processes in the proctrack container */
		proctrack_g_get_pids(cont_id, &pids, &npids);
		if (!npids) {
			/* update consumed energy even if pids do not exist */
			if (jobacct) {
				acct_gather_energy_g_get_sum(
					energy_profile,
					&jobacct->energy);
				jobacct->tres_usage_in_tot[TRES_ARRAY_ENERGY] =
					jobacct->energy.consumed_energy;
				jobacct->tres_usage_out_tot[TRES_ARRAY_ENERGY] =
					jobacct->energy.current_watts;
				debug2("%s: energy = %"PRIu64" watts = %"PRIu64,
				       __func__,
				       jobacct->tres_usage_in_tot[
					       TRES_ARRAY_ENERGY],
				       jobacct->tres_usage_out_tot[
					       TRES_ARRAY_ENERGY]);
			}

			debug4("no pids in this container %"PRIu64"", cont_id);
			/* An empty array may still have been allocated */
			xfree(pids);
			goto finished;
		}
		for (i = 0; i < npids; i++) {
			snprintf(proc_stat_file, sizeof(proc_stat_file),
				 "/proc/%d/stat", pids[i]);
			snprintf(proc_io_file, sizeof(proc_io_file),
				 "/proc/%d/io", pids[i]);
			snprintf(proc_smaps_file, sizeof(proc_smaps_file),
				 "/proc/%d/smaps", pids[i]);
			_handle_stats(prec_list, proc_stat_file, proc_io_file,
				      proc_smaps_file, callbacks,
				      jobacct ? jobacct->tres_count : 0);
		}
		xfree(pids);
	} else {
		struct dirent *slash_proc_entry;

		/*
		 * The /proc handle is kept between polls and rewound;
		 * slash_proc is non-NULL once opendir() has succeeded.
		 */
		if (slash_proc) {
			rewinddir(slash_proc);
		} else if (!(slash_proc = opendir("/proc"))) {
			error("%s: opening /proc: %m", __func__);
			goto finished;
		}

		while ((slash_proc_entry = readdir(slash_proc))) {
			const char *name = slash_proc_entry->d_name;

			/* Only the numeric entries of /proc are processes */
			if (!_is_pid_name(name))
				continue;

			/* Bounded: an oversized name truncates, never overflows */
			snprintf(proc_stat_file, sizeof(proc_stat_file),
				 "/proc/%s/stat", name);
			snprintf(proc_io_file, sizeof(proc_io_file),
				 "/proc/%s/io", name);
			snprintf(proc_smaps_file, sizeof(proc_smaps_file),
				 "/proc/%s/smaps", name);

			_handle_stats(prec_list, proc_stat_file, proc_io_file,
				      proc_smaps_file, callbacks,
				      jobacct ? jobacct->tres_count : 0);
		}
	}

finished:

	return prec_list;
}
749
_record_profile(struct jobacctinfo * jobacct)750 static void _record_profile(struct jobacctinfo *jobacct)
751 {
752 enum {
753 FIELD_CPUFREQ,
754 FIELD_CPUTIME,
755 FIELD_CPUUTIL,
756 FIELD_RSS,
757 FIELD_VMSIZE,
758 FIELD_PAGES,
759 FIELD_READ,
760 FIELD_WRITE,
761 FIELD_CNT
762 };
763
764 acct_gather_profile_dataset_t dataset[] = {
765 { "CPUFrequency", PROFILE_FIELD_UINT64 },
766 { "CPUTime", PROFILE_FIELD_DOUBLE },
767 { "CPUUtilization", PROFILE_FIELD_DOUBLE },
768 { "RSS", PROFILE_FIELD_UINT64 },
769 { "VMSize", PROFILE_FIELD_UINT64 },
770 { "Pages", PROFILE_FIELD_UINT64 },
771 { "ReadMB", PROFILE_FIELD_DOUBLE },
772 { "WriteMB", PROFILE_FIELD_DOUBLE },
773 { NULL, PROFILE_FIELD_NOT_SET }
774 };
775
776 static int64_t profile_gid = -1;
777 double et;
778 union {
779 double d;
780 uint64_t u64;
781 } data[FIELD_CNT];
782
783 if (profile_gid == -1)
784 profile_gid = acct_gather_profile_g_create_group("Tasks");
785
786 /* Create the dataset first */
787 if (jobacct->dataset_id < 0) {
788 char ds_name[32];
789 snprintf(ds_name, sizeof(ds_name), "%u", jobacct->id.taskid);
790
791 jobacct->dataset_id = acct_gather_profile_g_create_dataset(
792 ds_name, profile_gid, dataset);
793 if (jobacct->dataset_id == SLURM_ERROR) {
794 error("JobAcct: Failed to create the dataset for "
795 "task %d",
796 jobacct->pid);
797 return;
798 }
799 }
800
801 if (jobacct->dataset_id < 0)
802 return;
803
804 data[FIELD_CPUFREQ].u64 = jobacct->act_cpufreq;
805 /* Profile Mem and VMem as KB */
806 data[FIELD_RSS].u64 =
807 jobacct->tres_usage_in_tot[TRES_ARRAY_MEM] / 1024;
808 data[FIELD_VMSIZE].u64 =
809 jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM] / 1024;
810 data[FIELD_PAGES].u64 = jobacct->tres_usage_in_tot[TRES_ARRAY_PAGES];
811
812 /* delta from last snapshot */
813 if (!jobacct->last_time) {
814 data[FIELD_CPUTIME].d = 0;
815 data[FIELD_CPUUTIL].d = 0.0;
816 data[FIELD_READ].d = 0.0;
817 data[FIELD_WRITE].d = 0.0;
818 } else {
819 data[FIELD_CPUTIME].d =
820 ((double)jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] -
821 jobacct->last_total_cputime) / CPU_TIME_ADJ;
822
823 if (data[FIELD_CPUTIME].d < 0)
824 data[FIELD_CPUTIME].d =
825 jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] /
826 CPU_TIME_ADJ;
827
828 et = (jobacct->cur_time - jobacct->last_time);
829 if (!et)
830 data[FIELD_CPUUTIL].d = 0.0;
831 else
832 data[FIELD_CPUUTIL].d =
833 (100.0 * (double)data[FIELD_CPUTIME].d) /
834 ((double) et);
835
836 data[FIELD_READ].d = (double) jobacct->
837 tres_usage_in_tot[TRES_ARRAY_FS_DISK] -
838 jobacct->last_tres_usage_in_tot;
839
840 if (data[FIELD_READ].d < 0)
841 data[FIELD_READ].d =
842 jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK];
843
844 data[FIELD_WRITE].d = (double) jobacct->
845 tres_usage_out_tot[TRES_ARRAY_FS_DISK] -
846 jobacct->last_tres_usage_out_tot;
847
848 if (data[FIELD_WRITE].d < 0)
849 data[FIELD_WRITE].d =
850 jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK];
851
852 /* Profile disk as MB */
853 data[FIELD_READ].d /= 1048576.0;
854 data[FIELD_WRITE].d /= 1048576.0;
855 }
856
857 if (debug_flags & DEBUG_FLAG_PROFILE) {
858 char str[256];
859 info("PROFILE-Task: %s", acct_gather_profile_dataset_str(
860 dataset, data, str, sizeof(str)));
861 }
862 acct_gather_profile_g_add_sample_data(jobacct->dataset_id,
863 (void *)data, jobacct->cur_time);
864 }
865
jag_common_init(long in_hertz)866 extern void jag_common_init(long in_hertz)
867 {
868 uint32_t profile_opt;
869
870 debug_flags = slurm_get_debug_flags();
871
872 acct_gather_profile_g_get(ACCT_GATHER_PROFILE_RUNNING,
873 &profile_opt);
874
875 /* If we are profiling energy it will be checked at a
876 different rate, so just grab the last one.
877 */
878 if (profile_opt & ACCT_GATHER_PROFILE_ENERGY)
879 energy_profile = ENERGY_DATA_NODE_ENERGY;
880
881 if (in_hertz) {
882 hertz = in_hertz;
883 } else {
884 hertz = sysconf(_SC_CLK_TCK);
885
886 if (hertz < 1) {
887 error ("_get_process_data: unable to get clock rate");
888 hertz = 100; /* default on many systems */
889 }
890 }
891
892 my_pagesize = getpagesize();
893 }
894
jag_common_fini(void)895 extern void jag_common_fini(void)
896 {
897 if (slash_proc)
898 (void) closedir(slash_proc);
899 }
900
destroy_jag_prec(void * object)901 extern void destroy_jag_prec(void *object)
902 {
903 jag_prec_t *prec = (jag_prec_t *)object;
904 xfree(prec->tres_data);
905 xfree(prec);
906 return;
907 }
908
/*
 * Log the contents of a process record via info().
 *
 * Prints pid/ppid, act_cpufreq and ssec, then the read/write values of
 * every TRES that has data (size_read != INFINITE64), labelled with its name
 * from assoc_mgr_tres_name_array, and finally usec. The assoc_mgr lock is
 * held (read) while the TRES names are accessed.
 */
extern void print_jag_prec(jag_prec_t *prec)
{
	assoc_mgr_lock_t locks = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK,
		READ_LOCK, NO_LOCK, NO_LOCK };
	int tres_idx;

	info("pid %d (ppid %d)", prec->pid, prec->ppid);
	info("act_cpufreq\t%d", prec->act_cpufreq);
	info("ssec \t%f", prec->ssec);

	assoc_mgr_lock(&locks);
	for (tres_idx = 0; tres_idx < prec->tres_count; tres_idx++) {
		acct_gather_data_t *td = &prec->tres_data[tres_idx];

		if (td->size_read != INFINITE64) {
			info("%s in/read \t%"PRIu64"",
			     assoc_mgr_tres_name_array[tres_idx],
			     td->size_read);
			info("%s out/write \t%"PRIu64"",
			     assoc_mgr_tres_name_array[tres_idx],
			     td->size_write);
		}
	}
	assoc_mgr_unlock(&locks);

	info("usec \t%f", prec->usec);
}
933
jag_common_poll_data(List task_list,bool pgid_plugin,uint64_t cont_id,jag_callbacks_t * callbacks,bool profile)934 extern void jag_common_poll_data(
935 List task_list, bool pgid_plugin, uint64_t cont_id,
936 jag_callbacks_t *callbacks, bool profile)
937 {
938 /* Update the data */
939 List prec_list = NULL;
940 uint64_t total_job_mem = 0, total_job_vsize = 0;
941 ListIterator itr;
942 jag_prec_t *prec = NULL;
943 struct jobacctinfo *jobacct = NULL;
944 static int processing = 0;
945 char sbuf[72];
946 int energy_counted = 0;
947 time_t ct;
948 static int over_memory_kill = -1;
949 int i = 0;
950
951 xassert(callbacks);
952
953 if (!pgid_plugin && (cont_id == NO_VAL64)) {
954 debug("cont_id hasn't been set yet not running poll");
955 return;
956 }
957
958 if (processing) {
959 debug("already running, returning");
960 return;
961 }
962 processing = 1;
963
964 if (!callbacks->get_precs)
965 callbacks->get_precs = _get_precs;
966
967 ct = time(NULL);
968 prec_list = (*(callbacks->get_precs))(task_list, pgid_plugin, cont_id,
969 callbacks);
970
971 if (!list_count(prec_list) || !task_list || !list_count(task_list))
972 goto finished; /* We have no business being here! */
973
974 itr = list_iterator_create(task_list);
975 while ((jobacct = list_next(itr))) {
976 double cpu_calc;
977 double last_total_cputime;
978 if (!(prec = list_find_first(prec_list, _find_prec, jobacct)))
979 continue;
980
981 /*
982 * Only jobacct_gather/cgroup uses prec_extra, and we want to
983 * make sure we call it once per task, so call it here as we
984 * iterate through the tasks instead of in get_precs.
985 */
986 if (callbacks->prec_extra)
987 (*(callbacks->prec_extra))(prec, jobacct->id.taskid);
988
989 #if _DEBUG
990 info("pid:%u ppid:%u rss:%"PRIu64" B",
991 prec->pid, prec->ppid,
992 prec->tres_data[TRES_ARRAY_MEM].size_read);
993 #endif
994 /* find all my descendents */
995 if (callbacks->get_offspring_data)
996 (*(callbacks->get_offspring_data))
997 (prec_list, prec, prec->pid);
998
999 last_total_cputime =
1000 (double)jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];
1001
1002 cpu_calc = (prec->ssec + prec->usec) / (double)hertz;
1003
1004 /*
1005 * Since we are not storing things as a double anymore make it
1006 * bigger so we don't loose precision.
1007 */
1008 cpu_calc *= CPU_TIME_ADJ;
1009
1010 prec->tres_data[TRES_ARRAY_CPU].size_read = (uint64_t)cpu_calc;
1011
1012 /* get energy consumption
1013 * only once is enough since we
1014 * report per node energy consumption.
1015 * Energy is stored in read fields, while power is stored
1016 * in write fields.*/
1017 debug2("energycounted = %d", energy_counted);
1018 if (energy_counted == 0) {
1019 acct_gather_energy_g_get_sum(
1020 energy_profile,
1021 &jobacct->energy);
1022 prec->tres_data[TRES_ARRAY_ENERGY].size_read =
1023 jobacct->energy.consumed_energy;
1024 prec->tres_data[TRES_ARRAY_ENERGY].size_write =
1025 jobacct->energy.current_watts;
1026 debug2("%s: energy = %"PRIu64" watts = %"PRIu64" ave_watts = %u",
1027 __func__,
1028 prec->tres_data[TRES_ARRAY_ENERGY].size_read,
1029 prec->tres_data[TRES_ARRAY_ENERGY].size_write,
1030 jobacct->energy.ave_watts);
1031 energy_counted = 1;
1032 }
1033
1034 /* tally their usage */
1035 for (i = 0; i < jobacct->tres_count; i++) {
1036 if (prec->tres_data[i].size_read == INFINITE64)
1037 continue;
1038 if (jobacct->tres_usage_in_max[i] == INFINITE64)
1039 jobacct->tres_usage_in_max[i] =
1040 prec->tres_data[i].size_read;
1041 else
1042 jobacct->tres_usage_in_max[i] =
1043 MAX(jobacct->tres_usage_in_max[i],
1044 prec->tres_data[i].size_read);
1045 /*
1046 * Even with min we want to get the max as we are
1047 * looking at a specific task aso we are always looking
1048 * at the max that task had, not the min (or lots of
1049 * things will be zero). The min is from comparing
1050 * ranks later when combining. So here it will be the
1051 * same as the max value set above.
1052 * (same thing goes for the out)
1053 */
1054 jobacct->tres_usage_in_min[i] =
1055 jobacct->tres_usage_in_max[i];
1056 jobacct->tres_usage_in_tot[i] =
1057 prec->tres_data[i].size_read;
1058
1059 if (jobacct->tres_usage_out_max[i] == INFINITE64)
1060 jobacct->tres_usage_out_max[i] =
1061 prec->tres_data[i].size_write;
1062 else
1063 jobacct->tres_usage_out_max[i] =
1064 MAX(jobacct->tres_usage_out_max[i],
1065 prec->tres_data[i].size_write);
1066 jobacct->tres_usage_out_min[i] =
1067 jobacct->tres_usage_out_max[i];
1068 jobacct->tres_usage_out_tot[i] =
1069 prec->tres_data[i].size_write;
1070 }
1071
1072 total_job_mem += jobacct->tres_usage_in_tot[TRES_ARRAY_MEM];
1073 total_job_vsize += jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM];
1074
1075 /* Update the cpu times */
1076 jobacct->user_cpu_sec = (uint32_t)(prec->usec / (double)hertz);
1077 jobacct->sys_cpu_sec = (uint32_t)(prec->ssec / (double)hertz);
1078
1079 /* compute frequency */
1080 jobacct->this_sampled_cputime =
1081 cpu_calc - last_total_cputime;
1082 _get_sys_interface_freq_line(
1083 prec->last_cpu,
1084 "cpuinfo_cur_freq", sbuf);
1085 jobacct->act_cpufreq =
1086 _update_weighted_freq(jobacct, sbuf);
1087
1088 debug("%s: Task %u pid %d ave_freq = %u mem size/max %"PRIu64"/%"PRIu64" vmem size/max %"PRIu64"/%"PRIu64", disk read size/max (%"PRIu64"/%"PRIu64"), disk write size/max (%"PRIu64"/%"PRIu64"), time %f(%u+%u) Energy tot/max %"PRIu64"/%"PRIu64" TotPower %"PRIu64" MaxPower %"PRIu64" MinPower %"PRIu64,
1089 __func__,
1090 jobacct->id.taskid,
1091 jobacct->pid,
1092 jobacct->act_cpufreq,
1093 jobacct->tres_usage_in_tot[TRES_ARRAY_MEM],
1094 jobacct->tres_usage_in_max[TRES_ARRAY_MEM],
1095 jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM],
1096 jobacct->tres_usage_in_max[TRES_ARRAY_VMEM],
1097 jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK],
1098 jobacct->tres_usage_in_max[TRES_ARRAY_FS_DISK],
1099 jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK],
1100 jobacct->tres_usage_out_max[TRES_ARRAY_FS_DISK],
1101 (double)(jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] /
1102 CPU_TIME_ADJ),
1103 jobacct->user_cpu_sec,
1104 jobacct->sys_cpu_sec,
1105 jobacct->tres_usage_in_tot[TRES_ARRAY_ENERGY],
1106 jobacct->tres_usage_in_max[TRES_ARRAY_ENERGY],
1107 jobacct->tres_usage_out_tot[TRES_ARRAY_ENERGY],
1108 jobacct->tres_usage_out_max[TRES_ARRAY_ENERGY],
1109 jobacct->tres_usage_out_min[TRES_ARRAY_ENERGY]);
1110
1111 if (profile &&
1112 acct_gather_profile_g_is_active(ACCT_GATHER_PROFILE_TASK)) {
1113 jobacct->cur_time = ct;
1114
1115 _record_profile(jobacct);
1116
1117 jobacct->last_tres_usage_in_tot =
1118 jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK];
1119 jobacct->last_tres_usage_out_tot =
1120 jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK];
1121 jobacct->last_total_cputime =
1122 jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];
1123
1124 jobacct->last_time = jobacct->cur_time;
1125 }
1126 }
1127 list_iterator_destroy(itr);
1128
1129 if (over_memory_kill == -1)
1130 over_memory_kill = slurm_get_job_acct_oom_kill();
1131
1132 if (over_memory_kill)
1133 jobacct_gather_handle_mem_limit(total_job_mem,
1134 total_job_vsize);
1135
1136 finished:
1137 FREE_NULL_LIST(prec_list);
1138 processing = 0;
1139 }
1140