1 /*****************************************************************************\
2  *  burst_buffer_common.c - Common logic for managing burst_buffers
3  *
4  *  NOTE: These functions are designed so they can be used by multiple burst
5  *  buffer plugins at the same time (e.g. you might provide users access to
6  *  both burst_buffer/datawarp and burst_buffer/generic on the same system),
7  *  so the state information is largely in the individual plugin and passed
8  *  as a pointer argument to these functions.
9  *****************************************************************************
10  *  Copyright (C) 2014-2015 SchedMD LLC.
11  *  Written by Morris Jette <jette@schedmd.com>
12  *
13  *  This file is part of Slurm, a resource management program.
14  *  For details, see <https://slurm.schedmd.com/>.
15  *  Please also read the included file: DISCLAIMER.
16  *
17  *  Slurm is free software; you can redistribute it and/or modify it under
18  *  the terms of the GNU General Public License as published by the Free
19  *  Software Foundation; either version 2 of the License, or (at your option)
20  *  any later version.
21  *
22  *  In addition, as a special exception, the copyright holders give permission
23  *  to link the code of portions of this program with the OpenSSL library under
24  *  certain conditions as described in each individual source file, and
25  *  distribute linked combinations including the two. You must obey the GNU
26  *  General Public License in all respects for all of the code used other than
27  *  OpenSSL. If you modify file(s) with this exception, you may extend this
28  *  exception to your version of the file(s), but you are not obligated to do
29  *  so. If you do not wish to do so, delete this exception statement from your
30  *  version.  If you delete this exception statement from all source files in
31  *  the program, then also delete it here.
32  *
33  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
34  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
35  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
36  *  details.
37  *
38  *  You should have received a copy of the GNU General Public License along
39  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
40  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
41 \*****************************************************************************/
42 
43 #include "config.h"
44 
45 #define _GNU_SOURCE	/* For POLLRDHUP */
46 #include <fcntl.h>
47 #include <poll.h>
48 #include <stdlib.h>
49 #include <sys/mman.h>	/* memfd_create */
50 #include <sys/stat.h>
51 #include <sys/types.h>
52 #include <unistd.h>
53 
54 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__NetBSD__)
55 #define POLLRDHUP POLLHUP
56 #include <signal.h>
57 #endif
58 
59 #include "slurm/slurm.h"
60 #include "slurm/slurmdb.h"
61 
62 #include "src/common/assoc_mgr.h"
63 #include "src/common/list.h"
64 #include "src/common/macros.h"
65 #include "src/common/pack.h"
66 #include "src/common/parse_config.h"
67 #include "src/common/run_command.h"
68 #include "src/common/slurm_accounting_storage.h"
69 #include "src/common/slurm_protocol_api.h"
70 #include "src/common/timers.h"
71 #include "src/common/uid.h"
72 #include "src/common/xmalloc.h"
73 #include "src/common/xstring.h"
74 #include "src/slurmctld/locks.h"
75 #include "src/slurmctld/slurmctld.h"
76 
77 #include "burst_buffer_common.h"
78 
79 /* For possible future use by burst_buffer/generic */
80 #define _SUPPORT_ALT_POOL 0
81 
82 /* Maximum poll wait time for child processes, in milliseconds */
83 #define MAX_POLL_WAIT 500
84 
85 static void	_bb_job_del2(bb_job_t *bb_job);
86 static uid_t *	_parse_users(char *buf);
87 static char *	_print_users(uid_t *buf);
88 
/*
 * Translate a comma delimited list of user names/IDs into a UID array.
 *
 * buf IN - comma separated users, may be NULL
 * RET NULL if buf is NULL, otherwise an array of the UIDs that could be
 *     resolved. Tokens that fail uid_from_string() or that resolve to UID 0
 *     are logged and skipped. The array is grown so that at least one unused
 *     slot always follows the last stored UID; that slot acts as the zero
 *     terminator expected by _print_users() (this relies on xmalloc() and,
 *     presumably, xrealloc() zero-filling new memory -- TODO confirm).
 *     Return value must be xfreed.
 */
static uid_t *_parse_users(char *buf)
{
	char *copy, *name, *last = NULL;
	int used = 0, capacity = 1;
	uid_t *uids;

	if (!buf)
		return NULL;

	/* strtok_r() modifies its input, so work on a private copy */
	copy = xstrdup(buf);
	uids = xmalloc(sizeof(uid_t) * capacity);

	for (name = strtok_r(copy, ",", &last); name;
	     name = strtok_r(NULL, ",", &last)) {
		if ((uid_from_string(name, &uids[used]) == -1) ||
		    (uids[used] == 0)) {
			error("%s: ignoring invalid user: %s", __func__, name);
			continue;
		}
		used++;
		if (used >= capacity) {
			capacity *= 2;
			uids = xrealloc(uids, sizeof(uid_t) * capacity);
		}
	}
	xfree(copy);

	return uids;
}
119 
120 /* Translate an array of (zero terminated) UIDs into a string with colon
121  * delimited UIDs
122  * Return value must be xfreed */
_print_users(uid_t * buf)123 static char *_print_users(uid_t *buf)
124 {
125 	char *user_elem, *user_str = NULL;
126 	int i;
127 
128 	if (!buf)
129 		return user_str;
130 	for (i = 0; buf[i]; i++) {
131 		user_elem = uid_to_string(buf[i]);
132 		if (!user_elem)
133 			continue;
134 		if (user_str)
135 			xstrcat(user_str, ",");
136 		xstrcat(user_str, user_elem);
137 		xfree(user_elem);
138 	}
139 	return user_str;
140 }
141 
142 /* Allocate burst buffer hash tables */
bb_alloc_cache(bb_state_t * state_ptr)143 extern void bb_alloc_cache(bb_state_t *state_ptr)
144 {
145 	state_ptr->bb_ahash = xmalloc(sizeof(bb_alloc_t *) * BB_HASH_SIZE);
146 	state_ptr->bb_jhash = xmalloc(sizeof(bb_job_t *)   * BB_HASH_SIZE);
147 	state_ptr->bb_uhash = xmalloc(sizeof(bb_user_t *)  * BB_HASH_SIZE);
148 }
149 
150 /* Clear all cached burst buffer records, freeing all memory. */
bb_clear_cache(bb_state_t * state_ptr)151 extern void bb_clear_cache(bb_state_t *state_ptr)
152 {
153 	bb_alloc_t *bb_current,   *bb_next;
154 	bb_job_t   *job_current,  *job_next;
155 	bb_user_t  *user_current, *user_next;
156 	int i;
157 
158 	if (state_ptr->bb_ahash) {
159 		for (i = 0; i < BB_HASH_SIZE; i++) {
160 			bb_current = state_ptr->bb_ahash[i];
161 			while (bb_current) {
162 				xassert(bb_current->magic == BB_ALLOC_MAGIC);
163 				bb_next = bb_current->next;
164 				bb_free_alloc_buf(bb_current);
165 				bb_current = bb_next;
166 			}
167 		}
168 		xfree(state_ptr->bb_ahash);
169 	}
170 
171 	if (state_ptr->bb_jhash) {
172 		for (i = 0; i < BB_HASH_SIZE; i++) {
173 			job_current = state_ptr->bb_jhash[i];
174 			while (job_current) {
175 				xassert(job_current->magic == BB_JOB_MAGIC);
176 				job_next = job_current->next;
177 				_bb_job_del2(job_current);
178 				job_current = job_next;
179 			}
180 		}
181 		xfree(state_ptr->bb_jhash);
182 	}
183 
184 	if (state_ptr->bb_uhash) {
185 		for (i = 0; i < BB_HASH_SIZE; i++) {
186 			user_current = state_ptr->bb_uhash[i];
187 			while (user_current) {
188 				xassert(user_current->magic == BB_USER_MAGIC);
189 				user_next = user_current->next;
190 				xfree(user_current);
191 				user_current = user_next;
192 			}
193 		}
194 		xfree(state_ptr->bb_uhash);
195 	}
196 
197 	xfree(state_ptr->name);
198 	FREE_NULL_LIST(state_ptr->persist_resv_rec);
199 }
200 
201 /* Clear configuration parameters, free memory
202  * config_ptr IN - Initial configuration to be cleared
203  * fini IN - True if shutting down, do more complete clean-up */
bb_clear_config(bb_config_t * config_ptr,bool fini)204 extern void bb_clear_config(bb_config_t *config_ptr, bool fini)
205 {
206 	int i;
207 
208 	xassert(config_ptr);
209 	xfree(config_ptr->allow_users);
210 	xfree(config_ptr->allow_users_str);
211 	xfree(config_ptr->create_buffer);
212 	config_ptr->debug_flag = false;
213 	xfree(config_ptr->default_pool);
214 	xfree(config_ptr->deny_users);
215 	xfree(config_ptr->deny_users_str);
216 	xfree(config_ptr->destroy_buffer);
217 	xfree(config_ptr->get_sys_state);
218 	xfree(config_ptr->get_sys_status);
219 	config_ptr->granularity = 1;
220 	if (fini) {
221 		for (i = 0; i < config_ptr->pool_cnt; i++)
222 			xfree(config_ptr->pool_ptr[i].name);
223 		xfree(config_ptr->pool_ptr);
224 		config_ptr->pool_cnt = 0;
225 	} else {
226 		for (i = 0; i < config_ptr->pool_cnt; i++)
227 			config_ptr->pool_ptr[i].total_space = 0;
228 	}
229 	config_ptr->other_timeout = 0;
230 	config_ptr->stage_in_timeout = 0;
231 	config_ptr->stage_out_timeout = 0;
232 	xfree(config_ptr->start_stage_in);
233 	xfree(config_ptr->start_stage_out);
234 	xfree(config_ptr->stop_stage_in);
235 	xfree(config_ptr->stop_stage_out);
236 	config_ptr->validate_timeout = 0;
237 }
238 
239 /* Find a per-job burst buffer record for a specific job.
240  * If not found, return NULL. */
bb_find_alloc_rec(bb_state_t * state_ptr,job_record_t * job_ptr)241 extern bb_alloc_t *bb_find_alloc_rec(bb_state_t *state_ptr,
242 				     job_record_t *job_ptr)
243 {
244 	bb_alloc_t *bb_alloc = NULL;
245 
246 	xassert(job_ptr);
247 	xassert(state_ptr);
248 	bb_alloc = state_ptr->bb_ahash[job_ptr->user_id % BB_HASH_SIZE];
249 	while (bb_alloc) {
250 		if (bb_alloc->job_id == job_ptr->job_id) {
251 			if (bb_alloc->user_id == job_ptr->user_id) {
252 				xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
253 				return bb_alloc;
254 			}
255 			error("%s: Slurm state inconsistent with burst buffer. %pJ has UserID mismatch (%u != %u)",
256 			      __func__, job_ptr,
257 			      bb_alloc->user_id, job_ptr->user_id);
258 			/* This has been observed when slurmctld crashed and
259 			 * the job state recovered was missing some jobs
260 			 * which already had burst buffers configured. */
261 		}
262 		bb_alloc = bb_alloc->next;
263 	}
264 	return bb_alloc;
265 }
266 
267 /* Find a burst buffer record by name
268  * bb_name IN - Buffer's name
269  * user_id IN - Possible user ID, advisory use only
270  * RET the buffer or NULL if not found */
bb_find_name_rec(char * bb_name,uint32_t user_id,bb_state_t * state_ptr)271 extern bb_alloc_t *bb_find_name_rec(char *bb_name, uint32_t user_id,
272 				    bb_state_t *state_ptr)
273 {
274 	bb_alloc_t *bb_alloc = NULL;
275 	int i, hash_inx = user_id % BB_HASH_SIZE;
276 
277 	/* Try this user ID first */
278 	bb_alloc = state_ptr->bb_ahash[hash_inx];
279 	while (bb_alloc) {
280 		if (!xstrcmp(bb_alloc->name, bb_name))
281 			return bb_alloc;
282 		bb_alloc = bb_alloc->next;
283 	}
284 
285 	/* Now search all other records */
286 	for (i = 0; i < BB_HASH_SIZE; i++) {
287 		if (i == hash_inx)
288 			continue;
289 		bb_alloc = state_ptr->bb_ahash[i];
290 		while (bb_alloc) {
291 			if (!xstrcmp(bb_alloc->name, bb_name)) {
292 				xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
293 				return bb_alloc;
294 			}
295 			bb_alloc = bb_alloc->next;
296 		}
297 	}
298 
299 	return bb_alloc;
300 }
301 
302 /* Find a per-user burst buffer record for a specific user ID */
bb_find_user_rec(uint32_t user_id,bb_state_t * state_ptr)303 extern bb_user_t *bb_find_user_rec(uint32_t user_id, bb_state_t *state_ptr)
304 {
305 	int inx = user_id % BB_HASH_SIZE;
306 	bb_user_t *user_ptr;
307 
308 	xassert(state_ptr);
309 	xassert(state_ptr->bb_uhash);
310 	user_ptr = state_ptr->bb_uhash[inx];
311 	while (user_ptr) {
312 		if (user_ptr->user_id == user_id)
313 			return user_ptr;
314 		user_ptr = user_ptr->next;
315 	}
316 	user_ptr = xmalloc(sizeof(bb_user_t));
317 	user_ptr->magic = BB_USER_MAGIC;
318 	user_ptr->next = state_ptr->bb_uhash[inx];
319 	/* user_ptr->size = 0;	initialized by xmalloc */
320 	user_ptr->user_id = user_id;
321 	state_ptr->bb_uhash[inx] = user_ptr;
322 	return user_ptr;
323 }
324 
#ifdef HAVE_MEMFD_CREATE
/*
 * Build a copy of the job's burst buffer specification with the replacement
 * symbols expanded.
 *
 * Recognized symbols:
 *   %%  literal '%'
 *   %A  array master job id
 *   %a  array task id
 *   %d  working directory
 *   %j  job id
 *   %u  user name (looked up and cached in job_ptr->user_name if not set)
 *   %x  job name
 * Any other character following '%' is dropped together with the '%'.
 * A "\\" sequence stops symbol processing; it and the rest of the text are
 * copied through unchanged.
 *
 * The result is prefixed with a "#!/bin/sh" line and terminated with an
 * extra newline.
 *
 * RET the expanded script, or an empty string if the job has no burst
 *     buffer specification. Return value must be xfreed.
 */
char *_handle_replacement(job_record_t *job_ptr)
{
	char *replaced = NULL, *p, *q;

	if (!job_ptr->burst_buffer)
		return xstrdup("");

	/* throw a script header on in case something downstream cares */
	xstrcat(replaced, "#!/bin/sh\n");

	/* q marks the start of literal text not yet copied, p is the cursor */
	p = q = job_ptr->burst_buffer;

	while (*p != '\0') {
		if (*p == '%') {
			/* flush the literal text preceding the symbol */
			xmemcat(replaced, q, p);
			p++;

			if (*p == '\0') {
				/*
				 * Lone '%' at the very end: there is nothing
				 * to expand. Stop here instead of stepping
				 * past the terminator.
				 */
				q = p;
				break;
			}

			switch (*p) {
			case '%':	/* '%%' -> '%' */
				xstrcatchar(replaced, '%');
				break;
			case 'A':	/* '%A' => array master job id */
				xstrfmtcat(replaced, "%u",
					   job_ptr->array_job_id);
				break;
			case 'a':	/* '%a' => array task id */
				xstrfmtcat(replaced, "%u",
					   job_ptr->array_task_id);
				break;
			case 'd':	/* '%d' => workdir */
				xstrcat(replaced, job_ptr->details->work_dir);
				break;
			case 'j':	/* '%j' => jobid */
				xstrfmtcat(replaced, "%u", job_ptr->job_id);
				break;
			case 'u':	/* '%u' => user name */
				if (!job_ptr->user_name)
					job_ptr->user_name =
						uid_to_string_or_null(
							job_ptr->user_id);
				xstrcat(replaced, job_ptr->user_name);
				break;
			case 'x':	/* '%x' => job name */
				xstrcat(replaced, job_ptr->name);
				break;
			default:	/* unknown symbol, silently dropped */
				break;
			}

			q = ++p;
		} else if (*p == '\\' && *(p+1) == '\\') {
			/* '\\' => stop further symbol processing */
			/* keep the literal text seen since the last flush */
			xmemcat(replaced, q, p);
			xstrcat(replaced, p);
			q = p;
			break;
		} else
			p++;
	}

	/* copy any literal text trailing the last symbol */
	if (p != q)
		xmemcat(replaced, q, p);

	/* throw an extra terminating newline in for good measure */
	xstrcat(replaced, "\n");

	return replaced;
}
#endif
394 
/*
 * Return the path of the script to hand to the burst buffer helpers.
 *
 * If the job needs symbol replacement and memfd_create() is available, the
 * expanded script is written to a memfd whose descriptor and
 * "/proc/<pid>/fd/<fd>" path are cached in bb_job; later calls return a copy
 * of that cached path. Otherwise the path of the job's script under the
 * StateSaveLocation is returned.
 *
 * Failure to create or write the memfd is fatal.
 *
 * RET script path, must be xfreed by the caller
 */
char *bb_handle_job_script(job_record_t *job_ptr, bb_job_t *bb_job)
{
	char *script = NULL;

	if (bb_job->memfd_path) {
		/*
		 * Already have an existing symbol-replaced script, so use it.
		 */
		return xstrdup(bb_job->memfd_path);
	}

	if (bb_job->need_symbol_replacement) {
#ifdef HAVE_MEMFD_CREATE
		/*
		 * Create a memfd-backed temporary file to write out the
		 * symbol-replaced BB script. memfd files will automatically be
		 * cleaned up on process termination. This will be recreated if
		 * the slurmctld restarts, otherwise kept in memory for the
		 * lifespan of the job.
		 */
		char *filename = NULL, *bb = NULL;
		pid_t pid = getpid();

		xstrfmtcat(filename, "bb_job_script.%u", job_ptr->job_id);

		bb_job->memfd = memfd_create(filename, MFD_CLOEXEC);
		if (bb_job->memfd < 0)
			fatal("%s: failed memfd_create: %m", __func__);
		/*
		 * The name is only needed for the memfd_create() call itself.
		 * Release it after the error check so errno is still intact
		 * for the %m above.
		 */
		xfree(filename);
		xstrfmtcat(bb_job->memfd_path, "/proc/%lu/fd/%d",
			   (unsigned long) pid, bb_job->memfd);

		bb = _handle_replacement(job_ptr);
		safe_write(bb_job->memfd, bb, strlen(bb));
		xfree(bb);

		return xstrdup(bb_job->memfd_path);

	rwfail:
		/* safe_write() jumps here on a failed write */
		xfree(bb);
		fatal("%s: could not write script file, likely out of memory",
		      __func__);
#else
		error("%s: symbol replacement requested, but not available as memfd_create() could not be found at compile time. "
		      "Falling back to the unreplaced job script.",
		      __func__);
#endif
	}

	/* Unreplaced script as saved in the state save directory */
	xstrfmtcat(script, "%s/hash.%d/job.%u/script",
		   slurmctld_conf.state_save_location, (job_ptr->job_id % 10),
		   job_ptr->job_id);

	return script;
}
449 
#if _SUPPORT_ALT_POOL
/*
 * Convert a decimal string with an optional unit suffix into a number.
 *
 * RET 0 if the leading number is zero, negative or missing, otherwise the
 *     number multiplied by the value suffix_mult() reports for the text
 *     following it (no scaling when suffix_mult() returns NO_VAL64)
 */
static uint64_t _atoi(char *tok)
{
	char *suffix = NULL;
	int64_t parsed = (int64_t) strtoll(tok, &suffix, 10);
	uint64_t result, scale;

	if (parsed <= 0)
		return 0;

	result = (uint64_t) parsed;
	scale = suffix_mult(suffix);
	if (scale != NO_VAL64)
		result *= scale;

	return result;
}
#endif
466 
467 /* Set the bb_state's tres_id and tres_pos for limit enforcement.
468  * Value is set to -1 if not found. */
bb_set_tres_pos(bb_state_t * state_ptr)469 extern void bb_set_tres_pos(bb_state_t *state_ptr)
470 {
471 	slurmdb_tres_rec_t tres_rec;
472 	int inx;
473 
474 	xassert(state_ptr);
475 	memset(&tres_rec, 0, sizeof(slurmdb_tres_rec_t));
476 	tres_rec.type = "bb";
477 	tres_rec.name = state_ptr->name;
478 	inx = assoc_mgr_find_tres_pos(&tres_rec, false);
479 	state_ptr->tres_pos = inx;
480 	if (inx == -1) {
481 		debug3("%s: Tres %s not found by assoc_mgr",
482 		       __func__, state_ptr->name);
483 	} else {
484 		state_ptr->tres_id  = assoc_mgr_tres_array[inx]->id;
485 	}
486 }
487 
488 /* Load and process configuration parameters */
bb_load_config(bb_state_t * state_ptr,char * plugin_type)489 extern void bb_load_config(bb_state_t *state_ptr, char *plugin_type)
490 {
491 	s_p_hashtbl_t *bb_hashtbl = NULL;
492 	char *bb_conf, *tmp = NULL, *value;
493 #if _SUPPORT_ALT_POOL
494 	char *colon, *save_ptr = NULL, *tok;
495 	uint32_t pool_cnt;
496 #endif
497 	int fd, i;
498 	static s_p_options_t bb_options[] = {
499 		{"AllowUsers", S_P_STRING},
500 #if _SUPPORT_ALT_POOL
501 		{"AltPool", S_P_STRING},
502 #endif
503 		{"CreateBuffer", S_P_STRING},
504 		{"DefaultPool", S_P_STRING},
505 		{"DenyUsers", S_P_STRING},
506 		{"DestroyBuffer", S_P_STRING},
507 		{"Flags", S_P_STRING},
508 		{"GetSysState", S_P_STRING},
509 		{"GetSysStatus", S_P_STRING},
510 		{"Granularity", S_P_STRING},
511 		{"OtherTimeout", S_P_UINT32},
512 		{"StageInTimeout", S_P_UINT32},
513 		{"StageOutTimeout", S_P_UINT32},
514 		{"StartStageIn", S_P_STRING},
515 		{"StartStageOut", S_P_STRING},
516 		{"StopStageIn", S_P_STRING},
517 		{"StopStageOut", S_P_STRING},
518 		{"ValidateTimeout", S_P_UINT32},
519 		{NULL}
520 	};
521 
522 	xfree(state_ptr->name);
523 	if (plugin_type) {
524 		tmp = strchr(plugin_type, '/');
525 		if (tmp)
526 			tmp++;
527 		else
528 			tmp = plugin_type;
529 		state_ptr->name = xstrdup(tmp);
530 	}
531 
532 	/* Set default configuration */
533 	bb_clear_config(&state_ptr->bb_config, false);
534 	if (slurm_get_debug_flags() & DEBUG_FLAG_BURST_BUF)
535 		state_ptr->bb_config.debug_flag = true;
536 	state_ptr->bb_config.flags |= BB_FLAG_DISABLE_PERSISTENT;
537 	state_ptr->bb_config.other_timeout = DEFAULT_OTHER_TIMEOUT;
538 	state_ptr->bb_config.stage_in_timeout = DEFAULT_STATE_IN_TIMEOUT;
539 	state_ptr->bb_config.stage_out_timeout = DEFAULT_STATE_OUT_TIMEOUT;
540 	state_ptr->bb_config.validate_timeout = DEFAULT_VALIDATE_TIMEOUT;
541 
542 	/* First look for "burst_buffer.conf" then with "type" field,
543 	 * for example "burst_buffer_datawarp.conf" */
544 	bb_conf = get_extra_conf_path("burst_buffer.conf");
545 	fd = open(bb_conf, 0);
546 	if (fd >= 0) {
547 		close(fd);
548 	} else {
549 		char *new_path = NULL;
550 		xfree(bb_conf);
551 		xstrfmtcat(new_path, "burst_buffer_%s.conf", state_ptr->name);
552 		bb_conf = get_extra_conf_path(new_path);
553 		fd = open(bb_conf, 0);
554 		if (fd < 0) {
555 			info("%s: Unable to find configuration file %s or "
556 			     "burst_buffer.conf", __func__, new_path);
557 			xfree(bb_conf);
558 			xfree(new_path);
559 			return;
560 		}
561 		close(fd);
562 		xfree(new_path);
563 	}
564 
565 	bb_hashtbl = s_p_hashtbl_create(bb_options);
566 	if (s_p_parse_file(bb_hashtbl, NULL, bb_conf, false) == SLURM_ERROR) {
567 		fatal("%s: something wrong with opening/reading %s: %m",
568 		      __func__, bb_conf);
569 	}
570 	if (s_p_get_string(&state_ptr->bb_config.allow_users_str, "AllowUsers",
571 			   bb_hashtbl)) {
572 		state_ptr->bb_config.allow_users = _parse_users(
573 					state_ptr->bb_config.allow_users_str);
574 	}
575 	s_p_get_string(&state_ptr->bb_config.create_buffer, "CreateBuffer",
576 		       bb_hashtbl);
577 	s_p_get_string(&state_ptr->bb_config.default_pool, "DefaultPool",
578 		       bb_hashtbl);
579 	if (s_p_get_string(&state_ptr->bb_config.deny_users_str, "DenyUsers",
580 			   bb_hashtbl)) {
581 		state_ptr->bb_config.deny_users = _parse_users(
582 					state_ptr->bb_config.deny_users_str);
583 	}
584 	s_p_get_string(&state_ptr->bb_config.destroy_buffer, "DestroyBuffer",
585 		       bb_hashtbl);
586 
587 	if (s_p_get_string(&tmp, "Flags", bb_hashtbl)) {
588 		state_ptr->bb_config.flags = slurm_bb_str2flags(tmp);
589 		xfree(tmp);
590 	}
591 	/* By default, disable persistent buffer creation by normal users */
592 	if (state_ptr->bb_config.flags & BB_FLAG_ENABLE_PERSISTENT)
593 		state_ptr->bb_config.flags &= (~BB_FLAG_DISABLE_PERSISTENT);
594 
595 	s_p_get_string(&state_ptr->bb_config.get_sys_state, "GetSysState",
596 		       bb_hashtbl);
597 	s_p_get_string(&state_ptr->bb_config.get_sys_status, "GetSysStatus",
598 		       bb_hashtbl);
599 	if (s_p_get_string(&tmp, "Granularity", bb_hashtbl)) {
600 		state_ptr->bb_config.granularity = bb_get_size_num(tmp, 1);
601 		xfree(tmp);
602 		if (state_ptr->bb_config.granularity == 0) {
603 			error("%s: Granularity=0 is invalid", __func__);
604 			state_ptr->bb_config.granularity = 1;
605 		}
606 	}
607 #if _SUPPORT_ALT_POOL
608 	if (s_p_get_string(&tmp, "AltPool", bb_hashtbl)) {
609 		tok = strtok_r(tmp, ",", &save_ptr);
610 		while (tok) {
611 			colon = strchr(tok, ':');
612 			if (colon) {
613 				colon[0] = '\0';
614 				pool_cnt = _atoi(colon + 1);
615 			} else
616 				pool_cnt = 1;
617 			state_ptr->bb_config.pool_ptr = xrealloc(
618 				state_ptr->bb_config.pool_ptr,
619 				sizeof(burst_buffer_pool_t) *
620 				(state_ptr->bb_config.pool_cnt + 1));
621 			state_ptr->bb_config.
622 				pool_ptr[state_ptr->bb_config.pool_cnt].name =
623 				xstrdup(tok);
624 			state_ptr->bb_config.
625 				pool_ptr[state_ptr->bb_config.pool_cnt].
626 				avail_space = pool_cnt;
627 			state_ptr->bb_config.pool_cnt++;
628 			tok = strtok_r(NULL, ",", &save_ptr);
629 		}
630 		xfree(tmp);
631 	}
632 #endif
633 
634 	(void) s_p_get_uint32(&state_ptr->bb_config.other_timeout,
635 			     "OtherTimeout", bb_hashtbl);
636 	(void) s_p_get_uint32(&state_ptr->bb_config.stage_in_timeout,
637 			    "StageInTimeout", bb_hashtbl);
638 	(void) s_p_get_uint32(&state_ptr->bb_config.stage_out_timeout,
639 			    "StageOutTimeout", bb_hashtbl);
640 	s_p_get_string(&state_ptr->bb_config.start_stage_in, "StartStageIn",
641 		       bb_hashtbl);
642 	s_p_get_string(&state_ptr->bb_config.start_stage_out, "StartStageOut",
643 			    bb_hashtbl);
644 	s_p_get_string(&state_ptr->bb_config.stop_stage_in, "StopStageIn",
645 		       bb_hashtbl);
646 	s_p_get_string(&state_ptr->bb_config.stop_stage_out, "StopStageOut",
647 		       bb_hashtbl);
648 	(void) s_p_get_uint32(&state_ptr->bb_config.validate_timeout,
649 			     "ValidateTimeout", bb_hashtbl);
650 
651 	s_p_hashtbl_destroy(bb_hashtbl);
652 	xfree(bb_conf);
653 
654 	if (state_ptr->bb_config.debug_flag) {
655 		value = _print_users(state_ptr->bb_config.allow_users);
656 		info("%s: AllowUsers:%s",  __func__, value);
657 		xfree(value);
658 		info("%s: CreateBuffer:%s",  __func__,
659 		     state_ptr->bb_config.create_buffer);
660 		info("%s: DefaultPool:%s",  __func__,
661 		     state_ptr->bb_config.default_pool);
662 		value = _print_users(state_ptr->bb_config.deny_users);
663 		info("%s: DenyUsers:%s",  __func__, value);
664 		xfree(value);
665 		info("%s: DestroyBuffer:%s",  __func__,
666 		     state_ptr->bb_config.destroy_buffer);
667 		info("%s: GetSysState:%s",  __func__,
668 		     state_ptr->bb_config.get_sys_state);
669 		info("%s: GetSysStatus:%s",  __func__,
670 		     state_ptr->bb_config.get_sys_status);
671 		info("%s: Granularity:%"PRIu64"",  __func__,
672 		     state_ptr->bb_config.granularity);
673 		for (i = 0; i < state_ptr->bb_config.pool_cnt; i++) {
674 			info("%s: AltPoolName[%d]:%s:%"PRIu64"", __func__, i,
675 			     state_ptr->bb_config.pool_ptr[i].name,
676 			     state_ptr->bb_config.pool_ptr[i].total_space);
677 		}
678 		info("%s: OtherTimeout:%u", __func__,
679 		     state_ptr->bb_config.other_timeout);
680 		info("%s: StageInTimeout:%u", __func__,
681 		     state_ptr->bb_config.stage_in_timeout);
682 		info("%s: StageOutTimeout:%u", __func__,
683 		     state_ptr->bb_config.stage_out_timeout);
684 		info("%s: StartStageIn:%s",  __func__,
685 		     state_ptr->bb_config.start_stage_in);
686 		info("%s: StartStageOut:%s",  __func__,
687 		     state_ptr->bb_config.start_stage_out);
688 		info("%s: StopStageIn:%s",  __func__,
689 		     state_ptr->bb_config.stop_stage_in);
690 		info("%s: StopStageOut:%s",  __func__,
691 		     state_ptr->bb_config.stop_stage_out);
692 		info("%s: ValidateTimeout:%u", __func__,
693 		     state_ptr->bb_config.validate_timeout);
694 	}
695 }
696 
/*
 * Pack one burst buffer allocation record into a buffer.
 * Nothing is packed for protocol versions older than
 * SLURM_MIN_PROTOCOL_VERSION.
 */
static void _pack_alloc(struct bb_alloc *bb_alloc, Buf buffer,
			uint16_t protocol_version)
{
	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION)
		return;

	/* Field order defines the wire format, do not reorder */
	packstr(bb_alloc->account, buffer);
	pack32(bb_alloc->array_job_id, buffer);
	pack32(bb_alloc->array_task_id, buffer);
	pack_time(bb_alloc->create_time, buffer);
	pack32(bb_alloc->job_id, buffer);
	packstr(bb_alloc->name, buffer);
	packstr(bb_alloc->partition, buffer);
	packstr(bb_alloc->pool, buffer);
	packstr(bb_alloc->qos, buffer);
	pack64(bb_alloc->size, buffer);
	pack16(bb_alloc->state, buffer);
	pack32(bb_alloc->user_id, buffer);
}
715 
716 /* Pack individual burst buffer records into a buffer */
bb_pack_bufs(uid_t uid,bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)717 extern int bb_pack_bufs(uid_t uid, bb_state_t *state_ptr, Buf buffer,
718 			uint16_t protocol_version)
719 {
720 	int i, rec_count = 0;
721 	struct bb_alloc *bb_alloc;
722 	int eof, offset;
723 
724 	xassert(state_ptr);
725 	offset = get_buf_offset(buffer);
726 	pack32(rec_count,  buffer);
727 	if (!state_ptr->bb_ahash)
728 		return rec_count;
729 
730 	for (i = 0; i < BB_HASH_SIZE; i++) {
731 		bb_alloc = state_ptr->bb_ahash[i];
732 		while (bb_alloc) {
733 			if ((uid == 0) || (uid == bb_alloc->user_id)) {
734 				_pack_alloc(bb_alloc, buffer, protocol_version);
735 				rec_count++;
736 			}
737 			bb_alloc = bb_alloc->next;
738 		}
739 	}
740 	if (rec_count != 0) {
741 		eof = get_buf_offset(buffer);
742 		set_buf_offset(buffer, offset);
743 		pack32(rec_count, buffer);
744 		set_buf_offset(buffer, eof);
745 	}
746 
747 	return rec_count;
748 }
749 
750 /* Pack state and configuration parameters into a buffer */
bb_pack_state(bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)751 extern void bb_pack_state(bb_state_t *state_ptr, Buf buffer,
752 			  uint16_t protocol_version)
753 {
754 	bb_config_t *config_ptr = &state_ptr->bb_config;
755 	int i;
756 
757 
758 	if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
759 		packstr(config_ptr->allow_users_str, buffer);
760 		packstr(config_ptr->create_buffer,   buffer);
761 		packstr(config_ptr->default_pool,    buffer);
762 		packstr(config_ptr->deny_users_str,  buffer);
763 		packstr(config_ptr->destroy_buffer,  buffer);
764 		pack32(config_ptr->flags,            buffer);
765 		packstr(config_ptr->get_sys_state,   buffer);
766 		packstr(config_ptr->get_sys_status,   buffer);
767 		pack64(config_ptr->granularity,      buffer);
768 		pack32(config_ptr->pool_cnt,         buffer);
769 		for (i = 0; i < config_ptr->pool_cnt; i++) {
770 			packstr(config_ptr->pool_ptr[i].name, buffer);
771 			pack64(config_ptr->pool_ptr[i].total_space, buffer);
772 			pack64(config_ptr->pool_ptr[i].granularity, buffer);
773 			pack64(config_ptr->pool_ptr[i].unfree_space, buffer);
774 			pack64(config_ptr->pool_ptr[i].used_space, buffer);
775 		}
776 		pack32(config_ptr->other_timeout,    buffer);
777 		packstr(config_ptr->start_stage_in,  buffer);
778 		packstr(config_ptr->start_stage_out, buffer);
779 		packstr(config_ptr->stop_stage_in,   buffer);
780 		packstr(config_ptr->stop_stage_out,  buffer);
781 		pack32(config_ptr->stage_in_timeout, buffer);
782 		pack32(config_ptr->stage_out_timeout,buffer);
783 		pack64(state_ptr->total_space,       buffer);
784 		pack64(state_ptr->unfree_space,      buffer);
785 		pack64(state_ptr->used_space,        buffer);
786 		pack32(config_ptr->validate_timeout, buffer);
787 	}
788 }
789 
790 /* Pack individual burst buffer usage records into a buffer (used for limits) */
bb_pack_usage(uid_t uid,bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)791 extern int bb_pack_usage(uid_t uid, bb_state_t *state_ptr, Buf buffer,
792 			 uint16_t protocol_version)
793 {
794 	int i, rec_count = 0;
795 	bb_user_t *bb_usage;
796 	int eof, offset;
797 
798 	xassert(state_ptr);
799 	offset = get_buf_offset(buffer);
800 	pack32(rec_count,  buffer);
801 	if (!state_ptr->bb_uhash)
802 		return rec_count;
803 
804 	for (i = 0; i < BB_HASH_SIZE; i++) {
805 		bb_usage = state_ptr->bb_uhash[i];
806 		while (bb_usage) {
807 			if (((uid == 0) || (uid == bb_usage->user_id)) &&
808 			    (bb_usage->size != 0)) {
809 				pack64(bb_usage->size,          buffer);
810 				pack32(bb_usage->user_id,       buffer);
811 				rec_count++;
812 			}
813 			bb_usage = bb_usage->next;
814 		}
815 	}
816 	if (rec_count != 0) {
817 		eof = get_buf_offset(buffer);
818 		set_buf_offset(buffer, offset);
819 		pack32(rec_count, buffer);
820 		set_buf_offset(buffer, eof);
821 	}
822 
823 	return rec_count;
824 }
825 
826 /* Translate a burst buffer size specification in string form to numeric form,
827  * recognizing various (case insensitive) sufficies:
828  * K/KiB, M/MiB, G/GiB, T/TiB, P/PiB for powers of 1024
829  * KB, MB, GB, TB, PB for powers of 1000
830  * N/Node/Nodes will consider the size in nodes
831  * Default units are bytes. */
bb_get_size_num(char * tok,uint64_t granularity)832 extern uint64_t bb_get_size_num(char *tok, uint64_t granularity)
833 {
834 	char *tmp = NULL, *unit;
835 	uint64_t bb_size_i, mult;
836 	uint64_t bb_size_u = 0;
837 
838 	bb_size_i = (uint64_t) strtoull(tok, &tmp, 10);
839 	if ((bb_size_i > 0) && tmp) {
840 		bb_size_u = bb_size_i;
841 		unit = xstrdup(tmp);
842 		strtok(unit, " ");
843 		if (!xstrcasecmp(unit, "n") ||
844 		    !xstrcasecmp(unit, "node") ||
845 		    !xstrcasecmp(unit, "nodes")) {
846 			bb_size_u |= BB_SIZE_IN_NODES;
847 			granularity = 1;
848 		} else if ((mult = suffix_mult(unit)) != NO_VAL64) {
849 			bb_size_u *= mult;
850 		}
851 		xfree(unit);
852 	}
853 
854 	if (granularity > 1) {
855 		bb_size_u = ((bb_size_u + granularity - 1) / granularity) *
856 			    granularity;
857 	}
858 
859 	return bb_size_u;
860 }
861 
862 /* Translate a burst buffer size specification in numeric form to string form,
863  * appending various sufficies (KiB, MiB, GB, TB, PB, and Nodes). Default units
864  * are bytes. */
bb_get_size_str(uint64_t size)865 extern char *bb_get_size_str(uint64_t size)
866 {
867 	static char size_str[64];
868 
869 	if (size == 0) {
870 		snprintf(size_str, sizeof(size_str), "%"PRIu64, size);
871 	} else if (size & BB_SIZE_IN_NODES) {
872 		size &= (~BB_SIZE_IN_NODES);
873 		snprintf(size_str, sizeof(size_str), "%"PRIu64"N", size);
874 
875 	} else if ((size % ((uint64_t)1024 * 1024 * 1024 * 1024 * 1024)) == 0) {
876 		size /= ((uint64_t)1024 * 1024 * 1024 * 1024 * 1024);
877 		snprintf(size_str, sizeof(size_str), "%"PRIu64"PiB", size);
878 	} else if ((size % ((uint64_t)1000 * 1000 * 1000 * 1000 * 1000)) == 0) {
879 		size /= ((uint64_t)1000 * 1000 * 1000 * 1000 * 1000);
880 		snprintf(size_str, sizeof(size_str), "%"PRIu64"PB", size);
881 
882 	} else if ((size % ((uint64_t)1024 * 1024 * 1024 * 1024)) == 0) {
883 		size /= ((uint64_t)1024 * 1024 * 1024 * 1024);
884 		snprintf(size_str, sizeof(size_str), "%"PRIu64"TiB", size);
885 	} else if ((size % ((uint64_t)1000 * 1000 * 1000 * 1000)) == 0) {
886 		size /= ((uint64_t)1000 * 1000 * 1000 * 1000);
887 		snprintf(size_str, sizeof(size_str), "%"PRIu64"TB", size);
888 
889 	} else if ((size % ((uint64_t)1024 * 1024 * 1024)) == 0) {
890 		size /= ((uint64_t)1024 * 1024 * 1024);
891 		snprintf(size_str, sizeof(size_str), "%"PRIu64"GiB", size);
892 	} else if ((size % ((uint64_t)1000 * 1000 * 1000)) == 0) {
893 		size /= ((uint64_t)1000 * 1000 * 1000);
894 		snprintf(size_str, sizeof(size_str), "%"PRIu64"GB", size);
895 
896 	} else if ((size % ((uint64_t)1024 * 1024)) == 0) {
897 		size /= ((uint64_t)1024 * 1024);
898 		snprintf(size_str, sizeof(size_str), "%"PRIu64"MiB", size);
899 	} else if ((size % ((uint64_t)1000 * 1000)) == 0) {
900 		size /= ((uint64_t)1000 * 1000);
901 		snprintf(size_str, sizeof(size_str), "%"PRIu64"MB", size);
902 
903 	} else if ((size % ((uint64_t)1024)) == 0) {
904 		size /= ((uint64_t)1024);
905 		snprintf(size_str, sizeof(size_str), "%"PRIu64"KiB", size);
906 	} else if ((size % ((uint64_t)1000)) == 0) {
907 		size /= ((uint64_t)1000);
908 		snprintf(size_str, sizeof(size_str), "%"PRIu64"KB", size);
909 
910 	} else {
911 		snprintf(size_str, sizeof(size_str), "%"PRIu64, size);
912 	}
913 
914 	return size_str;
915 }
916 
/*
 * Round a size up to the next multiple of some granularity.
 * start_size IN - value to round
 * granularity IN - unit to round up to; 0 or 1 means "no rounding"
 * RET start_size rounded up to a multiple of granularity, or start_size
 *     unchanged when it is already a multiple or granularity is 0 or 1
 */
extern uint64_t bb_granularity(uint64_t start_size, uint64_t granularity)
{
	uint64_t remainder;

	/* Nothing to round; this also avoids dividing by a zero granularity */
	if (granularity <= 1)
		return start_size;

	/*
	 * Work from the remainder instead of "start_size + granularity - 1" so
	 * that a start_size close to UINT64_MAX is not wrapped around before
	 * the division. The result can still wrap if the rounded value itself
	 * does not fit in 64 bits.
	 */
	remainder = start_size % granularity;
	if (remainder)
		start_size += granularity - remainder;

	return start_size;
}
927 
/*
 * Release a job queue record.
 * x IN - record to release, handed to xfree() as is
 */
extern void bb_job_queue_del(void *x)
{
	xfree(x);
}
932 
933 /* Sort job queue by expected start time */
bb_job_queue_sort(void * x,void * y)934 extern int bb_job_queue_sort(void *x, void *y)
935 {
936 	bb_job_queue_rec_t *job_rec1 = *(bb_job_queue_rec_t **) x;
937 	bb_job_queue_rec_t *job_rec2 = *(bb_job_queue_rec_t **) y;
938 	job_record_t *job_ptr1 = job_rec1->job_ptr;
939 	job_record_t *job_ptr2 = job_rec2->job_ptr;
940 
941 	if (job_ptr1->start_time > job_ptr2->start_time)
942 		return 1;
943 	if (job_ptr1->start_time < job_ptr2->start_time)
944 		return -1;
945 	return 0;
946 }
947 
948 /* Sort preempt_bb_recs in order of DECREASING use_time */
bb_preempt_queue_sort(void * x,void * y)949 extern int bb_preempt_queue_sort(void *x, void *y)
950 {
951 	struct preempt_bb_recs *bb_ptr1 = *(struct preempt_bb_recs **) x;
952 	struct preempt_bb_recs *bb_ptr2 = *(struct preempt_bb_recs **) y;
953 
954 	if (bb_ptr1->use_time > bb_ptr2->use_time)
955 		return -1;
956 	if (bb_ptr1->use_time < bb_ptr2->use_time)
957 		return 1;
958 	return 0;
959 };
960 
961 /* For each burst buffer record, set the use_time to the time at which its
962  * use is expected to begin (i.e. each job's expected start time) */
bb_set_use_time(bb_state_t * state_ptr)963 extern void bb_set_use_time(bb_state_t *state_ptr)
964 {
965 	job_record_t *job_ptr;
966 	bb_alloc_t *bb_alloc = NULL;
967 	time_t now = time(NULL);
968 	int i;
969 
970 	state_ptr->next_end_time = now + 60 * 60; /* Start estimate now+1hour */
971 	for (i = 0; i < BB_HASH_SIZE; i++) {
972 		bb_alloc = state_ptr->bb_ahash[i];
973 		while (bb_alloc) {
974 			if (bb_alloc->job_id &&
975 			    ((bb_alloc->state == BB_STATE_STAGING_IN) ||
976 			     (bb_alloc->state == BB_STATE_STAGED_IN))) {
977 				job_ptr = find_job_record(bb_alloc->job_id);
978 				if (!job_ptr && !bb_alloc->orphaned) {
979 					bb_alloc->orphaned = true;
980 					error("%s: JobId=%u not found for allocated burst buffer",
981 					      __func__, bb_alloc->job_id);
982 					bb_alloc->use_time = now + 24 * 60 * 60;
983 				} else if (!job_ptr) {
984 					bb_alloc->use_time = now + 24 * 60 * 60;
985 				} else if (job_ptr->start_time) {
986 					bb_alloc->end_time = job_ptr->end_time;
987 					bb_alloc->use_time = job_ptr->start_time;
988 				} else {
989 					/* Unknown start time */
990 					bb_alloc->use_time = now + 60 * 60;
991 				}
992 			} else if (bb_alloc->job_id) {
993 				job_ptr = find_job_record(bb_alloc->job_id);
994 				if (job_ptr)
995 					bb_alloc->end_time = job_ptr->end_time;
996 			} else {
997 				bb_alloc->use_time = now;
998 			}
999 			if (bb_alloc->end_time && bb_alloc->size) {
1000 				if (bb_alloc->end_time <= now)
1001 					state_ptr->next_end_time = now;
1002 				else if (state_ptr->next_end_time >
1003 					 bb_alloc->end_time) {
1004 					state_ptr->next_end_time =
1005 						bb_alloc->end_time;
1006 				}
1007 			}
1008 			bb_alloc = bb_alloc->next;
1009 		}
1010 	}
1011 }
1012 
1013 /* Sleep function, also handles termination signal */
bb_sleep(bb_state_t * state_ptr,int add_secs)1014 extern void bb_sleep(bb_state_t *state_ptr, int add_secs)
1015 {
1016 	struct timespec ts = {0, 0};
1017 	struct timeval  tv = {0, 0};
1018 
1019 	if (gettimeofday(&tv, NULL)) {		/* Some error */
1020 		sleep(1);
1021 		return;
1022 	}
1023 
1024 	ts.tv_sec  = tv.tv_sec + add_secs;
1025 	ts.tv_nsec = tv.tv_usec * 1000;
1026 	slurm_mutex_lock(&state_ptr->term_mutex);
1027 	if (!state_ptr->term_flag) {
1028 		slurm_cond_timedwait(&state_ptr->term_cond,
1029 				     &state_ptr->term_mutex, &ts);
1030 	}
1031 	slurm_mutex_unlock(&state_ptr->term_mutex);
1032 }
1033 
1034 
1035 /* Allocate a named burst buffer record for a specific user.
1036  * Return a pointer to that record.
1037  * Use bb_free_name_rec() to purge the returned record. */
bb_alloc_name_rec(bb_state_t * state_ptr,char * name,uint32_t user_id)1038 extern bb_alloc_t *bb_alloc_name_rec(bb_state_t *state_ptr, char *name,
1039 				     uint32_t user_id)
1040 {
1041 	bb_alloc_t *bb_alloc = NULL;
1042 	time_t now = time(NULL);
1043 	int i;
1044 
1045 	xassert(state_ptr->bb_ahash);
1046 	state_ptr->last_update_time = now;
1047 	bb_alloc = xmalloc(sizeof(bb_alloc_t));
1048 	i = user_id % BB_HASH_SIZE;
1049 	bb_alloc->magic = BB_ALLOC_MAGIC;
1050 	bb_alloc->next = state_ptr->bb_ahash[i];
1051 	state_ptr->bb_ahash[i] = bb_alloc;
1052 	bb_alloc->array_task_id = NO_VAL;
1053 	bb_alloc->name = xstrdup(name);
1054 	bb_alloc->state = BB_STATE_ALLOCATED;
1055 	bb_alloc->state_time = now;
1056 	bb_alloc->seen_time = now;
1057 	bb_alloc->user_id = user_id;
1058 
1059 	return bb_alloc;
1060 }
1061 
1062 /* Allocate a per-job burst buffer record for a specific job.
1063  * Return a pointer to that record.
1064  * Use bb_free_alloc_rec() to purge the returned record. */
bb_alloc_job_rec(bb_state_t * state_ptr,job_record_t * job_ptr,bb_job_t * bb_job)1065 extern bb_alloc_t *bb_alloc_job_rec(bb_state_t *state_ptr,
1066 				    job_record_t *job_ptr,
1067 				    bb_job_t *bb_job)
1068 {
1069 	bb_alloc_t *bb_alloc = NULL;
1070 	int i;
1071 
1072 	xassert(state_ptr->bb_ahash);
1073 	xassert(job_ptr);
1074 	state_ptr->last_update_time = time(NULL);
1075 	bb_alloc = xmalloc(sizeof(bb_alloc_t));
1076 	bb_alloc->account = xstrdup(bb_job->account);
1077 	bb_alloc->array_job_id = job_ptr->array_job_id;
1078 	bb_alloc->array_task_id = job_ptr->array_task_id;
1079 	bb_alloc->assoc_ptr = job_ptr->assoc_ptr;
1080 	bb_alloc->job_id = job_ptr->job_id;
1081 	bb_alloc->magic = BB_ALLOC_MAGIC;
1082 	i = job_ptr->user_id % BB_HASH_SIZE;
1083 	xstrfmtcat(bb_alloc->name, "%u", job_ptr->job_id);
1084 	bb_alloc->next = state_ptr->bb_ahash[i];
1085 	bb_alloc->partition = xstrdup(bb_job->partition);
1086 	bb_alloc->pool = xstrdup(bb_job->job_pool);
1087 	bb_alloc->qos = xstrdup(bb_job->qos);
1088 	state_ptr->bb_ahash[i] = bb_alloc;
1089 	bb_alloc->size = bb_job->total_size;
1090 	bb_alloc->state = BB_STATE_ALLOCATED;
1091 	bb_alloc->state_time = time(NULL);
1092 	bb_alloc->seen_time = time(NULL);
1093 	bb_alloc->user_id = job_ptr->user_id;
1094 
1095 	return bb_alloc;
1096 }
1097 
1098 /* Allocate a burst buffer record for a job and increase the job priority
1099  * if so configured.
1100  * Use bb_free_alloc_rec() to purge the returned record. */
bb_alloc_job(bb_state_t * state_ptr,job_record_t * job_ptr,bb_job_t * bb_job)1101 extern bb_alloc_t *bb_alloc_job(bb_state_t *state_ptr, job_record_t *job_ptr,
1102 				bb_job_t *bb_job)
1103 {
1104 	bb_alloc_t *bb_alloc;
1105 
1106 	bb_alloc = bb_alloc_job_rec(state_ptr, job_ptr, bb_job);
1107 
1108 	return bb_alloc;
1109 }
1110 
1111 /* Free memory associated with allocated bb record, caller is responsible for
1112  * maintaining linked list */
bb_free_alloc_buf(bb_alloc_t * bb_alloc)1113 extern void bb_free_alloc_buf(bb_alloc_t *bb_alloc)
1114 {
1115 	if (bb_alloc) {
1116 		xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
1117 		bb_alloc->magic = 0;
1118 		xfree(bb_alloc->account);
1119 		xfree(bb_alloc->assocs);
1120 		xfree(bb_alloc->name);
1121 		xfree(bb_alloc->partition);
1122 		xfree(bb_alloc->pool);
1123 		xfree(bb_alloc->qos);
1124 		xfree(bb_alloc);
1125 	}
1126 }
1127 
1128 
1129 /* Remove a specific bb_alloc_t from global records.
1130  * RET true if found, false otherwise */
bb_free_alloc_rec(bb_state_t * state_ptr,bb_alloc_t * bb_alloc)1131 extern bool bb_free_alloc_rec(bb_state_t *state_ptr, bb_alloc_t *bb_alloc)
1132 {
1133 	bb_alloc_t *bb_link, **bb_plink;
1134 	int i;
1135 
1136 	xassert(state_ptr);
1137 	xassert(state_ptr->bb_ahash);
1138 	xassert(bb_alloc);
1139 
1140 	i = bb_alloc->user_id % BB_HASH_SIZE;
1141 	bb_plink = &state_ptr->bb_ahash[i];
1142 	bb_link = state_ptr->bb_ahash[i];
1143 	while (bb_link) {
1144 		if (bb_link == bb_alloc) {
1145 			xassert(bb_link->magic == BB_ALLOC_MAGIC);
1146 			*bb_plink = bb_alloc->next;
1147 			bb_free_alloc_buf(bb_alloc);
1148 			state_ptr->last_update_time = time(NULL);
1149 			return true;
1150 		}
1151 		bb_plink = &bb_link->next;
1152 		bb_link = bb_link->next;
1153 	}
1154 	return false;
1155 }
1156 
1157 /* Allocate a bb_job_t record, hashed by job_id, delete with bb_job_del() */
bb_job_alloc(bb_state_t * state_ptr,uint32_t job_id)1158 extern bb_job_t *bb_job_alloc(bb_state_t *state_ptr, uint32_t job_id)
1159 {
1160 	int inx = job_id % BB_HASH_SIZE;
1161 	bb_job_t *bb_job = xmalloc(sizeof(bb_job_t));
1162 
1163 	xassert(state_ptr);
1164 	bb_job->magic = BB_JOB_MAGIC;
1165 	bb_job->next = state_ptr->bb_jhash[inx];
1166 	bb_job->job_id = job_id;
1167 	state_ptr->bb_jhash[inx] = bb_job;
1168 
1169 	return bb_job;
1170 }
1171 
1172 /* Return a pointer to the existing bb_job_t record for a given job_id or
1173  * NULL if not found */
bb_job_find(bb_state_t * state_ptr,uint32_t job_id)1174 extern bb_job_t *bb_job_find(bb_state_t *state_ptr, uint32_t job_id)
1175 {
1176 	bb_job_t *bb_job;
1177 
1178 	xassert(state_ptr);
1179 
1180 	if (!state_ptr->bb_jhash)
1181 		return NULL;
1182 
1183 	bb_job = state_ptr->bb_jhash[job_id % BB_HASH_SIZE];
1184 	while (bb_job) {
1185 		if (bb_job->job_id == job_id) {
1186 			xassert(bb_job->magic == BB_JOB_MAGIC);
1187 			return bb_job;
1188 		}
1189 		bb_job = bb_job->next;
1190 	}
1191 
1192 	return bb_job;
1193 }
1194 
1195 /* Delete a bb_job_t record, hashed by job_id */
bb_job_del(bb_state_t * state_ptr,uint32_t job_id)1196 extern void bb_job_del(bb_state_t *state_ptr, uint32_t job_id)
1197 {
1198 	int inx = job_id % BB_HASH_SIZE;
1199 	bb_job_t *bb_job, **bb_pjob;
1200 
1201 	xassert(state_ptr);
1202 	bb_pjob = &state_ptr->bb_jhash[inx];
1203 	bb_job  =  state_ptr->bb_jhash[inx];
1204 	while (bb_job) {
1205 		if (bb_job->job_id == job_id) {
1206 			xassert(bb_job->magic == BB_JOB_MAGIC);
1207 			bb_job->magic = 0;
1208 			*bb_pjob = bb_job->next;
1209 			_bb_job_del2(bb_job);
1210 			return;
1211 		}
1212 		bb_pjob = &bb_job->next;
1213 		bb_job  =  bb_job->next;
1214 	}
1215 }
1216 
1217 /* Delete a bb_job_t record. DOES NOT UNLINK FROM HASH TABLE */
_bb_job_del2(bb_job_t * bb_job)1218 static void _bb_job_del2(bb_job_t *bb_job)
1219 {
1220 	int i;
1221 
1222 	if (bb_job) {
1223 		(void) close(bb_job->memfd);
1224 
1225 		xfree(bb_job->account);
1226 		for (i = 0; i < bb_job->buf_cnt; i++) {
1227 			xfree(bb_job->buf_ptr[i].access);
1228 			xfree(bb_job->buf_ptr[i].name);
1229 			xfree(bb_job->buf_ptr[i].pool);
1230 			xfree(bb_job->buf_ptr[i].type);
1231 		}
1232 		xfree(bb_job->buf_ptr);
1233 		xfree(bb_job->job_pool);
1234 		xfree(bb_job->memfd_path);
1235 		xfree(bb_job->partition);
1236 		xfree(bb_job->qos);
1237 		xfree(bb_job);
1238 	}
1239 }
1240 
1241 /* Log the contents of a bb_job_t record using "info()" */
bb_job_log(bb_state_t * state_ptr,bb_job_t * bb_job)1242 extern void bb_job_log(bb_state_t *state_ptr, bb_job_t *bb_job)
1243 {
1244 	bb_buf_t *buf_ptr;
1245 	char *out_buf = NULL;
1246 	int i;
1247 
1248 	if (bb_job) {
1249 		xstrfmtcat(out_buf, "%s: JobId=%u UserID:%u ",
1250 			   state_ptr->name, bb_job->job_id, bb_job->user_id);
1251 		xstrfmtcat(out_buf, "Swap:%ux%u ", bb_job->swap_size,
1252 			   bb_job->swap_nodes);
1253 		xstrfmtcat(out_buf, "TotalSize:%"PRIu64"", bb_job->total_size);
1254 		info("%s", out_buf);
1255 		xfree(out_buf);
1256 		for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
1257 		     i++, buf_ptr++) {
1258 			if (buf_ptr->create) {
1259 				info("  Create  Name:%s Pool:%s Size:%"PRIu64
1260 				     " Access:%s Type:%s State:%s",
1261 				     buf_ptr->name, buf_ptr->pool,
1262 				     buf_ptr->size, buf_ptr->access,
1263 				     buf_ptr->type,
1264 				     bb_state_string(buf_ptr->state));
1265 			} else if (buf_ptr->destroy) {
1266 				info("  Destroy Name:%s Hurry:%d",
1267 				     buf_ptr->name, (int) buf_ptr->hurry);
1268 			} else {
1269 				info("  Use  Name:%s", buf_ptr->name);
1270 			}
1271 		}
1272 	}
1273 }
1274 
1275 /* Make claim against resource limit for a user
1276  * user_id IN - Owner of burst buffer
1277  * bb_size IN - Size of burst buffer
1278  * pool IN - Pool containing the burst buffer
1279  * state_ptr IN - Global state to update
1280  * update_pool_unfree IN - If true, update the pool's unfree space */
bb_limit_add(uint32_t user_id,uint64_t bb_size,char * pool,bb_state_t * state_ptr,bool update_pool_unfree)1281 extern void bb_limit_add(uint32_t user_id, uint64_t bb_size, char *pool,
1282 			 bb_state_t *state_ptr, bool update_pool_unfree)
1283 {
1284 	burst_buffer_pool_t *pool_ptr;
1285 	bb_user_t *bb_user;
1286 	int i;
1287 
1288 	/* Update the pool's used_space, plus unfree_space if needed */
1289 	if (!pool || !xstrcmp(pool, state_ptr->bb_config.default_pool)) {
1290 		state_ptr->used_space += bb_size;
1291 		if (update_pool_unfree)
1292 			state_ptr->unfree_space += bb_size;
1293 	} else {
1294 		pool_ptr = state_ptr->bb_config.pool_ptr;
1295 		for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++){
1296 			if (xstrcmp(pool, pool_ptr->name))
1297 				continue;
1298 			pool_ptr->used_space += bb_size;
1299 			if (update_pool_unfree)
1300 				pool_ptr->unfree_space += bb_size;
1301 			break;
1302 		}
1303 		if (i >= state_ptr->bb_config.pool_cnt)
1304 			error("%s: Unable to located pool %s", __func__, pool);
1305 	}
1306 
1307 	/* Update user space used */
1308 	bb_user = bb_find_user_rec(user_id, state_ptr);
1309 	xassert(bb_user);
1310 	bb_user->size += bb_size;
1311 
1312 }
1313 
1314 /* Release claim against resource limit for a user */
bb_limit_rem(uint32_t user_id,uint64_t bb_size,char * pool,bb_state_t * state_ptr)1315 extern void bb_limit_rem(uint32_t user_id, uint64_t bb_size, char *pool,
1316 			 bb_state_t *state_ptr)
1317 {
1318 	burst_buffer_pool_t *pool_ptr;
1319 	bb_user_t *bb_user;
1320 	int i;
1321 
1322 	if (!pool || !xstrcmp(pool, state_ptr->bb_config.default_pool)) {
1323 		if (state_ptr->used_space >= bb_size) {
1324 			state_ptr->used_space -= bb_size;
1325 		} else {
1326 			error("%s: used_space underflow", __func__);
1327 			state_ptr->used_space = 0;
1328 		}
1329 		if (state_ptr->unfree_space >= bb_size) {
1330 			state_ptr->unfree_space -= bb_size;
1331 		} else {
1332 			/*
1333 			 * This will happen if we reload burst buffer state
1334 			 * after making a claim against resources, but before
1335 			 * the buffer actually gets created.
1336 			 */
1337 			debug2("%s: unfree_space underflow (%"PRIu64" < %"PRIu64")",
1338 			        __func__, state_ptr->unfree_space, bb_size);
1339 			state_ptr->unfree_space = 0;
1340 		}
1341 	} else {
1342 		pool_ptr = state_ptr->bb_config.pool_ptr;
1343 		for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++){
1344 			if (xstrcmp(pool, pool_ptr->name))
1345 				continue;
1346 			if (pool_ptr->used_space >= bb_size) {
1347 				pool_ptr->used_space -= bb_size;
1348 			} else {
1349 				error("%s: used_space underflow for pool %s",
1350 				      __func__, pool);
1351 				pool_ptr->used_space = 0;
1352 			}
1353 			if (pool_ptr->unfree_space >= bb_size) {
1354 				pool_ptr->unfree_space -= bb_size;
1355 			} else {
1356 				/*
1357 				 * This will happen if we reload burst buffer
1358 				 * state after making a claim against resources,
1359 				 * but before the buffer actually gets created.
1360 				 */
1361 				debug2("%s: unfree_space underflow for pool %s",
1362 				       __func__, pool);
1363 				pool_ptr->unfree_space = 0;
1364 			}
1365 			break;
1366 		}
1367 		if (i >= state_ptr->bb_config.pool_cnt)
1368 			error("%s: Unable to located pool %s", __func__, pool);
1369 	}
1370 
1371 	bb_user = bb_find_user_rec(user_id, state_ptr);
1372 	xassert(bb_user);
1373 	if (bb_user->size >= bb_size)
1374 		bb_user->size -= bb_size;
1375 	else {
1376 		bb_user->size = 0;
1377 		error("%s: user limit underflow for uid %u", __func__, user_id);
1378 	}
1379 
1380 }
1381 
1382 /* Log creation of a persistent burst buffer in the database
1383  * job_ptr IN - Point to job that created, could be NULL at startup
1384  * bb_alloc IN - Pointer to persistent burst buffer state info
1385  * state_ptr IN - Pointer to burst_buffer plugin state info
1386  * NOTE: assoc_mgr association and qos read lock should be set before this.
1387  */
bb_post_persist_create(job_record_t * job_ptr,bb_alloc_t * bb_alloc,bb_state_t * state_ptr)1388 extern int bb_post_persist_create(job_record_t *job_ptr, bb_alloc_t *bb_alloc,
1389 				  bb_state_t *state_ptr)
1390 {
1391 	int rc = SLURM_SUCCESS;
1392 	slurmdb_reservation_rec_t resv;
1393 	uint64_t size_mb;
1394 
1395 	if (!state_ptr->tres_id) {
1396 		debug2("%s: Not tracking this TRES, "
1397 		       "not sending to the database.", __func__);
1398 		return SLURM_SUCCESS;
1399 	}
1400 
1401 	size_mb = (bb_alloc->size / (1024 * 1024));
1402 
1403 	memset(&resv, 0, sizeof(slurmdb_reservation_rec_t));
1404 	resv.assocs = bb_alloc->assocs;
1405 	resv.cluster = slurmctld_conf.cluster_name;
1406 	resv.name = bb_alloc->name;
1407 	resv.id = bb_alloc->id;
1408 	resv.time_start = bb_alloc->create_time;
1409 	xstrfmtcat(resv.tres_str, "%d=%"PRIu64, state_ptr->tres_id, size_mb);
1410 	rc = acct_storage_g_add_reservation(acct_db_conn, &resv);
1411 	xfree(resv.tres_str);
1412 
1413 	if (state_ptr->tres_pos > 0) {
1414 		slurmdb_assoc_rec_t *assoc_ptr = bb_alloc->assoc_ptr;
1415 
1416 		while (assoc_ptr) {
1417 			assoc_ptr->usage->grp_used_tres[state_ptr->tres_pos] +=
1418 				size_mb;
1419 			debug2("%s: after adding persistent bb %s(%u), "
1420 			       "assoc %u(%s/%s/%s) grp_used_tres(%s) "
1421 			       "is %"PRIu64,
1422 			       __func__, bb_alloc->name, bb_alloc->id,
1423 			       assoc_ptr->id, assoc_ptr->acct,
1424 			       assoc_ptr->user, assoc_ptr->partition,
1425 			       assoc_mgr_tres_name_array[state_ptr->tres_pos],
1426 			       assoc_ptr->usage->
1427 			       grp_used_tres[state_ptr->tres_pos]);
1428 
1429 			/* FIXME: should grp_used_tres_run_secs be
1430 			 * done some how? Same for QOS below.
1431 			 */
1432 			/* debug2("%s: after adding persistent bb %s(%u), " */
1433 			/*        "assoc %u(%s/%s/%s) grp_used_tres_run_secs(%s) " */
1434 			/*        "is %"PRIu64, */
1435 			/*        __func__, bb_alloc->name, bb_alloc->id, */
1436 			/*        assoc_ptr->id, assoc_ptr->acct, */
1437 			/*        assoc_ptr->user, assoc_ptr->partition, */
1438 			/*        assoc_mgr_tres_name_array[state_ptr->tres_pos], */
1439 			/*        assoc_ptr->usage-> */
1440 			/*        grp_used_tres_run_secs[state_ptr->tres_pos]); */
1441 			assoc_ptr = assoc_ptr->usage->parent_assoc_ptr;
1442 		}
1443 
1444 		if (job_ptr && job_ptr->tres_alloc_cnt)
1445 			job_ptr->tres_alloc_cnt[state_ptr->tres_pos] -= size_mb;
1446 
1447 		if (bb_alloc->qos_ptr) {
1448 			bb_alloc->qos_ptr->usage->grp_used_tres[
1449 				state_ptr->tres_pos] += size_mb;
1450 		}
1451 	}
1452 
1453 	return rc;
1454 }
1455 
1456 /* Log deletion of a persistent burst buffer in the database */
bb_post_persist_delete(bb_alloc_t * bb_alloc,bb_state_t * state_ptr)1457 extern int bb_post_persist_delete(bb_alloc_t *bb_alloc, bb_state_t *state_ptr)
1458 {
1459 	int rc = SLURM_SUCCESS;
1460 	slurmdb_reservation_rec_t resv;
1461 	uint64_t size_mb;
1462 
1463 	if (!state_ptr->tres_id) {
1464 		debug2("%s: Not tracking this TRES, "
1465 		       "not sending to the database.", __func__);
1466 		return SLURM_SUCCESS;
1467 	}
1468 
1469 	size_mb = (bb_alloc->size / (1024 * 1024));
1470 
1471 	memset(&resv, 0, sizeof(slurmdb_reservation_rec_t));
1472 	resv.assocs = bb_alloc->assocs;
1473 	resv.cluster = slurmctld_conf.cluster_name;
1474 	resv.name = bb_alloc->name;
1475 	resv.id = bb_alloc->id;
1476 	resv.time_end = time(NULL);
1477 	resv.time_start = bb_alloc->create_time;
1478 	xstrfmtcat(resv.tres_str, "%d=%"PRIu64, state_ptr->tres_id, size_mb);
1479 
1480 	rc = acct_storage_g_remove_reservation(acct_db_conn, &resv);
1481 	xfree(resv.tres_str);
1482 
1483 	if (state_ptr->tres_pos > 0) {
1484 		slurmdb_assoc_rec_t *assoc_ptr = bb_alloc->assoc_ptr;
1485 
1486 		while (assoc_ptr) {
1487 			if (assoc_ptr->usage->grp_used_tres[state_ptr->tres_pos]
1488 			    >= size_mb) {
1489 				assoc_ptr->usage->grp_used_tres[
1490 					state_ptr->tres_pos] -= size_mb;
1491 				debug2("%s: after removing persistent "
1492 				       "bb %s(%u), assoc %u(%s/%s/%s) "
1493 				       "grp_used_tres(%s) is %"PRIu64,
1494 				       __func__, bb_alloc->name, bb_alloc->id,
1495 				       assoc_ptr->id, assoc_ptr->acct,
1496 				       assoc_ptr->user, assoc_ptr->partition,
1497 				       assoc_mgr_tres_name_array[
1498 					       state_ptr->tres_pos],
1499 				       assoc_ptr->usage->
1500 				       grp_used_tres[state_ptr->tres_pos]);
1501 			} else {
1502 				error("%s: underflow removing persistent "
1503 				      "bb %s(%u), assoc %u(%s/%s/%s) "
1504 				      "grp_used_tres(%s) had %"PRIu64
1505 				      " but we are trying to remove %"PRIu64,
1506 				      __func__, bb_alloc->name, bb_alloc->id,
1507 				      assoc_ptr->id, assoc_ptr->acct,
1508 				      assoc_ptr->user, assoc_ptr->partition,
1509 				      assoc_mgr_tres_name_array[
1510 					      state_ptr->tres_pos],
1511 				      assoc_ptr->usage->
1512 				      grp_used_tres[state_ptr->tres_pos],
1513 				      size_mb);
1514 				assoc_ptr->usage->grp_used_tres[
1515 					state_ptr->tres_pos] = 0;
1516 			}
1517 
1518 			/* FIXME: should grp_used_tres_run_secs be
1519 			 * done some how? Same for QOS below. */
1520 			/* debug2("%s: after removing persistent bb %s(%u), " */
1521 			/*        "assoc %u(%s/%s/%s) grp_used_tres_run_secs(%s) " */
1522 			/*        "is %"PRIu64, */
1523 			/*        __func__, bb_alloc->name, bb_alloc->id, */
1524 			/*        assoc_ptr->id, assoc_ptr->acct, */
1525 			/*        assoc_ptr->user, assoc_ptr->partition, */
1526 			/*        assoc_mgr_tres_name_array[state_ptr->tres_pos], */
1527 			/*        assoc_ptr->usage-> */
1528 			/*        grp_used_tres_run_secs[state_ptr->tres_pos]); */
1529 			assoc_ptr = assoc_ptr->usage->parent_assoc_ptr;
1530 		}
1531 
1532 		if (bb_alloc->qos_ptr) {
1533 			if (bb_alloc->qos_ptr->usage->grp_used_tres[
1534 				    state_ptr->tres_pos] >= size_mb)
1535 				bb_alloc->qos_ptr->usage->grp_used_tres[
1536 					state_ptr->tres_pos] -= size_mb;
1537 			else
1538 				bb_alloc->qos_ptr->usage->grp_used_tres[
1539 					state_ptr->tres_pos] = 0;
1540 		}
1541 	}
1542 
1543 	return rc;
1544 }
1545 
1546 /* Determine if the specified pool name is valid on this system */
bb_valid_pool_test(bb_state_t * state_ptr,char * pool_name)1547 extern bool bb_valid_pool_test(bb_state_t *state_ptr, char *pool_name)
1548 {
1549 	burst_buffer_pool_t *pool_ptr;
1550 	int i;
1551 
1552 	xassert(state_ptr);
1553 	if (!pool_name)
1554 		return true;
1555 	if (!xstrcmp(pool_name, state_ptr->bb_config.default_pool))
1556 		return true;
1557 	pool_ptr = state_ptr->bb_config.pool_ptr;
1558 	for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++) {
1559 		if (!xstrcmp(pool_name, pool_ptr->name))
1560 			return true;
1561 	}
1562 	info("%s: Invalid pool requested (%s)", __func__, pool_name);
1563 
1564 	return false;
1565 }
1566