1 /*****************************************************************************\
2 * burst_buffer_datawarp.c - Plugin for managing a Cray DataWarp burst_buffer
3 *****************************************************************************
4 * Copyright (C) 2014-2018 SchedMD LLC.
5 * Written by Morris Jette <jette@schedmd.com>
6 *
7 * This file is part of Slurm, a resource management program.
8 * For details, see <https://slurm.schedmd.com/>.
9 * Please also read the included file: DISCLAIMER.
10 *
11 * Slurm is free software; you can redistribute it and/or modify it under
12 * the terms of the GNU General Public License as published by the Free
13 * Software Foundation; either version 2 of the License, or (at your option)
14 * any later version.
15 *
16 * In addition, as a special exception, the copyright holders give permission
17 * to link the code of portions of this program with the OpenSSL library under
18 * certain conditions as described in each individual source file, and
19 * distribute linked combinations including the two. You must obey the GNU
20 * General Public License in all respects for all of the code used other than
21 * OpenSSL. If you modify file(s) with this exception, you may extend this
22 * exception to your version of the file(s), but you are not obligated to do
23 * so. If you do not wish to do so, delete this exception statement from your
24 * version. If you delete this exception statement from all source files in
25 * the program, then also delete it here.
26 *
27 * Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
28 * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
29 * FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
30 * details.
31 *
32 * You should have received a copy of the GNU General Public License along
33 * with Slurm; if not, write to the Free Software Foundation, Inc.,
34 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
35 \*****************************************************************************/
36
37 #include "config.h"
38
39 #define _GNU_SOURCE /* For POLLRDHUP */
40 #include <ctype.h>
41 #include <poll.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <sys/stat.h>
45 #include <sys/types.h>
46
47 #if HAVE_JSON_C_INC
48 # include <json-c/json.h>
49 #elif HAVE_JSON_INC
50 # include <json/json.h>
51 #endif
52
53 #include "slurm/slurm.h"
54
55 #include "src/common/assoc_mgr.h"
56 #include "src/common/bitstring.h"
57 #include "src/common/fd.h"
58 #include "src/common/list.h"
59 #include "src/common/macros.h"
60 #include "src/common/pack.h"
61 #include "src/common/parse_config.h"
62 #include "src/common/run_command.h"
63 #include "src/common/slurm_protocol_api.h"
64 #include "src/common/slurm_protocol_defs.h"
65 #include "src/common/timers.h"
66 #include "src/common/uid.h"
67 #include "src/common/xmalloc.h"
68 #include "src/common/xstring.h"
69 #include "src/slurmctld/agent.h"
70 #include "src/slurmctld/job_scheduler.h"
71 #include "src/slurmctld/locks.h"
72 #include "src/slurmctld/node_scheduler.h"
73 #include "src/slurmctld/reservation.h"
74 #include "src/slurmctld/slurmctld.h"
75 #include "src/slurmctld/state_save.h"
76 #include "src/slurmctld/trigger_mgr.h"
77 #include "src/plugins/burst_buffer/common/burst_buffer_common.h"
78
79 #define _DEBUG 0 /* Detailed debugging information */
80 #define TIME_SLOP 60 /* Time allowed to synchronize operations between
81 * threads */
82 #define MAX_RETRY_CNT 2 /* Hold job if "pre_run" operation fails more than
83 * 2 times */
84
85 /* Script line types */
86 #define LINE_OTHER 0
87 #define LINE_BB 1
88 #define LINE_DW 2
89
90 /*
91 * These variables are required by the burst buffer plugin interface. If they
92 * are not found in the plugin, the plugin loader will ignore it.
93 *
94 * plugin_name - a string giving a human-readable description of the
95 * plugin. There is no maximum length, but the symbol must refer to
96 * a valid string.
97 *
98 * plugin_type - a string suggesting the type of the plugin or its
99 * applicability to a particular form of data or method of data handling.
100 * If the low-level plugin API is used, the contents of this string are
101 * unimportant and may be anything. Slurm uses the higher-level plugin
102 * interface which requires this string to be of the form
103 *
104 * <application>/<method>
105 *
106 * where <application> is a description of the intended application of
107 * the plugin (e.g., "burst_buffer" for Slurm burst_buffer) and <method> is a
108 * description of how this plugin satisfies that application. Slurm will only
109 * load a burst_buffer plugin if the plugin_type string has a prefix of
110 * "burst_buffer/".
111 *
112 * plugin_version - an unsigned 32-bit integer containing the Slurm version
113 * (major.minor.micro combined into a single number).
114 */
115 const char plugin_name[] = "burst_buffer datawarp plugin";
116 const char plugin_type[] = "burst_buffer/datawarp";
117 const uint32_t plugin_version = SLURM_VERSION_NUMBER;
118
119 /* Most state information is in a common structure so that we can more
120 * easily use common functions from multiple burst buffer plugins */
121 static bb_state_t bb_state;
122 static uint32_t last_persistent_id = 1;
123 static char * state_save_loc = NULL;
124
125 /* These are defined here so when we link with something other than
126 * the slurmctld we will have these symbols defined. They will get
127 * overwritten when linking with the slurmctld.
128 */
129 #if defined (__APPLE__)
130 extern uint16_t accounting_enforce __attribute__((weak_import));
131 extern void *acct_db_conn __attribute__((weak_import));
132 #else
133 uint16_t accounting_enforce = 0;
134 void *acct_db_conn = NULL;
135 #endif
136
137
138 /* Description of each Cray DW configuration entry
139 */
140 typedef struct bb_configs {
141 uint32_t id;
142 uint32_t instance;
143 } bb_configs_t;
144
145 /* Description of each Cray DW instance entry, including persistent buffers
146 */
147 typedef struct bb_instances {
148 uint32_t id;
149 uint64_t bytes;
150 uint32_t session;
151 } bb_instances_t;
152
153 /* Description of each Cray DW pool entry
154 */
155 typedef struct bb_pools {
156 char *id;
157 char *units;
158 uint64_t granularity;
159 uint64_t quantity;
160 uint64_t free;
161 } bb_pools_t;
162
163 /* Description of each Cray DW pool entry
164 */
165 typedef struct bb_sessions {
166 uint32_t created;
167 uint32_t id;
168 char *token;
169 bool used;
170 uint32_t user_id;
171 } bb_sessions_t;
172
173 typedef struct {
174 char **args;
175 uint32_t job_id;
176 uint32_t timeout;
177 uint32_t user_id;
178 } pre_run_args_t;
179
180 typedef struct {
181 char **args1;
182 char **args2;
183 uint64_t bb_size;
184 uint32_t job_id;
185 char *pool;
186 uint32_t user_id;
187 } stage_args_t;
188
189 typedef struct create_buf_data {
190 char *access; /* Access mode */
191 bool hurry; /* Set to destroy in a hurry (no stage-out) */
192 uint32_t job_id; /* Job ID to use */
193 char *job_script; /* Path to job script */
194 char *name; /* Name of the persistent burst buffer */
195 char *pool; /* Name of pool in which to create the buffer */
196 uint64_t size; /* Size in bytes */
197 char *type; /* Access type */
198 uint32_t user_id;
199 } create_buf_data_t;
200
201 #define BB_UNITS_BYTES 1
202 struct bb_total_size {
203 int units;
204 uint64_t capacity;
205 };
206
207 static void _add_bb_to_script(char **script_body,
208 const char *burst_buffer_file);
209 static int _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
210 bool job_ready);
211 static void _apply_limits(void);
212 static void * _bb_agent(void *args);
213 static void _bb_free_configs(bb_configs_t *ents, int num_ent);
214 static void _bb_free_instances(bb_instances_t *ents, int num_ent);
215 static void _bb_free_pools(bb_pools_t *ents, int num_ent);
216 static void _bb_free_sessions(bb_sessions_t *ents, int num_ent);
217 static bb_configs_t *_bb_get_configs(int *num_ent, bb_state_t *state_ptr,
218 uint32_t timeout);
219 static bb_instances_t *_bb_get_instances(int *num_ent, bb_state_t *state_ptr,
220 uint32_t timeout);
221 static bb_pools_t *_bb_get_pools(int *num_ent, bb_state_t *state_ptr,
222 uint32_t timeout);
223 static bb_sessions_t *_bb_get_sessions(int *num_ent, bb_state_t *state_ptr,
224 uint32_t timeout);
225 static int _build_bb_script(job_record_t *job_ptr, char *script_file);
226 static int _create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
227 bool job_ready);
228 static void * _create_persistent(void *x);
229 static void * _destroy_persistent(void *x);
230 static void _free_create_args(create_buf_data_t *create_args);
231 static bb_job_t *_get_bb_job(job_record_t *job_ptr);
232 static bool _have_dw_cmd_opts(bb_job_t *bb_job);
233 static void _job_queue_del(void *x);
234 static bb_configs_t *_json_parse_configs_array(json_object *jobj, char *key,
235 int *num);
236 static bb_instances_t *_json_parse_instances_array(json_object *jobj, char *key,
237 int *num);
238 static struct bb_pools *_json_parse_pools_array(json_object *jobj, char *key,
239 int *num);
240 static struct bb_sessions *_json_parse_sessions_array(json_object *jobj,
241 char *key, int *num);
242 static void _json_parse_configs_object(json_object *jobj,
243 bb_configs_t *ent);
244 static void _json_parse_instances_object(json_object *jobj,
245 bb_instances_t *ent);
246 static void _json_parse_pools_object(json_object *jobj, bb_pools_t *ent);
247 static void _json_parse_sessions_object(json_object *jobj,
248 bb_sessions_t *ent);
249 static struct bb_total_size *_json_parse_real_size(json_object *j);
250 static void _log_script_argv(char **script_argv, char *resp_msg);
251 static void _load_state(bool init_config);
252 static int _open_part_state_file(char **state_file);
253 static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
254 uid_t submit_uid);
255 static void _parse_config_links(json_object *instance, bb_configs_t *ent);
256 static void _parse_instance_capacity(json_object *instance,
257 bb_instances_t *ent);
258 static void _parse_instance_links(json_object *instance,
259 bb_instances_t *ent);
260 static void _pick_alloc_account(bb_alloc_t *bb_alloc);
261 static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr);
262 static void _purge_vestigial_bufs(void);
263 static void _python2json(char *buf);
264 static void _recover_bb_state(void);
265 static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job);
266 static int _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job);
267 static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry);
268 static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
269 int new_state, uint64_t buf_size);
270 static void _save_bb_state(void);
271 static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc);
272 static void * _start_pre_run(void *x);
273 static void * _start_stage_in(void *x);
274 static void * _start_stage_out(void *x);
275 static void * _start_teardown(void *x);
276 static void _test_config(void);
277 static bool _test_persistent_use_ready(bb_job_t *bb_job,
278 job_record_t *job_ptr);
279 static int _test_size_limit(job_record_t *job_ptr, bb_job_t *bb_job);
280 static void _timeout_bb_rec(void);
281 static int _write_file(char *file_name, char *buf);
282 static int _write_nid_file(char *file_name, char *node_list,
283 job_record_t *job_ptr);
284 static int _xlate_batch(job_desc_msg_t *job_desc);
285 static int _xlate_interactive(job_desc_msg_t *job_desc);
286
287 /* Convert a Python string to real JSON format. Specifically replace single
288 * quotes with double quotes and strip leading "u" before the single quotes.
289 * See: https://github.com/stedolan/jq/issues/312 */
_python2json(char * buf)290 static void _python2json(char *buf)
291 {
292 bool quoted = false;
293 int i, o;
294
295 if (!buf)
296 return;
297 for (i = 0, o = 0; ; i++) {
298 if (buf[i] == '\'') {
299 buf[o++] = '\"';
300 quoted = !quoted;
301 } else if ((buf[i] == 'u') && (buf[i+1] == '\'') && !quoted) {
302 /* Skip over unicode flag */
303 } else {
304 buf[o++] = buf[i];
305 if (buf[i] == '\0')
306 break;
307 }
308 }
309 }
310
311 /* Log a command's arguments. */
_log_script_argv(char ** script_argv,char * resp_msg)312 static void _log_script_argv(char **script_argv, char *resp_msg)
313 {
314 char *cmd_line = NULL;
315 int i;
316
317 if (!bb_state.bb_config.debug_flag)
318 return;
319
320 for (i = 0; script_argv[i]; i++) {
321 if (i)
322 xstrcat(cmd_line, " ");
323 xstrcat(cmd_line, script_argv[i]);
324 }
325 info("%s", cmd_line);
326 if (resp_msg && resp_msg[0])
327 info("%s", resp_msg);
328 xfree(cmd_line);
329 }
330
_job_queue_del(void * x)331 static void _job_queue_del(void *x)
332 {
333 bb_job_queue_rec_t *job_rec = (bb_job_queue_rec_t *) x;
334 if (job_rec) {
335 xfree(job_rec);
336 }
337 }
338
339 /* Purge files we have created for the job.
340 * bb_state.bb_mutex is locked on function entry.
341 * job_ptr may be NULL if not found */
_purge_bb_files(uint32_t job_id,job_record_t * job_ptr)342 static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr)
343
344 {
345 char *hash_dir = NULL, *job_dir = NULL;
346 char *script_file = NULL, *path_file = NULL, *client_nids_file = NULL;
347 char *exec_host_file = NULL;
348 int hash_inx;
349
350 hash_inx = job_id % 10;
351 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
352 (void) mkdir(hash_dir, 0700);
353 xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_id);
354 (void) mkdir(job_dir, 0700);
355
356 xstrfmtcat(client_nids_file, "%s/client_nids", job_dir);
357 (void) unlink(client_nids_file);
358 xfree(client_nids_file);
359
360 xstrfmtcat(exec_host_file, "%s/exec_host", job_dir);
361 (void) unlink(exec_host_file);
362 xfree(exec_host_file);
363
364 xstrfmtcat(path_file, "%s/pathfile", job_dir);
365 (void) unlink(path_file);
366 xfree(path_file);
367
368 if (!job_ptr || (job_ptr->batch_flag == 0)) {
369 xstrfmtcat(script_file, "%s/script", job_dir);
370 (void) unlink(script_file);
371 xfree(script_file);
372 }
373
374 (void) unlink(job_dir);
375 xfree(job_dir);
376 xfree(hash_dir);
377 }
378
379 /* Validate that our configuration is valid for this plugin type */
_test_config(void)380 static void _test_config(void)
381 {
382 if (!bb_state.bb_config.get_sys_state) {
383 debug("%s: %s: GetSysState is NULL", plugin_type, __func__);
384 bb_state.bb_config.get_sys_state =
385 xstrdup("/opt/cray/dw_wlm/default/bin/dw_wlm_cli");
386 }
387 if (!bb_state.bb_config.get_sys_status) {
388 debug("%s: %s: GetSysStatus is NULL", plugin_type, __func__);
389 bb_state.bb_config.get_sys_status =
390 xstrdup("/opt/cray/dws/default/bin/dwstat");
391 }
392 }
393
394 /* Allocate resources to a job and begin setup/stage-in */
_alloc_job_bb(job_record_t * job_ptr,bb_job_t * bb_job,bool job_ready)395 static int _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
396 bool job_ready)
397 {
398 int rc = SLURM_SUCCESS;
399
400 if (bb_state.bb_config.debug_flag) {
401 info("%s: %s: start job allocate %pJ",
402 plugin_type, __func__, job_ptr);
403 }
404
405 if (bb_job->buf_cnt &&
406 (_create_bufs(job_ptr, bb_job, job_ready) > 0))
407 return EAGAIN;
408
409 if (bb_job->state < BB_STATE_STAGING_IN) {
410 bb_job->state = BB_STATE_STAGING_IN;
411 rc = _queue_stage_in(job_ptr, bb_job);
412 if (rc != SLURM_SUCCESS) {
413 bb_job->state = BB_STATE_TEARDOWN;
414 _queue_teardown(job_ptr->job_id, job_ptr->user_id,
415 true);
416 }
417 }
418
419 return rc;
420 }
421
422 /* Perform periodic background activities */
_bb_agent(void * args)423 static void *_bb_agent(void *args)
424 {
425 /* Locks: write job */
426 slurmctld_lock_t job_write_lock = {
427 NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
428
429 while (!bb_state.term_flag) {
430 bb_sleep(&bb_state, AGENT_INTERVAL);
431 if (!bb_state.term_flag) {
432 _load_state(false); /* Has own locking */
433 lock_slurmctld(job_write_lock);
434 slurm_mutex_lock(&bb_state.bb_mutex);
435 _timeout_bb_rec();
436 slurm_mutex_unlock(&bb_state.bb_mutex);
437 unlock_slurmctld(job_write_lock);
438 }
439 _save_bb_state(); /* Has own locks excluding file write */
440 }
441
442 return NULL;
443 }
444
445 /* Given a request size and a pool name (or NULL name for default pool),
446 * return the required buffer size (rounded up by granularity) */
_set_granularity(uint64_t orig_size,char * bb_pool)447 static uint64_t _set_granularity(uint64_t orig_size, char *bb_pool)
448 {
449 burst_buffer_pool_t *pool_ptr;
450 uint64_t new_size;
451 int i;
452
453 if (!bb_pool || !xstrcmp(bb_pool, bb_state.bb_config.default_pool)) {
454 new_size = bb_granularity(orig_size,
455 bb_state.bb_config.granularity);
456 return new_size;
457 }
458
459 for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
460 i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
461 if (!xstrcmp(bb_pool, pool_ptr->name)) {
462 new_size = bb_granularity(orig_size,
463 pool_ptr->granularity);
464 return new_size;
465 }
466 }
467 debug("Could not find pool %s", bb_pool);
468 return orig_size;
469 }
470
471 /* Return the burst buffer size specification of a job
472 * RET size data structure or NULL of none found
473 * NOTE: delete return value using _del_bb_size() */
_get_bb_job(job_record_t * job_ptr)474 static bb_job_t *_get_bb_job(job_record_t *job_ptr)
475 {
476 char *bb_specs, *bb_hurry, *bb_name, *bb_type, *bb_access, *bb_pool;
477 char *end_ptr = NULL, *save_ptr = NULL, *sub_tok, *tok;
478 bool have_bb = false;
479 uint64_t tmp_cnt;
480 int inx;
481 bb_job_t *bb_job;
482
483 if ((job_ptr->burst_buffer == NULL) ||
484 (job_ptr->burst_buffer[0] == '\0'))
485 return NULL;
486
487 if ((bb_job = bb_job_find(&bb_state, job_ptr->job_id)))
488 return bb_job; /* Cached data */
489
490 bb_job = bb_job_alloc(&bb_state, job_ptr->job_id);
491 bb_job->account = xstrdup(job_ptr->account);
492 if (job_ptr->part_ptr)
493 bb_job->partition = xstrdup(job_ptr->part_ptr->name);
494 if (job_ptr->qos_ptr)
495 bb_job->qos = xstrdup(job_ptr->qos_ptr->name);
496 bb_job->state = BB_STATE_PENDING;
497 bb_job->user_id = job_ptr->user_id;
498 bb_specs = xstrdup(job_ptr->burst_buffer);
499 tok = strtok_r(bb_specs, "\n", &save_ptr);
500 while (tok) {
501 uint32_t bb_flag = 0;
502 if (tok[0] != '#') {
503 tok = strtok_r(NULL, "\n", &save_ptr);
504 continue;
505 }
506 if ((tok[1] == 'B') && (tok[2] == 'B'))
507 bb_flag = BB_FLAG_BB_OP;
508 else if ((tok[1] == 'D') && (tok[2] == 'W'))
509 bb_flag = BB_FLAG_DW_OP;
510
511 /*
512 * Effective Slurm v18.08 and CLE6.0UP06 the create_persistent
513 * and destroy_persistent functions are directly supported by
514 * dw_wlm_cli. Support "#BB" format for backward compatibility.
515 */
516 if (bb_flag != 0) {
517 tok += 3;
518 while (isspace(tok[0]))
519 tok++;
520 }
521
522 /*
523 * Is % symbol replacement required? Only done on "#DW" / "#BB"
524 * lines.
525 */
526 if (bb_flag && strchr(tok, (int) '%'))
527 bb_job->need_symbol_replacement = true;
528
529 if (bb_flag == BB_FLAG_BB_OP) {
530 if (!xstrncmp(tok, "create_persistent", 17)) {
531 have_bb = true;
532 bb_access = NULL;
533 bb_name = NULL;
534 bb_pool = NULL;
535 bb_type = NULL;
536 if ((sub_tok = strstr(tok, "access_mode="))) {
537 bb_access = xstrdup(sub_tok + 12);
538 sub_tok = strchr(bb_access, ' ');
539 if (sub_tok)
540 sub_tok[0] = '\0';
541 } else if ((sub_tok = strstr(tok, "access="))) {
542 bb_access = xstrdup(sub_tok + 7);
543 sub_tok = strchr(bb_access, ' ');
544 if (sub_tok)
545 sub_tok[0] = '\0';
546 }
547 if ((sub_tok = strstr(tok, "capacity="))) {
548 tmp_cnt = bb_get_size_num(sub_tok+9, 1);
549 } else {
550 tmp_cnt = 0;
551 }
552 if ((sub_tok = strstr(tok, "name="))) {
553 bb_name = xstrdup(sub_tok + 5);
554 sub_tok = strchr(bb_name, ' ');
555 if (sub_tok)
556 sub_tok[0] = '\0';
557 }
558 if ((sub_tok = strstr(tok, "pool="))) {
559 bb_pool = xstrdup(sub_tok + 5);
560 sub_tok = strchr(bb_pool, ' ');
561 if (sub_tok)
562 sub_tok[0] = '\0';
563 } else {
564 bb_pool = xstrdup(
565 bb_state.bb_config.default_pool);
566 }
567 if ((sub_tok = strstr(tok, "type="))) {
568 bb_type = xstrdup(sub_tok + 5);
569 sub_tok = strchr(bb_type, ' ');
570 if (sub_tok)
571 sub_tok[0] = '\0';
572 }
573 inx = bb_job->buf_cnt++;
574 bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
575 sizeof(bb_buf_t) *
576 bb_job->buf_cnt);
577 bb_job->buf_ptr[inx].access = bb_access;
578 bb_job->buf_ptr[inx].create = true;
579 bb_job->buf_ptr[inx].flags = bb_flag;
580 //bb_job->buf_ptr[inx].hurry = false;
581 bb_job->buf_ptr[inx].name = bb_name;
582 bb_job->buf_ptr[inx].pool = bb_pool;
583 tmp_cnt = _set_granularity(tmp_cnt, bb_pool);
584 bb_job->buf_ptr[inx].size = tmp_cnt;
585 bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
586 bb_job->buf_ptr[inx].type = bb_type;
587 //bb_job->buf_ptr[inx].use = false;
588 bb_job->persist_add += tmp_cnt;
589 } else if (!xstrncmp(tok, "destroy_persistent", 18)) {
590 have_bb = true;
591 bb_name = NULL;
592 if ((sub_tok = strstr(tok, "name="))) {
593 bb_name = xstrdup(sub_tok + 5);
594 sub_tok = strchr(bb_name, ' ');
595 if (sub_tok)
596 sub_tok[0] = '\0';
597 }
598 /* if ((sub_tok = strstr(tok, "type="))) { */
599 /* bb_type = xstrdup(sub_tok + 5); */
600 /* sub_tok = strchr(bb_type, ' '); */
601 /* if (sub_tok) */
602 /* sub_tok[0] = '\0'; */
603 /* } */
604 bb_hurry = strstr(tok, "hurry");
605 inx = bb_job->buf_cnt++;
606 bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
607 sizeof(bb_buf_t) *
608 bb_job->buf_cnt);
609 //bb_job->buf_ptr[inx].access = NULL;
610 //bb_job->buf_ptr[inx].create = false;
611 bb_job->buf_ptr[inx].destroy = true;
612 bb_job->buf_ptr[inx].flags = bb_flag;
613 bb_job->buf_ptr[inx].hurry = (bb_hurry != NULL);
614 bb_job->buf_ptr[inx].name = bb_name;
615 //bb_job->buf_ptr[inx].pool = NULL;
616 //bb_job->buf_ptr[inx].size = 0;
617 bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
618 //bb_job->buf_ptr[inx].type = NULL;
619 //bb_job->buf_ptr[inx].use = false;
620 } else {
621 /* Ignore other (future) options */
622 }
623 }
624 if (bb_flag == BB_FLAG_DW_OP) {
625 if (!xstrncmp(tok, "jobdw", 5)) {
626 have_bb = true;
627 if ((sub_tok = strstr(tok, "capacity="))) {
628 tmp_cnt = bb_get_size_num(sub_tok+9, 1);
629 } else {
630 tmp_cnt = 0;
631 }
632 if ((sub_tok = strstr(tok, "pool="))) {
633 xfree(bb_job->job_pool);
634 bb_job->job_pool = xstrdup(sub_tok + 5);
635 sub_tok = strchr(bb_job->job_pool, ' ');
636 if (sub_tok)
637 sub_tok[0] = '\0';
638 } else {
639 bb_job->job_pool = xstrdup(
640 bb_state.bb_config.default_pool);
641 }
642 tmp_cnt = _set_granularity(tmp_cnt,
643 bb_job->job_pool);
644 bb_job->req_size += tmp_cnt;
645 bb_job->total_size += tmp_cnt;
646 bb_job->use_job_buf = true;
647 } else if (!xstrncmp(tok, "persistentdw", 12)) {
648 /* Persistent buffer use */
649 have_bb = true;
650 bb_name = NULL;
651 if ((sub_tok = strstr(tok, "name="))) {
652 bb_name = xstrdup(sub_tok + 5);
653 sub_tok = strchr(bb_name, ' ');
654 if (sub_tok)
655 sub_tok[0] = '\0';
656 }
657 inx = bb_job->buf_cnt++;
658 bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
659 sizeof(bb_buf_t) *
660 bb_job->buf_cnt);
661 //bb_job->buf_ptr[inx].access = NULL;
662 //bb_job->buf_ptr[inx].create = false;
663 //bb_job->buf_ptr[inx].destroy = false;
664 //bb_job->buf_ptr[inx].hurry = false;
665 bb_job->buf_ptr[inx].name = bb_name;
666 //bb_job->buf_ptr[inx].size = 0;
667 bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
668 //bb_job->buf_ptr[inx].type = NULL;
669 bb_job->buf_ptr[inx].use = true;
670 } else if (!xstrncmp(tok, "swap", 4)) {
671 have_bb = true;
672 tok += 4;
673 while (isspace(tok[0]))
674 tok++;
675 bb_job->swap_size = strtol(tok, &end_ptr, 10);
676 if (job_ptr->details &&
677 job_ptr->details->max_nodes) {
678 bb_job->swap_nodes =
679 job_ptr->details->max_nodes;
680 } else if (job_ptr->details) {
681 bb_job->swap_nodes =
682 job_ptr->details->min_nodes;
683 } else {
684 bb_job->swap_nodes = 1;
685 }
686 tmp_cnt = (uint64_t) bb_job->swap_size *
687 bb_job->swap_nodes;
688 if ((sub_tok = strstr(tok, "pool="))) {
689 xfree(bb_job->job_pool);
690 bb_job->job_pool = xstrdup(sub_tok + 5);
691 sub_tok = strchr(bb_job->job_pool, ' ');
692 if (sub_tok)
693 sub_tok[0] = '\0';
694 } else if (!bb_job->job_pool) {
695 bb_job->job_pool = xstrdup(
696 bb_state.bb_config.default_pool);
697 }
698 tmp_cnt = _set_granularity(tmp_cnt,
699 bb_job->job_pool);
700 bb_job->req_size += tmp_cnt;
701 bb_job->total_size += tmp_cnt;
702 bb_job->use_job_buf = true;
703 } else {
704 /* Ignore stage-in, stage-out, etc. */
705 }
706 }
707 tok = strtok_r(NULL, "\n", &save_ptr);
708 }
709 xfree(bb_specs);
710
711 if (!have_bb) {
712 xfree(job_ptr->state_desc);
713 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
714 xstrfmtcat(job_ptr->state_desc,
715 "%s: Invalid burst buffer spec (%s)",
716 plugin_type, job_ptr->burst_buffer);
717 job_ptr->priority = 0;
718 info("Invalid burst buffer spec for %pJ (%s)",
719 job_ptr, job_ptr->burst_buffer);
720 bb_job_del(&bb_state, job_ptr->job_id);
721 return NULL;
722 }
723 if (!bb_job->job_pool)
724 bb_job->job_pool = xstrdup(bb_state.bb_config.default_pool);
725 if (bb_state.bb_config.debug_flag)
726 bb_job_log(&bb_state, bb_job);
727 return bb_job;
728 }
729
730 /* At slurmctld start up time, for every currently active burst buffer,
731 * update that user's limit. Also log every recovered buffer */
_apply_limits(void)732 static void _apply_limits(void)
733 {
734 bool emulate_cray = false;
735 bb_alloc_t *bb_alloc;
736 int i;
737
738 if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
739 emulate_cray = true;
740
741 for (i = 0; i < BB_HASH_SIZE; i++) {
742 bb_alloc = bb_state.bb_ahash[i];
743 while (bb_alloc) {
744 info("Recovered buffer Name:%s User:%u Pool:%s Size:%"PRIu64,
745 bb_alloc->name, bb_alloc->user_id,
746 bb_alloc->pool, bb_alloc->size);
747 _set_assoc_mgr_ptrs(bb_alloc);
748 bb_limit_add(bb_alloc->user_id, bb_alloc->size,
749 bb_alloc->pool, &bb_state, emulate_cray);
750 bb_alloc = bb_alloc->next;
751 }
752 }
753 }
754
755 /* Write current burst buffer state to a file so that we can preserve account,
756 * partition, and QOS information of persistent burst buffers as there is no
757 * place to store that information within the DataWarp data structures */
_save_bb_state(void)758 static void _save_bb_state(void)
759 {
760 static time_t last_save_time = 0;
761 static int high_buffer_size = 16 * 1024;
762 time_t save_time = time(NULL);
763 bb_alloc_t *bb_alloc;
764 uint32_t rec_count = 0;
765 Buf buffer;
766 char *old_file = NULL, *new_file = NULL, *reg_file = NULL;
767 int i, count_offset, offset, state_fd;
768 int error_code = 0;
769 uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
770
771 if ((bb_state.last_update_time <= last_save_time) &&
772 !bb_state.term_flag)
773 return;
774
775 /* Build buffer with name/account/partition/qos information for all
776 * named burst buffers so we can preserve limits across restarts */
777 buffer = init_buf(high_buffer_size);
778 pack16(protocol_version, buffer);
779 count_offset = get_buf_offset(buffer);
780 pack32(rec_count, buffer);
781 if (bb_state.bb_ahash) {
782 slurm_mutex_lock(&bb_state.bb_mutex);
783 for (i = 0; i < BB_HASH_SIZE; i++) {
784 bb_alloc = bb_state.bb_ahash[i];
785 while (bb_alloc) {
786 if (bb_alloc->name) {
787 packstr(bb_alloc->account, buffer);
788 pack_time(bb_alloc->create_time,buffer);
789 pack32(bb_alloc->id, buffer);
790 packstr(bb_alloc->name, buffer);
791 packstr(bb_alloc->partition, buffer);
792 packstr(bb_alloc->pool, buffer);
793 packstr(bb_alloc->qos, buffer);
794 pack32(bb_alloc->user_id, buffer);
795 if (bb_state.bb_config.flags &
796 BB_FLAG_EMULATE_CRAY)
797 pack64(bb_alloc->size, buffer);
798 rec_count++;
799 }
800 bb_alloc = bb_alloc->next;
801 }
802 }
803 save_time = time(NULL);
804 slurm_mutex_unlock(&bb_state.bb_mutex);
805 offset = get_buf_offset(buffer);
806 set_buf_offset(buffer, count_offset);
807 pack32(rec_count, buffer);
808 set_buf_offset(buffer, offset);
809 }
810
811 xstrfmtcat(old_file, "%s/%s", slurmctld_conf.state_save_location,
812 "burst_buffer_cray_state.old");
813 xstrfmtcat(reg_file, "%s/%s", slurmctld_conf.state_save_location,
814 "burst_buffer_cray_state");
815 xstrfmtcat(new_file, "%s/%s", slurmctld_conf.state_save_location,
816 "burst_buffer_cray_state.new");
817
818 state_fd = creat(new_file, 0600);
819 if (state_fd < 0) {
820 error("%s: %s: Can't save state, error creating file %s, %m",
821 plugin_type, __func__, new_file);
822 error_code = errno;
823 } else {
824 int pos = 0, nwrite = get_buf_offset(buffer), amount, rc;
825 char *data = (char *)get_buf_data(buffer);
826 high_buffer_size = MAX(nwrite, high_buffer_size);
827 while (nwrite > 0) {
828 amount = write(state_fd, &data[pos], nwrite);
829 if ((amount < 0) && (errno != EINTR)) {
830 error("Error writing file %s, %m", new_file);
831 break;
832 }
833 nwrite -= amount;
834 pos += amount;
835 }
836
837 rc = fsync_and_close(state_fd, "burst_buffer_cray");
838 if (rc && !error_code)
839 error_code = rc;
840 }
841 if (error_code)
842 (void) unlink(new_file);
843 else { /* file shuffle */
844 last_save_time = save_time;
845 (void) unlink(old_file);
846 if (link(reg_file, old_file)) {
847 debug4("unable to create link for %s -> %s: %m",
848 reg_file, old_file);
849 }
850 (void) unlink(reg_file);
851 if (link(new_file, reg_file)) {
852 debug4("unable to create link for %s -> %s: %m",
853 new_file, reg_file);
854 }
855 (void) unlink(new_file);
856 }
857 xfree(old_file);
858 xfree(reg_file);
859 xfree(new_file);
860 free_buf(buffer);
861 }
862
863 /* Open the partition state save file, or backup if necessary.
864 * state_file IN - the name of the state save file used
865 * RET the file description to read from or error code
866 */
_open_part_state_file(char ** state_file)867 static int _open_part_state_file(char **state_file)
868 {
869 int state_fd;
870 struct stat stat_buf;
871
872 *state_file = xstrdup(slurmctld_conf.state_save_location);
873 xstrcat(*state_file, "/burst_buffer_cray_state");
874 state_fd = open(*state_file, O_RDONLY);
875 if (state_fd < 0) {
876 error("Could not open burst buffer state file %s: %m",
877 *state_file);
878 } else if (fstat(state_fd, &stat_buf) < 0) {
879 error("Could not stat burst buffer state file %s: %m",
880 *state_file);
881 (void) close(state_fd);
882 } else if (stat_buf.st_size < 4) {
883 error("Burst buffer state file %s too small", *state_file);
884 (void) close(state_fd);
885 } else /* Success */
886 return state_fd;
887
888 error("NOTE: Trying backup burst buffer state save file. "
889 "Information may be lost!");
890 xstrcat(*state_file, ".old");
891 state_fd = open(*state_file, O_RDONLY);
892 return state_fd;
893 }
894
895 /* Return true if the burst buffer name is that of a job (i.e. numeric) and
896 * and that job is complete. Otherwise return false. */
_is_complete_job(char * name)897 static bool _is_complete_job(char *name)
898 {
899 char *end_ptr = NULL;
900 uint32_t job_id = 0;
901 job_record_t *job_ptr;
902
903 if (name && (name[0] >='0') && (name[0] <='9')) {
904 job_id = strtol(name, &end_ptr, 10);
905 job_ptr = find_job_record(job_id);
906 if (!job_ptr || IS_JOB_COMPLETED(job_ptr))
907 return true;
908 }
909 return false;
910 }
911
912 /* Recover saved burst buffer state and use it to preserve account, partition,
913 * and QOS information for persistent burst buffers. */
_recover_bb_state(void)914 static void _recover_bb_state(void)
915 {
916 char *state_file = NULL, *data = NULL;
917 int data_allocated, data_read = 0;
918 uint16_t protocol_version = NO_VAL16;
919 uint32_t data_size = 0, rec_count = 0, name_len = 0;
920 uint32_t id = 0, user_id = 0;
921 uint64_t size = 0;
922 int i, state_fd;
923 char *account = NULL, *name = NULL;
924 char *partition = NULL, *pool = NULL, *qos = NULL;
925 char *end_ptr = NULL;
926 time_t create_time = 0;
927 bb_alloc_t *bb_alloc;
928 Buf buffer;
929
930 state_fd = _open_part_state_file(&state_file);
931 if (state_fd < 0) {
932 info("No burst buffer state file (%s) to recover",
933 state_file);
934 xfree(state_file);
935 return;
936 }
937 data_allocated = BUF_SIZE;
938 data = xmalloc(data_allocated);
939 while (1) {
940 data_read = read(state_fd, &data[data_size], BUF_SIZE);
941 if (data_read < 0) {
942 if (errno == EINTR)
943 continue;
944 else {
945 error("Read error on %s: %m", state_file);
946 break;
947 }
948 } else if (data_read == 0) /* eof */
949 break;
950 data_size += data_read;
951 data_allocated += data_read;
952 xrealloc(data, data_allocated);
953 }
954 close(state_fd);
955 xfree(state_file);
956
957 buffer = create_buf(data, data_size);
958 safe_unpack16(&protocol_version, buffer);
959 if (protocol_version == NO_VAL16) {
960 if (!ignore_state_errors)
961 fatal("Can not recover burst_buffer/datawarp state, data version incompatible, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
962 error("**********************************************************************");
963 error("Can not recover burst_buffer/datawarp state, data version incompatible");
964 error("**********************************************************************");
965 return;
966 }
967
968 safe_unpack32(&rec_count, buffer);
969 for (i = 0; i < rec_count; i++) {
970 if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
971 safe_unpackstr_xmalloc(&account, &name_len, buffer);
972 safe_unpack_time(&create_time, buffer);
973 safe_unpack32(&id, buffer);
974 safe_unpackstr_xmalloc(&name, &name_len, buffer);
975 safe_unpackstr_xmalloc(&partition, &name_len, buffer);
976 safe_unpackstr_xmalloc(&pool, &name_len, buffer);
977 safe_unpackstr_xmalloc(&qos, &name_len, buffer);
978 safe_unpack32(&user_id, buffer);
979 if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
980 safe_unpack64(&size, buffer);
981 }
982
983 if ((bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) &&
984 _is_complete_job(name)) {
985 info("%s: %s, Ignoring burst buffer state for completed job %s",
986 plugin_type, __func__, name);
987 bb_alloc = NULL;
988 } else if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
989 bb_alloc = bb_alloc_name_rec(&bb_state, name, user_id);
990 bb_alloc->id = id;
991 last_persistent_id = MAX(last_persistent_id, id);
992 if (name && (name[0] >='0') && (name[0] <='9')) {
993 bb_alloc->job_id = strtol(name, &end_ptr, 10);
994 bb_alloc->array_job_id = bb_alloc->job_id;
995 bb_alloc->array_task_id = NO_VAL;
996 }
997 bb_alloc->seen_time = time(NULL);
998 bb_alloc->size = size;
999 } else {
1000 bb_alloc = bb_find_name_rec(name, user_id, &bb_state);
1001 }
1002 if (bb_alloc) {
1003 if (bb_state.bb_config.debug_flag) {
1004 info("Recovered burst buffer %s from user %u",
1005 bb_alloc->name, bb_alloc->user_id);
1006 }
1007 xfree(bb_alloc->account);
1008 bb_alloc->account = account;
1009 account = NULL;
1010 bb_alloc->create_time = create_time;
1011 xfree(bb_alloc->partition);
1012 bb_alloc->partition = partition;
1013 partition = NULL;
1014 xfree(bb_alloc->pool);
1015 bb_alloc->pool = pool;
1016 pool = NULL;
1017 xfree(bb_alloc->qos);
1018 bb_alloc->qos = qos;
1019 qos = NULL;
1020 }
1021 xfree(account);
1022 xfree(name);
1023 xfree(partition);
1024 xfree(pool);
1025 xfree(qos);
1026 }
1027
1028 info("Recovered state of %d burst buffers", rec_count);
1029 free_buf(buffer);
1030 return;
1031
1032 unpack_error:
1033 if (!ignore_state_errors)
1034 fatal("Incomplete burst buffer data checkpoint file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
1035 error("Incomplete burst buffer data checkpoint file");
1036 xfree(account);
1037 xfree(name);
1038 xfree(partition);
1039 xfree(qos);
1040 free_buf(buffer);
1041 return;
1042 }
1043
1044 /* We just found an unexpected session, set default account, QOS, & partition.
1045 * Copy the information from any currently existing session for the same user.
1046 * If none found, use his default account and QOS.
1047 * NOTE: assoc_mgr_locks need to be locked with
1048 * assoc_mgr_lock_t assoc_locks = { READ_LOCK, NO_LOCK, READ_LOCK, NO_LOCK,
1049 * NO_LOCK, NO_LOCK, NO_LOCK };
1050 * before calling this.
1051 */
_pick_alloc_account(bb_alloc_t * bb_alloc)1052 static void _pick_alloc_account(bb_alloc_t *bb_alloc)
1053 {
1054 slurmdb_assoc_rec_t assoc_rec;
1055 slurmdb_qos_rec_t qos_rec;
1056 bb_alloc_t *bb_ptr = NULL;
1057
1058 bb_ptr = bb_state.bb_ahash[bb_alloc->user_id % BB_HASH_SIZE];
1059 while (bb_ptr) {
1060 if ((bb_ptr != bb_alloc) &&
1061 (bb_ptr->user_id == bb_alloc->user_id)) {
1062 xfree(bb_alloc->account);
1063 bb_alloc->account = xstrdup(bb_ptr->account);
1064 bb_alloc->assoc_ptr = bb_ptr->assoc_ptr;
1065 xfree(bb_alloc->partition);
1066 bb_alloc->partition = xstrdup(bb_ptr->partition);
1067 xfree(bb_alloc->qos);
1068 bb_alloc->qos = xstrdup(bb_ptr->qos);
1069 bb_alloc->qos_ptr = bb_ptr->qos_ptr;
1070 xfree(bb_alloc->assocs);
1071 bb_alloc->assocs = xstrdup(bb_ptr->assocs);
1072 return;
1073 }
1074 bb_ptr = bb_ptr->next;
1075 }
1076
1077 /* Set default for this user */
1078 bb_alloc->partition = xstrdup(default_part_name);
1079 memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
1080 memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
1081 assoc_rec.partition = default_part_name;
1082 assoc_rec.uid = bb_alloc->user_id;
1083
1084 if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
1085 accounting_enforce,
1086 &bb_alloc->assoc_ptr,
1087 true) == SLURM_SUCCESS) {
1088 xfree(bb_alloc->account);
1089 bb_alloc->account = xstrdup(assoc_rec.acct);
1090 xfree(bb_alloc->assocs);
1091 if (bb_alloc->assoc_ptr)
1092 bb_alloc->assocs =
1093 xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
1094
1095 assoc_mgr_get_default_qos_info(bb_alloc->assoc_ptr, &qos_rec);
1096 if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec,
1097 accounting_enforce,
1098 &bb_alloc->qos_ptr,
1099 true) == SLURM_SUCCESS) {
1100 xfree(bb_alloc->qos);
1101 if (bb_alloc->qos_ptr)
1102 bb_alloc->qos =
1103 xstrdup(bb_alloc->qos_ptr->name);
1104 }
1105 }
1106 }
1107
1108 /* For a given user/partition/account, set it's assoc_ptr */
_set_assoc_mgr_ptrs(bb_alloc_t * bb_alloc)1109 static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc)
1110 {
1111 /* read locks on assoc */
1112 assoc_mgr_lock_t assoc_locks =
1113 { .assoc = READ_LOCK, .qos = READ_LOCK, .user = READ_LOCK };
1114 slurmdb_assoc_rec_t assoc_rec;
1115 slurmdb_qos_rec_t qos_rec;
1116
1117 memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
1118 assoc_rec.acct = bb_alloc->account;
1119 assoc_rec.partition = bb_alloc->partition;
1120 assoc_rec.uid = bb_alloc->user_id;
1121 assoc_mgr_lock(&assoc_locks);
1122 if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
1123 accounting_enforce,
1124 &bb_alloc->assoc_ptr,
1125 true) == SLURM_SUCCESS) {
1126 xfree(bb_alloc->assocs);
1127 if (bb_alloc->assoc_ptr) {
1128 bb_alloc->assocs =
1129 xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
1130 }
1131 }
1132
1133 memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
1134 qos_rec.name = bb_alloc->qos;
1135 if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec, accounting_enforce,
1136 &bb_alloc->qos_ptr, true) != SLURM_SUCCESS)
1137 verbose("%s: %s: Invalid QOS name: %s",
1138 plugin_type, __func__, bb_alloc->qos);
1139
1140 assoc_mgr_unlock(&assoc_locks);
1141 }
1142
1143 /*
1144 * Determine the current actual burst buffer state.
1145 */
_load_state(bool init_config)1146 static void _load_state(bool init_config)
1147 {
1148 static bool first_run = true;
1149 burst_buffer_pool_t *pool_ptr;
1150 bb_configs_t *configs;
1151 bb_instances_t *instances;
1152 bb_pools_t *pools;
1153 bb_sessions_t *sessions;
1154 bb_alloc_t *bb_alloc;
1155 job_record_t *job_ptr;
1156 int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0;
1157 int i, j, pools_inx;
1158 char *end_ptr = NULL;
1159 time_t now = time(NULL);
1160 uint32_t timeout;
1161 assoc_mgr_lock_t assoc_locks = { .assoc = READ_LOCK,
1162 .qos = READ_LOCK,
1163 .user = READ_LOCK };
1164 bool found_pool;
1165 bitstr_t *pools_bitmap;
1166
1167 slurm_mutex_lock(&bb_state.bb_mutex);
1168 timeout = bb_state.bb_config.other_timeout * 1000;
1169 slurm_mutex_unlock(&bb_state.bb_mutex);
1170
1171 /*
1172 * Load the pools information
1173 */
1174 pools = _bb_get_pools(&num_pools, &bb_state, timeout);
1175 if (pools == NULL) {
1176 error("%s: %s: failed to find DataWarp entries, what now?",
1177 plugin_type, __func__);
1178 return;
1179 }
1180
1181 pools_bitmap = bit_alloc(bb_state.bb_config.pool_cnt + num_pools);
1182 slurm_mutex_lock(&bb_state.bb_mutex);
1183 if (!bb_state.bb_config.default_pool && (num_pools > 0)) {
1184 info("%s: %s: Setting DefaultPool to %s",
1185 plugin_type, __func__, pools[0].id);
1186 bb_state.bb_config.default_pool = xstrdup(pools[0].id);
1187 }
1188
1189 for (i = 0; i < num_pools; i++) {
1190 /* ID: "bytes" */
1191 if (xstrcmp(pools[i].id,
1192 bb_state.bb_config.default_pool) == 0) {
1193 bb_state.bb_config.granularity = pools[i].granularity;
1194 bb_state.total_space = pools[i].quantity *
1195 pools[i].granularity;
1196 bb_state.unfree_space = pools[i].quantity -
1197 pools[i].free;
1198 bb_state.unfree_space *= pools[i].granularity;
1199 continue;
1200 }
1201
1202 found_pool = false;
1203 pool_ptr = bb_state.bb_config.pool_ptr;
1204 for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
1205 if (!xstrcmp(pool_ptr->name, pools[i].id)) {
1206 found_pool = true;
1207 break;
1208 }
1209 }
1210 if (!found_pool) {
1211 if (!first_run) {
1212 info("%s: %s: Newly reported pool %s",
1213 plugin_type, __func__, pools[i].id);
1214 }
1215 bb_state.bb_config.pool_ptr
1216 = xrealloc(bb_state.bb_config.pool_ptr,
1217 sizeof(burst_buffer_pool_t) *
1218 (bb_state.bb_config.pool_cnt + 1));
1219 pool_ptr = bb_state.bb_config.pool_ptr +
1220 bb_state.bb_config.pool_cnt;
1221 pool_ptr->name = xstrdup(pools[i].id);
1222 bb_state.bb_config.pool_cnt++;
1223 }
1224
1225 pools_inx = pool_ptr - bb_state.bb_config.pool_ptr;
1226 bit_set(pools_bitmap, pools_inx);
1227 pool_ptr->total_space = pools[i].quantity *
1228 pools[i].granularity;
1229 pool_ptr->granularity = pools[i].granularity;
1230 pool_ptr->unfree_space = pools[i].quantity - pools[i].free;
1231 pool_ptr->unfree_space *= pools[i].granularity;
1232 }
1233
1234 pool_ptr = bb_state.bb_config.pool_ptr;
1235 for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
1236 if (bit_test(pools_bitmap, j) || (pool_ptr->total_space == 0))
1237 continue;
1238 error("%s: %s: Pool %s no longer reported by system, setting size to zero",
1239 plugin_type, __func__, pool_ptr->name);
1240 pool_ptr->total_space = 0;
1241 pool_ptr->used_space = 0;
1242 pool_ptr->unfree_space = 0;
1243 }
1244 first_run = false;
1245 slurm_mutex_unlock(&bb_state.bb_mutex);
1246 FREE_NULL_BITMAP(pools_bitmap);
1247 _bb_free_pools(pools, num_pools);
1248
1249 /*
1250 * Load the instances information
1251 */
1252 instances = _bb_get_instances(&num_instances, &bb_state, timeout);
1253 if (instances == NULL) {
1254 if (bb_state.bb_config.debug_flag)
1255 debug("%s: %s: No DataWarp instances found",
1256 plugin_type, __func__);
1257 num_instances = 0; /* Redundant, but fixes CLANG bug */
1258 }
1259 sessions = _bb_get_sessions(&num_sessions, &bb_state, timeout);
1260 assoc_mgr_lock(&assoc_locks);
1261 slurm_mutex_lock(&bb_state.bb_mutex);
1262 bb_state.last_load_time = time(NULL);
1263 for (i = 0; i < num_sessions; i++) {
1264 if (!init_config) {
1265 bb_alloc = bb_find_name_rec(sessions[i].token,
1266 sessions[i].user_id,
1267 &bb_state);
1268 if (bb_alloc) {
1269 bb_alloc->seen_time = bb_state.last_load_time;
1270 continue;
1271 }
1272 if (difftime(now, sessions[i].created) <
1273 bb_state.bb_config.other_timeout) {
1274 /* Newly created in other thread. Give that
1275 * thread a chance to add the entry */
1276 continue;
1277 }
1278 error("%s: %s: Unexpected burst buffer found: %s",
1279 plugin_type, __func__, sessions[i].token);
1280 }
1281
1282 bb_alloc = bb_alloc_name_rec(&bb_state, sessions[i].token,
1283 sessions[i].user_id);
1284 bb_alloc->create_time = sessions[i].created;
1285 bb_alloc->id = sessions[i].id;
1286 if ((sessions[i].token != NULL) &&
1287 (sessions[i].token[0] >= '0') &&
1288 (sessions[i].token[0] <= '9')) {
1289 bb_alloc->job_id =
1290 strtol(sessions[i].token, &end_ptr, 10);
1291 job_ptr = find_job_record(bb_alloc->job_id);
1292 if (job_ptr) {
1293 bb_alloc->array_job_id = job_ptr->array_job_id;
1294 bb_alloc->array_task_id =job_ptr->array_task_id;
1295 } else {
1296 bb_alloc->array_task_id = NO_VAL;
1297 }
1298 }
1299 for (j = 0; j < num_instances; j++) {
1300 if (sessions[i].id != instances[j].session)
1301 continue;
1302 bb_alloc->size += instances[j].bytes;
1303 }
1304 bb_alloc->seen_time = bb_state.last_load_time;
1305
1306 if (!init_config) { /* Newly found buffer */
1307 _pick_alloc_account(bb_alloc);
1308 bb_limit_add(bb_alloc->user_id, bb_alloc->size,
1309 bb_alloc->pool, &bb_state, false);
1310 }
1311 if (bb_alloc->job_id == 0)
1312 bb_post_persist_create(NULL, bb_alloc, &bb_state);
1313 }
1314 slurm_mutex_unlock(&bb_state.bb_mutex);
1315 assoc_mgr_unlock(&assoc_locks);
1316 _bb_free_sessions(sessions, num_sessions);
1317 _bb_free_instances(instances, num_instances);
1318
1319 if (!init_config)
1320 return;
1321
1322 /*
1323 * Load the configurations information
1324 * NOTE: This information is currently unused
1325 */
1326 configs = _bb_get_configs(&num_configs, &bb_state, timeout);
1327 if (configs == NULL) {
1328 info("%s: %s: No DataWarp configurations found",
1329 plugin_type, __func__);
1330 num_configs = 0;
1331 }
1332 _bb_free_configs(configs, num_configs);
1333
1334 _recover_bb_state();
1335 _apply_limits();
1336 bb_state.last_update_time = time(NULL);
1337
1338 return;
1339 }
1340
1341 /* Write an string representing the NIDs of a job's nodes to an arbitrary
1342 * file location
1343 * RET 0 or Slurm error code
1344 */
_write_nid_file(char * file_name,char * node_list,job_record_t * job_ptr)1345 static int _write_nid_file(char *file_name, char *node_list,
1346 job_record_t *job_ptr)
1347 {
1348 #if defined(HAVE_NATIVE_CRAY)
1349 char *tmp, *sep, *buf = NULL;
1350 int i, j, rc;
1351
1352 xassert(file_name);
1353 tmp = xstrdup(node_list);
1354 /* Remove any trailing "]" */
1355 sep = strrchr(tmp, ']');
1356 if (sep)
1357 sep[0] = '\0';
1358 /* Skip over "nid[" or "nid" */
1359 sep = strchr(tmp, '[');
1360 if (sep) {
1361 sep++;
1362 } else {
1363 sep = tmp;
1364 for (i = 0; !isdigit(sep[0]) && sep[0]; i++)
1365 sep++;
1366 }
1367 /* Copy numeric portion */
1368 buf = xmalloc(strlen(sep) + 1);
1369 for (i = 0, j = 0; sep[i]; i++) {
1370 /* Skip leading zeros */
1371 if ((sep[i] == '0') && isdigit(sep[i+1]))
1372 continue;
1373 /* Copy significant digits and separator */
1374 while (sep[i]) {
1375 if (sep[i] == ',') {
1376 buf[j++] = '\n';
1377 break;
1378 }
1379 buf[j++] = sep[i];
1380 if (sep[i] == '-')
1381 break;
1382 i++;
1383 }
1384 if (!sep[i])
1385 break;
1386 }
1387 xfree(tmp);
1388
1389 if (buf[0]) {
1390 rc = _write_file(file_name, buf);
1391 } else {
1392 error("%s: %s: %pJ has node list without numeric component (%s)",
1393 plugin_type, __func__, job_ptr, node_list);
1394 rc = EINVAL;
1395 }
1396 xfree(buf);
1397 return rc;
1398 #else
1399 char *tok, *buf = NULL;
1400 int rc;
1401
1402 xassert(file_name);
1403 if (node_list && node_list[0]) {
1404 hostlist_t hl = hostlist_create(node_list);
1405 while ((tok = hostlist_shift(hl))) {
1406 xstrfmtcat(buf, "%s\n", tok);
1407 free(tok);
1408 }
1409 hostlist_destroy(hl);
1410 rc = _write_file(file_name, buf);
1411 xfree(buf);
1412 } else {
1413 error("%s: %s: %pJ lacks a node list",
1414 plugin_type, __func__, job_ptr);
1415 rc = EINVAL;
1416 }
1417 return rc;
1418 #endif
1419 }
1420
1421 /* Write an arbitrary string to an arbitrary file name */
_write_file(char * file_name,char * buf)1422 static int _write_file(char *file_name, char *buf)
1423 {
1424 int amount, fd, nwrite, pos;
1425
1426 (void) unlink(file_name);
1427 fd = creat(file_name, 0600);
1428 if (fd < 0) {
1429 error("Error creating file %s, %m", file_name);
1430 return errno;
1431 }
1432
1433 if (!buf) {
1434 error("%s: %s: buf is NULL", plugin_type, __func__);
1435 return SLURM_ERROR;
1436 }
1437
1438 nwrite = strlen(buf);
1439 pos = 0;
1440 while (nwrite > 0) {
1441 amount = write(fd, &buf[pos], nwrite);
1442 if ((amount < 0) && (errno != EINTR)) {
1443 error("Error writing file %s, %m", file_name);
1444 close(fd);
1445 return ESLURM_WRITING_TO_FILE;
1446 }
1447 nwrite -= amount;
1448 pos += amount;
1449 }
1450
1451 (void) close(fd);
1452 return SLURM_SUCCESS;
1453 }
1454
_queue_stage_in(job_record_t * job_ptr,bb_job_t * bb_job)1455 static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job)
1456 {
1457 char *hash_dir = NULL, *job_dir = NULL, *job_pool;
1458 char *client_nodes_file_nid = NULL;
1459 char **setup_argv, **data_in_argv;
1460 stage_args_t *stage_args;
1461 int hash_inx = job_ptr->job_id % 10;
1462 int rc = SLURM_SUCCESS;
1463 pthread_t tid;
1464
1465 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
1466 (void) mkdir(hash_dir, 0700);
1467 xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
1468 if (job_ptr->sched_nodes) {
1469 xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
1470 if (_write_nid_file(client_nodes_file_nid,
1471 job_ptr->sched_nodes, job_ptr))
1472 xfree(client_nodes_file_nid);
1473 }
1474 setup_argv = xcalloc(20, sizeof(char *)); /* NULL terminated */
1475 setup_argv[0] = xstrdup("dw_wlm_cli");
1476 setup_argv[1] = xstrdup("--function");
1477 setup_argv[2] = xstrdup("setup");
1478 setup_argv[3] = xstrdup("--token");
1479 xstrfmtcat(setup_argv[4], "%u", job_ptr->job_id);
1480 setup_argv[5] = xstrdup("--caller");
1481 setup_argv[6] = xstrdup("SLURM");
1482 setup_argv[7] = xstrdup("--user");
1483 xstrfmtcat(setup_argv[8], "%u", job_ptr->user_id);
1484 setup_argv[9] = xstrdup("--groupid");
1485 xstrfmtcat(setup_argv[10], "%u", job_ptr->group_id);
1486 setup_argv[11] = xstrdup("--capacity");
1487 if (bb_job->job_pool)
1488 job_pool = bb_job->job_pool;
1489 else
1490 job_pool = bb_state.bb_config.default_pool;
1491 xstrfmtcat(setup_argv[12], "%s:%s",
1492 job_pool, bb_get_size_str(bb_job->total_size));
1493 setup_argv[13] = xstrdup("--job");
1494 setup_argv[14] = bb_handle_job_script(job_ptr, bb_job);
1495 if (client_nodes_file_nid) {
1496 #if defined(HAVE_NATIVE_CRAY)
1497 setup_argv[15] = xstrdup("--nidlistfile");
1498 #else
1499 setup_argv[15] = xstrdup("--nodehostnamefile");
1500 #endif
1501 setup_argv[16] = xstrdup(client_nodes_file_nid);
1502 }
1503 bb_limit_add(job_ptr->user_id, bb_job->total_size, job_pool, &bb_state,
1504 true);
1505
1506 data_in_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
1507 data_in_argv[0] = xstrdup("dw_wlm_cli");
1508 data_in_argv[1] = xstrdup("--function");
1509 data_in_argv[2] = xstrdup("data_in");
1510 data_in_argv[3] = xstrdup("--token");
1511 xstrfmtcat(data_in_argv[4], "%u", job_ptr->job_id);
1512 data_in_argv[5] = xstrdup("--job");
1513 data_in_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1514
1515 stage_args = xmalloc(sizeof(stage_args_t));
1516 stage_args->bb_size = bb_job->total_size;
1517 stage_args->job_id = job_ptr->job_id;
1518 stage_args->pool = xstrdup(job_pool);
1519 stage_args->user_id = job_ptr->user_id;
1520 stage_args->args1 = setup_argv;
1521 stage_args->args2 = data_in_argv;
1522
1523 slurm_thread_create(&tid, _start_stage_in, stage_args);
1524
1525 xfree(hash_dir);
1526 xfree(job_dir);
1527 xfree(client_nodes_file_nid);
1528 return rc;
1529 }
1530
_update_system_comment(job_record_t * job_ptr,char * operation,char * resp_msg,bool update_database)1531 static void _update_system_comment(job_record_t *job_ptr, char *operation,
1532 char *resp_msg, bool update_database)
1533 {
1534 char *sep = NULL;
1535
1536 if (job_ptr->system_comment &&
1537 (strlen(job_ptr->system_comment) >= 1024)) {
1538 /* Avoid filling comment with repeated BB failures */
1539 return;
1540 }
1541
1542 if (job_ptr->system_comment)
1543 xstrftimecat(sep, "\n%x %X");
1544 else
1545 xstrftimecat(sep, "%x %X");
1546 xstrfmtcat(job_ptr->system_comment, "%s %s: %s: %s",
1547 sep, plugin_type, operation, resp_msg);
1548 xfree(sep);
1549
1550 if (update_database) {
1551 slurmdb_job_cond_t job_cond;
1552 slurmdb_job_rec_t job_rec;
1553 slurmdb_selected_step_t selected_step;
1554 List ret_list;
1555
1556 memset(&job_cond, 0, sizeof(slurmdb_job_cond_t));
1557 memset(&job_rec, 0, sizeof(slurmdb_job_rec_t));
1558 memset(&selected_step, 0, sizeof(slurmdb_selected_step_t));
1559
1560 selected_step.array_task_id = NO_VAL;
1561 selected_step.jobid = job_ptr->job_id;
1562 selected_step.het_job_offset = NO_VAL;
1563 selected_step.stepid = NO_VAL;
1564 job_cond.step_list = list_create(NULL);
1565 list_append(job_cond.step_list, &selected_step);
1566
1567 job_cond.flags = JOBCOND_FLAG_NO_WAIT |
1568 JOBCOND_FLAG_DBD_UID |
1569 JOBCOND_FLAG_NO_DEFAULT_USAGE;
1570
1571 job_cond.cluster_list = list_create(NULL);
1572 list_append(job_cond.cluster_list, slurmctld_conf.cluster_name);
1573
1574 job_cond.usage_start = job_ptr->details->submit_time;
1575
1576 job_rec.system_comment = job_ptr->system_comment;
1577
1578 ret_list = acct_storage_g_modify_job(
1579 acct_db_conn, slurmctld_conf.slurm_user_id,
1580 &job_cond, &job_rec);
1581
1582 FREE_NULL_LIST(job_cond.cluster_list);
1583 FREE_NULL_LIST(job_cond.step_list);
1584 FREE_NULL_LIST(ret_list);
1585 }
1586 }
1587
_start_stage_in(void * x)1588 static void *_start_stage_in(void *x)
1589 {
1590 stage_args_t *stage_args = (stage_args_t *) x;
1591 char **setup_argv, **size_argv, **data_in_argv;
1592 char *resp_msg = NULL, *resp_msg2 = NULL, *op = NULL;
1593 uint64_t real_size = 0;
1594 int rc = SLURM_SUCCESS, status = 0, timeout;
1595 slurmctld_lock_t job_write_lock =
1596 { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
1597 job_record_t *job_ptr;
1598 bb_alloc_t *bb_alloc = NULL;
1599 bb_job_t *bb_job;
1600 bool get_real_size = false;
1601 DEF_TIMERS;
1602 track_script_rec_add(stage_args->job_id, 0, pthread_self());
1603
1604 setup_argv = stage_args->args1;
1605 data_in_argv = stage_args->args2;
1606
1607 timeout = bb_state.bb_config.other_timeout * 1000;
1608 op = "setup";
1609 START_TIMER;
1610 resp_msg = run_command("setup",
1611 bb_state.bb_config.get_sys_state,
1612 setup_argv, timeout, pthread_self(),
1613 &status);
1614 END_TIMER;
1615 info("%s: %s: setup for job JobId=%u ran for %s",
1616 plugin_type, __func__, stage_args->job_id, TIME_STR);
1617
1618 if (track_script_broadcast(pthread_self(), status)) {
1619 /* I was killed by slurmtrack, bail out right now */
1620 info("%s: %s: setup for JobId=%u terminated by slurmctld",
1621 plugin_type, __func__, stage_args->job_id);
1622 free_command_argv(setup_argv);
1623 free_command_argv(data_in_argv);
1624 xfree(resp_msg);
1625 xfree(stage_args->pool);
1626 xfree(stage_args);
1627 track_script_remove(pthread_self());
1628 return NULL;
1629 }
1630 track_script_reset_cpid(pthread_self(), 0);
1631
1632 _log_script_argv(setup_argv, resp_msg);
1633 lock_slurmctld(job_write_lock);
1634 slurm_mutex_lock(&bb_state.bb_mutex);
1635 /*
1636 * The buffer's actual size may be larger than requested by the user.
1637 * Remove limit here and restore limit based upon actual size below
1638 * (assuming buffer allocation succeeded, or just leave it out).
1639 */
1640 bb_limit_rem(stage_args->user_id, stage_args->bb_size, stage_args->pool,
1641 &bb_state);
1642
1643 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1644 trigger_burst_buffer();
1645 error("%s: %s: setup for JobId=%u status:%u response:%s",
1646 plugin_type, __func__, stage_args->job_id, status,
1647 resp_msg);
1648 rc = SLURM_ERROR;
1649 job_ptr = find_job_record(stage_args->job_id);
1650 if (job_ptr)
1651 _update_system_comment(job_ptr, "setup", resp_msg, 0);
1652 } else {
1653 job_ptr = find_job_record(stage_args->job_id);
1654 bb_job = bb_job_find(&bb_state, stage_args->job_id);
1655 if (!job_ptr) {
1656 error("%s: %s: unable to find job record for JobId=%u",
1657 plugin_type, __func__, stage_args->job_id);
1658 rc = SLURM_ERROR;
1659 } else if (!bb_job) {
1660 error("%s: %s: unable to find bb_job record for %pJ",
1661 plugin_type, __func__, job_ptr);
1662 } else {
1663 bb_job->state = BB_STATE_STAGING_IN;
1664 bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1665 if (!bb_alloc && bb_job->total_size) {
1666 /* Not found (from restart race condtion) and
1667 * job buffer has non-zero size */
1668 bb_alloc = bb_alloc_job(&bb_state, job_ptr,
1669 bb_job);
1670 bb_limit_add(stage_args->user_id,
1671 bb_job->total_size,
1672 stage_args->pool, &bb_state,
1673 true);
1674 bb_alloc->create_time = time(NULL);
1675 }
1676 }
1677 }
1678 slurm_mutex_unlock(&bb_state.bb_mutex);
1679 unlock_slurmctld(job_write_lock);
1680
1681 if (rc == SLURM_SUCCESS) {
1682 timeout = bb_state.bb_config.stage_in_timeout * 1000;
1683 xfree(resp_msg);
1684
1685 op = "dws_data_in";
1686 START_TIMER;
1687 resp_msg = run_command("dws_data_in",
1688 bb_state.bb_config.get_sys_state,
1689 data_in_argv, timeout, pthread_self(),
1690 &status);
1691 END_TIMER;
1692 info("%s: %s: dws_data_in for JobId=%u ran for %s",
1693 plugin_type, __func__, stage_args->job_id, TIME_STR);
1694 if (track_script_broadcast(pthread_self(), status)) {
1695 /* I was killed by slurmtrack, bail out right now */
1696 info("%s: %s: dws_data_in for JobId=%u terminated by slurmctld",
1697 plugin_type, __func__, stage_args->job_id);
1698 free_command_argv(setup_argv);
1699 free_command_argv(data_in_argv);
1700 xfree(resp_msg);
1701 xfree(stage_args->pool);
1702 xfree(stage_args);
1703 /*
1704 * Don't need to free track_script_rec here,
1705 * it is handled elsewhere since it still being tracked.
1706 */
1707 return NULL;
1708 }
1709 track_script_reset_cpid(pthread_self(), 0);
1710
1711 _log_script_argv(data_in_argv, resp_msg);
1712 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
1713 !strstr(resp_msg, "No matching session")) {
1714 trigger_burst_buffer();
1715 error("%s: %s: dws_data_in for JobId=%u status:%u response:%s",
1716 plugin_type, __func__, stage_args->job_id, status,
1717 resp_msg);
1718 rc = SLURM_ERROR;
1719 lock_slurmctld(job_write_lock);
1720 job_ptr = find_job_record(stage_args->job_id);
1721 if (job_ptr)
1722 _update_system_comment(job_ptr, "data_in",
1723 resp_msg, 0);
1724 unlock_slurmctld(job_write_lock);
1725 }
1726 }
1727
1728 slurm_mutex_lock(&bb_state.bb_mutex);
1729 bb_job = bb_job_find(&bb_state, stage_args->job_id);
1730 if (bb_job && bb_job->req_size)
1731 get_real_size = true;
1732 slurm_mutex_unlock(&bb_state.bb_mutex);
1733
1734 /* Round up job buffer size based upon DW "equalize_fragments"
1735 * configuration parameter */
1736 if (get_real_size) {
1737 size_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
1738 size_argv[0] = xstrdup("dw_wlm_cli");
1739 size_argv[1] = xstrdup("--function");
1740 size_argv[2] = xstrdup("real_size");
1741 size_argv[3] = xstrdup("--token");
1742 xstrfmtcat(size_argv[4], "%u", stage_args->job_id);
1743 START_TIMER;
1744 resp_msg2 = run_command("real_size",
1745 bb_state.bb_config.get_sys_state,
1746 size_argv, timeout, pthread_self(),
1747 &status);
1748 END_TIMER;
1749 if ((DELTA_TIMER > 200000) || /* 0.2 secs */
1750 bb_state.bb_config.debug_flag)
1751 info("%s: %s: real_size ran for %s",
1752 plugin_type, __func__, TIME_STR);
1753
1754 if (track_script_broadcast(pthread_self(), status)) {
1755 /* I was killed by slurmtrack, bail out right now */
1756 info("%s: %s: real_size for JobId=%u terminated by slurmctld",
1757 plugin_type, __func__, stage_args->job_id);
1758 free_command_argv(setup_argv);
1759 free_command_argv(data_in_argv);
1760 xfree(resp_msg);
1761 xfree(resp_msg2);
1762 free_command_argv(size_argv);
1763 xfree(stage_args->pool);
1764 xfree(stage_args);
1765 /*
1766 * Don't need to free track_script_rec here,
1767 * it is handled elsewhere since it still being tracked.
1768 */
1769 return NULL;
1770 }
1771 track_script_reset_cpid(pthread_self(), 0);
1772
1773 /* Use resp_msg2 to preserve resp_msg for error message below */
1774 _log_script_argv(size_argv, resp_msg2);
1775
1776 if (WIFEXITED(status) && (WEXITSTATUS(status) != 0) &&
1777 resp_msg2 &&
1778 (strncmp(resp_msg2, "invalid function", 16) == 0)) {
1779 debug("%s: %s: Old dw_wlm_cli does not support real_size function",
1780 plugin_type, __func__);
1781 } else if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1782 error("%s: %s: real_size for JobId=%u status:%u response:%s",
1783 plugin_type, __func__, stage_args->job_id,
1784 status, resp_msg2);
1785 } else if (resp_msg2 && resp_msg2[0]) {
1786 json_object *j;
1787 struct bb_total_size *ent;
1788 j = json_tokener_parse(resp_msg2);
1789 if (j == NULL) {
1790 error("%s: %s: json parser failed on \"%s\"",
1791 plugin_type, __func__, resp_msg2);
1792 } else {
1793 ent = _json_parse_real_size(j);
1794 json_object_put(j); /* Frees json memory */
1795 if (ent && (ent->units == BB_UNITS_BYTES))
1796 real_size = ent->capacity;
1797 xfree(ent);
1798 }
1799 }
1800 xfree(resp_msg2);
1801 free_command_argv(size_argv);
1802 }
1803
1804 lock_slurmctld(job_write_lock);
1805 job_ptr = find_job_record(stage_args->job_id);
1806 if (!job_ptr) {
1807 error("%s: %s: unable to find job record for JobId=%u",
1808 plugin_type, __func__, stage_args->job_id);
1809 } else if (rc == SLURM_SUCCESS) {
1810 slurm_mutex_lock(&bb_state.bb_mutex);
1811 bb_job = bb_job_find(&bb_state, stage_args->job_id);
1812 if (bb_job)
1813 bb_job->state = BB_STATE_STAGED_IN;
1814 if (bb_job && bb_job->total_size) {
1815 if (real_size > bb_job->req_size) {
1816 info("%s: %s: %pJ total_size increased from %"PRIu64" to %"PRIu64,
1817 plugin_type, __func__, job_ptr,
1818 bb_job->req_size, real_size);
1819 bb_job->total_size = real_size;
1820 }
1821 bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1822 if (bb_alloc) {
1823 bb_alloc->state = BB_STATE_STAGED_IN;
1824 bb_alloc->state_time = time(NULL);
1825 if (bb_state.bb_config.debug_flag) {
1826 info("%s: %s: Setup/stage-in complete for %pJ",
1827 plugin_type, __func__, job_ptr);
1828 }
1829 queue_job_scheduler();
1830 bb_state.last_update_time = time(NULL);
1831 } else {
1832 error("%s: %s: unable to find bb_alloc record for %pJ",
1833 plugin_type, __func__, job_ptr);
1834 }
1835 }
1836 slurm_mutex_unlock(&bb_state.bb_mutex);
1837 } else {
1838 xfree(job_ptr->state_desc);
1839 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
1840 xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
1841 plugin_type, op, resp_msg);
1842 job_ptr->priority = 0; /* Hold job */
1843 bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1844 if (bb_alloc) {
1845 bb_alloc->state_time = time(NULL);
1846 bb_state.last_update_time = time(NULL);
1847 if (bb_state.bb_config.flags &
1848 BB_FLAG_TEARDOWN_FAILURE) {
1849 bb_alloc->state = BB_STATE_TEARDOWN;
1850 _queue_teardown(job_ptr->job_id,
1851 job_ptr->user_id, true);
1852 } else {
1853 bb_alloc->state = BB_STATE_ALLOCATED;
1854 }
1855 } else {
1856 _queue_teardown(job_ptr->job_id, job_ptr->user_id,true);
1857 }
1858 }
1859 unlock_slurmctld(job_write_lock);
1860
1861 xfree(resp_msg);
1862 free_command_argv(setup_argv);
1863 free_command_argv(data_in_argv);
1864 xfree(stage_args->pool);
1865 xfree(stage_args);
1866
1867 track_script_remove(pthread_self());
1868
1869 return NULL;
1870 }
1871
_queue_stage_out(job_record_t * job_ptr,bb_job_t * bb_job)1872 static int _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
1873 {
1874 char *hash_dir = NULL, *job_dir = NULL;
1875 char **post_run_argv, **data_out_argv;
1876 stage_args_t *stage_args;
1877 int hash_inx = bb_job->job_id % 10, rc = SLURM_SUCCESS;
1878 pthread_t tid;
1879
1880 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
1881 xstrfmtcat(job_dir, "%s/job.%u", hash_dir, bb_job->job_id);
1882
1883 data_out_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
1884 data_out_argv[0] = xstrdup("dw_wlm_cli");
1885 data_out_argv[1] = xstrdup("--function");
1886 data_out_argv[2] = xstrdup("data_out");
1887 data_out_argv[3] = xstrdup("--token");
1888 xstrfmtcat(data_out_argv[4], "%u", bb_job->job_id);
1889 data_out_argv[5] = xstrdup("--job");
1890 data_out_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1891
1892 post_run_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
1893 post_run_argv[0] = xstrdup("dw_wlm_cli");
1894 post_run_argv[1] = xstrdup("--function");
1895 post_run_argv[2] = xstrdup("post_run");
1896 post_run_argv[3] = xstrdup("--token");
1897 xstrfmtcat(post_run_argv[4], "%u", bb_job->job_id);
1898 post_run_argv[5] = xstrdup("--job");
1899 post_run_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1900
1901 stage_args = xmalloc(sizeof(stage_args_t));
1902 stage_args->args1 = data_out_argv;
1903 stage_args->args2 = post_run_argv;
1904 stage_args->job_id = bb_job->job_id;
1905 stage_args->user_id = bb_job->user_id;
1906
1907 slurm_thread_create(&tid, _start_stage_out, stage_args);
1908
1909 xfree(hash_dir);
1910 xfree(job_dir);
1911 return rc;
1912 }
1913
_start_stage_out(void * x)1914 static void *_start_stage_out(void *x)
1915 {
1916 stage_args_t *stage_args = (stage_args_t *)x;
1917 char **post_run_argv, **data_out_argv, *resp_msg = NULL, *op = NULL;
1918 int rc = SLURM_SUCCESS, status = 0, timeout;
1919 slurmctld_lock_t job_write_lock =
1920 { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
1921 job_record_t *job_ptr;
1922 bb_alloc_t *bb_alloc = NULL;
1923 bb_job_t *bb_job = NULL;
1924 DEF_TIMERS
1925 track_script_rec_add(stage_args->job_id, 0, pthread_self());
1926
1927 data_out_argv = stage_args->args1;
1928 post_run_argv = stage_args->args2;
1929
1930 timeout = bb_state.bb_config.other_timeout * 1000;
1931 op = "dws_post_run";
1932 START_TIMER;
1933 resp_msg = run_command("dws_post_run",
1934 bb_state.bb_config.get_sys_state,
1935 post_run_argv, timeout, pthread_self(),
1936 &status);
1937 END_TIMER;
1938 if ((DELTA_TIMER > 500000) || /* 0.5 secs */
1939 bb_state.bb_config.debug_flag) {
1940 info("%s: %s: dws_post_run for JobId=%u ran for %s",
1941 plugin_type, __func__, stage_args->job_id, TIME_STR);
1942 }
1943
1944 if (track_script_broadcast(pthread_self(), status)) {
1945 /* I was killed by slurmtrack, bail out right now */
1946 info("%s: %s: dws_post_run for JobId=%u terminated by slurmctld",
1947 plugin_type, __func__, stage_args->job_id);
1948 free_command_argv(post_run_argv);
1949 free_command_argv(data_out_argv);
1950 xfree(resp_msg);
1951 xfree(stage_args->pool);
1952 xfree(stage_args);
1953 track_script_remove(pthread_self());
1954 return NULL;
1955 }
1956 track_script_reset_cpid(pthread_self(), 0);
1957
1958 _log_script_argv(post_run_argv, resp_msg);
1959 lock_slurmctld(job_write_lock);
1960 job_ptr = find_job_record(stage_args->job_id);
1961 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1962 trigger_burst_buffer();
1963 error("%s: %s: dws_post_run for JobId=%u status:%u response:%s",
1964 plugin_type, __func__, stage_args->job_id, status,
1965 resp_msg);
1966 rc = SLURM_ERROR;
1967 if (job_ptr) {
1968 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
1969 xfree(job_ptr->state_desc);
1970 xstrfmtcat(job_ptr->state_desc, "%s: post_run: %s",
1971 plugin_type, resp_msg);
1972 _update_system_comment(job_ptr, "post_run",
1973 resp_msg, 1);
1974 }
1975 }
1976 if (!job_ptr) {
1977 error("%s: %s: unable to find job record for JobId=%u",
1978 plugin_type, __func__, stage_args->job_id);
1979 } else {
1980 slurm_mutex_lock(&bb_state.bb_mutex);
1981 bb_job = _get_bb_job(job_ptr);
1982 if (bb_job)
1983 bb_job->state = BB_STATE_STAGING_OUT;
1984 slurm_mutex_unlock(&bb_state.bb_mutex);
1985 }
1986 unlock_slurmctld(job_write_lock);
1987
1988 if (rc == SLURM_SUCCESS) {
1989 timeout = bb_state.bb_config.stage_out_timeout * 1000;
1990 op = "dws_data_out";
1991 START_TIMER;
1992 xfree(resp_msg);
1993 resp_msg = run_command("dws_data_out",
1994 bb_state.bb_config.get_sys_state,
1995 data_out_argv, timeout, pthread_self(),
1996 &status);
1997 END_TIMER;
1998 if ((DELTA_TIMER > 1000000) || /* 10 secs */
1999 bb_state.bb_config.debug_flag) {
2000 info("%s: %s: dws_data_out for JobId=%u ran for %s",
2001 plugin_type, __func__, stage_args->job_id,
2002 TIME_STR);
2003 }
2004
2005 if (track_script_broadcast(pthread_self(), status)) {
2006 /* I was killed by slurmtrack, bail out right now */
2007 info("%s: %s: dws_data_out for JobId=%u terminated by slurmctld",
2008 plugin_type, __func__, stage_args->job_id);
2009 free_command_argv(post_run_argv);
2010 free_command_argv(data_out_argv);
2011 xfree(resp_msg);
2012 xfree(stage_args->pool);
2013 xfree(stage_args);
2014 track_script_remove(pthread_self());
2015 return NULL;
2016 }
2017 track_script_reset_cpid(pthread_self(), 0);
2018
2019 _log_script_argv(data_out_argv, resp_msg);
2020 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
2021 !strstr(resp_msg, "No matching session")) {
2022 trigger_burst_buffer();
2023 error("%s: %s: dws_data_out for JobId=%u status:%u response:%s",
2024 plugin_type, __func__, stage_args->job_id,
2025 status, resp_msg);
2026 rc = SLURM_ERROR;
2027 lock_slurmctld(job_write_lock);
2028 job_ptr = find_job_record(stage_args->job_id);
2029 if (job_ptr) {
2030 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2031 xfree(job_ptr->state_desc);
2032 xstrfmtcat(job_ptr->state_desc,
2033 "%s: stage-out: %s",
2034 plugin_type, resp_msg);
2035 _update_system_comment(job_ptr, "data_out",
2036 resp_msg, 1);
2037 }
2038 unlock_slurmctld(job_write_lock);
2039 }
2040 }
2041
2042 lock_slurmctld(job_write_lock);
2043 job_ptr = find_job_record(stage_args->job_id);
2044 if (!job_ptr) {
2045 error("%s: %s: unable to find job record for JobId=%u",
2046 plugin_type, __func__, stage_args->job_id);
2047 } else {
2048 if (rc != SLURM_SUCCESS) {
2049 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2050 xfree(job_ptr->state_desc);
2051 xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
2052 plugin_type, op, resp_msg);
2053 } else {
2054 job_ptr->job_state &= (~JOB_STAGE_OUT);
2055 xfree(job_ptr->state_desc);
2056 last_job_update = time(NULL);
2057 }
2058 slurm_mutex_lock(&bb_state.bb_mutex);
2059 bb_job = _get_bb_job(job_ptr);
2060 if ((rc == SLURM_SUCCESS) && bb_job)
2061 bb_job->state = BB_STATE_TEARDOWN;
2062 bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
2063 if (bb_alloc) {
2064 if (rc == SLURM_SUCCESS) {
2065 if (bb_state.bb_config.debug_flag) {
2066 info("%s: %s: Stage-out/post-run complete for %pJ",
2067 plugin_type, __func__, job_ptr);
2068 }
2069 /* bb_alloc->state = BB_STATE_STAGED_OUT; */
2070 bb_alloc->state = BB_STATE_TEARDOWN;
2071 bb_alloc->state_time = time(NULL);
2072 } else {
2073 if (bb_state.bb_config.flags &
2074 BB_FLAG_TEARDOWN_FAILURE) {
2075 bb_alloc->state = BB_STATE_TEARDOWN;
2076 _queue_teardown(stage_args->job_id,
2077 stage_args->user_id,
2078 false);
2079 } else
2080 bb_alloc->state = BB_STATE_STAGED_IN;
2081 if (bb_state.bb_config.debug_flag) {
2082 info("%s: %s: Stage-out failed for %pJ",
2083 plugin_type, __func__, job_ptr);
2084 }
2085 }
2086 bb_state.last_update_time = time(NULL);
2087 } else if (bb_job && bb_job->total_size) {
2088 error("%s: %s: unable to find bb record for %pJ",
2089 plugin_type, __func__, job_ptr);
2090 }
2091 if (rc == SLURM_SUCCESS) {
2092 _queue_teardown(stage_args->job_id, stage_args->user_id,
2093 false);
2094 }
2095 slurm_mutex_unlock(&bb_state.bb_mutex);
2096 }
2097 unlock_slurmctld(job_write_lock);
2098
2099 xfree(resp_msg);
2100 free_command_argv(post_run_argv);
2101 free_command_argv(data_out_argv);
2102 xfree(stage_args);
2103
2104 track_script_remove(pthread_self());
2105
2106 return NULL;
2107 }
2108
_queue_teardown(uint32_t job_id,uint32_t user_id,bool hurry)2109 static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry)
2110 {
2111 struct stat buf;
2112 char *hash_dir = NULL, *job_script = NULL;
2113 char **teardown_argv;
2114 stage_args_t *teardown_args;
2115 int fd, hash_inx = job_id % 10;
2116 pthread_t tid;
2117
2118 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
2119 xstrfmtcat(job_script, "%s/job.%u/script", hash_dir, job_id);
2120 if (stat(job_script, &buf) == -1) {
2121 xfree(job_script);
2122 xstrfmtcat(job_script, "%s/burst_buffer_script",
2123 state_save_loc);
2124 if (stat(job_script, &buf) == -1) {
2125 fd = creat(job_script, 0755);
2126 if (fd >= 0) {
2127 int len;
2128 char *dummy_script = "#!/bin/bash\nexit 0\n";
2129 len = strlen(dummy_script) + 1;
2130 if (write(fd, dummy_script, len) != len) {
2131 verbose("%s: %s: write(%s): %m",
2132 plugin_type, __func__,
2133 job_script);
2134 }
2135 close(fd);
2136 }
2137 }
2138 }
2139
2140 teardown_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
2141 teardown_argv[0] = xstrdup("dw_wlm_cli");
2142 teardown_argv[1] = xstrdup("--function");
2143 teardown_argv[2] = xstrdup("teardown");
2144 teardown_argv[3] = xstrdup("--token");
2145 xstrfmtcat(teardown_argv[4], "%u", job_id);
2146 teardown_argv[5] = xstrdup("--job");
2147 teardown_argv[6] = xstrdup(job_script);
2148 if (hurry)
2149 teardown_argv[7] = xstrdup("--hurry");
2150
2151 teardown_args = xmalloc(sizeof(stage_args_t));
2152 teardown_args->job_id = job_id;
2153 teardown_args->user_id = user_id;
2154 teardown_args->args1 = teardown_argv;
2155
2156 slurm_thread_create(&tid, _start_teardown, teardown_args);
2157
2158 xfree(hash_dir);
2159 xfree(job_script);
2160 }
2161
_start_teardown(void * x)2162 static void *_start_teardown(void *x)
2163 {
2164 static uint32_t previous_job_id = 0;
2165 stage_args_t *teardown_args = (stage_args_t *)x;
2166 char **teardown_argv, *resp_msg = NULL;
2167 int status = 0, timeout;
2168 job_record_t *job_ptr;
2169 bb_alloc_t *bb_alloc = NULL;
2170 bb_job_t *bb_job = NULL;
2171 /* Locks: write job */
2172 slurmctld_lock_t job_write_lock = {
2173 NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
2174 DEF_TIMERS;
2175 bool hurry;
2176 track_script_rec_add(teardown_args->job_id, 0, pthread_self());
2177
2178 teardown_argv = teardown_args->args1;
2179
2180 if (previous_job_id == teardown_args->job_id)
2181 sleep(5);
2182 previous_job_id = teardown_args->job_id;
2183
2184 START_TIMER;
2185 timeout = bb_state.bb_config.other_timeout * 1000;
2186 resp_msg = run_command("teardown",
2187 bb_state.bb_config.get_sys_state,
2188 teardown_argv, timeout, pthread_self(),
2189 &status);
2190 END_TIMER;
2191 info("%s: %s: teardown for JobId=%u ran for %s",
2192 plugin_type, __func__, teardown_args->job_id, TIME_STR);
2193
2194 if (track_script_broadcast(pthread_self(), status)) {
2195 /* I was killed by slurmtrack, bail out right now */
2196 info("%s: %s: teardown for JobId=%u terminated by slurmctld",
2197 plugin_type, __func__, teardown_args->job_id);
2198 xfree(resp_msg);
2199 free_command_argv(teardown_argv);
2200 xfree(teardown_args);
2201 track_script_remove(pthread_self());
2202 return NULL;
2203 }
2204 /* track_script_reset_cpid(pthread_self(), 0); */
2205
2206 _log_script_argv(teardown_argv, resp_msg);
2207
2208 /*
2209 * "Teardown" is run at every termination of every job that _might_
2210 * have a burst buffer, so an error of "token not found" should be
2211 * fairly common and not indicative of a problem.
2212 */
2213 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
2214 (!resp_msg ||
2215 (!strstr(resp_msg, "No matching session") &&
2216 !strstr(resp_msg, "token not found")))) {
2217 lock_slurmctld(job_write_lock);
2218 slurm_mutex_lock(&bb_state.bb_mutex);
2219 job_ptr = find_job_record(teardown_args->job_id);
2220 if (job_ptr &&
2221 (bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))) {
2222 bb_alloc->state = BB_STATE_TEARDOWN_FAIL;
2223 }
2224 slurm_mutex_unlock(&bb_state.bb_mutex);
2225 unlock_slurmctld(job_write_lock);
2226
2227 trigger_burst_buffer();
2228 error("%s: %s: teardown for JobId=%u status:%u response:%s",
2229 plugin_type, __func__, teardown_args->job_id, status,
2230 resp_msg);
2231
2232
2233 lock_slurmctld(job_write_lock);
2234 job_ptr = find_job_record(teardown_args->job_id);
2235 if (job_ptr) {
2236 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2237 xfree(job_ptr->state_desc);
2238 xstrfmtcat(job_ptr->state_desc, "%s: teardown: %s",
2239 plugin_type, resp_msg);
2240 _update_system_comment(job_ptr, "teardown",
2241 resp_msg, 0);
2242 }
2243 unlock_slurmctld(job_write_lock);
2244
2245
2246 if (!xstrcmp(teardown_argv[7], "--hurry"))
2247 hurry = true;
2248 else
2249 hurry = false;
2250 _queue_teardown(teardown_args->job_id, teardown_args->user_id,
2251 hurry);
2252 } else {
2253 lock_slurmctld(job_write_lock);
2254 slurm_mutex_lock(&bb_state.bb_mutex);
2255 job_ptr = find_job_record(teardown_args->job_id);
2256 _purge_bb_files(teardown_args->job_id, job_ptr);
2257 if (job_ptr) {
2258 if ((bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))){
2259 bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2260 bb_alloc->pool, &bb_state);
2261 (void) bb_free_alloc_rec(&bb_state, bb_alloc);
2262 }
2263 if ((bb_job = _get_bb_job(job_ptr)))
2264 bb_job->state = BB_STATE_COMPLETE;
2265 job_ptr->job_state &= (~JOB_STAGE_OUT);
2266 if (!IS_JOB_PENDING(job_ptr) && /* No email if requeue */
2267 (job_ptr->mail_type & MAIL_JOB_STAGE_OUT)) {
2268 /*
2269 * NOTE: If a job uses multiple burst buffer
2270 * plugins, the message will be sent after the
2271 * teardown completes in the first plugin
2272 */
2273 mail_job_info(job_ptr, MAIL_JOB_STAGE_OUT);
2274 job_ptr->mail_type &= (~MAIL_JOB_STAGE_OUT);
2275 }
2276 } else {
2277 /*
2278 * This will happen when slurmctld restarts and needs
2279 * to clear vestigial buffers
2280 */
2281 char buf_name[32];
2282 snprintf(buf_name, sizeof(buf_name), "%u",
2283 teardown_args->job_id);
2284 bb_alloc = bb_find_name_rec(buf_name,
2285 teardown_args->user_id,
2286 &bb_state);
2287 if (bb_alloc) {
2288 bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2289 bb_alloc->pool, &bb_state);
2290 (void) bb_free_alloc_rec(&bb_state, bb_alloc);
2291 }
2292
2293 }
2294 slurm_mutex_unlock(&bb_state.bb_mutex);
2295 unlock_slurmctld(job_write_lock);
2296 }
2297
2298 xfree(resp_msg);
2299 free_command_argv(teardown_argv);
2300 xfree(teardown_args);
2301
2302 track_script_remove(pthread_self());
2303
2304 return NULL;
2305 }
2306
2307 /* Reduced burst buffer space in advanced reservation for resources already
2308 * allocated to jobs. What's left is space reserved for future jobs */
_rm_active_job_bb(char * resv_name,char ** pool_name,int64_t * resv_space,int ds_len)2309 static void _rm_active_job_bb(char *resv_name, char **pool_name,
2310 int64_t *resv_space, int ds_len)
2311 {
2312 ListIterator job_iterator;
2313 job_record_t *job_ptr;
2314 bb_job_t *bb_job;
2315 int i;
2316
2317 job_iterator = list_iterator_create(job_list);
2318 while ((job_ptr = list_next(job_iterator))) {
2319 if ((job_ptr->burst_buffer == NULL) ||
2320 (job_ptr->burst_buffer[0] == '\0') ||
2321 (xstrcmp(job_ptr->resv_name, resv_name) == 0))
2322 continue;
2323 bb_job = bb_job_find(&bb_state,job_ptr->job_id);
2324 if (!bb_job || (bb_job->state <= BB_STATE_PENDING) ||
2325 (bb_job->state >= BB_STATE_COMPLETE))
2326 continue;
2327 for (i = 0; i < ds_len; i++) {
2328 if (xstrcmp(bb_job->job_pool, pool_name[i]))
2329 continue;
2330 if (resv_space[i] >= bb_job->total_size)
2331 resv_space[i] -= bb_job->total_size;
2332 else
2333 resv_space[i] = 0;
2334 break;
2335 }
2336 }
2337 list_iterator_destroy(job_iterator);
2338 }
2339
2340 /* Test if a job can be allocated a burst buffer.
2341 * This may preempt currently active stage-in for higher priority jobs.
2342 *
2343 * RET 0: Job can be started now
2344 * 1: Job exceeds configured limits, continue testing with next job
2345 * 2: Job needs more resources than currently available can not start,
2346 * skip all remaining jobs
2347 */
_test_size_limit(job_record_t * job_ptr,bb_job_t * bb_job)2348 static int _test_size_limit(job_record_t *job_ptr, bb_job_t *bb_job)
2349 {
2350 int64_t *add_space = NULL, *avail_space = NULL, *granularity = NULL;
2351 int64_t *preempt_space = NULL, *resv_space = NULL, *total_space = NULL;
2352 uint64_t unfree_space;
2353 burst_buffer_info_msg_t *resv_bb = NULL;
2354 struct preempt_bb_recs *preempt_ptr = NULL;
2355 char **pool_name, *my_pool;
2356 int ds_len;
2357 burst_buffer_pool_t *pool_ptr;
2358 bb_buf_t *buf_ptr;
2359 bb_alloc_t *bb_ptr = NULL;
2360 int i, j, k, rc = 0;
2361 bool avail_ok, do_preempt, preempt_ok;
2362 time_t now = time(NULL);
2363 List preempt_list = NULL;
2364 ListIterator preempt_iter;
2365
2366 xassert(bb_job);
2367
2368 /* Initialize data structure */
2369 ds_len = bb_state.bb_config.pool_cnt + 1;
2370 add_space = xcalloc(ds_len, sizeof(int64_t));
2371 avail_space = xcalloc(ds_len, sizeof(int64_t));
2372 granularity = xcalloc(ds_len, sizeof(int64_t));
2373 pool_name = xcalloc(ds_len, sizeof(char *));
2374 preempt_space = xcalloc(ds_len, sizeof(int64_t));
2375 resv_space = xcalloc(ds_len, sizeof(int64_t));
2376 total_space = xcalloc(ds_len, sizeof(int64_t));
2377 for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
2378 i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
2379 unfree_space = MAX(pool_ptr->used_space,
2380 pool_ptr->unfree_space);
2381 if (pool_ptr->total_space >= unfree_space)
2382 avail_space[i] = pool_ptr->total_space - unfree_space;
2383 granularity[i] = pool_ptr->granularity;
2384 pool_name[i] = pool_ptr->name;
2385 total_space[i] = pool_ptr->total_space;
2386 }
2387 unfree_space = MAX(bb_state.used_space, bb_state.unfree_space);
2388 if (bb_state.total_space - unfree_space)
2389 avail_space[i] = bb_state.total_space - unfree_space;
2390 granularity[i] = bb_state.bb_config.granularity;
2391 pool_name[i] = bb_state.bb_config.default_pool;
2392 total_space[i] = bb_state.total_space;
2393
2394 /* Determine job size requirements by pool */
2395 if (bb_job->total_size) {
2396 for (j = 0; j < ds_len; j++) {
2397 if (!xstrcmp(bb_job->job_pool, pool_name[j])) {
2398 add_space[j] += bb_granularity(
2399 bb_job->total_size,
2400 granularity[j]);
2401 break;
2402 }
2403 }
2404 }
2405 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
2406 i++, buf_ptr++) {
2407 if (!buf_ptr->create || (buf_ptr->state >= BB_STATE_ALLOCATING))
2408 continue;
2409 for (j = 0; j < ds_len; j++) {
2410 if (!xstrcmp(buf_ptr->pool, pool_name[j])) {
2411 add_space[j] += bb_granularity(buf_ptr->size,
2412 granularity[j]);
2413 break;
2414 }
2415 }
2416 }
2417
2418 /* Account for reserved resources. Reduce reservation size for
2419 * resources already claimed from the reservation. Assume node reboot
2420 * required since we have not selected the compute nodes yet. */
2421 resv_bb = job_test_bb_resv(job_ptr, now, true);
2422 if (resv_bb) {
2423 burst_buffer_info_t *resv_bb_ptr;
2424 for (i = 0, resv_bb_ptr = resv_bb->burst_buffer_array;
2425 i < resv_bb->record_count; i++, resv_bb_ptr++) {
2426 if (xstrcmp(resv_bb_ptr->name, bb_state.name))
2427 continue;
2428 for (j = 0, pool_ptr = resv_bb_ptr->pool_ptr;
2429 j < resv_bb_ptr->pool_cnt; j++, pool_ptr++) {
2430 if (pool_ptr->name) {
2431 my_pool = pool_ptr->name;
2432 } else {
2433 my_pool =
2434 bb_state.bb_config.default_pool;
2435 }
2436 unfree_space = MAX(pool_ptr->used_space,
2437 pool_ptr->unfree_space);
2438 for (k = 0; k < ds_len; k++) {
2439 if (xstrcmp(my_pool, pool_name[k]))
2440 continue;
2441 resv_space[k] += bb_granularity(
2442 unfree_space,
2443 granularity[k]);
2444 break;
2445 }
2446 }
2447 if (resv_bb_ptr->used_space) {
2448 /* Pool not specified, use default */
2449 my_pool = bb_state.bb_config.default_pool;
2450 for (k = 0; k < ds_len; k++) {
2451 if (xstrcmp(my_pool, pool_name[k]))
2452 continue;
2453 resv_space[k] += bb_granularity(
2454 resv_bb_ptr->used_space,
2455 granularity[k]);
2456 break;
2457 }
2458 }
2459 #if 1
2460 /* Is any of this reserved space already taken? */
2461 _rm_active_job_bb(job_ptr->resv_name,
2462 pool_name, resv_space, ds_len);
2463 #endif
2464 }
2465 }
2466
2467 #if _DEBUG
2468 info("TEST_SIZE_LIMIT for %pJ", job_ptr);
2469 for (j = 0; j < ds_len; j++) {
2470 info("POOL:%s ADD:%"PRIu64" AVAIL:%"PRIu64
2471 " GRANULARITY:%"PRIu64" RESV:%"PRIu64" TOTAL:%"PRIu64,
2472 pool_name[j], add_space[j], avail_space[j], granularity[j],
2473 resv_space[j], total_space[j]);
2474 }
2475 #endif
2476
2477 /* Determine if resources currently are available for the job */
2478 avail_ok = true;
2479 for (j = 0; j < ds_len; j++) {
2480 if (add_space[j] > total_space[j]) {
2481 rc = 1;
2482 goto fini;
2483 }
2484 if ((add_space[j] + resv_space[j]) > avail_space[j])
2485 avail_ok = false;
2486 }
2487 if (avail_ok) {
2488 rc = 0;
2489 goto fini;
2490 }
2491
2492 /* Identify candidate burst buffers to revoke for higher priority job */
2493 preempt_list = list_create(bb_job_queue_del);
2494 for (i = 0; i < BB_HASH_SIZE; i++) {
2495 bb_ptr = bb_state.bb_ahash[i];
2496 while (bb_ptr) {
2497 if ((bb_ptr->job_id != 0) &&
2498 ((bb_ptr->name == NULL) ||
2499 ((bb_ptr->name[0] >= '0') &&
2500 (bb_ptr->name[0] <= '9'))) &&
2501 (bb_ptr->use_time > now) &&
2502 (bb_ptr->use_time > job_ptr->start_time)) {
2503 if (!bb_ptr->pool) {
2504 bb_ptr->name = xstrdup(
2505 bb_state.bb_config.default_pool);
2506 }
2507 preempt_ptr = xmalloc(sizeof(
2508 struct preempt_bb_recs));
2509 preempt_ptr->bb_ptr = bb_ptr;
2510 preempt_ptr->job_id = bb_ptr->job_id;
2511 preempt_ptr->pool = bb_ptr->name;
2512 preempt_ptr->size = bb_ptr->size;
2513 preempt_ptr->use_time = bb_ptr->use_time;
2514 preempt_ptr->user_id = bb_ptr->user_id;
2515 list_push(preempt_list, preempt_ptr);
2516
2517 for (j = 0; j < ds_len; j++) {
2518 if (xstrcmp(bb_ptr->name, pool_name[j]))
2519 continue;
2520 preempt_ptr->size = bb_granularity(
2521 bb_ptr->size,
2522 granularity[j]);
2523 preempt_space[j] += preempt_ptr->size;
2524 break;
2525 }
2526 }
2527 bb_ptr = bb_ptr->next;
2528 }
2529 }
2530
2531 #if _DEBUG
2532 for (j = 0; j < ds_len; j++) {
2533 info("POOL:%s ADD:%"PRIu64" AVAIL:%"PRIu64
2534 " GRANULARITY:%"PRIu64" PREEMPT:%"PRIu64
2535 " RESV:%"PRIu64" TOTAL:%"PRIu64,
2536 pool_name[j], add_space[j], avail_space[j], granularity[j],
2537 preempt_space[j], resv_space[j], total_space[j]);
2538 }
2539 #endif
2540
2541 /* Determine if sufficient resources available after preemption */
2542 rc = 2;
2543 preempt_ok = true;
2544 for (j = 0; j < ds_len; j++) {
2545 if ((add_space[j] + resv_space[j]) >
2546 (avail_space[j] + preempt_space[j])) {
2547 preempt_ok = false;
2548 break;
2549 }
2550 }
2551 if (!preempt_ok)
2552 goto fini;
2553
2554 /* Now preempt/teardown the most appropriate buffers */
2555 list_sort(preempt_list, bb_preempt_queue_sort);
2556 preempt_iter = list_iterator_create(preempt_list);
2557 while ((preempt_ptr = list_next(preempt_iter))) {
2558 do_preempt = false;
2559 for (j = 0; j < ds_len; j++) {
2560 if (xstrcmp(preempt_ptr->pool, pool_name[j]))
2561 continue;
2562 if ((add_space[j] + resv_space[j]) > avail_space[j]) {
2563 avail_space[j] += preempt_ptr->size;
2564 preempt_space[j] -= preempt_ptr->size;
2565 do_preempt = true;
2566 }
2567 break;
2568 }
2569 if (do_preempt) {
2570 preempt_ptr->bb_ptr->cancelled = true;
2571 preempt_ptr->bb_ptr->end_time = 0;
2572 preempt_ptr->bb_ptr->state = BB_STATE_TEARDOWN;
2573 preempt_ptr->bb_ptr->state_time = time(NULL);
2574 _queue_teardown(preempt_ptr->job_id,
2575 preempt_ptr->user_id, true);
2576 if (bb_state.bb_config.debug_flag) {
2577 info("%s: %s: Preempting stage-in of JobId=%u for %pJ",
2578 plugin_type, __func__,
2579 preempt_ptr->job_id, job_ptr);
2580 }
2581 }
2582
2583 }
2584 list_iterator_destroy(preempt_iter);
2585
2586 fini: xfree(add_space);
2587 xfree(avail_space);
2588 xfree(granularity);
2589 xfree(pool_name);
2590 xfree(preempt_space);
2591 xfree(resv_space);
2592 xfree(total_space);
2593 if (resv_bb)
2594 slurm_free_burst_buffer_info_msg(resv_bb);
2595 FREE_NULL_LIST(preempt_list);
2596 return rc;
2597 }
2598
2599 /* Handle timeout of burst buffer events:
2600 * 1. Purge per-job burst buffer records when the stage-out has completed and
2601 * the job has been purged from Slurm
2602 * 2. Test for StageInTimeout events
2603 * 3. Test for StageOutTimeout events
2604 */
_timeout_bb_rec(void)2605 static void _timeout_bb_rec(void)
2606 {
2607 bb_alloc_t **bb_pptr, *bb_alloc = NULL;
2608 job_record_t *job_ptr;
2609 int i;
2610
2611 if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
2612 return;
2613
2614 for (i = 0; i < BB_HASH_SIZE; i++) {
2615 bb_pptr = &bb_state.bb_ahash[i];
2616 bb_alloc = bb_state.bb_ahash[i];
2617 while (bb_alloc) {
2618 if (((bb_alloc->seen_time + TIME_SLOP) <
2619 bb_state.last_load_time) &&
2620 (bb_alloc->state == BB_STATE_TEARDOWN)) {
2621 /*
2622 * Teardown likely complete, but bb_alloc state
2623 * not yet updated; skip the record
2624 */
2625 } else if ((bb_alloc->seen_time + TIME_SLOP) <
2626 bb_state.last_load_time) {
2627 assoc_mgr_lock_t assoc_locks =
2628 { .assoc = READ_LOCK,
2629 .qos = READ_LOCK };
2630 /*
2631 * assoc_mgr needs locking to call
2632 * bb_post_persist_delete
2633 */
2634 if (bb_alloc->job_id == 0) {
2635 info("%s: %s: Persistent burst buffer %s purged",
2636 plugin_type, __func__,
2637 bb_alloc->name);
2638 } else if (bb_state.bb_config.debug_flag) {
2639 info("%s: %s: burst buffer for JobId=%u purged",
2640 plugin_type,
2641 __func__, bb_alloc->job_id);
2642 }
2643 bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2644 bb_alloc->pool, &bb_state);
2645
2646 assoc_mgr_lock(&assoc_locks);
2647 bb_post_persist_delete(bb_alloc, &bb_state);
2648 assoc_mgr_unlock(&assoc_locks);
2649
2650 *bb_pptr = bb_alloc->next;
2651 bb_free_alloc_buf(bb_alloc);
2652 break;
2653 } else if (bb_alloc->state == BB_STATE_COMPLETE) {
2654 job_ptr = find_job_record(bb_alloc->job_id);
2655 if (!job_ptr || IS_JOB_PENDING(job_ptr)) {
2656 /* Job purged or BB preempted */
2657 *bb_pptr = bb_alloc->next;
2658 bb_free_alloc_buf(bb_alloc);
2659 break;
2660 }
2661 }
2662 bb_pptr = &bb_alloc->next;
2663 bb_alloc = bb_alloc->next;
2664 }
2665 }
2666 }
2667
2668 /* Perform basic burst_buffer option validation */
_parse_bb_opts(job_desc_msg_t * job_desc,uint64_t * bb_size,uid_t submit_uid)2669 static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
2670 uid_t submit_uid)
2671 {
2672 char *bb_script, *save_ptr = NULL;
2673 char *bb_name = NULL, *bb_pool, *capacity;
2674 char *end_ptr = NULL, *sub_tok, *tok;
2675 uint64_t tmp_cnt, swap_cnt = 0;
2676 int rc = SLURM_SUCCESS;
2677 bool enable_persist = false, have_bb = false, have_stage_out = false;
2678
2679 xassert(bb_size);
2680 *bb_size = 0;
2681
2682 if (validate_operator(submit_uid) ||
2683 (bb_state.bb_config.flags & BB_FLAG_ENABLE_PERSISTENT))
2684 enable_persist = true;
2685
2686 if (job_desc->script)
2687 rc = _xlate_batch(job_desc);
2688 else
2689 rc = _xlate_interactive(job_desc);
2690 if ((rc != SLURM_SUCCESS) || (!job_desc->burst_buffer))
2691 return rc;
2692
2693 bb_script = xstrdup(job_desc->burst_buffer);
2694 tok = strtok_r(bb_script, "\n", &save_ptr);
2695 while (tok) {
2696 uint32_t bb_flag = 0;
2697 tmp_cnt = 0;
2698 if (tok[0] != '#')
2699 break; /* Quit at first non-comment */
2700
2701 if ((tok[1] == 'B') && (tok[2] == 'B'))
2702 bb_flag = BB_FLAG_BB_OP;
2703 else if ((tok[1] == 'D') && (tok[2] == 'W'))
2704 bb_flag = BB_FLAG_DW_OP;
2705
2706 /*
2707 * Effective Slurm v18.08 and CLE6.0UP06 the create_persistent
2708 * and destroy_persistent functions are directly supported by
2709 * dw_wlm_cli. Support "#BB" format for backward compatibility.
2710 */
2711 if (bb_flag == BB_FLAG_BB_OP) {
2712 tok += 3;
2713 while (isspace(tok[0]))
2714 tok++;
2715 if (!xstrncmp(tok, "create_persistent", 17) &&
2716 !enable_persist) {
2717 info("%s: %s: User %d disabled from creating persistent burst buffer",
2718 plugin_type, __func__, submit_uid);
2719 rc = ESLURM_BURST_BUFFER_PERMISSION;
2720 break;
2721 } else if (!xstrncmp(tok, "create_persistent", 17)) {
2722 have_bb = true;
2723 bb_name = NULL;
2724 bb_pool = NULL;
2725 if ((sub_tok = strstr(tok, "capacity="))) {
2726 tmp_cnt = bb_get_size_num(sub_tok+9, 1);
2727 }
2728 if (tmp_cnt == 0)
2729 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2730 if ((sub_tok = strstr(tok, "name="))) {
2731 bb_name = xstrdup(sub_tok + 5);
2732 if ((sub_tok = strchr(bb_name, ' ')))
2733 sub_tok[0] = '\0';
2734 } else {
2735 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2736 }
2737 if (!bb_name ||
2738 ((bb_name[0] >= '0') &&
2739 (bb_name[0] <= '9')))
2740 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2741 xfree(bb_name);
2742 if ((sub_tok = strstr(tok, "pool="))) {
2743 bb_pool = xstrdup(sub_tok + 5);
2744 if ((sub_tok = strchr(bb_pool, ' ')))
2745 sub_tok[0] = '\0';
2746 }
2747 if (!bb_valid_pool_test(&bb_state, bb_pool))
2748 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2749 *bb_size += _set_granularity(tmp_cnt, bb_pool);
2750 xfree(bb_pool);
2751 if (rc != SLURM_SUCCESS)
2752 break;
2753 } else if (!xstrncmp(tok, "destroy_persistent", 18) &&
2754 !enable_persist) {
2755 info("%s: %s: User %d disabled from destroying persistent burst buffer",
2756 plugin_type, __func__, submit_uid);
2757 rc = ESLURM_BURST_BUFFER_PERMISSION;
2758 break;
2759 } else if (!xstrncmp(tok, "destroy_persistent", 18)) {
2760 have_bb = true;
2761 if (!(sub_tok = strstr(tok, "name="))) {
2762 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2763 break;
2764 }
2765 } else {
2766 /* Ignore other (future) options */
2767 }
2768 }
2769 if (bb_flag == BB_FLAG_DW_OP) {
2770 tok += 3;
2771 while (isspace(tok[0]))
2772 tok++;
2773 if (!xstrncmp(tok, "jobdw", 5) &&
2774 (capacity = strstr(tok, "capacity="))) {
2775 bb_pool = NULL;
2776 have_bb = true;
2777 tmp_cnt = bb_get_size_num(capacity + 9, 1);
2778 if (tmp_cnt == 0) {
2779 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2780 break;
2781 }
2782 if ((sub_tok = strstr(tok, "pool="))) {
2783 bb_pool = xstrdup(sub_tok + 5);
2784 if ((sub_tok = strchr(bb_pool, ' ')))
2785 sub_tok[0] = '\0';
2786 }
2787 if (!bb_valid_pool_test(&bb_state, bb_pool))
2788 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2789 *bb_size += _set_granularity(tmp_cnt, bb_pool);
2790 xfree(bb_pool);
2791 } else if (!xstrncmp(tok, "persistentdw", 12)) {
2792 have_bb = true;
2793 } else if (!xstrncmp(tok, "swap", 4)) {
2794 bb_pool = NULL;
2795 have_bb = true;
2796 tok += 4;
2797 while (isspace(tok[0]) && (tok[0] != '\0'))
2798 tok++;
2799 swap_cnt += strtol(tok, &end_ptr, 10);
2800 if ((job_desc->max_nodes == 0) ||
2801 (job_desc->max_nodes == NO_VAL)) {
2802 info("%s: %s: user %u submitted job with swap space specification, but no max node count specification",
2803 plugin_type, __func__,
2804 job_desc->user_id);
2805 if (job_desc->min_nodes == NO_VAL)
2806 job_desc->min_nodes = 1;
2807 job_desc->max_nodes =
2808 job_desc->min_nodes;
2809 }
2810 tmp_cnt = swap_cnt * job_desc->max_nodes;
2811 if ((sub_tok = strstr(tok, "pool="))) {
2812 bb_pool = xstrdup(sub_tok + 5);
2813 if ((sub_tok = strchr(bb_pool, ' ')))
2814 sub_tok[0] = '\0';
2815 }
2816 if (!bb_valid_pool_test(&bb_state, bb_pool))
2817 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2818 *bb_size += _set_granularity(tmp_cnt, bb_pool);
2819 xfree(bb_pool);
2820 } else if (!xstrncmp(tok, "stage_out", 9)) {
2821 have_stage_out = true;
2822 } else if (!xstrncmp(tok, "create_persistent", 17) ||
2823 !xstrncmp(tok, "destroy_persistent", 18)) {
2824 /*
2825 * Disable support until Slurm v18.08 to prevent
2826 * user directed persistent burst buffer changes
2827 * outside of Slurm control.
2828 */
2829 rc = ESLURM_BURST_BUFFER_PERMISSION;
2830 break;
2831
2832 }
2833 }
2834 tok = strtok_r(NULL, "\n", &save_ptr);
2835 }
2836 xfree(bb_script);
2837
2838 if (!have_bb)
2839 rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2840
2841 if (!have_stage_out) {
2842 /* prevent sending stage out email */
2843 job_desc->mail_type &= (~MAIL_JOB_STAGE_OUT);
2844 }
2845
2846 return rc;
2847 }
2848
2849 /* Copy a batch job's burst_buffer options into a separate buffer.
2850 * merge continued lines into a single line */
_xlate_batch(job_desc_msg_t * job_desc)2851 static int _xlate_batch(job_desc_msg_t *job_desc)
2852 {
2853 char *script, *save_ptr = NULL, *tok;
2854 int line_type, prev_type = LINE_OTHER;
2855 bool is_cont = false, has_space = false;
2856 int len, rc = SLURM_SUCCESS;
2857
2858 /*
2859 * Any command line --bb options get added to the script
2860 */
2861 if (job_desc->burst_buffer) {
2862 rc = _xlate_interactive(job_desc);
2863 if (rc != SLURM_SUCCESS)
2864 return rc;
2865 _add_bb_to_script(&job_desc->script, job_desc->burst_buffer);
2866 xfree(job_desc->burst_buffer);
2867 }
2868
2869 script = xstrdup(job_desc->script);
2870 tok = strtok_r(script, "\n", &save_ptr);
2871 while (tok) {
2872 if (tok[0] != '#')
2873 break; /* Quit at first non-comment */
2874
2875 if ((tok[1] == 'B') && (tok[2] == 'B'))
2876 line_type = LINE_BB;
2877 else if ((tok[1] == 'D') && (tok[2] == 'W'))
2878 line_type = LINE_DW;
2879 else
2880 line_type = LINE_OTHER;
2881
2882 if (line_type == LINE_OTHER) {
2883 is_cont = false;
2884 } else {
2885 if (is_cont) {
2886 if (line_type != prev_type) {
2887 /*
2888 * Mixing "#DW" with "#BB" on same
2889 * (continued) line, error
2890 */
2891 rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2892 break;
2893 }
2894 tok += 3; /* Skip "#DW" or "#BB" */
2895 while (has_space && isspace(tok[0]))
2896 tok++; /* Skip duplicate spaces */
2897 } else if (job_desc->burst_buffer) {
2898 xstrcat(job_desc->burst_buffer, "\n");
2899 }
2900 prev_type = line_type;
2901
2902 len = strlen(tok);
2903 if (tok[len - 1] == '\\') {
2904 has_space = isspace(tok[len - 2]);
2905 tok[strlen(tok) - 1] = '\0';
2906 is_cont = true;
2907 } else {
2908 is_cont = false;
2909 }
2910 xstrcat(job_desc->burst_buffer, tok);
2911 }
2912 tok = strtok_r(NULL, "\n", &save_ptr);
2913 }
2914 xfree(script);
2915 if (rc != SLURM_SUCCESS)
2916 xfree(job_desc->burst_buffer);
2917 return rc;
2918 }
2919
2920 /* Parse simple interactive burst_buffer options into an format identical to
2921 * burst_buffer options in a batch script file */
_xlate_interactive(job_desc_msg_t * job_desc)2922 static int _xlate_interactive(job_desc_msg_t *job_desc)
2923 {
2924 char *access = NULL, *bb_copy = NULL, *capacity = NULL, *pool = NULL;
2925 char *swap = NULL, *type = NULL;
2926 char *end_ptr = NULL, *sep, *tok;
2927 uint64_t buf_size = 0, swap_cnt = 0;
2928 int i, rc = SLURM_SUCCESS, tok_len;
2929
2930 if (!job_desc->burst_buffer || (job_desc->burst_buffer[0] == '#'))
2931 return rc;
2932
2933 if (strstr(job_desc->burst_buffer, "create_persistent") ||
2934 strstr(job_desc->burst_buffer, "destroy_persistent")) {
2935 /* Create or destroy of persistent burst buffers NOT supported
2936 * via --bb option. Use --bbf or a batch script instead. */
2937 return ESLURM_INVALID_BURST_BUFFER_REQUEST;
2938 }
2939
2940 bb_copy = xstrdup(job_desc->burst_buffer);
2941 if ((tok = strstr(bb_copy, "access="))) {
2942 access = xstrdup(tok + 7);
2943 sep = strchr(access, ',');
2944 if (sep)
2945 sep[0] = '\0';
2946 sep = strchr(access, ' ');
2947 if (sep)
2948 sep[0] = '\0';
2949 tok_len = strlen(access) + 7;
2950 memset(tok, ' ', tok_len);
2951 }
2952 if ((access == NULL) && /* Not set above with "access=" */
2953 (tok = strstr(bb_copy, "access_mode="))) {
2954 access = xstrdup(tok + 12);
2955 sep = strchr(access, ',');
2956 if (sep)
2957 sep[0] = '\0';
2958 sep = strchr(access, ' ');
2959 if (sep)
2960 sep[0] = '\0';
2961 tok_len = strlen(access) + 12;
2962 memset(tok, ' ', tok_len);
2963 }
2964
2965 if ((tok = strstr(bb_copy, "capacity="))) {
2966 buf_size = bb_get_size_num(tok + 9, 1);
2967 if (buf_size == 0) {
2968 rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2969 goto fini;
2970 }
2971 capacity = xstrdup(tok + 9);
2972 sep = strchr(capacity, ',');
2973 if (sep)
2974 sep[0] = '\0';
2975 sep = strchr(capacity, ' ');
2976 if (sep)
2977 sep[0] = '\0';
2978 tok_len = strlen(capacity) + 9;
2979 memset(tok, ' ', tok_len);
2980 }
2981
2982
2983 if ((tok = strstr(bb_copy, "pool="))) {
2984 pool = xstrdup(tok + 5);
2985 sep = strchr(pool, ',');
2986 if (sep)
2987 sep[0] = '\0';
2988 sep = strchr(pool, ' ');
2989 if (sep)
2990 sep[0] = '\0';
2991 tok_len = strlen(pool) + 5;
2992 memset(tok, ' ', tok_len);
2993 }
2994
2995 if ((tok = strstr(bb_copy, "swap="))) {
2996 swap_cnt = strtol(tok + 5, &end_ptr, 10);
2997 if (swap_cnt == 0) {
2998 rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2999 goto fini;
3000 }
3001 swap = xstrdup(tok + 5);
3002 sep = strchr(swap, ',');
3003 if (sep)
3004 sep[0] = '\0';
3005 sep = strchr(swap, ' ');
3006 if (sep)
3007 sep[0] = '\0';
3008 tok_len = strlen(swap) + 5;
3009 memset(tok, ' ', tok_len);
3010 }
3011
3012 if ((tok = strstr(bb_copy, "type="))) {
3013 type = xstrdup(tok + 5);
3014 sep = strchr(type, ',');
3015 if (sep)
3016 sep[0] = '\0';
3017 sep = strchr(type, ' ');
3018 if (sep)
3019 sep[0] = '\0';
3020 tok_len = strlen(type) + 5;
3021 memset(tok, ' ', tok_len);
3022 }
3023
3024 if (rc == SLURM_SUCCESS) {
3025 /* Look for vestigial content. Treating this as an error would
3026 * prevent backward compatibility. Just log it for now. */
3027 for (i = 0; bb_copy[i]; i++) {
3028 if (isspace(bb_copy[i]))
3029 continue;
3030 verbose("%s: %s: Unrecognized --bb content: %s",
3031 plugin_type, __func__, bb_copy + i);
3032 // rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
3033 // goto fini;
3034 }
3035 }
3036
3037 if (rc == SLURM_SUCCESS)
3038 xfree(job_desc->burst_buffer);
3039 if ((rc == SLURM_SUCCESS) && (swap_cnt || buf_size)) {
3040 if (swap_cnt) {
3041 xstrfmtcat(job_desc->burst_buffer,
3042 "#DW swap %"PRIu64"GiB", swap_cnt);
3043 if (pool) {
3044 xstrfmtcat(job_desc->burst_buffer,
3045 " pool=%s", pool);
3046 }
3047 }
3048 if (buf_size) {
3049 if (job_desc->burst_buffer)
3050 xstrfmtcat(job_desc->burst_buffer, "\n");
3051 xstrfmtcat(job_desc->burst_buffer,
3052 "#DW jobdw capacity=%s",
3053 bb_get_size_str(buf_size));
3054 if (access) {
3055 xstrfmtcat(job_desc->burst_buffer,
3056 " access_mode=%s", access);
3057 }
3058 if (pool) {
3059 xstrfmtcat(job_desc->burst_buffer,
3060 " pool=%s", pool);
3061 }
3062 if (type) {
3063 xstrfmtcat(job_desc->burst_buffer,
3064 " type=%s", type);
3065 }
3066 }
3067 }
3068
3069 fini: xfree(access);
3070 xfree(bb_copy);
3071 xfree(capacity);
3072 xfree(pool);
3073 xfree(swap);
3074 xfree(type);
3075 return rc;
3076 }
3077
3078 /* Insert the contents of "burst_buffer_file" into "script_body" */
_add_bb_to_script(char ** script_body,const char * burst_buffer_file)3079 static void _add_bb_to_script(char **script_body,
3080 const char *burst_buffer_file)
3081 {
3082 char *orig_script = *script_body;
3083 char *new_script, *sep, save_char;
3084 char *bb_opt = NULL;
3085 int i;
3086
3087 if (!burst_buffer_file || (burst_buffer_file[0] == '\0'))
3088 return; /* No burst buffer file or empty file */
3089
3090 if (!orig_script) {
3091 *script_body = xstrdup(burst_buffer_file);
3092 return;
3093 }
3094
3095 bb_opt = xstrdup(burst_buffer_file);
3096 i = strlen(bb_opt) - 1;
3097 if (bb_opt[i] != '\n') /* Append new line as needed */
3098 xstrcat(bb_opt, "\n");
3099
3100 if (orig_script[0] != '#') {
3101 /* Prepend burst buffer file */
3102 new_script = xstrdup(bb_opt);
3103 xstrcat(new_script, orig_script);
3104 xfree(*script_body);
3105 *script_body = new_script;
3106 xfree(bb_opt);
3107 return;
3108 }
3109
3110 sep = strchr(orig_script, '\n');
3111 if (sep) {
3112 save_char = sep[1];
3113 sep[1] = '\0';
3114 new_script = xstrdup(orig_script);
3115 xstrcat(new_script, bb_opt);
3116 sep[1] = save_char;
3117 xstrcat(new_script, sep + 1);
3118 xfree(*script_body);
3119 *script_body = new_script;
3120 xfree(bb_opt);
3121 return;
3122 } else {
3123 new_script = xstrdup(orig_script);
3124 xstrcat(new_script, "\n");
3125 xstrcat(new_script, bb_opt);
3126 xfree(*script_body);
3127 *script_body = new_script;
3128 xfree(bb_opt);
3129 return;
3130 }
3131 }
3132
3133 /* For interactive jobs, build a script containing the relevant DataWarp
3134 * commands, as needed by the Cray API */
_build_bb_script(job_record_t * job_ptr,char * script_file)3135 static int _build_bb_script(job_record_t *job_ptr, char *script_file)
3136 {
3137 char *out_buf = NULL;
3138 int rc;
3139
3140 xstrcat(out_buf, "#!/bin/bash\n");
3141 xstrcat(out_buf, job_ptr->burst_buffer);
3142 rc = _write_file(script_file, out_buf);
3143 xfree(out_buf);
3144
3145 return rc;
3146 }
3147
3148 /*
3149 * init() is called when the plugin is loaded, before any other functions
3150 * are called. Read and validate configuration file here. Spawn thread to
3151 * periodically read Datawarp state.
3152 */
init(void)3153 extern int init(void)
3154 {
3155 slurm_mutex_init(&bb_state.bb_mutex);
3156 slurm_mutex_lock(&bb_state.bb_mutex);
3157 bb_load_config(&bb_state, (char *)plugin_type); /* Removes "const" */
3158 _test_config();
3159 if (bb_state.bb_config.debug_flag)
3160 info("%s: %s", plugin_type, __func__);
3161 if (!state_save_loc)
3162 state_save_loc = slurm_get_state_save_location();
3163 bb_alloc_cache(&bb_state);
3164 run_command_init();
3165 slurm_thread_create(&bb_state.bb_thread, _bb_agent, NULL);
3166 slurm_mutex_unlock(&bb_state.bb_mutex);
3167
3168 return SLURM_SUCCESS;
3169 }
3170
3171 /*
3172 * fini() is called when the plugin is unloaded. Free all memory and shutdown
3173 * threads.
3174 */
fini(void)3175 extern int fini(void)
3176 {
3177 int pc, last_pc = 0;
3178
3179 run_command_shutdown();
3180 while ((pc = run_command_count()) > 0) {
3181 if ((last_pc != 0) && (last_pc != pc)) {
3182 info("%s: waiting for %d running processes",
3183 plugin_type, pc);
3184 }
3185 last_pc = pc;
3186 usleep(100000);
3187 }
3188
3189 slurm_mutex_lock(&bb_state.bb_mutex);
3190 if (bb_state.bb_config.debug_flag)
3191 info("%s: %s", plugin_type, __func__);
3192
3193 slurm_mutex_lock(&bb_state.term_mutex);
3194 bb_state.term_flag = true;
3195 slurm_cond_signal(&bb_state.term_cond);
3196 slurm_mutex_unlock(&bb_state.term_mutex);
3197
3198 if (bb_state.bb_thread) {
3199 slurm_mutex_unlock(&bb_state.bb_mutex);
3200 pthread_join(bb_state.bb_thread, NULL);
3201 slurm_mutex_lock(&bb_state.bb_mutex);
3202 bb_state.bb_thread = 0;
3203 }
3204 bb_clear_config(&bb_state.bb_config, true);
3205 bb_clear_cache(&bb_state);
3206 xfree(state_save_loc);
3207 slurm_mutex_unlock(&bb_state.bb_mutex);
3208
3209 return SLURM_SUCCESS;
3210 }
3211
3212 /* Identify and purge any vestigial buffers (i.e. we have a job buffer, but
3213 * the matching job is either gone or completed OR we have a job buffer and a
3214 * pending job, but don't know the status of stage-in) */
_purge_vestigial_bufs(void)3215 static void _purge_vestigial_bufs(void)
3216 {
3217 bb_alloc_t *bb_alloc = NULL;
3218 time_t defer_time = time(NULL) + 60;
3219 int i;
3220
3221 for (i = 0; i < BB_HASH_SIZE; i++) {
3222 bb_alloc = bb_state.bb_ahash[i];
3223 while (bb_alloc) {
3224 job_record_t *job_ptr = NULL;
3225 if (bb_alloc->job_id)
3226 job_ptr = find_job_record(bb_alloc->job_id);
3227 if (bb_alloc->job_id == 0) {
3228 /* Persistent buffer, do not purge */
3229 } else if (!job_ptr) {
3230 info("%s: Purging vestigial buffer for JobId=%u",
3231 plugin_type, bb_alloc->job_id);
3232 _queue_teardown(bb_alloc->job_id,
3233 bb_alloc->user_id, false);
3234 } else if (!IS_JOB_STARTED(job_ptr)) {
3235 /* We do not know the state of file staging,
3236 * so teardown the buffer and defer the job
3237 * for at least 60 seconds (for the teardown) */
3238 debug("%s: Purging buffer for pending JobId=%u",
3239 plugin_type, bb_alloc->job_id);
3240 _queue_teardown(bb_alloc->job_id,
3241 bb_alloc->user_id, true);
3242 if (job_ptr->details &&
3243 (job_ptr->details->begin_time <defer_time)){
3244 job_ptr->details->begin_time =
3245 defer_time;
3246 }
3247 }
3248 bb_alloc = bb_alloc->next;
3249 }
3250 }
3251 }
3252
3253 /*
3254 * Return the total burst buffer size in MB
3255 */
bb_p_get_system_size(void)3256 extern uint64_t bb_p_get_system_size(void)
3257 {
3258 uint64_t size = 0;
3259
3260 slurm_mutex_lock(&bb_state.bb_mutex);
3261 size = bb_state.total_space / (1024 * 1024); /* bytes to MB */
3262 slurm_mutex_unlock(&bb_state.bb_mutex);
3263 return size;
3264 }
3265
3266 /*
3267 * Load the current burst buffer state (e.g. how much space is available now).
3268 * Run at the beginning of each scheduling cycle in order to recognize external
3269 * changes to the burst buffer state (e.g. capacity is added, removed, fails,
3270 * etc.)
3271 *
3272 * init_config IN - true if called as part of slurmctld initialization
3273 * Returns a Slurm errno.
3274 */
bb_p_load_state(bool init_config)3275 extern int bb_p_load_state(bool init_config)
3276 {
3277 if (!init_config)
3278 return SLURM_SUCCESS;
3279
3280 /* In practice the Cray APIs are too slow to run inline on each
3281 * scheduling cycle. Do so on a periodic basis from _bb_agent(). */
3282 if (bb_state.bb_config.debug_flag)
3283 debug("%s: %s", plugin_type, __func__);
3284 _load_state(init_config); /* Has own locking */
3285 slurm_mutex_lock(&bb_state.bb_mutex);
3286 bb_set_tres_pos(&bb_state);
3287 _purge_vestigial_bufs();
3288 slurm_mutex_unlock(&bb_state.bb_mutex);
3289
3290 _save_bb_state(); /* Has own locks excluding file write */
3291
3292 return SLURM_SUCCESS;
3293 }
3294
3295 /*
3296 * Return string containing current burst buffer status
3297 * argc IN - count of status command arguments
3298 * argv IN - status command arguments
3299 * RET status string, release memory using xfree()
3300 */
bb_p_get_status(uint32_t argc,char ** argv)3301 extern char *bb_p_get_status(uint32_t argc, char **argv)
3302 {
3303 char *status_resp, **script_argv;
3304 int i, status = 0;
3305
3306 script_argv = xcalloc((argc + 2), sizeof(char *));
3307 script_argv[0] = "dwstat";
3308 for (i = 0; i < argc; i++)
3309 script_argv[i + 1] = argv[i];
3310 status_resp = run_command("dwstat", bb_state.bb_config.get_sys_status,
3311 script_argv, 2000, 0, &status);
3312 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
3313 xfree(status_resp);
3314 status_resp = xstrdup("Error running dwstat\n");
3315 }
3316 xfree(script_argv);
3317
3318 return status_resp;
3319 }
3320
3321 /*
3322 * Note configuration may have changed. Handle changes in BurstBufferParameters.
3323 *
3324 * Returns a Slurm errno.
3325 */
bb_p_reconfig(void)3326 extern int bb_p_reconfig(void)
3327 {
3328 char *old_default_pool;
3329 int i;
3330
3331 slurm_mutex_lock(&bb_state.bb_mutex);
3332 if (bb_state.bb_config.debug_flag)
3333 info("%s: %s", plugin_type, __func__);
3334 old_default_pool = bb_state.bb_config.default_pool;
3335 bb_state.bb_config.default_pool = NULL;
3336 bb_load_config(&bb_state, (char *)plugin_type); /* Remove "const" */
3337 if (!bb_state.bb_config.default_pool)
3338 bb_state.bb_config.default_pool = old_default_pool;
3339 else
3340 xfree(old_default_pool);
3341 _test_config();
3342 slurm_mutex_unlock(&bb_state.bb_mutex);
3343
3344 /* reconfig is the place we make sure the pointers are correct */
3345 for (i = 0; i < BB_HASH_SIZE; i++) {
3346 bb_alloc_t *bb_alloc = bb_state.bb_ahash[i];
3347 while (bb_alloc) {
3348 _set_assoc_mgr_ptrs(bb_alloc);
3349 bb_alloc = bb_alloc->next;
3350 }
3351 }
3352
3353 return SLURM_SUCCESS;
3354 }
3355
3356 /*
3357 * Pack current burst buffer state information for network transmission to
3358 * user (e.g. "scontrol show burst")
3359 *
3360 * Returns a Slurm errno.
3361 */
bb_p_state_pack(uid_t uid,Buf buffer,uint16_t protocol_version)3362 extern int bb_p_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version)
3363 {
3364 uint32_t rec_count = 0;
3365
3366 slurm_mutex_lock(&bb_state.bb_mutex);
3367 packstr(bb_state.name, buffer);
3368 bb_pack_state(&bb_state, buffer, protocol_version);
3369
3370 if (((bb_state.bb_config.flags & BB_FLAG_PRIVATE_DATA) == 0) ||
3371 validate_operator(uid))
3372 uid = 0; /* User can see all data */
3373 rec_count = bb_pack_bufs(uid, &bb_state, buffer, protocol_version);
3374 (void) bb_pack_usage(uid, &bb_state, buffer, protocol_version);
3375 if (bb_state.bb_config.debug_flag) {
3376 debug("%s: %s: record_count:%u",
3377 plugin_type, __func__, rec_count);
3378 }
3379 slurm_mutex_unlock(&bb_state.bb_mutex);
3380
3381 return SLURM_SUCCESS;
3382 }
3383
3384 /*
3385 * Preliminary validation of a job submit request with respect to burst buffer
3386 * options. Performed after setting default account + qos, but prior to
3387 * establishing job ID or creating script file.
3388 *
3389 * Returns a Slurm errno.
3390 */
bb_p_job_validate(job_desc_msg_t * job_desc,uid_t submit_uid)3391 extern int bb_p_job_validate(job_desc_msg_t *job_desc, uid_t submit_uid)
3392 {
3393 uint64_t bb_size = 0;
3394 int i, rc;
3395
3396 xassert(job_desc);
3397 xassert(job_desc->tres_req_cnt);
3398
3399 rc = _parse_bb_opts(job_desc, &bb_size, submit_uid);
3400 if (rc != SLURM_SUCCESS)
3401 return rc;
3402
3403 if ((job_desc->burst_buffer == NULL) ||
3404 (job_desc->burst_buffer[0] == '\0'))
3405 return rc;
3406
3407 if (bb_state.bb_config.debug_flag) {
3408 info("%s: %s: job_user_id:%u, submit_uid:%d",
3409 plugin_type, __func__, job_desc->user_id, submit_uid);
3410 info("%s: %s: burst_buffer:%s",
3411 plugin_type,__func__, job_desc->burst_buffer);
3412 }
3413
3414 if (job_desc->user_id == 0) {
3415 info("%s: %s: User root can not allocate burst buffers",
3416 plugin_type, __func__);
3417 return ESLURM_BURST_BUFFER_PERMISSION;
3418 }
3419
3420 slurm_mutex_lock(&bb_state.bb_mutex);
3421 if (bb_state.bb_config.allow_users) {
3422 bool found_user = false;
3423 for (i = 0; bb_state.bb_config.allow_users[i]; i++) {
3424 if (job_desc->user_id ==
3425 bb_state.bb_config.allow_users[i]) {
3426 found_user = true;
3427 break;
3428 }
3429 }
3430 if (!found_user) {
3431 rc = ESLURM_BURST_BUFFER_PERMISSION;
3432 goto fini;
3433 }
3434 }
3435
3436 if (bb_state.bb_config.deny_users) {
3437 bool found_user = false;
3438 for (i = 0; bb_state.bb_config.deny_users[i]; i++) {
3439 if (job_desc->user_id ==
3440 bb_state.bb_config.deny_users[i]) {
3441 found_user = true;
3442 break;
3443 }
3444 }
3445 if (found_user) {
3446 rc = ESLURM_BURST_BUFFER_PERMISSION;
3447 goto fini;
3448 }
3449 }
3450
3451 if (bb_state.tres_pos > 0) {
3452 job_desc->tres_req_cnt[bb_state.tres_pos] =
3453 bb_size / (1024 * 1024);
3454 }
3455
3456 fini: slurm_mutex_unlock(&bb_state.bb_mutex);
3457
3458 return rc;
3459 }
3460
3461 /* Add key=value pairs from "resp_msg" to the job's environment */
_update_job_env(job_record_t * job_ptr,char * file_path)3462 static void _update_job_env(job_record_t *job_ptr, char *file_path)
3463 {
3464 struct stat stat_buf;
3465 char *data_buf = NULL, *start, *sep;
3466 int path_fd, i, inx = 0, env_cnt = 0;
3467 ssize_t read_size;
3468
3469 /* Read the DataWarp generated environment variable file */
3470 path_fd = open(file_path, 0);
3471 if (path_fd == -1) {
3472 error("%s: %s: open error on file %s: %m",
3473 plugin_type, __func__, file_path);
3474 return;
3475 }
3476 fd_set_close_on_exec(path_fd);
3477 if (fstat(path_fd, &stat_buf) == -1) {
3478 error("%s: %s: stat error on file %s: %m",
3479 plugin_type, __func__, file_path);
3480 stat_buf.st_size = 2048;
3481 } else if (stat_buf.st_size == 0)
3482 goto fini;
3483 data_buf = xmalloc_nz(stat_buf.st_size + 1);
3484 while (inx < stat_buf.st_size) {
3485 read_size = read(path_fd, data_buf + inx, stat_buf.st_size);
3486 if (read_size < 0)
3487 data_buf[inx] = '\0';
3488 else
3489 data_buf[inx + read_size] = '\0';
3490 if (read_size > 0) {
3491 inx += read_size;
3492 } else if (read_size == 0) { /* EOF */
3493 break;
3494 } else if (read_size < 0) { /* error */
3495 if ((errno == EAGAIN) || (errno == EINTR))
3496 continue;
3497 error("%s: %s: read error on file %s: %m",
3498 plugin_type, __func__, file_path);
3499 break;
3500 }
3501 }
3502 if (bb_state.bb_config.debug_flag)
3503 info("%s: %s: %s", plugin_type, __func__, data_buf);
3504
3505 /* Get count of environment variables in the file */
3506 env_cnt = 0;
3507 if (data_buf) {
3508 for (i = 0; data_buf[i]; i++) {
3509 if (data_buf[i] == '=')
3510 env_cnt++;
3511 }
3512 }
3513
3514 /* Add to supplemental environment variables (in job record) */
3515 if (env_cnt) {
3516 job_ptr->details->env_sup =
3517 xrealloc(job_ptr->details->env_sup,
3518 sizeof(char *) *
3519 (job_ptr->details->env_cnt + env_cnt));
3520 start = data_buf;
3521 for (i = 0; (i < env_cnt) && start[0]; i++) {
3522 sep = strchr(start, '\n');
3523 if (sep)
3524 sep[0] = '\0';
3525 job_ptr->details->env_sup[job_ptr->details->env_cnt++] =
3526 xstrdup(start);
3527 if (sep)
3528 start = sep + 1;
3529 else
3530 break;
3531 }
3532 }
3533
3534 fini: xfree(data_buf);
3535 close(path_fd);
3536 }
3537
3538 /* Return true if #DW options (excludes #BB options) */
_have_dw_cmd_opts(bb_job_t * bb_job)3539 static bool _have_dw_cmd_opts(bb_job_t *bb_job)
3540 {
3541 int i;
3542 bb_buf_t *bb_buf;
3543
3544 xassert(bb_job);
3545 if (bb_job->total_size)
3546 return true;
3547
3548 for (i = 0, bb_buf = bb_job->buf_ptr; i < bb_job->buf_cnt;
3549 i++, bb_buf++) {
3550 if (bb_buf->use)
3551 return true;
3552 }
3553
3554 return false;
3555 }
3556
3557 /*
3558 * Secondary validation of a job submit request with respect to burst buffer
3559 * options. Performed after establishing job ID and creating script file.
3560 *
3561 * NOTE: We run several DW APIs at job submit time so that we can notify the
3562 * user immediately if there is some error, although that can be a relatively
3563 * slow operation. We have a timeout of 3 seconds on the DW APIs here and log
3564 * any times over 0.2 seconds.
3565 *
3566 * NOTE: We do this work inline so the user can be notified immediately if
3567 * there is some problem with their script.
3568 *
3569 * Returns a Slurm errno.
3570 */
bb_p_job_validate2(job_record_t * job_ptr,char ** err_msg)3571 extern int bb_p_job_validate2(job_record_t *job_ptr, char **err_msg)
3572 {
3573 char *hash_dir = NULL, *job_dir = NULL, *script_file = NULL;
3574 char *task_script_file = NULL;
3575 char *resp_msg = NULL, **script_argv;
3576 char *dw_cli_path;
3577 int fd = -1, hash_inx, rc = SLURM_SUCCESS, status = 0;
3578 bb_job_t *bb_job;
3579 uint32_t timeout;
3580 bool using_master_script = false;
3581 DEF_TIMERS;
3582
3583 if ((job_ptr->burst_buffer == NULL) ||
3584 (job_ptr->burst_buffer[0] == '\0')) {
3585 if (job_ptr->details->min_nodes == 0)
3586 rc = ESLURM_INVALID_NODE_COUNT;
3587 return rc;
3588 }
3589
3590 /* Initialization */
3591 slurm_mutex_lock(&bb_state.bb_mutex);
3592 if (bb_state.last_load_time == 0) {
3593 /* Assume request is valid for now, can't test it anyway */
3594 info("%s: %s: Burst buffer down, skip tests for %pJ",
3595 plugin_type, __func__, job_ptr);
3596 slurm_mutex_unlock(&bb_state.bb_mutex);
3597 return rc;
3598 }
3599 bb_job = _get_bb_job(job_ptr);
3600 if (bb_job == NULL) {
3601 slurm_mutex_unlock(&bb_state.bb_mutex);
3602 if (job_ptr->details->min_nodes == 0)
3603 rc = ESLURM_INVALID_NODE_COUNT;
3604 return rc;
3605 }
3606 if ((job_ptr->details->min_nodes == 0) && bb_job->use_job_buf) {
3607 slurm_mutex_unlock(&bb_state.bb_mutex);
3608 return ESLURM_INVALID_BURST_BUFFER_REQUEST;
3609 }
3610
3611 if (!_have_dw_cmd_opts(bb_job)) {
3612 slurm_mutex_unlock(&bb_state.bb_mutex);
3613 return rc;
3614 }
3615
3616 if (bb_state.bb_config.debug_flag)
3617 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
3618
3619 timeout = bb_state.bb_config.validate_timeout * 1000;
3620 dw_cli_path = xstrdup(bb_state.bb_config.get_sys_state);
3621 slurm_mutex_unlock(&bb_state.bb_mutex);
3622
3623 /* Standard file location for job arrays */
3624 if ((job_ptr->array_task_id != NO_VAL) &&
3625 (job_ptr->array_job_id != job_ptr->job_id)) {
3626 hash_inx = job_ptr->array_job_id % 10;
3627 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
3628 (void) mkdir(hash_dir, 0700);
3629 xstrfmtcat(job_dir, "%s/job.%u", hash_dir,
3630 job_ptr->array_job_id);
3631 (void) mkdir(job_dir, 0700);
3632 xstrfmtcat(script_file, "%s/script", job_dir);
3633 fd = open(script_file, 0);
3634 if (fd >= 0) { /* found the script */
3635 close(fd);
3636 using_master_script = true;
3637 } else {
3638 xfree(hash_dir);
3639 }
3640 } else {
3641 hash_inx = job_ptr->job_id % 10;
3642 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
3643 (void) mkdir(hash_dir, 0700);
3644 xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
3645 (void) mkdir(job_dir, 0700);
3646 xstrfmtcat(script_file, "%s/script", job_dir);
3647 if (job_ptr->batch_flag == 0)
3648 rc = _build_bb_script(job_ptr, script_file);
3649 }
3650
3651 /* Run "job_process" function, validates user script */
3652 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
3653 script_argv[0] = xstrdup("dw_wlm_cli");
3654 script_argv[1] = xstrdup("--function");
3655 script_argv[2] = xstrdup("job_process");
3656 script_argv[3] = xstrdup("--job");
3657 xstrfmtcat(script_argv[4], "%s", script_file);
3658 START_TIMER;
3659 resp_msg = run_command("job_process",
3660 bb_state.bb_config.get_sys_state,
3661 script_argv, timeout, 0, &status);
3662 END_TIMER;
3663 if ((DELTA_TIMER > 200000) || /* 0.2 secs */
3664 bb_state.bb_config.debug_flag)
3665 info("%s: %s: job_process ran for %s",
3666 plugin_type, __func__, TIME_STR);
3667 _log_script_argv(script_argv, resp_msg);
3668 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
3669 error("%s: %s: job_process for %pJ status:%u response:%s",
3670 plugin_type, __func__, job_ptr, status, resp_msg);
3671 if (err_msg) {
3672 xfree(*err_msg);
3673 xstrfmtcat(*err_msg, "%s: %s", plugin_type, resp_msg);
3674 }
3675 rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
3676 }
3677 xfree(resp_msg);
3678 free_command_argv(script_argv);
3679
3680 /* Clean-up */
3681 xfree(hash_dir);
3682 xfree(job_dir);
3683 xfree(dw_cli_path);
3684 if (rc != SLURM_SUCCESS) {
3685 slurm_mutex_lock(&bb_state.bb_mutex);
3686 bb_job_del(&bb_state, job_ptr->job_id);
3687 slurm_mutex_unlock(&bb_state.bb_mutex);
3688 } else if (using_master_script) {
3689 /* Job array's need to have script file in the "standard"
3690 * location for the remaining logic, make hard link */
3691 hash_inx = job_ptr->job_id % 10;
3692 xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
3693 (void) mkdir(hash_dir, 0700);
3694 xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
3695 xfree(hash_dir);
3696 (void) mkdir(job_dir, 0700);
3697 xstrfmtcat(task_script_file, "%s/script", job_dir);
3698 xfree(job_dir);
3699 if ((link(script_file, task_script_file) != 0) &&
3700 (errno != EEXIST)) {
3701 error("%s: %s: link(%s,%s): %m",
3702 plugin_type, __func__, script_file,
3703 task_script_file);
3704 }
3705 }
3706 xfree(task_script_file);
3707 xfree(script_file);
3708
3709 return rc;
3710 }
3711
_json_parse_real_size(json_object * j)3712 static struct bb_total_size *_json_parse_real_size(json_object *j)
3713 {
3714 enum json_type type;
3715 struct json_object_iter iter;
3716 struct bb_total_size *bb_tot_sz;
3717 const char *p;
3718
3719 bb_tot_sz = xmalloc(sizeof(struct bb_total_size));
3720 json_object_object_foreachC(j, iter) {
3721 type = json_object_get_type(iter.val);
3722 switch (type) {
3723 case json_type_string:
3724 if (!xstrcmp(iter.key, "units")) {
3725 p = json_object_get_string(iter.val);
3726 if (!xstrcmp(p, "bytes")) {
3727 bb_tot_sz->units =
3728 BB_UNITS_BYTES;
3729 }
3730 }
3731 break;
3732 case json_type_int:
3733 if (!xstrcmp(iter.key, "capacity")) {
3734 bb_tot_sz->capacity =
3735 json_object_get_int64(iter.val);
3736 }
3737 break;
3738 default:
3739 break;
3740 }
3741 }
3742
3743 return bb_tot_sz;
3744 }
3745
3746 /*
3747 * Fill in the tres_cnt (in MB) based off the job record
3748 * NOTE: Based upon job-specific burst buffers, excludes persistent buffers
3749 * IN job_ptr - job record
3750 * IN/OUT tres_cnt - fill in this already allocated array with tres_cnts
3751 * IN locked - if the assoc_mgr tres read locked is locked or not
3752 */
bb_p_job_set_tres_cnt(job_record_t * job_ptr,uint64_t * tres_cnt,bool locked)3753 extern void bb_p_job_set_tres_cnt(job_record_t *job_ptr, uint64_t *tres_cnt,
3754 bool locked)
3755 {
3756 bb_job_t *bb_job;
3757
3758 if (!tres_cnt) {
3759 error("%s: %s: No tres_cnt given when looking at %pJ",
3760 plugin_type, __func__, job_ptr);
3761 }
3762
3763 if (bb_state.tres_pos < 0) {
3764 /* BB not defined in AccountingStorageTRES */
3765 return;
3766 }
3767
3768 slurm_mutex_lock(&bb_state.bb_mutex);
3769 if ((bb_job = _get_bb_job(job_ptr))) {
3770 tres_cnt[bb_state.tres_pos] =
3771 bb_job->total_size / (1024 * 1024);
3772 }
3773 slurm_mutex_unlock(&bb_state.bb_mutex);
3774 }
3775
3776 /*
3777 * For a given job, return our best guess if when it might be able to start
3778 */
bb_p_job_get_est_start(job_record_t * job_ptr)3779 extern time_t bb_p_job_get_est_start(job_record_t *job_ptr)
3780 {
3781 time_t est_start = time(NULL);
3782 bb_job_t *bb_job;
3783 int rc;
3784
3785 if ((job_ptr->burst_buffer == NULL) ||
3786 (job_ptr->burst_buffer[0] == '\0'))
3787 return est_start;
3788
3789 if (job_ptr->array_recs &&
3790 ((job_ptr->array_task_id == NO_VAL) ||
3791 (job_ptr->array_task_id == INFINITE))) {
3792 est_start += 300; /* 5 minutes, guess... */
3793 return est_start; /* Can't operate on job array struct */
3794 }
3795
3796 slurm_mutex_lock(&bb_state.bb_mutex);
3797 if (bb_state.last_load_time == 0) {
3798 est_start += 3600; /* 1 hour, guess... */
3799 slurm_mutex_unlock(&bb_state.bb_mutex);
3800 return est_start; /* Can't operate on job array struct */
3801 }
3802
3803 if ((bb_job = _get_bb_job(job_ptr)) == NULL) {
3804 slurm_mutex_unlock(&bb_state.bb_mutex);
3805 return est_start;
3806 }
3807
3808 if (bb_state.bb_config.debug_flag)
3809 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
3810
3811 if ((bb_job->persist_add == 0) && (bb_job->swap_size == 0) &&
3812 (bb_job->total_size == 0)) {
3813 /* Only deleting or using persistent buffers */
3814 if (!_test_persistent_use_ready(bb_job, job_ptr))
3815 est_start += 60 * 60; /* one hour, guess... */
3816 } else if (bb_job->state == BB_STATE_PENDING) {
3817 rc = _test_size_limit(job_ptr, bb_job);
3818 if (rc == 0) { /* Could start now */
3819 ;
3820 } else if (rc == 1) { /* Exceeds configured limits */
3821 est_start += 365 * 24 * 60 * 60;
3822 } else { /* No space currently available */
3823 est_start = MAX(est_start, bb_state.next_end_time);
3824 }
3825 } else { /* Allocation or staging in progress */
3826 est_start++;
3827 }
3828 slurm_mutex_unlock(&bb_state.bb_mutex);
3829
3830 return est_start;
3831 }
3832
3833 /*
3834 * Attempt to allocate resources and begin file staging for pending jobs.
3835 */
bb_p_job_try_stage_in(List job_queue)3836 extern int bb_p_job_try_stage_in(List job_queue)
3837 {
3838 bb_job_queue_rec_t *job_rec;
3839 List job_candidates;
3840 ListIterator job_iter;
3841 job_record_t *job_ptr;
3842 bb_job_t *bb_job;
3843 int rc;
3844
3845 slurm_mutex_lock(&bb_state.bb_mutex);
3846 if (bb_state.bb_config.debug_flag)
3847 info("%s: %s: Mutex locked", plugin_type, __func__);
3848
3849 if (bb_state.last_load_time == 0) {
3850 slurm_mutex_unlock(&bb_state.bb_mutex);
3851 return SLURM_SUCCESS;
3852 }
3853
3854 /* Identify candidates to be allocated burst buffers */
3855 job_candidates = list_create(_job_queue_del);
3856 job_iter = list_iterator_create(job_queue);
3857 while ((job_ptr = list_next(job_iter))) {
3858 if (!IS_JOB_PENDING(job_ptr) ||
3859 (job_ptr->start_time == 0) ||
3860 (job_ptr->burst_buffer == NULL) ||
3861 (job_ptr->burst_buffer[0] == '\0'))
3862 continue;
3863 if (job_ptr->array_recs &&
3864 ((job_ptr->array_task_id == NO_VAL) ||
3865 (job_ptr->array_task_id == INFINITE)))
3866 continue; /* Can't operate on job array struct */
3867 bb_job = _get_bb_job(job_ptr);
3868 if (bb_job == NULL)
3869 continue;
3870 if (bb_job->state == BB_STATE_COMPLETE)
3871 bb_job->state = BB_STATE_PENDING; /* job requeued */
3872 else if (bb_job->state >= BB_STATE_POST_RUN)
3873 continue; /* Requeued job still staging out */
3874 job_rec = xmalloc(sizeof(bb_job_queue_rec_t));
3875 job_rec->job_ptr = job_ptr;
3876 job_rec->bb_job = bb_job;
3877 list_push(job_candidates, job_rec);
3878 }
3879 list_iterator_destroy(job_iter);
3880
3881 /* Sort in order of expected start time */
3882 list_sort(job_candidates, bb_job_queue_sort);
3883
3884 bb_set_use_time(&bb_state);
3885 job_iter = list_iterator_create(job_candidates);
3886 while ((job_rec = list_next(job_iter))) {
3887 job_ptr = job_rec->job_ptr;
3888 bb_job = job_rec->bb_job;
3889 if (bb_job->state >= BB_STATE_STAGING_IN)
3890 continue; /* Job was already allocated a buffer */
3891
3892 rc = _test_size_limit(job_ptr, bb_job);
3893 if (rc == 0) /* Could start now */
3894 (void) _alloc_job_bb(job_ptr, bb_job, true);
3895 else if (rc == 1) /* Exceeds configured limits */
3896 continue;
3897 else /* No space currently available */
3898 break;
3899 }
3900 list_iterator_destroy(job_iter);
3901 slurm_mutex_unlock(&bb_state.bb_mutex);
3902 FREE_NULL_LIST(job_candidates);
3903
3904 return SLURM_SUCCESS;
3905 }
3906
3907 /*
3908 * Determine if a job's burst buffer stage-in is complete
3909 * job_ptr IN - Job to test
3910 * test_only IN - If false, then attempt to allocate burst buffer if possible
3911 *
3912 * RET: 0 - stage-in is underway
3913 * 1 - stage-in complete
3914 * -1 - stage-in not started or burst buffer in some unexpected state
3915 */
bb_p_job_test_stage_in(job_record_t * job_ptr,bool test_only)3916 extern int bb_p_job_test_stage_in(job_record_t *job_ptr, bool test_only)
3917 {
3918 bb_job_t *bb_job = NULL;
3919 int rc = 1;
3920
3921 if ((job_ptr->burst_buffer == NULL) ||
3922 (job_ptr->burst_buffer[0] == '\0'))
3923 return 1;
3924
3925 if (job_ptr->array_recs &&
3926 ((job_ptr->array_task_id == NO_VAL) ||
3927 (job_ptr->array_task_id == INFINITE)))
3928 return -1; /* Can't operate on job array structure */
3929
3930 slurm_mutex_lock(&bb_state.bb_mutex);
3931 if (bb_state.bb_config.debug_flag) {
3932 info("%s: %s: %pJ test_only:%d",
3933 plugin_type, __func__, job_ptr, (int) test_only);
3934 }
3935 if (bb_state.last_load_time != 0)
3936 bb_job = _get_bb_job(job_ptr);
3937 if (bb_job && (bb_job->state == BB_STATE_COMPLETE))
3938 bb_job->state = BB_STATE_PENDING; /* job requeued */
3939 if (bb_job == NULL) {
3940 rc = -1;
3941 } else if (bb_job->state < BB_STATE_STAGING_IN) {
3942 /* Job buffer not allocated, create now if space available */
3943 rc = -1;
3944 if ((test_only == false) &&
3945 (_test_size_limit(job_ptr, bb_job) == 0) &&
3946 (_alloc_job_bb(job_ptr, bb_job, false) == SLURM_SUCCESS)) {
3947 rc = 0; /* Setup/stage-in in progress */
3948 }
3949 } else if (bb_job->state == BB_STATE_STAGING_IN) {
3950 rc = 0;
3951 } else if (bb_job->state == BB_STATE_STAGED_IN) {
3952 rc = 1;
3953 } else {
3954 rc = -1; /* Requeued job still staging in */
3955 }
3956
3957 slurm_mutex_unlock(&bb_state.bb_mutex);
3958
3959 return rc;
3960 }
3961
3962 /* Attempt to claim burst buffer resources.
3963 * At this time, bb_g_job_test_stage_in() should have been run successfully AND
3964 * the compute nodes selected for the job.
3965 *
3966 * Returns a Slurm errno.
3967 */
bb_p_job_begin(job_record_t * job_ptr)3968 extern int bb_p_job_begin(job_record_t *job_ptr)
3969 {
3970 char *client_nodes_file_nid = NULL, *exec_host_file = NULL;
3971 pre_run_args_t *pre_run_args;
3972 char **pre_run_argv = NULL, **script_argv = NULL;
3973 char *job_dir = NULL, *path_file, *resp_msg;
3974 int arg_inx, hash_inx, rc = SLURM_SUCCESS, status = 0;
3975 bb_job_t *bb_job;
3976 uint32_t timeout;
3977 bool do_pre_run, set_exec_host;
3978 DEF_TIMERS;
3979 pthread_t tid;
3980
3981 if ((job_ptr->burst_buffer == NULL) ||
3982 (job_ptr->burst_buffer[0] == '\0'))
3983 return SLURM_SUCCESS;
3984
3985 if (((!job_ptr->job_resrcs || !job_ptr->job_resrcs->nodes)) &&
3986 (job_ptr->details->min_nodes != 0)) {
3987 error("%s: %s: %pJ lacks node allocation",
3988 plugin_type, __func__, job_ptr);
3989 return SLURM_ERROR;
3990 }
3991
3992 slurm_mutex_lock(&bb_state.bb_mutex);
3993 if (bb_state.bb_config.debug_flag)
3994 info("%s: %s: %pJ",
3995 plugin_type, __func__, job_ptr);
3996
3997 if (bb_state.last_load_time == 0) {
3998 info("%s: %s: Burst buffer down, can not start %pJ",
3999 plugin_type, __func__, job_ptr);
4000 slurm_mutex_unlock(&bb_state.bb_mutex);
4001 return SLURM_ERROR;
4002 }
4003 bb_job = _get_bb_job(job_ptr);
4004 if (!bb_job) {
4005 error("%s: %s: no job record buffer for %pJ",
4006 plugin_type, __func__, job_ptr);
4007 xfree(job_ptr->state_desc);
4008 job_ptr->state_desc =
4009 xstrdup("Could not find burst buffer record");
4010 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4011 _queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4012 slurm_mutex_unlock(&bb_state.bb_mutex);
4013 return SLURM_ERROR;
4014 }
4015 do_pre_run = _have_dw_cmd_opts(bb_job);
4016
4017 /* Confirm that persistent burst buffers work has been completed */
4018 if ((_create_bufs(job_ptr, bb_job, true) > 0)) {
4019 xfree(job_ptr->state_desc);
4020 job_ptr->state_desc =
4021 xstrdup("Error managing persistent burst buffers");
4022 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4023 _queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4024 slurm_mutex_unlock(&bb_state.bb_mutex);
4025 return SLURM_ERROR;
4026 }
4027
4028 hash_inx = job_ptr->job_id % 10;
4029 xstrfmtcat(job_dir, "%s/hash.%d/job.%u", state_save_loc, hash_inx,
4030 job_ptr->job_id);
4031 xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
4032 if (do_pre_run)
4033 bb_job->state = BB_STATE_PRE_RUN;
4034 else
4035 bb_job->state = BB_STATE_RUNNING;
4036 if (bb_state.bb_config.flags & BB_FLAG_SET_EXEC_HOST)
4037 set_exec_host = true;
4038 else
4039 set_exec_host = false;
4040 slurm_mutex_unlock(&bb_state.bb_mutex);
4041
4042 if (job_ptr->job_resrcs && job_ptr->job_resrcs->nodes &&
4043 _write_nid_file(client_nodes_file_nid, job_ptr->job_resrcs->nodes,
4044 job_ptr)) {
4045 xfree(client_nodes_file_nid);
4046 }
4047 if (set_exec_host && !job_ptr->batch_host && job_ptr->alloc_node) {
4048 xstrfmtcat(exec_host_file, "%s/exec_host", job_dir);
4049 if (_write_nid_file(exec_host_file, job_ptr->alloc_node,
4050 job_ptr)) {
4051 xfree(exec_host_file);
4052 }
4053 }
4054
4055 /* Run "paths" function, get DataWarp environment variables */
4056 if (do_pre_run) {
4057 /* Setup "paths" operation */
4058 timeout = bb_state.bb_config.validate_timeout * 1000;
4059 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminate */
4060 script_argv[0] = xstrdup("dw_wlm_cli");
4061 script_argv[1] = xstrdup("--function");
4062 script_argv[2] = xstrdup("paths");
4063 script_argv[3] = xstrdup("--job");
4064 xstrfmtcat(script_argv[4], "%s/script", job_dir);
4065 script_argv[5] = xstrdup("--token");
4066 xstrfmtcat(script_argv[6], "%u", job_ptr->job_id);
4067 script_argv[7] = xstrdup("--pathfile");
4068 xstrfmtcat(script_argv[8], "%s/path", job_dir);
4069 path_file = script_argv[8];
4070 START_TIMER;
4071 resp_msg = run_command("paths",
4072 bb_state.bb_config.get_sys_state,
4073 script_argv, timeout, 0,
4074 &status);
4075 END_TIMER;
4076 if ((DELTA_TIMER > 200000) || /* 0.2 secs */
4077 bb_state.bb_config.debug_flag)
4078 info("%s: %s: paths ran for %s",
4079 plugin_type, __func__, TIME_STR);
4080 _log_script_argv(script_argv, resp_msg);
4081 #if 1
4082 //FIXME: Cray API returning "job_file_valid True" but exit 1 in some cases
4083 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
4084 (!resp_msg ||
4085 strncmp(resp_msg, "job_file_valid True", 19))) {
4086 #else
4087 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4088 #endif
4089 error("%s: %s: paths for %pJ status:%u response:%s",
4090 plugin_type, __func__, job_ptr, status, resp_msg);
4091 xfree(resp_msg);
4092 rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
4093 free_command_argv(script_argv);
4094 goto fini;
4095 } else {
4096 _update_job_env(job_ptr, path_file);
4097 xfree(resp_msg);
4098 }
4099 free_command_argv(script_argv);
4100
4101 /* Setup "pre_run" operation */
4102 pre_run_argv = xcalloc(12, sizeof(char *));
4103 pre_run_argv[0] = xstrdup("dw_wlm_cli");
4104 pre_run_argv[1] = xstrdup("--function");
4105 pre_run_argv[2] = xstrdup("pre_run");
4106 pre_run_argv[3] = xstrdup("--token");
4107 xstrfmtcat(pre_run_argv[4], "%u", job_ptr->job_id);
4108 pre_run_argv[5] = xstrdup("--job");
4109 xstrfmtcat(pre_run_argv[6], "%s/script", job_dir);
4110 arg_inx = 7;
4111 if (client_nodes_file_nid) {
4112 #if defined(HAVE_NATIVE_CRAY)
4113 pre_run_argv[arg_inx++] = xstrdup("--nidlistfile");
4114 #else
4115 pre_run_argv[arg_inx++] = xstrdup("--nodehostnamefile");
4116 #endif
4117 pre_run_argv[arg_inx++] =
4118 xstrdup(client_nodes_file_nid);
4119 }
4120 if (exec_host_file) {
4121 #if defined(HAVE_NATIVE_CRAY)
4122 pre_run_argv[arg_inx++] =
4123 xstrdup("--jobexecutionnodefilenids");
4124 #else
4125 pre_run_argv[arg_inx++] =
4126 xstrdup("--jobexecutionnodefile");
4127 #endif
4128 pre_run_argv[arg_inx++] =
4129 xstrdup(exec_host_file);
4130 }
4131 pre_run_args = xmalloc(sizeof(pre_run_args_t));
4132 pre_run_args->args = pre_run_argv;
4133 pre_run_args->job_id = job_ptr->job_id;
4134 pre_run_args->timeout = bb_state.bb_config.other_timeout * 1000;
4135 pre_run_args->user_id = job_ptr->user_id;
4136 if (job_ptr->details) { /* Defer launch until completion */
4137 job_ptr->details->prolog_running++;
4138 job_ptr->job_state |= JOB_CONFIGURING;
4139 }
4140
4141 slurm_thread_create(&tid, _start_pre_run, pre_run_args);
4142 }
4143
4144 fini:
4145 xfree(client_nodes_file_nid);
4146 xfree(exec_host_file);
4147 xfree(job_dir);
4148 return rc;
4149 }
4150
4151 /* Kill job from CONFIGURING state */
4152 static void _kill_job(job_record_t *job_ptr, bool hold_job)
4153 {
4154 last_job_update = time(NULL);
4155 job_ptr->end_time = last_job_update;
4156 if (hold_job)
4157 job_ptr->priority = 0;
4158 build_cg_bitmap(job_ptr);
4159 job_ptr->exit_code = 1;
4160 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4161 xfree(job_ptr->state_desc);
4162 job_ptr->state_desc = xstrdup("Burst buffer pre_run error");
4163
4164 job_ptr->job_state = JOB_REQUEUE;
4165 job_completion_logger(job_ptr, true);
4166 job_ptr->job_state = JOB_PENDING | JOB_COMPLETING;
4167
4168 deallocate_nodes(job_ptr, false, false, false);
4169 }
4170
4171 static void *_start_pre_run(void *x)
4172 {
4173 /* Locks: read job */
4174 slurmctld_lock_t job_read_lock = {
4175 NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
4176 /* Locks: write job */
4177 slurmctld_lock_t job_write_lock = {
4178 NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
4179 pre_run_args_t *pre_run_args = (pre_run_args_t *) x;
4180 char *resp_msg = NULL;
4181 bb_job_t *bb_job = NULL;
4182 int status = 0;
4183 job_record_t *job_ptr;
4184 bool run_kill_job = false;
4185 uint32_t timeout;
4186 bool hold_job = false, nodes_ready = false;
4187 DEF_TIMERS;
4188 track_script_rec_add(pre_run_args->job_id, 0, pthread_self());
4189
4190 /* Wait for node boot to complete */
4191 while (!nodes_ready) {
4192 lock_slurmctld(job_read_lock);
4193 job_ptr = find_job_record(pre_run_args->job_id);
4194 if (!job_ptr || IS_JOB_COMPLETED(job_ptr)) {
4195 unlock_slurmctld(job_read_lock);
4196 track_script_remove(pthread_self());
4197 return NULL;
4198 }
4199 if (test_job_nodes_ready(job_ptr))
4200 nodes_ready = true;
4201 unlock_slurmctld(job_read_lock);
4202 if (!nodes_ready)
4203 sleep(60);
4204 }
4205
4206 timeout = pre_run_args->timeout * 1000;
4207
4208 START_TIMER;
4209 resp_msg = run_command("dws_pre_run",
4210 bb_state.bb_config.get_sys_state,
4211 pre_run_args->args, timeout, pthread_self(),
4212 &status);
4213 END_TIMER;
4214
4215 if (track_script_broadcast(pthread_self(), status)) {
4216 /* I was killed by slurmtrack, bail out right now */
4217 info("%s: %s: dws_pre_run for JobId=%u terminated by slurmctld",
4218 plugin_type, __func__, pre_run_args->job_id);
4219 xfree(resp_msg);
4220 free_command_argv(pre_run_args->args);
4221 xfree(pre_run_args);
4222 track_script_remove(pthread_self());
4223 return NULL;
4224 }
4225 /* track_script_reset_cpid(pthread_self(), 0); */
4226
4227 lock_slurmctld(job_write_lock);
4228 slurm_mutex_lock(&bb_state.bb_mutex);
4229 job_ptr = find_job_record(pre_run_args->job_id);
4230 if ((DELTA_TIMER > 500000) || /* 0.5 secs */
4231 bb_state.bb_config.debug_flag) {
4232 info("%s: %s: dws_pre_run for %pJ ran for %s",
4233 plugin_type, __func__, job_ptr, TIME_STR);
4234 }
4235 if (job_ptr)
4236 bb_job = _get_bb_job(job_ptr);
4237 _log_script_argv(pre_run_args->args, resp_msg);
4238 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4239 /* Pre-run failure */
4240 trigger_burst_buffer();
4241 error("%s: %s: dws_pre_run for %pJ status:%u response:%s",
4242 plugin_type, __func__, job_ptr, status, resp_msg);
4243 if (job_ptr) {
4244 _update_system_comment(job_ptr, "pre_run", resp_msg, 0);
4245 if (IS_JOB_RUNNING(job_ptr))
4246 run_kill_job = true;
4247 if (bb_job) {
4248 bb_job->state = BB_STATE_TEARDOWN;
4249 if (bb_job->retry_cnt++ > MAX_RETRY_CNT)
4250 hold_job = true;
4251 }
4252 }
4253 _queue_teardown(pre_run_args->job_id, pre_run_args->user_id,
4254 true);
4255 } else if (bb_job) {
4256 /* Pre-run success and the job's BB record exists */
4257 if (bb_job->state == BB_STATE_ALLOC_REVOKE)
4258 bb_job->state = BB_STATE_STAGED_IN;
4259 else
4260 bb_job->state = BB_STATE_RUNNING;
4261 }
4262 if (job_ptr) {
4263 if (run_kill_job)
4264 job_ptr->job_state &= ~JOB_CONFIGURING;
4265 prolog_running_decr(job_ptr);
4266 }
4267 slurm_mutex_unlock(&bb_state.bb_mutex);
4268 if (run_kill_job) {
4269 /* bb_mutex must be unlocked before calling this */
4270 _kill_job(job_ptr, hold_job);
4271 }
4272 unlock_slurmctld(job_write_lock);
4273
4274 xfree(resp_msg);
4275 free_command_argv(pre_run_args->args);
4276 xfree(pre_run_args);
4277
4278 track_script_remove(pthread_self());
4279
4280 return NULL;
4281 }
4282
4283 /* Revoke allocation, but do not release resources.
4284 * Executed after bb_p_job_begin() if there was an allocation failure.
4285 * Does not release previously allocated resources.
4286 *
4287 * Returns a Slurm errno.
4288 */
4289 extern int bb_p_job_revoke_alloc(job_record_t *job_ptr)
4290 {
4291 bb_job_t *bb_job = NULL;
4292 int rc = SLURM_SUCCESS;
4293
4294 slurm_mutex_lock(&bb_state.bb_mutex);
4295 if (job_ptr)
4296 bb_job = _get_bb_job(job_ptr);
4297 if (bb_job) {
4298 if (bb_job->state == BB_STATE_RUNNING)
4299 bb_job->state = BB_STATE_STAGED_IN;
4300 else if (bb_job->state == BB_STATE_PRE_RUN)
4301 bb_job->state = BB_STATE_ALLOC_REVOKE;
4302 } else {
4303 rc = SLURM_ERROR;
4304 }
4305 slurm_mutex_unlock(&bb_state.bb_mutex);
4306
4307 return rc;
4308 }
4309
4310 /*
4311 * Trigger a job's burst buffer stage-out to begin
4312 *
4313 * Returns a Slurm errno.
4314 */
4315 extern int bb_p_job_start_stage_out(job_record_t *job_ptr)
4316 {
4317 bb_job_t *bb_job;
4318
4319 if ((job_ptr->burst_buffer == NULL) ||
4320 (job_ptr->burst_buffer[0] == '\0'))
4321 return SLURM_SUCCESS;
4322
4323 slurm_mutex_lock(&bb_state.bb_mutex);
4324 if (bb_state.bb_config.debug_flag)
4325 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4326
4327 if (bb_state.last_load_time == 0) {
4328 info("%s: %s: Burst buffer down, can not stage out %pJ",
4329 plugin_type, __func__, job_ptr);
4330 slurm_mutex_unlock(&bb_state.bb_mutex);
4331 return SLURM_ERROR;
4332 }
4333 bb_job = _get_bb_job(job_ptr);
4334 if (!bb_job) {
4335 /* No job buffers. Assuming use of persistent buffers only */
4336 verbose("%s: %s: %pJ bb job record not found",
4337 plugin_type, __func__, job_ptr);
4338 } else if (bb_job->state < BB_STATE_RUNNING) {
4339 /* Job never started. Just teardown the buffer */
4340 bb_job->state = BB_STATE_TEARDOWN;
4341 _queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4342 } else if (bb_job->state < BB_STATE_POST_RUN) {
4343 bb_job->state = BB_STATE_POST_RUN;
4344 job_ptr->job_state |= JOB_STAGE_OUT;
4345 xfree(job_ptr->state_desc);
4346 xstrfmtcat(job_ptr->state_desc, "%s: Stage-out in progress",
4347 plugin_type);
4348 _queue_stage_out(job_ptr, bb_job);
4349 }
4350 slurm_mutex_unlock(&bb_state.bb_mutex);
4351
4352 return SLURM_SUCCESS;
4353 }
4354
4355 /*
4356 * Determine if a job's burst buffer post_run operation is complete
4357 *
4358 * RET: 0 - post_run is underway
4359 * 1 - post_run complete
4360 * -1 - fatal error
4361 */
4362 extern int bb_p_job_test_post_run(job_record_t *job_ptr)
4363 {
4364 bb_job_t *bb_job;
4365 int rc = -1;
4366
4367 if ((job_ptr->burst_buffer == NULL) ||
4368 (job_ptr->burst_buffer[0] == '\0'))
4369 return 1;
4370
4371 slurm_mutex_lock(&bb_state.bb_mutex);
4372 if (bb_state.bb_config.debug_flag)
4373 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4374
4375 if (bb_state.last_load_time == 0) {
4376 info("%s: %s: Burst buffer down, can not post_run %pJ",
4377 plugin_type, __func__, job_ptr);
4378 slurm_mutex_unlock(&bb_state.bb_mutex);
4379 return -1;
4380 }
4381 bb_job = bb_job_find(&bb_state, job_ptr->job_id);
4382 if (!bb_job) {
4383 /* No job buffers. Assuming use of persistent buffers only */
4384 verbose("%s: %s: %pJ bb job record not found",
4385 plugin_type, __func__, job_ptr);
4386 rc = 1;
4387 } else {
4388 if (bb_job->state < BB_STATE_POST_RUN) {
4389 rc = -1;
4390 } else if (bb_job->state > BB_STATE_POST_RUN) {
4391 rc = 1;
4392 } else {
4393 rc = 0;
4394 }
4395 }
4396 slurm_mutex_unlock(&bb_state.bb_mutex);
4397
4398 return rc;
4399 }
4400
4401 /*
4402 * Determine if a job's burst buffer stage-out is complete
4403 *
4404 * RET: 0 - stage-out is underway
4405 * 1 - stage-out complete
4406 * -1 - fatal error
4407 */
4408 extern int bb_p_job_test_stage_out(job_record_t *job_ptr)
4409 {
4410 bb_job_t *bb_job;
4411 int rc = -1;
4412
4413 if ((job_ptr->burst_buffer == NULL) ||
4414 (job_ptr->burst_buffer[0] == '\0'))
4415 return 1;
4416
4417 slurm_mutex_lock(&bb_state.bb_mutex);
4418 if (bb_state.bb_config.debug_flag)
4419 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4420
4421 if (bb_state.last_load_time == 0) {
4422 info("%s: %s: Burst buffer down, can not stage-out %pJ",
4423 plugin_type, __func__, job_ptr);
4424 slurm_mutex_unlock(&bb_state.bb_mutex);
4425 return -1;
4426 }
4427 bb_job = bb_job_find(&bb_state, job_ptr->job_id);
4428 if (!bb_job) {
4429 /* No job buffers. Assuming use of persistent buffers only */
4430 verbose("%s: %s: %pJ bb job record not found",
4431 plugin_type, __func__, job_ptr);
4432 rc = 1;
4433 } else {
4434 if (bb_job->state == BB_STATE_PENDING) {
4435 /*
4436 * No job BB work not started before job was killed.
4437 * Alternately slurmctld daemon restarted after the
4438 * job's BB work was completed.
4439 */
4440 rc = 1;
4441 } else if (bb_job->state < BB_STATE_POST_RUN) {
4442 rc = -1;
4443 } else if (bb_job->state > BB_STATE_STAGING_OUT) {
4444 rc = 1;
4445 } else {
4446 rc = 0;
4447 }
4448 }
4449 slurm_mutex_unlock(&bb_state.bb_mutex);
4450
4451 return rc;
4452 }
4453
4454 /*
4455 * Terminate any file staging and completely release burst buffer resources
4456 *
4457 * Returns a Slurm errno.
4458 */
4459 extern int bb_p_job_cancel(job_record_t *job_ptr)
4460 {
4461 bb_job_t *bb_job;
4462 bb_alloc_t *bb_alloc;
4463
4464 slurm_mutex_lock(&bb_state.bb_mutex);
4465 if (bb_state.bb_config.debug_flag)
4466 info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4467
4468 if (bb_state.last_load_time == 0) {
4469 info("%s: %s: Burst buffer down, can not cancel %pJ",
4470 plugin_type, __func__, job_ptr);
4471 slurm_mutex_unlock(&bb_state.bb_mutex);
4472 return SLURM_ERROR;
4473 }
4474
4475 bb_job = _get_bb_job(job_ptr);
4476 if (!bb_job) {
4477 /* Nothing ever allocated, nothing to clean up */
4478 } else if (bb_job->state == BB_STATE_PENDING) {
4479 bb_job->state = BB_STATE_COMPLETE; /* Nothing to clean up */
4480 } else {
4481 /* Note: Persistent burst buffer actions already completed
4482 * for the job are not reversed */
4483 bb_job->state = BB_STATE_TEARDOWN;
4484 bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
4485 if (bb_alloc) {
4486 bb_alloc->state = BB_STATE_TEARDOWN;
4487 bb_alloc->state_time = time(NULL);
4488 bb_state.last_update_time = time(NULL);
4489
4490 }
4491 _queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4492 }
4493 slurm_mutex_unlock(&bb_state.bb_mutex);
4494
4495 return SLURM_SUCCESS;
4496 }
4497
4498 static void _free_create_args(create_buf_data_t *create_args)
4499 {
4500 if (create_args) {
4501 xfree(create_args->access);
4502 xfree(create_args->job_script);
4503 xfree(create_args->name);
4504 xfree(create_args->pool);
4505 xfree(create_args->type);
4506 xfree(create_args);
4507 }
4508 }
4509
4510 /*
4511 * Create/destroy persistent burst buffers
4512 * job_ptr IN - job to operate upon
4513 * bb_job IN - job's burst buffer data
4514 * job_ready IN - if true, job is ready to run now, if false then do not
4515 * delete persistent buffers
4516 * Returns count of buffer create/destroy requests which are pending
4517 */
4518 static int _create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
4519 bool job_ready)
4520 {
4521 create_buf_data_t *create_args;
4522 bb_buf_t *buf_ptr;
4523 bb_alloc_t *bb_alloc;
4524 int i, hash_inx, rc = 0;
4525 pthread_t tid;
4526
4527 xassert(bb_job);
4528 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4529 i++, buf_ptr++) {
4530 if ((buf_ptr->state == BB_STATE_ALLOCATING) ||
4531 (buf_ptr->state == BB_STATE_DELETING)) {
4532 rc++;
4533 } else if (buf_ptr->state != BB_STATE_PENDING) {
4534 ; /* Nothing to do */
4535 } else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4536 buf_ptr->create) { /* Create the buffer */
4537 bb_alloc = bb_find_name_rec(buf_ptr->name,
4538 job_ptr->user_id,
4539 &bb_state);
4540 if (bb_alloc &&
4541 (bb_alloc->user_id != job_ptr->user_id)) {
4542 info("Attempt by %pJ user %u to create duplicate persistent burst buffer named %s and currently owned by user %u",
4543 job_ptr, job_ptr->user_id,
4544 buf_ptr->name, bb_alloc->user_id);
4545 job_ptr->priority = 0;
4546 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4547 xfree(job_ptr->state_desc);
4548 job_ptr->state_desc = xstrdup(
4549 "Burst buffer create_persistent error");
4550 buf_ptr->state = BB_STATE_COMPLETE;
4551 _update_system_comment(job_ptr,
4552 "create_persistent",
4553 "Duplicate buffer name",
4554 0);
4555 rc++;
4556 break;
4557 } else if (bb_alloc) {
4558 /* Duplicate create likely result of requeue */
4559 debug("Attempt by %pJ to create duplicate persistent burst buffer named %s",
4560 job_ptr, buf_ptr->name);
4561 buf_ptr->create = false; /* Creation complete */
4562 if (bb_job->persist_add >= bb_alloc->size) {
4563 bb_job->persist_add -= bb_alloc->size;
4564 } else {
4565 error("%s: %s: Persistent buffer size underflow for %pJ",
4566 plugin_type, __func__, job_ptr);
4567 bb_job->persist_add = 0;
4568 }
4569 continue;
4570 }
4571 rc++;
4572 if (!buf_ptr->pool) {
4573 buf_ptr->pool =
4574 xstrdup(bb_state.bb_config.default_pool);
4575 }
4576 bb_limit_add(job_ptr->user_id, buf_ptr->size,
4577 buf_ptr->pool, &bb_state, true);
4578 bb_job->state = BB_STATE_ALLOCATING;
4579 buf_ptr->state = BB_STATE_ALLOCATING;
4580 create_args = xmalloc(sizeof(create_buf_data_t));
4581 create_args->access = xstrdup(buf_ptr->access);
4582 create_args->job_id = job_ptr->job_id;
4583 create_args->name = xstrdup(buf_ptr->name);
4584 create_args->pool = xstrdup(buf_ptr->pool);
4585 create_args->size = buf_ptr->size;
4586 create_args->type = xstrdup(buf_ptr->type);
4587 create_args->user_id = job_ptr->user_id;
4588
4589 slurm_thread_create(&tid, _create_persistent,
4590 create_args);
4591 } else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4592 buf_ptr->destroy && job_ready) {
4593 /* Delete the buffer */
4594 bb_alloc = bb_find_name_rec(buf_ptr->name,
4595 job_ptr->user_id,
4596 &bb_state);
4597 if (!bb_alloc) {
4598 /* Ignore request if named buffer not found */
4599 info("%s: destroy_persistent: No burst buffer with name '%s' found for %pJ",
4600 plugin_type, buf_ptr->name, job_ptr);
4601 continue;
4602 }
4603 rc++;
4604 if ((bb_alloc->user_id != job_ptr->user_id) &&
4605 !validate_super_user(job_ptr->user_id)) {
4606 info("%s: destroy_persistent: Attempt by user %u %pJ to destroy buffer %s owned by user %u",
4607 plugin_type, job_ptr->user_id, job_ptr,
4608 buf_ptr->name, bb_alloc->user_id);
4609 job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4610 xstrfmtcat(job_ptr->state_desc,
4611 "%s: Delete buffer %s permission "
4612 "denied",
4613 plugin_type, buf_ptr->name);
4614 job_ptr->priority = 0; /* Hold job */
4615 continue;
4616 }
4617
4618 bb_job->state = BB_STATE_DELETING;
4619 buf_ptr->state = BB_STATE_DELETING;
4620 create_args = xmalloc(sizeof(create_buf_data_t));
4621 create_args->hurry = buf_ptr->hurry;
4622 create_args->job_id = job_ptr->job_id;
4623 hash_inx = job_ptr->job_id % 10;
4624 xstrfmtcat(create_args->job_script,
4625 "%s/hash.%d/job.%u/script",
4626 state_save_loc, hash_inx, job_ptr->job_id);
4627 create_args->name = xstrdup(buf_ptr->name);
4628 create_args->user_id = job_ptr->user_id;
4629
4630 slurm_thread_create(&tid, _destroy_persistent,
4631 create_args);
4632 } else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4633 buf_ptr->destroy) {
4634 rc++;
4635 } else if ((buf_ptr->flags != BB_FLAG_BB_OP) &&
4636 buf_ptr->use) {
4637 /*
4638 * Persistent buffer not created or destroyed, but used.
4639 * Just check for existence
4640 */
4641 bb_alloc = bb_find_name_rec(buf_ptr->name,
4642 job_ptr->user_id,
4643 &bb_state);
4644 if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED))
4645 bb_job->state = BB_STATE_ALLOCATED;
4646 else
4647 rc++;
4648 }
4649 }
4650
4651 return rc;
4652 }
4653
4654 /* Test for the existence of persistent burst buffers to be used (but not
4655 * created) by this job. Return true of they are all ready */
4656 static bool _test_persistent_use_ready(bb_job_t *bb_job,
4657 job_record_t *job_ptr)
4658 {
4659 int i, not_ready_cnt = 0;
4660 bb_alloc_t *bb_alloc;
4661 bb_buf_t *buf_ptr;
4662
4663 xassert(bb_job);
4664 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4665 i++, buf_ptr++) {
4666 if (buf_ptr->create || buf_ptr->destroy)
4667 continue;
4668 bb_alloc = bb_find_name_rec(buf_ptr->name, job_ptr->user_id,
4669 &bb_state);
4670 if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED)) {
4671 bb_job->state = BB_STATE_ALLOCATED;
4672 } else {
4673 not_ready_cnt++;
4674 break;
4675 }
4676 }
4677 if (not_ready_cnt != 0)
4678 return false;
4679 return true;
4680 }
4681
4682 /* Reset data structures based upon a change in buffer state
4683 * IN user_id - User effected
4684 * IN job_id - Job effected
4685 * IN name - Buffer name
4686 * IN new_state - New buffer state
4687 * IN buf_size - Size of created burst buffer only, used to decrement remaining
4688 * space requirement for the job
4689 */
4690 static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
4691 int new_state, uint64_t buf_size)
4692 {
4693 bb_buf_t *buf_ptr;
4694 bb_job_t *bb_job;
4695 int i, old_state;
4696 bool active_buf = false;
4697
4698 bb_job = bb_job_find(&bb_state, job_id);
4699 if (!bb_job) {
4700 error("%s: %s: Could not find job record for JobId=%u",
4701 plugin_type, __func__, job_id);
4702 return;
4703 }
4704
4705 /* Update the buffer's state in job record */
4706 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4707 i++, buf_ptr++) {
4708 if (xstrcmp(name, buf_ptr->name))
4709 continue;
4710 old_state = buf_ptr->state;
4711 buf_ptr->state = new_state;
4712 if ((old_state == BB_STATE_ALLOCATING) &&
4713 (new_state == BB_STATE_PENDING)) {
4714 bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
4715 &bb_state);
4716 }
4717 if ((old_state == BB_STATE_DELETING) &&
4718 (new_state == BB_STATE_PENDING)) {
4719 bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
4720 &bb_state);
4721 }
4722 if ((old_state == BB_STATE_ALLOCATING) &&
4723 (new_state == BB_STATE_ALLOCATED) &&
4724 ((name[0] < '0') || (name[0] > '9'))) {
4725 buf_ptr->create = false; /* Buffer creation complete */
4726 if (bb_job->persist_add >= buf_size) {
4727 bb_job->persist_add -= buf_size;
4728 } else {
4729 error("%s: %s: Persistent buffer size underflow for JobId=%u",
4730 plugin_type, __func__, job_id);
4731 bb_job->persist_add = 0;
4732 }
4733 }
4734 break;
4735 }
4736
4737 for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4738 i++, buf_ptr++) {
4739 old_state = buf_ptr->state;
4740 if ((old_state == BB_STATE_PENDING) ||
4741 (old_state == BB_STATE_ALLOCATING) ||
4742 (old_state == BB_STATE_DELETING) ||
4743 (old_state == BB_STATE_TEARDOWN) ||
4744 (old_state == BB_STATE_TEARDOWN_FAIL))
4745 active_buf = true;
4746 break;
4747 }
4748 if (!active_buf) {
4749 if (bb_job->state == BB_STATE_ALLOCATING)
4750 bb_job->state = BB_STATE_ALLOCATED;
4751 else if (bb_job->state == BB_STATE_DELETING)
4752 bb_job->state = BB_STATE_DELETED;
4753 queue_job_scheduler();
4754 }
4755 }
4756
4757 /* Create a persistent burst buffer based upon user specifications. */
4758 static void *_create_persistent(void *x)
4759 {
4760 slurmctld_lock_t job_write_lock =
4761 { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
4762 create_buf_data_t *create_args = (create_buf_data_t *) x;
4763 job_record_t *job_ptr;
4764 bb_alloc_t *bb_alloc;
4765 char **script_argv, *resp_msg;
4766 int i, status = 0;
4767 uint32_t timeout;
4768 DEF_TIMERS;
4769 track_script_rec_add(create_args->job_id, 0, pthread_self());
4770
4771 script_argv = xcalloc(20, sizeof(char *)); /* NULL terminated */
4772 script_argv[0] = xstrdup("dw_wlm_cli");
4773 script_argv[1] = xstrdup("--function");
4774 script_argv[2] = xstrdup("create_persistent");
4775 script_argv[3] = xstrdup("-c");
4776 script_argv[4] = xstrdup("CLI");
4777 script_argv[5] = xstrdup("-t"); /* name */
4778 script_argv[6] = xstrdup(create_args->name);
4779 script_argv[7] = xstrdup("-u"); /* user iD */
4780 xstrfmtcat(script_argv[8], "%u", create_args->user_id);
4781 script_argv[9] = xstrdup("-C"); /* configuration */
4782 xstrfmtcat(script_argv[10], "%s:%"PRIu64"",
4783 create_args->pool, create_args->size);
4784 slurm_mutex_lock(&bb_state.bb_mutex);
4785 timeout = bb_state.bb_config.other_timeout * 1000;
4786 slurm_mutex_unlock(&bb_state.bb_mutex);
4787 i = 11;
4788 if (create_args->access) {
4789 script_argv[i++] = xstrdup("-a");
4790 script_argv[i++] = xstrdup(create_args->access);
4791 }
4792 if (create_args->type) {
4793 script_argv[i++] = xstrdup("-T");
4794 script_argv[i++] = xstrdup(create_args->type);
4795 }
4796 /* NOTE: There is an optional group ID parameter available and
4797 * currently not used by Slurm */
4798
4799 START_TIMER;
4800 resp_msg = run_command("create_persistent",
4801 bb_state.bb_config.get_sys_state,
4802 script_argv, timeout, pthread_self(),
4803 &status);
4804 _log_script_argv(script_argv, resp_msg);
4805 free_command_argv(script_argv);
4806 END_TIMER;
4807 info("create_persistent of %s ran for %s",
4808 create_args->name, TIME_STR);
4809
4810 if (track_script_broadcast(pthread_self(), status)) {
4811 /* I was killed by slurmtrack, bail out right now */
4812 info("%s:%s: create_persistent for JobId=%u terminated by slurmctld",
4813 plugin_type, __func__, create_args->job_id);
4814 xfree(resp_msg);
4815 _free_create_args(create_args);
4816 track_script_remove(pthread_self());
4817 return NULL;
4818 }
4819 /* track_script_reset_cpid(pthread_self(), 0); */
4820
4821 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4822 trigger_burst_buffer();
4823 error("%s: %s: For JobId=%u Name=%s status:%u response:%s",
4824 plugin_type, __func__, create_args->job_id,
4825 create_args->name, status, resp_msg);
4826 lock_slurmctld(job_write_lock);
4827 job_ptr = find_job_record(create_args->job_id);
4828 if (!job_ptr) {
4829 error("%s: %s: unable to find job record for JobId=%u",
4830 plugin_type, __func__, create_args->job_id);
4831 } else {
4832 job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
4833 job_ptr->priority = 0;
4834 xfree(job_ptr->state_desc);
4835 xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
4836 plugin_type, __func__, resp_msg);
4837 _update_system_comment(job_ptr, "create_persistent",
4838 resp_msg, 0);
4839 }
4840 slurm_mutex_lock(&bb_state.bb_mutex);
4841 _reset_buf_state(create_args->user_id, create_args->job_id,
4842 create_args->name, BB_STATE_PENDING, 0);
4843 bb_state.last_update_time = time(NULL);
4844 slurm_mutex_unlock(&bb_state.bb_mutex);
4845 unlock_slurmctld(job_write_lock);
4846 } else if (resp_msg && strstr(resp_msg, "created")) {
4847 assoc_mgr_lock_t assoc_locks =
4848 { .assoc = READ_LOCK, .qos = READ_LOCK };
4849 lock_slurmctld(job_write_lock);
4850 job_ptr = find_job_record(create_args->job_id);
4851 if (!job_ptr) {
4852 error("%s: %s: unable to find job record for JobId=%u",
4853 plugin_type, __func__, create_args->job_id);
4854 }
4855 assoc_mgr_lock(&assoc_locks);
4856 slurm_mutex_lock(&bb_state.bb_mutex);
4857 _reset_buf_state(create_args->user_id, create_args->job_id,
4858 create_args->name, BB_STATE_ALLOCATED,
4859 create_args->size);
4860 bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name,
4861 create_args->user_id);
4862 bb_alloc->size = create_args->size;
4863 bb_alloc->pool = xstrdup(create_args->pool);
4864 if (job_ptr) {
4865 bb_alloc->account = xstrdup(job_ptr->account);
4866 if (job_ptr->assoc_ptr) {
4867 /* Only add the direct association id
4868 * here, we don't need to keep track
4869 * of the tree.
4870 */
4871 slurmdb_assoc_rec_t *assoc = job_ptr->assoc_ptr;
4872 bb_alloc->assoc_ptr = assoc;
4873 xfree(bb_alloc->assocs);
4874 bb_alloc->assocs = xstrdup_printf(
4875 ",%u,", assoc->id);
4876 }
4877 if (job_ptr->qos_ptr) {
4878 slurmdb_qos_rec_t *qos_ptr = job_ptr->qos_ptr;
4879 bb_alloc->qos_ptr = qos_ptr;
4880 bb_alloc->qos = xstrdup(qos_ptr->name);
4881 }
4882
4883 if (job_ptr->part_ptr) {
4884 bb_alloc->partition =
4885 xstrdup(job_ptr->part_ptr->name);
4886 }
4887 }
4888 if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
4889 bb_alloc->create_time = time(NULL);
4890 bb_alloc->id = ++last_persistent_id;
4891 } else {
4892 bb_sessions_t *sessions;
4893 int num_sessions = 0;
4894 sessions = _bb_get_sessions(&num_sessions, &bb_state,
4895 timeout);
4896 for (i = 0; i < num_sessions; i++) {
4897 if (xstrcmp(sessions[i].token,
4898 create_args->name))
4899 continue;
4900 bb_alloc->create_time = sessions[i].created;
4901 bb_alloc->id = sessions[i].id;
4902 break;
4903 }
4904 _bb_free_sessions(sessions, num_sessions);
4905 }
4906 (void) bb_post_persist_create(job_ptr, bb_alloc, &bb_state);
4907 bb_state.last_update_time = time(NULL);
4908 slurm_mutex_unlock(&bb_state.bb_mutex);
4909 assoc_mgr_unlock(&assoc_locks);
4910 unlock_slurmctld(job_write_lock);
4911 }
4912 xfree(resp_msg);
4913 _free_create_args(create_args);
4914
4915 track_script_remove(pthread_self());
4916
4917 return NULL;
4918 }
4919
4920 /* Destroy a persistent burst buffer */
4921 static void *_destroy_persistent(void *x)
4922 {
4923 slurmctld_lock_t job_write_lock =
4924 { NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
4925 create_buf_data_t *destroy_args = (create_buf_data_t *) x;
4926 job_record_t *job_ptr;
4927 bb_alloc_t *bb_alloc;
4928 char **script_argv, *resp_msg;
4929 int status = 0;
4930 uint32_t timeout;
4931 DEF_TIMERS;
4932 track_script_rec_add(destroy_args->job_id, 0, pthread_self());
4933
4934 slurm_mutex_lock(&bb_state.bb_mutex);
4935 bb_alloc = bb_find_name_rec(destroy_args->name, destroy_args->user_id,
4936 &bb_state);
4937 if (!bb_alloc) {
4938 info("%s: destroy_persistent: No burst buffer with name '%s' found for JobId=%u",
4939 plugin_type, destroy_args->name, destroy_args->job_id);
4940 }
4941 timeout = bb_state.bb_config.other_timeout * 1000;
4942 slurm_mutex_unlock(&bb_state.bb_mutex);
4943
4944 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
4945 script_argv[0] = xstrdup("dw_wlm_cli");
4946 script_argv[1] = xstrdup("--function");
4947 script_argv[2] = xstrdup("teardown");
4948 script_argv[3] = xstrdup("--token"); /* name */
4949 script_argv[4] = xstrdup(destroy_args->name);
4950 script_argv[5] = xstrdup("--job"); /* script */
4951 script_argv[6] = xstrdup(destroy_args->job_script);
4952 if (destroy_args->hurry)
4953 script_argv[7] = xstrdup("--hurry");
4954
4955 START_TIMER;
4956 resp_msg = run_command("destroy_persistent",
4957 bb_state.bb_config.get_sys_state,
4958 script_argv, timeout, pthread_self(),
4959 &status);
4960 _log_script_argv(script_argv, resp_msg);
4961 free_command_argv(script_argv);
4962 END_TIMER;
4963 info("destroy_persistent of %s ran for %s",
4964 destroy_args->name, TIME_STR);
4965
4966 if (track_script_broadcast(pthread_self(), status)) {
4967 /* I was killed by slurmtrack, bail out right now */
4968 info("%s: %s: destroy_persistent for JobId=%u terminated by slurmctld",
4969 plugin_type, __func__, destroy_args->job_id);
4970 xfree(resp_msg);
4971 _free_create_args(destroy_args);
4972
4973 track_script_remove(pthread_self());
4974 return NULL;
4975 }
4976 /* track_script_reset_cpid(pthread_self(), 0); */
4977
4978 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4979 trigger_burst_buffer();
4980 error("%s: %s: destroy_persistent for JobId=%u Name=%s status:%u response:%s",
4981 plugin_type, __func__, destroy_args->job_id,
4982 destroy_args->name, status, resp_msg);
4983 lock_slurmctld(job_write_lock);
4984 job_ptr = find_job_record(destroy_args->job_id);
4985 if (!job_ptr) {
4986 error("%s: %s: unable to find job record for JobId=%u",
4987 plugin_type, __func__, destroy_args->job_id);
4988 } else {
4989 _update_system_comment(job_ptr, "teardown",
4990 resp_msg, 0);
4991 job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
4992 xfree(job_ptr->state_desc);
4993 xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
4994 plugin_type, __func__, resp_msg);
4995 }
4996 slurm_mutex_lock(&bb_state.bb_mutex);
4997 _reset_buf_state(destroy_args->user_id, destroy_args->job_id,
4998 destroy_args->name, BB_STATE_PENDING, 0);
4999 bb_state.last_update_time = time(NULL);
5000 slurm_mutex_unlock(&bb_state.bb_mutex);
5001 unlock_slurmctld(job_write_lock);
5002 } else {
5003 assoc_mgr_lock_t assoc_locks =
5004 { .assoc = READ_LOCK, .qos = READ_LOCK };
5005 /* assoc_mgr needs locking to call bb_post_persist_delete */
5006 if (bb_alloc)
5007 assoc_mgr_lock(&assoc_locks);
5008 slurm_mutex_lock(&bb_state.bb_mutex);
5009 _reset_buf_state(destroy_args->user_id, destroy_args->job_id,
5010 destroy_args->name, BB_STATE_DELETED, 0);
5011
5012 /* Modify internal buffer record for purging */
5013 if (bb_alloc) {
5014 bb_alloc->state = BB_STATE_COMPLETE;
5015 bb_alloc->job_id = destroy_args->job_id;
5016 bb_alloc->state_time = time(NULL);
5017 bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
5018 bb_alloc->pool, &bb_state);
5019
5020 (void) bb_post_persist_delete(bb_alloc, &bb_state);
5021
5022 (void) bb_free_alloc_rec(&bb_state, bb_alloc);
5023 }
5024 bb_state.last_update_time = time(NULL);
5025 slurm_mutex_unlock(&bb_state.bb_mutex);
5026 if (bb_alloc)
5027 assoc_mgr_unlock(&assoc_locks);
5028 }
5029 xfree(resp_msg);
5030 _free_create_args(destroy_args);
5031
5032 track_script_remove(pthread_self());
5033
5034 return NULL;
5035 }
5036
5037 /* _bb_get_configs()
5038 *
5039 * Handle the JSON stream with configuration info (instance use details).
5040 */
5041 static bb_configs_t *
5042 _bb_get_configs(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5043 {
5044 bb_configs_t *ents = NULL;
5045 json_object *j;
5046 json_object_iter iter;
5047 int status = 0;
5048 DEF_TIMERS;
5049 char *resp_msg;
5050 char **script_argv;
5051
5052 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
5053 script_argv[0] = xstrdup("dw_wlm_cli");
5054 script_argv[1] = xstrdup("--function");
5055 script_argv[2] = xstrdup("show_configurations");
5056
5057 START_TIMER;
5058 resp_msg = run_command("show_configurations",
5059 state_ptr->bb_config.get_sys_state,
5060 script_argv, timeout, 0, &status);
5061 END_TIMER;
5062 if (bb_state.bb_config.debug_flag)
5063 debug("%s: %s: show_configurations ran for %s",
5064 plugin_type, __func__, TIME_STR);
5065 _log_script_argv(script_argv, resp_msg);
5066 free_command_argv(script_argv);
5067 #if 0
5068 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5069 #else
5070 //FIXME: Cray bug: API returning error if no configurations, use above code when fixed
5071 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5072 (!resp_msg || (resp_msg[0] != '{'))) {
5073 #endif
5074 trigger_burst_buffer();
5075 error("%s: %s: show_configurations status:%u response:%s",
5076 plugin_type, __func__, status, resp_msg);
5077 }
5078 if (resp_msg == NULL) {
5079 info("%s: %s: %s returned no configurations",
5080 plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5081 return ents;
5082 }
5083
5084
5085 _python2json(resp_msg);
5086 j = json_tokener_parse(resp_msg);
5087 if (j == NULL) {
5088 error("%s: %s: json parser failed on \"%s\"",
5089 plugin_type, __func__, resp_msg);
5090 xfree(resp_msg);
5091 return ents;
5092 }
5093 xfree(resp_msg);
5094
5095 json_object_object_foreachC(j, iter) {
5096 if (ents) {
5097 error("%s: %s: Multiple configuration objects",
5098 plugin_type, __func__);
5099 break;
5100 }
5101 ents = _json_parse_configs_array(j, iter.key, num_ent);
5102 }
5103 json_object_put(j); /* Frees json memory */
5104
5105 return ents;
5106 }
5107
5108 /* _bb_get_instances()
5109 *
5110 * Handle the JSON stream with instance info (resource reservations).
5111 */
5112 static bb_instances_t *
5113 _bb_get_instances(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5114 {
5115 bb_instances_t *ents = NULL;
5116 json_object *j;
5117 json_object_iter iter;
5118 int status = 0;
5119 DEF_TIMERS;
5120 char *resp_msg;
5121 char **script_argv;
5122
5123 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
5124 script_argv[0] = xstrdup("dw_wlm_cli");
5125 script_argv[1] = xstrdup("--function");
5126 script_argv[2] = xstrdup("show_instances");
5127
5128 START_TIMER;
5129 resp_msg = run_command("show_instances",
5130 state_ptr->bb_config.get_sys_state,
5131 script_argv, timeout, 0, &status);
5132 END_TIMER;
5133 if (bb_state.bb_config.debug_flag)
5134 debug("%s: %s: show_instances ran for %s",
5135 plugin_type, __func__, TIME_STR);
5136 _log_script_argv(script_argv, resp_msg);
5137 free_command_argv(script_argv);
5138 #if 0
5139 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5140 #else
5141 //FIXME: Cray bug: API returning error if no instances, use above code when fixed
5142 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5143 (!resp_msg || (resp_msg[0] != '{'))) {
5144 #endif
5145 trigger_burst_buffer();
5146 error("%s: %s: show_instances status:%u response:%s",
5147 plugin_type, __func__, status, resp_msg);
5148 }
5149 if (resp_msg == NULL) {
5150 info("%s: %s: %s returned no instances",
5151 plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5152 return ents;
5153 }
5154
5155 _python2json(resp_msg);
5156 j = json_tokener_parse(resp_msg);
5157 if (j == NULL) {
5158 error("%s: %s: json parser failed on \"%s\"",
5159 plugin_type, __func__, resp_msg);
5160 xfree(resp_msg);
5161 return ents;
5162 }
5163 xfree(resp_msg);
5164
5165 json_object_object_foreachC(j, iter) {
5166 if (ents) {
5167 error("%s: %s: Multiple instance objects",
5168 plugin_type, __func__);
5169 break;
5170 }
5171 ents = _json_parse_instances_array(j, iter.key, num_ent);
5172 }
5173 json_object_put(j); /* Frees json memory */
5174
5175 return ents;
5176 }
5177
5178 /* _bb_get_pools()
5179 *
5180 * Handle the JSON stream with resource pool info (available resource type).
5181 */
5182 static bb_pools_t *
5183 _bb_get_pools(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5184 {
5185 bb_pools_t *ents = NULL;
5186 json_object *j;
5187 json_object_iter iter;
5188 int status = 0;
5189 DEF_TIMERS;
5190 char *resp_msg;
5191 char **script_argv;
5192
5193 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
5194 script_argv[0] = xstrdup("dw_wlm_cli");
5195 script_argv[1] = xstrdup("--function");
5196 script_argv[2] = xstrdup("pools");
5197
5198 START_TIMER;
5199 resp_msg = run_command("pools",
5200 state_ptr->bb_config.get_sys_state,
5201 script_argv, timeout, 0, &status);
5202 END_TIMER;
5203 if (bb_state.bb_config.debug_flag) {
5204 /* Only log pools data if different to limit volume of logs */
5205 static uint32_t last_csum = 0;
5206 uint32_t i, resp_csum = 0;
5207 debug("%s: %s: pools ran for %s",
5208 plugin_type, __func__, TIME_STR);
5209 for (i = 0; resp_msg[i]; i++)
5210 resp_csum += ((i * resp_msg[i]) % 1000000);
5211 if (last_csum != resp_csum)
5212 _log_script_argv(script_argv, resp_msg);
5213 last_csum = resp_csum;
5214 }
5215 free_command_argv(script_argv);
5216 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5217 trigger_burst_buffer();
5218 error("%s: %s: pools status:%u response:%s",
5219 plugin_type, __func__, status, resp_msg);
5220 }
5221 if (resp_msg == NULL) {
5222 error("%s: %s: %s returned no pools",
5223 plugin_type, __func__,
5224 state_ptr->bb_config.get_sys_state);
5225 return ents;
5226 }
5227
5228 _python2json(resp_msg);
5229 j = json_tokener_parse(resp_msg);
5230 if (j == NULL) {
5231 error("%s: %s: json parser failed on \"%s\"",
5232 plugin_type, __func__, resp_msg);
5233 xfree(resp_msg);
5234 return ents;
5235 }
5236 xfree(resp_msg);
5237
5238 json_object_object_foreachC(j, iter) {
5239 if (ents) {
5240 error("%s: %s: Multiple pool objects",
5241 plugin_type, __func__);
5242 break;
5243 }
5244 ents = _json_parse_pools_array(j, iter.key, num_ent);
5245 }
5246 json_object_put(j); /* Frees json memory */
5247
5248 return ents;
5249 }
5250
5251 static bb_sessions_t *
5252 _bb_get_sessions(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5253 {
5254 bb_sessions_t *ents = NULL;
5255 json_object *j;
5256 json_object_iter iter;
5257 int status = 0;
5258 DEF_TIMERS;
5259 char *resp_msg;
5260 char **script_argv;
5261
5262 script_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
5263 script_argv[0] = xstrdup("dw_wlm_cli");
5264 script_argv[1] = xstrdup("--function");
5265 script_argv[2] = xstrdup("show_sessions");
5266
5267 START_TIMER;
5268 resp_msg = run_command("show_sessions",
5269 state_ptr->bb_config.get_sys_state,
5270 script_argv, timeout, 0, &status);
5271 END_TIMER;
5272 if (bb_state.bb_config.debug_flag)
5273 debug("%s: %s: show_sessions ran for %s",
5274 plugin_type, __func__, TIME_STR);
5275 _log_script_argv(script_argv, resp_msg);
5276 free_command_argv(script_argv);
5277 #if 0
5278 if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5279 #else
5280 //FIXME: Cray bug: API returning error if no sessions, use above code when fixed
5281 if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5282 (!resp_msg || (resp_msg[0] != '{'))) {
5283 #endif
5284 trigger_burst_buffer();
5285 error("%s: %s: show_sessions status:%u response:%s",
5286 plugin_type, __func__, status, resp_msg);
5287 }
5288 if (resp_msg == NULL) {
5289 info("%s: %s: %s returned no sessions",
5290 plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5291 free_command_argv(script_argv);
5292 return ents;
5293 }
5294
5295 _python2json(resp_msg);
5296 j = json_tokener_parse(resp_msg);
5297 if (j == NULL) {
5298 error("%s: %s: json parser failed on \"%s\"",
5299 plugin_type, __func__, resp_msg);
5300 xfree(resp_msg);
5301 return ents;
5302 }
5303 xfree(resp_msg);
5304
5305 json_object_object_foreachC(j, iter) {
5306 if (ents) {
5307 error("%s: %s: Multiple session objects",
5308 plugin_type, __func__);
5309 break;
5310 }
5311 ents = _json_parse_sessions_array(j, iter.key, num_ent);
5312 }
5313 json_object_put(j); /* Frees json memory */
5314
5315 return ents;
5316 }
5317
5318 /* _bb_free_configs()
5319 */
5320 static void
5321 _bb_free_configs(bb_configs_t *ents, int num_ent)
5322 {
5323 xfree(ents);
5324 }
5325
5326 /* _bb_free_instances()
5327 */
5328 static void
5329 _bb_free_instances(bb_instances_t *ents, int num_ent)
5330 {
5331 xfree(ents);
5332 }
5333
5334 /* _bb_free_pools()
5335 */
5336 static void
5337 _bb_free_pools(bb_pools_t *ents, int num_ent)
5338 {
5339 int i;
5340
5341 for (i = 0; i < num_ent; i++) {
5342 xfree(ents[i].id);
5343 xfree(ents[i].units);
5344 }
5345
5346 xfree(ents);
5347 }
5348
5349 /* _bb_free_sessions()
5350 */
5351 static void
5352 _bb_free_sessions(bb_sessions_t *ents, int num_ent)
5353 {
5354 int i;
5355
5356 for (i = 0; i < num_ent; i++) {
5357 xfree(ents[i].token);
5358 }
5359
5360 xfree(ents);
5361 }
5362
5363 /* _json_parse_configs_array()
5364 */
5365 static bb_configs_t *
5366 _json_parse_configs_array(json_object *jobj, char *key, int *num)
5367 {
5368 json_object *jarray;
5369 int i;
5370 json_object *jvalue;
5371 bb_configs_t *ents;
5372
5373 jarray = jobj;
5374 json_object_object_get_ex(jobj, key, &jarray);
5375
5376 *num = json_object_array_length(jarray);
5377 ents = xcalloc(*num, sizeof(bb_configs_t));
5378
5379 for (i = 0; i < *num; i++) {
5380 jvalue = json_object_array_get_idx(jarray, i);
5381 _json_parse_configs_object(jvalue, &ents[i]);
5382 }
5383
5384 return ents;
5385 }
5386
5387 /* _json_parse_instances_array()
5388 */
5389 static bb_instances_t *
5390 _json_parse_instances_array(json_object *jobj, char *key, int *num)
5391 {
5392 json_object *jarray;
5393 int i;
5394 json_object *jvalue;
5395 bb_instances_t *ents;
5396
5397 jarray = jobj;
5398 json_object_object_get_ex(jobj, key, &jarray);
5399
5400 *num = json_object_array_length(jarray);
5401 ents = xcalloc(*num, sizeof(bb_instances_t));
5402
5403 for (i = 0; i < *num; i++) {
5404 jvalue = json_object_array_get_idx(jarray, i);
5405 _json_parse_instances_object(jvalue, &ents[i]);
5406 }
5407
5408 return ents;
5409 }
5410
5411 /* _json_parse_pools_array()
5412 */
5413 static bb_pools_t *
5414 _json_parse_pools_array(json_object *jobj, char *key, int *num)
5415 {
5416 json_object *jarray;
5417 int i;
5418 json_object *jvalue;
5419 bb_pools_t *ents;
5420
5421 jarray = jobj;
5422 json_object_object_get_ex(jobj, key, &jarray);
5423
5424 *num = json_object_array_length(jarray);
5425 ents = xcalloc(*num, sizeof(bb_pools_t));
5426
5427 for (i = 0; i < *num; i++) {
5428 jvalue = json_object_array_get_idx(jarray, i);
5429 _json_parse_pools_object(jvalue, &ents[i]);
5430 }
5431
5432 return ents;
5433 }
5434
5435 /* _json_parse_sessions_array()
5436 */
5437 static bb_sessions_t *
5438 _json_parse_sessions_array(json_object *jobj, char *key, int *num)
5439 {
5440 json_object *jarray;
5441 int i;
5442 json_object *jvalue;
5443 bb_sessions_t *ents;
5444
5445 jarray = jobj;
5446 json_object_object_get_ex(jobj, key, &jarray);
5447
5448 *num = json_object_array_length(jarray);
5449 ents = xcalloc(*num, sizeof(bb_sessions_t));
5450
5451 for (i = 0; i < *num; i++) {
5452 jvalue = json_object_array_get_idx(jarray, i);
5453 _json_parse_sessions_object(jvalue, &ents[i]);
5454 }
5455
5456 return ents;
5457 }
5458
5459 /* Parse "links" object in the "configuration" object */
5460 static void
5461 _parse_config_links(json_object *instance, bb_configs_t *ent)
5462 {
5463 enum json_type type;
5464 struct json_object_iter iter;
5465 int x;
5466
5467 json_object_object_foreachC(instance, iter) {
5468 type = json_object_get_type(iter.val);
5469 switch (type) {
5470 case json_type_int:
5471 x = json_object_get_int64(iter.val);
5472 if (!xstrcmp(iter.key, "instance"))
5473 ent->instance = x;
5474 break;
5475 default:
5476 break;
5477 }
5478 }
5479 }
5480
5481 /* _json_parse_configs_object()
5482 */
5483 static void
5484 _json_parse_configs_object(json_object *jobj, bb_configs_t *ent)
5485 {
5486 enum json_type type;
5487 struct json_object_iter iter;
5488 int64_t x;
5489
5490 json_object_object_foreachC(jobj, iter) {
5491 type = json_object_get_type(iter.val);
5492 switch (type) {
5493 case json_type_object:
5494 if (xstrcmp(iter.key, "links") == 0)
5495 _parse_config_links(iter.val, ent);
5496 break;
5497 case json_type_int:
5498 x = json_object_get_int64(iter.val);
5499 if (xstrcmp(iter.key, "id") == 0) {
5500 ent->id = x;
5501 }
5502 break;
5503 default:
5504 break;
5505 }
5506 }
5507 }
5508
5509 /* Parse "capacity" object in the "instance" object */
5510 static void
5511 _parse_instance_capacity(json_object *instance, bb_instances_t *ent)
5512 {
5513 enum json_type type;
5514 struct json_object_iter iter;
5515 int64_t x;
5516
5517 json_object_object_foreachC(instance, iter) {
5518 type = json_object_get_type(iter.val);
5519 switch (type) {
5520 case json_type_int:
5521 x = json_object_get_int64(iter.val);
5522 if (!xstrcmp(iter.key, "bytes"))
5523 ent->bytes = x;
5524 break;
5525 default:
5526 break;
5527 }
5528 }
5529 }
5530
5531 /* Parse "links" object in the "instance" object */
5532 static void
5533 _parse_instance_links(json_object *instance, bb_instances_t *ent)
5534 {
5535 enum json_type type;
5536 struct json_object_iter iter;
5537 int64_t x;
5538
5539 json_object_object_foreachC(instance, iter) {
5540 type = json_object_get_type(iter.val);
5541 switch (type) {
5542 case json_type_int:
5543 x = json_object_get_int64(iter.val);
5544 if (!xstrcmp(iter.key, "session"))
5545 ent->session = x;
5546 break;
5547 default:
5548 break;
5549 }
5550 }
5551 }
5552
5553 /* _json_parse_instances_object()
5554 */
5555 static void
5556 _json_parse_instances_object(json_object *jobj, bb_instances_t *ent)
5557 {
5558 enum json_type type;
5559 struct json_object_iter iter;
5560 int64_t x;
5561
5562 json_object_object_foreachC(jobj, iter) {
5563 type = json_object_get_type(iter.val);
5564 switch (type) {
5565 case json_type_object:
5566 if (xstrcmp(iter.key, "capacity") == 0)
5567 _parse_instance_capacity(iter.val, ent);
5568 else if (xstrcmp(iter.key, "links") == 0)
5569 _parse_instance_links(iter.val, ent);
5570 break;
5571 case json_type_int:
5572 x = json_object_get_int64(iter.val);
5573 if (xstrcmp(iter.key, "id") == 0) {
5574 ent->id = x;
5575 }
5576 break;
5577 default:
5578 break;
5579 }
5580 }
5581 }
5582
5583 /* _json_parse_pools_object()
5584 */
5585 static void
5586 _json_parse_pools_object(json_object *jobj, bb_pools_t *ent)
5587 {
5588 enum json_type type;
5589 struct json_object_iter iter;
5590 int64_t x;
5591 const char *p;
5592
5593 json_object_object_foreachC(jobj, iter) {
5594 type = json_object_get_type(iter.val);
5595 switch (type) {
5596 case json_type_int:
5597 x = json_object_get_int64(iter.val);
5598 if (xstrcmp(iter.key, "granularity") == 0) {
5599 ent->granularity = x;
5600 } else if (xstrcmp(iter.key, "quantity") == 0) {
5601 ent->quantity = x;
5602 } else if (xstrcmp(iter.key, "free") == 0) {
5603 ent->free = x;
5604 }
5605 break;
5606 case json_type_string:
5607 p = json_object_get_string(iter.val);
5608 if (xstrcmp(iter.key, "id") == 0) {
5609 ent->id = xstrdup(p);
5610 } else if (xstrcmp(iter.key, "units") == 0) {
5611 ent->units = xstrdup(p);
5612 }
5613 break;
5614 default:
5615 break;
5616 }
5617 }
5618 }
5619
5620 /* _json_parse_session_object()
5621 */
5622 static void
5623 _json_parse_sessions_object(json_object *jobj, bb_sessions_t *ent)
5624 {
5625 enum json_type type;
5626 struct json_object_iter iter;
5627 int64_t x;
5628 const char *p;
5629
5630 json_object_object_foreachC(jobj, iter) {
5631 type = json_object_get_type(iter.val);
5632 switch (type) {
5633 case json_type_int:
5634 x = json_object_get_int64(iter.val);
5635 if (xstrcmp(iter.key, "created") == 0) {
5636 ent->created = x;
5637 } else if (xstrcmp(iter.key, "id") == 0) {
5638 ent->id = x;
5639 } else if (xstrcmp(iter.key, "owner") == 0) {
5640 ent->user_id = x;
5641 }
5642 break;
5643 case json_type_string:
5644 p = json_object_get_string(iter.val);
5645 if (xstrcmp(iter.key, "token") == 0) {
5646 ent->token = xstrdup(p);
5647 }
5648 default:
5649 break;
5650 }
5651 }
5652 }
5653
5654 /*
5655 * Translate a burst buffer string to it's equivalent TRES string
5656 * (e.g. "cray:2G,generic:4M" -> "1004=2048,1005=4")
5657 * Caller must xfree the return value
5658 */
5659 extern char *bb_p_xlate_bb_2_tres_str(char *burst_buffer)
5660 {
5661 char *save_ptr = NULL, *sep, *tmp, *tok;
5662 char *result = NULL;
5663 uint64_t size, total = 0;
5664
5665 if (!burst_buffer || (bb_state.tres_id < 1))
5666 return result;
5667
5668 tmp = xstrdup(burst_buffer);
5669 tok = strtok_r(tmp, ",", &save_ptr);
5670 while (tok) {
5671 sep = strchr(tok, ':');
5672 if (sep) {
5673 if (!xstrncmp(tok, "cray:", 5))
5674 tok += 5;
5675 else
5676 tok = NULL;
5677 }
5678
5679 if (tok) {
5680 uint64_t mb_xlate = 1024 * 1024;
5681 size = bb_get_size_num(tok,
5682 bb_state.bb_config.granularity);
5683 total += (size + mb_xlate - 1) / mb_xlate;
5684 }
5685
5686 tok = strtok_r(NULL, ",", &save_ptr);
5687 }
5688 xfree(tmp);
5689
5690 if (total)
5691 xstrfmtcat(result, "%d=%"PRIu64, bb_state.tres_id, total);
5692
5693 return result;
5694 }
5695