/*****************************************************************************\
 *  common_jag.c - slurm job accounting gather common plugin functions.
 *****************************************************************************
 *  Copyright (C) 2013 SchedMD LLC
 *  Written by Danny Auble <da@schedmd.com>, who borrowed heavily
 *  from the original code in jobacct_gather/linux
 *
 *  This file is part of Slurm, a resource management program.
 *  For details, see <https://slurm.schedmd.com/>.
 *  Please also read the included file: DISCLAIMER.
 *
 *  Slurm is free software; you can redistribute it and/or modify it under
 *  the terms of the GNU General Public License as published by the Free
 *  Software Foundation; either version 2 of the License, or (at your option)
 *  any later version.
 *
 *  In addition, as a special exception, the copyright holders give permission
 *  to link the code of portions of this program with the OpenSSL library under
 *  certain conditions as described in each individual source file, and
 *  distribute linked combinations including the two. You must obey the GNU
 *  General Public License in all respects for all of the code used other than
 *  OpenSSL. If you modify file(s) with this exception, you may extend this
 *  exception to your version of the file(s), but you are not obligated to do
 *  so. If you do not wish to do so, delete this exception statement from your
 *  version.  If you delete this exception statement from all source files in
 *  the program, then also delete it here.
 *
 *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
 *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
 *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
 *  details.
 *
 *  You should have received a copy of the GNU General Public License along
 *  with Slurm; if not, write to the Free Software Foundation, Inc.,
 *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
 *
 *  This file is patterned after jobcomp_linux.c, written by Morris Jette and
 *  Copyright (C) 2002 The Regents of the University of California.
\*****************************************************************************/

#include <dirent.h>
#include <fcntl.h>
#include <signal.h>
#include <time.h>
#include <ctype.h>

#include "src/common/slurm_xlator.h"
#include "src/common/assoc_mgr.h"
#include "src/common/slurm_jobacct_gather.h"
#include "src/common/slurm_protocol_api.h"
#include "src/common/slurm_protocol_defs.h"
#include "src/common/slurm_acct_gather_energy.h"
#include "src/common/slurm_acct_gather_filesystem.h"
#include "src/common/slurm_acct_gather_interconnect.h"
#include "src/common/xstring.h"
#include "src/slurmd/common/proctrack.h"

#include "common_jag.h"
/* These are defined here so when we link with something other than
 * the slurmstepd we will have these symbols defined.  They will get
 * overwritten when linking with the slurmstepd.
 */
#if defined (__APPLE__)
extern uint32_t g_tres_count __attribute__((weak_import));
extern char **assoc_mgr_tres_name_array __attribute__((weak_import));
#else
uint32_t g_tres_count;
char **assoc_mgr_tres_name_array;
#endif


static int cpunfo_frequency = 0;
static long hertz = 0;

static int my_pagesize = 0;
static DIR  *slash_proc = NULL;
static int energy_profile = ENERGY_DATA_NODE_ENERGY_UP;
static uint64_t debug_flags = 0;

static int _find_prec(void *x, void *key)
{
	jag_prec_t *prec = (jag_prec_t *) x;
	struct jobacctinfo *jobacct = (struct jobacctinfo *) key;

	if (prec->pid == jobacct->pid)
		return 1;

	return 0;
}

/* return weighted frequency in mhz */
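/*
 * The running sum jobacct->current_weighted_freq accumulates
 * (CPU time sampled this interval) * (frequency observed this interval),
 * so dividing by the total CPU time consumed so far yields a time-weighted
 * average:
 *
 *	avg_freq = sum_i(sampled_cputime_i * freq_i) / total_cputime
 *
 * If no CPU time has been accumulated yet, the current frequency is
 * returned unweighted.
 */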
static uint32_t _update_weighted_freq(struct jobacctinfo *jobacct,
				      char * sbuf)
{
	uint32_t tot_cpu;
	int thisfreq = 0;

	if (cpunfo_frequency)
		/* scaling not enabled */
		thisfreq = cpunfo_frequency;
	else
		sscanf(sbuf, "%d", &thisfreq);

	jobacct->current_weighted_freq =
		jobacct->current_weighted_freq +
		(uint32_t)jobacct->this_sampled_cputime * thisfreq;
	tot_cpu = (uint32_t) jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];
	if (tot_cpu) {
		return (uint32_t) (jobacct->current_weighted_freq / tot_cpu);
	} else
		return thisfreq;
}
/* Parse /proc/cpuinfo file for CPU frequency.
 * Store the value in global variable cpunfo_frequency
 * RET: True if read valid CPU frequency */
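/*
 * For reference, the /proc/cpuinfo line parsed here typically looks like
 * "cpu MHz		: 2400.000"; a "GHz" value, if present, is scaled
 * to MHz below.
 */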
inline static bool _get_freq(char *str)
{
	char *sep = NULL;
	double cpufreq_value;
	int cpu_mult;

	if (strstr(str, "MHz"))
		cpu_mult = 1;
	else if (strstr(str, "GHz"))
		cpu_mult = 1000;	/* Scale to MHz */
	else
		return false;

	sep = strchr(str, ':');
	if (!sep)
		return false;

	if (sscanf(sep + 2, "%lf", &cpufreq_value) < 1)
		return false;

	cpunfo_frequency = cpufreq_value * cpu_mult;
	debug2("cpunfo_frequency=%d", cpunfo_frequency);

	return true;
}

/*
 * collects the Pss value from /proc/<pid>/smaps
 */
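/*
 * Each mapping in /proc/<pid>/smaps carries a line of the form
 * "Pss:                 123 kB"; the per-mapping values (in kB) are summed
 * below to obtain the proportional set size of the whole process.
 */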
static int _get_pss(char *proc_smaps_file, jag_prec_t *prec)
{
	uint64_t pss;
	uint64_t p;
	char line[128];
	FILE *fp;
	int i;

	fp = fopen(proc_smaps_file, "r");
	if (!fp) {
		return -1;
	}

	if (fcntl(fileno(fp), F_SETFD, FD_CLOEXEC) == -1)
		error("%s: fcntl(%s): %m", __func__, proc_smaps_file);
	pss = 0;

	while (fgets(line, sizeof(line), fp)) {

		if (xstrncmp(line, "Pss:", 4) != 0) {
			continue;
		}

		for (i = 4; i < sizeof(line); i++) {

			if (!isdigit(line[i])) {
				continue;
			}
			if (sscanf(&line[i], "%"PRIu64"", &p) == 1) {
				pss += p;
			}
			break;
		}
	}

	/* Check for error
	 */
	if (ferror(fp)) {
		fclose(fp);
		return -1;
	}

	fclose(fp);
	/* Sanity checks */

	if (pss > 0 && prec->tres_data[TRES_ARRAY_MEM].size_read > pss) {
		pss *= 1024; /* Scale KB to B */
		prec->tres_data[TRES_ARRAY_MEM].size_read = pss;
	}

	debug3("%s: read pss %"PRIu64" for process %s",
	       __func__, pss, proc_smaps_file);

	return 0;
}

static int _get_sys_interface_freq_line(uint32_t cpu, char *filename,
					char * sbuf)
{
	int num_read, fd;
	FILE *sys_fp = NULL;
	char freq_file[80];
	char cpunfo_line [128];

	if (cpunfo_frequency)
		/* scaling not enabled, static freq obtained */
		return 1;

	snprintf(freq_file, 79,
		 "/sys/devices/system/cpu/cpu%d/cpufreq/%s",
		 cpu, filename);
	debug2("_get_sys_interface_freq_line: filename = %s ", freq_file);
	if ((sys_fp = fopen(freq_file, "r")) != NULL) {
		/* frequency scaling enabled */
		fd = fileno(sys_fp);
		if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
			error("%s: fcntl(%s): %m", __func__, freq_file);
		num_read = read(fd, sbuf, (sizeof(sbuf) - 1));
		if (num_read > 0) {
			sbuf[num_read] = '\0';
			debug2(" cpu %d freq= %s", cpu, sbuf);
		}
		fclose(sys_fp);
	} else {
		/* frequency scaling not enabled */
		if (!cpunfo_frequency) {
			snprintf(freq_file, 14, "/proc/cpuinfo");
			debug2("_get_sys_interface_freq_line: filename = %s ",
			       freq_file);
			if ((sys_fp = fopen(freq_file, "r")) != NULL) {
				while (fgets(cpunfo_line, sizeof(cpunfo_line),
					     sys_fp) != NULL) {
					if (_get_freq(cpunfo_line))
						break;
				}
				fclose(sys_fp);
			}
		}
		return 1;
	}
	return 0;
}

/*
 * Check for lightweight processes (POSIX threads)
 * Should be rewritten for FreeBSD so it doesn't depend on /proc
 */

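/*
 * The thread-group leader is identified via /proc/<pid>/status, which
 * contains a line of the form "Tgid:	<tgid>"; a pid that differs from its
 * tgid belongs to a thread rather than to the process itself.
 */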
static int _is_a_lwp(uint32_t pid)
{
	char *filename = NULL;
	char bf[4096];
	int fd, attempts = 1;
	ssize_t n;
	char *tgids = NULL;
	pid_t tgid = -1;

	// Disable check for now, this will only skew process accounting
	// slightly by including threads
	return 0;

	xstrfmtcat(filename, "/proc/%u/status", pid);

	fd = open(filename, O_RDONLY);
	if (fd < 0) {
		xfree(filename);
		return SLURM_ERROR;
	}

again:
	n = read(fd, bf, sizeof(bf) - 1);
	if (n == -1 && (errno == EINTR || errno == EAGAIN) && attempts < 100) {
		attempts++;
		goto again;
	}
	if (n <= 0) {
		close(fd);
		xfree(filename);
		return SLURM_ERROR;
	}
	bf[n] = '\0';
	close(fd);
	xfree(filename);

	tgids = xstrstr(bf, "Tgid:");

	if (tgids) {
		tgids += 5; /* strlen("Tgid:") */
		tgid = atoi(tgids);
	} else
		error("%s: Tgid: string not found for pid=%u", __func__, pid);

	if (pid != (uint32_t)tgid) {
		debug3("%s: pid=%u != tgid=%u is a lightweight process",
		       __func__, pid, tgid);
		return 1;
	} else {
		debug3("%s: pid=%u == tgid=%u is the leader LWP",
		       __func__, pid, tgid);
		return 0;
	}
}

/* _get_process_data_line() - get line of data from /proc/<pid>/stat
 *
 * IN:	in - input file descriptor
 * OUT:	prec - the destination for the data
 *
 * RETVAL:	==0 - no valid data
 * 		!=0 - data are valid
 *
 * Based upon stat2proc() from the ps command. It can handle arbitrary
 * executable file basenames for `cmd', i.e. those with embedded whitespace or
 * embedded ')'s. Such names confuse %s (see scanf(3)), so the string is split
 * and %39c is used instead. (Except for embedded ')', "(%[^)]c)" would work.)
 */
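/*
 * For reference, a /proc/<pid>/stat line has the form
 *	<pid> (<cmd>) <state> <ppid> <pgrp> <session> <tty_nr> <tpgid> ...
 * with utime and stime expressed in clock ticks (normalized later using
 * hertz) and rss expressed in pages (scaled by my_pagesize below).
 */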
static int _get_process_data_line(int in, jag_prec_t *prec) {
	char sbuf[512], *tmp;
	int num_read, nvals;
	char cmd[40], state[1];
	int ppid, pgrp, session, tty_nr, tpgid;
	long unsigned flags, minflt, cminflt, majflt, cmajflt;
	long unsigned utime, stime, starttime, vsize;
	long int cutime, cstime, priority, nice, timeout, itrealvalue, rss;
	long unsigned f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13;
	int exit_signal, last_cpu;

	num_read = read(in, sbuf, (sizeof(sbuf) - 1));
	if (num_read <= 0)
		return 0;
	sbuf[num_read] = '\0';

	/*
	 * split into "PID (cmd" and "<rest>", replace trailing ')' with NULL
	 */
	tmp = strrchr(sbuf, ')');
	if (!tmp)
		return 0;
	*tmp = '\0';

	/* parse these two strings separately, skipping the leading "(". */
	nvals = sscanf(sbuf, "%d (%39c", &prec->pid, cmd);
	if (nvals < 2)
		return 0;

	nvals = sscanf(tmp + 2,	 /* skip space after ')' too */
		       "%c %d %d %d %d %d "
		       "%lu %lu %lu %lu %lu "
		       "%lu %lu %ld %ld %ld %ld "
		       "%ld %ld %lu %lu %ld "
		       "%lu %lu %lu %lu %lu "
		       "%lu %lu %lu %lu %lu "
		       "%lu %lu %lu %d %d ",
		       state, &ppid, &pgrp, &session, &tty_nr, &tpgid,
		       &flags, &minflt, &cminflt, &majflt, &cmajflt,
		       &utime, &stime, &cutime, &cstime, &priority, &nice,
		       &timeout, &itrealvalue, &starttime, &vsize, &rss,
		       &f1, &f2, &f3, &f4, &f5, &f6, &f7, &f8, &f9, &f10, &f11,
		       &f12, &f13, &exit_signal, &last_cpu);
	/* There are some additional fields, which we do not scan or use */
	if ((nvals < 37) || (rss < 0))
		return 0;

	/*
	 * If the current pid corresponds to a Light Weight Process (POSIX
	 * thread) or there was an error, skip it; we only account for the
	 * original process (pid == tgid).
	 */
	if (_is_a_lwp(prec->pid))
		return 0;

	/* Copy the values that slurm records into our data structure */
	prec->ppid  = ppid;

	prec->tres_data[TRES_ARRAY_PAGES].size_read = majflt;
	prec->tres_data[TRES_ARRAY_VMEM].size_read = vsize;
	prec->tres_data[TRES_ARRAY_MEM].size_read = rss * my_pagesize;

	/*
	 * Store unnormalized times; we will normalize them when
	 * transferring to a struct jobacctinfo in job_common_poll_data()
	 */
	prec->usec = (double)utime;
	prec->ssec = (double)stime;
	prec->last_cpu = last_cpu;
	return 1;
}

/* _get_process_memory_line() - get line of data from /proc/<pid>/statm
 *
 * IN:	in - input file descriptor
 * OUT:	prec - the destination for the data
 *
 * RETVAL:	==0 - no valid data
 * 		!=0 - data are valid
 *
 * The *prec will mostly be filled in. We need to simply subtract the
 * amount of shared memory used by the process (in pages) from *prec->rss
 * and return the updated struct.
 *
 */
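/*
 * /proc/<pid>/statm is a single line of seven counters, all in pages:
 *	size resident shared text lib data dt
 */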
static int _get_process_memory_line(int in, jag_prec_t *prec)
{
	char sbuf[256];
	int num_read, nvals;
	long int size, rss, share, text, lib, data, dt;

	num_read = read(in, sbuf, (sizeof(sbuf) - 1));
	if (num_read <= 0)
		return 0;
	sbuf[num_read] = '\0';

	nvals = sscanf(sbuf,
		       "%ld %ld %ld %ld %ld %ld %ld",
		       &size, &rss, &share, &text, &lib, &data, &dt);
	/* There are some additional fields, which we do not scan or use */
	if (nvals != 7)
		return 0;

	/* If shared > rss then there is a problem, give up... */
	if (share > rss) {
		debug("jobacct_gather_linux: share > rss - bail!");
		return 0;
	}

	/* Copy the values that slurm records into our data structure */
	prec->tres_data[TRES_ARRAY_MEM].size_read =
		(rss - share) * my_pagesize;

	return 1;
}

static int _remove_share_data(char *proc_stat_file, jag_prec_t *prec)
{
	FILE *statm_fp = NULL;
	char proc_statm_file[256];	/* Allow ~20x extra length */
	int rc = 0, fd;

	snprintf(proc_statm_file, sizeof(proc_statm_file), "%sm",
		 proc_stat_file);
	if (!(statm_fp = fopen(proc_statm_file, "r")))
		return rc;  /* Assume the process went away */
	fd = fileno(statm_fp);
	if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
		error("%s: fcntl(%s): %m", __func__, proc_statm_file);
	rc = _get_process_memory_line(fd, prec);
	fclose(statm_fp);
	return rc;
}

/* _get_process_io_data_line() - get line of data from /proc/<pid>/io
 *
 * IN:	in - input file descriptor
 * OUT:	prec - the destination for the data
 *
 * RETVAL:	==0 - no valid data
 * 		!=0 - data are valid
 *
 * /proc/<pid>/io content format is:
 * rchar: <# of characters read>
 * wchar: <# of characters written>
 *   . . .
 */
static int _get_process_io_data_line(int in, jag_prec_t *prec) {
	char sbuf[256];
	char f1[7], f3[7];
	int num_read, nvals;
	uint64_t rchar, wchar;

	num_read = read(in, sbuf, (sizeof(sbuf) - 1));
	if (num_read <= 0)
		return 0;
	sbuf[num_read] = '\0';

	nvals = sscanf(sbuf, "%s %"PRIu64" %s %"PRIu64"",
		       f1, &rchar, f3, &wchar);
	if (nvals < 4)
		return 0;

	if (_is_a_lwp(prec->pid))
		return 0;

	/* keep real value here since we aren't doubles */
	prec->tres_data[TRES_ARRAY_FS_DISK].size_read = rchar;
	prec->tres_data[TRES_ARRAY_FS_DISK].size_write = wchar;

	return 1;
}

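/*
 * Build one prec (process record) from the /proc files of a single pid:
 * read /proc/<pid>/stat for the CPU and memory figures, optionally adjust
 * the memory figure using statm (NoShare) or smaps (UsePss), read
 * /proc/<pid>/io for the I/O counters, and append the record to prec_list.
 * TRES fields are initialized to INFINITE64 to mark "not collected".
 */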
static void _handle_stats(List prec_list, char *proc_stat_file,
			  char *proc_io_file, char *proc_smaps_file,
			  jag_callbacks_t *callbacks,
			  int tres_count)
{
	static int no_share_data = -1;
	static int use_pss = -1;
	FILE *stat_fp = NULL;
	FILE *io_fp = NULL;
	int fd, fd2, i;
	jag_prec_t *prec = NULL;

	if (no_share_data == -1) {
		char *acct_params = slurm_get_jobacct_gather_params();
		if (acct_params && xstrcasestr(acct_params, "NoShare"))
			no_share_data = 1;
		else
			no_share_data = 0;

		if (acct_params && xstrcasestr(acct_params, "UsePss"))
			use_pss = 1;
		else
			use_pss = 0;
		xfree(acct_params);
	}

	if (!(stat_fp = fopen(proc_stat_file, "r")))
		return;  /* Assume the process went away */
	/*
	 * Close the file on exec() of user tasks.
	 *
	 * NOTE: If we fork() slurmstepd after the
	 * fopen() above and before the fcntl() below,
	 * then the user task may have this extra file
	 * open, which can cause problems for
	 * checkpoint/restart, but this should be a very rare
	 * problem in practice.
	 */
	fd = fileno(stat_fp);
	if (fcntl(fd, F_SETFD, FD_CLOEXEC) == -1)
		error("%s: fcntl(%s): %m", __func__, proc_stat_file);

	prec = xmalloc(sizeof(jag_prec_t));

	if (!tres_count) {
		assoc_mgr_lock_t locks = {
			NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK,
			READ_LOCK, NO_LOCK, NO_LOCK };
		assoc_mgr_lock(&locks);
		tres_count = g_tres_count;
		assoc_mgr_unlock(&locks);
	}

	prec->tres_count = tres_count;
	prec->tres_data = xmalloc(prec->tres_count *
				  sizeof(acct_gather_data_t));

	/* Initialize read/writes */
	for (i = 0; i < prec->tres_count; i++) {
		prec->tres_data[i].num_reads = INFINITE64;
		prec->tres_data[i].num_writes = INFINITE64;
		prec->tres_data[i].size_read = INFINITE64;
		prec->tres_data[i].size_write = INFINITE64;
	}

	if (!_get_process_data_line(fd, prec)) {
		fclose(stat_fp);
		goto bail_out;
	}

	fclose(stat_fp);

	if (acct_gather_filesystem_g_get_data(prec->tres_data) < 0) {
		debug2("problem retrieving filesystem data");
	}

	if (acct_gather_interconnect_g_get_data(prec->tres_data) < 0) {
		debug2("problem retrieving interconnect data");
	}

	/* Remove shared data from rss */
	if (no_share_data && !_remove_share_data(proc_stat_file, prec))
		goto bail_out;

	/* Use PSS instead of RSS */
	if (use_pss && _get_pss(proc_smaps_file, prec) == -1)
		goto bail_out;

	if ((io_fp = fopen(proc_io_file, "r"))) {
		fd2 = fileno(io_fp);
		if (fcntl(fd2, F_SETFD, FD_CLOEXEC) == -1)
			error("%s: fcntl: %m", __func__);
		if (!_get_process_io_data_line(fd2, prec)) {
			fclose(io_fp);
			goto bail_out;
		}
		fclose(io_fp);
	}

	list_append(prec_list, prec);
	return;

bail_out:
	xfree(prec->tres_data);
	xfree(prec);
	return;
}

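/*
 * Build the list of process records for one polling pass.  When a
 * proctrack plugin is available (!pgid_plugin) only the pids in the step's
 * container are examined; otherwise every numeric entry in /proc is
 * scanned and the per-task records are matched up later in
 * jag_common_poll_data().
 */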
static List _get_precs(List task_list, bool pgid_plugin, uint64_t cont_id,
		       jag_callbacks_t *callbacks)
{
	List prec_list = list_create(destroy_jag_prec);
	char	proc_stat_file[256];	/* Allow ~20x extra length */
	char	proc_io_file[256];	/* Allow ~20x extra length */
	char	proc_smaps_file[256];	/* Allow ~20x extra length */
	static	int	slash_proc_open = 0;
	int i;
	struct jobacctinfo *jobacct = NULL;

	xassert(task_list);

	jobacct = list_peek(task_list);

	if (!pgid_plugin) {
		pid_t *pids = NULL;
		int npids = 0;
		/* get only the processes in the proctrack container */
		proctrack_g_get_pids(cont_id, &pids, &npids);
		if (!npids) {
			/* update consumed energy even if pids do not exist */
			if (jobacct) {
				acct_gather_energy_g_get_sum(
					energy_profile,
					&jobacct->energy);
				jobacct->tres_usage_in_tot[TRES_ARRAY_ENERGY] =
					jobacct->energy.consumed_energy;
				jobacct->tres_usage_out_tot[TRES_ARRAY_ENERGY] =
					jobacct->energy.current_watts;
				debug2("%s: energy = %"PRIu64" watts = %"PRIu64,
				       __func__,
				       jobacct->tres_usage_in_tot[
					       TRES_ARRAY_ENERGY],
				       jobacct->tres_usage_out_tot[
					       TRES_ARRAY_ENERGY]);
			}

			debug4("no pids in this container %"PRIu64"", cont_id);
			goto finished;
		}
		for (i = 0; i < npids; i++) {
			snprintf(proc_stat_file, 256, "/proc/%d/stat", pids[i]);
			snprintf(proc_io_file, 256, "/proc/%d/io", pids[i]);
			snprintf(proc_smaps_file, 256, "/proc/%d/smaps", pids[i]);
			_handle_stats(prec_list, proc_stat_file, proc_io_file,
				      proc_smaps_file, callbacks,
				      jobacct ? jobacct->tres_count : 0);
		}
		xfree(pids);
	} else {
		struct dirent *slash_proc_entry;
		char  *iptr = NULL, *optr = NULL, *optr2 = NULL;

		if (slash_proc_open) {
			rewinddir(slash_proc);
		} else {
			slash_proc = opendir("/proc");
			if (slash_proc == NULL) {
				perror("opening /proc");
				goto finished;
			}
			slash_proc_open = 1;
		}
		strcpy(proc_stat_file, "/proc/");
		strcpy(proc_io_file, "/proc/");
		strcpy(proc_smaps_file, "/proc/");

		while ((slash_proc_entry = readdir(slash_proc))) {

			/* Save a few cycles by simulating
			 * strcat(statFileName, slash_proc_entry->d_name);
			 * strcat(statFileName, "/stat");
			 * while checking for a numeric filename (which really
			 * should be a pid). Then do the same for the
			 * /proc/<pid>/io file name.
			 */
			optr = proc_stat_file + sizeof("/proc");
			iptr = slash_proc_entry->d_name;
			i = 0;
			do {
				if ((*iptr < '0') ||
				    ((*optr++ = *iptr++) > '9')) {
					i = -1;
					break;
				}
			} while (*iptr);

			if (i == -1)
				continue;
			iptr = (char*)"/stat";

			do {
				*optr++ = *iptr++;
			} while (*iptr);
			*optr = 0;

			optr2 = proc_io_file + sizeof("/proc");
			iptr = slash_proc_entry->d_name;
			i = 0;
			do {
				if ((*iptr < '0') ||
				    ((*optr2++ = *iptr++) > '9')) {
					i = -1;
					break;
				}
			} while (*iptr);
			if (i == -1)
				continue;
			iptr = (char*)"/io";

			do {
				*optr2++ = *iptr++;
			} while (*iptr);
			*optr2 = 0;

			optr2 = proc_smaps_file + sizeof("/proc");
			iptr = slash_proc_entry->d_name;
			i = 0;
			do {
				if ((*iptr < '0') ||
				    ((*optr2++ = *iptr++) > '9')) {
					i = -1;
					break;
				}
			} while (*iptr);
			if (i == -1)
				continue;
			iptr = (char*)"/smaps";

			do {
				*optr2++ = *iptr++;
			} while (*iptr);
			*optr2 = 0;

			_handle_stats(prec_list, proc_stat_file, proc_io_file,
				      proc_smaps_file, callbacks,
				      jobacct ? jobacct->tres_count : 0);
		}
	}

finished:

	return prec_list;
}

static void _record_profile(struct jobacctinfo *jobacct)
{
	enum {
		FIELD_CPUFREQ,
		FIELD_CPUTIME,
		FIELD_CPUUTIL,
		FIELD_RSS,
		FIELD_VMSIZE,
		FIELD_PAGES,
		FIELD_READ,
		FIELD_WRITE,
		FIELD_CNT
	};

	acct_gather_profile_dataset_t dataset[] = {
		{ "CPUFrequency", PROFILE_FIELD_UINT64 },
		{ "CPUTime", PROFILE_FIELD_DOUBLE },
		{ "CPUUtilization", PROFILE_FIELD_DOUBLE },
		{ "RSS", PROFILE_FIELD_UINT64 },
		{ "VMSize", PROFILE_FIELD_UINT64 },
		{ "Pages", PROFILE_FIELD_UINT64 },
		{ "ReadMB", PROFILE_FIELD_DOUBLE },
		{ "WriteMB", PROFILE_FIELD_DOUBLE },
		{ NULL, PROFILE_FIELD_NOT_SET }
	};
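	/*
	 * Units as recorded below: RSS and VMSize in KB, ReadMB/WriteMB in
	 * MB, CPUTime in seconds (the raw counter is divided by
	 * CPU_TIME_ADJ), and CPUUtilization in percent of the elapsed time
	 * since the previous sample.
	 */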

	static int64_t profile_gid = -1;
	double et;
	union {
		double d;
		uint64_t u64;
	} data[FIELD_CNT];

	if (profile_gid == -1)
		profile_gid = acct_gather_profile_g_create_group("Tasks");

	/* Create the dataset first */
	if (jobacct->dataset_id < 0) {
		char ds_name[32];
		snprintf(ds_name, sizeof(ds_name), "%u", jobacct->id.taskid);

		jobacct->dataset_id = acct_gather_profile_g_create_dataset(
			ds_name, profile_gid, dataset);
		if (jobacct->dataset_id == SLURM_ERROR) {
			error("JobAcct: Failed to create the dataset for "
			      "task %d",
			      jobacct->pid);
			return;
		}
	}

	if (jobacct->dataset_id < 0)
		return;

	data[FIELD_CPUFREQ].u64 = jobacct->act_cpufreq;
	/* Profile Mem and VMem as KB */
	data[FIELD_RSS].u64 =
		jobacct->tres_usage_in_tot[TRES_ARRAY_MEM] / 1024;
	data[FIELD_VMSIZE].u64 =
		jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM] / 1024;
	data[FIELD_PAGES].u64 = jobacct->tres_usage_in_tot[TRES_ARRAY_PAGES];

	/* delta from last snapshot */
	if (!jobacct->last_time) {
		data[FIELD_CPUTIME].d = 0;
		data[FIELD_CPUUTIL].d = 0.0;
		data[FIELD_READ].d = 0.0;
		data[FIELD_WRITE].d = 0.0;
	} else {
		data[FIELD_CPUTIME].d =
			((double)jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] -
			 jobacct->last_total_cputime) / CPU_TIME_ADJ;

		if (data[FIELD_CPUTIME].d < 0)
			data[FIELD_CPUTIME].d =
				jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] /
				CPU_TIME_ADJ;

		et = (jobacct->cur_time - jobacct->last_time);
		if (!et)
			data[FIELD_CPUUTIL].d = 0.0;
		else
			data[FIELD_CPUUTIL].d =
				(100.0 * (double)data[FIELD_CPUTIME].d) /
				((double) et);

		data[FIELD_READ].d = (double) jobacct->
			tres_usage_in_tot[TRES_ARRAY_FS_DISK] -
			jobacct->last_tres_usage_in_tot;

		if (data[FIELD_READ].d < 0)
			data[FIELD_READ].d =
				jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK];

		data[FIELD_WRITE].d = (double) jobacct->
			tres_usage_out_tot[TRES_ARRAY_FS_DISK] -
			jobacct->last_tres_usage_out_tot;

		if (data[FIELD_WRITE].d < 0)
			data[FIELD_WRITE].d =
				jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK];

		/* Profile disk as MB */
		data[FIELD_READ].d /= 1048576.0;
		data[FIELD_WRITE].d /= 1048576.0;
	}

	if (debug_flags & DEBUG_FLAG_PROFILE) {
		char str[256];
		info("PROFILE-Task: %s", acct_gather_profile_dataset_str(
			     dataset, data, str, sizeof(str)));
	}
	acct_gather_profile_g_add_sample_data(jobacct->dataset_id,
	                                      (void *)data, jobacct->cur_time);
}

extern void jag_common_init(long in_hertz)
{
	uint32_t profile_opt;

	debug_flags = slurm_get_debug_flags();

	acct_gather_profile_g_get(ACCT_GATHER_PROFILE_RUNNING,
				  &profile_opt);

	/* If we are profiling energy it will be checked at a
	   different rate, so just grab the last one.
	*/
	if (profile_opt & ACCT_GATHER_PROFILE_ENERGY)
		energy_profile = ENERGY_DATA_NODE_ENERGY;

	if (in_hertz) {
		hertz = in_hertz;
	} else {
		hertz = sysconf(_SC_CLK_TCK);

		if (hertz < 1) {
			error ("_get_process_data: unable to get clock rate");
			hertz = 100;	/* default on many systems */
		}
	}

	my_pagesize = getpagesize();
}

extern void jag_common_fini(void)
{
	if (slash_proc)
		(void) closedir(slash_proc);
}

extern void destroy_jag_prec(void *object)
{
	jag_prec_t *prec = (jag_prec_t *)object;
	xfree(prec->tres_data);
	xfree(prec);
	return;
}

extern void print_jag_prec(jag_prec_t *prec)
{
	int i;
	assoc_mgr_lock_t locks = {
		NO_LOCK, NO_LOCK, NO_LOCK, NO_LOCK,
		READ_LOCK, NO_LOCK, NO_LOCK };

	info("pid %d (ppid %d)", prec->pid, prec->ppid);
	info("act_cpufreq\t%d", prec->act_cpufreq);
	info("ssec \t%f", prec->ssec);
	assoc_mgr_lock(&locks);
	for (i = 0; i < prec->tres_count; i++) {
		if (prec->tres_data[i].size_read == INFINITE64)
			continue;
		info("%s in/read \t%"PRIu64"",
		     assoc_mgr_tres_name_array[i],
		     prec->tres_data[i].size_read);
		info("%s out/write \t%"PRIu64"",
		     assoc_mgr_tres_name_array[i],
		     prec->tres_data[i].size_write);
	}
	assoc_mgr_unlock(&locks);
	info("usec \t%f", prec->usec);
}

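/*
 * Main polling entry point: collect a prec for every process in the job
 * container, aggregate each task's descendants onto its prec, then fold
 * the results into the per-task jobacctinfo structures (TRES totals and
 * maxima, CPU times, weighted frequency and energy) and optionally emit a
 * profiling sample.
 */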
extern void jag_common_poll_data(
	List task_list, bool pgid_plugin, uint64_t cont_id,
	jag_callbacks_t *callbacks, bool profile)
{
	/* Update the data */
	List prec_list = NULL;
	uint64_t total_job_mem = 0, total_job_vsize = 0;
	ListIterator itr;
	jag_prec_t *prec = NULL;
	struct jobacctinfo *jobacct = NULL;
	static int processing = 0;
	char sbuf[72];
	int energy_counted = 0;
	time_t ct;
	static int over_memory_kill = -1;
	int i = 0;

	xassert(callbacks);

	if (!pgid_plugin && (cont_id == NO_VAL64)) {
		debug("cont_id hasn't been set yet not running poll");
		return;
	}

	if (processing) {
		debug("already running, returning");
		return;
	}
	processing = 1;

	if (!callbacks->get_precs)
		callbacks->get_precs = _get_precs;

	ct = time(NULL);
	prec_list = (*(callbacks->get_precs))(task_list, pgid_plugin, cont_id,
					      callbacks);

	if (!list_count(prec_list) || !task_list || !list_count(task_list))
		goto finished;	/* We have no business being here! */

	itr = list_iterator_create(task_list);
	while ((jobacct = list_next(itr))) {
		double cpu_calc;
		double last_total_cputime;
		if (!(prec = list_find_first(prec_list, _find_prec, jobacct)))
			continue;

		/*
		 * Only jobacct_gather/cgroup uses prec_extra, and we want to
		 * make sure we call it once per task, so call it here as we
		 * iterate through the tasks instead of in get_precs.
		 */
		if (callbacks->prec_extra)
			(*(callbacks->prec_extra))(prec, jobacct->id.taskid);

#if _DEBUG
		info("pid:%u ppid:%u rss:%"PRIu64" B",
		     prec->pid, prec->ppid,
		     prec->tres_data[TRES_ARRAY_MEM].size_read);
#endif
		/* find all my descendents */
		if (callbacks->get_offspring_data)
			(*(callbacks->get_offspring_data))
				(prec_list, prec, prec->pid);

		last_total_cputime =
			(double)jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];

		cpu_calc = (prec->ssec + prec->usec) / (double)hertz;

		/*
		 * Since we are not storing things as a double anymore, make
		 * the value bigger so we don't lose precision.
		 */
		cpu_calc *= CPU_TIME_ADJ;

		prec->tres_data[TRES_ARRAY_CPU].size_read = (uint64_t)cpu_calc;

		/*
		 * Get energy consumption only once, since we report per-node
		 * energy consumption. Energy is stored in the read fields,
		 * while power is stored in the write fields.
		 */
		debug2("energycounted = %d", energy_counted);
		if (energy_counted == 0) {
			acct_gather_energy_g_get_sum(
				energy_profile,
				&jobacct->energy);
			prec->tres_data[TRES_ARRAY_ENERGY].size_read =
				jobacct->energy.consumed_energy;
			prec->tres_data[TRES_ARRAY_ENERGY].size_write =
				jobacct->energy.current_watts;
			debug2("%s: energy = %"PRIu64" watts = %"PRIu64" ave_watts = %u",
			       __func__,
			       prec->tres_data[TRES_ARRAY_ENERGY].size_read,
			       prec->tres_data[TRES_ARRAY_ENERGY].size_write,
			       jobacct->energy.ave_watts);
			energy_counted = 1;
		}

		/* tally their usage */
		for (i = 0; i < jobacct->tres_count; i++) {
			if (prec->tres_data[i].size_read == INFINITE64)
				continue;
			if (jobacct->tres_usage_in_max[i] == INFINITE64)
				jobacct->tres_usage_in_max[i] =
					prec->tres_data[i].size_read;
			else
				jobacct->tres_usage_in_max[i] =
					MAX(jobacct->tres_usage_in_max[i],
					    prec->tres_data[i].size_read);
			/*
			 * Even with min we want to get the max, as we are
			 * looking at a specific task so we are always looking
			 * at the max that task had, not the min (or lots of
			 * things will be zero).  The min is from comparing
			 * ranks later when combining.  So here it will be the
			 * same as the max value set above.
			 * (same thing goes for the out)
			 */
			jobacct->tres_usage_in_min[i] =
				jobacct->tres_usage_in_max[i];
			jobacct->tres_usage_in_tot[i] =
				prec->tres_data[i].size_read;

			if (jobacct->tres_usage_out_max[i] == INFINITE64)
				jobacct->tres_usage_out_max[i] =
					prec->tres_data[i].size_write;
			else
				jobacct->tres_usage_out_max[i] =
					MAX(jobacct->tres_usage_out_max[i],
					    prec->tres_data[i].size_write);
			jobacct->tres_usage_out_min[i] =
				jobacct->tres_usage_out_max[i];
			jobacct->tres_usage_out_tot[i] =
				prec->tres_data[i].size_write;
		}

		total_job_mem += jobacct->tres_usage_in_tot[TRES_ARRAY_MEM];
		total_job_vsize += jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM];

		/* Update the cpu times */
		jobacct->user_cpu_sec = (uint32_t)(prec->usec / (double)hertz);
		jobacct->sys_cpu_sec = (uint32_t)(prec->ssec / (double)hertz);

		/* compute frequency */
		jobacct->this_sampled_cputime =
			cpu_calc - last_total_cputime;
		_get_sys_interface_freq_line(
			prec->last_cpu,
			"cpuinfo_cur_freq", sbuf);
		jobacct->act_cpufreq =
			_update_weighted_freq(jobacct, sbuf);

		debug("%s: Task %u pid %d ave_freq = %u mem size/max %"PRIu64"/%"PRIu64" vmem size/max %"PRIu64"/%"PRIu64", disk read size/max (%"PRIu64"/%"PRIu64"), disk write size/max (%"PRIu64"/%"PRIu64"), time %f(%u+%u) Energy tot/max %"PRIu64"/%"PRIu64" TotPower %"PRIu64" MaxPower %"PRIu64" MinPower %"PRIu64,
		      __func__,
		      jobacct->id.taskid,
		      jobacct->pid,
		      jobacct->act_cpufreq,
		      jobacct->tres_usage_in_tot[TRES_ARRAY_MEM],
		      jobacct->tres_usage_in_max[TRES_ARRAY_MEM],
		      jobacct->tres_usage_in_tot[TRES_ARRAY_VMEM],
		      jobacct->tres_usage_in_max[TRES_ARRAY_VMEM],
		      jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK],
		      jobacct->tres_usage_in_max[TRES_ARRAY_FS_DISK],
		      jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK],
		      jobacct->tres_usage_out_max[TRES_ARRAY_FS_DISK],
		      (double)(jobacct->tres_usage_in_tot[TRES_ARRAY_CPU] /
			       CPU_TIME_ADJ),
		      jobacct->user_cpu_sec,
		      jobacct->sys_cpu_sec,
		      jobacct->tres_usage_in_tot[TRES_ARRAY_ENERGY],
		      jobacct->tres_usage_in_max[TRES_ARRAY_ENERGY],
		      jobacct->tres_usage_out_tot[TRES_ARRAY_ENERGY],
		      jobacct->tres_usage_out_max[TRES_ARRAY_ENERGY],
		      jobacct->tres_usage_out_min[TRES_ARRAY_ENERGY]);

		if (profile &&
		    acct_gather_profile_g_is_active(ACCT_GATHER_PROFILE_TASK)) {
			jobacct->cur_time = ct;

			_record_profile(jobacct);

			jobacct->last_tres_usage_in_tot =
				jobacct->tres_usage_in_tot[TRES_ARRAY_FS_DISK];
			jobacct->last_tres_usage_out_tot =
				jobacct->tres_usage_out_tot[TRES_ARRAY_FS_DISK];
			jobacct->last_total_cputime =
				jobacct->tres_usage_in_tot[TRES_ARRAY_CPU];

			jobacct->last_time = jobacct->cur_time;
		}
	}
	list_iterator_destroy(itr);

	if (over_memory_kill == -1)
		over_memory_kill = slurm_get_job_acct_oom_kill();

	if (over_memory_kill)
		jobacct_gather_handle_mem_limit(total_job_mem,
						total_job_vsize);

finished:
	FREE_NULL_LIST(prec_list);
	processing = 0;
}