1 /*****************************************************************************\
2 * burst_buffer_common.c - Common logic for managing burst_buffers
3 *
4 * NOTE: These functions are designed so they can be used by multiple burst
5 * buffer plugins at the same time (e.g. you might provide users access to
6 * both burst_buffer/datawarp and burst_buffer/generic on the same system),
7 * so the state information is largely in the individual plugin and passed
8 * as a pointer argument to these functions.
9 *****************************************************************************
10 * Copyright (C) 2014-2015 SchedMD LLC.
11 * Written by Morris Jette <jette@schedmd.com>
12 *
13 * This file is part of Slurm, a resource management program.
14 * For details, see <https://slurm.schedmd.com/>.
15 * Please also read the included file: DISCLAIMER.
16 *
17 * Slurm is free software; you can redistribute it and/or modify it under
18 * the terms of the GNU General Public License as published by the Free
19 * Software Foundation; either version 2 of the License, or (at your option)
20 * any later version.
21 *
22 * In addition, as a special exception, the copyright holders give permission
23 * to link the code of portions of this program with the OpenSSL library under
24 * certain conditions as described in each individual source file, and
25 * distribute linked combinations including the two. You must obey the GNU
26 * General Public License in all respects for all of the code used other than
27 * OpenSSL. If you modify file(s) with this exception, you may extend this
28 * exception to your version of the file(s), but you are not obligated to do
29 * so. If you do not wish to do so, delete this exception statement from your
30 * version. If you delete this exception statement from all source files in
31 * the program, then also delete it here.
32 *
33 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
34 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
35 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
36 * details.
37 *
38 * You should have received a copy of the GNU General Public License along
39 * with Slurm; if not, write to the Free Software Foundation, Inc.,
40 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
41 \*****************************************************************************/
42
43 #include "config.h"
44
45 #define _GNU_SOURCE /* For POLLRDHUP */
46 #include <fcntl.h>
47 #include <poll.h>
48 #include <stdlib.h>
49 #include <sys/mman.h> /* memfd_create */
50 #include <sys/stat.h>
51 #include <sys/types.h>
52 #include <unistd.h>
53
54 #if defined(__APPLE__) || defined(__DragonFly__) || defined(__NetBSD__)
55 #define POLLRDHUP POLLHUP
56 #include <signal.h>
57 #endif
58
59 #include "slurm/slurm.h"
60 #include "slurm/slurmdb.h"
61
62 #include "src/common/assoc_mgr.h"
63 #include "src/common/list.h"
64 #include "src/common/macros.h"
65 #include "src/common/pack.h"
66 #include "src/common/parse_config.h"
67 #include "src/common/run_command.h"
68 #include "src/common/slurm_accounting_storage.h"
69 #include "src/common/slurm_protocol_api.h"
70 #include "src/common/timers.h"
71 #include "src/common/uid.h"
72 #include "src/common/xmalloc.h"
73 #include "src/common/xstring.h"
74 #include "src/slurmctld/locks.h"
75 #include "src/slurmctld/slurmctld.h"
76
77 #include "burst_buffer_common.h"
78
79 /* For possible future use by burst_buffer/generic */
80 #define _SUPPORT_ALT_POOL 0
81
82 /* Maximum poll wait time for child processes, in milliseconds */
83 #define MAX_POLL_WAIT 500
84
85 static void _bb_job_del2(bb_job_t *bb_job);
86 static uid_t * _parse_users(char *buf);
87 static char * _print_users(uid_t *buf);
88
/*
 * Translate a comma-delimited list of users into an array of UIDs.
 * Each token is resolved with uid_from_string(); tokens for which that call
 * returns -1, or which resolve to UID 0, are logged and skipped.
 * The array is grown so that at least one slot always follows the last
 * stored UID (presumably zero-filled by xmalloc/xrealloc and acting as the
 * terminator that _print_users() looks for - confirm against xmalloc.h).
 * buf IN - comma-delimited user list, may be NULL
 * RET NULL if buf is NULL, otherwise an array which must be xfreed
 */
static uid_t *_parse_users(char *buf)
{
	char *copy, *name, *last = NULL;
	int used = 0, capacity = 1;
	uid_t *uids;

	if (!buf)
		return NULL;

	/* strtok_r() modifies its input, so tokenize a private copy */
	copy = xstrdup(buf);
	uids = xmalloc(sizeof(uid_t) * capacity);
	for (name = strtok_r(copy, ",", &last); name;
	     name = strtok_r(NULL, ",", &last)) {
		if ((uid_from_string(name, uids + used) == -1) ||
		    (uids[used] == 0)) {
			error("%s: ignoring invalid user: %s", __func__, name);
			continue;
		}
		used++;
		if (used >= capacity) {
			capacity *= 2;
			uids = xrealloc(uids, sizeof(uid_t) * capacity);
		}
	}
	xfree(copy);

	return uids;
}
119
120 /* Translate an array of (zero terminated) UIDs into a string with colon
121 * delimited UIDs
122 * Return value must be xfreed */
_print_users(uid_t * buf)123 static char *_print_users(uid_t *buf)
124 {
125 char *user_elem, *user_str = NULL;
126 int i;
127
128 if (!buf)
129 return user_str;
130 for (i = 0; buf[i]; i++) {
131 user_elem = uid_to_string(buf[i]);
132 if (!user_elem)
133 continue;
134 if (user_str)
135 xstrcat(user_str, ",");
136 xstrcat(user_str, user_elem);
137 xfree(user_elem);
138 }
139 return user_str;
140 }
141
142 /* Allocate burst buffer hash tables */
bb_alloc_cache(bb_state_t * state_ptr)143 extern void bb_alloc_cache(bb_state_t *state_ptr)
144 {
145 state_ptr->bb_ahash = xmalloc(sizeof(bb_alloc_t *) * BB_HASH_SIZE);
146 state_ptr->bb_jhash = xmalloc(sizeof(bb_job_t *) * BB_HASH_SIZE);
147 state_ptr->bb_uhash = xmalloc(sizeof(bb_user_t *) * BB_HASH_SIZE);
148 }
149
150 /* Clear all cached burst buffer records, freeing all memory. */
bb_clear_cache(bb_state_t * state_ptr)151 extern void bb_clear_cache(bb_state_t *state_ptr)
152 {
153 bb_alloc_t *bb_current, *bb_next;
154 bb_job_t *job_current, *job_next;
155 bb_user_t *user_current, *user_next;
156 int i;
157
158 if (state_ptr->bb_ahash) {
159 for (i = 0; i < BB_HASH_SIZE; i++) {
160 bb_current = state_ptr->bb_ahash[i];
161 while (bb_current) {
162 xassert(bb_current->magic == BB_ALLOC_MAGIC);
163 bb_next = bb_current->next;
164 bb_free_alloc_buf(bb_current);
165 bb_current = bb_next;
166 }
167 }
168 xfree(state_ptr->bb_ahash);
169 }
170
171 if (state_ptr->bb_jhash) {
172 for (i = 0; i < BB_HASH_SIZE; i++) {
173 job_current = state_ptr->bb_jhash[i];
174 while (job_current) {
175 xassert(job_current->magic == BB_JOB_MAGIC);
176 job_next = job_current->next;
177 _bb_job_del2(job_current);
178 job_current = job_next;
179 }
180 }
181 xfree(state_ptr->bb_jhash);
182 }
183
184 if (state_ptr->bb_uhash) {
185 for (i = 0; i < BB_HASH_SIZE; i++) {
186 user_current = state_ptr->bb_uhash[i];
187 while (user_current) {
188 xassert(user_current->magic == BB_USER_MAGIC);
189 user_next = user_current->next;
190 xfree(user_current);
191 user_current = user_next;
192 }
193 }
194 xfree(state_ptr->bb_uhash);
195 }
196
197 xfree(state_ptr->name);
198 FREE_NULL_LIST(state_ptr->persist_resv_rec);
199 }
200
201 /* Clear configuration parameters, free memory
202 * config_ptr IN - Initial configuration to be cleared
203 * fini IN - True if shutting down, do more complete clean-up */
bb_clear_config(bb_config_t * config_ptr,bool fini)204 extern void bb_clear_config(bb_config_t *config_ptr, bool fini)
205 {
206 int i;
207
208 xassert(config_ptr);
209 xfree(config_ptr->allow_users);
210 xfree(config_ptr->allow_users_str);
211 xfree(config_ptr->create_buffer);
212 config_ptr->debug_flag = false;
213 xfree(config_ptr->default_pool);
214 xfree(config_ptr->deny_users);
215 xfree(config_ptr->deny_users_str);
216 xfree(config_ptr->destroy_buffer);
217 xfree(config_ptr->get_sys_state);
218 xfree(config_ptr->get_sys_status);
219 config_ptr->granularity = 1;
220 if (fini) {
221 for (i = 0; i < config_ptr->pool_cnt; i++)
222 xfree(config_ptr->pool_ptr[i].name);
223 xfree(config_ptr->pool_ptr);
224 config_ptr->pool_cnt = 0;
225 } else {
226 for (i = 0; i < config_ptr->pool_cnt; i++)
227 config_ptr->pool_ptr[i].total_space = 0;
228 }
229 config_ptr->other_timeout = 0;
230 config_ptr->stage_in_timeout = 0;
231 config_ptr->stage_out_timeout = 0;
232 xfree(config_ptr->start_stage_in);
233 xfree(config_ptr->start_stage_out);
234 xfree(config_ptr->stop_stage_in);
235 xfree(config_ptr->stop_stage_out);
236 config_ptr->validate_timeout = 0;
237 }
238
239 /* Find a per-job burst buffer record for a specific job.
240 * If not found, return NULL. */
bb_find_alloc_rec(bb_state_t * state_ptr,job_record_t * job_ptr)241 extern bb_alloc_t *bb_find_alloc_rec(bb_state_t *state_ptr,
242 job_record_t *job_ptr)
243 {
244 bb_alloc_t *bb_alloc = NULL;
245
246 xassert(job_ptr);
247 xassert(state_ptr);
248 bb_alloc = state_ptr->bb_ahash[job_ptr->user_id % BB_HASH_SIZE];
249 while (bb_alloc) {
250 if (bb_alloc->job_id == job_ptr->job_id) {
251 if (bb_alloc->user_id == job_ptr->user_id) {
252 xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
253 return bb_alloc;
254 }
255 error("%s: Slurm state inconsistent with burst buffer. %pJ has UserID mismatch (%u != %u)",
256 __func__, job_ptr,
257 bb_alloc->user_id, job_ptr->user_id);
258 /* This has been observed when slurmctld crashed and
259 * the job state recovered was missing some jobs
260 * which already had burst buffers configured. */
261 }
262 bb_alloc = bb_alloc->next;
263 }
264 return bb_alloc;
265 }
266
267 /* Find a burst buffer record by name
268 * bb_name IN - Buffer's name
269 * user_id IN - Possible user ID, advisory use only
270 * RET the buffer or NULL if not found */
bb_find_name_rec(char * bb_name,uint32_t user_id,bb_state_t * state_ptr)271 extern bb_alloc_t *bb_find_name_rec(char *bb_name, uint32_t user_id,
272 bb_state_t *state_ptr)
273 {
274 bb_alloc_t *bb_alloc = NULL;
275 int i, hash_inx = user_id % BB_HASH_SIZE;
276
277 /* Try this user ID first */
278 bb_alloc = state_ptr->bb_ahash[hash_inx];
279 while (bb_alloc) {
280 if (!xstrcmp(bb_alloc->name, bb_name))
281 return bb_alloc;
282 bb_alloc = bb_alloc->next;
283 }
284
285 /* Now search all other records */
286 for (i = 0; i < BB_HASH_SIZE; i++) {
287 if (i == hash_inx)
288 continue;
289 bb_alloc = state_ptr->bb_ahash[i];
290 while (bb_alloc) {
291 if (!xstrcmp(bb_alloc->name, bb_name)) {
292 xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
293 return bb_alloc;
294 }
295 bb_alloc = bb_alloc->next;
296 }
297 }
298
299 return bb_alloc;
300 }
301
302 /* Find a per-user burst buffer record for a specific user ID */
bb_find_user_rec(uint32_t user_id,bb_state_t * state_ptr)303 extern bb_user_t *bb_find_user_rec(uint32_t user_id, bb_state_t *state_ptr)
304 {
305 int inx = user_id % BB_HASH_SIZE;
306 bb_user_t *user_ptr;
307
308 xassert(state_ptr);
309 xassert(state_ptr->bb_uhash);
310 user_ptr = state_ptr->bb_uhash[inx];
311 while (user_ptr) {
312 if (user_ptr->user_id == user_id)
313 return user_ptr;
314 user_ptr = user_ptr->next;
315 }
316 user_ptr = xmalloc(sizeof(bb_user_t));
317 user_ptr->magic = BB_USER_MAGIC;
318 user_ptr->next = state_ptr->bb_uhash[inx];
319 /* user_ptr->size = 0; initialized by xmalloc */
320 user_ptr->user_id = user_id;
321 state_ptr->bb_uhash[inx] = user_ptr;
322 return user_ptr;
323 }
324
#ifdef HAVE_MEMFD_CREATE
/*
 * Build a copy of the job's burst buffer specification with '%' symbols
 * replaced by job specific values.
 *
 * The result always starts with "#!/bin/sh\n" and ends with an extra "\n".
 * Recognized symbols:
 *   %%  literal '%'
 *   %A  array master job id		%a  array task id
 *   %d  working directory		%j  job id
 *   %u  user name (looked up from user_id and cached in job_ptr->user_name
 *       if not already set)
 *   %x  job name
 * Any other character following '%' is dropped together with the '%'.
 * A "\\" (two backslashes) stops symbol processing; the remainder of the
 * input, including the backslashes, is copied unchanged.
 *
 * job_ptr IN - job whose burst_buffer string is to be processed
 * RET an empty string if the job has no burst_buffer string, otherwise the
 *	replaced script. Either way the value must be xfreed.
 */
char *_handle_replacement(job_record_t *job_ptr)
{
	char *replaced = NULL, *p, *q;

	if (!job_ptr->burst_buffer)
		return xstrdup("");

	/* throw a script header on in case something downstream cares */
	xstrcat(replaced, "#!/bin/sh\n");

	/* q marks the start of literal text not yet copied, p is the cursor */
	p = q = job_ptr->burst_buffer;

	while (*p != '\0') {
		if (*p == '%') {
			/* flush the literal text preceding the '%' */
			xmemcat(replaced, q, p);
			p++;

			if (*p == '\0') {
				/*
				 * Lone '%' at the end of the input. Stop here
				 * rather than stepping past the terminating
				 * NUL; the '%' is dropped like any other
				 * unrecognized symbol.
				 */
				q = p;
				break;
			}

			switch (*p) {
			case '%': /* '%%' -> '%' */
				xstrcatchar(replaced, '%');
				break;
			case 'A': /* '%A' => array master job id */
				xstrfmtcat(replaced, "%u",
					   job_ptr->array_job_id);
				break;
			case 'a': /* '%a' => array task id */
				xstrfmtcat(replaced, "%u",
					   job_ptr->array_task_id);
				break;
			case 'd': /* '%d' => workdir */
				xstrcat(replaced, job_ptr->details->work_dir);
				break;
			case 'j': /* '%j' => jobid */
				xstrfmtcat(replaced, "%u", job_ptr->job_id);
				break;
			case 'u': /* '%u' => user name */
				if (!job_ptr->user_name)
					job_ptr->user_name =
						uid_to_string_or_null(
							job_ptr->user_id);
				xstrcat(replaced, job_ptr->user_name);
				break;
			case 'x': /* '%x' => job name */
				xstrcat(replaced, job_ptr->name);
				break;
			default:
				break;
			}

			q = ++p;
		} else if (*p == '\\' && *(p+1) == '\\') {
			/* '\\' => stop further symbol processing */
			/* first flush literal text pending before the '\\' */
			xmemcat(replaced, q, p);
			xstrcat(replaced, p);
			q = p;
			break;
		} else
			p++;
	}

	/* copy whatever literal text remains after the last symbol */
	if (p != q)
		xmemcat(replaced, q, p);

	/* throw an extra terminating newline in for good measure */
	xstrcat(replaced, "\n");

	return replaced;
}
#endif
394
/*
 * Return the path of the script to hand to the burst buffer scripts for a
 * job.
 *
 * - If bb_job already has a memfd_path, a copy of it is returned.
 * - Else if bb_job->need_symbol_replacement is set and memfd_create() was
 *   available at compile time, the symbol-replaced script is written to a
 *   new memfd, bb_job->memfd and bb_job->memfd_path are set, and a copy of
 *   the path is returned. Failure to create or write the memfd is fatal.
 * - Otherwise the path of the job's script below
 *   slurmctld_conf.state_save_location is returned.
 *
 * job_ptr IN - job of interest
 * bb_job IN/OUT - job's burst buffer record, memfd fields may be set
 * RET path string allocated by xstrdup()/xstrfmtcat(); presumably the caller
 *	must xfree it
 */
char *bb_handle_job_script(job_record_t *job_ptr, bb_job_t *bb_job)
{
	char *script = NULL;

	if (bb_job->memfd_path) {
		/*
		 * Already have an existing symbol-replaced script, so use it.
		 */
		return xstrdup(bb_job->memfd_path);
	}

	if (bb_job->need_symbol_replacement) {
#ifdef HAVE_MEMFD_CREATE
		/*
		 * Create a memfd-backed temporary file to write out the
		 * symbol-replaced BB script. memfd files will automatically be
		 * cleaned up on process termination. This will be recreated if
		 * the slurmctld restarts, otherwise kept in memory for the
		 * lifespan of the job.
		 */
		char *filename = NULL, *bb;
		pid_t pid = getpid();

		xstrfmtcat(filename, "bb_job_script.%u", job_ptr->job_id);

		bb_job->memfd = memfd_create(filename, MFD_CLOEXEC);
		if (bb_job->memfd < 0)
			fatal("%s: failed memfd_create: %m", __func__);
		/*
		 * The name is not needed once the descriptor exists. Free it
		 * only after the error check so errno is intact for "%m".
		 */
		xfree(filename);
		xstrfmtcat(bb_job->memfd_path, "/proc/%lu/fd/%d",
			   (unsigned long) pid, bb_job->memfd);

		bb = _handle_replacement(job_ptr);
		/* safe_write() jumps to rwfail on failure */
		safe_write(bb_job->memfd, bb, strlen(bb));
		xfree(bb);

		return xstrdup(bb_job->memfd_path);

	rwfail:
		xfree(bb);
		fatal("%s: could not write script file, likely out of memory",
		      __func__);
#else
		error("%s: symbol replacement requested, but not available as memfd_create() could not be found at compile time. "
		      "Falling back to the unreplaced job script.",
		      __func__);
#endif
	}

	xstrfmtcat(script, "%s/hash.%d/job.%u/script",
		   slurmctld_conf.state_save_location, (job_ptr->job_id % 10),
		   job_ptr->job_id);

	return script;
}
449
#if _SUPPORT_ALT_POOL
/*
 * Translate a decimal count with an optional size suffix into a number.
 * tok IN - string to parse
 * RET 0 if the leading number is not positive, otherwise the number scaled
 *	by suffix_mult() of the text following it (left unscaled when
 *	suffix_mult() returns NO_VAL64)
 */
static uint64_t _atoi(char *tok)
{
	char *suffix = NULL;
	uint64_t scale;
	int64_t parsed = (int64_t) strtoll(tok, &suffix, 10);

	if (parsed <= 0)
		return 0;

	scale = suffix_mult(suffix);
	if (scale == NO_VAL64)
		return (uint64_t) parsed;

	return ((uint64_t) parsed) * scale;
}
#endif
466
467 /* Set the bb_state's tres_id and tres_pos for limit enforcement.
468 * Value is set to -1 if not found. */
bb_set_tres_pos(bb_state_t * state_ptr)469 extern void bb_set_tres_pos(bb_state_t *state_ptr)
470 {
471 slurmdb_tres_rec_t tres_rec;
472 int inx;
473
474 xassert(state_ptr);
475 memset(&tres_rec, 0, sizeof(slurmdb_tres_rec_t));
476 tres_rec.type = "bb";
477 tres_rec.name = state_ptr->name;
478 inx = assoc_mgr_find_tres_pos(&tres_rec, false);
479 state_ptr->tres_pos = inx;
480 if (inx == -1) {
481 debug3("%s: Tres %s not found by assoc_mgr",
482 __func__, state_ptr->name);
483 } else {
484 state_ptr->tres_id = assoc_mgr_tres_array[inx]->id;
485 }
486 }
487
/*
 * Load and process configuration parameters
 *
 * state_ptr IN/OUT - burst buffer state; its name and bb_config are
 *	overwritten
 * plugin_type IN - plugin type string; the text after the first '/' (or the
 *	whole string if it contains none) becomes state_ptr->name. May be
 *	NULL, in which case the name is left cleared.
 *
 * The configuration is first reset with bb_clear_config() and the built-in
 * defaults applied, then options are read from "burst_buffer.conf" or,
 * if that can not be opened, from "burst_buffer_<name>.conf". If neither
 * file can be opened the function returns with only the defaults set.
 * A file that can be opened but not parsed is fatal.
 */
extern void bb_load_config(bb_state_t *state_ptr, char *plugin_type)
{
	s_p_hashtbl_t *bb_hashtbl = NULL;
	char *bb_conf, *tmp = NULL, *value;
#if _SUPPORT_ALT_POOL
	char *colon, *save_ptr = NULL, *tok;
	uint32_t pool_cnt;
#endif
	int fd, i;
	/* Options recognized in the configuration file */
	static s_p_options_t bb_options[] = {
		{"AllowUsers", S_P_STRING},
#if _SUPPORT_ALT_POOL
		{"AltPool", S_P_STRING},
#endif
		{"CreateBuffer", S_P_STRING},
		{"DefaultPool", S_P_STRING},
		{"DenyUsers", S_P_STRING},
		{"DestroyBuffer", S_P_STRING},
		{"Flags", S_P_STRING},
		{"GetSysState", S_P_STRING},
		{"GetSysStatus", S_P_STRING},
		{"Granularity", S_P_STRING},
		{"OtherTimeout", S_P_UINT32},
		{"StageInTimeout", S_P_UINT32},
		{"StageOutTimeout", S_P_UINT32},
		{"StartStageIn", S_P_STRING},
		{"StartStageOut", S_P_STRING},
		{"StopStageIn", S_P_STRING},
		{"StopStageOut", S_P_STRING},
		{"ValidateTimeout", S_P_UINT32},
		{NULL}
	};

	/* Derive the state's name from the plugin type, e.g. "a/b" -> "b" */
	xfree(state_ptr->name);
	if (plugin_type) {
		tmp = strchr(plugin_type, '/');
		if (tmp)
			tmp++;
		else
			tmp = plugin_type;
		state_ptr->name = xstrdup(tmp);
	}

	/* Set default configuration */
	bb_clear_config(&state_ptr->bb_config, false);
	if (slurm_get_debug_flags() & DEBUG_FLAG_BURST_BUF)
		state_ptr->bb_config.debug_flag = true;
	state_ptr->bb_config.flags |= BB_FLAG_DISABLE_PERSISTENT;
	state_ptr->bb_config.other_timeout = DEFAULT_OTHER_TIMEOUT;
	state_ptr->bb_config.stage_in_timeout = DEFAULT_STATE_IN_TIMEOUT;
	state_ptr->bb_config.stage_out_timeout = DEFAULT_STATE_OUT_TIMEOUT;
	state_ptr->bb_config.validate_timeout = DEFAULT_VALIDATE_TIMEOUT;

	/* First look for "burst_buffer.conf" then with "type" field,
	 * for example "burst_buffer_datawarp.conf" */
	bb_conf = get_extra_conf_path("burst_buffer.conf");
	/* open() is only used to probe whether the file can be opened;
	 * the descriptor is closed again right away */
	fd = open(bb_conf, 0);
	if (fd >= 0) {
		close(fd);
	} else {
		char *new_path = NULL;
		xfree(bb_conf);
		xstrfmtcat(new_path, "burst_buffer_%s.conf", state_ptr->name);
		bb_conf = get_extra_conf_path(new_path);
		fd = open(bb_conf, 0);
		if (fd < 0) {
			/* Neither file found: keep the defaults set above */
			info("%s: Unable to find configuration file %s or "
			     "burst_buffer.conf", __func__, new_path);
			xfree(bb_conf);
			xfree(new_path);
			return;
		}
		close(fd);
		xfree(new_path);
	}

	bb_hashtbl = s_p_hashtbl_create(bb_options);
	if (s_p_parse_file(bb_hashtbl, NULL, bb_conf, false) == SLURM_ERROR) {
		fatal("%s: something wrong with opening/reading %s: %m",
		      __func__, bb_conf);
	}
	/* Keep both the original user list string and its UID translation */
	if (s_p_get_string(&state_ptr->bb_config.allow_users_str, "AllowUsers",
			   bb_hashtbl)) {
		state_ptr->bb_config.allow_users = _parse_users(
					state_ptr->bb_config.allow_users_str);
	}
	s_p_get_string(&state_ptr->bb_config.create_buffer, "CreateBuffer",
		       bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.default_pool, "DefaultPool",
		       bb_hashtbl);
	if (s_p_get_string(&state_ptr->bb_config.deny_users_str, "DenyUsers",
			   bb_hashtbl)) {
		state_ptr->bb_config.deny_users = _parse_users(
					state_ptr->bb_config.deny_users_str);
	}
	s_p_get_string(&state_ptr->bb_config.destroy_buffer, "DestroyBuffer",
		       bb_hashtbl);

	/* A configured "Flags" value replaces the flags entirely, including
	 * the BB_FLAG_DISABLE_PERSISTENT default set above */
	if (s_p_get_string(&tmp, "Flags", bb_hashtbl)) {
		state_ptr->bb_config.flags = slurm_bb_str2flags(tmp);
		xfree(tmp);
	}
	/* By default, disable persistent buffer creation by normal users */
	if (state_ptr->bb_config.flags & BB_FLAG_ENABLE_PERSISTENT)
		state_ptr->bb_config.flags &= (~BB_FLAG_DISABLE_PERSISTENT);

	s_p_get_string(&state_ptr->bb_config.get_sys_state, "GetSysState",
		       bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.get_sys_status, "GetSysStatus",
		       bb_hashtbl);
	if (s_p_get_string(&tmp, "Granularity", bb_hashtbl)) {
		state_ptr->bb_config.granularity = bb_get_size_num(tmp, 1);
		xfree(tmp);
		/* A zero granularity is rejected and replaced with 1 */
		if (state_ptr->bb_config.granularity == 0) {
			error("%s: Granularity=0 is invalid", __func__);
			state_ptr->bb_config.granularity = 1;
		}
	}
#if _SUPPORT_ALT_POOL
	/* AltPool is a comma separated list of "name[:count]" entries, each
	 * appended to the pool table; count defaults to 1 */
	if (s_p_get_string(&tmp, "AltPool", bb_hashtbl)) {
		tok = strtok_r(tmp, ",", &save_ptr);
		while (tok) {
			colon = strchr(tok, ':');
			if (colon) {
				colon[0] = '\0';
				pool_cnt = _atoi(colon + 1);
			} else
				pool_cnt = 1;
			state_ptr->bb_config.pool_ptr = xrealloc(
				state_ptr->bb_config.pool_ptr,
				sizeof(burst_buffer_pool_t) *
				(state_ptr->bb_config.pool_cnt + 1));
			state_ptr->bb_config.
				pool_ptr[state_ptr->bb_config.pool_cnt].name =
				xstrdup(tok);
			state_ptr->bb_config.
				pool_ptr[state_ptr->bb_config.pool_cnt].
				avail_space = pool_cnt;
			state_ptr->bb_config.pool_cnt++;
			tok = strtok_r(NULL, ",", &save_ptr);
		}
		xfree(tmp);
	}
#endif

	/* Timeouts keep their defaults unless present in the file */
	(void) s_p_get_uint32(&state_ptr->bb_config.other_timeout,
			      "OtherTimeout", bb_hashtbl);
	(void) s_p_get_uint32(&state_ptr->bb_config.stage_in_timeout,
			      "StageInTimeout", bb_hashtbl);
	(void) s_p_get_uint32(&state_ptr->bb_config.stage_out_timeout,
			      "StageOutTimeout", bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.start_stage_in, "StartStageIn",
		       bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.start_stage_out, "StartStageOut",
		       bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.stop_stage_in, "StopStageIn",
		       bb_hashtbl);
	s_p_get_string(&state_ptr->bb_config.stop_stage_out, "StopStageOut",
		       bb_hashtbl);
	(void) s_p_get_uint32(&state_ptr->bb_config.validate_timeout,
			      "ValidateTimeout", bb_hashtbl);

	s_p_hashtbl_destroy(bb_hashtbl);
	xfree(bb_conf);

	/* With the debug flag set, log the resulting configuration */
	if (state_ptr->bb_config.debug_flag) {
		value = _print_users(state_ptr->bb_config.allow_users);
		info("%s: AllowUsers:%s", __func__, value);
		xfree(value);
		info("%s: CreateBuffer:%s", __func__,
		     state_ptr->bb_config.create_buffer);
		info("%s: DefaultPool:%s", __func__,
		     state_ptr->bb_config.default_pool);
		value = _print_users(state_ptr->bb_config.deny_users);
		info("%s: DenyUsers:%s", __func__, value);
		xfree(value);
		info("%s: DestroyBuffer:%s", __func__,
		     state_ptr->bb_config.destroy_buffer);
		info("%s: GetSysState:%s", __func__,
		     state_ptr->bb_config.get_sys_state);
		info("%s: GetSysStatus:%s", __func__,
		     state_ptr->bb_config.get_sys_status);
		info("%s: Granularity:%"PRIu64"", __func__,
		     state_ptr->bb_config.granularity);
		for (i = 0; i < state_ptr->bb_config.pool_cnt; i++) {
			info("%s: AltPoolName[%d]:%s:%"PRIu64"", __func__, i,
			     state_ptr->bb_config.pool_ptr[i].name,
			     state_ptr->bb_config.pool_ptr[i].total_space);
		}
		info("%s: OtherTimeout:%u", __func__,
		     state_ptr->bb_config.other_timeout);
		info("%s: StageInTimeout:%u", __func__,
		     state_ptr->bb_config.stage_in_timeout);
		info("%s: StageOutTimeout:%u", __func__,
		     state_ptr->bb_config.stage_out_timeout);
		info("%s: StartStageIn:%s", __func__,
		     state_ptr->bb_config.start_stage_in);
		info("%s: StartStageOut:%s", __func__,
		     state_ptr->bb_config.start_stage_out);
		info("%s: StopStageIn:%s", __func__,
		     state_ptr->bb_config.stop_stage_in);
		info("%s: StopStageOut:%s", __func__,
		     state_ptr->bb_config.stop_stage_out);
		info("%s: ValidateTimeout:%u", __func__,
		     state_ptr->bb_config.validate_timeout);
	}
}
696
/*
 * Pack one burst buffer allocation record into a buffer.
 * Nothing is packed for protocol versions older than
 * SLURM_MIN_PROTOCOL_VERSION.
 * bb_alloc IN - record to pack
 * buffer IN/OUT - buffer to append to
 * protocol_version IN - protocol version of the receiver
 */
static void _pack_alloc(struct bb_alloc *bb_alloc, Buf buffer,
			uint16_t protocol_version)
{
	if (protocol_version < SLURM_MIN_PROTOCOL_VERSION)
		return;

	/* Field order defines the wire format, do not reorder */
	packstr(bb_alloc->account, buffer);
	pack32(bb_alloc->array_job_id, buffer);
	pack32(bb_alloc->array_task_id, buffer);
	pack_time(bb_alloc->create_time, buffer);
	pack32(bb_alloc->job_id, buffer);
	packstr(bb_alloc->name, buffer);
	packstr(bb_alloc->partition, buffer);
	packstr(bb_alloc->pool, buffer);
	packstr(bb_alloc->qos, buffer);
	pack64(bb_alloc->size, buffer);
	pack16(bb_alloc->state, buffer);
	pack32(bb_alloc->user_id, buffer);
}
715
716 /* Pack individual burst buffer records into a buffer */
bb_pack_bufs(uid_t uid,bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)717 extern int bb_pack_bufs(uid_t uid, bb_state_t *state_ptr, Buf buffer,
718 uint16_t protocol_version)
719 {
720 int i, rec_count = 0;
721 struct bb_alloc *bb_alloc;
722 int eof, offset;
723
724 xassert(state_ptr);
725 offset = get_buf_offset(buffer);
726 pack32(rec_count, buffer);
727 if (!state_ptr->bb_ahash)
728 return rec_count;
729
730 for (i = 0; i < BB_HASH_SIZE; i++) {
731 bb_alloc = state_ptr->bb_ahash[i];
732 while (bb_alloc) {
733 if ((uid == 0) || (uid == bb_alloc->user_id)) {
734 _pack_alloc(bb_alloc, buffer, protocol_version);
735 rec_count++;
736 }
737 bb_alloc = bb_alloc->next;
738 }
739 }
740 if (rec_count != 0) {
741 eof = get_buf_offset(buffer);
742 set_buf_offset(buffer, offset);
743 pack32(rec_count, buffer);
744 set_buf_offset(buffer, eof);
745 }
746
747 return rec_count;
748 }
749
750 /* Pack state and configuration parameters into a buffer */
bb_pack_state(bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)751 extern void bb_pack_state(bb_state_t *state_ptr, Buf buffer,
752 uint16_t protocol_version)
753 {
754 bb_config_t *config_ptr = &state_ptr->bb_config;
755 int i;
756
757
758 if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
759 packstr(config_ptr->allow_users_str, buffer);
760 packstr(config_ptr->create_buffer, buffer);
761 packstr(config_ptr->default_pool, buffer);
762 packstr(config_ptr->deny_users_str, buffer);
763 packstr(config_ptr->destroy_buffer, buffer);
764 pack32(config_ptr->flags, buffer);
765 packstr(config_ptr->get_sys_state, buffer);
766 packstr(config_ptr->get_sys_status, buffer);
767 pack64(config_ptr->granularity, buffer);
768 pack32(config_ptr->pool_cnt, buffer);
769 for (i = 0; i < config_ptr->pool_cnt; i++) {
770 packstr(config_ptr->pool_ptr[i].name, buffer);
771 pack64(config_ptr->pool_ptr[i].total_space, buffer);
772 pack64(config_ptr->pool_ptr[i].granularity, buffer);
773 pack64(config_ptr->pool_ptr[i].unfree_space, buffer);
774 pack64(config_ptr->pool_ptr[i].used_space, buffer);
775 }
776 pack32(config_ptr->other_timeout, buffer);
777 packstr(config_ptr->start_stage_in, buffer);
778 packstr(config_ptr->start_stage_out, buffer);
779 packstr(config_ptr->stop_stage_in, buffer);
780 packstr(config_ptr->stop_stage_out, buffer);
781 pack32(config_ptr->stage_in_timeout, buffer);
782 pack32(config_ptr->stage_out_timeout,buffer);
783 pack64(state_ptr->total_space, buffer);
784 pack64(state_ptr->unfree_space, buffer);
785 pack64(state_ptr->used_space, buffer);
786 pack32(config_ptr->validate_timeout, buffer);
787 }
788 }
789
790 /* Pack individual burst buffer usage records into a buffer (used for limits) */
bb_pack_usage(uid_t uid,bb_state_t * state_ptr,Buf buffer,uint16_t protocol_version)791 extern int bb_pack_usage(uid_t uid, bb_state_t *state_ptr, Buf buffer,
792 uint16_t protocol_version)
793 {
794 int i, rec_count = 0;
795 bb_user_t *bb_usage;
796 int eof, offset;
797
798 xassert(state_ptr);
799 offset = get_buf_offset(buffer);
800 pack32(rec_count, buffer);
801 if (!state_ptr->bb_uhash)
802 return rec_count;
803
804 for (i = 0; i < BB_HASH_SIZE; i++) {
805 bb_usage = state_ptr->bb_uhash[i];
806 while (bb_usage) {
807 if (((uid == 0) || (uid == bb_usage->user_id)) &&
808 (bb_usage->size != 0)) {
809 pack64(bb_usage->size, buffer);
810 pack32(bb_usage->user_id, buffer);
811 rec_count++;
812 }
813 bb_usage = bb_usage->next;
814 }
815 }
816 if (rec_count != 0) {
817 eof = get_buf_offset(buffer);
818 set_buf_offset(buffer, offset);
819 pack32(rec_count, buffer);
820 set_buf_offset(buffer, eof);
821 }
822
823 return rec_count;
824 }
825
826 /* Translate a burst buffer size specification in string form to numeric form,
827 * recognizing various (case insensitive) sufficies:
828 * K/KiB, M/MiB, G/GiB, T/TiB, P/PiB for powers of 1024
829 * KB, MB, GB, TB, PB for powers of 1000
830 * N/Node/Nodes will consider the size in nodes
831 * Default units are bytes. */
bb_get_size_num(char * tok,uint64_t granularity)832 extern uint64_t bb_get_size_num(char *tok, uint64_t granularity)
833 {
834 char *tmp = NULL, *unit;
835 uint64_t bb_size_i, mult;
836 uint64_t bb_size_u = 0;
837
838 bb_size_i = (uint64_t) strtoull(tok, &tmp, 10);
839 if ((bb_size_i > 0) && tmp) {
840 bb_size_u = bb_size_i;
841 unit = xstrdup(tmp);
842 strtok(unit, " ");
843 if (!xstrcasecmp(unit, "n") ||
844 !xstrcasecmp(unit, "node") ||
845 !xstrcasecmp(unit, "nodes")) {
846 bb_size_u |= BB_SIZE_IN_NODES;
847 granularity = 1;
848 } else if ((mult = suffix_mult(unit)) != NO_VAL64) {
849 bb_size_u *= mult;
850 }
851 xfree(unit);
852 }
853
854 if (granularity > 1) {
855 bb_size_u = ((bb_size_u + granularity - 1) / granularity) *
856 granularity;
857 }
858
859 return bb_size_u;
860 }
861
862 /* Translate a burst buffer size specification in numeric form to string form,
863 * appending various sufficies (KiB, MiB, GB, TB, PB, and Nodes). Default units
864 * are bytes. */
bb_get_size_str(uint64_t size)865 extern char *bb_get_size_str(uint64_t size)
866 {
867 static char size_str[64];
868
869 if (size == 0) {
870 snprintf(size_str, sizeof(size_str), "%"PRIu64, size);
871 } else if (size & BB_SIZE_IN_NODES) {
872 size &= (~BB_SIZE_IN_NODES);
873 snprintf(size_str, sizeof(size_str), "%"PRIu64"N", size);
874
875 } else if ((size % ((uint64_t)1024 * 1024 * 1024 * 1024 * 1024)) == 0) {
876 size /= ((uint64_t)1024 * 1024 * 1024 * 1024 * 1024);
877 snprintf(size_str, sizeof(size_str), "%"PRIu64"PiB", size);
878 } else if ((size % ((uint64_t)1000 * 1000 * 1000 * 1000 * 1000)) == 0) {
879 size /= ((uint64_t)1000 * 1000 * 1000 * 1000 * 1000);
880 snprintf(size_str, sizeof(size_str), "%"PRIu64"PB", size);
881
882 } else if ((size % ((uint64_t)1024 * 1024 * 1024 * 1024)) == 0) {
883 size /= ((uint64_t)1024 * 1024 * 1024 * 1024);
884 snprintf(size_str, sizeof(size_str), "%"PRIu64"TiB", size);
885 } else if ((size % ((uint64_t)1000 * 1000 * 1000 * 1000)) == 0) {
886 size /= ((uint64_t)1000 * 1000 * 1000 * 1000);
887 snprintf(size_str, sizeof(size_str), "%"PRIu64"TB", size);
888
889 } else if ((size % ((uint64_t)1024 * 1024 * 1024)) == 0) {
890 size /= ((uint64_t)1024 * 1024 * 1024);
891 snprintf(size_str, sizeof(size_str), "%"PRIu64"GiB", size);
892 } else if ((size % ((uint64_t)1000 * 1000 * 1000)) == 0) {
893 size /= ((uint64_t)1000 * 1000 * 1000);
894 snprintf(size_str, sizeof(size_str), "%"PRIu64"GB", size);
895
896 } else if ((size % ((uint64_t)1024 * 1024)) == 0) {
897 size /= ((uint64_t)1024 * 1024);
898 snprintf(size_str, sizeof(size_str), "%"PRIu64"MiB", size);
899 } else if ((size % ((uint64_t)1000 * 1000)) == 0) {
900 size /= ((uint64_t)1000 * 1000);
901 snprintf(size_str, sizeof(size_str), "%"PRIu64"MB", size);
902
903 } else if ((size % ((uint64_t)1024)) == 0) {
904 size /= ((uint64_t)1024);
905 snprintf(size_str, sizeof(size_str), "%"PRIu64"KiB", size);
906 } else if ((size % ((uint64_t)1000)) == 0) {
907 size /= ((uint64_t)1000);
908 snprintf(size_str, sizeof(size_str), "%"PRIu64"KB", size);
909
910 } else {
911 snprintf(size_str, sizeof(size_str), "%"PRIu64, size);
912 }
913
914 return size_str;
915 }
916
/*
 * Round up a number based upon some granularity
 * start_size IN - value to round
 * granularity IN - rounding unit; values of 0 and 1 leave start_size
 *	unchanged (0 would otherwise cause a division by zero)
 * RET start_size rounded up to the next multiple of granularity. The
 *	result only wraps if that multiple itself does not fit in 64 bits.
 */
extern uint64_t bb_granularity(uint64_t start_size, uint64_t granularity)
{
	uint64_t remainder;

	if (!start_size || (granularity <= 1))
		return start_size;

	/*
	 * Add only what is missing to reach the next multiple rather than
	 * adding (granularity - 1) first, which could wrap around even when
	 * start_size is already a multiple of granularity.
	 */
	remainder = start_size % granularity;
	if (remainder)
		start_size += granularity - remainder;

	return start_size;
}
927
/*
 * Release a job queue record with xfree().
 * x IN - record to free
 */
extern void bb_job_queue_del(void *x)
{
	void *rec = x;

	xfree(rec);
}
932
933 /* Sort job queue by expected start time */
bb_job_queue_sort(void * x,void * y)934 extern int bb_job_queue_sort(void *x, void *y)
935 {
936 bb_job_queue_rec_t *job_rec1 = *(bb_job_queue_rec_t **) x;
937 bb_job_queue_rec_t *job_rec2 = *(bb_job_queue_rec_t **) y;
938 job_record_t *job_ptr1 = job_rec1->job_ptr;
939 job_record_t *job_ptr2 = job_rec2->job_ptr;
940
941 if (job_ptr1->start_time > job_ptr2->start_time)
942 return 1;
943 if (job_ptr1->start_time < job_ptr2->start_time)
944 return -1;
945 return 0;
946 }
947
948 /* Sort preempt_bb_recs in order of DECREASING use_time */
bb_preempt_queue_sort(void * x,void * y)949 extern int bb_preempt_queue_sort(void *x, void *y)
950 {
951 struct preempt_bb_recs *bb_ptr1 = *(struct preempt_bb_recs **) x;
952 struct preempt_bb_recs *bb_ptr2 = *(struct preempt_bb_recs **) y;
953
954 if (bb_ptr1->use_time > bb_ptr2->use_time)
955 return -1;
956 if (bb_ptr1->use_time < bb_ptr2->use_time)
957 return 1;
958 return 0;
959 };
960
961 /* For each burst buffer record, set the use_time to the time at which its
962 * use is expected to begin (i.e. each job's expected start time) */
bb_set_use_time(bb_state_t * state_ptr)963 extern void bb_set_use_time(bb_state_t *state_ptr)
964 {
965 job_record_t *job_ptr;
966 bb_alloc_t *bb_alloc = NULL;
967 time_t now = time(NULL);
968 int i;
969
970 state_ptr->next_end_time = now + 60 * 60; /* Start estimate now+1hour */
971 for (i = 0; i < BB_HASH_SIZE; i++) {
972 bb_alloc = state_ptr->bb_ahash[i];
973 while (bb_alloc) {
974 if (bb_alloc->job_id &&
975 ((bb_alloc->state == BB_STATE_STAGING_IN) ||
976 (bb_alloc->state == BB_STATE_STAGED_IN))) {
977 job_ptr = find_job_record(bb_alloc->job_id);
978 if (!job_ptr && !bb_alloc->orphaned) {
979 bb_alloc->orphaned = true;
980 error("%s: JobId=%u not found for allocated burst buffer",
981 __func__, bb_alloc->job_id);
982 bb_alloc->use_time = now + 24 * 60 * 60;
983 } else if (!job_ptr) {
984 bb_alloc->use_time = now + 24 * 60 * 60;
985 } else if (job_ptr->start_time) {
986 bb_alloc->end_time = job_ptr->end_time;
987 bb_alloc->use_time = job_ptr->start_time;
988 } else {
989 /* Unknown start time */
990 bb_alloc->use_time = now + 60 * 60;
991 }
992 } else if (bb_alloc->job_id) {
993 job_ptr = find_job_record(bb_alloc->job_id);
994 if (job_ptr)
995 bb_alloc->end_time = job_ptr->end_time;
996 } else {
997 bb_alloc->use_time = now;
998 }
999 if (bb_alloc->end_time && bb_alloc->size) {
1000 if (bb_alloc->end_time <= now)
1001 state_ptr->next_end_time = now;
1002 else if (state_ptr->next_end_time >
1003 bb_alloc->end_time) {
1004 state_ptr->next_end_time =
1005 bb_alloc->end_time;
1006 }
1007 }
1008 bb_alloc = bb_alloc->next;
1009 }
1010 }
1011 }
1012
1013 /* Sleep function, also handles termination signal */
bb_sleep(bb_state_t * state_ptr,int add_secs)1014 extern void bb_sleep(bb_state_t *state_ptr, int add_secs)
1015 {
1016 struct timespec ts = {0, 0};
1017 struct timeval tv = {0, 0};
1018
1019 if (gettimeofday(&tv, NULL)) { /* Some error */
1020 sleep(1);
1021 return;
1022 }
1023
1024 ts.tv_sec = tv.tv_sec + add_secs;
1025 ts.tv_nsec = tv.tv_usec * 1000;
1026 slurm_mutex_lock(&state_ptr->term_mutex);
1027 if (!state_ptr->term_flag) {
1028 slurm_cond_timedwait(&state_ptr->term_cond,
1029 &state_ptr->term_mutex, &ts);
1030 }
1031 slurm_mutex_unlock(&state_ptr->term_mutex);
1032 }
1033
1034
1035 /* Allocate a named burst buffer record for a specific user.
1036 * Return a pointer to that record.
1037 * Use bb_free_name_rec() to purge the returned record. */
bb_alloc_name_rec(bb_state_t * state_ptr,char * name,uint32_t user_id)1038 extern bb_alloc_t *bb_alloc_name_rec(bb_state_t *state_ptr, char *name,
1039 uint32_t user_id)
1040 {
1041 bb_alloc_t *bb_alloc = NULL;
1042 time_t now = time(NULL);
1043 int i;
1044
1045 xassert(state_ptr->bb_ahash);
1046 state_ptr->last_update_time = now;
1047 bb_alloc = xmalloc(sizeof(bb_alloc_t));
1048 i = user_id % BB_HASH_SIZE;
1049 bb_alloc->magic = BB_ALLOC_MAGIC;
1050 bb_alloc->next = state_ptr->bb_ahash[i];
1051 state_ptr->bb_ahash[i] = bb_alloc;
1052 bb_alloc->array_task_id = NO_VAL;
1053 bb_alloc->name = xstrdup(name);
1054 bb_alloc->state = BB_STATE_ALLOCATED;
1055 bb_alloc->state_time = now;
1056 bb_alloc->seen_time = now;
1057 bb_alloc->user_id = user_id;
1058
1059 return bb_alloc;
1060 }
1061
1062 /* Allocate a per-job burst buffer record for a specific job.
1063 * Return a pointer to that record.
1064 * Use bb_free_alloc_rec() to purge the returned record. */
bb_alloc_job_rec(bb_state_t * state_ptr,job_record_t * job_ptr,bb_job_t * bb_job)1065 extern bb_alloc_t *bb_alloc_job_rec(bb_state_t *state_ptr,
1066 job_record_t *job_ptr,
1067 bb_job_t *bb_job)
1068 {
1069 bb_alloc_t *bb_alloc = NULL;
1070 int i;
1071
1072 xassert(state_ptr->bb_ahash);
1073 xassert(job_ptr);
1074 state_ptr->last_update_time = time(NULL);
1075 bb_alloc = xmalloc(sizeof(bb_alloc_t));
1076 bb_alloc->account = xstrdup(bb_job->account);
1077 bb_alloc->array_job_id = job_ptr->array_job_id;
1078 bb_alloc->array_task_id = job_ptr->array_task_id;
1079 bb_alloc->assoc_ptr = job_ptr->assoc_ptr;
1080 bb_alloc->job_id = job_ptr->job_id;
1081 bb_alloc->magic = BB_ALLOC_MAGIC;
1082 i = job_ptr->user_id % BB_HASH_SIZE;
1083 xstrfmtcat(bb_alloc->name, "%u", job_ptr->job_id);
1084 bb_alloc->next = state_ptr->bb_ahash[i];
1085 bb_alloc->partition = xstrdup(bb_job->partition);
1086 bb_alloc->pool = xstrdup(bb_job->job_pool);
1087 bb_alloc->qos = xstrdup(bb_job->qos);
1088 state_ptr->bb_ahash[i] = bb_alloc;
1089 bb_alloc->size = bb_job->total_size;
1090 bb_alloc->state = BB_STATE_ALLOCATED;
1091 bb_alloc->state_time = time(NULL);
1092 bb_alloc->seen_time = time(NULL);
1093 bb_alloc->user_id = job_ptr->user_id;
1094
1095 return bb_alloc;
1096 }
1097
1098 /* Allocate a burst buffer record for a job and increase the job priority
1099 * if so configured.
1100 * Use bb_free_alloc_rec() to purge the returned record. */
bb_alloc_job(bb_state_t * state_ptr,job_record_t * job_ptr,bb_job_t * bb_job)1101 extern bb_alloc_t *bb_alloc_job(bb_state_t *state_ptr, job_record_t *job_ptr,
1102 bb_job_t *bb_job)
1103 {
1104 bb_alloc_t *bb_alloc;
1105
1106 bb_alloc = bb_alloc_job_rec(state_ptr, job_ptr, bb_job);
1107
1108 return bb_alloc;
1109 }
1110
1111 /* Free memory associated with allocated bb record, caller is responsible for
1112 * maintaining linked list */
bb_free_alloc_buf(bb_alloc_t * bb_alloc)1113 extern void bb_free_alloc_buf(bb_alloc_t *bb_alloc)
1114 {
1115 if (bb_alloc) {
1116 xassert(bb_alloc->magic == BB_ALLOC_MAGIC);
1117 bb_alloc->magic = 0;
1118 xfree(bb_alloc->account);
1119 xfree(bb_alloc->assocs);
1120 xfree(bb_alloc->name);
1121 xfree(bb_alloc->partition);
1122 xfree(bb_alloc->pool);
1123 xfree(bb_alloc->qos);
1124 xfree(bb_alloc);
1125 }
1126 }
1127
1128
1129 /* Remove a specific bb_alloc_t from global records.
1130 * RET true if found, false otherwise */
bb_free_alloc_rec(bb_state_t * state_ptr,bb_alloc_t * bb_alloc)1131 extern bool bb_free_alloc_rec(bb_state_t *state_ptr, bb_alloc_t *bb_alloc)
1132 {
1133 bb_alloc_t *bb_link, **bb_plink;
1134 int i;
1135
1136 xassert(state_ptr);
1137 xassert(state_ptr->bb_ahash);
1138 xassert(bb_alloc);
1139
1140 i = bb_alloc->user_id % BB_HASH_SIZE;
1141 bb_plink = &state_ptr->bb_ahash[i];
1142 bb_link = state_ptr->bb_ahash[i];
1143 while (bb_link) {
1144 if (bb_link == bb_alloc) {
1145 xassert(bb_link->magic == BB_ALLOC_MAGIC);
1146 *bb_plink = bb_alloc->next;
1147 bb_free_alloc_buf(bb_alloc);
1148 state_ptr->last_update_time = time(NULL);
1149 return true;
1150 }
1151 bb_plink = &bb_link->next;
1152 bb_link = bb_link->next;
1153 }
1154 return false;
1155 }
1156
1157 /* Allocate a bb_job_t record, hashed by job_id, delete with bb_job_del() */
bb_job_alloc(bb_state_t * state_ptr,uint32_t job_id)1158 extern bb_job_t *bb_job_alloc(bb_state_t *state_ptr, uint32_t job_id)
1159 {
1160 int inx = job_id % BB_HASH_SIZE;
1161 bb_job_t *bb_job = xmalloc(sizeof(bb_job_t));
1162
1163 xassert(state_ptr);
1164 bb_job->magic = BB_JOB_MAGIC;
1165 bb_job->next = state_ptr->bb_jhash[inx];
1166 bb_job->job_id = job_id;
1167 state_ptr->bb_jhash[inx] = bb_job;
1168
1169 return bb_job;
1170 }
1171
1172 /* Return a pointer to the existing bb_job_t record for a given job_id or
1173 * NULL if not found */
bb_job_find(bb_state_t * state_ptr,uint32_t job_id)1174 extern bb_job_t *bb_job_find(bb_state_t *state_ptr, uint32_t job_id)
1175 {
1176 bb_job_t *bb_job;
1177
1178 xassert(state_ptr);
1179
1180 if (!state_ptr->bb_jhash)
1181 return NULL;
1182
1183 bb_job = state_ptr->bb_jhash[job_id % BB_HASH_SIZE];
1184 while (bb_job) {
1185 if (bb_job->job_id == job_id) {
1186 xassert(bb_job->magic == BB_JOB_MAGIC);
1187 return bb_job;
1188 }
1189 bb_job = bb_job->next;
1190 }
1191
1192 return bb_job;
1193 }
1194
1195 /* Delete a bb_job_t record, hashed by job_id */
bb_job_del(bb_state_t * state_ptr,uint32_t job_id)1196 extern void bb_job_del(bb_state_t *state_ptr, uint32_t job_id)
1197 {
1198 int inx = job_id % BB_HASH_SIZE;
1199 bb_job_t *bb_job, **bb_pjob;
1200
1201 xassert(state_ptr);
1202 bb_pjob = &state_ptr->bb_jhash[inx];
1203 bb_job = state_ptr->bb_jhash[inx];
1204 while (bb_job) {
1205 if (bb_job->job_id == job_id) {
1206 xassert(bb_job->magic == BB_JOB_MAGIC);
1207 bb_job->magic = 0;
1208 *bb_pjob = bb_job->next;
1209 _bb_job_del2(bb_job);
1210 return;
1211 }
1212 bb_pjob = &bb_job->next;
1213 bb_job = bb_job->next;
1214 }
1215 }
1216
1217 /* Delete a bb_job_t record. DOES NOT UNLINK FROM HASH TABLE */
_bb_job_del2(bb_job_t * bb_job)1218 static void _bb_job_del2(bb_job_t *bb_job)
1219 {
1220 int i;
1221
1222 if (bb_job) {
1223 (void) close(bb_job->memfd);
1224
1225 xfree(bb_job->account);
1226 for (i = 0; i < bb_job->buf_cnt; i++) {
1227 xfree(bb_job->buf_ptr[i].access);
1228 xfree(bb_job->buf_ptr[i].name);
1229 xfree(bb_job->buf_ptr[i].pool);
1230 xfree(bb_job->buf_ptr[i].type);
1231 }
1232 xfree(bb_job->buf_ptr);
1233 xfree(bb_job->job_pool);
1234 xfree(bb_job->memfd_path);
1235 xfree(bb_job->partition);
1236 xfree(bb_job->qos);
1237 xfree(bb_job);
1238 }
1239 }
1240
1241 /* Log the contents of a bb_job_t record using "info()" */
bb_job_log(bb_state_t * state_ptr,bb_job_t * bb_job)1242 extern void bb_job_log(bb_state_t *state_ptr, bb_job_t *bb_job)
1243 {
1244 bb_buf_t *buf_ptr;
1245 char *out_buf = NULL;
1246 int i;
1247
1248 if (bb_job) {
1249 xstrfmtcat(out_buf, "%s: JobId=%u UserID:%u ",
1250 state_ptr->name, bb_job->job_id, bb_job->user_id);
1251 xstrfmtcat(out_buf, "Swap:%ux%u ", bb_job->swap_size,
1252 bb_job->swap_nodes);
1253 xstrfmtcat(out_buf, "TotalSize:%"PRIu64"", bb_job->total_size);
1254 info("%s", out_buf);
1255 xfree(out_buf);
1256 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
1257 i++, buf_ptr++) {
1258 if (buf_ptr->create) {
1259 info(" Create Name:%s Pool:%s Size:%"PRIu64
1260 " Access:%s Type:%s State:%s",
1261 buf_ptr->name, buf_ptr->pool,
1262 buf_ptr->size, buf_ptr->access,
1263 buf_ptr->type,
1264 bb_state_string(buf_ptr->state));
1265 } else if (buf_ptr->destroy) {
1266 info(" Destroy Name:%s Hurry:%d",
1267 buf_ptr->name, (int) buf_ptr->hurry);
1268 } else {
1269 info(" Use Name:%s", buf_ptr->name);
1270 }
1271 }
1272 }
1273 }
1274
1275 /* Make claim against resource limit for a user
1276 * user_id IN - Owner of burst buffer
1277 * bb_size IN - Size of burst buffer
1278 * pool IN - Pool containing the burst buffer
1279 * state_ptr IN - Global state to update
1280 * update_pool_unfree IN - If true, update the pool's unfree space */
bb_limit_add(uint32_t user_id,uint64_t bb_size,char * pool,bb_state_t * state_ptr,bool update_pool_unfree)1281 extern void bb_limit_add(uint32_t user_id, uint64_t bb_size, char *pool,
1282 bb_state_t *state_ptr, bool update_pool_unfree)
1283 {
1284 burst_buffer_pool_t *pool_ptr;
1285 bb_user_t *bb_user;
1286 int i;
1287
1288 /* Update the pool's used_space, plus unfree_space if needed */
1289 if (!pool || !xstrcmp(pool, state_ptr->bb_config.default_pool)) {
1290 state_ptr->used_space += bb_size;
1291 if (update_pool_unfree)
1292 state_ptr->unfree_space += bb_size;
1293 } else {
1294 pool_ptr = state_ptr->bb_config.pool_ptr;
1295 for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++){
1296 if (xstrcmp(pool, pool_ptr->name))
1297 continue;
1298 pool_ptr->used_space += bb_size;
1299 if (update_pool_unfree)
1300 pool_ptr->unfree_space += bb_size;
1301 break;
1302 }
1303 if (i >= state_ptr->bb_config.pool_cnt)
1304 error("%s: Unable to located pool %s", __func__, pool);
1305 }
1306
1307 /* Update user space used */
1308 bb_user = bb_find_user_rec(user_id, state_ptr);
1309 xassert(bb_user);
1310 bb_user->size += bb_size;
1311
1312 }
1313
1314 /* Release claim against resource limit for a user */
bb_limit_rem(uint32_t user_id,uint64_t bb_size,char * pool,bb_state_t * state_ptr)1315 extern void bb_limit_rem(uint32_t user_id, uint64_t bb_size, char *pool,
1316 bb_state_t *state_ptr)
1317 {
1318 burst_buffer_pool_t *pool_ptr;
1319 bb_user_t *bb_user;
1320 int i;
1321
1322 if (!pool || !xstrcmp(pool, state_ptr->bb_config.default_pool)) {
1323 if (state_ptr->used_space >= bb_size) {
1324 state_ptr->used_space -= bb_size;
1325 } else {
1326 error("%s: used_space underflow", __func__);
1327 state_ptr->used_space = 0;
1328 }
1329 if (state_ptr->unfree_space >= bb_size) {
1330 state_ptr->unfree_space -= bb_size;
1331 } else {
1332 /*
1333 * This will happen if we reload burst buffer state
1334 * after making a claim against resources, but before
1335 * the buffer actually gets created.
1336 */
1337 debug2("%s: unfree_space underflow (%"PRIu64" < %"PRIu64")",
1338 __func__, state_ptr->unfree_space, bb_size);
1339 state_ptr->unfree_space = 0;
1340 }
1341 } else {
1342 pool_ptr = state_ptr->bb_config.pool_ptr;
1343 for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++){
1344 if (xstrcmp(pool, pool_ptr->name))
1345 continue;
1346 if (pool_ptr->used_space >= bb_size) {
1347 pool_ptr->used_space -= bb_size;
1348 } else {
1349 error("%s: used_space underflow for pool %s",
1350 __func__, pool);
1351 pool_ptr->used_space = 0;
1352 }
1353 if (pool_ptr->unfree_space >= bb_size) {
1354 pool_ptr->unfree_space -= bb_size;
1355 } else {
1356 /*
1357 * This will happen if we reload burst buffer
1358 * state after making a claim against resources,
1359 * but before the buffer actually gets created.
1360 */
1361 debug2("%s: unfree_space underflow for pool %s",
1362 __func__, pool);
1363 pool_ptr->unfree_space = 0;
1364 }
1365 break;
1366 }
1367 if (i >= state_ptr->bb_config.pool_cnt)
1368 error("%s: Unable to located pool %s", __func__, pool);
1369 }
1370
1371 bb_user = bb_find_user_rec(user_id, state_ptr);
1372 xassert(bb_user);
1373 if (bb_user->size >= bb_size)
1374 bb_user->size -= bb_size;
1375 else {
1376 bb_user->size = 0;
1377 error("%s: user limit underflow for uid %u", __func__, user_id);
1378 }
1379
1380 }
1381
1382 /* Log creation of a persistent burst buffer in the database
1383 * job_ptr IN - Point to job that created, could be NULL at startup
1384 * bb_alloc IN - Pointer to persistent burst buffer state info
1385 * state_ptr IN - Pointer to burst_buffer plugin state info
1386 * NOTE: assoc_mgr association and qos read lock should be set before this.
1387 */
bb_post_persist_create(job_record_t * job_ptr,bb_alloc_t * bb_alloc,bb_state_t * state_ptr)1388 extern int bb_post_persist_create(job_record_t *job_ptr, bb_alloc_t *bb_alloc,
1389 bb_state_t *state_ptr)
1390 {
1391 int rc = SLURM_SUCCESS;
1392 slurmdb_reservation_rec_t resv;
1393 uint64_t size_mb;
1394
1395 if (!state_ptr->tres_id) {
1396 debug2("%s: Not tracking this TRES, "
1397 "not sending to the database.", __func__);
1398 return SLURM_SUCCESS;
1399 }
1400
1401 size_mb = (bb_alloc->size / (1024 * 1024));
1402
1403 memset(&resv, 0, sizeof(slurmdb_reservation_rec_t));
1404 resv.assocs = bb_alloc->assocs;
1405 resv.cluster = slurmctld_conf.cluster_name;
1406 resv.name = bb_alloc->name;
1407 resv.id = bb_alloc->id;
1408 resv.time_start = bb_alloc->create_time;
1409 xstrfmtcat(resv.tres_str, "%d=%"PRIu64, state_ptr->tres_id, size_mb);
1410 rc = acct_storage_g_add_reservation(acct_db_conn, &resv);
1411 xfree(resv.tres_str);
1412
1413 if (state_ptr->tres_pos > 0) {
1414 slurmdb_assoc_rec_t *assoc_ptr = bb_alloc->assoc_ptr;
1415
1416 while (assoc_ptr) {
1417 assoc_ptr->usage->grp_used_tres[state_ptr->tres_pos] +=
1418 size_mb;
1419 debug2("%s: after adding persistent bb %s(%u), "
1420 "assoc %u(%s/%s/%s) grp_used_tres(%s) "
1421 "is %"PRIu64,
1422 __func__, bb_alloc->name, bb_alloc->id,
1423 assoc_ptr->id, assoc_ptr->acct,
1424 assoc_ptr->user, assoc_ptr->partition,
1425 assoc_mgr_tres_name_array[state_ptr->tres_pos],
1426 assoc_ptr->usage->
1427 grp_used_tres[state_ptr->tres_pos]);
1428
1429 /* FIXME: should grp_used_tres_run_secs be
1430 * done some how? Same for QOS below.
1431 */
1432 /* debug2("%s: after adding persistent bb %s(%u), " */
1433 /* "assoc %u(%s/%s/%s) grp_used_tres_run_secs(%s) " */
1434 /* "is %"PRIu64, */
1435 /* __func__, bb_alloc->name, bb_alloc->id, */
1436 /* assoc_ptr->id, assoc_ptr->acct, */
1437 /* assoc_ptr->user, assoc_ptr->partition, */
1438 /* assoc_mgr_tres_name_array[state_ptr->tres_pos], */
1439 /* assoc_ptr->usage-> */
1440 /* grp_used_tres_run_secs[state_ptr->tres_pos]); */
1441 assoc_ptr = assoc_ptr->usage->parent_assoc_ptr;
1442 }
1443
1444 if (job_ptr && job_ptr->tres_alloc_cnt)
1445 job_ptr->tres_alloc_cnt[state_ptr->tres_pos] -= size_mb;
1446
1447 if (bb_alloc->qos_ptr) {
1448 bb_alloc->qos_ptr->usage->grp_used_tres[
1449 state_ptr->tres_pos] += size_mb;
1450 }
1451 }
1452
1453 return rc;
1454 }
1455
1456 /* Log deletion of a persistent burst buffer in the database */
bb_post_persist_delete(bb_alloc_t * bb_alloc,bb_state_t * state_ptr)1457 extern int bb_post_persist_delete(bb_alloc_t *bb_alloc, bb_state_t *state_ptr)
1458 {
1459 int rc = SLURM_SUCCESS;
1460 slurmdb_reservation_rec_t resv;
1461 uint64_t size_mb;
1462
1463 if (!state_ptr->tres_id) {
1464 debug2("%s: Not tracking this TRES, "
1465 "not sending to the database.", __func__);
1466 return SLURM_SUCCESS;
1467 }
1468
1469 size_mb = (bb_alloc->size / (1024 * 1024));
1470
1471 memset(&resv, 0, sizeof(slurmdb_reservation_rec_t));
1472 resv.assocs = bb_alloc->assocs;
1473 resv.cluster = slurmctld_conf.cluster_name;
1474 resv.name = bb_alloc->name;
1475 resv.id = bb_alloc->id;
1476 resv.time_end = time(NULL);
1477 resv.time_start = bb_alloc->create_time;
1478 xstrfmtcat(resv.tres_str, "%d=%"PRIu64, state_ptr->tres_id, size_mb);
1479
1480 rc = acct_storage_g_remove_reservation(acct_db_conn, &resv);
1481 xfree(resv.tres_str);
1482
1483 if (state_ptr->tres_pos > 0) {
1484 slurmdb_assoc_rec_t *assoc_ptr = bb_alloc->assoc_ptr;
1485
1486 while (assoc_ptr) {
1487 if (assoc_ptr->usage->grp_used_tres[state_ptr->tres_pos]
1488 >= size_mb) {
1489 assoc_ptr->usage->grp_used_tres[
1490 state_ptr->tres_pos] -= size_mb;
1491 debug2("%s: after removing persistent "
1492 "bb %s(%u), assoc %u(%s/%s/%s) "
1493 "grp_used_tres(%s) is %"PRIu64,
1494 __func__, bb_alloc->name, bb_alloc->id,
1495 assoc_ptr->id, assoc_ptr->acct,
1496 assoc_ptr->user, assoc_ptr->partition,
1497 assoc_mgr_tres_name_array[
1498 state_ptr->tres_pos],
1499 assoc_ptr->usage->
1500 grp_used_tres[state_ptr->tres_pos]);
1501 } else {
1502 error("%s: underflow removing persistent "
1503 "bb %s(%u), assoc %u(%s/%s/%s) "
1504 "grp_used_tres(%s) had %"PRIu64
1505 " but we are trying to remove %"PRIu64,
1506 __func__, bb_alloc->name, bb_alloc->id,
1507 assoc_ptr->id, assoc_ptr->acct,
1508 assoc_ptr->user, assoc_ptr->partition,
1509 assoc_mgr_tres_name_array[
1510 state_ptr->tres_pos],
1511 assoc_ptr->usage->
1512 grp_used_tres[state_ptr->tres_pos],
1513 size_mb);
1514 assoc_ptr->usage->grp_used_tres[
1515 state_ptr->tres_pos] = 0;
1516 }
1517
1518 /* FIXME: should grp_used_tres_run_secs be
1519 * done some how? Same for QOS below. */
1520 /* debug2("%s: after removing persistent bb %s(%u), " */
1521 /* "assoc %u(%s/%s/%s) grp_used_tres_run_secs(%s) " */
1522 /* "is %"PRIu64, */
1523 /* __func__, bb_alloc->name, bb_alloc->id, */
1524 /* assoc_ptr->id, assoc_ptr->acct, */
1525 /* assoc_ptr->user, assoc_ptr->partition, */
1526 /* assoc_mgr_tres_name_array[state_ptr->tres_pos], */
1527 /* assoc_ptr->usage-> */
1528 /* grp_used_tres_run_secs[state_ptr->tres_pos]); */
1529 assoc_ptr = assoc_ptr->usage->parent_assoc_ptr;
1530 }
1531
1532 if (bb_alloc->qos_ptr) {
1533 if (bb_alloc->qos_ptr->usage->grp_used_tres[
1534 state_ptr->tres_pos] >= size_mb)
1535 bb_alloc->qos_ptr->usage->grp_used_tres[
1536 state_ptr->tres_pos] -= size_mb;
1537 else
1538 bb_alloc->qos_ptr->usage->grp_used_tres[
1539 state_ptr->tres_pos] = 0;
1540 }
1541 }
1542
1543 return rc;
1544 }
1545
1546 /* Determine if the specified pool name is valid on this system */
bb_valid_pool_test(bb_state_t * state_ptr,char * pool_name)1547 extern bool bb_valid_pool_test(bb_state_t *state_ptr, char *pool_name)
1548 {
1549 burst_buffer_pool_t *pool_ptr;
1550 int i;
1551
1552 xassert(state_ptr);
1553 if (!pool_name)
1554 return true;
1555 if (!xstrcmp(pool_name, state_ptr->bb_config.default_pool))
1556 return true;
1557 pool_ptr = state_ptr->bb_config.pool_ptr;
1558 for (i = 0; i < state_ptr->bb_config.pool_cnt; i++, pool_ptr++) {
1559 if (!xstrcmp(pool_name, pool_ptr->name))
1560 return true;
1561 }
1562 info("%s: Invalid pool requested (%s)", __func__, pool_name);
1563
1564 return false;
1565 }
1566